In [None]:
import numpy as np
import pandas as pd
import scipy
import librosa
import matplotlib.pyplot as plt
import IPython.display as ipd
from sklearn import metrics
import tensorflow as tf

import os
import soundfile as sf
import time
import random

import data
import model_utils

%matplotlib inline
%config IPCompleter.greedy=True

# Training Parameters

In [None]:
# ---- Audio / dataset settings ----

# Sample rate (Hz) used when creating and loading audio
sr = 22050

# Genre whose tracks are used to build the dataset
genre = "Metal"

# Number of samples in each frame fed to the model during training
frame = 64

# Chunk == sample
chunk = 1

# Length of the data, in seconds
duration = 30

# Name under which the trained model is saved
model_name_save = 'csc475_gan_baseline'

# True: reload the cached audio from dataset/experiments; False: rebuild it
use_saved_data = False

# ---- Input ordering ----

# True keeps frames in sequential order, False presents them in random order
sequential_input = False

# Shuffling is simply the opposite of sequential ordering
shuffle_state = not sequential_input

# ---- Training settings ----

# Fraction of the data held out for testing
test_ratio = 0.2

# Whether to apply mu-law companding when building the data
mu_law = False

# Batch size for training
batch_size_para = 64

# Number of training epochs
epochs_ = 25

# Which test file segment to inspect visually.
# The original note gives the valid range as 0 to 5 for the default 2 minute duration.
# NOTE(review): `duration` above is 30 seconds, so confirm this range still applies.
index = 4

In [None]:
# Hide every GPU device from TensorFlow by passing an empty visible-device list.
# Intended only for Macs with an M1 chip running tensorflow-metal (RNNs run slowly on GPU there);
# do not run this cell on other machines.
# NOTE(review): the call below is unconditional, so it disables GPUs on whatever machine executes it.
tf.config.experimental.set_visible_devices([], 'GPU')

# Loading Training Data

In [None]:
# Metadata tables used to build the dataset
file_data_path = os.path.join("dataset", "fileData.csv")
effect_data_path = os.path.join("dataset", "effectData.csv")

# Cached copies of the dry (clean) and wet (effected) audio
clean_audio_path = os.path.join("dataset", "experiments", "clean_data.wav")
effect_audio_path = os.path.join("dataset", "experiments", "effect_data.wav")

if not use_saved_data:
    # Build a fresh dry/wet pair and cache it so later runs can set use_saved_data = True
    signal, wet = data.create_data(genre, effect_data_path, file_data_path, mu_comp=mu_law, srate=sr,
                                   duration=duration, type="random")
    # wavfile.write does not create missing folders, so make sure the cache directory exists
    os.makedirs(os.path.dirname(clean_audio_path), exist_ok=True)
    scipy.io.wavfile.write(clean_audio_path, rate=sr, data=signal)
    scipy.io.wavfile.write(effect_audio_path, rate=sr, data=wet)
else:
    # Reload the cached pair at the configured sample rate
    signal, _ = librosa.load(clean_audio_path, sr=sr)
    wet, _ = librosa.load(effect_audio_path, sr=sr)


# Number of frames in the training portion of the dataset
training_dataset_ = int((len(signal) / frame) * (1 - test_ratio))

# Number of frames for testing (not the proper testset, details below)
testing_dataset_ = int((len(signal) / frame) * test_ratio)

# Whether to filter the audio
filtered = False

# High-pass FIR filter (Hamming window); cutoff is relative to the Nyquist frequency
numtaps = 91
cutoff = 0.015
b = scipy.signal.firwin(numtaps, cutoff, width=None, window='hamming', pass_zero='highpass')

# Low-pass FIR filter (Hamming window)
numtaps = 41
cutoff = 0.92

b2 = scipy.signal.firwin(numtaps, cutoff, width=None, window='hamming', pass_zero='lowpass')

# Optionally high pass audio to emphasize high frequency information, low pass to avoid aliasing artifacts
if filtered:
    # High Pass Filter
    signal = scipy.signal.lfilter(b, 1, signal)
    wet = scipy.signal.lfilter(b, 1, wet)
    # Low Pass Filter
    signal = scipy.signal.lfilter(b2, 1, signal)
    wet = scipy.signal.lfilter(b2, 1, wet)

In [None]:
# Baseline: how far is the unprocessed (dry) audio from the effected (wet) audio?
# scikit-learn metrics take (y_true, y_pred). The wet signal is the ground truth here, exactly as
# the evaluation cell further down passes the wet targets first, so R2 is directly comparable.
# (MAE and MSE are symmetric; the R2 score is not.)
print('Comparing the original dry audio to the wet audio as a reference' )
print('Mean absolute error: %.4f'% metrics.mean_absolute_error(wet, signal))
print('Mean squared error: %.4f'% metrics.mean_squared_error(wet, signal))
print('Coefficient of determination (R2 score): %.4f'% metrics.r2_score(wet, signal))

In [None]:
# Sample counts for the two contiguous partitions (training first, testing right after it)
train_length = int(len(signal) * (1 - test_ratio))
test_length = int(len(signal) * test_ratio)
test_stop = train_length + test_length

# Dry audio is the model input, wet audio is the target; both are cut at the same positions
features_train, features_test = signal[:train_length], signal[train_length:test_stop]
targets_train, targets_test = wet[:train_length], wet[train_length:test_stop]

# Report the training durations in seconds
length_in_seconds = features_train.size / sr
length_for_wet = targets_train.size / sr
print('Length of training dry audio is {} seconds'.format(length_in_seconds))
print('Length of training wet audio is {} seconds'.format(length_for_wet))

# Constructing the baseline models

In [None]:
# Define the GAN architecture
class Generator(tf.keras.models.Model):
    """Fully connected generator network.

    Two ReLU hidden layers (256 then 512 units) feed a tanh output layer with
    ``frame * chunk`` units, so every output value lies in [-1, 1].
    """

    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=256, activation='relu')
        self.dense2 = tf.keras.layers.Dense(units=512, activation='relu')
        # Output width is taken from the notebook-level `frame` and `chunk` settings
        self.dense3 = tf.keras.layers.Dense(units=frame * chunk, activation='tanh')

    def call(self, inputs):
        """Pass `inputs` through the three dense layers in order and return the result."""
        hidden = self.dense2(self.dense1(inputs))
        return self.dense3(hidden)

class Discriminator(tf.keras.models.Model):
    """Fully connected discriminator network.

    Two ReLU hidden layers (512 then 256 units) feed a single sigmoid unit, so
    the output is one value in (0, 1) per input row.
    """

    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=512, activation='relu')
        self.dense2 = tf.keras.layers.Dense(units=256, activation='relu')
        self.dense3 = tf.keras.layers.Dense(units=1, activation='sigmoid')

    def call(self, inputs):
        """Pass `inputs` through the three dense layers in order and return the result."""
        hidden = self.dense2(self.dense1(inputs))
        return self.dense3(hidden)

# Initialize generator, discriminator, and GAN
generator = Generator()
discriminator = Discriminator()

# Container that groups both networks under the model name used for saving.
# NOTE(review): train_step feeds the discriminator a concatenation of audio and conditioning
# frames rather than the raw generator output, so this Sequential is never called end to end.
# Confirm that summary()/save() on it behave as intended.
model = tf.keras.Sequential([generator, discriminator], name=model_name_save)

# Define loss functions
# The discriminator's last layer already applies a sigmoid, so its outputs are probabilities,
# not logits; from_logits=True would squash them through a second sigmoid inside the loss.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=False)

# Define optimizers (one per network so each keeps its own Adam state)
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

model.summary()

# Training the Model

In [None]:
# Length of the random vector that is concatenated with each conditioning frame
noise_dim = 100

def generate_noise(batch_size, noise_dim):
    """Return a `[batch_size, noise_dim]` tensor of standard-normal samples."""
    return tf.random.normal(shape=[batch_size, noise_dim], mean=0.0, stddev=1.0)

@tf.function
def train_step(audio, effects):
    """Run one adversarial update of the generator and the discriminator.

    The generator is conditioned on the dry `audio` frame (plus noise) and must
    produce the matching wet frame, which is the same direction `apply_effect`
    uses at inference time. The discriminator scores (wet, dry) pairs: the real
    pair uses `effects`, the fake pair uses the generator output.

    Args:
        audio: batch of dry input frames, one frame per row.
        effects: batch of wet target frames aligned with `audio`.

    Returns:
        Tuple `(gen_loss, disc_loss)` with the scalar losses of this step.
    """
    # Noise is float32; cast the inputs so tf.concat never sees mixed dtypes
    audio = tf.cast(audio, tf.float32)
    effects = tf.cast(effects, tf.float32)
    noise = generate_noise(tf.shape(audio)[0], noise_dim)

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # Condition on the dry frame, exactly as apply_effect does
        generated_effects = generator(tf.concat([noise, audio], axis=1), training=True)

        real_output = discriminator(tf.concat([effects, audio], axis=1), training=True)
        fake_output = discriminator(tf.concat([generated_effects, audio], axis=1), training=True)

        # Generator wants its fakes labelled real; discriminator wants real=1 and fake=0
        gen_loss = cross_entropy(tf.ones_like(fake_output), fake_output)
        disc_loss_real = cross_entropy(tf.ones_like(real_output), real_output)
        disc_loss_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
        disc_loss = disc_loss_real + disc_loss_fake

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    return gen_loss, disc_loss

# Define function to apply effect to audio
def apply_effect(audio, batch_size):
    """Run the generator over `audio` one frame at a time and stitch the outputs together.

    Args:
        audio: 1-D array of input samples.
        batch_size: number of samples per frame handed to the generator.
            A trailing partial frame is skipped.

    Returns:
        1-D float64 array with the concatenated generator outputs (empty when
        `audio` is shorter than one frame).
    """
    # A single noise vector is drawn once and reused for every frame
    noise = generate_noise(1, noise_dim)
    total_frames = len(audio) // batch_size
    generated_frames = []

    for counter, start in enumerate(range(0, total_frames * batch_size, batch_size), start=1):
        # Cast to float32 so the frame can be concatenated with the float32 noise
        batch_audio = tf.cast([audio[start:start + batch_size]], tf.float32)
        generated_batch = generator(tf.concat([noise, batch_audio], axis=1), training=False)

        # Collect frames and join them once at the end instead of re-copying a growing array
        generated_frames.append(np.asarray(generated_batch[0]))
        print('Batch {} / {}'.format(counter, total_frames), end='\r')

    if not generated_frames:
        return np.array([])
    return np.concatenate(generated_frames).astype(np.float64)

# train GAN
def train_gan(audio, effects, epochs, batch_size):
    """Train the GAN by stepping through `audio` and `effects` frame by frame.

    Both arrays are walked in strides of `batch_size` samples. Every position
    where both slices hold exactly `batch_size` samples is wrapped into a
    one-row batch and handed to `train_step`; shorter trailing slices are
    skipped. After each epoch the elapsed time and the most recent losses are
    printed.

    Args:
        audio: 1-D array passed as the first argument of `train_step`.
        effects: 1-D array passed as the second argument of `train_step`.
        epochs: number of passes over the data.
        batch_size: number of samples per frame.
    """
    gen_loss, disc_loss = 0, 0
    total_batches = int(len(audio) / batch_size)

    for epoch_number in range(1, epochs + 1):
        print('Epoch {}'.format(epoch_number))
        epoch_start = time.time()
        batches_done = 0

        for offset in range(0, len(audio), batch_size):
            audio_frame = audio[offset:offset + batch_size]
            effects_frame = effects[offset:offset + batch_size]

            # Only complete frames are used for training
            if audio_frame.size != batch_size or effects_frame.size != batch_size:
                continue

            gen_loss, disc_loss = train_step([audio_frame], [effects_frame])
            batches_done += 1
            print('Batch %d / %d' % (batches_done, total_batches), end='\r')

        print ('Time for epoch %d is %.4f sec, generator loss %.4f, discriminator loss %.4f' % (
            epoch_number, time.time() - epoch_start, gen_loss, disc_loss
        ))

In [None]:
# Train on the training split: dry frames as input, wet frames as targets,
# using the epoch count and frame size from the parameter cell.
train_gan(features_train, targets_train, epochs=epochs_, batch_size=batch_size_para)

In [None]:
# Generate predictions for the test split first, then for the training split
tar_pred = apply_effect(features_test, batch_size_para)
train_tar_pred = apply_effect(features_train, batch_size_para)

# The generator output can be shorter than the targets, so trim the targets to match
test_reference = targets_test[:len(tar_pred)]
train_reference = targets_train[:len(train_tar_pred)]

# Mean squared error (lower the better)
print('Mean squared error: {}'.format(metrics.mean_squared_error(test_reference, tar_pred)))

# Mean absolute error (lower the better)
print('Mean absolute error: %.4f'% metrics.mean_absolute_error(test_reference, tar_pred))

# Median absolute error (lower the better)
print('Median absolute error: %.4f'% metrics.median_absolute_error(test_reference, tar_pred))

# Coefficient of determination (r2 score): 1 is perfect prediction (it can get arbitrarily negative)
print('Coefficient of determination (R2 score): %.4f'% metrics.r2_score(test_reference, tar_pred))

# Explained variance score: 1 is perfect prediction (it can get arbitrarily worse)
print('Explained variance score: %.4f'% metrics.explained_variance_score(test_reference, tar_pred))

# Plot targets against predictions for both splits
model_utils.plot_result(train_reference, test_reference, train_tar_pred, tar_pred)

In [None]:
# Saving the model (ignore warnings)
# Create the target folder first so the save does not fail on a fresh checkout
model_dir = os.path.join('dataset', 'models')
os.makedirs(model_dir, exist_ok=True)
model.save(os.path.join(model_dir, model_name_save + '.keras'))

# Evaluating the Model

## Work in Progress
This section is not yet complete.