# Introduction

This notebook first creates some custom layers that are used by the discriminator and the generator. It then defines functions that create the discriminator and the generator depending on the level of the models and on whether they should include the fade-in paths.
Both models are then passed to a WGAN_GP class where the training steps are defined. The training data is loaded in batches, so a Data Generator is defined as well as Callbacks to allow for monitoring during training.
After instantiating the needed parameters the last section of code defines the training loop for training the whole algorithm with all levels. After every epoch of the full network the generator and the discriminator are saved to a monitoring folder. The training can therefore be interrupted and continued by loading the saved models.

# Imports and Global Variables

* The library os is used to get and create the paths for the data.
* The library time is used to measure the execution time of code snippets. It was used to debug and improve the code; in this notebook it is now only used to measure the execution time of one batch.
* The library numpy is used to create and load the training data and to execute mathematical operations.
* The library tensorflow is used to create, save and load models and layers and to create Callbacks and the data generator.
* The library gc is used to execute Python garbage collection.

In [None]:
import os
import time
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import load_model
import gc

In [None]:
FINISH_SAMPLE_RATE = 22050  # sample rate of the final level
DURATION = 262144/FINISH_SAMPLE_RATE  # in seconds; 262144 is a power of 2 and the number of samples of the final level
FILENAME_DIR = "C:/Masterarbeit"  # where the file with the names of the audio data is
TRAINING_DATA_BASE_DIR = "C:/Masterarbeit/music_npy" # where the .npy files are with the numeric wave data
MONITORING_DIR = "./Monitoring2" # to save the monitoring data while training

# Build Model

## Custom Layers

### Weighted Sum

The code for the weighted sum layer was taken from the following tutorial and was verified by double-checking the code of the original growing GAN paper: \
Tutorial: https://machinelearningmastery.com/how-to-implement-progressive-growing-gan-models-in-keras/ \
Original paper: https://github.com/tkarras/progressive_growing_of_gans
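The blending that the weighted sum layer performs can be illustrated with a minimal NumPy sketch. It is not the Keras layer itself; the function name `weighted_sum` and the toy arrays are assumptions made for illustration only.

```python
import numpy as np

def weighted_sum(old, new, alpha):
    """Blend two equally shaped arrays: (1 - alpha) * old + alpha * new."""
    return (1.0 - alpha) * old + alpha * new

# toy tensors with shape (batch, time, channels); values are illustrative
old = np.zeros((2, 4, 1))  # stands in for the output of the old path
new = np.ones((2, 4, 1))   # stands in for the output of the new block

for alpha in (0.0, 0.5, 1.0):
    blended = weighted_sum(old, new, alpha)
    print(alpha, blended[0, 0, 0])
```

With alpha = 0 only the old path contributes, and with alpha = 1 only the new block does, which is why increasing alpha during training fades the new block in smoothly.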

In [None]:
class WeightedSum(keras.layers.Add):
    '''
    inherits from keras Add layer
    takes two inputs, multiplies each by a weighting factor and adds them up
    outputs the weighted sum 
    '''
    # init with default value
    def __init__(self, alpha=0.0, **kwargs):
        super(WeightedSum, self).__init__(**kwargs)   # calls the parent class constructor
        self.alpha = keras.backend.variable(alpha, name='ws_alpha') # initializes the class variable alpha

    # output a weighted sum of inputs
    def _merge_function(self, inputs):
        # only supports a weighted sum of two inputs
        assert (len(inputs) == 2) # make sure it is only two inputs
        # ((1-a) * input1) + (a * input2)
        output = ((1.0 - self.alpha) * inputs[0]) + (self.alpha * inputs[1]) 
        return output

### Mini Batch StDev
The code for the mini-batch standard deviation layer was taken from the following tutorial and was verified by double-checking the code of the original growing GAN paper: \
Tutorial: https://machinelearningmastery.com/how-to-train-a-progressive-growing-gan-in-keras-for-synthesizing-faces/ \
Original paper: https://github.com/tkarras/progressive_growing_of_gans
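As a rough illustration of what this layer computes, the following NumPy sketch mirrors the steps for an input of shape (batch, time, channels). The function name `minibatch_stdev` and the random test data are assumptions for illustration; the Keras layer below is the version used in the models.

```python
import numpy as np

def minibatch_stdev(x, eps=1e-8):
    """x has shape (batch, time, channels); returns (batch, time, channels + 1)."""
    mean = x.mean(axis=0, keepdims=True)                 # mean across the mini-batch
    var = ((x - mean) ** 2).mean(axis=0, keepdims=True)  # variance across the mini-batch
    stdev = np.sqrt(var + eps)                           # eps avoids sqrt(0)
    mean_stdev = stdev.mean()                            # one scalar for the whole batch
    extra = np.full(x.shape[:2] + (1,), mean_stdev)      # replicate as one feature map
    return np.concatenate([x, extra], axis=-1)

rng = np.random.default_rng(0)
batch = rng.normal(size=(4, 8, 3))
out = minibatch_stdev(batch)
print(out.shape)
```

The extra feature map holds the same value at every position, so the discriminator receives one summary statistic of how much the samples within a batch differ from each other.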

In [None]:
# mini-batch standard deviation layer
class MinibatchStdev(keras.layers.Layer):
    '''
    inherits from keras layer
    calculates the standard deviation for every position of every feature map across a mini-batch
    and consolidates the information to a single value by taking the average
    this average value is replicated and added to the data as a new feature map
    '''
    def __init__(self, **kwargs):
        super(MinibatchStdev, self).__init__(**kwargs)  # initializes the parent class constructor

    # perform the operation
    def call(self, inputs):

        mean = keras.backend.mean(inputs, axis=0, keepdims=True)  # calculate the mean value for each position and feature map across the mini-batch
        squ_diffs = keras.backend.square(inputs - mean)  # calculate the squared differences between signal values and mean
        mean_sq_diff = keras.backend.mean(squ_diffs, axis=0, keepdims=True) # calculate the average of the squared differences (variance)
        mean_sq_diff += 1e-8  # add a small value to avoid a blow-up when we calculate stdev
        stdev = keras.backend.sqrt(mean_sq_diff) # square root of the variance (stdev)
        mean_pix = keras.backend.mean(stdev, keepdims=True) # calculate the mean standard deviation over all positions and feature maps
        # scale this up to be the size of one input feature map for each sample
        shape = keras.backend.shape(inputs)
        output = keras.backend.tile(mean_pix, (shape[0], shape[1], 1))
        combined = keras.backend.concatenate([inputs, output], axis=-1) # concatenate with the output
        return combined

    # define the output shape of the layer
    def compute_output_shape(self, input_shape):
        input_shape = list(input_shape) # create a copy of the input shape as a list
        input_shape[-1] += 1 # add one to the channel dimension 
        return tuple(input_shape) # convert list to a tuple

### Pixel Normalization
The code for the pixel normalization layer was taken from the following tutorial and was verified by double-checking the code of the original growing GAN paper: \
Tutorial: https://machinelearningmastery.com/how-to-train-a-progressive-growing-gan-in-keras-for-synthesizing-faces/ \
Original paper: https://github.com/tkarras/progressive_growing_of_gans
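The normalization can be checked numerically with a small NumPy sketch. The helper name `pixel_norm` and the example values are assumptions for illustration. Because the layer divides by the square root of the mean of the squares, every position ends up with a root mean square of about 1 across its feature maps.

```python
import numpy as np

def pixel_norm(x, eps=1e-8):
    """Divide every feature vector by the root of its mean squared value."""
    return x / np.sqrt((x ** 2).mean(axis=-1, keepdims=True) + eps)

# shape (batch=1, time=2, channels=2); values chosen for illustration
x = np.array([[[3.0, 4.0], [0.5, 0.5]]])
y = pixel_norm(x)
rms = np.sqrt((y ** 2).mean(axis=-1))  # root mean square per position
print(y)
print(rms)
```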

In [None]:

class PixelNormalization(keras.layers.Layer):
    '''
    inherits from keras layer
    scales the feature vector for every position to unit length
    '''
    # initialize the layer
    def __init__(self, **kwargs):
        super(PixelNormalization, self).__init__(**kwargs) # initializes the parent class constructor

    # perform the operation
    def call(self, inputs):
        values = inputs**2.0 # calculate square signal values     
        mean_values = keras.backend.mean(values, axis=-1, keepdims=True) # calculate the mean signal values      
        mean_values += 1.0e-8 # ensure the mean is not zero
        l2 = keras.backend.sqrt(mean_values) # calculate the sqrt of the mean squared value (L2 norm)
        normalized = inputs / l2 # normalize values by the l2 norm
        return normalized

    # define the output shape of the layer
    def compute_output_shape(self, input_shape):
        return input_shape

### Equalized Learning Rate

The code defines a new type of layer called Conv1DEQ that implements the equalized learning rate using the per-layer constant from He et al. (2015). The code was inspired by, but not copied from, the code of the original growing GAN paper: \
Equalized learning rate defined here: https://arxiv.org/pdf/1502.01852.pdf p.3 \
Code from original paper: https://github.com/tkarras/progressive_growing_of_gans
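To make the scaling concrete, the sketch below computes the per-layer constant c = sqrt(2 / n) for a kernel of length 25 with 256 input feature maps and applies it to weights drawn from N(0, 1). In the formulation of the growing GAN paper the stored weights are multiplied by c at runtime. The helper name `he_constant` and the kernel shape are assumptions for illustration.

```python
import numpy as np

def he_constant(kernel_size, in_channels):
    """Per-layer constant c = sqrt(2 / fan_in) for a 1D convolution."""
    fan_in = kernel_size * in_channels
    return np.sqrt(2.0 / fan_in)

c = he_constant(kernel_size=25, in_channels=256)

rng = np.random.default_rng(0)
kernel = rng.normal(size=(25, 256, 128))  # stored weights, drawn from N(0, 1)
scaled = kernel * c                       # weights that are used in the convolution

print(c)             # per-layer constant
print(scaled.std())  # close to c, because the stored weights have unit variance
```

Keeping the stored weights at unit variance and scaling at runtime means that the optimizer sees parameters of similar dynamic range in every layer, which is the point of the equalized learning rate.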

In [None]:
# normalizes the weights of each layer at runtime with a layer-specific constant

class Conv1DEQ(keras.layers.Conv1D):
    """
    inherits conv1d from keras layers 
    extends this by applying scaling of the weights with He's per-layer constant 
    """
    def __init__(self, **kwargs):
        super().__init__(kernel_initializer=keras.initializers.RandomNormal(stddev=1), **kwargs) # initializes the parent class constructor

    def build(self, input_shape):
        super().build(input_shape) # build the parent class
        n = np.prod([self.kernel_size[0],input_shape[-1]]) # multiply the kernel length with number of feature maps
        self.c = np.sqrt(2/n) # He initialization constant

    def call(self, inputs):
        outputs = tf.nn.conv1d( ## apply tensorflow conv1d layer
            inputs,
            self.kernel*self.c, # scale kernel (multiply by c, consistent with Conv1DEQ_load)
            stride=self.strides,
            padding="SAME")

        if self.use_bias: # add bias if there is one
            outputs = tf.nn.bias_add(
                outputs,
                self.bias)

        if self.activation is not None: # use activation if there is one
            return self.activation(outputs) 
        return outputs

The Conv1DEQ_load class is basically the same layer as the one above. The only difference is that this one does not set a kernel initializer in the constructor. This is needed to avoid an error when loading the model: the weights of the loaded model are already defined, so no initialization is needed.

In [None]:
class Conv1DEQ_load(keras.layers.Conv1D):
    """
    inherits conv1d from keras layers 
    extends this by applying scaling of the weights with He's per-layer constant 
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs) # initialize parent constructor without weight initialization

    def build(self, input_shape):
        super().build(input_shape)
        # The number of inputs
        n = np.prod([self.kernel_size[0],input_shape[-1]])
        # He initialization constant
        self.c = np.sqrt(2/n)

    def call(self, inputs):
        
        outputs = tf.nn.conv1d(
            inputs,
            self.kernel*self.c, # scale kernel
            stride=self.strides,
            padding="SAME")

        if self.use_bias:
            outputs = tf.nn.bias_add(
                outputs,
                self.bias)

        if self.activation is not None:
            return self.activation(outputs)
        return outputs

## Discriminator

This part of the code consists of two functions that create the discriminator model. The function create_discriminator is called later to build the discriminator for a given level, with or without the fade-in path; it calls create_discriminator_block for every additional block. Level one corresponds to n_blocks = 0 because no additional blocks are needed. If the fade_in parameter is set to True, the split path explained in the thesis is added to the network.
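The relation between the level and the input length can be tabulated with a few lines of plain Python. The input length follows the expression 1024 * 2**n_blocks used in create_discriminator. The per-level sample rate is an assumption made here for illustration (every level is assumed to cover the same duration, so the rate halves per level), and the helper names are hypothetical.

```python
BASE_LENGTH = 1024          # input length for n_blocks = 0, as in create_discriminator
FINISH_SAMPLE_RATE = 22050  # sample rate of the final level, as in the global variables

def input_length(n_blocks):
    """Number of samples the discriminator receives at a given level."""
    return BASE_LENGTH * 2 ** n_blocks

def assumed_sample_rate(n_blocks, final_blocks=8):
    """Assumption: all levels cover the same duration, so the rate halves per level."""
    return FINISH_SAMPLE_RATE / 2 ** (final_blocks - n_blocks)

for n in range(9):
    print(n, input_length(n), assumed_sample_rate(n))
```

For n_blocks = 8 the input length is 262144 samples, which matches the number of samples used to define DURATION above.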

In [None]:
# number of filters used for the different levels
n_filters = {0:256,1:256,2:128,3:64,4:64,5:32,6:32,7:16,8:16}

def create_discriminator_block(x,block_n):
    '''
    creates a new convolutional block consisting of two conv layers with equalized learning rate and Leaky Relu activation function 
    and an average pooling layer at the end
    x: keras layer
    block_n: number of current block that is added
    '''
    x = Conv1DEQ(filters=n_filters[block_n],kernel_size=25, padding='same' ,name=f"block{block_n}_Conv1D_1")(x)
    x = keras.layers.LeakyReLU(alpha=0.2,name=f"block{block_n}_LeakyRelu_1")(x)
    x = Conv1DEQ(filters=n_filters[block_n-1], kernel_size=25, padding='same',name=f"block{block_n}_Conv1D_2")(x)
    x = keras.layers.LeakyReLU(alpha=0.2,name=f"block{block_n}_LeakyRelu_2")(x)
    x = keras.layers.AveragePooling1D(pool_size=2,strides=2,
                                      padding="same",name=f"block{block_n}_AveragePooling1D")(x) 
    return x

# creates the discriminator with the number of blocks that is passed
# fade_in indicates whether it is built with or without the transition phase
def create_discriminator(n_blocks,fade_in=False):
    '''
    creates the discriminator network as either a fade in network or a full level network with the number of conv blocks passed
    n_blocks: basically indicates the level of the discriminator with 0 being the first level and 8 being the final
    fade_in: indicate if its a fade in network or a full network (True/False)
    '''
    if n_blocks==0: fade_in=False # for 0 blocks fade_in is always False (better: do error handling)
    input_shape = (1024*2**n_blocks,1) # input_shape of the discriminator depends on number of blocks added
    inputs = keras.layers.Input(shape=input_shape,name="discriminator_inputs") # define keras input
    x = Conv1DEQ(filters=n_filters[n_blocks], kernel_size=1, padding='same', name="input_Conv1D_1")(inputs) # input Conv layer with kernel size 1
    x = keras.layers.LeakyReLU(alpha=0.2,name="input_LeakyRelu_1")(x) # leaky ReLU activation
    
    if fade_in: # create fade in network if fade_in is set to True
        new_block = create_discriminator_block(x,n_blocks) # create the new Conv block that is faded in smoothly
        
        downsample = keras.layers.AveragePooling1D(pool_size=2,strides=2,  # downsample the input directly before processing
                                                   padding="same",
                                                   name=f"Downsample_AveragePooling1D_fade_in_model")(inputs)
        x = Conv1DEQ(filters=n_filters[n_blocks-1], kernel_size=1, padding='same', name=f"block{n_blocks-1}_input_Conv1D_1")(downsample) # input conv layer
        old_block = keras.layers.LeakyReLU(alpha=0.2,name=f"block{n_blocks-1}_input_LeakyRelu_1")(x) # downsampled input from the old block
        
        x = WeightedSum(name="WeightedSum_Layer")([old_block, new_block]) # weighted sum of old and new block
        
        for block_n in range(n_blocks-1,0,-1):  # build the rest of the network 
            x = create_discriminator_block(x,block_n)             
        
    else: # create full network if fade_in is set to False
        for block_n in range(n_blocks,0,-1): # directly build all blocks without a split path
            x = create_discriminator_block(x,block_n)
        
    x = MinibatchStdev(name="MinibatchStdev")(x) # add mini batch stdev as a new feature map
    # add the last conv block, which is the block from level 1
    x = Conv1DEQ(filters=256,kernel_size=25 , padding='same', name="block_0_Conv1D_1")(x) 
    x = keras.layers.LeakyReLU(alpha=0.2,name=f"block_0_LeakyRelu_1")(x)
    x = Conv1DEQ(filters=256, kernel_size=25, padding='same', name="block_0_Conv1D_2")(x)
    x = keras.layers.LeakyReLU(alpha=0.2,name=f"block_0_LeakyRelu_2")(x)
    
    x = keras.layers.Flatten(name="Flatten_Layer")(x) # Flatten the output of the conv block
    out_class = keras.layers.Dense(1,name="discriminator_outputs")(x) # consolidate all information to a single number, no activation = linear activation

    model = keras.models.Model(inputs, out_class,name="discriminator_model") # create the keras model
    return model # return the model
 

## Generator

This part of the code consists of two functions that create the generator model. The function create_generator is called later to build the generator for a given level, with or without the fade-in path; it calls create_generator_block for every additional block. The latent dimension is set later in the notebook.
Level one corresponds to n_blocks = 0 because no additional blocks are needed. If the fade_in parameter is set to True, the split path explained in the thesis is added to the network.
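The split path of the fade-in generator can be sketched in NumPy: the output of the old level is upsampled by repeating every time step twice (which is what an upsampling layer with size 2 does) and is then blended with the output of the new block. All values, the helper name `upsample_nearest` and the chosen alpha are assumptions for illustration.

```python
import numpy as np

def upsample_nearest(x, size=2):
    """Repeat every time step `size` times along axis 1 of (batch, time, channels)."""
    return np.repeat(x, size, axis=1)

old_out = np.array([[[0.1], [0.3]]])                # old level output, shape (1, 2, 1)
up = upsample_nearest(old_out)                      # shape (1, 4, 1)
new_out = np.array([[[0.0], [0.2], [0.2], [0.4]]])  # new block output, shape (1, 4, 1)

alpha = 0.25                                        # weight of the new block
mixed = (1.0 - alpha) * up + alpha * new_out
print(up[0, :, 0])
print(mixed[0, :, 0])
```

Early in the fade-in phase (small alpha) the generator output is dominated by the upsampled result of the already trained level, so the new block can be trained without disturbing what was learned before.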

In [None]:
# number of filters used for the different levels
n_filters = {0:256,1:256,2:128,3:64,4:64,5:32,6:32,7:16,8:16}

def create_generator_block(x,block_n):
    '''
    creates a new convolutional block consisting of an upsampling layer followed by
    two convolutional layer with equalized learning rate, pixel normalization 
    and a leaky ReLu activation function
    x: keras Layer
    block_n: number of current block that is added
    '''
    upsampling = keras.layers.UpSampling1D(size=2,name=f"block{block_n}_Upsampling")(x)
    x = Conv1DEQ(filters=n_filters[block_n], kernel_size=25, padding='same',name=f"block{block_n}_Conv1D_1")(upsampling)
    x = PixelNormalization(name=f"block{block_n}_PixNorm_1")(x)
    x = keras.layers.LeakyReLU(alpha=0.2,name=f"block{block_n}_LeakyRelu_1")(x)
    x = Conv1DEQ(filters=n_filters[block_n], kernel_size=25, padding='same',name=f"block{block_n}_Conv1D_2")(x)
    x = PixelNormalization(name=f"block{block_n}_PixNorm_2")(x)
    x = keras.layers.LeakyReLU(alpha=0.2,name=f"block{block_n}_LeakyRelu_2")(x)
    return x
    
    
def create_generator(latent_dim,n_blocks,fade_in=False,in_dim=1024):
    '''
    creates the generator network as either a fade in network or a full level network with the number of conv blocks passed
    latent_dim: dimension of the latent vector that is fed as an input to the generator
    n_blocks: basically indicates the level of the generator with 0 being the first level and 8 being the final
    fade_in: indicate if its a fade in network or a full network (True/False)    
    in_dim: starting dimension of the network, for our case always 1024 
    '''
    if n_blocks==0: fade_in=False  # for 0 blocks fade_in is always False (better: do error handling)
    init = keras.initializers.RandomNormal(stddev=1) # weight initialization for the dense layer
    inputs = keras.layers.Input(shape=(latent_dim,),name="generator_inputs") # keras input
    x = keras.layers.Dense(4 * in_dim, kernel_initializer=init,name="Dense_Layer")(inputs)  # scale the input to the starting dimension with four feature maps
    x = keras.layers.Reshape((in_dim, 4),name="Reshape_Layer")(x) # Reshape to the starting dimension with four feature maps
    # first convolutional block which is the same for all levels
    x = Conv1DEQ(filters=256, kernel_size=25, padding='same', name="block0_Conv1D_1")(x) 
    x = PixelNormalization(name="block0_PixNorm_1")(x)
    x = keras.layers.LeakyReLU(alpha=0.2,name="block0_LeackRelu_1")(x)
    x = Conv1DEQ(filters=256, kernel_size=25, padding='same', name="block0_Conv1D_2")(x)
    x = PixelNormalization(name="block0_PixNorm_2")(x)
    x = keras.layers.LeakyReLU(alpha=0.2,name="block0_LeackRelu_2")(x)   
    
    if fade_in: # create fade in network if fade_in is set to True
        for block_n in range(n_blocks-1): # for fade_in = True add all blocks except for the last one because the path is split
            x = create_generator_block(x,block_n+1)    

 
        upsampling = keras.layers.UpSampling1D(size=2,name=f"fade_in_Upsampling")(x) # upsample the output of the last conv block, which is the output of the last level
        output_old = Conv1DEQ(filters=1, kernel_size=1, padding='same',activation="tanh", # add output conv layer
                             name="generator_output_old")(upsampling) 
        x = create_generator_block(x,n_blocks) # add the new conv block
        output_new = Conv1DEQ(filters=1, kernel_size=1, padding='same',activation="tanh", # add the output conv layer for the new conv block
                             name="generator_output_new")(x)  
        outputs = WeightedSum(name="weighted_sum_layer")([output_old, output_new]) # build the weighted sum of the old and new block           
        
    else: # create full network if fade_in is set to False
        for block_n in range(n_blocks): # add all blocks without a split path
            x = create_generator_block(x,block_n+1)    

        outputs = Conv1DEQ(filters=1, kernel_size=1, padding='same',activation="tanh", # add the output layer
                                 name="generator_outptus")(x)
    model = keras.models.Model(inputs, outputs,name="generator_model_full") # build the keras model
    return model # return the model



## GAN
This part of the code defines the model as a whole that is used for training later on. It takes the generator and the discriminator as an input and sets all hyperparameters needed for the training. The fit method is later called on this model. It has methods to save and set the weights of both the generator and discriminator.
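The discriminator loss that train_step assembles can be written out in NumPy for a toy batch of critic scores. The two loss functions are passed in through compile and are not defined in this section, so the versions below are an assumption that follows the Keras WGAN-GP example; the helper `total_discriminator_loss` and all numbers are hypothetical.

```python
import numpy as np

def discriminator_loss(real_sample, fake_sample):
    """Assumed Wasserstein critic loss: mean fake score minus mean real score."""
    return np.mean(fake_sample) - np.mean(real_sample)

def generator_loss(fake_sample):
    """Assumed Wasserstein generator loss: negative mean fake score."""
    return -np.mean(fake_sample)

def total_discriminator_loss(real_logits, fake_logits, gp,
                             gp_weight=10.0, epsilon_drift=0.0):
    """Critic loss plus weighted gradient penalty plus drift penalty, as in train_step."""
    d_cost = discriminator_loss(real_logits, fake_logits)
    drift = ((abs(np.mean(fake_logits)) + abs(np.mean(real_logits))) / 2) ** 2
    return d_cost + gp * gp_weight + epsilon_drift * drift

real_logits = np.array([1.0, 3.0])    # toy critic scores for real samples
fake_logits = np.array([-1.0, -1.0])  # toy critic scores for generated samples
loss = total_discriminator_loss(real_logits, fake_logits, gp=0.5, epsilon_drift=0.001)
print(loss, generator_loss(fake_logits))
```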

The structure and the methods train_step and gradient_penalty were taken from: \
https://keras.io/examples/generative/wgan_gp/
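The gradient penalty pushes the norm of the critic's gradient with respect to its input towards 1. For a linear critic f(x) = sum(w * x) the gradient equals w for every input, which allows a small NumPy check without automatic differentiation. The linear critic and the helper name are assumptions chosen only to make the arithmetic visible.

```python
import numpy as np

def gradient_penalty_linear(w, n_samples):
    """Gradient penalty for a linear critic f(x) = sum(w * x), where grad_x f = w."""
    grads = np.broadcast_to(w, (n_samples,) + w.shape)  # same gradient for every sample
    norm = np.sqrt((grads ** 2).sum(axis=(1, 2)))       # gradient norm per sample
    return np.mean((norm - 1.0) ** 2)

w_unit = np.zeros((4, 1))
w_unit[0, 0] = 1.0           # gradient norm 1, so no penalty
w_big = np.ones((4, 1))      # gradient norm 2, so the penalty is (2 - 1)**2

print(gradient_penalty_linear(w_unit, n_samples=3))
print(gradient_penalty_linear(w_big, n_samples=3))
```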

The method train_step is decorated with "@tf.function". This converts the code into an executable graph when training. It results in a longer loading time until the training starts but makes the code significantly faster once the graph is executed. Inside the decorated function TensorFlow's own data types should be used, and plain Python variables cannot be changed from within the graph. For more details refer to: https://www.tensorflow.org/guide/function#conditionals

In [None]:
class WGAN_GP(keras.Model):
    '''
    build the GAN network with wasserstein loss and gradient penalty
    inherits from keras Model
    discriminator: discriminator network
    generator: generator network
    latent_dim: latent dimension for generator input
    n_epochs: number of epochs that both networks are trained for
    fade_in: if both networks are fade in networks or not
    discriminator_extra_steps: how many training steps the discriminator should be trained for every training step of the generator
    gp_weight: the weight for the gradient penalty
    epsilon_drift: the weight for the output drift penalty
    '''
    def __init__(self, discriminator, generator, latent_dim,n_epochs,fade_in,
                 discriminator_extra_steps=3, gp_weight=10, epsilon_drift = 0.0
                 ):
        super(WGAN_GP, self).__init__()  # call the constructor of the parent class
        # instantiate all class variables
        self.discriminator = discriminator  
        self.generator = generator
        self.latent_dim = latent_dim
        self.n_epochs = n_epochs
        self.fade_in = fade_in
        self.d_steps = discriminator_extra_steps
        self.gp_weight = gp_weight
        self.epsilon_drift = epsilon_drift
        
        

    def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):
        '''
        compile the model, basically initializes the optimizers and the loss functions
        '''
        super(WGAN_GP, self).compile()  # call the parent compile function
        self.d_optimizer = d_optimizer  # discriminator optimizer
        self.g_optimizer = g_optimizer # generator optimizer
        self.d_loss_fn = d_loss_fn # discriminator loss function
        self.g_loss_fn = g_loss_fn # generator loss function
    
    # forward pass of the network, needs to be defined when using subclassing with tf.function
    def call(self, data):
        '''
        calculates the forward pass of the network
        not needed because the generator and the discriminator are trained internally 
        '''
        pass
    
    
    @tf.function # tf.function converts the decorated function into an executable graph (it does not run eagerly)
    def train_step(self, real_samples):
        '''
        executes one training step of the GAN network; this involves calculating and applying the gradients of the
        loss functions for the discriminator and the generator w.r.t. the parameters 
        real_samples: batch of training data of real songs
        '''
        if isinstance(real_samples, tuple): # check for tuple data type
            real_samples = real_samples[0] # extract the training data from the tuple
        batch_size = tf.shape(real_samples)[0] # extract the batch size
    
        # train the discriminator
    
        for i in range(self.d_steps): # execute the defined number of discriminator training steps before training the generator
            random_latent_vectors = tf.random.uniform(shape=(batch_size,self.latent_dim), # get the latent vector from a uniform [-1,1] distribution
                                                  minval=-1, maxval=1)
            with tf.GradientTape() as tape: # create a gradient tape
                fake_samples = self.generator(  # generate fake samples with the generator
                    random_latent_vectors, training=True)
                fake_logits = self.discriminator(fake_samples, training=True) # get the critic score of the discriminator for the fake samples
                real_logits = self.discriminator(real_samples, training=True) # get the critic score of the discriminator for the real samples
         
                d_cost = self.d_loss_fn(real_sample=real_logits,  # calculate the discriminator loss with the loss function passed in the compile method
                                        fake_sample=fake_logits)
                gp = self._gradient_penalty( # calculate the gradient penalty with the defined function
                    batch_size, real_samples, fake_samples)
                
                # add the gradient penalty and the drift penalty to the original discriminator loss
                d_loss = d_cost + gp * self.gp_weight + self.epsilon_drift *((tf.math.abs(tf.reduce_mean(fake_logits))+tf.math.abs(tf.reduce_mean(real_logits)))/2)**2

            d_gradient = tape.gradient(        
                d_loss, self.discriminator.trainable_variables) # get the gradient w.r.t the discriminator loss
            
            self.d_optimizer.apply_gradients(  
                zip(d_gradient, self.discriminator.trainable_variables) # update the weights of the discriminator using the discriminator optimizer
            )  
            
        # train the generator
        
        random_latent_vectors = tf.random.uniform(shape=(batch_size,self.latent_dim),
                                                  minval=-1, maxval=1)          # get the latent vector
       
        with tf.GradientTape() as tape:  # create a gradient tape
            generated_samples = self.generator(random_latent_vectors,training=True) # generate fake samples using the generator
            gen_sample_logits = self.discriminator(generated_samples,training=True) # get the discriminator critic score for fake samples
            g_loss = self.g_loss_fn(gen_sample_logits) # calculate the generator loss
        
        
        
        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables) # get the gradients w.r.t. the generator loss
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables) # update the weights of the generator using the generator optimizer
        )
        return {"d_loss":d_loss, "g_loss":g_loss}  # return the losses as a dictionary
                
    def _gradient_penalty(self, batch_size, real_samples, fake_samples):
        '''
        calculates the gradient penalty to enforce the lipschitz constraint
        batch_size: batch size of the training
        real_samples: real audio samples
        fake_samples: samples generated by the generator
        '''

        # get an interpolated sample, i.e. a random linear combination of the real and the fake sample
        alpha = tf.random.normal(
            shape=[batch_size, 1,1], mean=0.0, stddev=1.0)
        diff = fake_samples - real_samples
        interpolated = real_samples + alpha * diff # create the interpolated sample

        with tf.GradientTape() as gp_tape: # create a gradient tape
            gp_tape.watch(interpolated) # watch the gradient for the data interpolated
            pred = self.discriminator(interpolated, training=True)  # get the critic score for the interpolated sample

        grads = gp_tape.gradient(pred, [interpolated])[0] # get the gradients 
        # calculate gradient norm
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2]))
        gp = tf.reduce_mean((norm-1.0)**2)  
        return gp
    
    
    def update_generator_weights(self,generator_weights):
        '''
        update the weights of the generator with some given weights
        generator_weights: dictionary of layer names and weights created by the method save_generator_weights
        '''
        for i in range(len(self.generator.layers)): # loop through all layers
            try: self.generator.layers[i].set_weights(generator_weights[self.generator.layers[i].name]) # check if layer name matches with the dict and set weights
            except: pass # if layer name is not found in dict then skip the layer
    def update_discriminator_weights(self,discriminator_weights):
        '''
        update the weights of the discriminator with some given weights
        discriminator_weights: dictionary of layer names and weights created by the method save_discriminator_weights
        '''
        for i in range(len(self.discriminator.layers)): # loop through all layers
            try: self.discriminator.layers[i].set_weights(discriminator_weights[self.discriminator.layers[i].name]) # check if layer name matches with the dict and set weights
            except: pass # if layer name is not found in dict then skip the layer
            
    def save_generator_weights(self):
        '''
        create a dictionary with the layers names and their weights of the generator
        '''
        generator_weights = {} # instantiate dict
        for i in range(len(self.generator.layers)): # loop through layers
            key = self.generator.layers[i].name # save name as key
            value = self.generator.layers[i].get_weights() # save weights as value
            generator_weights[key]=value # add key value pair
        return generator_weights # return dict
    def save_discriminator_weights(self):
        '''
        create a dictionary with the layers names and their weights of the discriminator
        '''        
        discriminator_weights = {} # instantiate dict
        for i in range(len(self.discriminator.layers)):# loop through layers
            key = self.discriminator.layers[i].name # save name as key
            value = self.discriminator.layers[i].get_weights() # save weights as value
            discriminator_weights[key]=value # add key value pair
        return discriminator_weights # return dict

# Data Loading
Because of the large size of the training data, not all the data is loaded into memory at once. The BatchGenerator provides the data to the model batch-wise. This is usually done by the CPU while the actual training (calculating and applying gradients) is done by a GPU. Therefore several batches can be preloaded into a queue; this is defined later in the fit method.

This part of the code was inspired by: https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence
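The index arithmetic behind the batch generator can be tried out without any audio data. The sketch below uses plain Python and hypothetical file names; it only shows how the number of batches and the file names of one batch follow from the batch size.

```python
import math

def n_batches(n_files, batch_size):
    """Number of batches, counting a smaller last batch if the files do not divide evenly."""
    return math.ceil(n_files / batch_size)

def batch_slice(filenames, idx, batch_size):
    """File names that belong to batch number idx."""
    return filenames[idx * batch_size:(idx + 1) * batch_size]

names = [f"song_{i}.npy" for i in range(10)]  # hypothetical file names

print(n_batches(len(names), 4))
for idx in range(n_batches(len(names), 4)):
    print(idx, batch_slice(names, idx, 4))
```

With 10 files and a batch size of 4 the last batch contains only two files, so the model has to cope with a smaller final batch.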

In [None]:
class BatchGenerator(tf.keras.utils.Sequence):
    '''
    inherit from keras Sequence
    creates a BatchGenerator that returns a batch of training data loaded from the given folder
    filenames: .npy file with the filenames of files in a folder that will be loaded
    batch_size: how many files will be loaded
    training_data_dir: folder name that lies within the TRAINING_DATA_BASE_DIR folder
    '''
  
    def __init__(self, filenames, batch_size, trainig_data_dir) :
        self.filenames = filenames
        self.batch_size = batch_size
        self.trainig_data_dir = trainig_data_dir
    
    
    def __len__(self) :
        '''
        calculates the number of batches for a given batch size
        '''
        return int(np.ceil(len(self.filenames) / float(self.batch_size)))
  
  
    def __getitem__(self, idx) :
        '''
        loads the data from the defined folder and returns a batch of data
        '''
        batch_x = self.filenames[idx * self.batch_size : (idx+1) * self.batch_size] # get filenames of the files for the batch
        
        sound_files = np.array([np.load(f"{TRAINING_DATA_BASE_DIR}/{self.trainig_data_dir}/{file_name}",allow_pickle=True)
                         for file_name in batch_x]) # load the .npy files from the folder given the filename
        sound_files = np.expand_dims(sound_files, axis=-1) # add an axis so it is compatible with the network input structure
        return sound_files

# Monitoring

Here, the keras callback is defined. This class provides methods that are executed during the training process. It allows the current loss to be output and intermediate results to be saved. After every epoch of the full network (fade_in = False) the generator and the discriminator are saved along with a graph that shows the loss of each batch in that epoch. 

This part of the code was inspired by: https://www.tensorflow.org/guide/keras/custom_callback
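Since the models are saved with the iteration and the epoch in the file name, the most recent checkpoint can be found by parsing those names. The following helper is a hypothetical sketch that assumes the naming pattern iteration_<i>_generator_epoch_<e>.h5 used by the callback; it is not part of the original notebook.

```python
import re

# pattern of the checkpoint names written by the callback
PATTERN = re.compile(r"iteration_(\d+)_(generator|discriminator)_epoch_(\d+)\.h5$")

def latest_checkpoint(filenames, kind="generator"):
    """Return (iteration, epoch, filename) of the newest checkpoint, or None."""
    found = []
    for name in filenames:
        match = PATTERN.search(name)
        if match and match.group(2) == kind:
            found.append((int(match.group(1)), int(match.group(3)), name))
    return max(found) if found else None

# hypothetical content of the monitoring folder
files = [
    "iteration_0_generator_epoch_1.h5",
    "iteration_0_generator_epoch_2.h5",
    "iteration_1_generator_epoch_1.h5",
    "iteration_1_discriminator_epoch_1.h5",
]
print(latest_checkpoint(files))
print(latest_checkpoint(files, kind="discriminator"))
```

In practice the list of names could come from os.listdir on the monitoring folder, and the returned file name could then be passed to load_model together with the custom layers.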

In [None]:
import soundfile as sf
from matplotlib import pyplot as plt

class GANMonitor(keras.callbacks.Callback):
    '''
    inherits from keras callback
    create a custom callback with methods that are executed after some part of the training is done
    latent_dim: latent dimension of the generator input vector
    sample_rate: sampling rate of the current network level
    iteration: indicating the level of the current network (iteration 0 corresponds to level 1)
    fade_in: whether the networsk are fade in or full networks (True/False)
    n_epochs: number of epochs for the training iteration
    n_batches: number of batches for the training iteration
    '''
    def __init__(self,latent_dim,sample_rate,iteration,fade_in,n_epochs,n_batches):
        self.latent_dim = latent_dim
        self.sample_rate = sample_rate
        self.iteration = iteration
        self.fade_in = fade_in
        self.n_epochs = n_epochs
        self.n_batches = n_batches
        self.g_loss_history = []  # instantiate an empty list to save the generator loss values
        self.d_loss_history = []  # instantiate an empty list to save the discriminator loss values
        self.epoch = 1 # instantiate the current epoch with one

        
    def on_epoch_end(self,epoch,logs=None):
        '''
        gets executed when one training epoch has finished
        epoch: number of current epoch
        logs: loss dict for loss values of the generator and discriminator
        '''
        self.epoch = epoch
        if not self.fade_in: # full network: plot the loss history and save the discriminator and generator
            g_name = f"{MONITORING_DIR}/iteration_{self.iteration}_generator_epoch_{epoch+1}.h5" # generator file name for the current iteration and epoch
            d_name = f"{MONITORING_DIR}/iteration_{self.iteration}_discriminator_epoch_{epoch+1}.h5" # discriminator file name for the current iteration and epoch
            self.model.generator.save(g_name) # save generator
            self.model.discriminator.save(d_name) # save discriminator
            
            plt.plot(self.d_loss_history, label='d_loss') # create a plot of the discriminator loss
            plt.plot(self.g_loss_history, label='g_loss') # add the generator loss to the same plot
            plt.legend() # add legend
            plt.savefig(f'{MONITORING_DIR}/iteration_{self.iteration}_loss_history_epoch_{epoch+1}.png') # save the plot
            plt.close() # close the plot
            self.g_loss_history, self.d_loss_history = [],[]  # reset the loss values so that each plot only covers one epoch
        else: # fade-in network: increase the alpha value of the weighted sum
            alpha_new_value = epoch/self.n_epochs # alpha is the ratio of the current (zero-based) epoch index to the total number of training epochs
            for layer in self.model.discriminator.layers: # loop through all layers in the discriminator
                if isinstance(layer, WeightedSum): # check if it is the WeightedSum layer
                    keras.backend.set_value(layer.alpha, alpha_new_value) # adjust the alpha value
            for layer in self.model.generator.layers: # loop through all layers in the generator
                if isinstance(layer, WeightedSum): # check if it is the WeightedSum layer
                    keras.backend.set_value(layer.alpha, alpha_new_value) # adjust the alpha value
        gc.collect() # run garbage collection to help avoid out-of-memory errors
        tf.keras.backend.clear_session() # clear the keras backend to help avoid out-of-memory errors
        
        

    def on_train_batch_end(self,batch,logs=None):
        '''
        gets executed after training with one batch has finished
        batch: current batch number
        logs: dict with loss values for gen and discr
        '''
        d_loss = tf.round(logs["d_loss"]) # round the discriminator loss to the nearest integer
        g_loss = tf.round(logs["g_loss"]) # round the generator loss to the nearest integer
        self.d_loss_history.append(d_loss) # add the discriminator loss to the list
        self.g_loss_history.append(g_loss) # add the generator loss to the list
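
The alpha update in `on_epoch_end` can be traced by hand. The sketch below lists the values the callback would set over a fade-in phase, assuming seven fade-in epochs and relying on the fact that Keras passes a zero-based epoch index to `on_epoch_end`. The helper name `alpha_schedule` is hypothetical and not part of the notebook.

```python
def alpha_schedule(n_epochs):
    # value of epoch / n_epochs after each epoch, with a zero-based epoch index
    return [epoch / n_epochs for epoch in range(n_epochs)]

alphas = alpha_schedule(7)  # assumed number of fade-in epochs
print([round(a, 3) for a in alphas])
# [0.0, 0.143, 0.286, 0.429, 0.571, 0.714, 0.857]
```

With this schedule the value set by the callback stays below 1.0 for the whole fade-in phase. Whether alpha is additionally updated inside `WGAN_GP` cannot be seen from this section.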


        

                
    
        

# Training

## Instantiation

Here, some hyperparameters like number of epochs, batch sizes, optimizers, loss functions and penalty weights are defined.

In [None]:
growing_iterations = 8 # define number of growing iterations


EPOCHS_FADE_IN = [7,7,7,7,7,7,7,7,7] # set number of epochs for the fade in networks
EPOCHS_FULL = [15,15,15,15,15,15,15,15,15] # set number of epochs for the full networks
assert len(EPOCHS_FADE_IN) == growing_iterations+1 # make sure right number of epochs defined
assert len(EPOCHS_FULL) == growing_iterations+1 # make sure right number of epochs defined

latent_dim = 256 # set latent dimension for generator input

BATCH_SIZE = [32,32,32,16,16,16,16,16,16] # set batch size
assert len(BATCH_SIZE) == growing_iterations+1 # make sure right number of batch sizes defined

discriminator_steps = 2 # set number of discriminator training steps per generator training step


gp_weight = 10.0 # weight for gradient penalty of discriminator loss
epsilon_drift = 0.002 # weight for drift penalty for discriminator loss

# instantiate optimizers
generator_optimizer = keras.optimizers.Adam(learning_rate=0.001,
                                            beta_1=0.0,
                                            beta_2=0.9,
                                            epsilon=10e-8  # note: 10e-8 evaluates to 1e-7
                                            )
discriminator_optimizer = keras.optimizers.Adam(learning_rate=0.001,
                                               beta_1=0.0,
                                               beta_2=0.9,
                                               epsilon=10e-8)  # note: 10e-8 evaluates to 1e-7
# define losses
# gp is added later to discriminator loss
def d_loss_fn(real_sample, fake_sample):
    '''
    defines the loss function of the discriminator
    the penalties for the drift and the gradient are added later
    real_sample: critic scores for real samples
    fake_sample: critic scores for fake samples generated by the generator
    '''
    real_loss = tf.reduce_mean(real_sample)
    fake_loss = tf.reduce_mean(fake_sample)
    return fake_loss - real_loss

def g_loss_fn(fake_sample):
    '''
    defines the loss function of the generator
    fake_sample: critic score of the fake samples generated by the generator
    '''
    return -tf.reduce_mean(fake_sample)
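
A small worked example may help to read the signs of the two losses. The sketch below uses plain Python lists with hypothetical critic scores instead of tensors. The function `total_d_loss` is an assumption: it follows the common WGAN-GP formulation with a drift term on the real scores, and the `WGAN_GP` class of this notebook may combine the penalties differently.

```python
def mean(values):
    return sum(values) / len(values)

def d_loss(real_scores, fake_scores):
    # Wasserstein critic loss: low when real scores are high and fake scores are low
    return mean(fake_scores) - mean(real_scores)

def g_loss(fake_scores):
    # the generator wants the critic to score its samples highly
    return -mean(fake_scores)

def total_d_loss(real_scores, fake_scores, gradient_penalty,
                 gp_weight=10.0, epsilon_drift=0.002):
    # ASSUMPTION: common WGAN-GP form with a drift penalty on the real scores
    drift = mean([s * s for s in real_scores])
    return (d_loss(real_scores, fake_scores)
            + gp_weight * gradient_penalty
            + epsilon_drift * drift)

real_scores = [2.0, 1.0, 3.0]    # hypothetical critic outputs for real audio
fake_scores = [-1.0, 0.0, -2.0]  # hypothetical critic outputs for generated audio

print(d_loss(real_scores, fake_scores))  # -3.0
print(g_loss(fake_scores))               # 1.0
print(total_d_loss(real_scores, fake_scores, gradient_penalty=0.1))
```

A strongly negative discriminator loss means the critic separates real from generated samples well, while the generator loss falls as the critic scores the generated samples higher.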





## Growing Loop

This part of the code executes the actual training. It iterates over the levels of the model and, for each level, first trains the fade-in models and then the full models (except for level one, which has no fade-in). The discriminator and the generator are saved after every epoch of the full network, so the epoch that should be loaded has to be specified as well. To start the training from an intermediate result, set the parameter "starting_iteration" accordingly (level one corresponds to starting_iteration = 0). Set "continue_with_same_level" to True to continue training the loaded level, or to False to load the models and start directly with the next level. For example, if training was interrupted after epoch 8 in iteration 5 and the model should train two more epochs for iteration 5, set the parameters as follows: starting_iteration = 5, latest_epoch_saved = 8, continue_with_same_level = True, and set the number of epochs for that iteration to 2 during instantiation.
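
The effect of the three resume parameters can be checked without training anything. The sketch below only mirrors the skip conditions of the loop and returns the (iteration, fade_in) phases that would actually be trained. The function name `training_phases` is hypothetical, and `growing_iterations = 6` is chosen only to keep the output short.

```python
def training_phases(starting_iteration, latest_epoch_saved,
                    continue_with_same_level, growing_iterations):
    """Return the (iteration, fade_in) phases that would actually be trained."""
    phases = []
    for iteration in range(starting_iteration, growing_iterations + 1):
        for fade_in in (True, False):
            first = iteration == starting_iteration
            if first and fade_in:
                continue  # the starting level never runs a fade-in phase
            resumed = first and starting_iteration != 0 and latest_epoch_saved != 0
            if resumed and not continue_with_same_level:
                continue  # the saved models are only loaded, training moves on
            phases.append((iteration, fade_in))
    return phases

print(training_phases(5, 8, True, 6))   # [(5, False), (6, True), (6, False)]
print(training_phases(5, 8, False, 6))  # [(6, True), (6, False)]
```

In the first call the full network of iteration 5 is trained further before level 6 starts. In the second call the saved models of iteration 5 are only used to initialize level 6.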



In [None]:
starting_iteration = 0  # with which iteration should the training start
latest_epoch_saved = 0  # which epoch should be loaded (leave zero for first execution)
continue_with_same_level = True # for an intermediate training start indicate whether to continue training with the same level
eager_execution = False # enable eager_execution if necessary

for iteration in range(starting_iteration,growing_iterations+1):
    epochs = EPOCHS_FULL # default: epochs of the full networks
    n_epochs = epochs[iteration] # extract the number of epochs for this iteration

    batch_size = BATCH_SIZE[iteration]   # define the batch size for the iteration
    sample_rate = FINISH_SAMPLE_RATE/2**(growing_iterations-iteration) # define the sample rate for the resolution
    training_data_dir = f"sr_{int(sample_rate)}"  # define the training directory based on the sample rate
    filenames = np.load(f"{FILENAME_DIR}/filenames.npy",allow_pickle=True) # load the file names from the corresponding file
    
    training_data_generator = BatchGenerator(filenames,batch_size,training_data_dir) # define the Iterator Object
    n_batches = len(training_data_generator) # get the number of batches

    for fade_in in [True,False]: # execute fade in training and then full training
        print(f"Start Iteration {iteration}, fade_in:{fade_in}")
        
        
        # select the number of epochs that belongs to the current phase (fade in or full network)
        n_epochs = EPOCHS_FADE_IN[iteration] if fade_in else EPOCHS_FULL[iteration]
        callbacks = GANMonitor(latent_dim=latent_dim,sample_rate=sample_rate,iteration=iteration, 
                               fade_in = fade_in, n_epochs=n_epochs,n_batches=n_batches) #instantiate callback Object
        
        # check if training starts with an interim model and load the weights accordingly
        if iteration == starting_iteration and fade_in == True: continue # skip fade_in for the starting iteration
        if iteration == starting_iteration and fade_in == False and starting_iteration != 0 and latest_epoch_saved != 0: # if training starts with a later level, load the saved models
            # load the generator from the same path pattern that GANMonitor uses for saving
            generator = load_model(f'{MONITORING_DIR}/iteration_{starting_iteration}_generator_epoch_{latest_epoch_saved}.h5', custom_objects={
                                                                                'PixelNormalization': PixelNormalization,
                                                                                'WeightedSum': WeightedSum,
                                                                                'Conv1DEQ':Conv1DEQ_load})
            # load the discriminator from the same path pattern that GANMonitor uses for saving
            discriminator = load_model(f'{MONITORING_DIR}/iteration_{starting_iteration}_discriminator_epoch_{latest_epoch_saved}.h5', custom_objects={
                                                                                'MinibatchStdev': MinibatchStdev,
                                                                                'WeightedSum': WeightedSum,
                                                                                'Conv1DEQ':Conv1DEQ_load})
            # instantiate gan
            wgan_gp = WGAN_GP(discriminator, generator, latent_dim,n_epochs=n_epochs,fade_in=fade_in,
                         discriminator_extra_steps=discriminator_steps, gp_weight=gp_weight, epsilon_drift = epsilon_drift)
            # save weights
            discriminator_weights = wgan_gp.save_discriminator_weights()
            generator_weights = wgan_gp.save_generator_weights()
            
            
            if continue_with_same_level == False: continue # skip training for this iteration because the models are already trained and the weights saved


        # start training    
        generator = create_generator(latent_dim,iteration,fade_in=fade_in) # instantiate generator
        discriminator = create_discriminator(iteration,fade_in=fade_in) # instantiate discriminator
        wgan_gp = WGAN_GP(discriminator, generator, latent_dim,n_epochs=n_epochs,fade_in=fade_in,
                         discriminator_extra_steps=discriminator_steps, gp_weight=gp_weight, epsilon_drift=epsilon_drift)  # instantiate GAN with the same penalty weights as above
        # if iteration is greater than zero update the weights of the network with the saved values
        if iteration > 0:
            wgan_gp.update_generator_weights(generator_weights)
            wgan_gp.update_discriminator_weights(discriminator_weights)
        # compile the GAN    
        wgan_gp.compile(
            d_optimizer=discriminator_optimizer,
            g_optimizer=generator_optimizer,
            g_loss_fn=g_loss_fn,
            d_loss_fn=d_loss_fn,
        )
        
        wgan_gp.run_eagerly = eager_execution  # enable eager execution if set to True
        
        # Fit the GAN: uses the Keras internal fit function with a generator and a custom callback
        wgan_gp.fit(training_data_generator,batch_size=batch_size, epochs=n_epochs,callbacks=[callbacks],
                 verbose=1,max_queue_size=8,workers=8)
        
        discriminator_weights = wgan_gp.save_discriminator_weights() # save discr weights for next iteration
        generator_weights = wgan_gp.save_generator_weights() # save gen weights for next iteration
        del generator # delete generator object
        del discriminator # delete discriminator object
        del wgan_gp # delete GAN object
        gc.collect() # python garbage collection to help avoid out-of-memory errors
        tf.keras.backend.clear_session() # clear the keras backend session to help avoid out-of-memory errors
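
The sample rate and the training folder of each level follow directly from the formula at the top of the loop. The sketch below tabulates them for all nine levels. The number of samples per clip is an assumption: it holds only if every level keeps the clip duration of the final level (262144 samples at 22050 Hz). The helper name `level_info` is hypothetical.

```python
FINISH_SAMPLE_RATE = 22050   # sample rate of the final level
N_SAMPLES_FINAL = 262144     # number of samples of the final level
growing_iterations = 8

def level_info(iteration):
    # each earlier level halves the sample rate of the level after it
    factor = 2 ** (growing_iterations - iteration)
    sample_rate = FINISH_SAMPLE_RATE / factor
    # ASSUMPTION: constant clip duration, so the sample count shrinks by the same factor
    return f"sr_{int(sample_rate)}", N_SAMPLES_FINAL // factor

for iteration in range(growing_iterations + 1):
    print(iteration, *level_info(iteration))
```

Because `int()` truncates, fractional sample rates such as 5512.5 map to the folder name `sr_5512`, so the data folders have to be named accordingly.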