In [None]:
import os
import tensorflow as tf
from keras.layers.merge import _Merge
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import plot_model
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, Conv2DTranspose, Reshape, Lambda, Activation, BatchNormalization, LeakyReLU, Dropout, ZeroPadding2D, UpSampling2D
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import backend as K
from tensorflow.keras.initializers import RandomNormal

import math
import matplotlib.pyplot as plt

import pypianoroll
import numpy as np
from pypianoroll import Multitrack, Track
from functools import partial

In [None]:
# Run parameters: identify this run and derive the folders it reads from / writes to.
run_id = '4001'
music_name = 'wgan-gp/'

RUN_FOLDER = 'run/'
RUN_FOLDER += '_'.join([run_id, music_name])

# Number of time steps each sliced piano roll should have (needs to be divisible by 16)
pianrollLength = 128

store_folder = os.path.join(RUN_FOLDER, 'store')
data_folder = os.path.join('data', music_name)

# Create every output folder of the run. 'images' is included because the training
# cells save their samples, loss charts and model files under RUN_FOLDER/images.
# exist_ok=True makes the cell safe to re-run and also repairs a run folder that
# already exists but is missing one of the sub-folders.
for subfolder in ('store', 'output', 'weights', 'viz', 'images'):
    os.makedirs(os.path.join(RUN_FOLDER, subfolder), exist_ok=True)

mode = 'build' # alternative value: 'load' (not read by any cell visible in this notebook)

# Previously tried initializer, kept for reference:
#weight_init = RandomNormal(mean=0.05, stddev=0.05)
# Kernel initializer shared by all Conv2D/Dense layers of the generator and the discriminator
weight_init = tf.keras.initializers.he_uniform(seed=16)
# Clip threshold for weight clipping, i.e. weights would be kept in the range [-0.01, 0.01]
# NOTE(review): not referenced by any cell visible in this notebook - confirm it is still needed
clip_threshold = 0.01
# Restrict the note range for faster training
lowestNotePossible = 20
highestNotePossible = 108
# possibleNotes must be divisible by 4, otherwise the architecture needs to be changed
possibleNotes = highestNotePossible - lowestNotePossible

# Shape of one sample fed to the discriminator: (time steps, notes, 1 channel)
input_dim = (pianrollLength,possibleNotes,1)

# Batch size used to build the interpolation layer and passed to train_normal
batch_size = 32

# Weight of the gradient-penalty term in the discriminator loss (used as a loss weight at compile time)
grad_weight = 10

In [None]:
# Load the preprocessed piano-roll array that is used as training data.
# The bare string below only holds alternative datasets that are currently disabled;
# it is never assigned or executed as code.
'''
reshaped = np.load('data/preprocessed/midi_p128_dn88.npy')
isNormalized = False

reshaped = np.load('data/preprocessed/midi_normalized_p128_dn88.npy')
isNormalized = True
'''

# Active dataset: the binarized variant.
# presumably shaped (n_samples, 128, 88, 1) so it matches input_dim - TODO confirm against the preprocessing notebook
reshaped = np.load('data/preprocessed/midi_binarized_p128_dn88.npy')
# Forwarded to generate_Midi_File by the training loop
isNormalized = True

In [None]:
class RandomWeightedAverage(tf.keras.layers.Layer):
    """Layer that returns a random per-sample blend of two 4D tensors.

    For ``inputs = [a, b]`` the output is ``alpha * a + (1 - alpha) * b`` where
    ``alpha`` is drawn uniformly from [0, 1) with shape ``(batch, 1, 1, 1)``, so
    one weight is shared by all values of a sample.

    Args:
        batch_size: Nominal batch size. It is stored and serialized for backward
            compatibility; the number of alphas actually drawn follows the
            runtime batch dimension of the first input, so batches of a
            different size no longer cause a broadcasting error.
        **kwargs: Standard ``Layer`` keyword arguments (e.g. ``name``).
    """

    def __init__(self, batch_size, **kwargs):
        super().__init__(**kwargs)
        self.batch_size = batch_size

    def call(self, inputs, **kwargs):
        """Blend ``inputs[0]`` and ``inputs[1]`` with a fresh random weight per sample."""
        # https://stackoverflow.com/a/58136256
        first, second = inputs[0], inputs[1]
        # Use the dynamic batch dimension instead of the constructor value so the
        # layer works for any batch size that reaches it.
        n_samples = K.shape(first)[0]
        alpha = K.random_uniform((n_samples, 1, 1, 1))
        return (alpha * first) + ((1 - alpha) * second)

    def compute_output_shape(self, input_shape):
        """The output has the shape of the first input."""
        return input_shape[0]

    def get_config(self):
        """Return the layer configuration including ``batch_size`` so the layer can be re-created."""
        config = super().get_config()
        config['batch_size'] = self.batch_size
        return config

In [None]:
# https://github.com/jacobgil/keras-grad-cam/issues/17#issuecomment-398053700
def _compute_gradients(tensor, var_list):
    """Return the gradients of ``tensor`` w.r.t. each entry of ``var_list``.

    Wherever ``tf.gradients`` yields ``None`` for an entry, a zero tensor shaped
    like that entry is returned in its place, so the result never contains ``None``.
    """
    raw_grads = tf.gradients(tensor, var_list)
    filled_grads = []
    for variable, gradient in zip(var_list, raw_grads):
        if gradient is None:
            filled_grads.append(tf.zeros_like(variable))
        else:
            filled_grads.append(gradient)
    return filled_grads

def gradient_penalty_loss(y_true, y_pred, interpolated_samples):
    """
    Gradient penalty: mean over the batch of (1 - ||grad||_2)^2, where grad is the
    gradient of ``y_pred`` with respect to ``interpolated_samples``.

    ``y_true`` is not used; it is only present because Keras losses receive it.
    The penalty weight (lambda) is not applied here.
    """
    grads = _compute_gradients(y_pred, [interpolated_samples])[0]

    # Euclidean norm per sample: square, sum over every axis except the first, take the root
    squared_grads = K.square(grads)
    non_leading_axes = np.arange(1, len(squared_grads.shape))
    l2_norm = K.sqrt(K.sum(squared_grads, axis=non_leading_axes))

    # Squared distance of each norm from 1, averaged over all samples
    per_sample_penalty = K.square(1 - l2_norm)
    return K.mean(per_sample_penalty)

In [None]:
# Source: https://github.com/ChengBinJin/WGAN-GP-tensorflow/blob/master/src/wgan_gp.py 90-98
def gradient_penalty(real_samples=None, generated_samples=None, critic=None):
    """Stand-alone WGAN-GP gradient penalty, adapted from the linked implementation.

    This function is not called by any cell visible in this notebook; the Keras
    training path uses ``gradient_penalty_loss`` instead.

    Args:
        real_samples: 4D tensor of real samples. Defaults to the global ``Y`` the
            original snippet relied on (raises ``NameError`` if it is not defined).
        generated_samples: 4D tensor of generated samples with the same shape.
            Defaults to the global ``g_samples`` (same caveat).
        critic: Callable mapping a batch of samples to critic scores. Defaults to
            the notebook's ``discriminator`` model.

    Returns:
        Scalar tensor: mean over the batch of ``(||grad||_2 - 1) ** 2`` where the
        gradient is taken w.r.t. random interpolations of real and generated samples.
    """
    # Backward-compatible fallbacks to the names used by the original snippet.
    if real_samples is None:
        real_samples = Y
    if generated_samples is None:
        generated_samples = g_samples
    if critic is None:
        critic = discriminator

    # One interpolation weight per sample, sized from the actual batch.
    n_samples = tf.shape(real_samples)[0]
    alpha = tf.random.uniform(shape=[n_samples, 1, 1, 1], minval=0., maxval=1.)
    differences = generated_samples - real_samples
    interpolates = real_samples + (alpha * differences)

    # The Keras model is called directly; it has no `is_reuse` argument.
    gradients = tf.gradients(critic(interpolates), [interpolates])[0]
    slopes = tf.sqrt(tf.reduce_sum(tf.square(gradients), axis=[1, 2, 3]))
    penalty = tf.reduce_mean((slopes - 1.) ** 2)

    return penalty

# Creating the neural Network

In [None]:
### THE discriminator (acts as the critic of the WGAN-GP)
# Input: one piano roll of shape input_dim = (time steps, notes, 1).
discriminator_input = Input(shape=input_dim, name='discriminator_input')

x = discriminator_input

# Four convolutional stages: (filters, strides, dropout rate).
# Each stage is Conv2D(5x5, 'same') -> ReLU -> Dropout.
# (LeakyReLU was tried as an alternative activation in earlier versions.)
for stage_index, (n_filters, stride, drop_rate) in enumerate(
        [(64, 2, 0.4), (64, 2, 0.4), (128, 2, 0.2), (128, 1, 0.2)]):
    x = Conv2D(filters=n_filters, kernel_size=(5, 5), strides=stride, padding='same',
               kernel_initializer=weight_init,
               name='discriminator_conv_' + str(stage_index))(x)
    x = Activation('relu')(x)
    x = Dropout(drop_rate)(x)

x = Flatten()(x)

# Linear output: the model is trained with the Wasserstein loss and a gradient
# penalty, which need an unbounded critic score. A sigmoid would squash the score
# into (0, 1) and bound the critic's scores.
discriminator_output = Dense(1, activation=None, kernel_initializer=weight_init)(x)
discriminator = Model(discriminator_input, discriminator_output, name='discriminator')

In [None]:
# Print the layer-by-layer summary of the discriminator model
discriminator.summary()

In [None]:
# Source: https://stackoverflow.com/a/45199301
# Save the summary to a file 
#from contextlib import redirect_stdout

#with open(os.path.join(store_folder, 'modelsummarydiscriminator.txt'), 'w') as f:
#    with redirect_stdout(f):
#        discriminator.summary()

In [None]:
# Size of the latent noise vector fed to the generator
z_dim = 100

generator_input = Input(shape=(z_dim,), name='generator_input')
# The dense layer produces a quarter-resolution feature map with 8 channels;
# the two 2x upsampling steps below bring it back to full resolution.
generator_initial_dense_layer_size = (pianrollLength // 4, possibleNotes // 4, 8)

# Project the noise vector and reshape it into the initial feature map
x = Dense(np.prod(generator_initial_dense_layer_size), kernel_initializer=weight_init)(generator_input)
x = BatchNormalization(momentum=0.9)(x)
x = Activation('relu')(x)
x = Reshape(generator_initial_dense_layer_size)(x)

# Convolutional stages: (filters, whether a 2x nearest-neighbour upsampling precedes the conv).
# Each stage is [UpSampling2D] -> Conv2D(5x5, 'same') -> BatchNormalization -> ReLU.
for stage_index, (n_filters, upsample_first) in enumerate([(128, True), (64, True), (64, False)]):
    if upsample_first:
        x = UpSampling2D(size=(2, 2), data_format=None, interpolation='nearest')(x)
    x = Conv2D(filters=n_filters, kernel_size=(5, 5), padding='same',
               kernel_initializer=weight_init,
               name='generator_conv_' + str(stage_index))(x)
    x = BatchNormalization(momentum=0.9)(x)
    x = Activation('relu')(x)

# Collapse to a single channel and squash every value into (0, 1)
x = Conv2D(filters=1, kernel_size=(5, 5), padding='same', kernel_initializer=weight_init)(x)
x = Activation('sigmoid')(x)

generator_output = x
generator = Model(generator_input, generator_output, name='generator')

In [None]:
# Print the layer-by-layer summary of the generator model
generator.summary()

In [None]:
# Source: https://stackoverflow.com/a/45199301
# Save the summary to a file 
#from contextlib import redirect_stdout

#with open(os.path.join(store_folder, 'modelsummarygenerator.txt'), 'w') as f:
#    with redirect_stdout(f):
#        generator.summary()

In [None]:
def set_trainable(model, isTrainable):
    """Assign ``isTrainable`` to the ``trainable`` attribute of every layer in ``model.layers``."""
    for current_layer in model.layers:
        setattr(current_layer, 'trainable', isTrainable)

In [None]:
# Wasserstein loss function (Forster, p. 117)
def wasserstein(y_true, y_pred):
    """Return the negated mean of the element-wise product of targets and predictions."""
    signed_scores = y_true * y_pred
    return -K.mean(signed_scores)

In [None]:
#############################
### COMPILE DISCRIMINATOR ###
#############################
# The trainable flags are toggled before each compile() call so that each compiled
# model only updates its own sub-network. The statement order in this cell matters.

# Freeze generator's layers while training critic
set_trainable(generator, False)

# Image input (real sample)
real_img = Input(shape=input_dim)

# Fake image produced by the generator from a noise vector
z_disc = Input(shape=(z_dim,))
fake_img = generator(z_disc)

# Critic scores for the fake and the real images
fake = discriminator(fake_img)
valid = discriminator(real_img)

# Random per-sample blend of real and fake images ...
interpolated_img = RandomWeightedAverage(batch_size)([real_img, fake_img])
# ... and the critic score for that blend
validity_interpolated = discriminator(interpolated_img)

# Use Python partial to provide the loss function with the additional
# 'interpolated_samples' argument
partial_gp_loss = partial(gradient_penalty_loss,
                          interpolated_samples=interpolated_img)
partial_gp_loss.__name__ = 'gradient_penalty'  # Keras requires function names

# Three outputs -> three losses: real score, fake score, gradient penalty
discriminator_model = Model(inputs=[real_img, z_disc],
                            outputs=[valid, fake, validity_interpolated])

# `learning_rate` replaces the deprecated `lr` alias and matches the GAN compile below.
discriminator_model.compile(
    loss=[wasserstein, wasserstein, partial_gp_loss],
    optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
    loss_weights=[1, 1, grad_weight],
)

############################
### COMPILE THE FULL GAN ###
############################
# Now the other way round: freeze the critic, train the generator through it.
set_trainable(discriminator, False)
set_trainable(generator, True)

model_input = Input(shape=(z_dim,), name='model_input')
model_output = discriminator(generator(model_input))
GANModel = Model(model_input, model_output)

GANModel.compile(optimizer=Adam(learning_rate=0.0002, beta_1=0.5), loss=wasserstein)

# Restore the critic's layer flags after both models have been compiled
set_trainable(discriminator, True)


In [None]:
#plot_model(model, to_file=os.path.join(RUN_FOLDER ,'viz/model.png'), show_shapes = True, show_layer_names = True)
#plot_model(discriminator, to_file=os.path.join(RUN_FOLDER ,'viz/discriminator.png'), show_shapes = True, show_layer_names = True)
#plot_model(generator, to_file=os.path.join(RUN_FOLDER ,'viz/generator.png'), show_shapes = True, show_layer_names = True)

# Training of the GAN

In [None]:
def generate_Midi_File(img, epoch, isNormalized):
    """Write one generated piano roll as a MIDI file and as a piano-roll plot.

    Both files are written to ``RUN_FOLDER/images`` as ``<epoch>.mid`` and
    ``<epoch>_pianoroll.png``.

    Args:
        img: Array holding ``pianrollLength * possibleNotes`` values; it is
            reshaped to ``(pianrollLength, possibleNotes)``.
        epoch: Used only to build the output file names.
        isNormalized: Currently unused; kept so existing callers keep working.
    """
    # Make sure the output folder exists before anything is written to it.
    os.makedirs(os.path.join(RUN_FOLDER, "images"), exist_ok=True)

    # Use the configured dimensions instead of hard-coded 128 x 88 so the
    # function follows changes of pianrollLength / the note bounds.
    gen_img = img.reshape(pianrollLength, possibleNotes)

    # Embed the restricted note range into a 128-column roll.
    result = np.zeros((pianrollLength, 128))
    result[:, lowestNotePossible:highestNotePossible] = gen_img
    # Scale the values by 255.
    # NOTE(review): MIDI velocities normally end at 127 - confirm 255 is intended.
    result = result * 255.

    track = Track(pianoroll=result, program=0, is_drum=False, name='my awesome piano')

    multi = Multitrack()
    multi.tracks[0] = track
    pypianoroll.write(multi, os.path.join(RUN_FOLDER, "images/"+str(epoch)+".mid"))

    # Plot the track, save the current figure and close it again.
    multi.tracks[0].plot()
    plt.savefig(os.path.join(RUN_FOLDER, "images/"+str(epoch)+"_pianoroll.png"), format='png')
    plt.close()

In [None]:
def train_normal(epochs, batch_size=128):
    """Train the WGAN-GP: 5 critic updates followed by 1 generator update per epoch.

    Relies on notebook-level objects: ``reshaped`` (training data), ``z_dim``,
    ``discriminator_model``, ``GANModel``, ``generator``, ``discriminator``,
    ``RUN_FOLDER``, ``isNormalized`` and the lists ``d_losses`` / ``g_losses``,
    to which the last critic loss and the generator loss of every epoch are appended.

    Every 100 epochs a generated sample (PNG + MIDI), the loss chart and the loss
    arrays are saved under ``RUN_FOLDER/images``; every 1000 epochs the three
    models are saved there as well. Every 10 epochs the losses are printed.

    Args:
        epochs: Number of training iterations.
        batch_size: Samples per training batch. NOTE(review): this should match
            the batch size the critic graph was built with - confirm before
            relying on the default of 128.
    """
    # Everything below is written to RUN_FOLDER/images; make sure it exists.
    os.makedirs(os.path.join(RUN_FOLDER, "images"), exist_ok=True)

    # Targets for the three critic outputs. They never change, so build them once:
    # +1 for real samples, -1 for generated samples, 0 as dummy for the gradient penalty.
    valid = np.ones((batch_size, 1), dtype=np.float32)
    fake = -np.ones((batch_size, 1), dtype=np.float32)
    dummy = np.zeros((batch_size, 1), dtype=np.float32)

    for epoch in range(epochs):

        # ---------------------
        #  Train Discriminator (5 updates per generator update)
        # ---------------------
        for _ in range(5):
            # Draw a random batch of real samples (with replacement)
            idx = np.random.randint(0, reshaped.shape[0], batch_size)
            true_imgs = reshaped[idx]

            noise = np.random.normal(0, 1, (batch_size, z_dim))

            d_loss = discriminator_model.train_on_batch([true_imgs, noise], [valid, fake, dummy])

        # ---------------------
        #  Train Generator
        # ---------------------
        noise = np.random.normal(0, 1, (batch_size, z_dim))
        g_loss = GANModel.train_on_batch(noise, valid)

        # ---------------------
        #  Save Losses for evaluation
        # ---------------------
        d_losses.append(d_loss)
        g_losses.append(g_loss)

        if epoch % 100 == 0:
            # Save an example
            noise = np.random.normal(0, 1, (batch_size, z_dim))
            gen_imgs = generator.predict(noise)
            sample_fig = plt.figure(figsize=(64, 64))
            plt.imshow(gen_imgs[0, :, :, 0], cmap='gray')
            plt.axis('off')
            plt.savefig(os.path.join(RUN_FOLDER, "images/"+str(epoch)+".png"), format='png')
            generate_Midi_File(gen_imgs[0, :, :, 0], epoch, isNormalized)
            plt.close(sample_fig)

            # Continuously re-save the loss chart to follow the development of the loss
            loss_fig = plt.figure()
            plt.plot([entry[0] for entry in d_losses], color='black', linewidth=0.25)
            plt.plot([entry[1] for entry in d_losses], color='green', linewidth=0.25)
            plt.plot([entry[2] for entry in d_losses], color='red', linewidth=0.25)
            plt.plot(list(g_losses), color='orange', linewidth=0.25)

            plt.xlabel('batch', fontsize=18)
            plt.ylabel('loss', fontsize=16)

            plt.savefig(os.path.join(RUN_FOLDER, "images/loss_chart.png"), format='png')
            plt.show()
            # Actually call close(); the bare attribute `plt.close` did nothing,
            # so one figure leaked every 100 epochs.
            plt.close(loss_fig)

            # Save the loss arrays
            np.save(os.path.join(RUN_FOLDER, "images/D_loss.npy"), d_losses)
            np.save(os.path.join(RUN_FOLDER, "images/G_loss.npy"), g_losses)

            if epoch % 1000 == 0:
                GANModel.save(os.path.join(RUN_FOLDER, 'images/GANModel_'+str(epoch)+'_loss_'+str(g_loss)+'.h5'))
                discriminator.save(os.path.join(RUN_FOLDER, 'images/discriminator_'+str(epoch)+'_loss_'+str(d_loss)+'.h5'))
                generator.save(os.path.join(RUN_FOLDER, 'images/generator_'+str(epoch)+'_loss_'+str(g_loss)+'.h5'))

        # Print the progress
        if epoch % 10 == 0:
            print ("%d [D loss: (%.1f)(R %.1f, F %.1f, G %.1f)] [G loss: %.1f]" % (epoch, d_loss[0], d_loss[1],d_loss[2],d_loss[3],g_loss))

In [None]:
# Loss histories filled by train_normal (one entry per epoch)
d_losses = []
g_losses = []

# Run 10001 training epochs with the configured batch size
train_normal(10001, batch_size)

In [None]:
# Plot the complete loss history after training and save it as loss_chart4.png.
fig = plt.figure()
plt.plot([entry[0] for entry in d_losses], color='black', linewidth=0.25)
plt.plot([entry[1] for entry in d_losses], color='green', linewidth=0.25)
plt.plot([entry[2] for entry in d_losses], color='red', linewidth=0.25)
# The generator loss is stored as one scalar per epoch, so indexing it with [0]
# fails. np.ravel(...)[0] works for scalar entries and for list-like entries.
plt.plot([np.ravel(entry)[0] for entry in g_losses], color='orange', linewidth=0.25)

plt.xlabel('batch', fontsize=18)
plt.ylabel('loss', fontsize=16)

plt.xlim(0, 10000)
plt.ylim(0, 50)

plt.savefig(os.path.join(RUN_FOLDER, "images/loss_chart4.png"), format='png')
plt.show()
# Call close() (the bare attribute `plt.close` was a no-op) to release the figure.
plt.close(fig)

In [None]:
# Source: https://stackoverflow.com/a/6537563
# Beep to signal that training has finished
frequency = 300  # beep frequency in hertz
duration = 1000  # beep duration in milliseconds (1000 ms == 1 second)
try:
    # winsound only exists on Windows, so import it here under a guard instead
    # of letting the last cell crash on other platforms.
    import winsound
except ImportError:
    # Best-effort fallback: emit the terminal bell character.
    print('\a', end='')
else:
    winsound.Beep(frequency, duration)

In [None]:
#d_losses = np.load(os.path.join(RUN_FOLDER, "images/D_loss.npy"))
#g_losses = np.load(os.path.join(RUN_FOLDER, "images/G_loss.npy"))