<a href="https://colab.research.google.com/github/hegde95/GAN-for-speech-spectrogram/blob/master/src/spectroGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

The CycleGAN code in this notebook is based on [this link](https://machinelearningmastery.com/cyclegan-tutorial-with-keras/)

# Complete Code in a cell

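Change lines 34-37 of the cell below (the four settings `main_dir`, `path_to_npz`, `domain_a` and `domain_b`, marked with `# change the following 4 lines`) to match your setup.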

In [0]:
# Mount Google Drive at /content/drive; the dataset and log paths configured
# below all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.utils import plot_model
from tensorflow_addons.layers import InstanceNormalization
import matplotlib.pyplot as plt
import cv2
import librosa
import time
from os import path
import os
import imageio
from scipy.io.wavfile import write

def mkdir(base, name):
    """Create the directory ``base/name`` if needed and return its path.

    Args:
        base: Parent directory.
        name: Name of the sub-directory to create inside ``base``.

    Returns:
        The joined path as a string.
    """
    # Named ``target`` so the local does not shadow the module-level ``path``
    # alias imported from ``os``.
    target = os.path.join(base, name)
    # exist_ok=True replaces the racy "check exists, then create" sequence.
    os.makedirs(target, exist_ok=True)
    return target

# ---- user settings: change the following 4 values ----
domain_a = "happy"
domain_b = "sad"
main_dir = "/content/drive/My Drive/EE599/Project"
path_to_npz = "/content/drive/My Drive/EE599/Project/npzs/happy2sad.npz"

# Each run gets its own folder: <main_dir>/logs/<domain_a>-<domain_b>_<unix time>
path_to_logs = mkdir(main_dir, "logs")
path_to_log = mkdir(path_to_logs, "%s-%s_%d" % (domain_a, domain_b, int(time.time())))

def define_discriminator(image_shape):
    """Build and compile the patch discriminator.

    The network is a stack of 4x4 convolutions: four stride-2 stages
    (64, 128, 256, 512 filters), one stride-1 stage (512 filters) and a final
    1-filter convolution that yields one score per output patch.  Every stage
    except the first is followed by instance normalisation; all are followed
    by LeakyReLU(0.2) except the output layer.

    Args:
        image_shape: Shape of one input image, passed to ``Input``.

    Returns:
        The compiled Keras model (MSE loss weighted by 0.5, Adam optimiser).

    Side effects:
        Writes ``discriminator_model.png`` to the working directory via
        ``plot_model``.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)
    # C64: the first stage has no normalisation layer
    d = Conv2D(64, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(in_image)
    d = LeakyReLU(alpha=0.2)(d)
    # C128, C256, C512: stride-2 conv -> instance norm -> leaky ReLU.
    # Layers are created in the same order as before so that the
    # auto-generated layer names stay unchanged.
    for n_filters in (128, 256, 512):
        d = Conv2D(n_filters, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(d)
        d = InstanceNormalization(axis=-1)(d)
        d = LeakyReLU(alpha=0.2)(d)
    # second last output layer (stride 1)
    d = Conv2D(512, (4, 4), padding='same', kernel_initializer=init)(d)
    d = InstanceNormalization(axis=-1)(d)
    d = LeakyReLU(alpha=0.2)(d)
    # patch output
    patch_out = Conv2D(1, (4, 4), padding='same', kernel_initializer=init)(d)
    model = Model(in_image, patch_out)
    # ``learning_rate`` is the supported keyword; ``lr`` is a deprecated alias.
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.0002, beta_1=0.5), loss_weights=[0.5])
    plot_model(model, to_file='discriminator_model.png', show_shapes=True, show_layer_names=True)
    return model

def resnet_block(n_filters, input_layer):
    """Append one resnet-style block to ``input_layer`` and return its output.

    The block is conv3x3 -> instance norm -> ReLU -> conv3x3 -> instance norm,
    and its result is concatenated channel-wise with the block input (note:
    concatenated, not added, so the channel count grows with every block).
    """
    kernel_init = RandomNormal(stddev=0.02)
    # first convolutional stage
    branch = Conv2D(n_filters, (3, 3), padding='same', kernel_initializer=kernel_init)(input_layer)
    branch = InstanceNormalization(axis=-1)(branch)
    branch = Activation('relu')(branch)
    # second convolutional stage (no activation)
    branch = Conv2D(n_filters, (3, 3), padding='same', kernel_initializer=kernel_init)(branch)
    branch = InstanceNormalization(axis=-1)(branch)
    # merge with the block input along the channel axis
    return Concatenate()([branch, input_layer])

def define_generator(image_shape, n_resnet=6):
    """Build the (uncompiled) generator model.

    Layout: c7s1-64, d128, d256, ``n_resnet`` resnet blocks, u128, u64 and a
    final 7x7 convolution to a single channel followed by instance
    normalisation and tanh.

    Args:
        image_shape: Shape of one input image, passed to ``Input``.
        n_resnet: Number of resnet blocks between encoder and decoder.

    Returns:
        The Keras ``Model`` mapping an input image to a generated image.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)
    g = in_image
    # encoder: c7s1-64, d128, d256 (filters, kernel, strides)
    encoder_stages = ((64, (7, 7), (1, 1)), (128, (3, 3), (2, 2)), (256, (3, 3), (2, 2)))
    for n_filters, kernel, strides in encoder_stages:
        g = Conv2D(n_filters, kernel, strides=strides, padding='same', kernel_initializer=init)(g)
        g = InstanceNormalization(axis=-1)(g)
        g = Activation('relu')(g)
    # R256
    for _ in range(n_resnet):
        g = resnet_block(256, g)
    # decoder: u128, u64
    for n_filters in (128, 64):
        g = Conv2DTranspose(n_filters, (3, 3), strides=(2, 2), padding='same', kernel_initializer=init)(g)
        g = InstanceNormalization(axis=-1)(g)
        g = Activation('relu')(g)
    # c7s1-1: single output channel
    g = Conv2D(1, (7, 7), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    out_image = Activation('tanh')(g)
    #plot_model(model, to_file='generator_model.png', show_shapes=True, show_layer_names=True)
    return Model(in_image, out_image)

def define_composite_model(g_model_1, d_model, g_model_2, image_shape):
    """Build the composite model used to update ``g_model_1``.

    Only ``g_model_1`` is left trainable; ``d_model`` and ``g_model_2`` are
    marked non-trainable before the graph is assembled.

    Inputs:  ``[input_gen, input_id]``
    Outputs: ``[output_d, output_id, output_f, output_b]`` where
        * ``output_d``  = d_model(g_model_1(input_gen))   (adversarial, mse)
        * ``output_id`` = g_model_1(input_id)             (identity, mae)
        * ``output_f``  = g_model_2(g_model_1(input_gen)) (forward cycle, mae)
        * ``output_b``  = g_model_1(g_model_2(input_id))  (backward cycle, mae)

    Loss weights are 1, 5, 10 and 10 respectively.

    Returns:
        The compiled composite Keras model.
    """
    # only the generator being updated may change its weights
    g_model_1.trainable = True
    d_model.trainable = False
    g_model_2.trainable = False
    # discriminator element
    input_gen = Input(shape=image_shape)
    gen1_out = g_model_1(input_gen)
    output_d = d_model(gen1_out)
    # identity element
    input_id = Input(shape=image_shape)
    output_id = g_model_1(input_id)
    # forward cycle
    output_f = g_model_2(gen1_out)
    # backward cycle
    gen2_out = g_model_2(input_id)
    output_b = g_model_1(gen2_out)
    model = Model([input_gen, input_id], [output_d, output_id, output_f, output_b])
    # ``learning_rate`` is the supported keyword; ``lr`` is a deprecated alias.
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    # least-squares adversarial loss plus L1 identity/cycle losses
    model.compile(loss=['mse', 'mae', 'mae', 'mae'], loss_weights=[1, 5, 10, 10], optimizer=opt)
    #	plot_model(model, to_file='composite_model.png', show_shapes=True, show_layer_names=True)
    return model

def load_real_samples(filename):
    """Load the two image arrays from an ``.npz`` file and scale them.

    Args:
        filename: Path to an npz archive holding the two domains under the
            default keys ``arr_0`` and ``arr_1``.

    Returns:
        ``[X1, X2]`` with values mapped from [0, 255] to [-1, 1].
    """
    # The archive object keeps the file open; the context manager closes it
    # as soon as both arrays have been read into memory.
    with np.load(filename) as data:
        X1, X2 = data['arr_0'], data['arr_1']
    # scale from [0,255] to [-1,1]
    X1 = (X1 - 127.5) / 127.5
    X2 = (X2 - 127.5) / 127.5
    return [X1, X2]

def generate_real_samples(dataset, n_samples, patch_shape):
    """Draw ``n_samples`` random images (with replacement) plus 'real' labels.

    Returns:
        ``(X, y)`` where ``X`` is ``dataset`` indexed by the random picks and
        ``y`` is an all-ones array of shape
        ``(n_samples, patch_shape, patch_shape, 1)``.
    """
    picks = np.random.randint(0, dataset.shape[0], n_samples)
    real_labels = np.ones((n_samples, patch_shape, patch_shape, 1))
    return dataset[picks], real_labels

def generate_fake_samples(g_model, dataset, patch_shape):
    """Translate ``dataset`` with ``g_model`` and attach 'fake' labels.

    Returns:
        ``(X, y)`` where ``X`` is ``g_model.predict(dataset)`` and ``y`` is an
        all-zeros array of shape ``(len(X), patch_shape, patch_shape, 1)``.
    """
    fakes = g_model.predict(dataset)
    label_shape = (len(fakes), patch_shape, patch_shape, 1)
    return fakes, np.zeros(label_shape)

def save_models(step, g_model_AtoB, g_model_BtoA):
    """Save both generators as .h5 files in the log sub-folder ``step+1``."""
    checkpoint = step + 1
    out_dir = mkdir(path_to_log, str(checkpoint))
    written = []
    # A->B first, then B->A
    for model, pattern in ((g_model_AtoB, 'g_model_AtoB_%06d.h5'),
                           (g_model_BtoA, 'g_model_BtoA_%06d.h5')):
        fname = pattern % checkpoint
        model.save(path.join(out_dir, fname))
        written.append(fname)
    print('>Saved: %s and %s' % tuple(written))
 
def _export_spectrogram(image, folder, stem):
    """Write ``image`` as ``<stem>.jpg`` and its reconstructed audio as ``<stem>.wav``."""
    jpg_file = path.join(folder, stem + ".jpg")
    imageio.imwrite(jpg_file, image)
    # Read back the image file that was just written and keep the
    # top-left 257x257 region.
    im = cv2.imread(jpg_file, -1)
    im = im[:257, :257]
    # Linear map of pixel values: 0 -> -80 and 255 -> 0, interpreted as dB.
    im = (im * 80.0 / 255.0) - 80.0
    im = librosa.db_to_amplitude(im)
    y2 = griffinlim(im, hop_length=256)
    write(path.join(folder, stem + ".wav"), 16000, y2 * 1.5)


def make_test_folder(X_in, X_out, name, step, n_samples=5):
    """Save input/translated samples as images, audio and one overview plot.

    Files go to ``<path_to_log>/<step+1>/<name>/``; the overview PNG goes to
    ``<path_to_log>/<step+1>/``.

    Args:
        X_in: Input images; the first ``n_samples`` are used.
        X_out: Translated images; the first ``n_samples`` are used.
        name: Translation label of the form ``<source>_to_<target>``.
        step: Zero-based training step (folders and files use ``step+1``).
        n_samples: Number of samples to export from each array.
    """
    path_to_step_folder = mkdir(path_to_log, str(step + 1))
    path_to_step_folder_name = mkdir(path_to_step_folder, name)

    # Split on the '_to_' separator so that domain names containing an
    # underscore are labelled correctly; fall back to the old positional
    # split for names that do not contain '_to_'.
    source, sep, target = name.partition('_to_')
    if not sep:
        parts = name.split('_')
        source, target = parts[0], parts[2]

    # Row 1 of the plot shows the real inputs, row 2 the translated images.
    rows = ((X_in, "_real_" + source, 0), (X_out, "_generated_" + target, n_samples))
    for images, suffix, offset in rows:
        for i in range(n_samples):
            image = images[i].reshape(260, 260)
            plt.subplot(2, n_samples, 1 + offset + i)
            plt.axis('off')
            plt.imshow(image, cmap='gray')
            _export_spectrogram(image, path_to_step_folder_name, str(i) + suffix)

    # save plot to file
    filename1 = '%s_generated_plot_%06d.png' % (name, (step + 1))
    plt.savefig(path.join(path_to_step_folder, filename1), dpi=300)
    plt.close()




def summarize_performance(step, g_model, trainX, name, n_samples=5):
    """Translate a few random training images and export them for inspection.

    Picks ``n_samples`` images from ``trainX``, runs them through ``g_model``,
    rescales both sets from [-1, 1] to [0, 1] and hands them to
    ``make_test_folder``.
    """
    # labels are not needed here, hence the patch size of 0
    real, _ = generate_real_samples(trainX, n_samples, 0)
    translated, _ = generate_fake_samples(g_model, real, 0)
    make_test_folder((real + 1) / 2.0, (translated + 1) / 2.0, name, step, n_samples)

def update_image_pool(pool, images, max_size=50):
    """Mix freshly generated images with previously generated ones.

    ``pool`` is mutated in place.  While it holds fewer than ``max_size``
    images every new image is stored and used.  Once full, each new image is
    either used directly (probability 0.5) or swapped with a random pool
    entry, in which case the old entry is used instead.

    Returns:
        ``np.asarray`` of the images selected for this update.
    """
    chosen = []
    for candidate in images:
        if len(pool) < max_size:
            # still filling the pool
            pool.append(candidate)
            chosen.append(candidate)
            continue
        if np.random.random() < 0.5:
            # use the new image without storing it
            chosen.append(candidate)
            continue
        # hand out an older image and store the new one in its slot
        slot = np.random.randint(0, len(pool))
        chosen.append(pool[slot])
        pool[slot] = candidate
    return np.asarray(chosen)
def griffinlim(S, n_iter=32, hop_length=None, win_length=None, window='hann',
               center=True, dtype=np.float32, length=None, pad_mode='reflect',
               momentum=0.99, init='random', random_state=None):
    """Estimate a time-domain signal from a magnitude spectrogram.

    Iteratively refines a phase estimate for ``S`` by alternating
    ``librosa.istft`` and ``librosa.stft``, with a momentum term applied to
    the phase update.

    Args:
        S: Magnitude spectrogram; ``n_fft`` is inferred as
            ``2 * (S.shape[0] - 1)``.
        n_iter: Number of refinement iterations.
        hop_length, win_length, window, center, dtype, length, pad_mode:
            Forwarded to ``librosa.istft`` / ``librosa.stft``.
        momentum: Momentum of the phase update; must be >= 0.  Values > 1
            only trigger a warning.
        init: ``'random'`` for random initial phases, ``None`` for all ones.
        random_state: ``None`` (global numpy RNG), an int seed, or a
            ``np.random.RandomState`` instance.

    Returns:
        The signal returned by the final ``librosa.istft`` call.

    Raises:
        ValueError: If ``momentum`` is negative, ``init`` is not one of the
            supported values, or ``random_state`` has an unsupported type.
    """
    # Imported here because the notebook's import cell does not provide it;
    # previously the warning branch failed with a NameError.
    import warnings

    if random_state is None:
        rng = np.random
    elif isinstance(random_state, int):
        rng = np.random.RandomState(seed=random_state)
    elif isinstance(random_state, np.random.RandomState):
        rng = random_state
    else:
        # Previously ``rng`` was simply left unbound for other types.
        raise ValueError('Unsupported random_state={!r}'.format(random_state))

    if momentum > 1:
        warnings.warn('Griffin-Lim with momentum={} > 1 can be unstable. '
                      'Proceed with caution!'.format(momentum))
    elif momentum < 0:
        raise ValueError('griffinlim() called with momentum={} < 0'.format(momentum))

    # Infer n_fft from the spectrogram shape
    n_fft = 2 * (S.shape[0] - 1)

    # using complex64 will keep the result to minimal necessary precision
    angles = np.empty(S.shape, dtype=np.complex64)
    if init == 'random':
        # randomly initialize the phase
        angles[:] = np.exp(2j * np.pi * rng.rand(*S.shape))
    elif init is None:
        # Initialize an all ones complex matrix
        angles[:] = 1.0
    else:
        raise ValueError("init={} must either None or 'random'".format(init))

    # And initialize the previous iterate to 0
    rebuilt = 0.

    for _ in range(n_iter):
        # Store the previous iterate
        tprev = rebuilt

        # Invert with our current estimate of the phases
        inverse = librosa.istft(S * angles, hop_length=hop_length, win_length=win_length,
                                window=window, center=center, dtype=dtype, length=length)

        # Rebuild the spectrogram
        rebuilt = librosa.stft(inverse, n_fft=n_fft, hop_length=hop_length,
                               win_length=win_length, window=window, center=center,
                               pad_mode=pad_mode)

        # Update our phase estimates and renormalise them to unit magnitude
        angles[:] = rebuilt - (momentum / (1 + momentum)) * tprev
        angles[:] /= np.abs(angles) + 1e-16

    # Invert once more with the final phase estimates
    return librosa.istft(S * angles, hop_length=hop_length, win_length=win_length,
                         window=window, center=center, dtype=dtype, length=length)
    

def train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset,
          n_epochs=500, n_batch=1, summary_every=1, save_every=5):
    """Run the CycleGAN training loop.

    Each step draws a random batch from both domains, generates fakes,
    updates generator B->A, discriminator A, generator A->B and
    discriminator B (in that order) and prints the losses.

    Args:
        d_model_A, d_model_B: Compiled discriminators for domains A and B.
        g_model_AtoB, g_model_BtoA: Generator models.
        c_model_AtoB, c_model_BtoA: Composite models used to update the
            generators.
        dataset: ``[trainA, trainB]`` arrays.
        n_epochs: Number of epochs; an epoch is ``len(trainA) // n_batch``
            steps (only the size of domain A is used).
        n_batch: Batch size.
        summary_every: Export sample translations every this many epochs.
        save_every: Save the generators every this many epochs.
    """
    # the discriminator output side length defines the label shape
    n_patch = d_model_A.output_shape[1]
    trainA, trainB = dataset
    # pools of previously generated fakes, one per domain
    poolA, poolB = [], []
    bat_per_epo = len(trainA) // n_batch
    n_steps = bat_per_epo * n_epochs
    for i in range(n_steps):
        # select a batch of real samples
        X_realA, y_realA = generate_real_samples(trainA, n_batch, n_patch)
        X_realB, y_realB = generate_real_samples(trainB, n_batch, n_patch)
        # generate a batch of fake samples
        X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, n_patch)
        X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, n_patch)
        # update fakes from pool
        X_fakeA = update_image_pool(poolA, X_fakeA)
        X_fakeB = update_image_pool(poolB, X_fakeB)
        # update generator B->A via adversarial and cycle loss
        g_loss2, _, _, _, _ = c_model_BtoA.train_on_batch([X_realB, X_realA], [y_realA, X_realA, X_realB, X_realA])
        # update discriminator for A -> [real/fake]
        dA_loss1 = d_model_A.train_on_batch(X_realA, y_realA)
        dA_loss2 = d_model_A.train_on_batch(X_fakeA, y_fakeA)
        # update generator A->B via adversarial and cycle loss
        g_loss1, _, _, _, _ = c_model_AtoB.train_on_batch([X_realA, X_realB], [y_realB, X_realB, X_realA, X_realB])
        # update discriminator for B -> [real/fake]
        dB_loss1 = d_model_B.train_on_batch(X_realB, y_realB)
        dB_loss2 = d_model_B.train_on_batch(X_fakeB, y_fakeB)
        # summarize performance
        print('>%d, dA[%.3f,%.3f] dB[%.3f,%.3f] g[%.3f,%.3f]' % (i+1, dA_loss1,dA_loss2, dB_loss1,dB_loss2, g_loss1,g_loss2))
        # export sample translations at the end of every ``summary_every`` epochs
        if (i+1) % (bat_per_epo * summary_every) == 0:
            # plot A->B translation
            summarize_performance(i, g_model_AtoB, trainA, domain_a+'_to_'+domain_b)
            # plot B->A translation
            summarize_performance(i, g_model_BtoA, trainB, domain_b+'_to_'+domain_a)
        # checkpoint the generators at the end of every ``save_every`` epochs
        if (i+1) % (bat_per_epo * save_every) == 0:
            save_models(i, g_model_AtoB, g_model_BtoA)
   
# load image data
dataset = load_real_samples(path_to_npz)
#dataset[0] = dataset[0][:int(0.25*len(dataset[0]))]
#dataset[1] = dataset[1][:int(0.20*len(dataset[1]))]
print('Loaded', dataset[0].shape, dataset[1].shape)

# The model input shape is fixed to single-channel 260x260 images.  It used to
# be derived from dataset[0].shape[1:] first, but that value was overwritten
# immediately, so the dead assignment has been removed.
image_shape = (260,260,1)

# generator: A -> B
g_model_AtoB = define_generator(image_shape)
# generator: B -> A
g_model_BtoA = define_generator(image_shape)
# discriminator: A -> [real/fake]
d_model_A = define_discriminator(image_shape)
# discriminator: B -> [real/fake]
d_model_B = define_discriminator(image_shape)
# composite: A -> B -> [real/fake, A]
c_model_AtoB = define_composite_model(g_model_AtoB, d_model_B, g_model_BtoA, image_shape)
# composite: B -> A -> [real/fake, B]
c_model_BtoA = define_composite_model(g_model_BtoA, d_model_A, g_model_AtoB, image_shape)

# train models
train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset)

# Commented-out code used to build the required npz files

In [0]:
#!gdown https://drive.google.com/uc?id=1NzLsOqGvYanh2zM8jcb8_CTIjxFBJXA8

In [2]:
# # #!unzip ravdess.zip
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# import librosa
# import random
# import scipy.misc
# import imageio

# from PIL import Image


# import numpy as np

# from os import path,listdir

# path_to_audio = "/content/drive/My Drive/EE599/Project/ravdes/audio"
# path_to_spec = "/content/drive/My Drive/EE599/Project/ravdes/spec"
# emotions = ["anger","calm","disgust","fearful","happy","neutral","sad","surprised"]

# def get_audio(path_to_audio):
#     y, sr = librosa.load(path_to_audio,sr=16000)
#     return y,sr

# def get_spectrogram(y):
#     D = np.abs(librosa.stft(y,n_fft = 512,hop_length = 256))
#     amp_max = np.amax(D)
#     x = librosa.amplitude_to_db(D, ref=np.max)
#     return x

# def save_spec(array, name):
#     imageio.imwrite(name, array)

# for emotion in emotions:
#     for file_name in listdir(path.join(path_to_audio,emotion)):
#         file = path.join(*[path_to_audio,emotion,file_name])
#         y,sr = get_audio(file)
#         x = get_spectrogram(y)
#         save_spec(x,path.join(*[path_to_spec,emotion,file_name[:-4]+'.jpg']))

In [10]:
# from os import path,listdir
# import os
# import cv2
# import numpy as np
# max_len = 260

# # load and prepare training images
# def getListOfFiles(dirName):
#     # create a list of file and sub directories 
#     # names in the given directory 
#     listOfFile = os.listdir(dirName)
#     allFiles = list()
#     # Iterate over all the entries
#     for entry in listOfFile:
#         # Create full path
#         fullPath = os.path.join(dirName, entry)
#         # If entry is a directory then get the list of files in this directory 
#         if os.path.isdir(fullPath):
#             allFiles = allFiles + getListOfFiles(fullPath)
#         else:
#             allFiles.append(fullPath)
                
#     return allFiles
    
# def make_npz(emo1,emo2):
#   dirNameA = "/content/drive/My Drive/EE599/Project/ravdes/spec/"+emo1
#   dirNameB = "/content/drive/My Drive/EE599/Project/ravdes/spec/"+emo2
#   print("making {}".format(emo1+'2'+emo2+'.npz'))
#   #neutral
#   trainA = []
#   listA = getListOfFiles(dirNameA)
#   listA.sort()
#   for name in listA:
#     img = cv2.imread(name,-1)
#     img_f = np.zeros((260,260))
#     y_len = img.shape[1]
#     img = img[:,y_len//2-max_len//2:y_len//2+max_len//2]
#     img_f[:257,:257] = img[:257,:257]
#     trainA.append(img_f)
#   trainA = np.array(trainA)

#   trainB = []
#   listB = getListOfFiles(dirNameB)
#   listB.sort()
#   for name in listB:
#     img = cv2.imread(name,-1)
#     y_len = img.shape[1]
#     img = img[:,y_len//2-max_len//2:y_len//2+max_len//2]
#     img_f = np.zeros((260,260))
#     img_f[:257,:257] = img[:257,:257]
#     trainB.append(img_f)
#   trainB = np.array(trainB)
#   filename = '/content/drive/My Drive/EE599/Project/npzs2/'+emo1+'2'+emo2+'.npz'
#   np.savez_compressed(filename, trainA, trainB)

# emotions = ["calm","anger","disgust","fearful","happy","neutral","sad","surprised"]
# for i in range(len(emotions)):
#     for j in range(i+1,len(emotions)):
#         make_npz(emotions[i],emotions[j])

making calm2anger.npz
making calm2disgust.npz
making calm2fearful.npz
making calm2happy.npz
making calm2neutral.npz
making calm2sad.npz
making calm2surprised.npz
making anger2disgust.npz
making anger2fearful.npz
making anger2happy.npz
making anger2neutral.npz
making anger2sad.npz
making anger2surprised.npz
making disgust2fearful.npz
making disgust2happy.npz
making disgust2neutral.npz
making disgust2sad.npz
making disgust2surprised.npz
making fearful2happy.npz
making fearful2neutral.npz
making fearful2sad.npz
making fearful2surprised.npz
making happy2neutral.npz
making happy2sad.npz
making happy2surprised.npz
making neutral2sad.npz
making neutral2surprised.npz
making sad2surprised.npz


# Set Up

In [0]:
#@title Mount Drive - Please authorise if needed

from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
#@title Imports

import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.utils import plot_model
from tensorflow_addons.layers import InstanceNormalization
import matplotlib.pyplot as plt
import cv2
import librosa
import time
from os import path
import os
import imageio
from scipy.io.wavfile import write

In [0]:
#@title Data and Log Locations

import time
def mkdir(base, name):
    """Create the directory ``base/name`` if needed and return its path.

    Args:
        base: Parent directory.
        name: Name of the sub-directory to create inside ``base``.

    Returns:
        The joined path as a string.
    """
    # Named ``target`` so the local does not shadow the module-level ``path``
    # alias imported from ``os``.
    target = os.path.join(base, name)
    # exist_ok=True replaces the racy "check exists, then create" sequence.
    os.makedirs(target, exist_ok=True)
    return target

#@markdown Give domain names (ensure the name of the npz is domain_a2domain_b).
domain_a = "happy" #@param {type:"string"}
domain_b = "sad" #@param {type:"string"}

#@markdown Give complete drive path to the npz file here.
path_to_npz = "/content/drive/My Drive/EE599/Project/npzs/happy2sad.npz" #@param {type:"string"}

 
#@markdown Give complete drive path to the project directory here (a `logs` folder is created inside it).
main_dir = "/content/drive/My Drive/EE599/Project" #@param {type:"string"}
# Each run gets its own folder: <main_dir>/logs/<domain_a>-<domain_b>_<unix time>
path_to_logs = mkdir(main_dir,"logs")
path_to_log = mkdir(path_to_logs,domain_a+'-'+domain_b+'_'+str(int(time.time())))

# Sanity check: the npz file name, minus its last 4 characters (the extension),
# must equal "<domain_a>2<domain_b>".
assert path_to_npz.split("/")[-1][:-4] == domain_a+"2"+domain_b

# Code

In [0]:
#@title GAN Util methods
def define_discriminator(image_shape):
    """Build and compile the patch discriminator.

    The network is a stack of 4x4 convolutions: four stride-2 stages
    (64, 128, 256, 512 filters), one stride-1 stage (512 filters) and a final
    1-filter convolution that yields one score per output patch.  Every stage
    except the first is followed by instance normalisation; all are followed
    by LeakyReLU(0.2) except the output layer.

    Args:
        image_shape: Shape of one input image, passed to ``Input``.

    Returns:
        The compiled Keras model (MSE loss weighted by 0.5, Adam optimiser).

    Side effects:
        Writes ``discriminator_model.png`` to the working directory via
        ``plot_model``.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)
    # C64: the first stage has no normalisation layer
    d = Conv2D(64, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(in_image)
    d = LeakyReLU(alpha=0.2)(d)
    # C128, C256, C512: stride-2 conv -> instance norm -> leaky ReLU.
    # Layers are created in the same order as before so that the
    # auto-generated layer names stay unchanged.
    for n_filters in (128, 256, 512):
        d = Conv2D(n_filters, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(d)
        d = InstanceNormalization(axis=-1)(d)
        d = LeakyReLU(alpha=0.2)(d)
    # second last output layer (stride 1)
    d = Conv2D(512, (4, 4), padding='same', kernel_initializer=init)(d)
    d = InstanceNormalization(axis=-1)(d)
    d = LeakyReLU(alpha=0.2)(d)
    # patch output
    patch_out = Conv2D(1, (4, 4), padding='same', kernel_initializer=init)(d)
    model = Model(in_image, patch_out)
    # ``learning_rate`` is the supported keyword; ``lr`` is a deprecated alias.
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.0002, beta_1=0.5), loss_weights=[0.5])
    plot_model(model, to_file='discriminator_model.png', show_shapes=True, show_layer_names=True)
    return model

def resnet_block(n_filters, input_layer):
    """Append one resnet-style block to ``input_layer`` and return its output.

    The block is conv3x3 -> instance norm -> ReLU -> conv3x3 -> instance norm,
    and its result is concatenated channel-wise with the block input (note:
    concatenated, not added, so the channel count grows with every block).
    """
    kernel_init = RandomNormal(stddev=0.02)
    # first convolutional stage
    branch = Conv2D(n_filters, (3, 3), padding='same', kernel_initializer=kernel_init)(input_layer)
    branch = InstanceNormalization(axis=-1)(branch)
    branch = Activation('relu')(branch)
    # second convolutional stage (no activation)
    branch = Conv2D(n_filters, (3, 3), padding='same', kernel_initializer=kernel_init)(branch)
    branch = InstanceNormalization(axis=-1)(branch)
    # merge with the block input along the channel axis
    return Concatenate()([branch, input_layer])

def define_generator(image_shape, n_resnet=6):
    """Build the (uncompiled) generator model.

    Layout: c7s1-64, d128, d256, ``n_resnet`` resnet blocks, u128, u64 and a
    final 7x7 convolution to a single channel followed by instance
    normalisation and tanh.

    Args:
        image_shape: Shape of one input image, passed to ``Input``.
        n_resnet: Number of resnet blocks between encoder and decoder.

    Returns:
        The Keras ``Model`` mapping an input image to a generated image.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)
    g = in_image
    # encoder: c7s1-64, d128, d256 (filters, kernel, strides)
    encoder_stages = ((64, (7, 7), (1, 1)), (128, (3, 3), (2, 2)), (256, (3, 3), (2, 2)))
    for n_filters, kernel, strides in encoder_stages:
        g = Conv2D(n_filters, kernel, strides=strides, padding='same', kernel_initializer=init)(g)
        g = InstanceNormalization(axis=-1)(g)
        g = Activation('relu')(g)
    # R256
    for _ in range(n_resnet):
        g = resnet_block(256, g)
    # decoder: u128, u64
    for n_filters in (128, 64):
        g = Conv2DTranspose(n_filters, (3, 3), strides=(2, 2), padding='same', kernel_initializer=init)(g)
        g = InstanceNormalization(axis=-1)(g)
        g = Activation('relu')(g)
    # c7s1-1: single output channel
    g = Conv2D(1, (7, 7), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    out_image = Activation('tanh')(g)
    #plot_model(model, to_file='generator_model.png', show_shapes=True, show_layer_names=True)
    return Model(in_image, out_image)

def define_composite_model(g_model_1, d_model, g_model_2, image_shape):
    """Build the composite model used to update ``g_model_1``.

    Only ``g_model_1`` is left trainable; ``d_model`` and ``g_model_2`` are
    marked non-trainable before the graph is assembled.

    Inputs:  ``[input_gen, input_id]``
    Outputs: ``[output_d, output_id, output_f, output_b]`` where
        * ``output_d``  = d_model(g_model_1(input_gen))   (adversarial, mse)
        * ``output_id`` = g_model_1(input_id)             (identity, mae)
        * ``output_f``  = g_model_2(g_model_1(input_gen)) (forward cycle, mae)
        * ``output_b``  = g_model_1(g_model_2(input_id))  (backward cycle, mae)

    Loss weights are 1, 5, 10 and 10 respectively.

    Returns:
        The compiled composite Keras model.
    """
    # only the generator being updated may change its weights
    g_model_1.trainable = True
    d_model.trainable = False
    g_model_2.trainable = False
    # discriminator element
    input_gen = Input(shape=image_shape)
    gen1_out = g_model_1(input_gen)
    output_d = d_model(gen1_out)
    # identity element
    input_id = Input(shape=image_shape)
    output_id = g_model_1(input_id)
    # forward cycle
    output_f = g_model_2(gen1_out)
    # backward cycle
    gen2_out = g_model_2(input_id)
    output_b = g_model_1(gen2_out)
    model = Model([input_gen, input_id], [output_d, output_id, output_f, output_b])
    # ``learning_rate`` is the supported keyword; ``lr`` is a deprecated alias.
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    # least-squares adversarial loss plus L1 identity/cycle losses
    model.compile(loss=['mse', 'mae', 'mae', 'mae'], loss_weights=[1, 5, 10, 10], optimizer=opt)
    #	plot_model(model, to_file='composite_model.png', show_shapes=True, show_layer_names=True)
    return model

def load_real_samples(filename):
    """Load the two image arrays from an ``.npz`` file and scale them.

    Args:
        filename: Path to an npz archive holding the two domains under the
            default keys ``arr_0`` and ``arr_1``.

    Returns:
        ``[X1, X2]`` with values mapped from [0, 255] to [-1, 1].
    """
    # The archive object keeps the file open; the context manager closes it
    # as soon as both arrays have been read into memory.
    with np.load(filename) as data:
        X1, X2 = data['arr_0'], data['arr_1']
    # scale from [0,255] to [-1,1]
    X1 = (X1 - 127.5) / 127.5
    X2 = (X2 - 127.5) / 127.5
    return [X1, X2]

def generate_real_samples(dataset, n_samples, patch_shape):
    """Draw ``n_samples`` random images (with replacement) plus 'real' labels.

    Returns:
        ``(X, y)`` where ``X`` is ``dataset`` indexed by the random picks and
        ``y`` is an all-ones array of shape
        ``(n_samples, patch_shape, patch_shape, 1)``.
    """
    picks = np.random.randint(0, dataset.shape[0], n_samples)
    real_labels = np.ones((n_samples, patch_shape, patch_shape, 1))
    return dataset[picks], real_labels

def generate_fake_samples(g_model, dataset, patch_shape):
    """Translate ``dataset`` with ``g_model`` and attach 'fake' labels.

    Returns:
        ``(X, y)`` where ``X`` is ``g_model.predict(dataset)`` and ``y`` is an
        all-zeros array of shape ``(len(X), patch_shape, patch_shape, 1)``.
    """
    fakes = g_model.predict(dataset)
    label_shape = (len(fakes), patch_shape, patch_shape, 1)
    return fakes, np.zeros(label_shape)

def save_models(step, g_model_AtoB, g_model_BtoA):
    """Save both generators as .h5 files in the log sub-folder ``step+1``."""
    checkpoint = step + 1
    out_dir = mkdir(path_to_log, str(checkpoint))
    written = []
    # A->B first, then B->A
    for model, pattern in ((g_model_AtoB, 'g_model_AtoB_%06d.h5'),
                           (g_model_BtoA, 'g_model_BtoA_%06d.h5')):
        fname = pattern % checkpoint
        model.save(path.join(out_dir, fname))
        written.append(fname)
    print('>Saved: %s and %s' % tuple(written))
 
def _export_spectrogram(image, folder, stem):
    """Write ``image`` as ``<stem>.jpg`` and its reconstructed audio as ``<stem>.wav``."""
    jpg_file = path.join(folder, stem + ".jpg")
    imageio.imwrite(jpg_file, image)
    # Read back the image file that was just written and keep the
    # top-left 257x257 region.
    im = cv2.imread(jpg_file, -1)
    im = im[:257, :257]
    # Linear map of pixel values: 0 -> -80 and 255 -> 0, interpreted as dB.
    im = (im * 80.0 / 255.0) - 80.0
    im = librosa.db_to_amplitude(im)
    y2 = griffinlim(im, hop_length=256)
    write(path.join(folder, stem + ".wav"), 16000, y2 * 1.5)


def make_test_folder(X_in, X_out, name, step, n_samples=5):
    """Save input/translated samples as images, audio and one overview plot.

    Files go to ``<path_to_log>/<step+1>/<name>/``; the overview PNG goes to
    ``<path_to_log>/<step+1>/``.

    Args:
        X_in: Input images; the first ``n_samples`` are used.
        X_out: Translated images; the first ``n_samples`` are used.
        name: Translation label of the form ``<source>_to_<target>``.
        step: Zero-based training step (folders and files use ``step+1``).
        n_samples: Number of samples to export from each array.
    """
    path_to_step_folder = mkdir(path_to_log, str(step + 1))
    path_to_step_folder_name = mkdir(path_to_step_folder, name)

    # Split on the '_to_' separator so that domain names containing an
    # underscore are labelled correctly; fall back to the old positional
    # split for names that do not contain '_to_'.
    source, sep, target = name.partition('_to_')
    if not sep:
        parts = name.split('_')
        source, target = parts[0], parts[2]

    # Row 1 of the plot shows the real inputs, row 2 the translated images.
    rows = ((X_in, "_real_" + source, 0), (X_out, "_generated_" + target, n_samples))
    for images, suffix, offset in rows:
        for i in range(n_samples):
            image = images[i].reshape(260, 260)
            plt.subplot(2, n_samples, 1 + offset + i)
            plt.axis('off')
            plt.imshow(image, cmap='gray')
            _export_spectrogram(image, path_to_step_folder_name, str(i) + suffix)

    # save plot to file
    filename1 = '%s_generated_plot_%06d.png' % (name, (step + 1))
    plt.savefig(path.join(path_to_step_folder, filename1), dpi=300)
    plt.close()




def summarize_performance(step, g_model, trainX, name, n_samples=5):
    """Translate a few random training images and export them for inspection.

    Picks ``n_samples`` images from ``trainX``, runs them through ``g_model``,
    rescales both sets from [-1, 1] to [0, 1] and hands them to
    ``make_test_folder``.
    """
    # labels are not needed here, hence the patch size of 0
    real, _ = generate_real_samples(trainX, n_samples, 0)
    translated, _ = generate_fake_samples(g_model, real, 0)
    make_test_folder((real + 1) / 2.0, (translated + 1) / 2.0, name, step, n_samples)

def update_image_pool(pool, images, max_size=50):
    """Mix freshly generated images with previously generated ones.

    ``pool`` is mutated in place.  While it holds fewer than ``max_size``
    images every new image is stored and used.  Once full, each new image is
    either used directly (probability 0.5) or swapped with a random pool
    entry, in which case the old entry is used instead.

    Returns:
        ``np.asarray`` of the images selected for this update.
    """
    chosen = []
    for candidate in images:
        if len(pool) < max_size:
            # still filling the pool
            pool.append(candidate)
            chosen.append(candidate)
            continue
        if np.random.random() < 0.5:
            # use the new image without storing it
            chosen.append(candidate)
            continue
        # hand out an older image and store the new one in its slot
        slot = np.random.randint(0, len(pool))
        chosen.append(pool[slot])
        pool[slot] = candidate
    return np.asarray(chosen)
def griffinlim(S, n_iter=32, hop_length=None, win_length=None, window='hann',
               center=True, dtype=np.float32, length=None, pad_mode='reflect',
               momentum=0.99, init='random', random_state=None):
    """Estimate a time-domain signal from a magnitude spectrogram.

    Iteratively refines a phase estimate for ``S`` by alternating
    ``librosa.istft`` and ``librosa.stft``, with a momentum term applied to
    the phase update.

    Args:
        S: Magnitude spectrogram; ``n_fft`` is inferred as
            ``2 * (S.shape[0] - 1)``.
        n_iter: Number of refinement iterations.
        hop_length, win_length, window, center, dtype, length, pad_mode:
            Forwarded to ``librosa.istft`` / ``librosa.stft``.
        momentum: Momentum of the phase update; must be >= 0.  Values > 1
            only trigger a warning.
        init: ``'random'`` for random initial phases, ``None`` for all ones.
        random_state: ``None`` (global numpy RNG), an int seed, or a
            ``np.random.RandomState`` instance.

    Returns:
        The signal returned by the final ``librosa.istft`` call.

    Raises:
        ValueError: If ``momentum`` is negative, ``init`` is not one of the
            supported values, or ``random_state`` has an unsupported type.
    """
    # Imported here because the notebook's import cell does not provide it;
    # previously the warning branch failed with a NameError.
    import warnings

    if random_state is None:
        rng = np.random
    elif isinstance(random_state, int):
        rng = np.random.RandomState(seed=random_state)
    elif isinstance(random_state, np.random.RandomState):
        rng = random_state
    else:
        # Previously ``rng`` was simply left unbound for other types.
        raise ValueError('Unsupported random_state={!r}'.format(random_state))

    if momentum > 1:
        warnings.warn('Griffin-Lim with momentum={} > 1 can be unstable. '
                      'Proceed with caution!'.format(momentum))
    elif momentum < 0:
        raise ValueError('griffinlim() called with momentum={} < 0'.format(momentum))

    # Infer n_fft from the spectrogram shape
    n_fft = 2 * (S.shape[0] - 1)

    # using complex64 will keep the result to minimal necessary precision
    angles = np.empty(S.shape, dtype=np.complex64)
    if init == 'random':
        # randomly initialize the phase
        angles[:] = np.exp(2j * np.pi * rng.rand(*S.shape))
    elif init is None:
        # Initialize an all ones complex matrix
        angles[:] = 1.0
    else:
        raise ValueError("init={} must either None or 'random'".format(init))

    # And initialize the previous iterate to 0
    rebuilt = 0.

    for _ in range(n_iter):
        # Store the previous iterate
        tprev = rebuilt

        # Invert with our current estimate of the phases
        inverse = librosa.istft(S * angles, hop_length=hop_length, win_length=win_length,
                                window=window, center=center, dtype=dtype, length=length)

        # Rebuild the spectrogram
        rebuilt = librosa.stft(inverse, n_fft=n_fft, hop_length=hop_length,
                               win_length=win_length, window=window, center=center,
                               pad_mode=pad_mode)

        # Update our phase estimates and renormalise them to unit magnitude
        angles[:] = rebuilt - (momentum / (1 + momentum)) * tprev
        angles[:] /= np.abs(angles) + 1e-16

    # Invert once more with the final phase estimates
    return librosa.istft(S * angles, hop_length=hop_length, win_length=win_length,
                         window=window, center=center, dtype=dtype, length=length)

In [0]:
#@title GAN train method
def train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset,
          n_epochs=500, n_batch=1, summary_every=1, save_every=5):
    """Run the CycleGAN training loop.

    Each step draws a random batch from both domains, generates fakes,
    updates generator B->A, discriminator A, generator A->B and
    discriminator B (in that order) and prints the losses.

    Args:
        d_model_A, d_model_B: Compiled discriminators for domains A and B.
        g_model_AtoB, g_model_BtoA: Generator models.
        c_model_AtoB, c_model_BtoA: Composite models used to update the
            generators.
        dataset: ``[trainA, trainB]`` arrays.
        n_epochs: Number of epochs; an epoch is ``len(trainA) // n_batch``
            steps (only the size of domain A is used).
        n_batch: Batch size.
        summary_every: Export sample translations every this many epochs.
        save_every: Save the generators every this many epochs.
    """
    # the discriminator output side length defines the label shape
    n_patch = d_model_A.output_shape[1]
    trainA, trainB = dataset
    # pools of previously generated fakes, one per domain
    poolA, poolB = [], []
    bat_per_epo = len(trainA) // n_batch
    n_steps = bat_per_epo * n_epochs
    for i in range(n_steps):
        # select a batch of real samples
        X_realA, y_realA = generate_real_samples(trainA, n_batch, n_patch)
        X_realB, y_realB = generate_real_samples(trainB, n_batch, n_patch)
        # generate a batch of fake samples
        X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, n_patch)
        X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, n_patch)
        # update fakes from pool
        X_fakeA = update_image_pool(poolA, X_fakeA)
        X_fakeB = update_image_pool(poolB, X_fakeB)
        # update generator B->A via adversarial and cycle loss
        g_loss2, _, _, _, _ = c_model_BtoA.train_on_batch([X_realB, X_realA], [y_realA, X_realA, X_realB, X_realA])
        # update discriminator for A -> [real/fake]
        dA_loss1 = d_model_A.train_on_batch(X_realA, y_realA)
        dA_loss2 = d_model_A.train_on_batch(X_fakeA, y_fakeA)
        # update generator A->B via adversarial and cycle loss
        g_loss1, _, _, _, _ = c_model_AtoB.train_on_batch([X_realA, X_realB], [y_realB, X_realB, X_realA, X_realB])
        # update discriminator for B -> [real/fake]
        dB_loss1 = d_model_B.train_on_batch(X_realB, y_realB)
        dB_loss2 = d_model_B.train_on_batch(X_fakeB, y_fakeB)
        # summarize performance
        print('>%d, dA[%.3f,%.3f] dB[%.3f,%.3f] g[%.3f,%.3f]' % (i+1, dA_loss1,dA_loss2, dB_loss1,dB_loss2, g_loss1,g_loss2))
        # export sample translations at the end of every ``summary_every`` epochs
        if (i+1) % (bat_per_epo * summary_every) == 0:
            # plot A->B translation
            summarize_performance(i, g_model_AtoB, trainA, domain_a+'_to_'+domain_b)
            # plot B->A translation
            summarize_performance(i, g_model_BtoA, trainB, domain_b+'_to_'+domain_a)
        # checkpoint the generators at the end of every ``save_every`` epochs
        if (i+1) % (bat_per_epo * save_every) == 0:
            save_models(i, g_model_AtoB, g_model_BtoA)

In [0]:
#@title Load the dataset
# Read both domains from the npz archive and report their array shapes.
dataset = load_real_samples(path_to_npz)
print('Loaded', *(samples.shape for samples in dataset))
# The model input shape is fixed here instead of being derived from the data:
# image_shape = dataset[0].shape[1:]
image_shape = (260, 260, 1)

Loaded (202, 260, 260) (196, 260, 260)


In [0]:
#@title Initialize Cycle GAN
# Build order is kept (generator A->B, generator B->A, discriminator A,
# discriminator B) so auto-generated layer names stay the same.
# generators: A -> B and B -> A
g_model_AtoB, g_model_BtoA = [define_generator(image_shape) for _ in range(2)]
# discriminators: A -> [real/fake] and B -> [real/fake]
d_model_A, d_model_B = [define_discriminator(image_shape) for _ in range(2)]
# composite: A -> B -> [real/fake, A]
c_model_AtoB = define_composite_model(
    g_model_AtoB, d_model_B, g_model_BtoA, image_shape)
# composite: B -> A -> [real/fake, B]
c_model_BtoA = define_composite_model(
    g_model_BtoA, d_model_A, g_model_AtoB, image_shape)

In [0]:
#@title Train the GAN
# Start training with the models and dataset built in the cells above.
train(
    d_model_A=d_model_A,
    d_model_B=d_model_B,
    g_model_AtoB=g_model_AtoB,
    g_model_BtoA=g_model_BtoA,
    c_model_AtoB=c_model_AtoB,
    c_model_BtoA=c_model_BtoA,
    dataset=dataset,
)

# Fin