The following code was taken from the repository of Harry Stuart: https://github.com/harrystuart/tfworldhackathon.

This is an implementation of the WaveGAN architecture that generates audio clips of about 4 seconds. To generate audio for 10 genres, this script is supposed to be run 10 times, replacing the value of the variable "INSTRUMENT" with the respective genre each time.

In [None]:
# Import modules
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Lambda, Dense, LSTM, Activation, Input, Bidirectional, Dropout
from tensorflow.keras.layers import Reshape, Conv2DTranspose, TimeDistributed, Conv1D, LeakyReLU, Layer, ReLU
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.backend as K

import tensorflow as tf
from tensorflow.keras.optimizers import Adam, RMSprop
import numpy as np
import librosa
import random
import os
import sys
import time
import soundfile as sf



In [None]:
# Mount Google Drive (Colab only) so that the paths under /content/drive used
# by this notebook for data, models and outputs are accessible.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Defining the Generator of GAN
def Generator(d, num_samples, c=16):
    """Build the WaveGAN generator as an uncompiled Keras ``Model``.

    A latent vector of length 100 is projected by a dense layer onto a
    ``(c, c*d)`` feature map, which is then upsampled by six transposed
    convolutions (kernel 25, stride 4). Keras has no 1-D transposed
    convolution in the layers imported here, so each stage adds a dummy
    height axis, applies ``Conv2DTranspose`` with a ``(1, 25)`` kernel and
    removes the axis again.

    Args:
        d: Channel multiplier of the model.
        num_samples: Not used by this function. The output length is fixed
            by the architecture to ``c * 4**6`` (65536 for the default
            ``c=16``); callers must pass a matching value to the critic.
        c: Initial sequence length and initial channel factor. It is halved
            (integer division) before each of the first four transposed
            convolutions, so it should be a multiple of 16.

    Returns:
        A ``Model`` mapping noise of shape ``(None, 100)`` to audio of shape
        ``(None, c * 4**6, 1)`` squashed by ``tanh``.
    """

    def _upsample(x, filters):
        # (None, L, C) -> (None, 1, L, C) -> (None, 1, 4L, filters) -> (None, 4L, filters)
        x = Lambda(lambda t: K.expand_dims(t, axis=1))(x)
        x = Conv2DTranspose(filters, (1, 25), strides=(1, 4), padding='same')(x)
        return Lambda(lambda t: t[:, 0])(x)

    input_layer = Input(shape=(100,))

    # Project the noise onto the initial feature map.
    # output shape = (None, c, c*d) -> (None, 16, 16d) for c == 16
    # The dense width must equal c * (c*d) for the reshape to be valid; the
    # previous hard-coded 16 only worked for the default c.
    x = Dense(c * c * d)(input_layer)
    x = Reshape((c, c * d))(x)
    x = ReLU()(x)

    # Five hidden upsampling stages. For c == 16 the output shapes are
    # (None, 64, 8d), (None, 256, 4d), (None, 1024, 2d), (None, 4096, d)
    # and (None, 16384, d): the channel factor is halved for the first four
    # stages only, the fifth keeps the channel count of the fourth.
    for stage in range(5):
        if stage < 4:
            c //= 2
        x = ReLU()(_upsample(x, c * d))

    # Final stage collapses to a single audio channel.
    # output shape = (None, 65536, 1) for the default c
    x = _upsample(x, 1)

    # Squeeze values between (-1, 1)
    tanh0 = Activation('tanh')(x)

    return Model(inputs=input_layer, outputs=tanh0)

In [None]:
# Implementation of Phase Shuffle
def _apply_phaseshuffle(x, rad=2, pad_type='reflect'):
    """Shift a batch of feature maps along the time axis by a random offset.

    A single integer offset is drawn uniformly from ``[-rad, rad]`` and
    applied to the whole batch. The tensor is padded on one side (using
    ``pad_type``) and cropped on the other, so the output keeps the input's
    static shape.

    Args:
        x: Rank-3 tensor; axis 1 is the axis that gets shifted.
        rad: Maximum absolute shift.
        pad_type: Padding mode forwarded to ``tf.pad``.

    Returns:
        Tensor with the same static shape as ``x``.
    """
    batch, length, channels = x.get_shape().as_list()

    # Scalar draw: maxval is exclusive for integer dtypes, hence rad + 1.
    shift = tf.random.uniform([], minval=-rad, maxval=rad + 1, dtype=tf.int32)

    # A positive shift pads on the left, a negative one on the right; at most
    # one of the two amounts is non-zero.
    left = tf.maximum(shift, 0)
    right = tf.maximum(-shift, 0)
    padded = tf.pad(x, [[0, 0], [left, right], [0, 0]], mode=pad_type)

    # Crop back to the original length, skipping as many leading steps as
    # were padded on the right.
    shifted = padded[:, right:right + length]

    # The dynamic slice loses the static shape; restore it for later layers.
    shifted.set_shape([batch, length, channels])

    return shifted

In [None]:
# Discriminator of the GAN
def Critic(d, num_samples, c=1):
    """Build the WaveGAN critic as an uncompiled Keras ``Model``.

    The waveform is downsampled by five strided 1-D convolutions (kernel 25,
    stride 4, ``'same'`` padding), each followed by a LeakyReLU. Phase
    shuffle is applied after every convolution except the last. The final
    feature map is flattened and mapped to one unbounded score.

    Args:
        d: Channel multiplier of the model.
        num_samples: Length of the input waveform.
        c: Channel factor of the first convolution; doubled before each of
            the following four convolutions.

    Returns:
        A ``Model`` mapping ``(None, num_samples, 1)`` to ``(None, 1)``.
    """
    input_layer = Input(shape=(num_samples, 1))

    # Downsampling. For num_samples == 65536 and c == 1 the stage outputs are
    # (None, 16384, d), (None, 4096, 2d), (None, 1024, 4d), (None, 256, 8d)
    # and (None, 64, 16d).
    x = input_layer
    length = num_samples
    for stage in range(5):
        if stage > 0:
            c *= 2
        x = Conv1D(c * d, 25, strides=4, padding='same')(x)
        x = LeakyReLU(alpha=0.2)(x)
        # 'same' padding with stride 4 yields ceil(length / 4) time steps.
        length = -(-length // 4)
        if stage < 4:
            x = Lambda(lambda t: _apply_phaseshuffle(t))(x)

    # Flatten to (None, length * c * d); this is (None, 1024d) for the
    # default configuration, the same size as the former hard-coded 64*c*d.
    flat = Reshape((length * c * d,))(x)

    # Output a critic score
    score = Dense(1)(flat)

    return Model(inputs=input_layer, outputs=score)

In [None]:
# Defining hyperparameters of audio file

MODEL_DIMS = 64      # The d value passed to both the Critic and the Generator
NUM_SAMPLES = 65536  # Samples per training/generated clip
Fs = 16000           # Sampling rate in Hz used for loading and writing audio
NOISE_LEN = 100      # Length of the latent noise vector

# Defining Hyperparameters for Loss Function
GRADIENT_PENALTY_WEIGHT = 10.0

# Defining hyperparameters for training
D_UPDATES_PER_G_UPDATE = 5  # Number of critic updates performed for every generator update
EPOCHS = 50
EPOCHS_PER_SAMPLE = 2  # Audio is generated and the model saved whenever epoch % EPOCHS_PER_SAMPLE == 0
BATCH_SIZE = 64

INSTRUMENT = "rock"   # Change this according to the genre you want to develop
# Dataset folder of the selected genre. It is derived from INSTRUMENT so that
# the training data and the model/output folders always refer to the same
# genre (it used to be hard-coded to "blues" regardless of INSTRUMENT).
DATA_DIR = f"/content/drive/MyDrive/archive/Data/genres_original/{INSTRUMENT}"


# Creating directories

paths = ["/content/drive/MyDrive/Logs/train",
         f"/content/drive/MyDrive/Model/{INSTRUMENT}/js",
         f"/content/drive/MyDrive/Outputs/{INSTRUMENT}",]

for path in paths:
    # exist_ok avoids the check-then-create race and makes the cell re-runnable.
    os.makedirs(path, exist_ok=True)


Creating necessary directories


In [None]:
# Define class that contains GAN infrastructure

class GAN:
    """Training harness that pairs the Generator with the Critic.

    The critic is trained with a Wasserstein loss plus a gradient penalty,
    the generator with the negated critic score of its samples. Both use
    their own Adam optimizer.
    """

    def __init__(self, model_dims=MODEL_DIMS, num_samples=NUM_SAMPLES,
                 gradient_penalty_weight=GRADIENT_PENALTY_WEIGHT, instrument=INSTRUMENT,
                 noise_len=NOISE_LEN, batch_size=BATCH_SIZE, sr=Fs):
        """Build both networks and their optimizers.

        Args:
            model_dims: Channel multiplier ``d`` passed to both networks.
            num_samples: Length of one audio clip in samples.
            gradient_penalty_weight: Factor applied to the gradient penalty
                in the critic loss.
            instrument: Name used in the output path of generated clips.
            noise_len: Length of the latent noise vector.
            batch_size: Number of fake clips generated per generator update.
            sr: Sampling rate used when writing generated audio.
        """
        self.model_dims = model_dims
        self.num_samples = num_samples
        self.noise_dims = (noise_len,)
        self.batch_size = batch_size

        # summary() prints the table itself and returns None, so it must not
        # be wrapped in print().
        self.G = Generator(self.model_dims, num_samples)
        self.G.summary()

        self.D = Critic(self.model_dims, num_samples)
        self.D.summary()

        self.G_optimizer = Adam(learning_rate=1e-4, beta_1=0.5, beta_2=0.9)
        self.D_optimizer = Adam(learning_rate=1e-4, beta_1=0.5, beta_2=0.9)

        self.gradient_penalty_weight = gradient_penalty_weight

        self.sr = sr

        # Use the constructor argument rather than the module-level constant.
        self.instrument = instrument

    # Loss function for discriminator
    def _d_loss_fn(self, rlog, flog):
        """Return the two Wasserstein critic loss terms.

        Args:
            rlog: Critic scores of real clips.
            flog: Critic scores of generated clips.

        Returns:
            Tuple ``(real_loss, fake_loss)`` = ``(-mean(rlog), mean(flog))``.
        """
        fl = tf.reduce_mean(flog)
        rl = - tf.reduce_mean(rlog)
        return rl, fl

    # Loss function for generator
    def _g_loss_fn(self, flog):
        """Return the generator loss ``-mean(flog)`` for critic scores ``flog``."""
        fl = - tf.reduce_mean(flog)
        return fl

    # Calculates gradient penalty
    def _gradient_penalty(self, real, fake):
        """Compute the gradient penalty on random real/fake interpolations.

        Args:
            real: Batch of real clips.
            fake: Batch of generated clips with the same shape as ``real``.

        Returns:
            Scalar ``mean((||grad||_2 - 1) ** 2)``, where ``grad`` is the
            gradient of the critic output with respect to the interpolated
            input and the norm is taken per example.
        """
        # Performs interpolation with one random weight per example
        def _interpolate(a, b):
            shape = [tf.shape(a)[0]] + [1] * (a.shape.ndims - 1)
            alpha = tf.random.uniform(shape=shape, minval=0., maxval=1.)
            inter = a + alpha * (b - a)
            inter.set_shape(a.shape)
            return inter

        x = _interpolate(real, fake)
        with tf.GradientTape() as t:
            # x is a plain tensor, not a variable, so it must be watched.
            t.watch(x)
            pred = self.D(x, training=True)

        grad = t.gradient(pred, x)
        norm = tf.norm(tf.reshape(grad, [tf.shape(grad)[0], -1]), axis=1)
        gp = tf.reduce_mean((norm - 1.)**2)

        return gp

    # Trains generator by keeping critic constant and returns the loss after training
    @tf.function
    def train_G(self):
        """Run one generator update on ``batch_size`` fresh noise vectors.

        Only the generator's variables receive gradients; the critic is used
        for scoring but is not updated.

        Returns:
            Dict with key ``'g_loss'``.
        """
        with tf.GradientTape() as t:
            z = tf.random.normal(shape=(self.batch_size,) + self.noise_dims)
            x_fake = self.G(z, training=True)
            x_fake_d_logit = self.D(x_fake, training=True)
            G_loss = self._g_loss_fn(x_fake_d_logit)

        G_grad = t.gradient(G_loss, self.G.trainable_variables)
        self.G_optimizer.apply_gradients(zip(G_grad, self.G.trainable_variables))

        return {'g_loss': G_loss}

    # Trains critic by keeping generator constant
    @tf.function
    def train_D(self, x_real):
        """Run one critic update.

        Args:
            x_real: Batch of real clips in the critic's input shape
                ``(batch, num_samples, 1)``. The batch size must be
                statically known because it sizes the noise batch.

        Returns:
            Dict with ``'d_loss'`` (Wasserstein loss without the penalty) and
            ``'gp'`` (unweighted gradient penalty).
        """
        with tf.GradientTape() as t:
            z = tf.random.normal(shape=(x_real.shape[0],) + self.noise_dims)
            x_fake = self.G(z, training=True)

            x_real_d_logit = self.D(x_real, training=True)
            x_fake_d_logit = self.D(x_fake, training=True)

            x_real_d_loss, x_fake_d_loss = self._d_loss_fn(x_real_d_logit, x_fake_d_logit)
            gp = self._gradient_penalty(x_real, x_fake)

            D_loss = (x_real_d_loss + x_fake_d_loss) + gp * self.gradient_penalty_weight

        D_grad = t.gradient(D_loss, self.D.trainable_variables)
        self.D_optimizer.apply_gradients(zip(D_grad, self.D.trainable_variables))

        return {'d_loss': x_real_d_loss + x_fake_d_loss, 'gp': gp}

    # Creates music samples and saves current generator model
    def sample(self, epoch, num_samples=50):
        """Save the generator and write ``num_samples`` generated clips.

        Args:
            epoch: Label used in the model file name and the clip file names.
            num_samples: Number of clips to generate. This is a clip count
                and is unrelated to ``self.num_samples`` (samples per clip).
        """
        # NOTE(review): the model goes to the relative "models" directory,
        # not to a Drive folder - confirm this is the intended location.
        os.makedirs("models", exist_ok=True)
        self.G.save(f"models/{epoch}.h5")

        z = tf.random.normal(shape=(num_samples,) + self.noise_dims)
        result = self.G(z, training=False)
        for i in range(num_samples):
            # Drop the channel axis: (self.num_samples, 1) -> (self.num_samples,)
            audio = np.reshape(result[i, :, :], (self.num_samples,))
            sf.write(f"/content/drive/MyDrive/Outputs/{self.instrument}/{epoch}-{i}.wav",audio,samplerate=self.sr)



In [None]:
# Instantiate model
# Builds the generator and the critic with the module-level hyperparameter
# defaults; the constructor also outputs both Keras model summaries.
gan = GAN()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 100)]             0         
                                                                 
 dense (Dense)               (None, 16384)             1654784   
                                                                 
 reshape (Reshape)           (None, 16, 1024)          0         
                                                                 
 re_lu (ReLU)                (None, 16, 1024)          0         
                                                                 
 lambda (Lambda)             (None, 1, 16, 1024)       0         
                                                                 
 conv2d_transpose (Conv2DTra  (None, 1, 64, 512)       13107712  
 nspose)                                                         
                                                             

In [None]:
# Create training data
# Every file in DATA_DIR is loaded as mono audio resampled to Fs and turned
# into one or more clips of shape (NUM_SAMPLES, 1).

X_train = []
for file in os.listdir(DATA_DIR): ### Modify for your data directory
    with open(os.path.join(DATA_DIR, file), "rb") as f:
        samples, _ = librosa.load(f, sr=Fs)

    if len(samples) < NUM_SAMPLES:
        # Pad short audio files with trailing zeros to NUM_SAMPLES duration
        padded = np.pad(samples, (0, NUM_SAMPLES - len(samples)))
        X_train.append(np.expand_dims(padded, axis=1))
    else:
        # Create every full, non-overlapping slice of length NUM_SAMPLES from
        # long audio. Only the incomplete tail is discarded; the former
        # range(p - 1) also threw away the last complete slice, and with it
        # whole files shorter than 2 * NUM_SAMPLES.
        for i in range(len(samples) // NUM_SAMPLES):
            clip = samples[i * NUM_SAMPLES:(i + 1) * NUM_SAMPLES]
            X_train.append(np.expand_dims(clip, axis=1))

if not X_train:
    # Fail with a clear message instead of an IndexError on X_train[0].
    raise ValueError(f"No audio files found in {DATA_DIR}")

print(f"X_train shape = {(len(X_train),) + X_train[0].shape}")

X_train shape = (600, 65536, 1)


In [None]:
# Write 50 randomly chosen training clips (drawn with replacement) to the
# output folder so that generated audio can be compared against real audio.
for i in range(50):
    clip = X_train[random.randrange(len(X_train))]
    sf.write(f"/content/drive/MyDrive/Outputs/{INSTRUMENT}/real-{i}.wav", clip, samplerate=Fs)


In [None]:
# Generate clips from the generator before any training step has run and
# store them under the label "fake" as a baseline for comparison. This call
# also saves the generator model.
gan.sample("fake")
# NOTE(review): the summary writer logs to the relative "logs/train"
# directory, not to the "/content/drive/MyDrive/Logs/train" directory created
# earlier in this file - confirm which location is intended.
train_summary_writer = tf.summary.create_file_writer("logs/train")



In [None]:
# Train GAN
# Each step performs D_UPDATES_PER_G_UPDATE critic updates, each on a freshly
# drawn random batch, followed by one generator update. Losses are logged to
# the summary writer and printed once per step.
with train_summary_writer.as_default():
    steps_per_epoch = len(X_train) // BATCH_SIZE

    for e in range(EPOCHS):
        for i in range(steps_per_epoch):
            # Global step index shared by the summaries and the console log.
            step = e * steps_per_epoch + i

            # Update the critic a set number of times for each update of the generator
            D_loss_sum = 0
            for _ in range(D_UPDATES_PER_G_UPDATE):
                batch = np.array(random.sample(X_train, BATCH_SIZE))
                D_loss_sum += gan.train_D(batch)['d_loss']

            # Calculate average loss of critic for current step
            D_loss = D_loss_sum / D_UPDATES_PER_G_UPDATE

            G_loss = gan.train_G()['g_loss']

            # Write logs
            tf.summary.scalar('d_loss', D_loss, step=step)
            tf.summary.scalar('g_loss', G_loss, step=step)

            print(f"step {step}: d_loss = {D_loss} g_loss = {G_loss}")

        # Periodically sample generator. The last epoch is always sampled so
        # that the final trained model is saved even when EPOCHS - 1 is not a
        # multiple of EPOCHS_PER_SAMPLE.
        if e % EPOCHS_PER_SAMPLE == 0 or e == EPOCHS - 1:
            gan.sample(e)

    # Make sure buffered summaries reach disk before the cell finishes.
    train_summary_writer.flush()