In [None]:
!nvidia-smi # Show which GPU this runtime was given: a Tesla T4 is good, a P100 is best

In [2]:
# Mount Google Drive at /content/drive/ (Colab only). The dataset and model paths used by
# later cells live under this mount point; force_remount=True asks for a fresh mount.
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [3]:
# clone the speech dataset
%cd drive/My Drive/Courses/CSC412
# !git clone --recursive https://github.com/Jakobovski/free-spoken-digit-dataset.git # clone the dataset into local

/content/drive/My Drive/Courses/CSC412


In [4]:
import os, sys, json
import numpy as np
import pandas as pd
import keras.backend as K
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.io.wavfile as wav
import torch

from torch.utils.data import Dataset, DataLoader
from __future__ import division, print_function
from IPython import display
from shutil import copyfile
from scipy.spatial import distance as dist
from scipy import stats
from sklearn import preprocessing, manifold, decomposition, random_projection, neighbors, metrics, linear_model
from sklearn.model_selection import cross_val_score
from keras.layers import *
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam
from keras import losses
from keras.utils import to_categorical
from keras.models import load_model
from os import listdir
from os.path import isfile, join


# Plot styling: white-grid background and the larger 'talk' context with a 1.2x font scale.
sns.set_style('whitegrid')
sns.set_context('talk', font_scale=1.2)
# Seed NumPy and TensorFlow with the same fixed value.
np.random.seed(412)
tf.random.set_seed(412)
# Root of the cloned free-spoken-digit-dataset on the mounted Drive. Later cells build
# sub-directory paths by string concatenation, so the trailing slash is required.
base_path = "/content/drive/MyDrive/Courses/CSC412/free-spoken-digit-dataset/"

# from tensorflow.examples.tutorials.mnist import input_data
# mnist = input_data.read_data_sets('MNIST_data', one_hot=False)

In [5]:
# Helper functions
def wav_to_spectrogram(audio_path, save_path, spectrogram_dimensions=(64, 64), noverlap=16, cmap='gray_r'):
    """ Creates a spectrogram image of a wav file and writes it to disk.

    The figure is drawn without axes, ticks or padding so that the saved image
    contains only the spectrogram itself.

    :param audio_path: path of wav file
    :param save_path:  path of spectrogram to save
    :param spectrogram_dimensions: (width, height) of the figure in pixels. Defaults (64,64)
    :param noverlap: See http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.spectrogram.html
    :param cmap: the color scheme to use for the spectrogram. Defaults to 'gray_r'
    :return: None
    """

    # The file's sample rate is not used: the axes are hidden, so only the picture matters.
    _, samples = wav.read(audio_path)

    fig = plt.figure()
    try:
        # Figure size is given in inches, hence pixels / dpi.
        dpi = fig.get_dpi()
        fig.set_size_inches((spectrogram_dimensions[0] / dpi, spectrogram_dimensions[1] / dpi))

        # A single axes object filling the whole figure, with nothing drawn around it.
        ax = plt.Axes(fig, [0., 0., 1., 1.])
        ax.set_axis_off()
        fig.add_axes(ax)
        ax.specgram(samples, cmap=cmap, Fs=2, noverlap=noverlap)
        ax.xaxis.set_major_locator(plt.NullLocator())
        ax.yaxis.set_major_locator(plt.NullLocator())
        fig.savefig(save_path, bbox_inches="tight", pad_inches=0)
    finally:
        # Always release the figure: this function is called once per recording, and
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)


def dir_to_spectrogram(audio_dir, spectrogram_dir, spectrogram_dimensions=(64, 64), noverlap=16, cmap='gray_r'):
    """ Creates spectrograms of all the audio files in a dir

    Every regular file in ``audio_dir`` whose name ends in ``.wav`` is converted with
    ``wav_to_spectrogram`` and saved in ``spectrogram_dir`` under the same base name
    with a ``.png`` extension. ``spectrogram_dir`` is created if it does not exist.

    :param audio_dir: path of directory with audio files (trailing slash optional)
    :param spectrogram_dir: path to save spectrograms (trailing slash optional)
    :param spectrogram_dimensions: tuple specifying the dimensions in pixels of the created spectrogram. default:(64,64)
    :param noverlap: See http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.spectrogram.html
    :param cmap: the color scheme to use for the spectrogram. Defaults to 'gray_r'
    :return: None
    """
    os.makedirs(spectrogram_dir, exist_ok=True)

    # Match on the extension only, so names like "x.wav.bak" are not picked up.
    file_names = [f for f in listdir(audio_dir)
                  if f.lower().endswith('.wav') and isfile(join(audio_dir, f))]

    for file_name in file_names:
        print(file_name)
        audio_path = join(audio_dir, file_name)
        spectrogram_path = join(spectrogram_dir, os.path.splitext(file_name)[0] + '.png')
        wav_to_spectrogram(audio_path, spectrogram_path, spectrogram_dimensions=spectrogram_dimensions, noverlap=noverlap, cmap=cmap)


def sample_latent_space(model, n_dims, n=20, sample_type='uniform', scale=True, dim=64):
    '''Sample the latent space of n_dims,
    then generate data (images) using model to organize generated
    data (images) into a squared canvas for plotting.
    model: need to have `generate` method taking an (n**2, n_dims) array and
        returning one flat image of dim*dim values per row
    n_dims: the dimension of the latent space
    n: number of images along the canvas
    sample_type: 'uniform' draws each latent coordinate from U(-3, 3),
        'normal' draws it from a standard normal
    scale: if True, min-max scale every generated image on its own before placing it
    dim: side length in pixels of one (square) generated image
    returns: array of shape (dim*n, dim*n); images fill the canvas row by row,
        starting with the bottom row
    raises: ValueError for an unknown sample_type
    '''
    if sample_type == 'uniform':
        zs_mu = np.random.uniform(-3, 3, n_dims * n**2)
    elif sample_type == 'normal':
        zs_mu = np.random.randn(n_dims * n**2)
    else:
        raise ValueError(
            "sample_type must be 'uniform' or 'normal', got %r" % (sample_type,))
    zs_mu = zs_mu.reshape(n**2, n_dims)

    xs_gen = model.generate(zs_mu)

    canvas = np.empty((dim*n, dim*n))
    for c in range(n * n):
        # Sample c goes to grid row i (counted from the bottom) and column j.
        i, j = divmod(c, n)
        x = xs_gen[c]
        if scale:
            x = preprocessing.minmax_scale(x.T).T
        canvas[(n-i-1)*dim:(n-i)*dim, j*dim:(j+1)*dim] = x.reshape(dim, dim)
    return canvas

In [6]:
## Create the spectrogram images from the recordings (run once)
# audio_dir = "/content/drive/MyDrive/Courses/CSC412/free-spoken-digit-dataset/recordings/"
# spectrogram_dir = "/content/drive/MyDrive/Courses/CSC412/free-spoken-digit-dataset/spectrograms/"
# dir_to_spectrogram(audio_dir, spectrogram_dir)

In [7]:
class BiGAN():
    '''Bidirectional GAN built from fully connected networks.

    Three networks are trained together: a generator G(z) -> x, an encoder E(x) -> z
    and a discriminator that scores (latent, sample) pairs. Samples are flat vectors
    of length g_n_layers[0]; latent vectors have length g_n_layers[-1].
    '''

    def __init__(self, g_n_layers=[784, 10], d_n_layers=[100, 10], learning_rate=0.001, build_model=True):
        '''
        BiGAN: Bidirectional Generative Adversarial Network
        g_n_layers(list): number of neurons for generator network, 
            the reverse is for the encoder network, the first element should be 
            the input dim, last element should be the latent dim.
        d_n_layers(list): number of hidden units, the first element is the first hidden layer, 
            the input dim will be g_n_layers[0] + g_n_layers[-1].
        learning_rate(float): learning rate of the Adam optimizers.
        build_model(bool): if False, no Keras model is created (used by `load`).
        Raises ValueError if g_n_layers has fewer than two entries.
        '''
        if len(g_n_layers) < 2:
            raise ValueError(
                "g_n_layers needs at least the input dim and the latent dim, got %r" % (g_n_layers,))
        # Copy the lists so that neither the shared default arguments nor the
        # caller's own lists are aliased by this instance.
        self.g_n_layers = list(g_n_layers)
        self.d_n_layers = list(d_n_layers)
        self.input_shape = self.g_n_layers[0]
        self.latent_dim = self.g_n_layers[-1]
        self.learning_rate = learning_rate
        # Constructor arguments, kept in JSON-serialisable form for save()/load().
        self.params = {
            'g_n_layers': list(self.g_n_layers),
            'd_n_layers': list(self.d_n_layers),
            'learning_rate': learning_rate
        }
        if build_model:
            self.build_gan()

    @staticmethod
    def _flatten_batch(x):
        '''Return x as a 2-D (n_samples, n_features) array.

        Input that is already flat is returned with the same shape; image batches such
        as (n, 1, H, W) are flattened per sample. Raises ValueError on an empty batch.
        '''
        x = np.asarray(x)
        if x.ndim == 0 or x.shape[0] == 0:
            raise ValueError("expected a non-empty batch of samples")
        return x.reshape(x.shape[0], -1)

    def build_gan(self):
        '''Create the discriminator, generator, encoder and the combined training model.'''
        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss=['binary_crossentropy'],
            optimizer=Adam(self.learning_rate, 0.5),
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # Build the encoder
        self.encoder = self.build_encoder()

        # The combined model below trains the generator and the encoder only, so the
        # discriminator is frozen inside it (its own compiled model above is unaffected).
        self.discriminator.trainable = False

        # Generate image from sampled noise
        z = Input(shape=(self.latent_dim, ))
        img_ = self.generator(z)

        # Encode image
        img = Input(shape=(self.input_shape, ))
        z_ = self.encoder(img)

        # Latent -> img is fake, and img -> latent is valid
        fake = self.discriminator([z, img_])
        valid = self.discriminator([z_, img])

        # Set up and compile the combined model
        # Trains generator and encoder to fool the discriminator.
        # It gets its own optimizer instance so that no optimizer state is shared
        # between two models that update different sets of weights.
        self.bigan_generator = Model([z, img], [fake, valid])
        self.bigan_generator.compile(loss=['binary_crossentropy', 'binary_crossentropy'],
            optimizer=Adam(self.learning_rate, 0.5))

    def build_encoder(self):
        '''Encoder model encodes input to latent dim: E(x) = z.'''
        model = Sequential()

        # g_n_layers[1:] are the layer widths from the first hidden layer to the latent layer.
        for i, n_layer in enumerate(self.g_n_layers[1:]):
            if i == 0:
                model.add(Dense(n_layer, input_dim=self.input_shape))
            else:
                model.add(Dense(n_layer))
            model.add(LeakyReLU(alpha=0.2))
            model.add(BatchNormalization(momentum=0.8))

        model.summary()

        img = Input(shape=(self.input_shape, ))
        z = model(img)

        return Model(img, z)

    def build_generator(self):
        '''Generator model maps a latent vector to a sample: G(z) = x.

        Uses the encoder's layer widths in reverse. Hidden layers are
        Dense + LeakyReLU + BatchNorm; the output layer is a tanh Dense layer.
        '''
        model = Sequential()
        layer_sizes = self.g_n_layers[::-1][1:]
        last = len(layer_sizes) - 1
        for i, n_layer in enumerate(layer_sizes):
            first_layer_kwargs = {'input_dim': self.latent_dim} if i == 0 else {}
            # The output layer is tested first: with a two-entry g_n_layers the first
            # layer is also the output layer and must still end in tanh.
            if i == last:
                model.add(Dense(n_layer, activation='tanh', **first_layer_kwargs))
            else:
                model.add(Dense(n_layer, **first_layer_kwargs))
                model.add(LeakyReLU(alpha=0.2))
                model.add(BatchNormalization(momentum=0.8))

        model.summary()

        z = Input(shape=(self.latent_dim,))
        gen_img = model(z)

        return Model(z, gen_img)

    def build_discriminator(self):
        '''Discriminator model scores a (latent, sample) pair with a sigmoid output.'''
        z = Input(shape=(self.latent_dim, ))
        img = Input(shape=(self.input_shape, ))
        d_in = concatenate([z, img])

        # Starting from the concatenated input also covers an empty d_n_layers.
        model = d_in
        for n_layer in self.d_n_layers:
            model = Dense(n_layer)(model)
            model = LeakyReLU(alpha=0.2)(model)
            model = Dropout(0.5)(model)

        validity = Dense(1, activation="sigmoid")(model)

        return Model([z, img], validity)

    def partial_fit(self, x_batch):
        '''Train G, E, D using a batch of data.

        x_batch: batch of samples; anything with more than two dimensions
            (e.g. (n, 1, H, W) images) is flattened to (n, input_dim) first.
        returns: (discriminator loss, discriminator accuracy, combined generator/encoder loss),
            the discriminator values being averaged over its real and fake updates.
        raises: ValueError if the batch is empty.
        '''
        x_batch = self._flatten_batch(x_batch)

        # Adversarial ground truths
        batch_size = len(x_batch)
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))
        # ---------------------
        #  Train Discriminator
        # ---------------------

        # Sample noise and generate img
        z = np.random.normal(size=(batch_size, self.latent_dim))
        x_batch_gen = self.generator.predict(z)

        # Encode the real batch
        z_ = self.encoder.predict(x_batch)

        # Train the discriminator (x_batch -> z_ is valid, z -> x_batch_gen is fake)
        d_loss_real = self.discriminator.train_on_batch([z_, x_batch], valid)
        d_loss_fake = self.discriminator.train_on_batch([z, x_batch_gen], fake)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
        # ---------------------
        #  Train Generator
        # ---------------------

        # Train generator and encoder with flipped labels
        # (z -> G(z) is labelled valid and x_batch -> E(x_batch) is labelled fake)
        g_loss = self.bigan_generator.train_on_batch([z, x_batch], [valid, fake])

        accuracy = d_loss[1]
        # scalars for loss and accuracy from the discriminator
        return d_loss[0], accuracy, g_loss[0]

    def transform(self, X):
        '''Run encoder to get the latent embedding: z = E(x).

        X is flattened to (n_samples, input_dim) if it has more than two dimensions.
        '''
        return self.encoder.predict(self._flatten_batch(X))

    def generate(self, z = None):
        '''Run generator: x = G(z). Without z, one latent vector is drawn from a standard normal.'''
        if z is None:
            z = np.random.normal(size=[1, self.latent_dim])
        return self.generator.predict(z)

    def save(self, path):
        '''Save trained models to files

        Writes generator.h5, discriminator.h5, encoder.h5, bigan_generator.h5 and
        params.json into `path`, creating the directory if needed.
        '''
        os.makedirs(path, exist_ok=True)
        self.generator.save(os.path.join(path, "generator.h5"))
        self.discriminator.save(os.path.join(path, "discriminator.h5"))
        self.encoder.save(os.path.join(path, "encoder.h5"))
        self.bigan_generator.save(os.path.join(path, "bigan_generator.h5"))
        with open(os.path.join(path, 'params.json'), 'w') as params_file:
            json.dump(self.params, params_file)

    @classmethod
    def load(cls, path):
        '''Restore a BiGAN from a directory written by `save`.

        NOTE(review): the four models are deserialised one by one, so the loaded
        `bigan_generator` holds its own copies of the generator/encoder/discriminator
        rather than the objects stored on this instance. This is fine for
        `transform`/`generate`; confirm before resuming training with `partial_fit`.
        '''
        with open(os.path.join(path, 'params.json'), 'r') as params_file:
            params = json.load(params_file)
        # The networks come from the .h5 files, so skip building fresh ones.
        params['build_model'] = False
        gan = cls(**params)
        gan.generator = load_model(os.path.join(path, "generator.h5"))
        gan.discriminator = load_model(os.path.join(path, "discriminator.h5"))
        gan.encoder = load_model(os.path.join(path, "encoder.h5"))
        gan.bigan_generator = load_model(os.path.join(path, "bigan_generator.h5"))
        return gan


In [8]:
# train-test split
# As per README:
# All files of iteration 0-4 move to testing-spectrograms
# All files of iteration 5-49 move to training-spectrograms

def separate(base):
    """Copy the spectrograms into a testing and a training directory.

    File names are expected to look like ``<digit>_<speaker>_<iteration>.<ext>``.
    Files with iteration 0-4 are copied to ``testing-spectrograms``, all others to
    ``training-spectrograms``. Both directories are created under ``base`` if needed.
    Entries that are not regular files, or whose name does not end in an integer
    iteration, are skipped.

    :param base: dataset root containing the ``spectrograms`` directory (trailing slash optional)
    :return: None
    """
    source_dir = os.path.join(base, "spectrograms")
    test_dir = os.path.join(base, "testing-spectrograms")
    train_dir = os.path.join(base, "training-spectrograms")
    os.makedirs(test_dir, exist_ok=True)
    os.makedirs(train_dir, exist_ok=True)

    for filename in os.listdir(source_dir):
        source_path = os.path.join(source_dir, filename)
        if not os.path.isfile(source_path):
            continue
        # The iteration number is the last "_"-separated field before the extension.
        stem = os.path.splitext(filename)[0]
        try:
            iteration = int(stem.rsplit("_", 1)[-1])
        except ValueError:
            continue
        target_dir = test_dir if iteration <= 4 else train_dir
        copyfile(source_path, os.path.join(target_dir, filename))


# separate(base_path)

In [16]:
class SpeechData(Dataset):
  '''Spectrogram images of spoken digits, loaded into memory from one directory.

  Every ``.png`` file whose name starts with a digit is read; that leading digit is
  the label and the first channel of the picture is the image. Each image is stored
  wrapped in a one-element list, so batches come out as (batch, 1, height, width).
  Files are kept in the order ``os.listdir`` returns them.
  '''

  def __init__(self, data_path):
    '''Read all spectrograms found directly inside ``data_path``.'''
    self.path = data_path
    self.images = []
    self.labels = []
    for filename in os.listdir(data_path):
      # Ignore anything that is not a "<digit>...png" spectrogram (e.g. stray system files).
      if not (filename.lower().endswith(".png") and filename[0].isdigit()):
        continue
      label = int(filename[0])
      # Keep only the first colour channel of the picture.
      image = [plt.imread(os.path.join(data_path, filename))[:, :, 0]]
      self.images.append(image)
      self.labels.append(label)
    # Set after the loop so that an empty directory still yields a valid, empty dataset.
    self.len = len(self.images)

  def next_batch(self, step, batch_size):
    '''Return the ``step``-th consecutive batch as (images, labels) numpy arrays.

    The last batch may be shorter than ``batch_size``; a step past the end gives empty arrays.
    '''
    start = step * batch_size
    end = min(self.len, (step + 1) * batch_size)
    return np.array(self.images[start:end]), np.array(self.labels[start:end])

  def __getitem__(self, index):
    '''Return the (image, label) pair at ``index``; the image has shape (1, height, width).'''
    return np.array(self.images[index]), self.labels[index]

  def __len__(self):
    '''Number of loaded spectrograms.'''
    return self.len

In [None]:
batch_size = 100
learning_rate = 0.001
image_dim = 64 # each image is 64 * 64 dimension

# data
train_dataset = SpeechData(base_path + "training-spectrograms/")
# test_dataset = SpeechData(base_path + "testing-spectrograms/")

# Take the sample count from what was actually loaded instead of hard-coding it.
# Expected value: 2700 (3000 recordings in total, 5 * 10 * 6 of them used as test set).
n_samples = len(train_dataset)

# model: flat 64*64 image -> 500 -> 500 -> 20-dimensional latent space
bigan = BiGAN(g_n_layers=[image_dim * image_dim, 500, 500, 20],
            #   d_n_layers=[1000, 1000, 1000],
              d_n_layers=[1000, 1000, 100],
              learning_rate=learning_rate)

In [None]:
# import tensorflow_datasets as tfds
# # Construct a tf.data.Dataset
# mnist = tfds.load(name="mnist", split=tfds.Split.TRAIN)

In [None]:
# Training loop
d_losses = []
accs = []
g_losses = []
training_epochs = 10

# Number of full batches per epoch (a trailing partial batch is not used).
total_batch = n_samples // batch_size
assert total_batch > 0, "need at least one full batch of training data"

for epoch in range(training_epochs):
    # Loop over all batches
    for i in range(total_batch):
        batch_xs, _ = train_dataset.next_batch(i, batch_size)
        # The dataset yields image-shaped batches, but the dense networks take flat
        # vectors, so flatten each sample (a no-op if the batch is already flat).
        batch_xs = batch_xs.reshape(len(batch_xs), -1)
        # Fit training using batch data
        d_loss, acc, g_loss = bigan.partial_fit(batch_xs)

    # Log and record the values of the last batch of this epoch
    print("Epoch %d: D loss = %.4f, G loss = %.4f, D accuracy = %.4f "% (epoch+1, d_loss, g_loss, acc))

    d_losses.append(d_loss)
    accs.append(acc)
    g_losses.append(g_loss)

    # Show a grid of images generated from random latent vectors after each epoch
    canvas = sample_latent_space(bigan, bigan.latent_dim, sample_type='uniform')
    fig, ax = plt.subplots(figsize=(10,10))
    ax.imshow(canvas, origin="upper", cmap="gray")
    ax.set_axis_off()
    plt.show()
    # Release the figure so that one figure per epoch does not accumulate in memory
    plt.close(fig)

In [None]:
# Create the directory the trained model is saved to (-p: no error if it already exists)
# !mkdir -p trained_models/bigan_100
!mkdir -p trained_models/bigan_1000

In [None]:
# Write the four networks (.h5 files) and params.json into the model directory
# bigan.save('trained_models/bigan_100/')
bigan.save('trained_models/bigan_1000/')

In [None]:
# Loss curves, one point per epoch (the value of the last batch of that epoch)
epochs = range(1, len(d_losses) + 1)
fig, ax = plt.subplots()
ax.plot(epochs, d_losses, label='Discriminator loss')
ax.plot(epochs, g_losses, label='Generator loss')
ax.legend(loc='best')
ax.set_title('BiGAN training losses')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
plt.show()

In [None]:
# Generate one sample; called without z, generate() draws a single random latent vector itself
x_gen = bigan.generate()
x_gen.shape

In [None]:
# display_mnist_image(x_gen)