In [None]:
!pip install tensorflow_io
!pip install pyyaml h5py  # Required to save models in HDF5 format

In [None]:
import tensorflow as tf
import random
import numpy as np
import matplotlib.pyplot as plt
import os
import pandas
import tensorflow_datasets as tfds
import time
import librosa.display as lidp
import tensorflow_io as tfio
from tensorflow import keras
from tensorflow.keras import backend
from tensorflow.keras import layers
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import Conv1DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.constraints import Constraint
from numpy import expand_dims
from numpy import mean
from numpy import ones
from numpy.random import randn
from numpy.random import randint
from IPython import display

In [None]:
import json

In [None]:
# Fetch the NSynth training split together with its metadata object.
data, info = tfds.load(
    'nsynth',
    split='train',
    with_info=True,
    try_gcs=True,
)
# Fail early if tfds returned anything other than a tf.data.Dataset.
assert isinstance(data, tf.data.Dataset)

In [None]:
# sound clip size: number of audio samples per example and number of channels
sound_size = 16384
channels = 1
sound_shape = (sound_size, channels)    # (16384, 1)

# z(latent variable) size
z_dim = 100
z_shape = (z_dim,)

# gradient penalty coefficient "λ"
penaltyLambda = 10    

# critic(discriminator) iterations per generator iteration
trainRatio = 5

# number of examples per training batch
batch_size = 64


In [None]:
# Shuffle with a buffer of 16 batches, then group examples into batches.
batched = data.shuffle(batch_size*16).batch(batch_size)
# Number of batches in one pass over the data.
total = len(batched)
# Repeat forever so the training loop can keep pulling batches from one iterator.
dataset = batched.repeat()
db_iter = iter(dataset)

In [None]:
# Mount Google Drive so that plots, loss histories, checkpoints and the saved
# model can be written under /content/gdrive and reused for later training.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
def conv_block(
    x,
    filters,
    activation,
    kernel_size=25,
    strides=4,
    padding="same",
    use_bias=True,
    use_bn=False,
    use_dropout=False,
    drop_value=0.5,
):
    """Apply Conv1D -> [BatchNormalization] -> activation -> [Dropout] to `x`.

    Args:
        x: input tensor fed to the convolution.
        filters: number of convolution filters.
        activation: callable applied to the (optionally normalised) conv output.
        kernel_size, strides, padding, use_bias: forwarded to `layers.Conv1D`.
        use_bn: insert a `BatchNormalization` layer before the activation.
        use_dropout: append a `Dropout(drop_value)` layer after the activation.
        drop_value: dropout rate, only used when `use_dropout` is true.

    Returns:
        The tensor produced by the last layer in the chain.
    """
    convolution = layers.Conv1D(
        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
    )
    out = convolution(x)
    if use_bn:
        out = layers.BatchNormalization()(out)
    out = activation(out)
    return layers.Dropout(drop_value)(out) if use_dropout else out

In [None]:
def get_discriminator_model(d,c):
    """Build the critic: five strided Conv1D blocks followed by a single linear unit.

    Args:
        d: base filter count; the five blocks use d, 2*d, 4*d, 8*d and 16*d filters.
        c: currently unused; the input shape is taken from the module-level
            `sound_shape` instead.

    Returns:
        A Keras model named "discriminator" producing one unbounded score per input.
    """
    inputs = layers.Input(shape=sound_shape)
    x = inputs
    # Each block uses kernel 25 / stride 4 with LeakyReLU(0.2); only the filter
    # count changes, doubling from one block to the next.
    for width in (1, 2, 4, 8, 16):
        x = conv_block(
            x,
            width*d,
            kernel_size=25,
            strides=4,
            use_bn=False,
            use_bias=True,
            activation=layers.LeakyReLU(0.2),
            use_dropout=False,
            drop_value=0.3,
        )
    x = layers.Flatten()(x)
    # No activation on the output: the critic emits a raw score.
    x = layers.Dense(1)(x)

    d_model = keras.models.Model(inputs, x, name="discriminator")
    return d_model

In [None]:
# Instantiate the critic with a base width of 64 filters and show its layers.
d_model = get_discriminator_model(d=64, c=1)
d_model.summary()

In [None]:
def upsample_block(
    x,
    filters,
    activation,
    kernel_size=25,
    strides=4,
    up_size=(2, 2),
    padding="same",
    use_bn=False,
    use_bias=True,
    use_dropout=False,
    drop_value=0.3,
):
    """Apply Conv1DTranspose -> [BatchNormalization] -> [activation] -> [Dropout].

    Args:
        x: input tensor fed to the transposed convolution.
        filters: number of output filters.
        activation: callable applied after the convolution; skipped when falsy.
        kernel_size, strides, padding, use_bias: forwarded to `layers.Conv1DTranspose`.
        up_size: accepted but not used by this function.
        use_bn: insert a `BatchNormalization` layer before the activation.
        use_dropout: append a `Dropout(drop_value)` layer at the end.
        drop_value: dropout rate, only used when `use_dropout` is true.

    Returns:
        The tensor produced by the last layer in the chain.
    """
    transposed_conv = layers.Conv1DTranspose(
        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
    )
    out = transposed_conv(x)

    if use_bn:
        out = layers.BatchNormalization()(out)
    if activation:
        out = activation(out)
    return layers.Dropout(drop_value)(out) if use_dropout else out

In [None]:
def get_generator_model(d,c):
    """Build the generator: dense projection followed by five transposed-conv blocks.

    The noise vector (length taken from the module-level `z_dim`) is projected to
    256*d units, reshaped to (16, 16*d), and upsampled by a factor of 4 in each of
    the five blocks.

    Args:
        d: base filter count; the hidden blocks use 8*d, 4*d, 2*d and d filters.
        c: number of filters (output channels) of the final tanh block.

    Returns:
        A Keras model named "generator".
    """
    noise = layers.Input(shape=(z_dim,))
    x = layers.Dense(d * 256)(noise)
    # (A BatchNormalization layer after the dense projection is disabled.)
    x = layers.LeakyReLU(0.2)(x)

    x = layers.Reshape((16, 16*d))(x)
    # Hidden blocks are identical apart from the filter count, which halves
    # from one block to the next.
    for width in (8, 4, 2, 1):
        x = upsample_block(
            x,
            width*d,
            layers.LeakyReLU(0.2),
            strides=4,
            use_bias=True,
            use_bn=False,
            padding="same",
            use_dropout=False,
        )
    # Output block: tanh activation.
    x = upsample_block(
        x, c, layers.Activation("tanh"), strides=4, use_bias=True, use_bn=False
    )

    g_model = keras.models.Model(noise, x, name="generator")
    return g_model

In [None]:
# Instantiate the generator with a base width of 64 filters and one output channel.
g_model = get_generator_model(d=64, c=1)
g_model.summary()

In [None]:
def generate_and_save_images(model, epoch):
  """Sample 16 sounds from `model`, play them inline and save their waveform plots.

  Args:
      model: generator; called as `model(noise, training=False)` on a (16, 100)
          noise tensor and indexed as a (batch, samples, channels) tensor.
      epoch: integer used to build the output file name.

  Side effects:
      Displays one audio widget per sample (16 kHz playback rate), shows the
      figure, and writes it as a PNG under /content/gdrive/My Drive/WGANGP.
  """
  n_examples = 16
  test_input = tf.random.normal(shape=(n_examples, 100))
  predictions = model(test_input, training=False)

  # Sample count comes from the model output rather than a hard-coded length;
  # the time axis is the same for every waveform, so build it once.
  n_samples = predictions.shape[1]
  x = np.linspace(0, n_samples, n_samples)

  fig, axes = plt.subplots(n_examples, 1, figsize=(15, 30))
  for i in range(predictions.shape[0]):
    # First channel of sample i as a flat 1-D array.
    k = np.reshape(predictions[i,:,0], (n_samples,))
    display.display(display.Audio(k, rate=16000))
    axes[i].plot(x, k)

  fig.savefig('/content/gdrive/My Drive/WGANGP/sound_at_epoch_{:04d}.png'.format(epoch))
  plt.show()
  # Release the figure explicitly; this function is called repeatedly during training.
  plt.close(fig)
#check output data

In [None]:
def plot_history(d_hist, g_hist, epoch):
  """Plot both loss curves to a PNG and dump the raw histories as JSON.

  Args:
      d_hist: sequence of critic losses (must be JSON-serialisable).
      g_hist: sequence of generator losses (must be JSON-serialisable).
      epoch: integer used to build the plot's file name.
  """
  # Loss curves, one line per network.
  for series, tag in ((d_hist, 'crit'), (g_hist, 'gen')):
    plt.plot(series, label=tag)
  plt.legend()
  plt.savefig('/content/gdrive/My Drive/WGANGP/plot_line_plot_loss_{:04d}.png'.format(epoch))
  plt.close()

  # Overwrite the saved histories so a later session can reload them.
  dumps = (
    ("/content/gdrive/My Drive/WGANGP/derror.txt", d_hist),
    ("/content/gdrive/My Drive/WGANGP/gerror.txt", g_hist),
  )
  for path, series in dumps:
    with open(path, "w") as fp:
      json.dump(series, fp)

In [None]:
# Start with empty critic / generator loss histories.
d_hist, g_hist = [], []

In [None]:
# Resume the loss histories written by plot_history in an earlier session.
# On a first run the files do not exist yet, so the current lists are kept.
_d_hist_path = "/content/gdrive/My Drive/WGANGP/derror.txt"
_g_hist_path = "/content/gdrive/My Drive/WGANGP/gerror.txt"
if os.path.exists(_d_hist_path) and os.path.exists(_g_hist_path):
  with open(_d_hist_path, "r") as fp:
    d_hist = json.load(fp)
  with open(_g_hist_path, "r") as fp:
    g_hist = json.load(fp)

In [None]:
class WGAN(keras.Model):
    """WGAN with gradient penalty: a critic, a generator and a manual training loop.

    Args:
        discriminator: critic model, called on (batch, samples, 1) sound tensors.
        generator: model mapping (batch, latent_dim) noise to sound tensors.
        latent_dim: length of the noise vector fed to the generator.
        discriminator_extra_steps: critic updates performed per generator update.
        gp_weight: multiplier applied to the gradient penalty in the critic loss.
    """

    def __init__(
        self,
        discriminator,
        generator,
        latent_dim,
        discriminator_extra_steps=3,
        gp_weight=10.0,
    ):
        super(WGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.d_steps = discriminator_extra_steps
        self.gp_weight = gp_weight

    def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):
        """Store the optimizers and loss callables used by `train`.

        Args:
            d_optimizer: optimizer applied to the critic's variables.
            g_optimizer: optimizer applied to the generator's variables.
            d_loss_fn: callable `(real=..., fake=...)` returning the critic loss.
            g_loss_fn: callable taking the critic's scores on generated sounds.
        """
        super(WGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.d_loss_fn = d_loss_fn
        self.g_loss_fn = g_loss_fn

    def gradient_penalty(self, batch_size, real, fake):
        """Return the mean squared deviation of the critic's gradient norm from 1.

        The gradient is taken at random points on the segments joining each real
        sample to its paired fake sample.

        Args:
            batch_size: number of samples in `real` / `fake`.
            real: batch of real sounds, rank-3 (batch, samples, channels).
            fake: batch of generated sounds with the same shape as `real`.
        """
        # One interpolation coefficient per sample, drawn from U[0, 1] so every
        # point lies between the real and the fake sample. (A normal draw would
        # place most points outside that segment.)
        alpha = tf.random.uniform([batch_size, 1, 1], 0.0, 1.0)
        diff = fake - real
        interpolated = real + alpha * diff
        with tf.GradientTape() as gp_tape:
            # `interpolated` is not a variable, so it must be watched explicitly.
            gp_tape.watch(interpolated)
            pred = self.discriminator(interpolated, training=True)
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # L2 norm of the gradient over the sample and channel axes.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def train(self, db_iter, b_size=64, n_epoch=2, total=4519):
        """Run `total * n_epoch` generator steps, each preceded by `d_steps` critic steps.

        Args:
            db_iter: iterator yielding dicts with an "audio" entry of shape
                (batch, length); the first 16384 samples of each clip are used.
            b_size: number of noise vectors used for each generator update.
            n_epoch: number of epochs to run.
            total: number of generator steps that make up one epoch.

        Side effects:
            Appends the latest critic and generator loss to the module-level
            lists `d_hist` and `g_hist`, prints them, and roughly three times per
            epoch calls `generate_and_save_images` and `plot_history`.
        """
        batch_size = b_size
        bat_per_epo = total
        n_steps = bat_per_epo * n_epoch
        # Reporting interval; clamped so a tiny `total` cannot produce a modulo by zero.
        report_every = max(1, bat_per_epo // 3)

        # For every step:
        # 1. Train the discriminator `d_steps` times; each time:
        #    a. compute the discriminator loss on a real batch and a fake batch,
        #    b. compute the gradient penalty,
        #    c. add the penalty, scaled by `gp_weight`, to the discriminator loss.
        # 2. Train the generator once.
        # 3. Record both losses and periodically save samples and loss plots.

        for j in range(n_steps):
            for i in range(self.d_steps):
                batch = next(db_iter)
                sound = batch["audio"]
                sound = sound[:,0:16384]
                real_sound = tf.reshape(sound,[sound.shape[0],16384,1])

                # Get the latent vector (sized to the actual batch, which may be
                # smaller than `b_size` for the last batch of a pass).
                random_latent_vectors = tf.random.normal(
                    shape=(sound.shape[0], self.latent_dim)
                )
                with tf.GradientTape() as tape:
                    fake_sound = self.generator(random_latent_vectors, training=True)
                    fake_logits = self.discriminator(fake_sound, training=True)
                    real_logits = self.discriminator(real_sound, training=True)

                    # Calculate discriminator loss using fake and real logits
                    d_cost = self.d_loss_fn(real=real_logits, fake=fake_logits)
                    # Calculate the gradient penalty
                    gp = self.gradient_penalty(sound.shape[0], real_sound, fake_sound)
                    # Add the gradient penalty to the original discriminator loss
                    d_loss = d_cost + gp * self.gp_weight

                # Get the gradients w.r.t the discriminator loss
                d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
                # Update the weights of the discriminator using the discriminator optimizer
                self.d_optimizer.apply_gradients(
                    zip(d_gradient, self.discriminator.trainable_variables)
                )

            # Train the generator now.
            # Get the latent vector
            random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
            with tf.GradientTape() as tape:
                generated_sound = self.generator(random_latent_vectors, training=True)
                gen_sound_logits = self.discriminator(generated_sound, training=True)
                g_loss = self.g_loss_fn(gen_sound_logits)

            # Get the gradients w.r.t the generator loss
            gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
            # Update the weights of the generator using the generator optimizer
            self.g_optimizer.apply_gradients(
                zip(gen_gradient, self.generator.trainable_variables)
            )
            # `d_loss` here is the loss of the last critic step of this iteration.
            d_hist.append(float(d_loss.numpy()))
            g_hist.append(float(g_loss.numpy()))
            print("d_loss: %f , g_loss: %f" %(d_loss, g_loss))
            if (j+1) % report_every == 0:
                generate_and_save_images(self.generator,len(d_hist))
                plot_history(d_hist,g_hist,len(d_hist))

In [None]:
# Optimizer for the generator (the discriminator gets an identical one below).
# learning_rate=0.0002, beta_1=0.5 are recommended
generator_optimizer = keras.optimizers.Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)

In [None]:
# Optimizer for the discriminator; same Adam settings as the generator's.
discriminator_optimizer = keras.optimizers.Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)

In [None]:
# Loss function for the discriminator: (mean fake score - mean real score).
# The gradient penalty is added to this value later, inside the training loop.
def discriminator_loss(real, fake):
    """Return mean(fake) - mean(real) for the critic's scores on each batch."""
    return tf.reduce_mean(fake) - tf.reduce_mean(real)


In [None]:
# Loss function for the generator: negated mean critic score on generated sounds.
def generator_loss(fake):
    """Return -mean(fake), where `fake` holds the critic's scores on generated data."""
    mean_score = tf.reduce_mean(fake)
    return -mean_score

In [None]:
# Epochs to train
epochs = 1

# Get the wgan model. The critic/generator step ratio and the gradient penalty
# weight come from the configuration cell instead of being repeated here.
wgan = WGAN(
    discriminator=d_model,
    generator=g_model,
    latent_dim=z_dim,
    discriminator_extra_steps=trainRatio,
    gp_weight=penaltyLambda,
)

# Compile the wgan model
wgan.compile(
    d_optimizer=discriminator_optimizer,
    g_optimizer=generator_optimizer,
    g_loss_fn=generator_loss,
    d_loss_fn=discriminator_loss,
)

# Training itself is launched in the last cell, after the checkpoint is restored.

In [None]:
# Checkpoints are stored on Drive under the "ckpt" prefix and track the whole
# wgan object.
checkpoint_dir = '/content/gdrive/My Drive/WGANGP'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(wgan=wgan)

In [None]:
# Resume from the most recent checkpoint found in checkpoint_dir.
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
# Write a new checkpoint of the current wgan state.
checkpoint.save(file_prefix = checkpoint_prefix)

In [None]:
# Print the pieces that make up the wgan object, one per line.
for attribute_name in (
    "d_optimizer",
    "g_optimizer",
    "discriminator",
    "generator",
    "latent_dim",
    "d_steps",
    "gp_weight",
    "g_loss_fn",
    "d_loss_fn",
    "train",
):
    print(getattr(wgan, attribute_name))

In [None]:
# Save the generator on its own so it can be loaded without the critic.
wgan.generator.save("/content/gdrive/My Drive/WGANGP/cmodel")

In [None]:
# Plot the loss history collected so far; the epoch value -1 yields the file
# name plot_line_plot_loss_-001.png.
plot_history(d_hist,g_hist,-1)

In [None]:
# Sample from the current generator; the epoch value -2 yields the file name
# sound_at_epoch_-002.png.
generate_and_save_images(wgan.generator,-2)

In [None]:
# Train for 5 epochs, passing the batch size and batches-per-epoch computed
# earlier instead of relying on train()'s hard-coded defaults.
wgan.train(db_iter, b_size=batch_size, n_epoch=5, total=total)