# WGAN with DCGAN layers
Code is mainly based upon the DCGAN implementation in the TensorFlow tutorials

In [1]:
from __future__ import absolute_import, division, print_function

# Import TensorFlow 1.x, graph mode (tf.data.experimental below needs >= 1.12; tf.contrib rules out 2.x)
import tensorflow as tf

import librosa
import os
import time
import numpy as np
import matplotlib.pyplot as plt
import PIL
import imageio
import simpleaudio as sa
import math
from IPython import display

  from ._conv import register_converters as _register_converters


In [2]:
# A single session is shared by every cell below; device-placement logging stays off.
session_config = tf.ConfigProto(log_device_placement=False)
sess = tf.Session(config=session_config)

In [3]:
def playAudio(audio, sr):
    """Play a mono signal through simpleaudio and block until it has finished.

    Args:
        audio: array-like of samples. They are cast to int16 without any
            rescaling, so they are interpreted as 16-bit PCM values.
        sr: sample rate passed on to simpleaudio.
    """
    pcm_range = np.iinfo(np.int16)
    # Clamp before casting: a bare astype(int16) lets out-of-range values wrap
    # around, which turns loud samples into full-scale noise instead of clipping.
    audio = np.clip(audio, pcm_range.min, pcm_range.max).astype(np.int16)
    # 1 channel, 2 bytes per sample (int16).
    play_obj = sa.play_buffer(audio, 1, 2, sr)
    play_obj.wait_done()

# Hyperparameters

In [4]:
# Shuffle-buffer size of the input pipeline; runOneEpoch also uses
# BUFFER_SIZE / BATCH_SIZE as the number of training steps per "epoch".
BUFFER_SIZE = 2048
BATCH_SIZE = 256
EPOCHS = 150
# Length of the latent vector fed to the generator.
noise_dim = 100
num_examples_to_generate = 16
# Weights are clipped to [-clipping_parameter, clipping_parameter] in buildgraph().
clipping_parameter = 0.01
# Seed for the fixed preview noise, so previews stay comparable across kernel restarts.
generation_seed = 0
# Keep the random vector constant for generation (prediction) so it is easier
# to see the GAN improve. In graph mode tf.random_normal is re-sampled on every
# sess.run, so the noise is drawn once with NumPy and frozen into a constant.
random_vector_for_generation = tf.constant(
    np.random.RandomState(generation_seed)
    .normal(size=(num_examples_to_generate, noise_dim))
    .astype(np.float32))

# Make a dataset using TensorFlow's input pipeline

In [5]:
# Reading spectrograms from TFRecord file, making dataset
# Specs shared by all scalar fields of a serialized example.
_int64_scalar = tf.FixedLenFeature([], dtype=tf.int64)
_string_scalar = tf.FixedLenFeature([], dtype=tf.string)

# Parsing schema for one record of the spectrogram TFRecord file.
read_features = dict([
    ('note', _int64_scalar),
    ('note_str', _string_scalar),
    ('instrument', _int64_scalar),
    ('instrument_str', _string_scalar),
    ('pitch', _int64_scalar),
    ('velocity', _int64_scalar),
    ('sample_rate', _int64_scalar),
    # 258300 = 1025 * 126 * 2, the flattened form of the shape _map() restores.
    ('spectrogram', tf.FixedLenFeature([258300], dtype=float)),
    ('instrument_family', _int64_scalar),
    ('instrument_family_str', _string_scalar),
    ('instrument_source', _int64_scalar),
    ('instrument_source_str', _string_scalar),
])

def _map(raw_data):
    """Parse one serialized example and return its spectrogram as [1025, 126, 2]."""
    parsed_example = tf.parse_single_example(serialized=raw_data, features=read_features)
    flat_spectrogram = parsed_example['spectrogram']
    return tf.reshape(flat_spectrogram, [1025, 126, 2])

# Input pipeline: read records, shuffle and repeat forever, decode, batch.
specs = tf.data.TFRecordDataset("spectrograms.tfrecord")
specs = specs.apply(tf.data.experimental.shuffle_and_repeat(buffer_size=BUFFER_SIZE))
# NOTE(review): -1 presumably requests autotuned parallelism
# (tf.data.experimental.AUTOTUNE) - confirm for the installed TF version.
specs = specs.map(map_func=_map, num_parallel_calls=-1)
specs = specs.batch(batch_size=BATCH_SIZE)
# prefetch() counts dataset elements, and after batch() an element is a whole
# batch (BATCH_SIZE x 258300 float32 values, ~264 MB at BATCH_SIZE=256).
# Buffering BUFFER_SIZE of them would need hundreds of GB, so keep just one
# batch ready while the current one is being consumed.
specs = specs.prefetch(buffer_size=1)

# Define models

In [7]:
class Generator(tf.keras.Model):
    """DCGAN-style generator mapping a batch of noise vectors to 2-channel maps.

    A dense projection is reshaped to (41, 7, 128) and upsampled by four
    transposed convolutions. With 'same' padding each stride multiplies the
    spatial size: 41x7 -> 41x7 -> 41x14 -> 205x42 -> 1025x126. The final tanh
    keeps outputs in [-1, 1].
    """

    def __init__(self):
        super(Generator, self).__init__()
        # Settings common to every transposed convolution.
        kernel = (5, 5)
        shared = dict(padding='same', use_bias=False)
        deconv = tf.keras.layers.Conv2DTranspose
        norm = tf.keras.layers.BatchNormalization

        # Layers are instantiated in the order call() applies them.
        self.fc1 = tf.keras.layers.Dense(41*7*128, use_bias=False)
        self.batchnorm1 = norm()

        self.conv1 = deconv(128, kernel, strides=(1, 1), **shared)
        self.batchnorm2 = norm()

        self.conv2 = deconv(64, kernel, strides=(1, 2), **shared)
        self.batchnorm3 = norm()

        self.conv3 = deconv(32, kernel, strides=(5, 3), **shared)
        self.batchnorm4 = norm()

        self.conv4 = deconv(2, kernel, strides=(5, 3), **shared)

    def call(self, x, training=True):
        """Return generated maps for the noise batch `x`.

        `training` is forwarded to every batch-normalisation layer.
        """
        hidden = tf.nn.relu(self.batchnorm1(self.fc1(x), training=training))
        hidden = tf.reshape(hidden, shape=(-1, 41, 7, 128))

        # Three identical upsampling stages: transposed conv -> batchnorm -> ReLU.
        stages = (
            (self.conv1, self.batchnorm2),
            (self.conv2, self.batchnorm3),
            (self.conv3, self.batchnorm4),
        )
        for upsample, normalize in stages:
            hidden = tf.nn.relu(normalize(upsample(hidden), training=training))

        return tf.nn.tanh(self.conv4(hidden))

In [8]:
class Discriminator(tf.keras.Model):
    """Critic network: two strided convolutions followed by a single linear unit.

    The final Dense layer is built without an activation, so the output is an
    unbounded score per example, as a Wasserstein critic requires.
    """

    def __init__(self):
        super(Discriminator, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same')
        self.conv2 = tf.keras.layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same')
        # One stateless dropout layer reused after both convolutions.
        self.dropout = tf.keras.layers.Dropout(0.3)
        self.flatten = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(1)

    def call(self, x, training=True):
        """Return a [batch, 1] critic score for `x`; `training` toggles dropout."""
        x = tf.nn.leaky_relu(self.conv1(x))
        x = self.dropout(x, training=training)
        x = tf.nn.leaky_relu(self.conv2(x))
        x = self.dropout(x, training=training)
        x = self.flatten(x)
        # `activation` is a constructor option of Dense, not a call argument:
        # passing it here is forwarded to Dense.call() and raises TypeError.
        x = self.fc1(x)
        return x

In [9]:
def discriminator_loss(real_output, generated_output):
    """Per-example Wasserstein critic loss, to be minimised.

    The critic should score real samples high and generated samples low, i.e.
    maximise D(real) - D(fake); the loss is therefore D(fake) - D(real). With
    the opposite sign the critic and the generator (whose loss is -D(fake))
    would both push D(fake) upward and training would not be adversarial.

    Args:
        real_output: critic scores for real samples.
        generated_output: critic scores for generated samples.

    Returns:
        generated_output - real_output, element-wise.
    """
    return generated_output - real_output

In [10]:
def generator_loss(generated_output):
    """Generator loss: the negated critic score of the generated samples."""
    sign = -1
    return sign * generated_output

# Defining training
buildgraph() is largely based upon Zardinality's WGAN implementation

In [13]:
# Inference tensors that have already been built, keyed by the identities of
# (model, test_input). The objects themselves are stored alongside the tensor
# so their ids cannot be recycled while the entry is alive.
_prediction_ops = {}

def generate_and_save_images(model, epoch, test_input):
    """Run `model` on `test_input`, plot channel 0 of each sample and save the grid.

    Args:
        model: callable taking (inputs, training=...) and returning a tensor
            that the global session can evaluate to a [n, h, w, c] array.
        epoch: integer used in the output name 'image_at_epoch_{:04d}.png'.
        test_input: input passed to the model.
    """
    cache_key = (id(model), id(test_input))
    if cache_key not in _prediction_ops:
        # Calling the model adds a fresh copy of its forward pass to the graph,
        # so build it once and reuse it; otherwise the graph grows every epoch.
        # training=False because we don't want to train the batchnorm layers
        # when doing inference.
        _prediction_ops[cache_key] = (model, test_input, model(test_input, training=False))
    predictions = sess.run(_prediction_ops[cache_key][-1])

    num_images = predictions.shape[0]
    # Smallest square grid that fits every sample (4x4 for 16 samples).
    grid = max(1, int(math.ceil(math.sqrt(num_images))))

    fig = plt.figure(figsize=(4,4))

    for i in range(num_images):
        plt.subplot(grid, grid, i+1)
        plt.imshow(predictions[i, :, :, 0] * 127.5, cmap="magma", origin="lower", aspect="auto")
        plt.axis('off')

    plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
    plt.show()
    # Release the figure explicitly so repeated calls do not accumulate figures.
    plt.close(fig)

In [14]:
def buildgraph():
    """Build the WGAN training graph.

    Returns:
        A tuple (generator, gen_opt, disc_opt, images, ranEpoch, getEpoch):
        generator: the Generator model, reusable for inference.
        gen_opt: fetch that performs one generator update (needs no feed).
        disc_opt: fetch that performs one critic update followed by weight
            clipping; requires `images` to be fed.
        images: placeholder for a batch of real samples.
        ranEpoch / getEpoch: increment / read an epoch counter kept in this
            function's closure.
    """
    epochs = 0
    with tf.variable_scope('gen'):
        generator = Generator()
    with tf.variable_scope('disc'):
        discriminator = Discriminator()
    noise = tf.random_normal([BATCH_SIZE, noise_dim])

    def ranEpoch():
        # Without `nonlocal` the augmented assignment makes `epochs` a local
        # of ranEpoch and the call fails with UnboundLocalError.
        nonlocal epochs
        epochs += 1

    def getEpoch():
        return epochs

    images = tf.placeholder(tf.float32, shape=[BATCH_SIZE, 1025, 126, 2], name='images')

    # NOTE(review): optimize_loss only runs ops found in tf.GraphKeys.UPDATE_OPS;
    # confirm the BatchNormalization moving averages are really being updated.
    # Generator step. Its weights are deliberately NOT clipped: in WGAN the
    # clipping constraint applies to the critic only, and clamping the
    # generator to +/-clipping_parameter would cripple what it can represent.
    gen_opt = tf.contrib.layers.optimize_loss(
        tf.reduce_mean(generator_loss(discriminator(generator(noise, training=True), training=True))),
        None, learning_rate=1.2e-4, optimizer='Adam', variables=generator.trainable_variables)

    # Critic step followed by weight clipping.
    disc_opt = tf.contrib.layers.optimize_loss(
        tf.reduce_mean(discriminator_loss(
            discriminator(images, training=True), discriminator(generator(noise, training=True), training=True))),
        None, learning_rate=1e-4, optimizer='Adam', variables=discriminator.trainable_variables)
    with tf.control_dependencies([disc_opt]):
        # Take the variables from the model itself rather than from a scope
        # name: the layers create their variables on first call, which happens
        # outside the variable_scope blocks above. read_value() creates the
        # read inside this control-dependency block, so the clip is guaranteed
        # to see the weights produced by the optimiser step.
        disc_opt = tf.tuple([
            tf.assign(var, tf.clip_by_value(var.read_value(), -clipping_parameter, clipping_parameter))
            for var in discriminator.trainable_variables])

    return generator, gen_opt, disc_opt, images, ranEpoch, getEpoch

In [15]:
def train(batch, epochs, noise_dim, gen_opt, disc_opt, images):
    """Run `epochs` training epochs, then render one final preview.

    Args:
        batch: tensor yielding the next batch of real samples when evaluated.
        epochs: number of epochs to run.
        noise_dim: passed through to runOneEpoch.
        gen_opt: fetch performing one generator update.
        disc_opt: fetch performing one critic update.
        images: placeholder that real batches are fed into.
    """
    for epoch in range(epochs):
        start = time.time()
        # Pass the 1-based epoch number so each epoch's preview gets its own
        # file instead of overwriting the same one.
        runOneEpoch(batch, noise_dim, gen_opt, disc_opt, images, epoch=epoch + 1)
        print ('Time taken for epoch {} is {} sec'.format(epoch + 1,
                                                          time.time()-start))
    # generating after the final epoch
    display.clear_output(wait=True)
    generate_and_save_images(generator,
                             -1,
                             random_vector_for_generation)

def runOneEpoch(batch, noise_dim, gen_opt, disc_opt, images, epoch=-1):
    """Run one epoch of ceil(BUFFER_SIZE / BATCH_SIZE) steps and save a preview.

    Every fifth step updates the generator; all other steps update the critic
    on a freshly fetched real batch. Checkpointing is left to the caller.

    Args:
        batch, noise_dim, gen_opt, disc_opt, images: see train(). `noise_dim`
            is accepted for interface compatibility and not used here.
        epoch: number used in the preview file name; defaults to -1, the value
            previously hard-coded.
    """
    start = time.time()
    steps = int(math.ceil(BUFFER_SIZE/BATCH_SIZE))
    for num in range(steps):
        if(num % 5 == 4):
            # The generator update is driven by noise only, so there is no
            # need to pull (and throw away) a batch of real data for it.
            sess.run(gen_opt)
        else:
            sess.run(disc_opt, feed_dict={images: sess.run(batch)})
        # Report against the number of examples actually visited, which also
        # stays correct when BUFFER_SIZE is not a multiple of BATCH_SIZE.
        print("Finished {} out of {}".format((num + 1)*BATCH_SIZE, steps*BATCH_SIZE), end='\r')
    display.clear_output(wait=True)
    generate_and_save_images(generator,
                             epoch,
                             random_vector_for_generation)

    print ('Time taken {} sec'.format(time.time()-start))

In [16]:
def testAudio(batch):
    while(True):
        spec = sess.run(batch)[0]
        mag = spec[:, :, 0]
        angles = spec[:, :, 1]
        mag = ((mag+1)/2)*48-32
        angles = angles*math.pi
        ft =(np.exp(mag)-1.2664166e-14)*np.exp(1j*angles)
        newaudio = librosa.istft(ft, 512, 2048)
        print('Generated audio')
        print('Interval of audio: [{}, {}]'.format(np.amin(newaudio), np.amax(newaudio)))
        playAudio(newaudio, 16000)

# Running the model

In [17]:
# Build the graph once and keep every handle the later cells use.
(generator, gen_opt, disc_opt,
 images, ranEpoch, getEpoch) = buildgraph()

# `batch` yields the next batch of real spectrograms each time it is evaluated.
iterator = specs.make_one_shot_iterator()
batch = iterator.get_next()

# Initialise all variables created while building the graph.
init_op = tf.global_variables_initializer()
sess.run(init_op)

In [18]:
saver = tf.train.Saver()
# Write an initial checkpoint only when none exists yet. Testing for the
# checkpoint itself (not just the directory) means an empty or partially
# populated "checkpoints" folder no longer breaks the later saver.restore().
if not tf.train.checkpoint_exists("checkpoints/model"):
    if not os.path.isdir("checkpoints"):
        os.makedirs("checkpoints")
    saver.save(sess, "checkpoints/model")

In [None]:
# Can exit any time because model saves after every epoch.
# Restoring once is enough: it resumes from the last completed epoch (dropping
# any partially trained one), and after each save the session already holds
# exactly what is on disk.
saver.restore(sess, "checkpoints/model")
while True:
    runOneEpoch(batch, noise_dim, gen_opt, disc_opt, images)
    saver.save(sess, "checkpoints/model")

INFO:tensorflow:Restoring parameters from checkpoints/model


In [None]:
# Render a preview grid from the current generator weights (stray backticks removed).
generate_and_save_images(generator, -1, random_vector_for_generation)