In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from google.colab import drive
import pickle

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms

## Create DataLoader

In [0]:
# Mount Google Drive at /content/drive. The relative 'drive/My Drive/...' paths
# used by later cells presumably rely on the working directory being /content
# (TODO confirm).
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
def load_pkl_data():
  """Load the pickled training features, training labels and test features.

  Returns:
    A tuple (X, y, X_val) holding the objects stored in
    X_train_log_mel.pkl, y_train_log_mel.pkl and X_test_log_mel.pkl,
    in that order.
  """
  # NOTE(security): pickle.load can execute arbitrary code embedded in the
  # file, so only point this at pickles you produced yourself.
  def _load(path):
    # Context manager guarantees the file handle is closed after reading.
    with open(path, 'rb') as fh:
      return pickle.load(fh)

  X = _load('drive/My Drive/Audio analytics/pickle/X_train_log_mel.pkl')
  y = _load('drive/My Drive/Audio analytics/pickle/y_train_log_mel.pkl')
  X_val = _load('drive/My Drive/Audio analytics/pickle/X_test_log_mel.pkl')

  return X, y, X_val

In [0]:
def prepare_shape(feature, N=128):
  """Pad (by repetition) or crop a feature map to exactly N columns.

  The feature is repeated end-to-end along axis 1 until it has at least N
  columns, then cropped to the first N columns, and a leading axis of size 1
  is added.

  Args:
    feature: array with at least two dimensions; axis 1 is the one that is
      padded / cropped.
    N: target length of axis 1 (default 128).

  Returns:
    Array of shape (1, feature.shape[0], N, ...).

  Raises:
    ValueError: if axis 1 is empty and padding is required (repeating an
      empty array can never reach N columns).
  """
  feature = np.asarray(feature)
  width = feature.shape[1]

  if width < N:
    if width == 0:
      raise ValueError('prepare_shape: feature has no columns along axis 1; '
                       'cannot pad it by repetition')
    # Ceil division: smallest number of copies that reaches N columns.
    repeats = -(-N // width)
    feature = np.concatenate([feature] * repeats, axis=1)

  return feature[np.newaxis, :, :N]

In [0]:
class dataset(Dataset):
  """Array-backed dataset with an optional random circular-shift augmentation.

  The previous implementation drew one shift in __init__ and rolled axis 1 of
  the whole array. For the (N, 1, 64, 128) arrays used in this notebook axis 1
  has size 1, so the shift was always 0 and the roll did nothing. The
  augmentation is now opt-in, drawn independently for every fetched sample and
  applied along the sample's last axis (the 128-wide axis produced by
  prepare_shape; presumably time -- TODO confirm).

  Args:
    x: indexable array of samples (NumPy array expected when random_shift=True).
    y: optional indexable array of labels aligned with x.
    random_shift: if True, each __getitem__ call returns the sample circularly
      shifted by a fresh random offset. Leave False for evaluation / inference
      data so results stay deterministic.
  """
  def __init__(self, x, y=None, random_shift=False):
    self.x = x
    self.y = y
    self.random_shift = random_shift

  def __len__(self):
    return len(self.x)

  def _maybe_shift(self, sample):
    # Roll only when requested and when there is something to roll.
    if not self.random_shift:
      return sample
    length = sample.shape[-1]
    if length < 2:
      return sample
    # np.roll returns a new array, so the stored data is never modified.
    return np.roll(sample, np.random.randint(length), axis=-1)

  def __getitem__(self, idx):
    sample = self._maybe_shift(self.x[idx])
    if self.y is not None:
      return sample, self.y[idx]
    return sample

In [0]:
X, y, X_val = load_pkl_data()

# Map the raw labels to integer class ids; `le` is kept so predictions can be
# decoded back with inverse_transform later.
le = LabelEncoder()
y = le.fit_transform(y)
classes = le.classes_
num_classes = len(classes)

# Bring every feature map to the same fixed size so they stack into one array.
X = np.asarray([prepare_shape(x) for x in X])
X_val = np.asarray([prepare_shape(x) for x in X_val])

# Fixed seed so the train/test partition (and therefore the reported scores)
# is the same on every Restart & Run All.
SPLIT_SEED = 42
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=True, random_state=SPLIT_SEED)

In [0]:
# Summarise the class count and the array shapes of every split.
print('Number of class:', num_classes)
for heading, features, labels in (('Train Set\nX:', X_train, y_train),
                                  ('Test Set\nX:', X_test, y_test)):
  print(heading, features.shape, ', y:', labels.shape)
print('Val Set\nX:', X_val.shape)

Number of class: 41
Train Set
X: (4546, 1, 64, 128) , y: (4546,)
Test Set
X: (1137, 1, 64, 128) , y: (1137,)
Val Set
X: (3790, 1, 64, 128)


In [0]:
# Wrap each split in the dataset class; the validation split has no labels.
train_dset = dataset(X_train, y_train)
test_dset = dataset(X_test, y_test)
val_dset = dataset(X_val)

# All loaders share the same batching; only the training loader reshuffles.
train_loader, test_loader, val_loader = (
    DataLoader(split, batch_size=64, shuffle=reshuffle, num_workers=0)
    for split, reshuffle in ((train_dset, True), (test_dset, False), (val_dset, False))
)

## Models

In [0]:
class ConvNet(nn.Module):
  """Five-layer convolutional classifier for single-channel 2-D inputs.

  Each stage is conv(3x3, padding 1) -> batch norm -> ReLU; stages 1 and 3 are
  followed by 2x2 max pooling and stages 1-4 by dropout (p=0.3). The final
  256-channel map is reduced with a global max pool and fed to one linear
  layer that outputs raw class scores (logits).

  Args:
    num_classes: number of output scores (default 41, the value that was
      previously hard-coded).

  Input:  tensor of shape (batch, 1, H, W) with H, W >= 4.
  Output: tensor of shape (batch, num_classes).
  """
  def __init__(self, num_classes=41):
    super().__init__()

    self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
    self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1)
    self.conv4 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1)
    self.conv5 = nn.Conv2d(in_channels=512, out_channels=256, kernel_size=3, stride=1, padding=1)

    self.mp = nn.MaxPool2d(kernel_size=2, stride=2)
    # Adaptive pooling always reduces the map to 1x1. For 64x128 inputs this
    # equals the former fixed (16, 32) kernel, but other input sizes now work
    # too. The layer has no parameters, so existing checkpoints still load.
    self.globalmp = nn.AdaptiveMaxPool2d(output_size=1)

    self.dropout = nn.Dropout(p=0.3)

    self.bn1 = nn.BatchNorm2d(num_features=64)
    self.bn2 = nn.BatchNorm2d(num_features=128)
    self.bn3 = nn.BatchNorm2d(num_features=256)
    self.bn4 = nn.BatchNorm2d(num_features=512)
    self.bn5 = nn.BatchNorm2d(num_features=256)

    self.flat = nn.Flatten()
    self.fc1 = nn.Linear(256, num_classes)

  def forward(self, x):
    # Stage 1: halves the spatial resolution.
    x = F.relu(self.bn1(self.conv1(x)))
    x = self.dropout(self.mp(x))

    # Stage 2: resolution unchanged.
    x = F.relu(self.bn2(self.conv2(x)))
    x = self.dropout(x)

    # Stage 3: halves the spatial resolution again.
    x = F.relu(self.bn3(self.conv3(x)))
    x = self.dropout(self.mp(x))

    # Stage 4: resolution unchanged.
    x = F.relu(self.bn4(self.conv4(x)))
    x = self.dropout(x)

    # Stage 5: no pooling or dropout before the global reduction.
    x = F.relu(self.bn5(self.conv5(x)))

    # (batch, 256, h, w) -> (batch, 256, 1, 1) -> (batch, 256) -> logits.
    x = self.globalmp(x)
    x = self.flat(x)
    x = self.fc1(x)

    return x

## Train & Test

In [0]:
# TRAINING THE NETWORK

def train(model, device, train_loader, optimizer, criterion):
  """Run one training epoch and report mean loss and macro F1.

  Args:
    model: network to optimise; it must already live on `device`.
    device: torch.device the batches are moved to.
    train_loader: iterable yielding (data, target) batches.
    optimizer: optimizer bound to the model's parameters.
    criterion: loss taking (logits, integer targets).

  Returns:
    (loss, score): mean of the per-batch losses and the macro-averaged F1
    score of the predictions collected over the whole epoch.
  """
  model.train()
  loss_list = []
  outputs = []
  targets = []

  for data, target in train_loader:
    data = data.to(device).float()
    target = target.to(device).long()

    output = model(data)
    loss = criterion(output, target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Tensors must be on the CPU before .numpy(); calling it on a CUDA tensor
    # raises, so this also makes the bookkeeping work when training on GPU.
    outputs.append(output.detach().cpu().numpy().argmax(axis=1))
    targets.append(target.cpu().numpy())
    loss_list.append(loss.item())

  loss = np.mean(loss_list)
  score = f1_score(np.hstack(targets), np.hstack(outputs), average='macro')

  print('*TRAIN*')
  print('Loss:', loss)
  print('F1-score:', score)

  return loss, score

In [0]:
# TESTING THE MODEL

def test(model, device, test_loader, criterion):
  model.eval()
  loss_list = []
  outputs = []
  targets = []

  with torch.no_grad():
    for sample_batched in test_loader:
      data, target = sample_batched
      data = torch.autograd.Variable(data.to(device))
      target = torch.autograd.Variable(target.to(device))
      
      output = model(data.float())
      loss = criterion(output, target.long())
            
      outputs.append(output.detach().numpy().argmax(axis=1))
      targets.append(target)
      loss_list.append(loss.item())

  loss = np.mean(loss_list) 
  score = f1_score(np.hstack(targets), np.hstack(outputs), average='macro')

  print("*TEST*")
  print('Loss:', loss)
  print('F1-score:', score)

  return loss, score

In [0]:
def prediction(model, val_loader, device):
  """Predict a class index for every sample of an unlabelled loader.

  Args:
    model: network to run; it must already live on `device`.
    val_loader: iterable yielding batches of input data (no targets).
    device: torch.device the batches are moved to.

  Returns:
    1-D NumPy array with the arg-max class index of each sample, in loader
    order. Empty if the loader yields no batches.
  """
  model.eval()
  outputs = []

  with torch.no_grad():
    for data in val_loader:
      output = model(data.to(device).float())
      # Move to CPU before converting: .numpy() raises on CUDA tensors.
      outputs.append(output.cpu().numpy().argmax(axis=1))

  if not outputs:
    return np.empty(0, dtype=np.int64)

  # Concatenate the per-batch vectors directly. Wrapping the list in
  # np.array first fails when the last batch is shorter than the others
  # (ragged input is an error on recent NumPy versions).
  predictions = np.concatenate(outputs)
  return predictions

In [0]:
def save_predictions(predictions, le,
                     sample_path='drive/My Drive/Audio analytics/data/sample_submission.csv',
                     output_path='drive/My Drive/Audio analytics/data/test.csv'):
  """Decode predicted class ids and write them into a submission CSV.

  Args:
    predictions: 1-D array of encoded class ids, one per row of the sample
      submission file.
    le: fitted encoder exposing inverse_transform (e.g. the LabelEncoder used
      to encode the training labels).
    sample_path: CSV used as the template; its 'label' column is replaced.
    output_path: where the filled-in CSV is written (without the index).

  Raises:
    ValueError: if the number of predictions differs from the number of rows
      in the template.
  """
  # Use the argument rather than the notebook-global `y_pred`, which the
  # previous version read by accident.
  decoded = le.inverse_transform(predictions)

  df_pred = pd.read_csv(sample_path)
  if len(decoded) != len(df_pred):
    # A Series of the wrong length would be aligned silently and leave NaNs.
    raise ValueError('save_predictions: got %d predictions for %d rows'
                     % (len(decoded), len(df_pred)))

  df_pred['label'] = pd.Series(decoded, index=df_pred.index)
  df_pred.to_csv(output_path, index=False)

In [0]:
def plot(train, test, n_epoch, title, ylabel):
  """Draw the train and test curves of one metric against the epoch number.

  Args:
    train: per-epoch values for the training set (drawn in red).
    test: per-epoch values for the test set (drawn in blue).
    n_epoch: number of epochs; the x axis runs from 1 to n_epoch.
    title: figure title.
    ylabel: label of the y axis.
  """
  epoch_axis = np.arange(1, n_epoch+1)
  upper = max(max(train), max(test))

  # The y axis always starts at 0 and ends at the largest plotted value.
  plt.gca().set_ylim([0, upper])

  for series, fmt, name in ((train, 'r', 'train'), (test, 'b', 'test')):
    plt.plot(epoch_axis, series, fmt, label=name)

  plt.title(title)
  plt.xlabel('epoch number')
  plt.ylabel(ylabel)
  plt.legend()
  plt.grid()
  plt.show()

In [0]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

criterion = nn.CrossEntropyLoss()
# ConvNet is the architecture defined in this notebook and the one the
# checkpoint below is later loaded into; the model is moved to `device`
# because train()/test() send every batch there.
model = ConvNet().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)

n_epoch = 50

train_loss = np.zeros((n_epoch))
train_score = np.zeros((n_epoch))
test_loss = np.zeros((n_epoch))
test_score = np.zeros((n_epoch))
best_score = 0.0

for e in range(n_epoch):
  print("*********************************")
  print("EPOCH #", e)
  train_loss[e], train_score[e] = train(model, device, train_loader, optimizer, criterion)
  test_loss[e], test_score[e] = test(model, device, test_loader, criterion)

  # Keep only the weights of the epoch with the best test F1 so far.
  if test_score[e] > best_score:
    best_score = test_score[e]
    torch.save(model.state_dict(), 'drive/My Drive/Audio analytics/convnet.pth')

plot(train_loss, test_loss, n_epoch, title='Loss', ylabel='loss value')
plot(train_score, test_score, n_epoch, title='F1-score', ylabel='f1-score value')

In [0]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# map_location lets a checkpoint saved on one device type be restored on
# another (e.g. trained on GPU, predicted on CPU).
model_dict = torch.load('drive/My Drive/Audio analytics/convnet.pth', map_location=device)
model = ConvNet()
model.load_state_dict(model_dict)
# prediction() moves every batch to `device`, so the model must be there too.
model.to(device)

y_pred = prediction(model, val_loader, device)
save_predictions(y_pred, le)