<a href="https://colab.research.google.com/github/Nabeel06022002/ML01/blob/main/ML01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import glob
import os as os
import os.path as osp
import random
import numpy as np
import json
from tqdm import tqdm
import matplotlib as mpl
mpl.use('Agg')# AGG(Anti-Grain Geometry engine)
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision
from torchvision import models,transforms
import torch.nn.init as init
from torch.autograd import Function
import torch.nn.functional as F

import xml.etree.ElementTree as ET
from itertools import product
from math import sqrt
import time
import librosa
import soundfile as sf
import hashlib

In [None]:
#encoding:utf-8


class PhaseShuffle(nn.Module):
	"""Phase-shuffle layer: shifts every batch entry along its last axis.

	Each entry of the batch gets its own integer offset drawn from [-n, n].
	The positions vacated by the shift are filled by mirroring the signal at
	the corresponding edge, so the output has the same shape as the input.
	"""
	def __init__(self,n):
		super().__init__()
		#Largest absolute shift; offsets are drawn from [-n, n]
		self.n = n

	def forward(self, x):
		"""Return ``x`` with every batch entry shifted by its own random offset."""
		#n == 0 disables the layer: the input object itself is returned
		if self.n == 0:
			return x
		#One integer offset per batch entry (the upper bound of random_ is exclusive)
		offsets = torch.Tensor(x.shape[0]).random_(-self.n,self.n+1).type(torch.int).tolist()
		#Work on a copy so the caller's tensor is never modified
		out = x.clone()
		for idx,k in enumerate(offsets):
			if k == 0:
				continue
			sample = x[idx]
			axis = sample.dim() - 1
			length = sample.shape[axis]
			if k > 0:
				#Shift right: the mirror of elements 1..k goes in front, the last k elements drop off
				head = torch.narrow(sample,axis,1,k).flip([axis])
				tail = torch.narrow(sample,axis,0,length-k)
			else:
				#Shift left: the first |k| elements drop off, the mirror of the |k| elements
				#preceding the last one is appended
				k = -k
				head = torch.narrow(sample,axis,k,length-k)
				tail = torch.narrow(sample,axis,length-k-1,k).flip([axis])
			out[idx] = torch.cat([head,tail],axis)

		return out

#Gradient penalty used as the gradient constraint on the Discriminator (WGAN-GP).
#In WGAN-GP the Discriminator minimizes -E[D(real sound)] + E[D(fake sound)] + weight * gradient penalty,
#and the Generator minimizes -E[D(fake sound)].
def gradient_penalty(netD,real,fake,batch_size,gamma=1):
	"""Return the WGAN-GP gradient penalty of ``netD`` between ``real`` and ``fake``.

	Every sample pair is mixed with its own random ratio, the mix is scored by
	``netD`` and the L2 norm of the score's gradient with respect to the mix is
	computed per sample. The result is ``mean((norm / gamma - 1) ** 2)``.

	Args:
		netD: Module called on the mixed batch.
		real: Batch of real samples; its first dimension must be ``batch_size``.
		fake: Batch of generated samples with the same shape as ``real``.
		batch_size: Number of samples in ``real`` and ``fake``.
		gamma: Gradient norm the penalty pulls towards (default 1).

	Returns:
		Scalar tensor; the graph of the gradient is kept (``create_graph=True``)
		so the penalty can itself be back-propagated.
	"""
	device = real.device
	#One mixing ratio per sample, broadcast over every non-batch dimension
	alpha_shape = (batch_size,) + (1,) * (real.dim() - 1)
	alpha = torch.rand(alpha_shape,device=device)
	#Mix the real and fake at a random ratio
	x = alpha*real + (1-alpha)*fake
	#The gradient is taken with respect to the mix itself, so it must track gradients
	x.requires_grad_(True)
	#Score the mixed batch with the Discriminator
	d_ = netD(x)
	#Gradient of the scores with respect to the mixed input
	g = torch.autograd.grad(outputs=d_, inputs=x,
							grad_outputs=torch.ones_like(d_),
							create_graph=True, retain_graph=True)[0]
	g = g.reshape(batch_size, -1)
	#Penalize the deviation of each per-sample gradient norm from gamma
	return ((g.norm(2,dim=1)/gamma-1.0)**2).mean()

In [None]:
#encoding:utf-8


def make_datapath_list(target_path):
	"""Collect the dataset files matching the glob pattern ``target_path``.

	``**`` in the pattern matches nested directories (recursive glob). The
	number of matches is printed and the matching paths are returned as a list.
	"""
	found_paths = list(glob.glob(target_path,recursive=True))
	#Report how many audio files will be read
	print("sounds : " + str(len(found_paths)))
	return found_paths

class GAN_Sound_Dataset(data.Dataset):
	"""Dataset that serves fixed-length waveforms as ``(1, sound_length)`` tensors.

	Args:
		file_list: List of audio file paths to read.
		device: Device the returned tensors are moved to.
		batch_size: Batch size; used as the minimum reported dataset length.
		sound_length: Number of samples in every returned waveform.
		sampling_rate: Sampling rate passed to ``librosa.load``.
		dat_threshold: If ``len(file_list)`` is at most this value, all files
			are decoded once in ``__init__`` and kept in ``self.file_contents``;
			otherwise each file is decoded on every access.
	"""
	def __init__(self,file_list,device,batch_size,sound_length=65536,sampling_rate=16000,dat_threshold=1100):
		self.file_list = file_list
		self.device = device
		self.batch_size = batch_size
		self.sound_length = sound_length
		self.sampling_rate = sampling_rate
		self.dat_threshold = dat_threshold
		#Small datasets are decoded once up front and served from memory
		if len(self.file_list) <= dat_threshold:
			self.file_contents = [
				librosa.load(file_path,sr=self.sampling_rate)[0]
				for file_path in self.file_list
			]

	def __len__(self):
		"""Return the larger of the batch size and the number of files.

		Indices past the end of the file list wrap around in ``__getitem__``.
		"""
		return max(self.batch_size, len(self.file_list))

	def __getitem__(self,index):
		"""Return one pre-processed waveform as a float32 tensor of shape ``(1, sound_length)``.

		The waveform is scaled down only if its peak exceeds 1, zero-padded on
		both sides if it is too short, and randomly cropped if it is too long.
		"""
		file_index = index % len(self.file_list)
		if len(self.file_list) <= self.dat_threshold:
			sound = self.file_contents[file_index]
		else:
			sound,_ = librosa.load(self.file_list[file_index],sr=self.sampling_rate)
		#astype() copies, so the in-place scaling below never touches the cached array
		sound = torch.from_numpy(sound.astype(np.float32)).to(self.device)
		#Scale down only when the peak exceeds 1; an empty waveform has no peak
		if sound.numel() > 0:
			max_amplitude = torch.max(torch.abs(sound))
			if max_amplitude > 1:
				sound /= max_amplitude
		loaded_sound_length = sound.shape[0]
		if loaded_sound_length < self.sound_length:
			#Too short: pad with zeros, split as evenly as possible between front and back
			padding_length = self.sound_length - loaded_sound_length
			left_zeros = torch.zeros(padding_length//2).to(self.device)
			right_zeros = torch.zeros(padding_length - padding_length//2).to(self.device)
			sound = torch.cat([left_zeros,sound,right_zeros],dim=0).to(self.device)
		elif loaded_sound_length > self.sound_length:
			#Too long: cut a window at a random start. Every valid start
			#(0 .. length difference, inclusive) can be drawn; randint's upper bound is exclusive.
			last_start = loaded_sound_length - self.sound_length
			start_index = torch.randint(0,last_start+1,(1,)).item()
			sound = sound[start_index:start_index+self.sound_length]
		#sound is 1-D with sound_length samples here (assumes a mono waveform);
		#add a leading channel axis -> (1, sound_length)
		sound = sound.unsqueeze(0)
		return sound

#Produced voice output function
def save_sounds(path,sounds,sampling_rate):
	"""Write every tensor in ``sounds`` to ``path`` as a WAV file.

	Files are named ``generated_sound_<index>_<tag>.wav``; the tag is the MD5
	hex digest of the call time, so all files of one call share it. Each file
	path is printed before it is written.
	"""
	#The tag depends only on the call time, so it is computed once for the whole batch
	tag = hashlib.md5(str(time.time()).encode()).hexdigest()
	for idx,clip in enumerate(sounds):
		#Drop the leading axis (if it has size 1) and hand soundfile a CPU numpy copy
		samples = clip.squeeze(0).to('cpu').detach().numpy().copy()
		file_path = os.path.join(path,f"generated_sound_{idx}_{tag}.wav")
		print(file_path)
		sf.write(file_path,samples,sampling_rate,format="WAV")

#Operation confirmation
# train_wav_list = make_datapath_list('../dataset/**/*.wav')

# batch_size = 3
# dataset = GAN_Sound_Dataset(file_list=train_wav_list,device="cpu",batch_size=batch_size)

# dataloader = torch.utils.data.DataLoader(dataset,batch_size=batch_size,shuffle=True)

# batch_iterator = iter(dataloader)
# sounds = next(batch_iterator)
# save_sounds(sounds,16000)





In [None]:
#encoding:utf-8


class Generator(nn.Module):
	"""Generator: maps a noise vector to a single-channel waveform in [-1, 1].

	A linear layer expands the noise to ``32*model_size`` channels of length 16.
	Six transposed convolutions (kernel 25, stride 4) then each multiply the
	length by 4 while reducing the channel count, ending in one channel with
	Tanh. For the default layout the output has 16 * 4**6 = 65536 samples.
	"""
	def __init__(self,model_size=32,z_dim=20):
		super().__init__()
		self.model_size = model_size #The value that is described as D in the paper

		self.full_connection_1 = nn.Linear(z_dim,512*model_size)

		#Channel counts from the reshaped linear output down to the waveform
		widths = [32*model_size,16*model_size,8*model_size,4*model_size,2*model_size,model_size,1]
		final = len(widths) - 1
		for number in range(1,final+1):
			upsample = nn.ConvTranspose1d(in_channels=widths[number-1],
										out_channels=widths[number],
										kernel_size=25,
										stride=4,
										padding=11,
										output_padding=1)
			#Hidden layers use ReLU; only the last layer squashes to [-1, 1] with Tanh
			activation = nn.Tanh() if number == final else nn.ReLU(inplace=True)
			#Registered as layer_1 .. layer_6 so checkpoint keys stay the same
			setattr(self,f"layer_{number}",nn.Sequential(upsample,activation))

	def forward(self, x):
		"""Generate waveforms of shape ``(batch, 1, samples)`` from noise ``(batch, z_dim)``."""
		#512*model_size features -> (32*model_size channels, 16 time steps)
		hidden = F.relu(self.full_connection_1(x).view(-1,32*self.model_size,16))
		for stage in (self.layer_1,self.layer_2,self.layer_3,
						self.layer_4,self.layer_5,self.layer_6):
			hidden = stage(hidden)
		return hidden









In [None]:
#encoding:utf-8


class Discriminator(nn.Module):
	"""Discriminator (critic): maps a single-channel waveform to one score per sample.

	Six strided convolutions (kernel 25, stride 4), each followed by
	LeakyReLU(0.2) and PhaseShuffle, widen the channels from 1 to
	``32*model_size``. The result is flattened to ``512*model_size`` features
	and reduced to one value by a linear layer.
	"""
	def __init__(self,model_size=32,shift_factor=2):
		super().__init__()
		self.model_size = model_size #The value that is described as D in the paper
		self.shift_factor = shift_factor #Maximum shift n handed to every PhaseShuffle layer

		#Channel counts: 1 -> d -> 2d -> 4d -> 8d -> 16d -> 32d
		widths = [1,model_size,2*model_size,4*model_size,8*model_size,16*model_size,32*model_size]
		for number,(c_in,c_out) in enumerate(zip(widths,widths[1:]),start=1):
			stage = nn.Sequential(
					nn.Conv1d(c_in,c_out,kernel_size=25,stride=4,padding=11),
					nn.LeakyReLU(0.2,inplace=True),
					PhaseShuffle(shift_factor)
					)
			#Registered as layer_1 .. layer_6 so checkpoint keys stay the same
			setattr(self,f"layer_{number}",stage)

		self.full_connection_1 = nn.Linear(512*model_size,1)

	def forward(self, x):
		"""Score a batch of waveforms; returns a tensor of shape ``(batch, 1)``."""
		features = x
		for stage in (self.layer_1,self.layer_2,self.layer_3,
						self.layer_4,self.layer_5,self.layer_6):
			features = stage(features)
		#Flatten channels and time into 512*model_size features per sample
		features = features.view(-1,512*self.model_size)
		return self.full_connection_1(features)


In [None]:
#encoding:utf-8


#Check if GPU is available, otherwise train on the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("device:",device)

#Path (glob pattern) handed to make_datapath_list to collect the training audio
dataset_path = '/content/dataset/SM1_F1_A01.wav'
#Sampling rate used when writing the generated sound
sampling_rate = 16000

#Batch size
batch_size = 16
#Size of the random noise vector fed to the Generator
z_dim = 20
#Number of epochs
num_epochs = 500
#Learning rate used by both optimizers
lr = 0.0001
#Number of Discriminator updates per Generator update
D_updates_per_G_update = 5
#The learning status and sample sounds are written every generate_sounds_interval epochs
generate_sounds_interval = 20

#Read training data, create dataset
train_sound_list = make_datapath_list(dataset_path)
train_dataset = GAN_Sound_Dataset(file_list=train_sound_list,device=device,batch_size=batch_size)
#Two independently shuffled loaders over the same dataset:
#one paces the Generator updates, the other feeds the Discriminator
loader_options = dict(batch_size=batch_size,shuffle=True)
dataloader_for_G = torch.utils.data.DataLoader(train_dataset,**loader_options)
dataloader_for_D = torch.utils.data.DataLoader(train_dataset,**loader_options)

# #Functions for initializing networks
def weights_init(m):
	"""Re-initialize the weight of ``m`` with Kaiming-normal values.

	Only Conv1d, ConvTranspose1d and Linear modules are touched; biases and
	every other module type are left as they are. Intended for ``Module.apply``.
	"""
	if not isinstance(m, (nn.Conv1d, nn.ConvTranspose1d, nn.Linear)):
		return
	nn.init.kaiming_normal_(m.weight.data)

#Build the Generator on the target device and re-initialize its weights
netG = Generator(z_dim=z_dim).to(device)
netG.apply(weights_init)

#Build the Discriminator the same way
netD = Discriminator().to(device)
netD.apply(weights_init)

#Both networks are optimized with Adam using the same learning rate and betas
beta1 = 0.5
beta2 = 0.9
adam_options = dict(lr=lr,betas=(beta1,beta2))
optimizerD = optim.Adam(netD.parameters(),**adam_options)
optimizerG = optim.Adam(netG.parameters(),**adam_options)

#Bookkeeping used to follow the learning process
G_losses = []
D_losses = []
iters = 0
#Fixed noise fed to the Generator whenever sample sounds are written,
#so the samples of different epochs can be compared
generating_num = 5#How many sounds to output each time
z_sample = torch.Tensor(generating_num,z_dim).uniform_(-1,1).to(device)

print("Starting Training")

#Save the learning start time
t_epoch_start = time.time()
#Weight of the gradient penalty term in the Discriminator loss
beta_gp = 10.0
#Latest averaged Discriminator loss and latest Generator loss, shown in the
#status line (NaN until the first update has happened)
errD_mean = float('nan')
errG_value = float('nan')
#Loop for each epoch
for epoch in range(num_epochs):
	#One Generator update per batch of dataloader_for_G; that batch only supplies the batch size
	for generator_i,real_sound_for_G in enumerate(dataloader_for_G, 0):
		#-------------------------
		#Discriminator training
		#-------------------------
		#Minimize -E[D(real sound)] + E[D(fake sound)] + beta_gp * gradient penalty,
		#D_updates_per_G_update times per Generator update
		#-------------------------
		errD_loss_sum = 0#Sum of the Discriminator losses of this round
		D_update_count = 0#Number of Discriminator updates actually performed
		#Fresh shuffled pass for every round; restarted below when it runs out of batches
		batches_for_D = iter(dataloader_for_D)
		for discriminator_i in range(D_updates_per_G_update):
			try:
				real_sound_for_D = next(batches_for_D)
			except StopIteration:
				#The dataset holds fewer batches than the requested number of updates:
				#start another pass instead of silently doing fewer updates
				batches_for_D = iter(dataloader_for_D)
				real_sound_for_D = next(batches_for_D)
			#Number of audio data actually taken out
			minibatch_size = real_sound_for_D.shape[0]
			#Batches holding a single sample are skipped (kept from the original script,
			#which reports an error in the gradient computation for them)
			if minibatch_size == 1:
				continue
			#If you can use GPU, transfer to GPU
			real_sound_for_D = real_sound_for_D.to(device)
			#Generate noise and make z
			z = torch.Tensor(minibatch_size,z_dim).uniform_(-1,1).to(device)
			#The Generator is not trained in this step, so its output is produced
			#without a graph; the Discriminator backward pass then stops at the fake sound
			with torch.no_grad():
				fake_sound = netG(z)
			#Score the real and the fake sound
			d_real = netD(real_sound_for_D)
			d_fake = netD(fake_sound)

			#Average the scores over the mini batch
			loss_real = d_real.mean()#E[D(real sound)]
			loss_fake = d_fake.mean()#E[D(fake sound)]
			#Gradient penalty between the real and the fake batch
			loss_gp = gradient_penalty(netD,real_sound_for_D.detach(),fake_sound,minibatch_size)
			#-E[D(real sound)] + E[D(fake sound)] + weighted gradient penalty
			errD = -loss_real + loss_fake + beta_gp*loss_gp
			#Gradients of the previous iteration are still stored, so reset them
			optimizerD.zero_grad()
			#Calculate the gradients of the loss
			errD.backward()
			#Update the Discriminator parameters
			optimizerD.step()
			#Record the loss to take the average later
			errD_loss_sum += errD.item()
			D_update_count += 1
		#Average over the updates that really happened (not over the requested number)
		errD_mean = errD_loss_sum/D_update_count if D_update_count else float('nan')

		#-------------------------
		#Generator training
		#-------------------------
		#Minimize -E[D(fake sound)]
		#-------------------------
		#Number of audio data actually taken out
		minibatch_size = real_sound_for_G.shape[0]
		#Same single-sample skip as in the Discriminator step
		if minibatch_size == 1:
			continue
		#Generate noise
		z = torch.Tensor(minibatch_size,z_dim).uniform_(-1,1).to(device)
		#Feed the noise to the Generator; this time the graph is needed
		fake_sound = netG(z)
		#Score the generated sound with the Discriminator
		d_fake = netD(fake_sound)

		#WGAN-GP averages the scores of the mini batch and back-propagates that value
		errG = -d_fake.mean()
		#Gradients of the previous iteration are still stored, so reset them
		optimizerG.zero_grad()
		#Calculate the gradients of the loss
		errG.backward()
		#Update the Generator parameters
		optimizerG.step()
		errG_value = errG.item()

		#Record the losses to plot them later
		G_losses.append(errG_value)
		D_losses.append(errD_mean)

		iters += 1
		#Break for testing
		#break

	#Output the learning status
	if (epoch%generate_sounds_interval==0 or epoch==num_epochs-1):
		print('[%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\t'
				% (epoch, num_epochs, errD_mean, errG_value))
		#Create the output directory if it does not exist yet
		output_dir = "./output/train/generated_epoch_{}".format(epoch)
		os.makedirs(output_dir,exist_ok=True)
		#Write sample sounds generated from the fixed noise
		with torch.no_grad():
			generated_sound = netG(z_sample)
			save_sounds(output_dir,generated_sound,sampling_rate)

#-------------------------
#Output of execution result
#-------------------------

#Measure the time spent on learning (start time was taken right before the epoch loop)
t_epoch_finish = time.time()
total_time = t_epoch_finish - t_epoch_start
#Summary of the run, one line per entry
summary_lines = [
	"total_time: {:.4f} sec.\n".format(total_time),
	"dataset size: {}\n".format(len(train_sound_list)),
	"num_epochs: {}\n".format(num_epochs),
	"batch_size: {}\n".format(batch_size),
]
with open('./output/train/time.txt', mode='w') as f:
	f.writelines(summary_lines)

#Save the learned Generator weights; note that netG itself is moved to the CPU by this call
torch.save(netG.to('cpu').state_dict(),"./output/generator_trained_model_cpu.pth")

#Plot both loss curves into one figure and save it
plt.clf()
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
for curve,curve_label in ((G_losses,"G"),(D_losses,"D")):
	plt.plot(curve,label=curve_label)
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.savefig('./output/train/loss.png')

print("data generated.")


device: cuda:0
sounds : 1
Starting Training
[0/500]	Loss_D: 0.4501	Loss_G: 10.6582	
./output/train/generated_epoch_0/generated_sound_0_b0439f866b0e8fbbc8f079f7dabebf2e.wav
./output/train/generated_epoch_0/generated_sound_1_b0439f866b0e8fbbc8f079f7dabebf2e.wav
./output/train/generated_epoch_0/generated_sound_2_b0439f866b0e8fbbc8f079f7dabebf2e.wav
./output/train/generated_epoch_0/generated_sound_3_b0439f866b0e8fbbc8f079f7dabebf2e.wav
./output/train/generated_epoch_0/generated_sound_4_b0439f866b0e8fbbc8f079f7dabebf2e.wav
[20/500]	Loss_D: -1.3978	Loss_G: 10.9386	
./output/train/generated_epoch_20/generated_sound_0_b7432aeea3499001e922753c16a631d9.wav
./output/train/generated_epoch_20/generated_sound_1_b7432aeea3499001e922753c16a631d9.wav
./output/train/generated_epoch_20/generated_sound_2_b7432aeea3499001e922753c16a631d9.wav
./output/train/generated_epoch_20/generated_sound_3_b7432aeea3499001e922753c16a631d9.wav
./output/train/generated_epoch_20/generated_sound_4_b7432aeea3499001e922753c16

In [None]:
#encoding:utf-8


#Number of audio files to be output
sample_size = 16
#Size of the random noise vector; must match the value the Generator was trained with
z_dim = 20
#Sampling rate of the sound that is written
sampling_rate = 16000

#Rebuild the Generator and load the learned weights
netG = Generator(z_dim=z_dim)
trained_model_path = "./output/generator_trained_model_cpu.pth"
#map_location pins the load to the CPU regardless of the device the tensors were saved from
netG.load_state_dict(torch.load(trained_model_path,map_location="cpu"))
#Switch to inference mode
netG.eval()
#Noise generation
noise = torch.Tensor(sample_size,z_dim).uniform_(-1,1)
#Feed the noise to the Generator; no gradients are needed for inference
with torch.no_grad():
	generated_sound = netG(noise)
#Create the output directory if it does not exist yet
output_dir = "./output/inference"
os.makedirs(output_dir,exist_ok=True)
#Output of audio files
save_sounds(output_dir,generated_sound,sampling_rate)

./output/inference/generated_sound_0_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_1_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_2_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_3_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_4_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_5_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_6_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_7_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_8_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_9_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_10_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_11_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_12_127b4c17d3a5606aa9f5f448f761c528.wav
./output/inference/generated_sound_