In [None]:
import numpy as np
import soundfile as sf
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# Run TensorFlow in graph (non-eager) mode. The VAE class defined later in
# this notebook builds its KL loss from tensors stored on the instance
# (self.mu / self.log_variance) rather than from the loss arguments.
tf.compat.v1.disable_eager_execution()

In [None]:
# Mount Google Drive at /content/drive (Colab only) so the dataset archive
# stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip drive/MyDrive/'PGP NLP'/fsdd.zip -d ./data/

Archive:  drive/MyDrive/PGP NLP/fsdd.zip
   creating: ./data/eval/
  inflating: ./data/eval/File1.wav   
  inflating: ./data/eval/File2.wav   
  inflating: ./data/eval/File3.wav   
  inflating: ./data/eval/File4.wav   
  inflating: ./data/eval/File5.wav   
   creating: ./data/train/
  inflating: ./data/train/0_george_0.wav  
  inflating: ./data/train/0_george_1.wav  
  inflating: ./data/train/0_george_10.wav  
  inflating: ./data/train/0_george_11.wav  
  inflating: ./data/train/0_george_12.wav  
  inflating: ./data/train/0_george_13.wav  
  inflating: ./data/train/0_george_14.wav  
  inflating: ./data/train/0_george_15.wav  
  inflating: ./data/train/0_george_16.wav  
  inflating: ./data/train/0_george_17.wav  
  inflating: ./data/train/0_george_18.wav  
  inflating: ./data/train/0_george_19.wav  
  inflating: ./data/train/0_george_2.wav  
  inflating: ./data/train/0_george_20.wav  
  inflating: ./data/train/0_george_21.wav  
  inflating: ./data/train/0_george_22.wav  
  inflating: ./

## Data Preparation

In [None]:
import librosa
import os

In [None]:
# Root folder of the unzipped dataset (contains train/ and eval/).
FILE_PATH = './data/'

# STFT settings used by process_data(): FFT window size and hop between frames.
FRAME_SIZE = 512
HOP_LENGTH = 256

# Every clip is resampled to SAMPLE_RATE Hz and limited to DURATION seconds.
SAMPLE_RATE = 22050
DURATION = 0.74

# Fixed number of audio samples per clip after padding / truncation.
num_samples = int(DURATION * SAMPLE_RATE)

In [None]:
def normalize(arr):
    """Min-max scale ``arr`` so its values lie in [0, 1].

    Args:
        arr: NumPy array with at least one element.

    Returns:
        Tuple ``(scaled, min_val, max_val)`` where ``scaled`` has the shape of
        ``arr`` and ``min_val`` / ``max_val`` are the original extrema, which
        ``denormalize`` needs to undo the scaling.

    A constant array (``max_val == min_val``) is mapped to all zeros instead
    of dividing by zero and producing NaNs; ``denormalize`` still restores the
    original constant from such a result.
    """
    min_val = arr.min()
    max_val = arr.max()

    value_range = max_val - min_val
    if value_range == 0:
        # Every element equals min_val, so the numerator is all zeros; any
        # non-zero divisor gives the intended all-zero result.
        value_range = 1

    return (arr - min_val) / value_range, min_val, max_val

def denormalize(arr, min_val, max_val):
    """Undo min-max scaling: map values in [0, 1] back to [min_val, max_val].

    Args:
        arr: Array (or scalar) of scaled values.
        min_val: Minimum of the data before it was scaled.
        max_val: Maximum of the data before it was scaled.

    Returns:
        ``arr`` rescaled to the original value range.
    """
    value_range = max_val - min_val
    return arr * value_range + min_val

In [None]:
# Sanity check: scale a small integer matrix and show the scaled values
# together with the minimum and maximum that normalize() reports.
mat = np.array([[0, 1, 2], [4, 5, 6], [7, 8, 9]])
print(mat)
a, b, c = normalize(mat)
print(a, b, c, sep="\n")

[[0 1 2]
 [4 5 6]
 [7 8 9]]
[[0.         0.11111111 0.22222222]
 [0.44444444 0.55555556 0.66666667]
 [0.77777778 0.88888889 1.        ]]
0
9


In [None]:
# Round trip: denormalizing the scaled matrix should give back `mat`.
denormalize(a,b,c)

array([[0., 1., 2.],
       [4., 5., 6.],
       [7., 8., 9.]])

In [None]:
# Listen to one of the original training recordings.
import IPython.display as ipd
ipd.Audio('./data/train/0_george_0.wav')

In [None]:
def process_data(fp):
    """Load every audio file in a folder as a normalized log-spectrogram.

    Each file is loaded as mono audio at ``SAMPLE_RATE``, limited to
    ``DURATION`` seconds and zero-padded on the right (or truncated) to
    exactly ``num_samples`` samples. Its STFT magnitude is converted to
    decibels and min-max scaled with ``normalize``.

    Args:
        fp: Path of the folder that holds the audio files.

    Returns:
        Dict of four parallel lists, one entry per processed file:
        ``'file_name'`` (name without the folder), ``'spec'`` (scaled
        log-spectrogram), ``'spec_min'`` and ``'spec_max'`` (the dB extrema
        needed by ``denormalize``).
    """
    spectrograms = {'file_name': [], 'spec': [], 'spec_min': [], 'spec_max': []}

    # os.listdir() returns entries in arbitrary order; sort them so the
    # result does not depend on the file system.
    for file_name in sorted(os.listdir(fp)):
        # os.path.join works whether or not `fp` ends with a separator.
        file_path = os.path.join(fp, file_name)
        if not os.path.isfile(file_path):
            # Sub-directories cannot be decoded as audio.
            continue

        signal, _ = librosa.load(file_path,
                                 sr=SAMPLE_RATE,
                                 duration=DURATION,
                                 mono=True)
        if len(signal) <= num_samples:
            signal = np.pad(signal, (0, num_samples - len(signal)))
        else:
            signal = signal[:num_samples]

        # [:-1] drops the last row of the STFT (the highest frequency bin);
        # with FRAME_SIZE = 512 the notebook's printed shape shows 256 rows.
        stft = librosa.stft(signal,
                            n_fft=FRAME_SIZE,
                            hop_length=HOP_LENGTH)[:-1]
        log_spec = librosa.amplitude_to_db(np.abs(stft))
        log_spec_n, spec_min, spec_max = normalize(log_spec)

        spectrograms['file_name'].append(file_name)
        spectrograms['spec'].append(log_spec_n)
        spectrograms['spec_min'].append(spec_min)
        spectrograms['spec_max'].append(spec_max)
    return spectrograms


In [None]:
# Build the normalized log-spectrograms for both splits.
# NOTE(review): the name `eval` shadows Python's built-in eval() for the rest
# of the notebook. Later cells read this variable, so renaming it means
# updating those cells as well.
train = process_data(FILE_PATH+"train/")
eval = process_data(FILE_PATH+"eval/")

In [None]:
# Number of evaluation spectrograms that were loaded.
len(eval['spec'])

5

In [None]:
# Stack the per-file spectrogram lists into single arrays (first axis = file).
X_train = np.array(train['spec'])
X_test = np.array(eval['spec'])

In [None]:
# Confirm the shapes of the stacked arrays.
print(f"{X_train.shape}  and {X_test.shape}")

(3000, 256, 64)  and (5, 256, 64)


In [None]:
# Append a trailing channel axis of size 1, as expected by the Conv2D input.
# Re-running the cell is harmless: only the first three axes are read.
X_train = X_train.reshape(X_train.shape[:3] + (1,))
X_test = X_test.reshape(X_test.shape[:3] + (1,))

In [None]:
class VAE:
    """Convolutional variational autoencoder built from Keras layers.

    The encoder is a stack of Conv2D + ReLU + BatchNormalization blocks
    followed by a Gaussian bottleneck: two Dense heads produce ``mu`` and
    ``log_variance`` and a Lambda layer samples the latent point from them.
    The decoder is a Dense + Reshape followed by Conv2DTranspose blocks and a
    final sigmoid. The training loss is
    ``reconstruction_loss_weight * MSE + KL divergence``.

    The KL term reads the symbolic ``mu`` / ``log_variance`` tensors stored on
    the instance instead of values passed to the loss function. The notebook
    disables eager execution before the model is built, which this approach
    relies on.

    Args:
        input_shape: Shape of one input sample, e.g. ``(256, 64, 1)``.
        conv_filters: Number of filters for each encoder conv block.
        conv_kernels: Kernel size for each encoder conv block.
        conv_strides: Stride for each encoder conv block.
        latent_space_dim: Size of the latent vector.
        reconstruction_loss_weight: Factor applied to the reconstruction
            (MSE) term before the KL term is added.

    Raises:
        ValueError: If ``conv_filters`` is empty, or ``conv_kernels`` /
            ``conv_strides`` have fewer entries than ``conv_filters``.
    """

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim,
                 reconstruction_loss_weight=1000):
        # Fail early with a clear message instead of an IndexError raised
        # from deep inside the layer-building code.
        num_conv_layers = len(conv_filters)
        if num_conv_layers == 0:
            raise ValueError("conv_filters must contain at least one entry")
        if len(conv_kernels) < num_conv_layers \
                or len(conv_strides) < num_conv_layers:
            raise ValueError(
                "conv_kernels and conv_strides need one entry per conv "
                f"layer ({num_conv_layers}); got {len(conv_kernels)} and "
                f"{len(conv_strides)}")

        self.input_shape = input_shape
        self.conv_filters = conv_filters
        self.conv_kernels = conv_kernels
        self.conv_strides = conv_strides
        self.latent_space_dim = latent_space_dim
        self.reconstruction_loss_weight = reconstruction_loss_weight

        self.encoder = None
        self.decoder = None
        self.model = None

        # Symbolic bottleneck tensors; set in _add_bottleneck and read by the
        # KL loss.
        self.mu = None
        self.log_variance = None

        self._num_conv_layers = num_conv_layers
        self._shape_before_bottleneck = None
        self._model_input = None

        self._build()

    def _build(self):
        """Build the encoder, the decoder and the combined model."""
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    def _build_encoder(self):
        """Assemble input -> conv blocks -> sampling bottleneck."""
        encoder_input = self._add_encoder_input()
        conv_layers = self._add_conv_layers(encoder_input)
        bottleneck = self._add_bottleneck(conv_layers)
        self._model_input = encoder_input
        self.encoder = Model(encoder_input, bottleneck, name="encoder")

    def _add_encoder_input(self):
        """Return the encoder's Input layer."""
        return Input(shape=self.input_shape, name="encoder_input")

    def _add_conv_layers(self, encoder_input):
        """Create all convolutional blocks in encoder."""
        x = encoder_input
        for layer_index in range(self._num_conv_layers):
            x = self._add_conv_layer(layer_index, x)
        return x

    def _add_conv_layer(self, layer_index, x):
        """Add a convolutional block to a graph of layers, consisting of
        conv 2d + ReLU + batch normalization.
        """
        layer_number = layer_index + 1
        conv_layer = Conv2D(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"encoder_conv_layer_{layer_number}"
        )
        x = conv_layer(x)
        x = ReLU(name=f"encoder_relu_{layer_number}")(x)
        x = BatchNormalization(name=f"encoder_bn_{layer_number}")(x)
        return x

    def _add_bottleneck(self, x):
        """Flatten data and add bottleneck with Gaussian sampling.

        Two Dense heads estimate the mean and the log-variance of the latent
        distribution; the returned tensor is a point sampled from it.
        """
        # Remember the conv output shape (without the batch axis) so the
        # decoder can reshape back to it.
        self._shape_before_bottleneck = K.int_shape(x)[1:]
        x = Flatten()(x)
        self.mu = Dense(self.latent_space_dim, name="mu")(x)
        self.log_variance = Dense(self.latent_space_dim,
                                  name="log_variance")(x)

        def sample_point_from_normal_distribution(args):
            """Sample mu + sigma * epsilon with epsilon drawn from N(0, 1)."""
            mu, log_variance = args
            # Use the tensor handed to the Lambda layer, not the attribute on
            # the enclosing instance, so the function depends only on its
            # inputs.
            epsilon = K.random_normal(shape=K.shape(mu), mean=0.,
                                      stddev=1.)
            return mu + K.exp(log_variance / 2) * epsilon

        x = Lambda(sample_point_from_normal_distribution,
                   name="encoder_output")([self.mu, self.log_variance])
        return x

    def _build_decoder(self):
        """Assemble latent input -> dense -> reshape -> transposed convs."""
        decoder_input = self._add_decoder_input()
        dense_layer = self._add_dense_layer(decoder_input)
        reshape_layer = self._add_reshape_layer(dense_layer)
        conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
        decoder_output = self._add_decoder_output(conv_transpose_layers)
        self.decoder = Model(decoder_input, decoder_output, name="decoder")

    def _add_decoder_input(self):
        """Return the decoder's Input layer (one latent vector per sample)."""
        # `shape` is a tuple of dimensions, so wrap the latent size.
        return Input(shape=(self.latent_space_dim,), name="decoder_input")

    def _add_dense_layer(self, decoder_input):
        """Expand the latent vector to the flattened conv feature size."""
        # e.g. shape (1, 2, 4) -> 8 neurons; cast the NumPy scalar to a
        # plain int for the layer's unit count.
        num_neurons = int(np.prod(self._shape_before_bottleneck))
        dense_layer = Dense(num_neurons, name="decoder_dense")(decoder_input)
        return dense_layer

    def _add_reshape_layer(self, dense_layer):
        """Reshape the dense output back to the pre-bottleneck conv shape."""
        return Reshape(self._shape_before_bottleneck)(dense_layer)

    def _add_conv_transpose_layers(self, x):
        """Add the transposed-conv blocks for encoder layers n-1 down to 1.

        Layer index 0 is skipped here; it is mirrored by the single-filter
        output layer added in ``_add_decoder_output``.
        """
        for layer_index in reversed(range(1, self._num_conv_layers)):
            x = self._add_conv_transpose_layer(layer_index, x)
        return x

    def _add_conv_transpose_layer(self, layer_index, x):
        """Add one conv transpose + ReLU + batch normalization block."""
        layer_num = self._num_conv_layers - layer_index
        conv_transpose_layer = Conv2DTranspose(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"decoder_conv_transpose_layer_{layer_num}"
        )
        x = conv_transpose_layer(x)
        x = ReLU(name=f"decoder_relu_{layer_num}")(x)
        x = BatchNormalization(name=f"decoder_bn_{layer_num}")(x)
        return x

    def _add_decoder_output(self, x):
        """Add the final single-filter transposed conv and sigmoid."""
        conv_transpose_layer = Conv2DTranspose(
            filters=1,
            kernel_size=self.conv_kernels[0],
            strides=self.conv_strides[0],
            padding="same",
            name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
        )
        x = conv_transpose_layer(x)
        output_layer = Activation("sigmoid", name="sigmoid_layer")(x)
        return output_layer

    def _build_autoencoder(self):
        """Chain encoder and decoder into the end-to-end model."""
        model_input = self._model_input
        model_output = self.decoder(self.encoder(model_input))
        self.model = Model(model_input, model_output, name="autoencoder")

    def summary(self):
        """Print the Keras summaries of encoder, decoder and full model."""
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()

    def compile(self, learning_rate=0.0001):
        """Compile the full model with Adam and the combined VAE loss."""
        optimizer = Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer,
                           loss=self._calculate_combined_loss)

    def train(self, x_train, batch_size, num_epochs):
        """Fit the model to reproduce ``x_train`` (inputs are the targets)."""
        self.model.fit(x_train,
                       x_train,
                       batch_size=batch_size,
                       epochs=num_epochs,
                       shuffle=True)

    def _calculate_combined_loss(self, y_target, y_predicted):
        """Return weighted reconstruction loss plus KL loss, per sample."""
        reconstruction_loss = self._calculate_reconstruction_loss(y_target,
                                                                  y_predicted)
        kl_loss = self._calculate_kl_loss(y_target, y_predicted)
        combined_loss = (self.reconstruction_loss_weight * reconstruction_loss
                         + kl_loss)
        return combined_loss

    def _calculate_reconstruction_loss(self, y_target, y_predicted):
        """Return the mean squared error over axes 1-3 for each sample."""
        error = y_target - y_predicted
        reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
        return reconstruction_loss

    def _calculate_kl_loss(self, y_target, y_predicted):
        """Return the KL divergence of N(mu, sigma^2) from N(0, 1).

        ``y_target`` and ``y_predicted`` are unused; they are accepted so the
        method has the same signature as the other loss functions.
        """
        kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) -
                               K.exp(self.log_variance), axis=1)
        return kl_loss

In [None]:
# Six conv blocks on a (256, 64, 1) input. The first five blocks use stride 2
# on both spatial axes; the last uses stride (2, 1), i.e. it only strides
# along the first spatial axis.
ve = VAE(input_shape=(256, 64, 1),
         conv_filters=(256,256, 256, 128, 64, 32),
         conv_kernels = (3,3,3,3,3,3),
         conv_strides = (2,2,2,2,2,(2,1)),
         latent_space_dim=128)

Instructions for updating:
Colocations handled automatically by placer.


In [None]:
# Training hyperparameters.
# LEARNING_RATE is passed to ve.compile(); VAE.compile's own default is 0.0001.
LEARNING_RATE = 0.001
BATCH_SIZE = 32
EPOCHS = 5

In [None]:
# Compile with Adam and the combined reconstruction + KL loss.
ve.compile(LEARNING_RATE)

In [None]:
# Train as an autoencoder: VAE.train() uses the inputs as the targets too.
ve.train(X_train, batch_size = BATCH_SIZE, num_epochs = EPOCHS)

Train on 3000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [None]:
# Reconstruct the evaluation spectrograms. The encoder output is a random
# sample from the latent distribution, so repeated calls can differ.
generated_log_spec = ve.model.predict(X_test)

  updates=self.state_updates,


In [None]:
# Shape of the reconstructed batch.
generated_log_spec.shape

(5, 256, 64, 1)

In [None]:
# Drop the trailing channel axis by selecting channel 0.
pred_val = generated_log_spec[:,:,:,0]
# Show the resulting shape directly; the previous discarded reshape
# hard-coded a batch size of 5.
pred_val.shape

(5, 256, 64)

In [None]:
# Shape of the predictions after the channel axis was removed.
pred_val.shape

(5, 256, 64)

In [None]:
# Turn every predicted (min-max scaled, dB) spectrogram back into audio and
# save it next to the dataset as "<name>-gen1<ext>".
signals = []
for file_name, spec_norm, spec_min, spec_max in zip(eval['file_name'],
                                                    pred_val,
                                                    eval['spec_min'],
                                                    eval['spec_max']):
    # Undo the per-file min-max scaling to get the log (dB) spectrogram back.
    sig = denormalize(spec_norm, min_val=spec_min, max_val=spec_max)
    sig_db_amp = librosa.db_to_amplitude(sig)

    # process_data() dropped the highest frequency bin. Append a silent bin
    # so istft sees FRAME_SIZE // 2 + 1 rows and infers n_fft == FRAME_SIZE;
    # otherwise it would infer a smaller FFT size from the row count.
    sig_db_amp = np.pad(sig_db_amp, ((0, 1), (0, 0)))

    # The inverse STFT takes the amplitude spectrogram. The earlier second
    # assignment, which applied istft to the dB values and db_to_amplitude to
    # the waveform, overwrote this result and has been removed.
    outp = librosa.istft(sig_db_amp, hop_length=HOP_LENGTH)

    # splitext also handles file names that contain more than one dot.
    stem, ext = os.path.splitext(file_name)
    sf.write(file=os.path.join(FILE_PATH, stem + "-gen1" + ext),
             data=outp,
             samplerate=SAMPLE_RATE)

    # As before, `signals` collects the denormalized dB spectrograms.
    signals.append(sig)

In [None]:
# Check which files are now present in ./data/.
os.listdir('./data/')

['File2-gen.wav',
 'File2-gen1.wav',
 'File1-gen1.wav',
 'File5-gen1.wav',
 'File4-gen.wav',
 'File5-gen.wav',
 'File3-gen1.wav',
 'File3-gen.wav',
 'train',
 'File4-gen1.wav',
 'eval',
 'File1-gen.wav']

In [None]:
import soundfile as sf

In [None]:
# NOTE(review): same listing as the earlier os.listdir cell, but the recorded
# output below it has no "-gen1" files, which suggests the cells were not
# last executed top to bottom.
os.listdir('./data')

['File2-gen.wav',
 'File4-gen.wav',
 'File5-gen.wav',
 'File3-gen.wav',
 'train',
 'eval',
 'File1-gen.wav']

In [None]:
import IPython.display as ipd

In [None]:
# Listen to the reconstruction generated for the first evaluation file.
ipd.Audio('./data/File1-gen1.wav')

In [None]:
# Listen to the original evaluation recording for comparison.
ipd.Audio('./data/eval/File1.wav')