In [None]:
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from IPython.display import Audio
import librosa
import json
from IPython.core.display import display


In [2]:
tf.__version__

'2.13.0'

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
#@title Hyperparameters
#@title Hyperparameters
learning_rate = 0.0005 #param {type:"raw"}
num_epochs_to_train =  120#param {type:"integer"}
batch_size =  64#param {type:"integer"}
vector_dimension = 64 #param {type:"integer"}

hop=256               #hop size (window size = 4*hop)
frame_size=512
fs=16000              #sampling rate
min_level_db=-100     #reference values to normalize data
ref_level_db=20

LEARNING_RATE = learning_rate
BATCH_SIZE = batch_size
EPOCHS = num_epochs_to_train
VECTOR_DIM=vector_dimension

shape=128           #length of time axis of split specrograms


In [None]:
#@title Prepare Dataset
def prepareDataset(data):
  numberOfClips = 1000 #param #number of audio files to use in training
  smallerData = data.take(numberOfClips)
  # get audio arrays
  specArrays = []
  for i in smallerData:
    audio = i['audio'].numpy()
    stft = librosa.stft(audio, n_fft=frame_size, hop_length=hop)[:-1]
    spectrogram = np.abs(stft)
    log_spectrogram = librosa.amplitude_to_db(spectrogram)
    norm_array = (log_spectrogram - log_spectrogram.min()) / (log_spectrogram.max() - log_spectrogram.min())
     # create normalized spectrogram arrays
    specArrays.append(norm_array)
  specArrays = np.array(specArrays,dtype=np.float32)
  return specArrays

x_train = prepareDataset(data)
x_train = x_train[..., np.newaxis]

In [5]:
#@title Load NSynth Dataset

data = tfds.load("nsynth", split='train[50%:]', shuffle_files=True, data_dir="data")



In [16]:
#@title Variational Autoencoder Class
from keras import Model
from keras.layers import Input, Conv2D, ReLU, BatchNormalization, Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from keras.optimizers.legacy import Adam
from keras.losses import MeanSquaredError
from keras import backend as K
import os
import pickle

class VAE:
  def __init__(self,
             input_shape,
             conv_filters,
             conv_kernels,
             conv_strides,
             latent_space_dim):
    self.input_shape = input_shape
    self.conv_filters = conv_filters
    self.conv_kernels = conv_kernels
    self.conv_strides = conv_strides
    self.latent_space_dim = latent_space_dim
    self.reconstruction_loss_weight = 1000000

    self.encoder = None
    self.decoder = None
    self.model = None

    self.num_conv_layers = len(conv_filters)-1
    self.shape_before_bottleneck = None
    self.model_input = None

    self.build()

  def summary(self):
    self.encoder.summary()
    self.decoder.summary()
    self.model.summary()

  def compile(self, learning_rate = .0001):
    optimizer = Adam(learning_rate = learning_rate)
    self.model.compile(optimizer=optimizer,
                              loss=self.calculate_combined_loss,
                           metrics=[self.calculate_reconstruction_loss,
                                    self.calculate_kl_loss])

  def train(self, x_train, batch_size, num_epochs):
      self.model.fit(x_train,
                     x_train,
                     batch_size = batch_size,
                     epochs = num_epochs,
                     shuffle = True)

  def save(self, save_folder="."):
      self.create_folder_if_it_doesnt_exist(save_folder)
      self.save_parameters(save_folder)
      self.save_weights(save_folder)

  def load_weights(self, weights_path):
      self.model.load_weights(weights_path)

  def reconstruct(self, images):
      latent_representations = self.encoder.predict(images)
      reconstructed_images = self.decoder.predict(latent_representations)
      return reconstructed_images, latent_representations

  @classmethod
  def load(cls, save_folder="."):
      parameters_path = os.path.join(save_folder, "parameters.pkl")
      with open(parameters_path, "rb") as f:
          parameters = pickle.load(f)
      autoencoder = VAE(*parameters)
      weights_path = os.path.join(save_folder, "weights.h5")
      autoencoder.load_weights(weights_path)
      return autoencoder

  def calculate_combined_loss(self, y_target, y_predicted):
      reconstruction_loss = self.calculate_reconstruction_loss(y_target, y_predicted)
      kl_loss = self.calculate_kl_loss(y_target, y_predicted)
      combined_loss = self.reconstruction_loss_weight * reconstruction_loss\
                                                       + kl_loss
      return combined_loss

  def calculate_reconstruction_loss(self, y_target, y_predicted):
      error = y_target - y_predicted
      reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
      return reconstruction_loss

  def calculate_kl_loss(self, y_target, y_predicted):
      kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) -
                             K.exp(self.log_variance), axis=1)
      return kl_loss

  def create_folder_if_it_doesnt_exist(self, folder):
    if not os.path.exists(folder):
        os.makedirs(folder)

  def save_parameters(self, save_folder):
    parameters = [
          self.input_shape,
          self.conv_filters,
          self.conv_kernels,
          self.conv_strides,
          self.latent_space_dim
      ]
    save_path = os.path.join(save_folder, "parameters.pkl")
    with open(save_path, "wb") as f:
        pickle.dump(parameters, f)

  def save_weights(self, save_folder):
    save_path = os.path.join(save_folder, "weights.h5")
    self.model.save_weights(save_path)

  def build(self):
    self.build_encoder()
    self.build_decoder()
    self.build_autoencoder()

  def build_autoencoder(self):
    model_input = self.model_input
    model_output = self.decoder(self.encoder(model_input))
    self.model = Model(model_input, model_output, name="autoencoder")
  # decoder stuff=======================================================
  def build_decoder(self):
    input = self.add_decoder_input()
    dense = self.add_dense_layer(input)
    reshape = self.add_reshape_layer(dense)
    transposed = self.add_conv_transpose_layers(reshape)
    decoder_output = self.add_decoder_output(transposed)
    self.decoder = Model(input, decoder_output, name="decoder")

  def add_decoder_input(self):
    return Input(shape=self.latent_space_dim, name='decoder_input')

  def add_dense_layer(self, decoder_input):
    num_neurons = np.prod(self.shape_before_bottleneck)
    dense_layer = Dense(num_neurons, name="decoder_dense")(decoder_input)
    return dense_layer

  def add_reshape_layer(self, dense_layer):
    return Reshape(self.shape_before_bottleneck)(dense_layer)

  def add_conv_transpose_layers(self, x):
    for layer_idx in reversed(range(1, self.num_conv_layers)):
      x = self.add_conv_transpose_layer(layer_idx, x)
    return x

  def add_conv_transpose_layer(self, layer_index, x):
    layer_num = self.num_conv_layers - layer_index
    conv_transpose_layer = Conv2DTranspose(filters=self.conv_filters[layer_index],
                                           kernel_size=self.conv_kernels[layer_index],
                                           strides=self.conv_strides[layer_index],
                                           padding="same",
                                           name=f"decoder_conv_transpose_layer_{layer_num}"
                                             )
    x = conv_transpose_layer(x)
    x = ReLU(name=f"decoder_relu_{layer_num}")(x)
    x = BatchNormalization(name=f"decoder_bn_{layer_num}")(x)
    return x

  def add_decoder_output(self, x):
    conv_transpose_layer = Conv2DTranspose(filters=1,
                                           kernel_size=self.conv_kernels[0],
                                           strides=self.conv_strides[0],
                                           padding="same",
                                           name=f"decoder_conv_transpose_layer_{self.num_conv_layers}"
                                          )
    x = conv_transpose_layer(x)
    output_layer = Activation("sigmoid", name="sigmoid_layer")(x)
    return output_layer

  # encoder stuff=======================================================

  def build_encoder(self):
    encoder_input = self.add_encoder_input()
    convLayers = self.add_conv_layers(encoder_input)
    bottleneck = self.add_bottleneck(convLayers)
    self.model_input = encoder_input
    self.encoder  = Model(encoder_input, bottleneck, name='encoder')

  def add_encoder_input(self):
    return Input(shape=self.input_shape, name='encoder_input')

  def add_conv_layers(self, input):
    x = input
    for layer_idx in range(self.num_conv_layers):
      x = self.add_conv_layer(layer_idx, x)
    return x

  def add_conv_layer(self, idx, x):
    layerNum = idx + 1
    conv_layer = Conv2D(filters=self.conv_filters[idx],
                        kernel_size=self.conv_kernels[idx],
                        strides=self.conv_strides[idx],
                        padding='same',
                        name=f"encoder_transpose_layer_{layerNum}"
                        )
    x = conv_layer(x)
    x = ReLU(name=f"encoder_relu_{layerNum}")(x)
    x = BatchNormalization(name=f"encoder_bn_{layerNum}")(x)
    return x

  def add_bottleneck(self, x):
    self.shape_before_bottleneck = K.int_shape(x)[1:]
    x = Flatten()(x)
    self.mu = Dense(self.latent_space_dim, name="mu")(x)
    self.log_variance = Dense(self.latent_space_dim,
                                name="log_variance")(x)
    def sample_point_from_normal_distribution(args):
        mu, log_variance = args
        epsilon = K.random_normal(shape=K.shape(self.mu), mean=0.,
                                    stddev=1.)
        sampled_point = mu + K.exp(log_variance / 2) * epsilon
        return sampled_point
    x = Lambda(sample_point_from_normal_distribution,
                 name="encoder_output")([self.mu, self.log_variance])
    return x


In [10]:
x_train.shape

(1000, 256, 251, 1)

In [22]:
# @title Train the Model on the dataset
def train(train_data, learning_rate, batch_size, epochs):
  tf.compat.v1.disable_eager_execution()
  autoEncoder = VAE(input_shape=(256, 251, 1),
                            conv_filters=(512, 256, 128, 64, 32),
                            conv_kernels=(3, 3, 3, 3),
                            conv_strides=(2, 2, 2, 2, (2, 1)),
                            latent_space_dim=VECTOR_DIM
                            )
  autoEncoder.summary()
  autoEncoder.compile(learning_rate)
  autoEncoder.train(train_data, batch_size, epochs)
  return autoEncoder

autoencoder = train(train_data=x_train,
                    learning_rate=LEARNING_RATE,
                    batch_size=BATCH_SIZE,
                    epochs=EPOCHS)

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_input (InputLayer)  [(None, 256, 251, 1)]        0         []                            
                                                                                                  
 encoder_transpose_layer_1   (None, 128, 126, 512)        5120      ['encoder_input[0][0]']       
 (Conv2D)                                                                                         
                                                                                                  
 encoder_relu_1 (ReLU)       (None, 128, 126, 512)        0         ['encoder_transpose_layer_1[0]
                                                                    [0]']                         
                                                                                            

AttributeError: ignored