# Sound Compression and Generation using Variational Autoencoders in Python

### Project Outline

**Task 1**: Audio Preprocessing Pipeline

**Task 2**: Training/Validation Split

**Task 3**: Creating Data Loaders

**Task 4**: VAE Architecture and Model Creation

**Task 5**: Training Loop

**Task 6**: Sound Generation

**Task 7**: Results

<img src="Images/vae.png">

In [None]:
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
from torchvision import datasets, transforms
from torchvision.utils import save_image

#progress bar manager
from tqdm.notebook import tqdm

import os
#for creating data set
import shutil
import random
random.seed(5)

import matplotlib
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import librosa
import librosa.display
import pickle

import soundfile as sf
import scipy

import numpy as np
import IPython.display as ipd
import math
import statistics

In [None]:
# Number of full passes over the training set performed by the training loop.
epochs = 1000
# Number of samples per batch, i.e. per gradient update during training.
batch_size = 64
# Fix PyTorch's random seed so that model training is reproducible.
torch.manual_seed(17)

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')

## Task 1 : Audio Preprocessing Pipeline

---



We use the Fluent Speech Commands Dataset (https://github.com/Jakobovski/free-spoken-digit-dataset).

In [None]:
"""
1- load a file
2- pad the signal (if necessary)
3 - extracting log spectrogram from signal
4 - normalise spectrogram
5 - save the normalised spectrogram

PreprocessingPipeLine
"""

class Loader:
    """Read an audio file from disk and return its samples.

    The sample rate, maximum duration and mono flag given at construction
    time are forwarded unchanged to ``librosa.load`` for every file.
    """

    def __init__(self, sample_rate, duration, mono):
        self.sample_rate, self.duration, self.mono = sample_rate, duration, mono

    def load(self, file_path):
        """Return the audio samples stored at ``file_path``."""
        # librosa.load returns a (samples, sample_rate) pair; only the
        # samples are of interest here.
        samples, _ = librosa.load(
            file_path,
            sr=self.sample_rate,
            duration=self.duration,
            mono=self.mono,
        )
        return samples


class Padder:
    """Pad a one-dimensional array on its left or right side.

    ``mode`` is forwarded to ``numpy.pad`` and selects the padding strategy
    (``"constant"`` pads with zeros).
    """

    def __init__(self, mode="constant"):
        self.mode = mode

    def _pad(self, array, before, after):
        # Single place where numpy.pad is invoked for both directions.
        return np.pad(array, (before, after), mode=self.mode)

    def left_pad(self, array, num_missing_items):
        """Prepend ``num_missing_items`` items: [1,2,3] -> 2 -> [0,0,1,2,3]."""
        return self._pad(array, num_missing_items, 0)

    def right_pad(self, array, num_missing_items):
        """Append ``num_missing_items`` items: [1,2,3] -> 2 -> [1,2,3,0,0]."""
        return self._pad(array, 0, num_missing_items)


class LogSpectrogramExtractor:
    """Compute a log-magnitude spectrogram (in dB) of a time-series signal."""

    def __init__(self, frame_size, hop_length):
        self.frame_size, self.hop_length = frame_size, hop_length

    def extract(self, signal):
        """Return the dB-scaled magnitude STFT of ``signal``.

        The last row along the first axis of the STFT is discarded before
        the magnitude is taken.
        """
        transform = librosa.stft(
            signal,
            n_fft=self.frame_size,
            hop_length=self.hop_length,
        )
        magnitudes = np.abs(transform[:-1])
        return librosa.amplitude_to_db(magnitudes)

class MinMaxNormaliser:
    """Apply min-max normalisation to an array, and undo it again.

    Values are mapped linearly so that the array's own minimum lands on
    ``min_val`` and its own maximum lands on ``max_val``.
    """

    def __init__(self, min_val, max_val):
        self.min = min_val
        self.max = max_val

    def normalise(self, array):
        """Rescale ``array`` into the ``[self.min, self.max]`` range.

        A constant array (maximum equal to minimum) has no range to stretch;
        it is mapped entirely to ``self.min`` rather than producing NaNs from
        a division by zero.
        """
        low = array.min()
        span = array.max() - low
        if span == 0:
            # Keep a floating dtype compatible with the input so downstream
            # consumers see the same dtype as for non-constant arrays.
            scaled = np.zeros_like(array, dtype=np.result_type(array, np.float32))
        else:
            scaled = (array - low) / span
        return scaled * (self.max - self.min) + self.min

    def denormalise(self, norm_array, original_min, original_max):
        """Map a normalised array back to ``[original_min, original_max]``."""
        array = (norm_array - self.min) / (self.max - self.min)
        array = array * (original_max - original_min) + original_min
        return array

class Saver:
    """Persist extracted features and their min/max values to disk.

    Features are written as ``.npy`` files inside ``feature_save_dir``; the
    min/max dictionary is pickled to ``min_max_values.pkl`` inside
    ``min_max_values_save_dir``. Missing output directories are created on
    first write.
    """

    def __init__(self, feature_save_dir, min_max_values_save_dir):
        self.feature_save_dir = feature_save_dir
        self.min_max_values_save_dir = min_max_values_save_dir

    def save_feature(self, feature, file_path):
        """Save ``feature`` under a name derived from ``file_path``.

        Returns the path the feature was written to.
        """
        save_path = self._generate_save_path(file_path)
        self._ensure_dir(self.feature_save_dir)
        np.save(save_path, feature)
        return save_path

    def save_min_max_values(self, min_max_values):
        """Pickle ``min_max_values`` to ``min_max_values.pkl``."""
        save_path = os.path.join(self.min_max_values_save_dir,
                                 "min_max_values.pkl")
        self._ensure_dir(self.min_max_values_save_dir)
        self._save(min_max_values, save_path)

    @staticmethod
    def _ensure_dir(directory):
        # An empty string means "current directory", which always exists and
        # which os.makedirs would reject.
        if directory:
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _save(data, save_path):
        with open(save_path, "wb") as f:
            pickle.dump(data, f)

    def _generate_save_path(self, file_path):
        # Keep the original file name (including its extension) and append
        # ".npy", e.g. "clip.wav" -> "clip.wav.npy".
        file_name = os.path.split(file_path)[1]
        save_path = os.path.join(self.feature_save_dir, file_name + ".npy")
        return save_path

class PreprocessingPipeline:
    """Turn every audio file below a directory into a saved spectrogram.

    For each file the pipeline:
        1- loads the signal
        2- right-pads it when it is shorter than expected
        3- extracts a log spectrogram
        4- normalises the spectrogram
        5- saves the normalised spectrogram
    The minimum and maximum of each un-normalised spectrogram are collected
    and saved once all files have been processed.

    The collaborators (``loader``, ``padder``, ``extractor``, ``normaliser``
    and ``saver``) are assigned as attributes after construction.
    """

    def __init__(self):
        self._loader = None
        self._num_expected_samples = None
        self.padder = None
        self.extractor = None
        self.normaliser = None
        self.saver = None
        self.min_max_values = {}

    @property
    def loader(self):
        return self._loader

    @loader.setter
    def loader(self, loader):
        # Setting the loader also fixes how many samples a full-length
        # signal is expected to contain.
        self._num_expected_samples = int(loader.sample_rate * loader.duration)
        self._loader = loader

    def process(self, audio_files_dir):
        """Process every non-hidden file found under ``audio_files_dir``."""
        for directory, _, names in os.walk(audio_files_dir):
            for name in names:
                if name.startswith('.'):
                    # Hidden files are not audio recordings.
                    continue
                file_path = os.path.join(directory, name)
                self._process_file(file_path)
                print(f"Processed file {file_path}")
        self.saver.save_min_max_values(self.min_max_values)

    def _process_file(self, file_path):
        signal = self.loader.load(file_path)
        if self._is_padding_necessary(signal):
            signal = self._apply_padding(signal)
        feature = self.extractor.extract(signal)
        save_path = self.saver.save_feature(
            self.normaliser.normalise(feature), file_path
        )
        # The range of the un-normalised feature is stored, keyed by where
        # the normalised feature was saved.
        self._store_min_max_value(save_path, feature.min(), feature.max())

    def _is_padding_necessary(self, signal):
        return len(signal) < self._num_expected_samples

    def _apply_padding(self, signal):
        shortfall = self._num_expected_samples - len(signal)
        return self.padder.right_pad(signal, shortfall)

    def _store_min_max_value(self, save_path, min_val, max_val):
        self.min_max_values[save_path] = {"min": min_val, "max": max_val}



In [None]:
# Hop length used for the STFT; the configuration cell below assigns the same value again.
HOP_LENGTH = 128

In [None]:
# Audio loading and STFT parameters used by the preprocessing pipeline.
FRAME_SIZE = 512  # forwarded to librosa.stft as n_fft
HOP_LENGTH = 128  # forwarded to librosa.stft as hop_length
DURATION = 3.0  # in seconds
SAMPLE_RATE = 11025
MONO = True

# Where the normalised spectrograms and their min/max values are written,
# and the folder that is walked for audio recordings.
SPECTROGRAMS_SAVE_DIR = "/Users/Robert/Desktop/Thesis/spectrograms/all"
MIN_MAX_VALUES_SAVE_DIR = "/Users/Robert/Desktop/Thesis/minmax"
FILES_DIR = "/Users/Robert/Desktop/Thesis/recordings"


# NOTE(review): not used in this cell; the sound-generation cell assigns it again.
SAVE_DIR_ORIGINAL = "/Users/Robert/Desktop/Thesis/reconstructed_signals"
# Instantiate the pipeline components.
# NOTE(review): the name `loader` is later rebound to a function by the data-loader cell.
loader = Loader(SAMPLE_RATE, DURATION, MONO )
padder = Padder()
log_spectrogram_extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
# Spectrograms are normalised into the [0, 1] range.
min_max_normaliser = MinMaxNormaliser(0, 1)
saver = Saver(SPECTROGRAMS_SAVE_DIR, MIN_MAX_VALUES_SAVE_DIR)

# Wire the components into the pipeline.
preprocessing_pipeline = PreprocessingPipeline()
preprocessing_pipeline.loader = loader
preprocessing_pipeline.padder = padder
preprocessing_pipeline.extractor = log_spectrogram_extractor
preprocessing_pipeline.normaliser = min_max_normaliser
preprocessing_pipeline.saver = saver

# Process every recording under FILES_DIR and save the min/max values.
preprocessing_pipeline.process(FILES_DIR)

## Task 2: Training/Validation Split

In [None]:
# Collect the names of all saved spectrograms, skipping hidden files, and
# shuffle them (the random module was seeded at import time) so that the
# train/validation split is random.
spectrograms_folder = '/Users/Robert/Desktop/Thesis/spectrograms/all'
spectrograms = os.listdir(spectrograms_folder)
spectrograms = [spec for spec in spectrograms if spec[0]!= '.']
random.shuffle(spectrograms)

# Folders that receive the training and validation spectrograms.
train_folder = '/Users/Robert/Desktop/Thesis/spectrograms/train/root'
val_folder = '/Users/Robert/Desktop/Thesis/spectrograms/val/root'

In [None]:
# Create the destination folders, including the intermediate "train"/"val"
# level. (Testing the truthiness of the path strings never created them.)
os.makedirs(train_folder, exist_ok=True)
os.makedirs(val_folder, exist_ok=True)

# The first 20% of the shuffled spectrograms form the validation set, the
# remainder the training set.
split = round(0.2 * len(spectrograms))
for spec in tqdm(spectrograms[:split]):
    shutil.copy(os.path.join(spectrograms_folder, spec), val_folder)

for spec in tqdm(spectrograms[split:]):
    shutil.copy(os.path.join(spectrograms_folder, spec), train_folder)

## Task 3: Creating Data Loaders

In [None]:
# Side length used for the square model input (see `shape = resize_value` in the VAE cell).
resize_value = 256
# Parent folders of the "root" sub-folders that hold the split spectrograms.
train = '/Users/Robert/Desktop/Thesis/spectrograms/train'
val = '/Users/Robert/Desktop/Thesis/spectrograms/val'
def loader(spec_path):
    """Load one saved spectrogram from the ``.npy`` file at ``spec_path``.

    Returns the stored array as a ``numpy.ndarray``.
    """
    # The spectrograms are plain numeric arrays written with np.save, so
    # pickle support is not needed; keeping it disabled avoids executing
    # arbitrary pickled objects from a tampered file.
    return np.load(spec_path, allow_pickle=False)

# Convert each spectrogram to a tensor and resize it to the square input
# size configured in `resize_value`, so the transform and the model cannot
# drift apart.
transforms_set = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize([resize_value, resize_value]),
])

# Data loaders read the spectrogram files in batches instead of holding the
# whole dataset in memory.
train_loader = torch.utils.data.DataLoader(
    datasets.DatasetFolder(train, loader, extensions=('.npy',), transform=transforms_set),
    batch_size=batch_size, shuffle=True)
    # shuffling introduces randomness so that the model does not learn
    # sequential relations between samples within an epoch
val_loader = torch.utils.data.DataLoader(
    datasets.DatasetFolder(val, loader, extensions=('.npy',), transform=transforms_set),
    batch_size=batch_size, shuffle=True)
    # shuffling also ensures we do not see the same samples first each time
    
    
    

In [None]:
# Sanity check: print the shape of the first spectrogram in the first
# training batch, then stop iterating.
for batch in train_loader:
    # batch[0] holds the spectrogram tensors of the batch; take the first one.
    spec = batch[0][0]
    print(spec.shape)
    break

## Task 4: VAE Architecture and Model Creation

In [None]:
# Side length of the square input spectrogram; the model flattens it to shape*shape values.
shape = resize_value
# Size of the hidden layer between the flattened input and the latent space.
lower_dimension = 1000
# Size of the latent vector.
latent_dimension = 16
# Code based on the PyTorch documentation example for creating a VAE.
class VAE(nn.Module):
    """Fully connected variational autoencoder over flattened spectrograms.

    The encoder maps a flattened input to a hidden layer and from there to
    the mean and log-variance of the latent distribution; the decoder maps a
    latent vector back through a hidden layer to a sigmoid-activated output
    of the input size.

    Args:
        input_dim: number of values in one flattened input. Defaults to the
            notebook-level ``shape * shape``.
        hidden_dim: size of the hidden layers. Defaults to the notebook-level
            ``lower_dimension``.
        latent_dim: size of the latent vector. Defaults to the notebook-level
            ``latent_dimension``.
    """

    def __init__(self, input_dim=None, hidden_dim=None, latent_dim=None):
        super().__init__()

        # Fall back to the notebook-level configuration so that ``VAE()``
        # builds exactly the same network as before.
        self.input_dim = shape * shape if input_dim is None else input_dim
        self.hidden_dim = lower_dimension if hidden_dim is None else hidden_dim
        self.latent_dim = latent_dimension if latent_dim is None else latent_dim

        # Linear layers convert one dimensionality into another (first
        # argument is the input size, second the output size); the
        # dimensionality is reduced in two steps.
        self.fc1 = nn.Linear(self.input_dim, self.hidden_dim)
        self.fc21 = nn.Linear(self.hidden_dim, self.latent_dim)  # mean
        self.fc22 = nn.Linear(self.hidden_dim, self.latent_dim)  # log-variance
        self.fc3 = nn.Linear(self.latent_dim, self.hidden_dim)
        self.fc4 = nn.Linear(self.hidden_dim, self.input_dim)

    def encode(self, x):
        """Return the latent mean and log-variance for flattened input ``x``."""
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps`` drawn from N(0, 1).

        Expressing the sample this way keeps it differentiable with respect
        to ``mu`` and ``logvar``, which allows backpropagation.
        """
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return mu + eps*std

    def decode(self, z):
        """Map latent vectors ``z`` back to the flattened input space."""
        h3 = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        """Return ``(reconstruction, mu, logvar)`` for the input batch ``x``."""
        # Flatten every sample to a vector of input_dim values.
        mu, logvar = self.encode(x.view(-1, self.input_dim))

        # Sample a point in the latent space.
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

In [None]:
def loss_function(recon_x, x, mu, logvar):
    """Return the VAE loss: summed reconstruction BCE plus KL divergence.

    Args:
        recon_x: reconstructed batch produced by the decoder.
        x: input batch; it is reshaped to the shape of ``recon_x``.
        mu: latent means produced by the encoder.
        logvar: latent log-variances produced by the encoder.

    Raises:
        RuntimeError: if ``x`` cannot be reshaped to ``recon_x``'s shape, or
            if the binary cross entropy itself fails. Errors are no longer
            swallowed here, so callers see the real cause instead of an
            unbound-variable error at the return statement.
    """
    # Reconstruction term, summed over every element of the batch.
    BCE = F.binary_cross_entropy(recon_x, x.reshape(recon_x.shape), reduction='sum')

    # KL divergence between the encoder's distribution and N(0, 1).
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return BCE + KLD

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Move the model onto the selected device (CPU or GPU).
model = VAE().to(device)
# Adam adjusts the weights after each backward pass; an explicit learning
# rate of 0.5e-3 is passed here.
optimizer = optim.Adam(model.parameters(), lr=0.5e-3)

In [None]:
print(model)

## Task 5: Training Loop

In [None]:
#after each epoc, generate validation peformance metric to see how model performs on validation data alongside training data
#important to avoid overtraining and to know when model knowns enough or meets expectations
def evaluate(evaluate_data=val_loader):
    """Return the average per-sample loss of ``model`` on ``evaluate_data``.

    The model is put into evaluation mode and no gradients are tracked.
    Evaluation is best-effort: a batch whose loss cannot be computed is
    reported and skipped instead of being silently ignored, and the average
    is taken over the samples that were actually evaluated.

    Returns:
        The summed loss divided by the number of evaluated samples, or NaN
        when no batch could be evaluated.
    """
    model.eval()
    total_loss = 0.0
    evaluated_samples = 0
    # Forward passes only; gradients are not needed for validation.
    with torch.no_grad():
        for i, (data, _) in enumerate(evaluate_data):
            data = data.to(device)
            # Reconstructed spectrograms plus the latent mean and log-variance.
            recon_batch, mu, logvar = model(data)

            try:
                batch_loss = loss_function(recon_batch, data, mu, logvar).item()
            except Exception as err:
                # Keep evaluating the remaining batches, but make the
                # failure visible.
                tqdm.write(f'Skipping validation batch {i}: {err!r}')
                continue

            total_loss += batch_loss
            evaluated_samples += len(data)

    if evaluated_samples == 0:
        return float('nan')
    return total_loss / evaluated_samples

#sample latent space model has learnt and generate new spectrograms
def sample_latent_space():
    """Decode one random latent vector into a generated spectrogram.

    A single vector is drawn from a standard normal distribution, with as
    many dimensions as the model's latent layer, and passed straight to the
    decoder (the encoder is bypassed).

    Returns:
        The decoder output for that vector, moved to the CPU.
    """
    with torch.no_grad():
        # Read the latent size from the model itself so this stays correct
        # if the latent dimension is changed.
        latent_size = model.fc21.out_features
        sample = torch.randn(1, latent_size).to(device)
        return model.decode(sample).cpu()


In [None]:
def train(epoch):
    """Run one training epoch, then validate and record both losses.

    For every batch of ``train_loader`` the model is run, the loss is
    backpropagated and the optimiser takes a step. A batch whose loss or
    backward pass fails is reported and skipped *without* an optimiser step,
    so the weights are never updated from a failed batch.

    After the epoch the average training and validation losses are printed
    and appended to the module-level ``train_losses`` / ``val_losses`` lists,
    and every 50th epoch the model's state dict is saved to disk.

    Args:
        epoch: 1-based epoch number, used for progress output and for the
            checkpoint file name.
    """
    # Training mode, so that mode-dependent layers behave as intended.
    model.train()
    train_loss = 0.0
    trained_samples = 0

    # Progress bar labelled with the current epoch.
    progress_bar = tqdm(train_loader, desc='Epoch {:03d}'.format(epoch), leave=False, disable=False)
    for data, _ in progress_bar:
        data = data.to(device)

        # Reset the gradients left over from the previous batch.
        optimizer.zero_grad()

        # Reconstructed batch plus the latent mean and log-variance.
        recon_batch, mu, logvar = model(data)
        try:
            loss = loss_function(recon_batch, data, mu, logvar)
            # Compute the gradients for this batch.
            loss.backward()
        except Exception as err:
            tqdm.write(f'Skipping a training batch in epoch {epoch:03d}: {err!r}')
            continue

        # Update the weights only when the gradients are valid.
        optimizer.step()

        train_loss += loss.item()
        trained_samples += len(data)
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item() / len(data))})

    # Average over the samples that actually contributed to the loss.
    average_train_loss = train_loss / trained_samples if trained_samples else float('nan')
    tqdm.write('Training set loss (average, epoch {:03d}): {:.3f}'.format(epoch, average_train_loss))
    val_loss = evaluate(val_loader)
    tqdm.write('\t\t\t\t====> Validation set loss: {:.3f}'.format(val_loss))

    train_losses.append(average_train_loss)
    val_losses.append(val_loss)

    # Save the state dict (all layer weights, enough to restore a fresh
    # model) every 50 epochs.
    if epoch%50==0:
        checkpoint_path = f'/Users/Robert/Desktop/thesis/models/epoch_{epoch}.model'
        # Make sure a missing folder does not cost a checkpoint.
        os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
        torch.save(model.state_dict(), checkpoint_path)

In [None]:
epochs=1000

In [None]:
# Per-epoch average losses; train() appends to both lists.
train_losses = []
val_losses = []

# Epochs are numbered from 1 so that checkpoint names and progress output
# start at epoch 1.
for epoch in range(1, epochs+1):
    train(epoch)
    #sample_latent_space()

In [None]:
# Persist the recorded loss curves as text files so the results cell can
# reload them without retraining.
np.savetxt('/Users/Robert/Desktop/thesis/training_losses.txt', np.array(train_losses), delimiter='\n')
np.savetxt('/Users/Robert/Desktop/thesis/validation_losses.txt', np.array(val_losses), delimiter='\n')

In [None]:
# Load a previously saved model state into `model`.
# This does not need to be run if the model has just been trained.
# map_location makes torch.load place the stored tensors on the CPU.
# NOTE(review): torch.load unpickles the file — only load checkpoints you trust.
model.load_state_dict(torch.load('/Users/Robert/Desktop/Thesis/models/model11/epoch_1000.model', map_location=torch.device('cpu')))

In [None]:
#just shows output of a latent representations, this is an example of what is eventually conveted to a signal
sample_latent_space()

## Task 6: Sound Generation

In [None]:
class SoundGenerator:
    """Generate audio from normalised log-spectrograms.

    Spectrograms are denormalised, converted from dB to amplitude and
    inverted to a waveform with the Griffin-Lim algorithm, using the same
    hop length that was used to compute them.
    """

    # Shape that flattened model outputs are reshaped to before inversion.
    _SPEC_SHAPE = (256, 256)

    def __init__(self, vae, hop_length):
        self.vae = vae
        # Needed to go from a spectrogram back to a waveform.
        self.hop_length = hop_length
        # Needed to denormalise spectrograms.
        self._min_max_normaliser = MinMaxNormaliser(0, 1)

    def generate(self, spectrograms, min_max_values):
        """Reconstruct ``spectrograms`` with the VAE and convert them to audio.

        Args:
            spectrograms: iterable of tensors accepted by the VAE.
            min_max_values: one ``{"min": ..., "max": ...}`` dict per
                spectrogram, used to undo the normalisation.

        Returns:
            A list with one generated signal per input spectrogram.
        """
        # Run the inputs on whatever device the model's weights live on.
        first_param = next(self.vae.parameters(), None)
        generated_specs = []
        # Inference only: the outputs are detached afterwards anyway.
        with torch.no_grad():
            for spec in spectrograms:
                if first_param is not None:
                    spec = spec.to(first_param.device)
                g_spec, _, _ = self.vae(spec)
                generated_specs.append(g_spec)

        return self.convert_spectrograms_to_audio(generated_specs, min_max_values, False)

    def convert_spectrograms_to_audio(self, spectrograms, min_max_values, original):
        """Convert normalised log-spectrograms to audio signals.

        Args:
            spectrograms: the spectrograms to convert.
            min_max_values: one ``{"min": ..., "max": ...}`` dict per
                spectrogram.
            original: truthy when ``spectrograms`` are arrays that can be
                used as they are; falsy when they are model outputs that
                must first be reshaped and converted to numpy arrays.

        Returns:
            A list with one signal per spectrogram.
        """
        signals = []
        for log_spectrogram, min_max_value in zip(spectrograms, min_max_values):
            if original:
                print("original signal",log_spectrogram.shape)
            else:
                log_spectrogram = log_spectrogram.reshape(self._SPEC_SHAPE)
                log_spectrogram = log_spectrogram.cpu().detach().numpy()
                print("generated signal",log_spectrogram.shape)
            signals.append(self._to_signal(log_spectrogram,
                                           min_max_value["min"],
                                           min_max_value["max"]))
        return signals

    def convert_latent_to_audio(self, spec, min_val=-46.56255, max_val=33.43745):
        """Convert one decoder output sampled from the latent space to audio.

        Args:
            spec: decoder output tensor holding one spectrogram.
            min_val: minimum (in dB) used to denormalise the spectrogram.
            max_val: maximum (in dB) used to denormalise the spectrogram.

        Returns:
            A one-element list containing the generated signal.
        """
        spec = spec.reshape(self._SPEC_SHAPE)
        spec = spec.cpu().detach().numpy()

        print("latent representation",spec.shape)
        return [self._to_signal(spec, min_val, max_val)]

    def _to_signal(self, norm_log_spec, min_val, max_val):
        # Denormalise, convert dB to amplitude, then invert with Griffin-Lim.
        denorm_log_spec = self._min_max_normaliser.denormalise(norm_log_spec, min_val, max_val)
        amplitude = librosa.db_to_amplitude(denorm_log_spec)
        return librosa.griffinlim(amplitude, hop_length=self.hop_length)


In [None]:
SPECTROGRAMS_PATH = "/Users/Robert/Desktop/Thesis/spectrograms/all"
MIN_MAX_VALUES_PATH = "/Users/Robert/Desktop/Thesis/minmax/min_max_values.pkl"

SAVE_DIR_ORIGINAL = "/Users/Robert/Desktop/Thesis/reconstructed_signals/original"
SAVE_DIR_GENERATED = "/Users/Robert/Desktop/Thesis/reconstructed_signals/generated"
SAVE_DIR_LATENT = "/Users/Robert/Desktop/Thesis/reconstructed_signals/latent"

#load spectrograms
def load_specs(spectrograms_path):
    """Load every non-hidden file below ``spectrograms_path`` with ``np.load``.

    Returns:
        A pair ``(specs, file_paths)``: the loaded arrays stacked into one
        array, and the path of each file in the same order.
    """
    file_paths = []
    for directory, _, names in os.walk(spectrograms_path):
        visible = (name for name in names if not name.startswith('.'))
        file_paths.extend(os.path.join(directory, name) for name in visible)

    specs = np.array([np.load(path, allow_pickle=True) for path in file_paths])
    return specs, file_paths

#save signal
def save_signals(signals, save_dir, sample_rate=11025):
    """Write each signal to ``<save_dir>/<index>.wav``.

    Args:
        signals: the audio signals to write.
        save_dir: destination folder; created if it does not exist.
        sample_rate: sample rate written to the files.
    """
    # An empty string means the current directory, which needs no creation.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    for i, signal in enumerate(signals):
        save_path = os.path.join(save_dir, str(i) + ".wav")
        sf.write(save_path, signal, sample_rate)

#select sample of spectrograms and associated min-max values
def select_spectrograms(spectrograms,
                        file_paths,
                        min_max_values,
                        num_spectrograms):
    """Randomly pick ``num_spectrograms`` spectrograms and their metadata.

    Returns:
        A triple ``(spectrograms, min_max_values, file_paths)`` restricted to
        the randomly chosen indexes, all in the same order.
    """
    chosen = np.random.choice(range(len(spectrograms)), num_spectrograms)

    chosen_paths = []
    chosen_min_max = []
    for index in chosen:
        path = file_paths[index]
        chosen_paths.append(path)
        # min/max values are keyed by the spectrogram's file path
        chosen_min_max.append(min_max_values[path])

    return spectrograms[chosen], chosen_min_max, chosen_paths



In [None]:
#initialise sound generator
# Build the sound generator around the model.
sound_generator = SoundGenerator(model,HOP_LENGTH)

# Load the min/max values pickled by the preprocessing pipeline; they are
# keyed by spectrogram file path.
with open(MIN_MAX_VALUES_PATH, "rb") as f:
        min_max_values = pickle.load(f)

In [None]:
#load spectrograms
specs, file_paths = load_specs(SPECTROGRAMS_PATH)
print("done")

In [None]:
# Hand-picked spectrogram files. Only female_file is used in this cell; the
# other paths are kept for switching the example by hand.
male_file = "/Users/Robert/Desktop/Thesis/spectrograms/all/c0e37430-45e5-11e9-b578-494a5b19ab8b.wav.npy"
higher_pitch = "/Users/Robert/Desktop/Thesis/spectrograms/all/higher.wav.npy"

female_file = "/Users/Robert/Desktop/Thesis/spectrograms/all/643ad320-45d8-11e9-81ce-69b74fd7e64e.wav.npy"
lower_pitch = "/Users/Robert/Desktop/Thesis/spectrograms/all/lower.wav.npy"

# Min/max values of the chosen file, needed to denormalise its spectrogram.
sampled_min_max_values = [min_max_values[female_file]]
print(sampled_min_max_values)

# The spectrogram as saved on disk, used for the "original" audio.
sampled_specs = []
random_spec = np.load(female_file)
sampled_specs.append(random_spec)

# The same spectrogram passed through the model's input transforms.
tensor_specs = []
tensor_specs.append(transforms_set(random_spec))

#convert original signals to audio (skipping the model, for comparison)
original_signals = sound_generator.convert_spectrograms_to_audio(sampled_specs, sampled_min_max_values, True)
# Pass the transformed spectrogram through the model and convert the result to audio.
generated_signals = sound_generator.generate(tensor_specs, sampled_min_max_values)

save_signals(original_signals, SAVE_DIR_ORIGINAL)
save_signals(generated_signals, SAVE_DIR_GENERATED)


In [None]:
#Sample a number of spectrograms from the dataset
# Randomly pick one spectrogram from the dataset, together with its min/max
# values and file path.
sampled_specs, sampled_min_max_values,f_paths = select_spectrograms(specs,
                                                                file_paths,
                                                                min_max_values,
                                                               1)

# Apply the model's input transforms to each sampled spectrogram.
tensor_specs = []
for spec in sampled_specs:
    spec = transforms_set(spec)
    tensor_specs.append(spec)


# Pass the spectrograms through the model and convert the results to audio.
generated_signals = sound_generator.generate(tensor_specs, sampled_min_max_values)

#convert original signals to audio (skipping the model, for comparison)
original_signals = sound_generator.convert_spectrograms_to_audio(sampled_specs, sampled_min_max_values, True)


# Files are named by index, so these overwrite earlier outputs in the same folders.
save_signals(original_signals, SAVE_DIR_ORIGINAL)
save_signals(generated_signals, SAVE_DIR_GENERATED)




In [None]:
# Decode a random latent vector, convert the result to audio and save it.
latent_representations = sample_latent_space()
latent_signals = sound_generator.convert_latent_to_audio(latent_representations)
save_signals(latent_signals, SAVE_DIR_LATENT)

In [None]:
#DONE
#bedroom lights on
male_array = [[-0.5781,  1.7825, -4.4770,  2.3321, -1.5133,  1.5367,  2.9303,  1.9751,
         -2.7641, -2.3457, -2.0747, -0.4873,  2.6040,  6.0554, -4.8897,  1.5923]]

male_array_pitchedup = [[2.7032,  0.2202, -3.1795,  3.4626, -3.0187,  2.4574,  0.7890, -1.0400,
          0.2349, -1.6246, -2.2396, -3.7486,  0.3878,  5.2447, -7.4049,  1.8736]]

#modify male array by change top 3 most relevant features (1 & 12 changed, 8 modified)
result = [[2.7032,  1.7825, -4.4770,  2.3321, -1.5133,  1.5367,  2.9303,  0.55,
         -2.7641, -2.3457, -2.0747, -3.7486,  2.6040,  6.0554, -4.8897,  1.5923]]
new_vector = torch.Tensor(result)
print(new_vector)

In [None]:
#DONE
#Make the music softer
male_array = [[2.3603, -0.8928,  3.1577,  3.1412,  3.3199,  0.0068,  0.0714, -0.1817,
          1.5153,  0.6460, -0.8865, -2.8240,  1.1089, -2.9000, -5.1773, -0.3459]]

male_array_pitchedup = [[3.8991, -1.4469,  2.5336,  1.8094,  1.5834, -2.4974, -1.8551, -3.9157,
          3.0788,  0.5842, -2.0938, -4.3731,  2.0327, -0.8192, -3.6701, -2.7992]]

#a = male_array[0]
#b = male_array_pitchedup[0]
#result = [[statistics.mean(k) for k in zip(a,b)]]  
#modify male array and modify 8th variable with pitched
result = [[2.3603, -0.8928,  3.1577,  3.1412,  3.3199,  0.0068,  0.0714, -3.25,
          1.5153,  0.6460, -0.8865, -2.8240,  1.1089, -2.9000, -5.1773, -0.3459]]
new_vector = torch.Tensor(result)
print(new_vector)

In [None]:
#DONE
#Switch on the kitchen lights
male_array = [[1.7631, -2.7329,  2.1709,  1.4151,  2.0126,  4.3662, -1.8836, -1.2619,
          3.5545,  1.5870, -0.8313, -0.7222, -0.2389,  0.1972, -1.1918, -6.0831]]

male_array_pitchedup = [[1.3993, -1.7566,  1.5705,  1.7905,  0.8157,  1.6816, -3.2363, -0.4246,
          4.3625,  0.1268, -2.1645, -2.5726,  2.3026,  0.9361,  2.3026, -7.5896]]
 
#modify male array and modify 13th,15th variable with pitched
result = [[1.7631, -2.7329,  2.1709,  1.4151,  2.0126,  4.3662, -1.8836, -1.2619,
          3.5545,  1.5870, -0.8313, -0.7222, 2.3026,  0.1972, 0.61, -6.0831]]
new_vector = torch.Tensor(result)
print(new_vector)

In [None]:
#DONE
#Turn off the lights in the bedroom
female_array = [[5.0933, -1.6751, -1.5892, -3.3666,  0.9798, -1.3267,  0.8459, -6.3412,
         -0.6881, -1.1514,  0.8595, -1.1106,  3.4330, -0.2548,  1.1554, -0.5961]]

female_array_pitcheddown = [[0.1500, -5.5779, -5.0892, -2.7831,  4.4215, -5.3856,  1.2228, -2.1289,
         -1.9717, -2.2412, -2.7855, -1.8682,  5.4151,  6.1053,  0.5474, -0.7980]]

#modify female array: positions 1,2,3,5,6,8,11,14 are taken from the pitched-down array; position 13 is set by hand to 4.0
result = [[0.1500, -5.5779, -5.0892, -3.3666,  4.4215, -5.3856,  0.8459, -2.1289,
         -0.6881, -1.1514, -2.7855, -1.1106,  4.0,  6.1053,  1.1554, -0.5961]]
new_vector = torch.Tensor(result)
print(new_vector)

In [None]:
#Switch on the bathroom lights
female_array = [[1.2856, -3.7600,  5.1191, -0.8282, -1.0653, -3.9675,  2.4507,  0.7030,
         -0.9069, -0.7750,  0.9804, -3.7015, -0.9143, -1.6186, -0.4156,  3.6323]]

female_array_pitcheddown = [[-1.4646, -6.1295,  1.0632, -2.6649,  5.0519, -6.7785,  3.1655, -0.0434,
         -1.3328, -4.0500, -3.3925, -3.2585,  1.0946,  2.9671, -0.9856,  3.5367]]

#modified female array: positions 1,2,3,5,6,10,11,13,14 are taken from the pitched-down array
result = [[-1.4646, -6.1295,  1.0632, -0.8282,  5.0519, -6.7785,  2.4507, 0.7030,
         -0.9069, -4.0500, -3.3925, -3.7015,  1.0946,  2.9671, -0.4156,  3.6323]]  
new_vector = torch.Tensor(result)
print(new_vector)

In [None]:
#Increase the temperature in the kitchen
female_array = [[-0.7386, -4.2983,  2.2131, -2.4853,  3.2881,  3.2733,  3.0581, -0.9935,
         -1.2309, -1.7480,  2.6251, -1.9296,  1.0876,  1.0788, -1.3025,  3.2276]]

female_array_pitcheddown = [[-3.7020, -5.0420, -1.7487, -4.2840,  6.4130, -1.9832,  3.5992, -1.1902,
         -0.8086, -3.2695, -1.9878, -1.1099,  2.9419,  6.4888, -0.4898,  1.1227]]

#changes from pitched: 8,9,7,2,15, 12,10,13
result = [[-3.7020, -4.2983, -1.7487, -4.2840,  6.4130, -1.9832,  3.0581, -0.9935,
         -1.2309, -1.7480, -1.9878, -1.9296,  2.9419,  6.4888, -1.3025,  1.1227]]
new_vector = torch.Tensor(result)
print(new_vector)

In [None]:
with torch.no_grad():
    # Bypass the encoder and decode the hand-edited latent vector directly.
    # The vector is moved to the model's device first, so this also works
    # when the model lives on the GPU.
    sample = model.decode(new_vector.to(device)).cpu()

In [None]:
latent_signals = sound_generator.convert_latent_to_audio(sample)
save_signals(latent_signals, SAVE_DIR_LATENT)

In [None]:
print(f_paths)

## Task 7: Results

In [None]:
train_losses = np.loadtxt('/Users/Robert/Desktop/thesis/training_losses.txt')
val_losses = np.loadtxt('/Users/Robert/Desktop/thesis/validation_losses.txt')

In [None]:
# Draw the training and validation loss curves against the epoch number.
for losses, label in ((train_losses, 'Training Loss'),
                      (val_losses, 'Validation Loss')):
    plt.plot(range(1, len(losses)+1), losses, label=label, linewidth=2, alpha=0.7)

plt.title('VAE Spectrogram Training')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
pass

In [None]:
# First saved signal of each kind: original, reconstructed by the model,
# and decoded from a latent vector.
original_file = "/Users/Robert/Desktop/Thesis/reconstructed_signals/original/0.wav"
generated_file = "/Users/Robert/Desktop/Thesis/reconstructed_signals/generated/0.wav"
latent_file = "/Users/Robert/Desktop/Thesis/reconstructed_signals/latent/0.wav"

# NOTE(review): no `sr` is passed, so librosa's default sample rate applies
# rather than SAMPLE_RATE — confirm this is intended.
original, sr = librosa.load(original_file)
generated, _ = librosa.load(generated_file)
latent, _ = librosa.load(latent_file)
# Short-time Fourier transform of each signal.
# NOTE(review): n_fft=2048 differs from the FRAME_SIZE of 512 used for preprocessing.
S_original = librosa.stft(original, n_fft=2048, hop_length=HOP_LENGTH)
S_generated = librosa.stft(generated, n_fft=2048, hop_length=HOP_LENGTH)
S_latent = librosa.stft(latent, n_fft=2048, hop_length=HOP_LENGTH)


In [None]:
ipd.Audio(original_file)

In [None]:
ipd.Audio(generated_file)

In [None]:
ipd.Audio(latent_file)

In [None]:
#visualise spectrogram
def plot_spectrogram(Y, sr, hop_length, y_axis="linear"):
    """Display spectrogram ``Y`` on a new figure with a colour bar.

    ``sr``, ``hop_length`` and ``y_axis`` are forwarded to
    ``librosa.display.specshow``; the x axis always shows time.
    """
    plt.figure(figsize=(25, 10))
    display_options = {
        "sr": sr,
        "hop_length": hop_length,
        "x_axis": "time",
        "y_axis": y_axis,
    }
    librosa.display.specshow(Y, **display_options)
    plt.colorbar(format="%+2.f")
    

In [None]:
# Convert each STFT to a power spectrogram (squared magnitude) and then to dB.
Y_log_original = librosa.power_to_db(np.abs(S_original) ** 2)
Y_log_generated = librosa.power_to_db(np.abs(S_generated) ** 2)
Y_log_latent = librosa.power_to_db(np.abs(S_latent) ** 2)

In [None]:
plt.rcParams.update({'font.size': 40})
#log frequency spectrogram
#original signal
plot_spectrogram(Y_log_original, sr, HOP_LENGTH, y_axis="log")
plt.savefig("original.png")

In [None]:
#generated/reconstructed spectrogram
plot_spectrogram(Y_log_generated, sr, HOP_LENGTH, y_axis="log")
plt.savefig("generated.png")

In [None]:
#spectrogram sampled from latent space
plot_spectrogram(Y_log_latent, sr, HOP_LENGTH, y_axis="log")
plt.savefig("latent.png")