# GAN model
Generative Adversarial Networks are used to generate images that never existed before. They learn about the world (objects, animals and so forth) and create new versions of those images that never existed.

They have two components:

1. A **Generator** - this creates the images.
2. A **Discriminator** - this assesses the images and tells the generator if they are similar to what it has been trained on. These are based off real world examples.

When training the network, both the generator and discriminator start from scratch and learn together.

## Libraries we'll need

In [None]:
from __future__ import print_function
import time
from IPython.display import Audio, display
from pathlib import Path
import librosa
import librosa.display

from sklearn.preprocessing import normalize

from PIL import Image

import math
########### UPDATING PYTORCH IMPORTS AS I INTERPRET FOR SOUNDS
import torch
import torchaudio
from torch.utils.data import DataLoader
from torchaudio.transforms import Resample, MelSpectrogram

import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
from torch.autograd import Variable
import matplotlib.pyplot as plt
import numpy as np
from torch import nn, optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torchvision.utils import save_image
########### ^^^^^^^^^^^^^^^^^^^^^^
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from tqdm import tqdm_notebook as tqdm
from tqdm import tqdm

In [None]:
import numpy as np
import os
import random
import pandas as pd

In [None]:
dset_path = '/kaggle/input/english-multispeak-corpus-to-overlay-spectrographs'
working_dir = '/kaggle/working'

# CSVs: collect every .csv that sits directly inside the dataset root.
# A single directory scan is enough; repeating it yields the same list.
# NOTE(review): os.listdir() returns entries in arbitrary order, so which file
# ends up as csvs[0] / csvs[1] / csvs[2] is not guaranteed -- confirm the
# parent / child / df assignment below against the actual file names.
csvs = [name for name in os.listdir(dset_path) if os.path.splitext(name)[1] == '.csv']
parent_df = pd.read_csv(f'{dset_path}/{csvs[0]}')
child_df = pd.read_csv(f'{dset_path}/{csvs[1]}')
df = pd.read_csv(f'{dset_path}/{csvs[2]}')

In [None]:
# Directory of overlay wavs inside the dataset (scanned for *.wav further down).
overlay_wavs_path = f'{dset_path}/overlay_data/wav/overlays'
# Directory whose entries random_lookup() treats as per-speaker folders.
raws_path = f'{dset_path}/raws'

In [None]:
# Peek at the first entry of the overlay directory listing.
overlay = os.listdir(overlay_wavs_path)[0]
overlay

Lookup helper function to get a random wav (until we have a working dataset).

In [None]:
def random_lookup():
    """Pick a random entry under ``raws_path`` and build a parent/child pair.

    Returns:
        tuple: ``(speaker_dir, parent, child)`` where ``speaker_dir`` is the
        path of the randomly chosen entry, ``parent`` is the string
        ``'speaker_<index>'`` built from the chosen listing index, and
        ``child`` is that index plus a random integer in ``[1, 10]``.
    """
    entries = os.listdir(raws_path)
    pick = random.randint(0, len(entries) - 1)
    chosen_dir = f'{raws_path}/{entries[pick]}'

    # The parent label is derived from the listing index, not the entry name.
    parent_label = f'speaker_{pick}'
    # Child gets passed into DF as int
    child_row = pick + random.randint(1, 10)

    print(f'PARENT PATH: {chosen_dir}\n\nPARENT: {parent_label}\nCHILD: {child_row}\n')
    return chosen_dir, parent_label, child_row

In [None]:
# Draw one random (speaker_dir, parent, child) triple.
trio = random_lookup()

In [None]:
# Show the triple as the cell output.
trio

In [None]:
def lookup_relations(overlay_path, parent, child):
    """Resolve the parent and child wav paths that belong to an overlay.

    Args:
        overlay_path: Path of the overlay; returned unchanged, as a string.
        parent: Column label looked up in ``parent_df`` and ``child_df``.
        child: Row label looked up in ``parent_df`` and ``child_df``.

    Returns:
        tuple: ``(p_path, c_path, o_path)`` -- the parent wav path, the child
        wav path and the overlay path.

    Raises:
        ValueError: If a looked-up name does not split into exactly two
            ``'_'``-separated parts.
    """
    raw_path = '/kaggle/input/english-multispeak-corpus-to-overlay-spectrographs/raws'

    def _wav_path(table):
        # The cell holds "<speaker id>_<file id>"; only the speaker id is
        # needed to locate the "<speaker id>_samples" folder.
        name = table.loc[child, parent]
        speaker_id, _ = name.split('_')
        return os.path.join(f"{raw_path}/{speaker_id}_samples", f"{name}.wav")

    p_path = _wav_path(parent_df)
    c_path = _wav_path(child_df)
    return p_path, c_path, f"{overlay_path}"

In [None]:
def get_parent_child(overlay_path):
    """Extract the parent label and child number from an overlay file name.

    The file name (last ``'/'``-separated component, minus its final four
    characters) is split on ``'_and_'``. The first piece is the parent label;
    the second ``'_'``-separated field of the second piece is the child number.

    Args:
        overlay_path: Path (string or path-like) of the overlay file.

    Returns:
        tuple: ``(parent, child)`` with ``parent`` a string and ``child`` an int.
    """
    file_name = str(overlay_path).rsplit('/', 1)[-1]
    stem = file_name[:-4]  # drop the 4-character extension, e.g. ".wav"

    halves = stem.split('_and_')
    parent = halves[0]
    child = int(halves[1].split('_')[1])
    return parent, child

In [None]:
def lookup(overlay_path):
    """Return the parent, child and overlay paths plus both indices.

    Args:
        overlay_path: Path of the overlay file.

    Returns:
        tuple: ``(p_path, c_path, o_path, parent_index, child)`` where
        ``parent_index`` is the integer after the first ``'_'`` in the parent
        label and ``child`` is the child number parsed from the file name.
    """
    parent, child = get_parent_child(overlay_path)
    paths = lookup_relations(overlay_path, parent, child)
    p_path, c_path, o_path = paths

    parent_index = int(parent.split('_')[1])
    return p_path, c_path, o_path, parent_index, child

Now we have an easy way of returning parent, child, and overlay filenames.

In [None]:
# Draw a random (directory, parent label, child row) triple and resolve it.
# The directory is passed in the overlay_path slot, so o_path is that directory.
d, p, c = random_lookup()
p_path, c_path, o_path = lookup_relations(d, p, c)

In [None]:
# Show the three resolved paths as the cell output.
o_path, p_path, c_path

## GAN's processes
First, we will give a random noise signal to the ***Generator***, this will create some sound files, which we will use to train the ***Discriminator***. The ***Discriminator*** will be given some **features** we want it to learn, and it will output probabilities.
These probabilities are assessed based on their true values, a loss is then calculated and backpropped.

`I think we can use F0 as one of these features`


Next we train the generator. We take the batch of sounds that it created and put them through the discriminator again. We do not include the feature sounds. The generator learns by tricking the discriminator into outputting false positives.

The discriminator will provide an output of probabilities. The values are then assessed and compared to what they should have been. The error is calculated and backpropagated through the generator and the weights are updated.


Over time, this model will be able to recognize the mistakes in its generations, and it improves because of this.

In [None]:
# Names of every entry in the overlay directory.
sounds = os.listdir(overlay_wavs_path)
print(f'Got paths of {len(sounds)} sounds.')

## Preprocessing with pytorch
Now we have paths to audio files. Here are some pytorch methods we can use

In [None]:
# Full path of one overlay file (listing index 6) used in the demos below.
sample =f'{overlay_wavs_path}/{sounds[6]}'

In [None]:
# Query the sample file's metadata
metadata = torchaudio.info(sample)
print(f'Metadata:\n-----------\n{metadata}')
# Loading Audio file
waveform, sample_rate = torchaudio.load(sample)
# By default, dtype=torch.float32 and the range is normalized within [-1, 1]
print(f"----------------------------------------------\nWaveform is tensor object of shape: {waveform.shape}\n")

### We can even play the audio

In [None]:
def play_audio(waveform, sample_rate):
    """Render an inline audio player for a mono or stereo waveform.

    Args:
        waveform: Tensor whose ``.numpy()`` array has shape
            ``(num_channels, num_frames)``.
        sample_rate: Sampling rate handed to ``IPython.display.Audio``.

    Raises:
        ValueError: If the waveform does not have exactly one or two channels.
    """
    waveform = waveform.numpy()

    num_channels, num_frames = waveform.shape
    # MONO
    if num_channels == 1:
        display(Audio(waveform[0], rate=sample_rate))
    # STEREO
    elif num_channels == 2:
        # Both channels must travel together as the single `data` argument;
        # a second positional argument would be taken as `filename`.
        display(Audio((waveform[0], waveform[1]), rate=sample_rate))
    else:
        raise ValueError("!!Waveform with more than 2 channels are not supported!!")

play_audio(waveform, sample_rate)

# Filter function
### Gets only files +-1 std from the mean

In [None]:
def sort_wavs(file_list):
    """Measure every file's length after the ``rate 8000`` sox effect.

    Args:
        file_list: Sequence of audio file paths.

    Returns:
        tuple: ``(records, std_dev, avg, len_sum)`` where ``records`` is a list
        of ``{'sample': path, 'length': n_samples}`` dicts, ``std_dev`` and
        ``avg`` are the standard deviation and mean of the lengths, and
        ``len_sum`` is their total.
    """
    resample = [['rate', '8000']]
    print(len(file_list))
    lengths = []
    records = []

    for sample in tqdm(file_list):
        waveform, _ = torchaudio.sox_effects.apply_effects_file(sample, resample)
        # Length is the number of values across all channels.
        n_samples = len(waveform.numpy().flatten())
        lengths.append(n_samples)
        records.append({'sample': sample, 'length': n_samples})

    total = sum(lengths)
    mean_length = total / len(file_list)
    spread = np.std(np.array([lengths]))
    return records, spread, mean_length, total
   
def filter_wavs(len_dict_list, std_dev, avg, len_sum):
    """Keep the files whose length lies within one ``std_dev`` of ``avg``.

    Args:
        len_dict_list: List of ``{'sample': ..., 'length': ...}`` dicts.
        std_dev: Allowed distance from ``avg`` (bounds are inclusive).
        avg: Centre of the accepted length range.
        len_sum: Unused; kept so the signature matches ``sort_wavs``' output.

    Returns:
        tuple: ``(filtered_file_list, max_length)`` -- the accepted samples in
        their original order and the largest accepted length.

    Raises:
        ValueError: If no entry is accepted.
    """
    kept = [
        entry for entry in len_dict_list
        if not (entry['length'] < avg - std_dev or entry['length'] > avg + std_dev)
    ]
    accept = len(kept)
    reject = len(len_dict_list) - accept
    print(f"{accept} files Accepted\n{reject} files Rejected\n")

    filtered_file_list = [entry['sample'] for entry in kept]
    longest = max([entry['length'] for entry in kept])
    return filtered_file_list, longest

In [None]:
# Measure every .wav found recursively under the overlay directory.
ld, st, avg, lsum = sort_wavs(list(Path(overlay_wavs_path).rglob("*.wav")))

In [None]:
# Show the standard deviation, mean and total of the measured lengths.
st, avg, lsum

In [None]:
# Keep the files within one std of the mean; the longest kept length becomes the padding size.
filtered_overlays, padding_size = filter_wavs(ld, st, avg, lsum)

# Time series embeddings w/ padding for normalization

In [None]:
def padding(matrix, desired_size):
    """Zero-pad ``matrix`` along axis 0 so it has exactly ``desired_size`` rows.

    The missing rows are split evenly before and after the data; when the
    number of missing rows is odd, the extra row goes at the end.

    Args:
        matrix: 2-D NumPy array.
        desired_size: Number of rows the result must have.

    Returns:
        A new array of shape ``(desired_size, matrix.shape[1])``.

    Raises:
        ValueError: If ``matrix`` already has more than ``desired_size`` rows.
    """
    # Parenthesised on purpose: the padding is half of the *difference*.
    # `desired_size - rows / 2` binds the division first and over-pads.
    missing = int(desired_size) - matrix.shape[0]
    if missing < 0:
        raise ValueError(
            f"matrix has {matrix.shape[0]} rows, more than desired_size={desired_size}"
        )
    before = missing // 2
    after = missing - before
    return np.pad(matrix, pad_width=((before, after), (0, 0)), mode='constant')

In [None]:
def waveform_to_tensor(wf, padding_size, embedding_delay = 1, embedding_dimension = 64 * 64 * 3):
    """Turn a waveform into a stack of 64x64x3 delay-embedding windows.

    Args:
        wf: Object whose ``.numpy()`` array is flattened into a 1-D signal.
        padding_size: Target size forwarded to ``padding``.
        embedding_delay: Step, in samples, between consecutive columns.
        embedding_dimension: Number of columns per window; the reshape below
            assumes it equals ``64 * 64 * 3``.

    Returns:
        float16 NumPy array of shape ``(n, 64, 64, 3)``.
    """
    samples = wf.numpy().flatten()

    # One row per window start, one column per delayed copy of the signal.
    n_windows = len(samples) - (embedding_dimension - 1) * embedding_delay
    embedded = np.zeros((n_windows, embedding_dimension))

    for col in range(embedding_dimension):
        start = col * embedding_delay
        embedded[:, col] = samples[start:start + n_windows]

    embedded = padding(embedded, padding_size)

    # Reshape embedding matrix into 64x64x3 tensors, window axis first.
    stacked = embedded.reshape((64, 64, 3, -1))
    return stacked.transpose((3, 0, 1, 2)).astype("float16")

In [None]:
def tensor_to_vector(tensor, embedding_delay = 1 ):
    """Rebuild a 1-D signal from a stack of embedding windows.

    Undoes the reshape/transpose applied by ``waveform_to_tensor`` and writes
    each window back at its offset; later windows overwrite earlier ones where
    they overlap.

    Args:
        tensor: 4-D NumPy array with the window axis first.
        embedding_delay: Step, in samples, between consecutive windows.

    Returns:
        ``torch.Tensor`` of shape ``(1, length)`` holding the rebuilt signal.
    """
    windows = tensor.transpose((1, 2, 3, 0)).reshape(tensor.shape[0], -1)
    print(windows.shape)

    n_windows, dim = windows.shape

    # Length of the original time series.
    signal = np.zeros((n_windows + (dim - 1) * embedding_delay,))

    for row, window in enumerate(windows):
        offset = row * embedding_delay
        signal[offset:offset + dim] = window

    return torch.tensor(signal).unsqueeze(dim=0)

In [None]:
def tensor_generator(path, padding):
    """Load the parent and overlay audio for ``path`` as embedding tensors.

    Args:
        path: Path of an overlay file; resolved through ``lookup``.
        padding: Padding size forwarded to ``waveform_to_tensor``.

    Returns:
        tuple: ``(parent_tensor, overlay_tensor, target)`` where ``target`` is
        the parent index returned by ``lookup``.
    """
    resample = [['rate', '8000']]
    parent_file, _child_file, overlay_file, parent_index, _child_index = lookup(str(path))

    # Both files go through the same sox effect; the returned rate is not used.
    parent_wave, _ = torchaudio.sox_effects.apply_effects_file(parent_file, resample)
    overlay_wave, _ = torchaudio.sox_effects.apply_effects_file(overlay_file, resample)

    parent_tensor = waveform_to_tensor(parent_wave, padding)
    overlay_tensor = waveform_to_tensor(overlay_wave, padding)
    return parent_tensor, overlay_tensor, parent_index
        


# AudioDataset class 

In [None]:
class AudioDataset(torch.utils.data.Dataset):
    """Dataset that turns overlay file paths into embedding tensors on demand.

    Each item is the ``(parent_tensor, overlay_tensor, target)`` triple
    produced by ``tensor_generator`` for the file at that index.
    """

    def __init__(self, files, padding_size, transform=None, return_overlay = False):
        """Store the file list and the options.

        Args:
            files: Indexable collection of overlay file paths.
            padding_size: Padding size forwarded to ``tensor_generator``.
            transform: Stored on the instance; not used by ``__getitem__``.
            return_overlay: Stored on the instance; not used by ``__getitem__``.
        """
        self.sound_files = files
        self.padding_size = padding_size
        self.transform = transform
        self.return_overlay = return_overlay

    def __len__(self):
        """Return the number of files."""
        return len(self.sound_files)

    def __getitem__(self, idx):
        """Return ``(parent_tensor, overlay_tensor, target)`` for file ``idx``."""
        parent, overlay, target = tensor_generator(self.sound_files[idx], self.padding_size)
        return parent, overlay, target
        

In [None]:
# Build the dataset from the length-filtered overlay files.
train_data = AudioDataset(filtered_overlays, padding_size, return_overlay = True)

### Verify the pipeline is 👌

In [None]:
# Fetch one item: parent tensor, overlay tensor and speaker index.
p, o, t = train_data[3]
print(f"Speaker-Index: {t}\nO-Shape: {o.shape}\n") 

In [None]:
# Rebuild the overlay signal from its tensor and play it at 8000 Hz.
v = tensor_to_vector(o)
play_audio(v, 8000)

Now we'll load the dataset and create a dataloader

In [None]:
# Shuffled mini-batches of 5 items drawn from train_data.
batch_size = 5
train_loader = DataLoader(train_data, shuffle = True, batch_size = batch_size)

In [None]:
# AudioDataset yields (parent, overlay, target), so unpack in that order.
p, o, t = next(iter(train_loader))

In [None]:
# tensor_to_vector expects a single NumPy array with the window axis first,
# so take the first item of the batch and convert it back from a torch tensor.
v = tensor_to_vector(p[0].numpy())
play_audio(v, 8000)

## Weights
The function below initializes each layer's weights from a normal distribution whose mean and standard deviation depend on the layer type (convolutional or batch normalization).

In [None]:
def weights_init(model):
    """Re-initialise a module's parameters in place, based on its class name.

    Modules whose class name contains ``'Conv'`` get weights drawn from
    N(0.0, 0.02). Modules whose class name contains ``'BatchNorm'`` get
    weights drawn from N(1.0, 0.02) and a zeroed bias. Every other module is
    left untouched.

    Args:
        model: The module to initialise (as passed by ``nn.Module.apply``).
    """
    layer_type = type(model).__name__
    if 'Conv' in layer_type:
        model.weight.data.normal_(0.0, 0.02)
    elif 'BatchNorm' in layer_type:
        model.weight.data.normal_(1.0, 0.02)
        model.bias.data.fill_(0)

### Generator
I need to do more research on audio convnets before I can continue.

In [None]:
# Defining the discriminator
class D(nn.Module):
    """Convolutional discriminator: 3-channel 64x64 input -> one probability.

    Four stride-2 convolutions halve the spatial size each time (64 -> 4);
    a final 4x4 convolution plus sigmoid reduces each sample to one value.
    """

    def __init__(self):
        super().__init__()

        # First stage has no batch norm.
        layers = [
            nn.Conv2d(3, 64, 4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
        ]
        # Three identical conv / batch-norm / leaky-ReLU stages, doubling channels.
        for in_ch, out_ch in ((64, 128), (128, 256), (256, 512)):
            layers.extend([
                nn.Conv2d(in_ch, out_ch, 4, stride=2, padding=1, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
            ])
        # Collapse the remaining 4x4 map to a single score in (0, 1).
        layers.extend([
            nn.Conv2d(512, 1, 4, stride=1, padding=0, bias=False),
            nn.Sigmoid(),
        ])
        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Return one probability per sample as a flat 1-D tensor."""
        scores = self.main(input)
        # .view(-1) flattens the output to 1-D.
        return scores.view(-1)
        
    
# Creating the discriminator and re-initialising its layers with weights_init
netD = D()
netD.apply(weights_init)

In [None]:
class G(nn.Module):
    """Transposed-convolution generator: 100-channel 1x1 input -> 3x64x64.

    The spatial size grows 1 -> 4 -> 8 -> 16 -> 32 -> 64, and the tanh keeps
    the output in [-1, 1].
    """

    def __init__(self):
        super(G, self).__init__()

        self.main = nn.Sequential(
            nn.ConvTranspose2d(100, 512, 4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            # Symmetric padding keeps the feature map square (4x4 -> 8x8).
            # padding=(1, 2) gave 8x6 here and a 64x48 final output, which the
            # discriminator's last 4x4 convolution cannot consume.
            nn.ConvTranspose2d(512, 256, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 3, 4, stride=2, padding=1, bias=False),
            nn.Tanh()
        )

        # Not used by forward(); kept so the attribute stays available.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((128, 5000))

    def forward(self, input):
        """Return the generated batch for a ``(batch, 100, 1, 1)`` input."""
        output = self.main(input)
        return output

# Creating the generator and re-initialising its layers with weights_init
netG = G()
netG.apply(weights_init)


In [None]:
# Create the output folder the training loop saves images into, then list the working directory.
!mkdir results
!ls

In [None]:
# Number of passes over train_loader.
EPOCH = 3
# Learning rate shared by both optimizers.
LR = 0.001
# Binary cross-entropy between discriminator outputs and real/fake targets.
criterion = nn.BCELoss()
# One Adam optimizer per network, so each step updates only that network.
optimizerD = optim.Adam(netD.parameters(), lr=LR, betas=(0.5, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=LR, betas=(0.5, 0.999))

In [None]:
# Adversarial training: each step updates the discriminator on one real and
# one generated batch, then updates the generator through the discriminator.
for epoch in range(EPOCH):
    for i, data in enumerate(train_loader, 0):
        # AudioDataset yields (parent, overlay, speaker_index); the speaker
        # index is not used for training.
        real, overlay, _ = data

        # 1st Step: Updating the weights of the neural network of the discriminator
        netD.zero_grad()

        # Training the discriminator with a real sample of the dataset
        # NOTE(review): the dataset items look like (windows, 64, 64, 3), so a
        # batch would be 5-D and this 4-index permute would not apply -- the
        # window axis probably has to be folded into the batch axis. TODO confirm.
        real_input = real.permute(0, 3, 2, 1).float()
        output = netD(real_input)
        # Labels are constants: 1 for real samples.
        target = torch.ones(output.size())
        errD_real = criterion(output, target)

        # Training the discriminator with a fake sample generated by the generator
        # NOTE(review): netG's first layer expects 100 input channels (the
        # original noise input was torch.randn(batch, 100, 1, 1)); confirm how
        # the overlay is meant to condition the generator.
        overlay = overlay.permute(0, 3, 2, 1).float()
        fake = netG(overlay)
        # detach() keeps this backward pass out of the generator.
        output = netD(fake.detach())
        # Labels are constants: 0 for generated samples.
        target = torch.zeros(output.size())
        errD_fake = criterion(output, target)

        # Backpropagating the total error
        errD = errD_real + errD_fake
        errD.backward()
        optimizerD.step()

        # 2nd Step: Updating the weights of the neural network of the generator
        netG.zero_grad()
        output = netD(fake)
        # The generator is rewarded when the discriminator says "real".
        target = torch.ones(output.size())
        errG = criterion(output, target)
        errG.backward()
        optimizerG.step()

        # 3rd Step: Printing the losses and saving the real images and the generated images of the minibatch every 100 steps
        print('[%d/%d][%d/%d] Loss_D: %.4f; Loss_G: %.4f' % (epoch, EPOCH, i, len(train_loader), errD.item(), errG.item()))
        if i % 100 == 0:
            vutils.save_image(real, '%s/real_samples.png' % "./results", normalize=True)
            fake = netG(overlay)
            vutils.save_image(fake.data, '%s/fake_samples_epoch_%03d.png' % ("./results", epoch), normalize=True)