In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import librosa
import librosa.display
import IPython.display as ipd
import matplotlib.pyplot as plt
import scipy.io.wavfile as wavf
import math

# Load and Normalize Data

In [None]:
def load_mel_and_names(group):
    """Read the mel-spectrogram table and the sample-name table for one split.

    Parameters
    ----------
    group : str
        Split name spliced into the directory path; expected to be one of
        'train', 'test' or 'valid'.

    Returns
    -------
    tuple of numpy.ndarray
        ``(X_mel, X_names)`` -- the contents of ``X_mel_128.csv`` and
        ``X_names.csv`` with the 'Unnamed: 0' column removed.
    """
    split_dir = "Downloads/guitar/nsynth-" + group

    def _read_table(file_name):
        # 'Unnamed: 0' is presumably the index column written when the CSV
        # was saved; it is dropped so only the data columns remain.
        table = pd.read_csv(split_dir + file_name)
        return table.drop(columns=['Unnamed: 0']).to_numpy()

    mel_values = _read_table("/X_mel_128.csv")
    sample_names = _read_table("/X_names.csv")
    return mel_values, sample_names

def normalize_mel(X_mel, avg, std):
    """Standardize flattened spectrograms and lay them out as (N, 173, 128, 1).

    Parameters
    ----------
    X_mel : numpy.ndarray
        One flattened spectrogram per row; each row must hold 128 * 173 values.
    avg, std : float
        Offset and scale applied as ``(X_mel - avg) / std``.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(N, 173, 128, 1)``.
    """
    n_samples = X_mel.shape[0]
    standardized = (X_mel - avg) / std
    # Rows are unflattened as 128 x 173 (presumably mel bands x time frames)
    # with a trailing channel axis, then the two middle axes are swapped so
    # the 173-long axis comes first.
    as_images = standardized.reshape(n_samples, 128, 173, 1)
    return as_images.swapaxes(1, 2)

In [None]:
# Load the three dataset splits (spectrogram values plus sample names).
X_mel, X_names = load_mel_and_names('train')
X_mel_valid, X_names_valid = load_mel_and_names('valid')
X_mel_test, X_names_test = load_mel_and_names('test')

# Global Norm of Training Set
# The statistics come from the training split only and are reused for the
# validation and test splits so that all three share one scale.
# NOTE(review): `std` is the standard deviation of the per-feature training
# means (the mean spectrogram), not of every individual value -- confirm this
# is intended before changing it, since saved weights depend on this scaling.
train_mean_map = np.sum(X_mel, axis=0) / X_mel.shape[0]
avg = np.average(train_mean_map)
std = np.std(train_mean_map)

X_mel = normalize_mel(X_mel, avg, std)
X_mel_valid = normalize_mel(X_mel_valid, avg, std)
# Fix: the test split must be normalized from X_mel_test; the previous code
# passed the (already normalized) validation array here, so the test data
# was never actually used.
X_mel_test = normalize_mel(X_mel_test, avg, std)

print(X_mel.shape, X_mel_valid.shape, X_mel_test.shape)

## Create Encoder/Decoder Architecture

In [None]:
class Sampling(layers.Layer):
    """Draws the latent vector z from (z_mean, z_log_var).

    The layer returns ``z_mean + exp(0.5 * z_log_var) * epsilon`` where
    ``epsilon`` is standard-normal noise with one value per batch row and
    latent dimension.
    """

    def call(self, inputs):
        mean, log_var = inputs
        # Dynamic shape, so the layer works for any batch size.
        mean_shape = tf.shape(mean)
        noise = tf.keras.backend.random_normal(shape=(mean_shape[0], mean_shape[1]))
        # exp(0.5 * log_var) turns the log-variance into a standard deviation.
        sigma = tf.exp(0.5 * log_var)
        return mean + sigma * noise

In [None]:
def build_encoder(latent_dim, lstm_dim, units=[32,32,64,64], kernel_sizes=[3,3,3,3], strides=[2,2,2,2]):
    """Build the encoder model mapping a (173, 128, 1) input to latent outputs.

    Parameters
    ----------
    latent_dim : int
        Size of the ``z_mean`` / ``z_log_var`` / ``z`` vectors.
    lstm_dim : int
        Width of the per-step dense projection and of the LSTM.
    units, kernel_sizes, strides : sequence of int
        Filters, kernel size and stride of each Conv2D layer; the three
        sequences are zipped, so the shortest one sets the layer count.

    Returns
    -------
    keras.Model
        Model named "encoder" with outputs ``[z_mean, z_log_var, z]``. Its
        summary is printed as a side effect.
    """
    encoder_inputs = keras.Input(shape=(173, 128,1))

    # Strided convolution stack, each layer fed by the previous one.
    x = encoder_inputs
    for n_filters, kernel, step in zip(units, kernel_sizes, strides):
        conv = layers.Conv2D(n_filters, kernel, activation="relu", strides=step, padding="same")
        x = conv(x)

    # Treat the first spatial axis as a sequence: flatten and project every
    # step, then let the LSTM condense the sequence into one vector.
    x = layers.TimeDistributed(layers.Flatten())(x)
    x = layers.TimeDistributed(layers.Dense(lstm_dim, activation="relu"))(x)
    x = layers.LSTM(lstm_dim, activation="tanh", return_sequences=False, dropout=0.1)(x)

    # Two linear heads parameterize the latent distribution; Sampling draws z.
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()([z_mean, z_log_var])

    encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
    encoder.summary()

    return encoder

In [None]:
def _conv_shape(strides, dim_size=[173,128]):
    for i in strides:
        dim_size = [math.ceil(x / i) for x in dim_size]
    return dim_size

In [None]:
def build_decoder(latent_dim, lstm_dim, units=[32,32,64,64], kernel_sizes=[3,3,3,3], strides=[2,2,2,2]):
    """Build the decoder model mapping a latent vector to a (173, 128, 1) output.

    Parameters
    ----------
    latent_dim : int
        Size of the latent input vector.
    lstm_dim : int
        Width of the LSTM; when it differs from ``latent_dim`` a dense
        projection to ``lstm_dim`` is inserted before the LSTM.
    units, kernel_sizes, strides : sequence of int
        Encoder-order layer settings. They are applied here in reverse
        order and are NOT modified.

    Returns
    -------
    keras.Model
        Model named "decoder". Its summary is printed as a side effect.
    """
    target_shape = [173, 128]
    conv_shape = _conv_shape(strides, target_shape)

    # Work on reversed copies. Reversing in place would mutate the caller's
    # lists and the shared default lists, so the layer order would flip on
    # every other call.
    units = list(reversed(units))
    kernel_sizes = list(reversed(kernel_sizes))
    strides = list(reversed(strides))

    latent_inputs = keras.Input(shape=(latent_dim,))
    # Repeat the latent vector once per step of the downsampled first axis.
    x = layers.RepeatVector(conv_shape[0])(latent_inputs)
    if latent_dim != lstm_dim:
        x = layers.TimeDistributed(layers.Dense(lstm_dim, activation="relu"))(x)
    x = layers.LSTM(lstm_dim, activation="tanh", return_sequences=True, dropout=0.1)(x)
    # Expand every step into a (conv_shape[1], units[0]) feature map.
    x = layers.TimeDistributed(layers.Dense(conv_shape[1] * units[0], activation="relu"))(x)
    x = layers.Reshape((conv_shape[0], conv_shape[1], units[0]))(x)
    for unit, kernel_size, stride in zip(units, kernel_sizes, strides):
        x = layers.Conv2DTranspose(unit, kernel_size, activation="relu", strides=stride, padding="same")(x)

    # With padding="same" each transposed convolution multiplies the size by
    # its stride, while _conv_shape rounded up at every step, so the output
    # can overshoot the target. Crop the surplus from the end of each axis
    # (3 and 0 for the default strides, matching the former fixed crop).
    upsampling = 1
    for stride in strides:
        upsampling *= stride
    crop_rows = conv_shape[0] * upsampling - target_shape[0]
    crop_cols = conv_shape[1] * upsampling - target_shape[1]
    x = layers.Cropping2D(cropping=((0, crop_rows), (0, crop_cols)))(x)

    decoder_outputs = layers.Conv2DTranspose(1, 3, activation="linear", padding="same")(x)
    decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
    decoder.summary()

    return decoder

## Create VAE Architecture

In [None]:
class VAE(keras.Model):
    """Variational autoencoder wrapping an encoder and a decoder model.

    Training uses a custom ``train_step`` whose loss is the sum of a
    reconstruction term and a KL-divergence term; running means of the
    total, reconstruction and KL losses are tracked as metrics.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported by train_step.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers exposed to Keras as this model's metrics."""
        trackers = [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]
        return trackers

    def train_step(self, data):
        """Run one optimization step on ``data`` and return the running losses."""
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)

            # Squared error summed over axes 1 and 2 of each sample, then
            # averaged over the batch.
            squared_error = keras.losses.mean_squared_error(data, reconstruction)
            per_sample_error = tf.reduce_sum(squared_error, axis=(1,2))
            reconstruction_loss = tf.reduce_mean(per_sample_error)

            # KL term per latent dimension, summed per sample and averaged
            # over the batch.
            kl_terms = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_terms, axis=1))

            total_loss = reconstruction_loss + kl_loss

        weights = self.trainable_weights
        grads = tape.gradient(total_loss, weights)
        self.optimizer.apply_gradients(zip(grads, weights))

        tracked = (
            (self.total_loss_tracker, total_loss),
            (self.reconstruction_loss_tracker, reconstruction_loss),
            (self.kl_loss_tracker, kl_loss),
        )
        for tracker, value in tracked:
            tracker.update_state(value)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

In [None]:
def build_vae(latent_dim, lstm_dim, units=[32,32,64,64], kernel_sizes=[3,3,3,3], strides=[2,2,2,2]):
    """Build the encoder and decoder and return them wrapped in a compiled VAE.

    Parameters
    ----------
    latent_dim : int
        Size of the latent vector.
    lstm_dim : int
        Width of the LSTM layers in both sub-models.
    units, kernel_sizes, strides : sequence of int
        Convolution settings in encoder order; forwarded to both builders.
        They are not modified by this function.

    Returns
    -------
    VAE
        Model compiled with a default Adam optimizer.
    """
    # Give each builder its own copies, so any in-place reordering inside a
    # builder cannot leak into the other builder, the caller's lists, or the
    # shared default lists (which would change the architecture on the next
    # call).
    encoder = build_encoder(latent_dim, lstm_dim, list(units), list(kernel_sizes), list(strides))
    decoder = build_decoder(latent_dim, lstm_dim, list(units), list(kernel_sizes), list(strides))
    vae = VAE(encoder, decoder)
    vae.compile(optimizer=keras.optimizers.Adam())
    return vae

## Create and train VAE

In [None]:
# Build the encoder/decoder pair and wrap them in a compiled VAE; the latent
# vector and the LSTM layers both use 128 dimensions.
vae = build_vae(latent_dim = 128, lstm_dim = 128)

Load Weights (optional)

In [None]:
# Optional: restore previously saved weights from this checkpoint path.
# The architecture built above must match the one the weights were saved from.
vae.load_weights('Downloads/vae-weights/vae37')

In [None]:
# Train on the normalized training spectrograms only; no targets are passed
# because VAE.train_step computes its loss from the inputs themselves.
vae.fit(X_mel, epochs=20, batch_size=128)

## Compute Validation Loss

In [None]:
def compute_val_loss(vae,X_mel_valid):
    """Reconstruct ``X_mel_valid`` and measure the absolute reconstruction error.

    The sampled latent ``z`` from the encoder is decoded, so results vary
    between calls when the encoder samples randomly.

    Prints the mean and standard deviation of the per-sample error (summed
    over axes 1 and 2).

    Returns
    -------
    tuple
        ``(y, loss)`` -- the reconstructions and the element-wise absolute
        error with axes 1 and 2 swapped relative to the input layout.
    """
    _, _, latent = vae.encoder.predict(X_mel_valid)
    y = vae.decoder.predict(latent)

    abs_error = np.abs(y - X_mel_valid)
    loss = np.swapaxes(abs_error, 1, 2)

    per_sample = loss.sum(axis=(1, 2))
    print(np.average(per_sample), np.std(per_sample))
    return y, loss

In [None]:
# Reconstruct the validation split; `y` holds the reconstructions and `loss`
# the element-wise absolute error used by the ranking and plots below.
y, loss = compute_val_loss(vae,X_mel_valid)

In [None]:
# Total absolute error of every validation sample, and the sample indices
# ordered from lowest to highest error (computed once and reused).
per_sample_loss = np.sum(loss, axis=(1,2,3))
loss_order = per_sample_loss.argsort()

# Up to 100 worst (largest error) and 100 best (smallest error) samples;
# both index arrays are in ascending order of error.
amax = loss_order[-100:]
amin = loss_order[:100]
X_names_valid_min = [X_names_valid[m][0] for m in amin]
X_names_valid_max = [X_names_valid[m][0] for m in amax]
loss_min = per_sample_loss[amin]
loss_max = per_sample_loss[amax]

# Average error map over the whole validation split.
loss_map = np.sum(loss, axis=0)/X_mel_valid.shape[0]
# Fix: average over the number of selected samples. Dividing by the full
# validation size made these maps sums scaled by an unrelated constant
# rather than averages of the worst/best subsets.
loss_map_max = np.sum(loss[amax], axis=0)/len(amax)
loss_map_min = np.sum(loss[amin], axis=0)/len(amin)

## Plot Losses

These maps show where the reconstruction losses are concentrated: across the whole validation set, on the worst predictions, and on the best predictions<br>
<br>
**loss_map** = average losses from every validation input<br>
**loss_map_max** = average losses over the 100 validation inputs with the highest loss (worst predictions)<br>
**loss_map_min** = average losses over the 100 validation inputs with the lowest loss (best predictions)<br>

In [None]:
# Draw each averaged error map on mel/time axes, in the order:
# all samples, worst subset, best subset.
for error_map in (loss_map, loss_map_max, loss_map_min):
    plt.figure(figsize=(25, 10))
    # Drop the trailing channel axis so specshow receives a 2-D (128, 173) image.
    error_image = np.reshape(error_map, (128, 173))
    librosa.display.specshow(error_image, x_axis="time", y_axis="mel", sr=22050)
    plt.colorbar(format="%+2.f")
    plt.show()

## Plot Spectrograms

In [None]:
def plot(ind, y, X_mel_valid):
    """Show the original and the reconstructed spectrogram of sample ``ind``.

    Parameters
    ----------
    ind : int
        Index of the sample to display.
    y : numpy.ndarray
        Reconstructions, indexed like ``X_mel_valid``.
    X_mel_valid : numpy.ndarray
        Input spectrograms; each sample must hold 173 * 128 values.

    The "Real" figure is drawn first, followed by the "Generated" one.
    """
    for label, batch in (("Real", X_mel_valid), ("Generated", y)):
        print(label)
        plt.figure(figsize=(25, 10))
        # Samples are stored as (173, 128); transpose so the 128-long axis
        # is the vertical (mel) axis expected by specshow.
        image = np.reshape(batch[ind], (173, 128)).T
        librosa.display.specshow(image, x_axis="time", y_axis="mel", sr=22050)
        plt.colorbar(format="%+2.f")
        plt.show()

Plots for the worst 20 predictions

In [None]:
# amax is ordered by ascending loss, so its last 20 entries are the 20
# samples with the largest reconstruction error.
for m in amax[-20:]:
    plot(m, y, X_mel_valid)

Plots for the best 20 predictions

In [None]:
# amin is ordered by ascending loss, so the best 20 predictions are its
# FIRST 20 entries. The previous slice amin[-20:] selected ranks 81-100.
for m in amin[:20]:
    plot(m, y, X_mel_valid)

## Create wav file from predictions

In [None]:
def mel_to_audio(y, valid, index):
    """Write 'pred.wav' and 'valid.wav' for one reconstructed/original pair.

    Parameters
    ----------
    y : numpy.ndarray
        Reconstructed spectrograms (normalized); each sample must hold
        173 * 128 values.
    valid : numpy.ndarray
        Original spectrograms (normalized), indexed like ``y``.
    index : int
        Which sample to convert.

    Notes
    -----
    De-normalization relies on the notebook-level ``avg`` and ``std`` that
    were used to normalize the data. The de-normalized values are treated
    as decibels (they are passed to ``librosa.db_to_power``). Both files are
    written to the current working directory at 22050 Hz.
    """
    def _to_mel_power(batch):
        # Undo the (173, 128) layout and the normalization, then convert
        # from decibels to power.
        mel_db = np.swapaxes(batch[index].reshape(173,128), 0,1)*std + avg
        return librosa.db_to_power(mel_db)

    def _invert(mel_power):
        # Mel spectrogram -> waveform; parameters unchanged from before.
        return librosa.feature.inverse.mel_to_audio(mel_power, sr=22050, n_fft=2048, hop_length=512, win_length=None,
                                                    window='hann', center=True, pad_mode='reflect', power=2.0, n_iter=32)

    # Fix: use the `valid` argument. It was previously ignored and
    # overwritten with the global X_mel_valid, so passing any other array
    # had no effect.
    pred_mel = _to_mel_power(y)
    valid_mel = _to_mel_power(valid)

    pred_audio = _invert(pred_mel)
    valid_audio = _invert(valid_mel)

    wavf.write('pred.wav', 22050, pred_audio)
    wavf.write('valid.wav', 22050, valid_audio)

In [None]:
# Change index to pick a specific sample in X_mel_valid.
# Each run overwrites 'pred.wav' and 'valid.wav' in the working directory.
mel_to_audio(y, X_mel_valid, index=1) 

In [None]:
# Play the audio rebuilt from the original validation spectrogram.
ipd.Audio('valid.wav')

In [None]:
# Play the audio rebuilt from the VAE reconstruction of the same sample.
ipd.Audio('pred.wav')