In [None]:
import codecs
import errno
import matplotlib.pyplot as plt
import numpy as np
import os
from PIL import Image
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import transforms
from tqdm import tqdm

## Preparing the data

In [None]:
def audio_to_spectogram(path):
    """
    Transform a wav audio file into a spectrogram.

    The conversion is not implemented yet. The stub raises instead of
    silently returning ``None`` so that a caller can never mistake the
    placeholder's result for a real spectrogram.

    :param path: the path to the wav file
    :raises NotImplementedError: always, until the conversion is written
    """
    # TODO: implement the wav -> spectrogram conversion.
    raise NotImplementedError('audio_to_spectogram is not implemented yet')

In [None]:
class CelebsVoicePair(torch.utils.data.Dataset):
    """
    Dataset that on each iteration provides two random pairs of
    celebrity voices. One pair is of the same person (positive sample), one
    is of two different voices (negative sample).

    The class is still a skeleton: every method raises NotImplementedError
    until the data pipeline is written, so the notebook cell is at least
    syntactically valid and fails loudly when the dataset is used.
    """

    def __init__(self, root):
        """
        Construct the data set by iterating the root directory, then
        transforming wav audio files to spectrograms while maintaining the
        speaker-to-spectrogram mapping, so that negative and positive samples
        can be produced upon __getitem__.

        :param root: the path to the speakers' voices in the form of speaker_id/instances/*.wav
        :raises NotImplementedError: always, until the loading code is written
        """
        # TODO: walk `root`, convert every wav file to a spectrogram and index
        # the results by speaker.
        raise NotImplementedError('CelebsVoicePair.__init__ is not implemented yet')

    def __getitem__(self, index):
        """
        Return the sample for ``index``.

        The train/test loops in this notebook unpack each batch as
        ``(data, target)`` and index it as ``data[:2]`` / ``data[0:3:2]`` and
        ``target[:, 0]`` / ``target[:, 1]``, so the item returned here has to
        be compatible with that layout.

        :raises NotImplementedError: always, until sampling is written
        """
        # TODO: build one positive and one negative pair for `index`.
        raise NotImplementedError('CelebsVoicePair.__getitem__ is not implemented yet')

    def __len__(self):
        """
        Return the number of samples in the data set.

        :raises NotImplementedError: always, until the data set is built
        """
        # TODO: return the number of available samples.
        raise NotImplementedError('CelebsVoicePair.__len__ is not implemented yet')
   

## The model

In [None]:
class Net(nn.Module):
    """
    Siamese network: both inputs of a pair go through the same convolutional
    stack (shared weights), and the absolute difference of their embeddings
    is classified into two logits by the final linear layer.

    ``fc1`` takes 2304 flattened features, which is what a 1x28x28 input
    produces after the three convolutions and the pooling step (256 x 3 x 3).
    """

    def __init__(self):
        super().__init__()

        # Shared feature extractor.
        self.conv1 = nn.Conv2d(1, 64, 7)
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(64, 128, 5)
        self.conv3 = nn.Conv2d(128, 256, 5)
        # Embedding layer followed by the two-way classifier head.
        self.fc1 = nn.Linear(2304, 512)
        self.fc2 = nn.Linear(512, 2)

    def _embed(self, sample):
        """Run one branch of the siamese pair and return its embedding."""
        features = self.pool1(F.relu(self.conv1(sample)))
        features = F.relu(self.conv2(features))
        features = F.relu(self.conv3(features))
        flattened = features.view(features.shape[0], -1)
        return F.relu(self.fc1(flattened))

    def forward(self, data):
        """
        :param data: indexable holding the two inputs of the pair at 0 and 1
        :return: the two logits computed from the embeddings' absolute difference
        """
        first, second = (self._embed(data[branch]) for branch in range(2))
        return self.fc2(torch.abs(second - first))

## Methods to train and test the model

In [None]:
def train(model, device, train_loader, epoch, optimizer):
    """
    Run one training epoch over ``train_loader``.

    Each batch is unpacked as ``(data, target)``. ``data[:2]`` is fed to the
    model as the positive pair and ``data[0:3:2]`` (entries 0 and 2) as the
    negative pair; ``target[:, 0]`` and ``target[:, 1]`` are their labels.
    The sum of both cross-entropy losses is back-propagated.

    :param model: the network to optimise; called with a pair of inputs
    :param device: the torch device the batch is moved to
    :param train_loader: iterable of batches exposing a ``dataset`` attribute
    :param epoch: the epoch number, used for logging only
    :param optimizer: the optimizer stepping the model parameters
    """
    model.train()

    dataset_size = len(train_loader.dataset)
    # Counted from the batches themselves rather than from a global
    # `batch_size`, so the log stays right for any loader configuration.
    samples_seen = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        data = [sample.to(device) for sample in data]

        optimizer.zero_grad()
        output_positive = model(data[:2])
        output_negative = model(data[0:3:2])

        target = target.to(device=device, dtype=torch.long)
        # reshape(-1) instead of squeeze(): squeeze() turns the labels of a
        # batch of size 1 into a 0-dim tensor, which cross_entropy rejects.
        target_positive = target[:, 0].reshape(-1)
        target_negative = target[:, 1].reshape(-1)

        loss_positive = F.cross_entropy(output_positive, target_positive)
        loss_negative = F.cross_entropy(output_negative, target_negative)

        loss = loss_positive + loss_negative
        loss.backward()
        optimizer.step()

        if batch_idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch,
                samples_seen,
                dataset_size,
                100. * samples_seen / dataset_size,
                loss.item()))

        samples_seen += target.shape[0]

def test(model, device, test_loader):
    model.eval()
    
    with torch.no_grad():
        accurate_labels = 0
        all_labels = 0
        loss = 0
        
        for batch_idx, (data, target) in enumerate(test_loader):
            for i in range(len(data)):
                data[i] = data[i].to(device)
            
            output_positive = model(data[:2]) # TODO - get from data
            output_negative = model(data[0:3:2]) # TODO - get from data

            target = target.type(torch.LongTensor).to(device)
            target_positive = torch.squeeze(target[:,0]) # TODO - get from data
            target_negative = torch.squeeze(target[:,1]) # TODO - get from data

            loss_positive = F.cross_entropy(output_positive, target_positive)
            loss_negative = F.cross_entropy(output_negative, target_negative)

            loss = loss + loss_positive + loss_negative

            accurate_labels_positive = torch.sum(torch.argmax(output_positive, dim=1) == target_positive).cpu()
            accurate_labels_negative = torch.sum(torch.argmax(output_negative, dim=1) == target_negative).cpu()

            accurate_labels = accurate_labels + accurate_labels_positive + accurate_labels_negative
            all_labels = all_labels + len(target_positive) + len(target_negative)
        
        accuracy = 100. * accurate_labels / all_labels
        print('Test accuracy: {}/{} ({:.3f}%)\tLoss: {:.6f}'.format(accurate_labels, all_labels, accuracy, loss))

## Inference

In [None]:
def oneshot(model, device, data):
    """
    Classify a single pair with the siamese model.

    :param model: the network; called with the pair after it is moved to ``device``
    :param device: the torch device the samples are moved to
    :param data: mutable sequence holding the pair; its entries are replaced
                 in place by their on-device versions
    :return: the index of the highest logit as a Python scalar
    """
    model.eval()

    with torch.no_grad():
        for position, sample in enumerate(data):
            data[position] = sample.to(device)

        logits = model(data)
        predicted = logits.argmax(dim=1).squeeze()
        return predicted.cpu().item()

## Train and run

In [None]:
do_learn = True
save_frequency = 2
batch_size = 16
lr = 0.001
num_epochs = 10
weight_decay = 0.0001
# NOTE(review): assumed locations of the speaker_id/instances/*.wav trees for
# the training and evaluation splits -- TODO confirm once the data is in place.
train_root = '../data/train'
test_root = '../data/test'
# Checkpoint read in prediction mode; with the defaults above the last one
# written by the training loop is epoch 8.
load_model_path = 'siamese_008.pt'


def main():
    """
    Train the siamese network or run a one-shot prediction.

    With ``do_learn`` set, the model is trained for ``num_epochs`` epochs,
    evaluated after each one, and its state dict is saved every
    ``save_frequency`` epochs. Otherwise the weights are loaded from
    ``load_model_path`` and one pair drawn from the evaluation data is
    classified.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model = Net().to(device)

    if do_learn: # training mode
        train_loader = torch.utils.data.DataLoader(CelebsVoicePair(train_root), batch_size=batch_size, shuffle=True)
        test_loader = torch.utils.data.DataLoader(CelebsVoicePair(test_root), batch_size=batch_size, shuffle=False)

        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
        for epoch in range(num_epochs):
            train(model, device, train_loader, epoch, optimizer)
            test(model, device, test_loader)
            # Modulo, not bitwise AND: save on every `save_frequency`-th epoch.
            if save_frequency and epoch % save_frequency == 0:
                # Save the state dict so that load_state_dict() below can
                # read the checkpoint back.
                torch.save(model.state_dict(), 'siamese_{:03}.pt'.format(epoch))
    else: # prediction
        prediction_loader = torch.utils.data.DataLoader(CelebsVoicePair(test_root), batch_size=1, shuffle=True)
        # map_location lets a checkpoint written on GPU load on a CPU-only host.
        model.load_state_dict(torch.load(load_model_path, map_location=device))
        # Entries 0 and 2 of the sample are the pair train()/test() feed to
        # the model as the negative pair.
        data = list(next(iter(prediction_loader))[0][:3:2])
        same = oneshot(model, device, data)
        if same > 0:
            print('These two voice samples are of the same speaker')
        else:
            print('These two voice samples are not of the same speaker')


if __name__ == '__main__':
    main()