In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import seaborn as sn
import pandas as pd
import pickle
from collections import defaultdict
from sklearn import metrics
from torch.optim import lr_scheduler
from torchvision import datasets
import os

In [None]:
class Dataset(Dataset):
    """Map-style dataset over an indexable collection of ``(features, label)`` records.

    For each record, element 0 is optionally passed through ``transform`` and
    then wrapped with ``torch.Tensor``; element 1 is returned unchanged.

    NOTE(review): this class deliberately keeps the name of the imported base
    class it derives from, so later cells that call ``Dataset(...)`` get this
    subclass.
    """

    def __init__(self, data, transform=None):
        self.transform = transform
        self.data = data

    def __len__(self):
        # Number of records in the wrapped collection.
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data[idx]
        features, target = record[0], record[1]

        # Apply the optional transform before the tensor conversion.
        if self.transform:
            features = self.transform(features)

        return (torch.Tensor(features), target)

# Root directory that the pickled dataset splits and the saved model checkpoint
# are read from (paths are built by string concatenation, hence the trailing slash).
# NOTE(review): absolute, machine-specific path — adjust for your environment.
drive_dir = '/COGS 181 Final Datasets/'

In [None]:
batch_size = 32
num_workers = 2

# NOTE(review): `transform` is built here but is not passed to any Dataset below.
transform = transforms.Compose([transforms.ToTensor()])


def _load_split(name, split, **loader_kwargs):
    """Load one pickled split and wrap it in a Dataset and a shuffling DataLoader.

    Args:
        name: dataset folder and file prefix under ``drive_dir`` (e.g. ``'GTZAN'``).
        split: file extension of the split, ``'train'`` or ``'test'``.
        **loader_kwargs: extra ``DataLoader`` options (e.g. ``drop_last=True``).

    Returns:
        Tuple ``(data, dataset, loader)``: the unpickled object, the ``Dataset``
        wrapping it, and a ``DataLoader`` using the module-level ``batch_size``
        and ``num_workers`` with ``shuffle=True``.
    """
    path = drive_dir + '{0}/{0}_165x32_normalized.{1}'.format(name, split)
    # SECURITY: unpickling can execute arbitrary code; only load trusted files.
    # The context manager closes the file handle, which the bare
    # `pickle.load(open(...))` form never did.
    with open(path, 'rb') as handle:
        data = pickle.load(handle)
    dataset = Dataset(data)
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=True,
        num_workers=num_workers, **loader_kwargs)
    return data, dataset, loader


# GTZAN (the only dataset whose loaders keep the final partial batch)
GTZAN_data, GTZAN_trainset, GTZAN_trainloader = _load_split('GTZAN', 'train')
print(len(GTZAN_trainset))
GTZAN_data, GTZAN_testset, GTZAN_testloader = _load_split('GTZAN', 'test')

GTZAN_classes = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']


# FMA
FMA_data, FMA_trainset, FMA_trainloader = _load_split('FMA', 'train', drop_last=True)
print(len(FMA_trainset))
FMA_data, FMA_testset, FMA_testloader = _load_split('FMA', 'test', drop_last=True)

FMA_classes = ['Blues', 'Classical', 'Country', 'Easy Listening', 'Electronic', 'Experimental', 'Folk', 'Hip-Hop', 'Instrumental', 'International', 'Jazz', 'Old-Time / Historic', 'Pop', 'Rock', 'Soul-RnB', 'Spoken']


# RAVDESS
RAVDESS_data, RAVDESS_trainset, RAVDESS_trainloader = _load_split('RAVDESS', 'train', drop_last=True)
print(len(RAVDESS_trainset))
RAVDESS_data, RAVDESS_testset, RAVDESS_testloader = _load_split('RAVDESS', 'test', drop_last=True)

RAVDESS_classes = ['neutral', 'calm', 'happy', 'sad', 'angry', 'fearful']


# PMC
PMC_data, PMC_trainset, PMC_trainloader = _load_split('PMC', 'train', drop_last=True)
print(len(PMC_trainset))
PMC_data, PMC_testset, PMC_testloader = _load_split('PMC', 'test', drop_last=True)

PMC_classes = ['sad', 'happy', 'scary', 'peaceful']


# Choose the dataset used by the model, training and evaluation cells below.
trainloader = PMC_trainloader
testloader = PMC_testloader
classes = PMC_classes
num_classes = len(classes)

In [None]:
# Prefer the first CUDA GPU when one is available; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

In [None]:
class CNN(nn.Module):
    """Convolutional classifier for single-channel 2-D inputs.

    ``main`` stacks five 3x3 convolutions (1 -> 64 -> 64 -> 128 -> 256 -> 512
    channels) with batch-norm, sigmoid activations and four 2x2 max-pools.
    ``fc`` maps the flattened ``512*2*10`` features through 1000 and 100 hidden
    units to the class scores.

    The layer order inside ``main`` and ``fc`` is kept exactly as before so that
    previously saved ``state_dict`` files (keyed by layer index) still load.

    Args:
        n_classes: number of output classes. When omitted, the module-level
            ``num_classes`` is used, which is what ``CNN()`` has always done.

    NOTE(review): the feature size 512*2*10 presumably corresponds to 32x165
    (or 165x32) inputs, as suggested by the dataset file names — confirm.
    """

    def __init__(self, n_classes=None):
        super(CNN, self).__init__()
        if n_classes is None:
            # Backward-compatible default: read the notebook-level global.
            n_classes = num_classes

        self.main = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3,padding=1),
            nn.BatchNorm2d(64,momentum=0.1),
            nn.Sigmoid(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 64, kernel_size=3,padding=1),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=3,padding=1),
            nn.BatchNorm2d(128),
            nn.Sigmoid(),
            nn.Conv2d(128, 256, kernel_size=3,padding=1),
            nn.Sigmoid(),
            nn.MaxPool2d(2, 2),
            nn.BatchNorm2d(256),
            nn.Conv2d(256, 512, kernel_size=3,padding=1),
            nn.Sigmoid(),
            nn.BatchNorm2d(512),
            nn.MaxPool2d(2, 2)
        )

        # Classifier head applied to the flattened feature map.
        self.fc = nn.Sequential(
            nn.Linear(512*2*10,1000),
            nn.Sigmoid(),
            nn.Dropout(0.5),
            nn.Linear(1000,100),
            nn.Sigmoid(),
            nn.Dropout(0.5),
            nn.Linear(100, n_classes),
        )
        # NOTE(review): this dropout is applied to the final class scores in
        # forward(); dropping logits is unusual — confirm it is intended.
        self.dropOut = nn.Dropout(0.1)

    def forward(self, x):
        """Return class scores of shape ``(batch, n_classes)`` for input ``x``."""
        x = self.main(x)
        # Flatten per sample. Unlike view(-1, 512*2*10), this keeps the batch
        # dimension fixed, so a wrongly sized input fails in the first Linear
        # layer instead of being silently folded into extra batch rows.
        x = x.view(x.size(0), -1)

        x = self.fc(x)
        x = self.dropOut(x)
        return x

In [None]:
# Build a fresh CNN (its output width comes from the current global `num_classes`)
# and move it to the selected device.
cnn = CNN()
cnn.to(device)

In [None]:
# Cross-entropy loss computed on the raw class scores returned by the CNN.
loss_func = nn.CrossEntropyLoss()
# Adam has no `momentum` keyword (passing one raises TypeError). Its
# momentum-like behaviour is controlled by `betas`, whose default first
# coefficient is already 0.9.
opt_cnn = optim.Adam(cnn.parameters(), lr=0.001)

In [None]:
# Enable autograd anomaly detection globally for the cells that follow.
# NOTE(review): this is a debugging aid and is expected to slow training —
# consider turning it off for full 50-epoch runs.
torch.autograd.set_detect_anomaly(True)

In [None]:
# Train `cnn` on `trainloader`, logging one averaged loss/accuracy entry per
# `print_freq` mini-batches (here: once per epoch).
avg_losses_cnn = []
avg_acc = []
epochs = 50
print_freq = len(trainloader)

try:
    for epoch in range(epochs):
        running_loss_cnn = 0.0
        running_acc = 0      # correctly classified samples since the last log
        running_total = 0    # samples seen since the last log
        cnn.train()

        for i, data in enumerate(trainloader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            opt_cnn.zero_grad()

            outputs_cnn = cnn(inputs)
            _, preds = torch.max(outputs_cnn, 1)
            loss_cnn = loss_func(outputs_cnn, labels)

            loss_cnn.backward()
            opt_cnn.step()

            running_loss_cnn += loss_cnn.item()
            # .item() keeps the counters as plain Python numbers, so `avg_acc`
            # holds floats rather than (possibly CUDA) tensors.
            running_acc += (preds == labels).sum().item()
            running_total += labels.size(0)

            if i % print_freq == print_freq - 1:  # Print every several mini-batches.
                losses = running_loss_cnn / print_freq
                # Divide by the samples actually seen, which stays correct
                # even when the final batch is smaller than `batch_size`.
                acc = running_acc / running_total

                print('[epoch: {}, i: {:5d}] avg mini-batch loss_cnn: {:.3f}, acc: {:.3f}'.format(
                    epoch, i, losses, acc))

                avg_losses_cnn.append(losses)
                avg_acc.append(acc)

                running_loss_cnn = 0.0
                running_acc = 0
                running_total = 0

    print('Finished Training.')
except KeyboardInterrupt:
    # Allow stopping the run from the notebook without losing the logged curves.
    print('Halted Training')

In [None]:
# Training-loss curve: one point per entry logged by the training loop.
ax = plt.gca()
ax.plot(avg_losses_cnn, 'b')
ax.set_xlabel('Epochs')
ax.set_ylabel('Average Mini-Batch Loss')
plt.show()

In [None]:
# Get test accuracy
correct_net = 0
correct_cnn = 0
total = 0

# Switch dropout and batch-norm to inference behaviour; the training loop
# leaves the model in train mode.
cnn.eval()
with torch.no_grad():
    for data in testloader:
        images, labels = data
        # Labels are compared as-is, exactly as in the training loop and the
        # per-class evaluation. Shifting them by -1 here compared predictions
        # against the wrong classes.
        images, labels = images.to(device), labels.to(device)
        outputs_cnn = cnn(images)
        _, predicted_cnn = torch.max(outputs_cnn, 1)
        total += labels.size(0)
        correct_cnn += (predicted_cnn == labels).sum().item()
        print(labels)
        print(predicted_cnn)
        print()

if total == 0:
    # Happens when the loader yields no batch (e.g. drop_last with fewer
    # samples than one batch); avoid a ZeroDivisionError.
    print('Cnn: no test samples were evaluated.')
else:
    print('Cnn: Accuracy of the network on the test images: %d %%' % (
        100 * correct_cnn / total))

In [None]:
# Get test accuracy for each class

# Flat lists of true / predicted class indices, reused by the confusion matrix.
truths = []
preds = []

class_correct = [0.] * num_classes
class_total = [0.] * num_classes

# Inference behaviour for dropout / batch-norm.
cnn.eval()
with torch.no_grad():
    for data in testloader:
        images, labels = data
        images, labels = images.to(device), labels.to(device)

        outputs = cnn(images)
        _, predicted = torch.max(outputs, 1)

        # Iterate over the samples actually present in this batch, so a short
        # final batch (or a batch of one) no longer raises an IndexError.
        for truth, guess in zip(labels.view(-1).tolist(), predicted.view(-1).tolist()):
            truths.append(truth)
            preds.append(guess)
            class_total[truth] += 1
            if truth == guess:
                class_correct[truth] += 1

for i in range(num_classes):
    print(class_total[i])
    if class_total[i] == 0:
        # Handled explicitly instead of swallowing the ZeroDivisionError
        # with a bare `except`.
        print('Accuracy of %5s : n/a (no test samples)' % classes[i])
        continue
    print('Accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))
  

In [None]:
# Confusion matrix of true (rows) vs. predicted (columns) classes.
# `labels` pins the matrix to one row/column per entry in `classes`; without it
# a class missing from both lists shrinks the matrix and the DataFrame
# construction below fails on the shape mismatch.
array = metrics.confusion_matrix(truths, preds, labels=list(range(len(classes))))
df_cm = pd.DataFrame(array, index = classes, columns = classes)
plt.figure(figsize = (20,9))
f = sn.heatmap(df_cm, annot=True, fmt='g')

# NOTE(review): the file name says "fma" although the dataset selected in the
# loading cell is PMC — confirm the intended name.
figure = f.get_figure()
figure.savefig('fma_before.png')

In [None]:
# Choose main dataset
# `num_classes` must match the checkpoint's output layer before CNN() is
# built, because the model reads it at construction time.
trainloader = FMA_trainloader
testloader = FMA_testloader
classes = FMA_classes
num_classes = len(classes)

cnn = CNN()
cnn.to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
cnn.load_state_dict(torch.load(drive_dir + 'Models/naivemodel_fma_50.m', map_location=device))
cnn.eval()

model_conv = cnn

# Choose second dataset for transfer learning
trainloader = PMC_trainloader
testloader = PMC_testloader
classes = PMC_classes
num_classes = len(classes)
print(num_classes)

# Freeze every pretrained parameter; only the new head created below is trained.
for param in model_conv.parameters():
    param.requires_grad = False

num_ftrs = 100

# Fresh, randomly initialised classifier head sized for the new dataset
# (new modules have requires_grad=True by default).
model_conv.fc = nn.Sequential(
            nn.Linear(512*2*10,1000),
            nn.Sigmoid(),
            nn.Dropout(0.5),
            nn.Linear(1000,100),
            nn.Sigmoid(),
            nn.Dropout(0.5),
            nn.Linear(100, num_classes),
        )

cnn = model_conv

cnn = cnn.to(device)

criterion = nn.CrossEntropyLoss()

# NOTE(review): `optimizer_conv` and `criterion` are created here, but the
# training cell uses `opt_cnn` and `loss_func`.
optimizer_conv = optim.SGD(model_conv.fc.parameters(), lr=0.001, momentum=0.9)

loss_func = nn.CrossEntropyLoss()

# Adam has no `momentum` keyword (passing one raises TypeError); its default
# betas already use 0.9 for the first moment. Only the trainable head is
# handed to the optimizer, since every other parameter is frozen.
opt_cnn = optim.Adam(model_conv.fc.parameters(), lr=0.001)

In [None]:
# Train `cnn` on `trainloader`, logging one averaged loss/accuracy entry per
# `print_freq` mini-batches (here: once per epoch).
avg_losses_cnn = []
avg_acc = []
epochs = 50
print_freq = len(trainloader)

try:
    for epoch in range(epochs):
        running_loss_cnn = 0.0
        running_acc = 0      # correctly classified samples since the last log
        running_total = 0    # samples seen since the last log
        # NOTE(review): train() also puts batch-norm layers of any frozen
        # feature extractor into training mode, so their running statistics
        # keep updating — confirm that is intended.
        cnn.train()

        for i, data in enumerate(trainloader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            opt_cnn.zero_grad()

            outputs_cnn = cnn(inputs)
            _, preds = torch.max(outputs_cnn, 1)
            loss_cnn = loss_func(outputs_cnn, labels)

            loss_cnn.backward()
            opt_cnn.step()

            running_loss_cnn += loss_cnn.item()
            # .item() keeps the counters as plain Python numbers, so `avg_acc`
            # holds floats rather than (possibly CUDA) tensors.
            running_acc += (preds == labels).sum().item()
            running_total += labels.size(0)

            if i % print_freq == print_freq - 1:  # Print every several mini-batches.
                losses = running_loss_cnn / print_freq
                # Divide by the samples actually seen, which stays correct
                # even when the final batch is smaller than `batch_size`.
                acc = running_acc / running_total

                print('[epoch: {}, i: {:5d}] avg mini-batch loss_cnn: {:.3f}, acc: {:.3f}'.format(
                    epoch, i, losses, acc))

                avg_losses_cnn.append(losses)
                avg_acc.append(acc)

                running_loss_cnn = 0.0
                running_acc = 0
                running_total = 0

    print('Finished Training.')
except KeyboardInterrupt:
    # Allow stopping the run from the notebook without losing the logged curves.
    print('Halted Training')

In [None]:
# Get test accuracy
correct_net = 0
correct_cnn = 0
total = 0

# Switch dropout and batch-norm to inference behaviour; the training loop
# leaves the model in train mode.
cnn.eval()
with torch.no_grad():
    for data in testloader:
        images, labels = data
        # Labels are compared as-is, exactly as in the training loop and the
        # per-class evaluation. Shifting them by -1 here compared predictions
        # against the wrong classes.
        images, labels = images.to(device), labels.to(device)
        outputs_cnn = cnn(images)
        _, predicted_cnn = torch.max(outputs_cnn, 1)
        total += labels.size(0)
        correct_cnn += (predicted_cnn == labels).sum().item()
        print(labels)
        print(predicted_cnn)
        print()

if total == 0:
    # Happens when the loader yields no batch (e.g. drop_last with fewer
    # samples than one batch); avoid a ZeroDivisionError.
    print('Cnn: no test samples were evaluated.')
else:
    print('Cnn: Accuracy of the network on the test images: %d %%' % (
        100 * correct_cnn / total))

In [None]:
# Get test accuracy for each class

# Flat lists of true / predicted class indices, reused by the confusion matrix.
truths = []
preds = []

class_correct = [0.] * num_classes
class_total = [0.] * num_classes

# Inference behaviour for dropout / batch-norm.
cnn.eval()
with torch.no_grad():
    for data in testloader:
        images, labels = data
        images, labels = images.to(device), labels.to(device)

        outputs = cnn(images)
        _, predicted = torch.max(outputs, 1)

        # Iterate over the samples actually present in this batch, so a short
        # final batch (or a batch of one) no longer raises an IndexError.
        for truth, guess in zip(labels.view(-1).tolist(), predicted.view(-1).tolist()):
            truths.append(truth)
            preds.append(guess)
            class_total[truth] += 1
            if truth == guess:
                class_correct[truth] += 1

for i in range(num_classes):
    print(class_total[i])
    if class_total[i] == 0:
        # Handled explicitly instead of swallowing the ZeroDivisionError
        # with a bare `except`.
        print('Accuracy of %5s : n/a (no test samples)' % classes[i])
        continue
    print('Accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))
  

In [None]:
# Confusion matrix of true (rows) vs. predicted (columns) classes.
# `labels` pins the matrix to one row/column per entry in `classes`; without it
# a class missing from both lists shrinks the matrix and the DataFrame
# construction below fails on the shape mismatch.
array = metrics.confusion_matrix(truths, preds, labels=list(range(len(classes))))
df_cm = pd.DataFrame(array, index = classes, columns = classes)
plt.figure(figsize = (20,9))
f = sn.heatmap(df_cm, annot=True, fmt='g')

# Save the rendered heatmap next to the notebook.
figure = f.get_figure()
figure.savefig('pmc_after.png')