# TONG CODE

# Data Loading

## Imports

In [5]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import axes3d
from joblib import dump, load
import warnings
warnings.filterwarnings('ignore') 


import librosa
from librosa import display

from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.utils.multiclass import unique_labels
from sklearn.decomposition import PCA
from sklearn.cross_decomposition import CCA
from sklearn.utils import shuffle
from sklearn.preprocessing import StandardScaler

from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

## Load Data

In [6]:
# this is all sorts of messy

In [9]:
class DataLoader():
    def __init__(self):
        self.X = None
        self.X_mfcc = None
        self.X_mfcc_pca = None
        self.X_mfcc_random_crop = None
        self.X_mfcc_fixed_crop = None
        self.X_chroma = None
        self.X_chroma_pca = None
        self.pca_mfcc = None
        self.pca_chroma = None
        self.Y = None
        
    def save_raw(self, genres=all_genres, songs=num_songs):
        assert(self.X is None and self.Y is None)
        X, Y = None, None
        for g_idx, g in enumerate(genres):
            for s_idx in range(songs):
                y, sr = librosa.load(f'genres/{g}/{g}.000{s_idx:02d}.au')
                y = y[:Y_LIMIT]
                if X is None:
                    X = y.reshape(1, y.shape[0])
                    Y = np.array([[g_idx]])
                else:
                    X = np.vstack([X, y])
                    Y = np.vstack([Y, np.array([[g_idx]])])
        Y = Y.ravel()
        self.X = X
        self.Y = Y
        np.savetxt('data/X.csv', X)
        np.savetxt('data/Y.csv', Y)
        
    def save_mfcc(self, genres=all_genres, songs=num_songs):
        assert(self.X_mfcc is None)
        X_mfcc = None
        for g_idx, g in enumerate(genres):
            for s_idx in range(songs):
                y, sr = librosa.load(f'genres/{g}/{g}.000{s_idx:02d}.au')
                y = y[:Y_LIMIT]
                mfcc = librosa.feature.mfcc(y, sr=sr, hop_length=512, n_mfcc=13).flatten()
                if X_mfcc is None:
                    X_mfcc = mfcc.reshape(1, mfcc.shape[0])
                else:
                    X_mfcc = np.vstack([X_mfcc, mfcc])
        self.X_mfcc = X_mfcc
        np.savetxt('data/X_mfcc.csv', X_mfcc)
        
    def save_mfcc_pca(self, pca_dims=100):
        assert(self.X_mfcc_pca is None and self.pca_mfcc is None)
        pca_mfcc = PCA(n_components=pca_dims, random_state=1)
        X_mfcc_pca = pca_mfcc.fit_transform(self.X_mfcc)
        self.pca_mfcc = pca_mfcc
        self.X_mfcc_pca = X_mfcc_pca
        np.savetxt(f'data/X_mfcc_pca{pca_dims}.csv', X_mfcc_pca)
        dump(pca_mfcc, f'data/pca_mfcc{pca_dims}.PCA') 
    
    def save_mfcc_random_crop(self):
        X_mfcc_crop = None
        for mfcc in self.X_mfcc:
            crop = None
            SEG = 10
            SEG_LENGTH = 129
            for j in range(SEG):
                random_start = np.random.randint(0, 1290-SEG_LENGTH)
                random_seg = np.vstack([mfcc[1290*j+random_start : 1290*j+random_start+SEG_LENGTH] for j in range(13)])
                random_seg = random_seg.reshape(1, random_seg.shape[0], random_seg.shape[1])
                if crop is None:
                    crop = random_seg
                else:
                    crop = np.vstack([crop, random_seg])
            if X_mfcc_crop is None:
                X_mfcc_crop = crop
            else:
                X_mfcc_crop = np.vstack([X_mfcc_crop, crop])
        self.X_mfcc_random_crop = X_mfcc_crop
        np.savetxt('data/X_mfcc_random_crop.csv', X_mfcc_crop.reshape(10000, 1677))
    
    def save_mfcc_fixed_crop(self):
        '''
        Evenly divides each song into 10 segments,
        producing a 10000 by 13 by 129 array of MFCC coefficients for the segments.
        Reshapes into 10000*1677 in order to save as a CSV.
        '''
        X_mfcc_crop = None
        for mfcc in self.X_mfcc:
            SEG = 10
            SEG_LENGTH = int(1290/SEG)
            i = 0
            crop = np.stack([np.vstack([mfcc[1290*j+SEG_LENGTH*i : 1290*j+SEG_LENGTH*(i+1)] for j in range(13)]) for i in range(SEG)], axis=0)
            if X_mfcc_crop is None:
                X_mfcc_crop = crop
            else:
                X_mfcc_crop = np.vstack([X_mfcc_crop, crop])
        self.X_mfcc_fixed_crop = X_mfcc_crop
        np.savetxt('data/X_mfcc_fixed_crop.csv', X_mfcc_crop.reshape(10000, 1677))
    
    def save_chroma(self, genres=all_genres, songs=num_songs):
        assert(self.X_chroma is None)
        X_chroma = None
        for g_idx, g in enumerate(genres):
            for s_idx in range(songs):
                y, sr = librosa.load(f'genres/{g}/{g}.000{s_idx:02d}.au')
                y = y[:Y_LIMIT]
                chroma = librosa.feature.chroma_cqt(y, sr=sr, hop_length=512).flatten()
                if X_chroma is None:
                    X_chroma = chroma.reshape(1, chroma.shape[0])
                else:
                    X_chroma = np.vstack([X_chroma, chroma])
        self.X_chroma = X_chroma
        np.savetxt('data/X_chroma.csv', X_chroma)
    
    def save_chroma_pca(self, pca_dims=100):
        assert(self.X_chroma_pca is None and self.pca_chroma is None)
        pca_chroma = PCA(n_components=pca_dims)
        X_chroma_pca = pca_chroma.fit_transform(self.X_chroma, random_state=1)
        self.pca_chroma = pca_chroma
        self.X_chroma_pca = X_chroma_pca
        np.savetxt(f'data/X_chroma_pca{pca_dims}.csv', X_chroma_pca)
        dump(pca_chroma, f'data/pca_chroma{pca_dims}.PCA') 
    
    def load_raw(self):
        self.X_raw = np.loadtxt('data/X.csv')
        
    def load_mfcc(self):
        self.X_mfcc = np.loadtxt('data/X_mfcc.csv')
    
    def load_mfcc_random_crop(self):
        self.X_mfcc_random_crop = np.loadtxt('data/X_mfcc_random_crop.csv').reshape(10000, 13, 129)
        
    def load_mfcc_fixed_crop(self):
        self.X_mfcc_fixed_crop = np.loadtxt('data/X_mfcc_fixed_crop.csv').reshape(10000, 13, 129)
        
    def load_mfcc(self):
        self.X_mfcc = np.loadtxt('data/X_mfcc.csv')
        
    def load_mfcc_pca(self, pca_dims=100):
        self.X_mfcc_pca = np.loadtxt(f'data/X_mfcc_pca{pca_dims}.csv')
        self.pca_mfcc = load(f'data/pca_mfcc{pca_dims}.PCA')
        
    def load_chroma(self):
        self.X_chroma = np.loadtxt('data/X_chroma.csv')
    
    def load_chroma_pca(self, pca_dims=100):
        self.X_chroma_pca = np.loadtxt(f'data/X_chroma_pca{pca_dims}.csv')
        self.pca_chroma = load(f'data/pca_chroma{pca_dims}.PCA') 
    
    def load_Y(self):
        self.Y = np.loadtxt('data/Y.csv')

In [11]:
# # Only need to run this ONCE!
# # Saves features and pca objects to data/...
dl = DataLoader()
# dl.save_raw(genres=all_genres, songs=100)
dl.save_mfcc()
# dl.save_mfcc_pca()
# dl.save_chroma()
# dl.save_chroma_pca()
# dl.save_mfcc_fixed_crop()
# dl.save_mfcc_random_crop()

In [17]:
# Load from CSVs instead of saving
dl = DataLoader()
dl.load_mfcc()
dl.X_mfcc = dl.X_mfcc.reshape((1000,13,-1))
print(dl.X_mfcc.shape)
# dl.load_mfcc_pca()
# dl.load_mfcc_random_crop()
# dl.load_mfcc_fixed_crop()
# dl.load_chroma()
# dl.load_chroma_pca()
# dl.load_Y()
# print(dl.X_mfcc.shape, dl.X_mfcc_pca.shape, dl.X_chroma.shape, dl.X_chroma_pca.shape, dl.Y.shape)

(1000, 13, 1290)


In [16]:
X_mfcc = None
for g_idx, g in enumerate(all_genres):
    for s_idx in range(100):
        y, sr = librosa.load(f'genres/{g}/{g}.000{s_idx:02d}.au')
        y = y[:Y_LIMIT]
        mfcc = librosa.feature.mfcc(y, sr=sr, hop_length=512, n_mfcc=13).shape
        print(mfcc)
        break
    break

(13, 1290)


## Preprocessing

In [None]:
# dunno about this just like copy paste this in later I guess

## PyTorch DataLoader

In [None]:
# making torch style dataset and dataloader

import torch
from torch.utils import data

class MusicDataset(data.Dataset):

    def __init__(self, X, Y):
        self.X = torch.from_numpy(X)
        self.Y = torch.from_numpy(Y)

    def __len__(self):
        assert(self.X.size()[0] == self.Y.size()[0])
        return self.X.size()[0]

    def __getitem__(self, index):
        data = self.X[index,].reshape((1,self.X.size()[1]))
        
        # normalizing
        data = (data - data.mean())/data.std()
        
        label = self.Y[index]
        return (data, label)
    
trainset = MusicDataset(X_train, Y_train)
testset = MusicDataset(X_test, Y_test)

print(len(trainset))
print(len(testset))

genre_trainloader = data.DataLoader(trainset, batch_size = 64, shuffle = True)
genre_testloader = data.DataLoader(testset, batch_size = 64, shuffle = False)

# Models

In [18]:
import torch.nn as nn
import torch.nn.functional as F
import math
from torchsummary import summary



## DCNN + RNN

In [None]:
# the idea is that the input is 750 x 1 x 13 x 1290
# or more generally, N x Cin x Feat x Time
# don't forget to unsqueeze


# X is 750 x 1 x 13 x 1290

params = {
#     'conv_in_size' = tuple(X.size)
    'cnn_in_size' = (750, 1, 13, 1290)
    'cnn_out_channels' = 10
    'cnn_kernel' = 7
    'rnn_in_size' = 5
}

class CNN(nn.Module):
    def __init__(self, params):
        super(CNN, self).__init__()
        
        num_features = params['cnn_in_size'][2]
        out_channels = params['cnn_out_channels']
        ker = params['cnn_kernel']
        out_features = params['rnn_in_size']
        
        out_size = num_features - ker[0] + 1
        
        self.conv = nn.Conv2d(1, out_channels, ker)
        self.fc = nn.Linear(out_channels*out_size, out_features)
        
    def forward(self, x):
        out = self.conv(x)
        # out.size() = N x out_channels x out_size x time
        out = out.permute(0,3,1,2)
        # out.size() = N x time x out_channels x out_size
        out = out.view(out.size()[0], out.size()[1], -1)
        # out.size() = N x time x out_channels*out_size
        out = self.fc(out)
        # out.size() = N x time x out_features
        out = F.relu(out)
        out = out.permute(1, 0, 2)
        # out.size() = time x N x out_features
        return out
    
class RNN(nn.Module):
    def __init__(self, params):
        super(RNN, self).__init__()
        
    

## CNN

In [None]:
# class Conv_Block(nn.Module):
    
#     def __init__(self, in_planes, planes, kernel_size, stride, pool_size):
        
#         super(Conv_Block, self).__init__()
        
#         self.conv = nn.Conv1d(in_channels = in_planes, out_channels = planes, kernel_size = kernel_size, stride=stride)
#         self.dropout = nn.Dropout(p=0.5)
#         self.maxpool = nn.AvgPool1d(kernel_size = pool_size)
#         self.bnorm = nn.BatchNorm1d(num_features = planes)
    
#     def forward(self, x):
#         x = self.conv(x)
#         x = self.dropout(x)
#         x = self.maxpool(x)
#         x = self.bnorm(x)
#         x = F.relu(x)
        
#         return x

# class Conv(nn.Module):
    
#     def __init__(self, params):    
#         super(Conv, self).__init__()
        
#         filters = params['filters']
#         kernel_sizes = params['kernel_sizes']
#         strides = params['strides']
#         avg_pool_sizes = params['max_pool_sizes']
        
#         assert(len(filters) == len(kernel_sizes))
#         assert(len(filters) == len(strides))
#         assert(len(filters) == len(avg_pool_sizes))
        
#         prev_outplanes = 1
#         prev_outsize = params['input_length']
        
#         layers = []
        
#         for i in range(len(filters)):
#             inplanes = prev_outplanes
#             outplanes = inplanes * filters[i]
#             out_size = math.floor((math.floor((prev_outsize - kernel_sizes[i]) / float(strides[i]))+\
#                                 1)/float(avg_pool_sizes[i]))
            
#             prev_outsize = out_size
#             prev_outplanes = outplanes
            
#             new_block = Conv_Block(inplanes, outplanes, kernel_sizes[i], strides[i], avg_pool_sizes[i])
            
#             layers.append(new_block)
            
#         self.convs = nn.Sequential(*layers)
        
#         in_features = prev_outsize * prev_outplanes
        
#         self.fc = nn.Linear(in_features = in_features, out_features = 10)
        
#     def forward(self, x):
#         x = self.convs(x)
#         x = x.view(x.size()[0], -1)
#         x = self.fc(x)
#         return x

In [None]:
# Debugging

# big big convnet
# conv_params = {
#     'input_length' : 16770,
#     'filters' : [8, 8, 8, 8],
#     'kernel_sizes' : [64, 32, 16, 8],
#     'strides' : [8, 4, 2, 1],
#     'max_pool_sizes' : [2, 2, 2, 2]
# }

# smaller convnet
# conv_params = {
#     'input_length' : 16770,
#     'filters' : [8, 16, 64],
#     'kernel_sizes' : [64, 8, 4],
#     'strides' : [8, 4, 2],
#     'max_pool_sizes' : [16, 4, 2]
# }

# smallest convnet
conv_params = {
    'input_length' : 16770,
    'filters' : [8, 64],
    'kernel_sizes' : [64, 4],
    'strides' : [8, 2],
    'max_pool_sizes' : [32, 4]
}

# observation: this net works as long as we don't run out of elements to convolute across (no padding)

conv_test = Conv(conv_params)
# clipper = WeightClipper()
# conv_test.apply(clipper)
summary(conv_test, (1,16770))

## DCNN

In [None]:
# class DConv_Block(nn.Module):
    
#     def __init__(self, in_planes, planes, kernel_size, dialation, pool_size):
        
#         super(DConv_Block, self).__init__()
        
#         padding = int((kernel_size + (kernel_size - 1)*(dialation - 1) - 1)/2)
        
#         self.dconv = nn.Conv1d(in_channels = in_planes, out_channels = planes,
#                                kernel_size = kernel_size, stride=1, padding=padding, dilation=dialation)
#         self.dropout = nn.Dropout(p=0.5)
#         self.avgpool = nn.AvgPool1d(kernel_size = pool_size)
#         self.bnorm = nn.BatchNorm1d(num_features = planes)
    
#     def forward(self, x):
#         x = self.dconv(x)
#         x = self.dropout(x)
#         x = self.avgpool(x)
#         x = self.bnorm(x)
#         x = F.relu(x)
#         return x

# class DConv(nn.Module):
    
#     def __init__(self, params):    
#         super(DConv, self).__init__()
        
#         filters = params['filters']
#         kernel_sizes = params['kernel_sizes']
#         dialations = params['dialations']
#         avg_pool_sizes = params['avg_pool_sizes']
        
#         assert(len(filters) == len(kernel_sizes))
#         assert(len(filters) == len(dialations))
#         assert(len(filters) == len(avg_pool_sizes))
        
#         prev_outplanes = 1
#         prev_outsize = params['input_length']

#         layers = []
        
#         for i in range(len(filters)):
#             inplanes = prev_outplanes
#             outplanes = inplanes * filters[i]
            
#             out_size = math.floor(prev_outsize / float(avg_pool_sizes[i]))
            
#             prev_outsize = out_size
#             prev_outplanes = outplanes
            
#             new_block = DConv_Block(inplanes, outplanes, kernel_sizes[i], dialations[i], avg_pool_sizes[i])
            
#             layers.append(new_block)
            
#         self.dconvs = nn.Sequential(*layers)
        
#         in_features = prev_outsize * prev_outplanes
        
#         self.fc = nn.Linear(in_features = in_features, out_features = 10)
        
#     def forward(self, x):
#         x = self.dconvs(x)
#         x = x.view(x.size()[0], -1)
#         x = self.fc(x)
#         return x


In [None]:
# Debugging
dconv_params = {
    'input_length' : 16770,
    'filters' : [16, 4],
    'kernel_sizes': [64, 16],
    'dialations': [8, 2],
    'avg_pool_sizes': [32, 4]
}

# observation: this net works as long as dialation_2 is even

dconv_test = DConv(dconv_params)
summary(dconv_test, (1,16770))

## Training and Testing Code

In [None]:
# defining training and testing functions

def train(net, criterion, optimizer, num_epochs, trainloader):
    
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    net.to(device)
    
    net.train()
    
    for epoch in range(num_epochs):
        
        correct = 0
        total = 0
        
        print("Epoch: " + str(epoch+1))
        running_loss = 0.0
        for data in trainloader:
            
            inputs, labels = data
            
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            
            outputs = net(inputs)
            
            _, predicted = torch.max(outputs.data, 1)
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()            
            running_loss += loss.item()
            
        print("Loss: " + str(running_loss / 750.0) + ' Accuracy: ' + str((100 * correct / total)) + '%')
    print('Finished Training')

def test(net, testloader):
    
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    net.to(device)
    
    net.eval()
    
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data in testloader:
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = net(inputs)
            _, predicted = torch.max(outputs.data, 1)
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
    # todo: calculate length of testloader instead of hardcoding
            
    print('Accuracy of the network on 250 test datapoints: ' + str((100 * correct / total)) + '%')

# Evaluating Models

In [None]:
# from brendan
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.utils.multiclass import unique_labels
from sklearn.decomposition import PCA
from sklearn.cross_decomposition import CCA
from sklearn.utils import shuffle
import seaborn as sns
import matplotlib.pyplot as plt

def plot_confusion_matrix(test_labels, predictions, title):
    ax= plt.subplot()
    cm = confusion_matrix(test_labels, predictions)
    sns.heatmap(cm, annot=True, ax = ax, cmap = sns.cm.rocket_r); #annot=True to annotate cells

    # labels, title and ticks
    ax.set_xlabel('Predicted labels');ax.set_ylabel('True labels'); 
    ax.set_title(f'{title} Confusion Matrix'); 
    ax.set_ylim(top=0, bottom=10)
    ax.xaxis.set_ticklabels(all_genres); ax.yaxis.set_ticklabels(all_genres);
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
                 rotation_mode="anchor")
    plt.setp(ax.get_yticklabels(), rotation=45, ha="right",
                 rotation_mode="anchor")
    plt.show()

In [None]:
# test_labels = Y_test

def net_confusion_matrix(net, testloader, test_labels, title):
    
    separate_predictions = []
    
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    net.to(device)
    
    net.eval()
    
    with torch.no_grad():
        for data in testloader:
            
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = net(inputs)
            _, predicted = torch.max(outputs.data, 1)
            separate_predictions.append(predicted.numpy())
    
    predictions = np.concatenate(separate_predictions)
    plot_confusion_matrix(test_labels, predictions, title)            

In [None]:
# attempt at regularization

class WeightClipper():

    def __init__(self):
        pass

    def __call__(self, module):
        if hasattr(module, 'weight'):
            w = module.weight.data
            w = w.clamp(-1,1)

clipper = WeightClipper()

In [None]:
conv_net = Conv(conv_params)

# regularizing
conv_net.apply(clipper)

conv_crit = nn.CrossEntropyLoss()
conv_opt = torch.optim.Adam(conv_net.parameters(), lr=0.001)

In [None]:
dconv_net = DConv(dconv_params)

# regularizing
dconv_net.apply(clipper)

dconv_crit = nn.CrossEntropyLoss()
dconv_opt = torch.optim.Adam(dconv_net.parameters(), lr=0.001)

In [None]:
# %%time
# # training and evaluating CNN

# train(conv_net, conv_crit, conv_opt, 30, genre_trainloader)
# test(conv_net, genre_testloader)
# net_confusion_matrix(conv_net, genre_testloader, Y_test, "Preliminary CNN")

In [None]:
# %%time
# # training and evaluating DCNN

# train(dconv_net, dconv_crit, dconv_opt, 30, genre_trainloader)
# test(dconv_net, genre_testloader)
# net_confusion_matrix(dconv_net, genre_testloader, Y_test, "Preliminary DCNN")

TODO:
- try different datapreprocessing: change # of channels for 1d stuff so that we get slices through all the different features of the MFCC 
- rewrite code for DCNN generation to support more versitile dimensionalities
- make RNN: LSTM + (D)CNN
- uncomment out bnorm in CNN generation code