# DCNN

In [None]:
import os
import numpy as np
import time
import copy
from glob import glob

import torch
import torchvision
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, models
import torchvision.transforms as transforms
from torch.utils.data.sampler import SubsetRandomSampler
from PIL import Image
from torch.autograd import Variable
import random

import librosa
import pandas as pd
from sklearn.model_selection import train_test_split
import pathlib
import csv
from sklearn.preprocessing import scale

# Extract MFCC features from the GTZAN dataset

In [None]:
# Collect one MFCC matrix per audio clip, walking the genre folders in the
# order the genres are listed below.
# NOTE(review): os.listdir() returns entries in arbitrary order; the labels are
# later read from data.csv, so the two orders must agree -- confirm.
data = []
cmap = plt.get_cmap('inferno')
plt.figure(figsize=(10, 10))
genres = 'blues classical country disco hiphop jazz metal pop reggae rock'.split()
for g in genres:
    # The per-genre image folder is created here even though this cell writes
    # nothing into it.
    pathlib.Path(f'img_data/{g}').mkdir(parents=True, exist_ok=True)
    genre_dir = f'./genres/{g}'
    for filename in os.listdir(genre_dir):
        songname = f'./genres/{g}/{filename}'
        # Load the clip as mono audio, limited to a duration of 5 seconds.
        y, sr = librosa.load(songname, mono=True, duration=5)
        # 50 MFCC coefficients per frame.
        S = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=50)
        data.append(S)
# Stack the per-clip matrices into one array.
data1 = np.array(data)
data1.shape

In [None]:
# Load the per-clip table and turn the genre names in the `label` column into
# integer class indices (the position of the genre in `genre`).
data2 = pd.read_csv('data.csv')
data2.head()
# Drop the unnecessary filename column.
data2 = data2.drop(['filename'], axis=1)
genres = 'blues classical country disco hiphop jazz metal pop reggae rock'
genre = genres.split(' ')

# Encode only the label column, in one pass, with an explicit int64 dtype.
# Replacing values across the whole frame could touch other columns and leaves
# the resulting dtype up to pandas' implicit downcasting; an unknown genre now
# becomes NaN and makes astype() raise instead of silently staying a string.
label_to_index = {name: index for index, name in enumerate(genre)}
data2['label'] = data2['label'].map(label_to_index).astype('int64')

data2['label'].values.shape

# Training Set, Testing Set, Validation Set

In [None]:
# Hold out 20% of the samples, then divide the hold-out part into a test set
# (its first 100 samples) and a validation set (everything after those).
trainX, holdoutX, trainY, holdoutY = train_test_split(
    data1, data2['label'].values, test_size=0.2
)
testX, valX = holdoutX[:100], holdoutX[100:]
testY, valY = holdoutY[:100], holdoutY[100:]

# Report the shape of every split: train, validation, then test.
for split in (trainX, trainY, valX, valY, testX, testY):
    print(split.shape)

In [None]:
from torch.utils.data import TensorDataset, DataLoader

batch_size = 20

# Pair each feature array with its label array as tensors.
train_data = TensorDataset(*map(torch.from_numpy, (trainX, trainY)))
val_data = TensorDataset(*map(torch.from_numpy, (valX, valY)))
test_data = TensorDataset(*map(torch.from_numpy, (testX, testY)))

# All three loaders shuffle and use the same batch size.
train_loader, valid_loader, test_loader = (
    DataLoader(dataset, shuffle=True, batch_size=batch_size)
    for dataset in (train_data, val_data, test_data)
)

In [None]:
import logging

# Configure root logging at INFO level. basicConfig() returns None, so
# `handler` ends up being None.
handler = logging.basicConfig(level=logging.INFO)
lgr = logging.getLogger(__name__)

# Select the tensor types matching the available hardware.
use_cuda = torch.cuda.is_available()
if use_cuda:
    FloatTensor, LongTensor = torch.cuda.FloatTensor, torch.cuda.LongTensor
else:
    FloatTensor, LongTensor = torch.FloatTensor, torch.LongTensor
Tensor = FloatTensor

lgr.info("USE CUDA=" + str(use_cuda))

In [None]:
def XnumpyToTensor(x_data_np):
    """Convert array-like feature data to a float32 tensor.

    Args:
        x_data_np: Array-like numeric data; it is converted to a
            ``np.float32`` NumPy array first.

    Returns:
        A float32 ``torch.Tensor`` with the same shape as the input, moved to
        the GPU when the module-level ``use_cuda`` flag is true and left on
        the CPU otherwise.

    The shape and type of the intermediate array and of the resulting tensor
    are printed as a debugging aid.
    """
    x_data_np = np.array(x_data_np, dtype=np.float32)
    print(x_data_np.shape)
    print(type(x_data_np))

    # torch.autograd.Variable is a deprecated no-op wrapper (tensors track
    # gradients themselves), so the tensor is used directly.
    X_tensor = torch.from_numpy(x_data_np)
    if use_cuda:
        lgr.info("Using the GPU")
        X_tensor = X_tensor.cuda()
    else:
        lgr.info("Using the CPU")

    print(type(X_tensor.data))
    print(X_tensor.data.shape)
    return X_tensor


# Convert a NumPy label array into an integer (int64) column tensor.
def YnumpyToTensor(y_data_np):
    """Convert a NumPy label array to an int64 tensor of shape (N, 1).

    Args:
        y_data_np: NumPy array whose first axis has length N and that can be
            reshaped to ``(N, 1)``.

    Returns:
        A ``torch.int64`` tensor of shape ``(N, 1)``, moved to the GPU when
        the module-level ``use_cuda`` flag is true and left on the CPU
        otherwise.

    Note:
        ``nn.CrossEntropyLoss`` expects class-index targets of shape ``(N,)``,
        so callers feeding this tensor to that loss must flatten it first.

    The shape and type of the reshaped array and the type of the resulting
    tensor are printed as a debugging aid.
    """
    y_data_np = y_data_np.reshape((y_data_np.shape[0], 1))
    print(y_data_np.shape)
    print(type(y_data_np))

    # torch.autograd.Variable is a deprecated no-op wrapper, so the tensor is
    # used directly; .long() yields the integer dtype used for class indices.
    Y_tensor = torch.from_numpy(y_data_np).long()
    if use_cuda:
        lgr.info("Using the GPU")
        Y_tensor = Y_tensor.cuda()
    else:
        lgr.info("Using the CPU")

    print(type(Y_tensor.data))
    print(y_data_np.shape)
    print(type(y_data_np))
    return Y_tensor

# Model

In [None]:
# use_cuda=False
# Whole training set as a float tensor; its size is kept for reference.
X_tensor_train = XnumpyToTensor(trainX)
X_shape = X_tensor_train.size()

# Dimensions taken from the training array:
#   axis 0 -> number of training rows
#   axis 1 -> number of features for the input layer
NUM_ROWS_TRAINNING, N_FEATURES = trainX.shape[:2]

# This number has no meaning except for being divisible by 2 (min should be 4).
N_MULT_FACTOR = 4
# Width of the first hidden stage: twice the number of input features.
N_HIDDEN = 2 * N_FEATURES
# Kernel sizes for the convolution and pooling layers.
N_CNN_KERNEL = 2
MAX_POOL_KERNEL = 4

# Set to True to make debug() print the size of the tensors it is given.
DEBUG_ON = False


def debug(x):
    """Print ``x.size()`` when the module-level ``DEBUG_ON`` flag is set."""
    if not DEBUG_ON:
        return
    print('(x.size():' + str(x.size()))
    
class Net2(nn.Module):
    """Two-stage 1-D convolutional classifier.

    Each stage is Conv1d -> AvgPool1d -> Dropout -> LeakyReLU -> BatchNorm1d;
    the result is flattened and passed through a single linear layer.

    Args:
        n_feature: Number of input channels of the first convolution.
        n_hidden: Output channels of the first stage (cast to ``int``); the
            second stage outputs ``2 * n_hidden`` channels.
        n_output: Number of output classes of the linear layer.
        n_cnn_kernel: Kernel size of both convolutions and both pooling layers.
        n_mult_factor: Stored on the instance; not used by the layers.
        n: Unused; kept for backward compatibility.
        seq_len: Length of the last axis of the input. It determines the
            flattened feature count fed to the linear layer, which used to be
            hard-coded to 8320 (the value obtained for ``n_hidden=80``,
            ``n_cnn_kernel=2`` and a length of 216).
            NOTE(review): 216 is presumably the MFCC frame count of the
            5-second clips -- confirm against ``data1.shape``.

    Raises:
        ValueError: If ``seq_len`` is too short to survive both conv/pool
            stages.

    ``forward`` returns raw class scores (logits). ``nn.CrossEntropyLoss``
    applies log-softmax itself, so feeding it softmax outputs normalises the
    scores twice; use ``predict_proba`` when probabilities are needed.
    """

    # Convolution settings shared by both stages.
    _CONV_PADDING = 1
    _CONV_DILATION = 4

    def __init__(self, n_feature, n_hidden, n_output, n_cnn_kernel,
                 n_mult_factor=N_MULT_FACTOR, n=8, seq_len=216):
        super(Net2, self).__init__()
        self.n_feature = n_feature
        self.n_hidden = int(n_hidden)
        self.n_output = n_output
        self.n_cnn_kernel = n_cnn_kernel
        self.n_mult_factor = n_mult_factor
        self.seq_len = seq_len

        # Flattened size = channels after stage 2 * remaining sequence length.
        remaining = seq_len
        for _ in range(2):
            remaining = self._stage_output_length(remaining)
            if remaining < 1:
                raise ValueError(
                    "seq_len={} is too short for two conv/pool stages with "
                    "kernel size {}".format(seq_len, n_cnn_kernel)
                )
        self.n_l2_hidden = 2 * self.n_hidden * remaining

        self.c1 = self._make_stage(self.n_feature, self.n_hidden)
        self.c2 = self._make_stage(self.n_hidden, 2 * self.n_hidden)
        self.out = nn.Sequential(
            torch.nn.Linear(self.n_l2_hidden, self.n_output),
        )
        # Explicit dim: normalise over the class axis of (batch, n_output).
        self.sof = nn.Softmax(dim=1)

    def _stage_output_length(self, length):
        """Return the sequence length after one Conv1d + AvgPool1d stage."""
        kernel = self.n_cnn_kernel
        # Conv1d with stride 1: L + 2*padding - dilation*(kernel-1) - 1 + 1.
        conv_len = (length + 2 * self._CONV_PADDING
                    - self._CONV_DILATION * (kernel - 1))
        # AvgPool1d with stride == kernel size and no padding.
        return (conv_len - kernel) // kernel + 1

    def _make_stage(self, in_channels, out_channels):
        """Build one Conv1d/AvgPool1d/Dropout/LeakyReLU/BatchNorm1d stage."""
        return nn.Sequential(
            torch.nn.Conv1d(in_channels, out_channels,
                            kernel_size=(self.n_cnn_kernel,), stride=(1,),
                            padding=(self._CONV_PADDING,),
                            dilation=self._CONV_DILATION),
            torch.nn.AvgPool1d(self.n_cnn_kernel),
            torch.nn.Dropout(p=1 - .75),
            torch.nn.LeakyReLU(0.1),
            torch.nn.BatchNorm1d(out_channels, eps=1e-05, momentum=0.1,
                                 affine=True),
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, n_output)`` for input ``x``."""
        # Batch size is read here because it can differ between calls.
        batch = x.size(0)
        debug(x)
        x = self.c1(x)
        debug(x)
        x = self.c2(x)
        # Flatten everything except the batch axis for the linear layer.
        x = x.view(batch, -1)
        debug(x)
        x = self.out(x)
        debug(x)
        return x

    def predict_proba(self, x):
        """Return softmax class probabilities for input ``x``."""
        return self.sof(self.forward(x))
    

# Instantiate the network with 10 output classes and move it to the GPU when
# one is in use.
net = Net2(N_FEATURES, N_HIDDEN, 10, N_CNN_KERNEL)
if use_cuda:
    net = net.cuda()
lgr.info(net)
#b = net(X_tensor_train)
#print ('(b.size():' + str (b.size())) 

# Loss function & Optimizer

In [None]:
# Multi-class classification loss and an Adam optimizer over all parameters
# of the network.
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=1e-4)

if use_cuda:
    lgr.info("Using the GPU")
    for module in (net, loss_func):
        module.cuda()

# Log the configuration of both objects.
for component in (optimizer, loss_func):
    lgr.info(component)

# Train

In [None]:
import time

# Train the network for a fixed number of epochs, reporting the current
# training loss and the mean validation loss every 100 optimisation steps.
train_on_gpu = torch.cuda.is_available()
start_time = time.time()
epochs = 1500
all_losses = []  # kept for later cells; nothing is appended to it here
X_tensor_train = XnumpyToTensor(trainX)
counter = 0  # optimisation steps taken so far, across all epochs
net.train()
for step in range(epochs):
    train_loss = 0.0

    for X_t_train, Y_t_train in train_loader:
        counter += 1
        if train_on_gpu:
            X_t_train, Y_t_train = X_t_train.to('cuda'), Y_t_train.to('cuda')

        out = net(X_t_train.float())
        # Targets are class indices (not one-hot). view(-1) keeps them 1-D
        # even for a batch of one, where squeeze_() would produce a scalar.
        loss = loss_func(out, Y_t_train.view(-1))
        optimizer.zero_grad()   # clear gradients from the previous step
        loss.backward()         # backpropagation, compute gradients
        optimizer.step()        # apply gradients

        if counter % 100 == 0:
            # Validation pass: eval mode, no gradient bookkeeping.
            val_losses = []
            net.eval()
            with torch.no_grad():
                for inputs, labels in valid_loader:
                    if train_on_gpu:
                        inputs, labels = inputs.cuda(), labels.cuda()

                    output = net(inputs.float())
                    val_loss = loss_func(output, labels.view(-1))
                    val_losses.append(val_loss.item())
            # Switch back to training mode; otherwise dropout and batch-norm
            # would stay in eval mode for the remainder of training.
            net.train()

            print("Epoch: {}/{}...".format(step+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()),
                  "Val Loss: {:.6f}".format(np.mean(val_losses)))



# Test

In [None]:
# Evaluate the trained network on the test set: mean loss and accuracy.
test_losses = []  # per-batch losses
num_correct = 0

net.eval()
# No gradients are needed for evaluation.
with torch.no_grad():
    for inputs, labels in test_loader:
        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()

        # Per-class scores for the batch.
        output = net(inputs.float())

        test_loss = loss_func(output, labels)
        test_losses.append(test_loss.item())

        # Predicted class = index of the highest score. Rounding the scores
        # first would collapse every sample without a score >= 0.5 to class 0,
        # and squeezing would drop the batch axis for a batch of one.
        pred = output.argmax(dim=1)

        # Compare predictions to the true labels.
        correct_tensor = pred.eq(labels.view_as(pred))
        num_correct += int(correct_tensor.sum().item())

# -- stats! -- ##
# Average test loss.
print("Test loss: {:.3f}".format(np.mean(test_losses)))

# Accuracy over all test data.
test_acc = num_correct/len(test_loader.dataset)
print("Test accuracy: {:.3f}".format(test_acc))
    