In [1]:
import functools
from dataclasses import dataclass
import os
from glob import glob
import sys

import numpy as np
import pandas as pd
import random

import torch
import torchvision
from torch import nn
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
from torchvision.transforms import Compose, Normalize, Resize, ToTensor, ColorJitter
from torchvision.transforms import RandomCrop, RandomRotation, RandomHorizontalFlip, RandomVerticalFlip
from torchvision.models import resnet18

import tqdm
from tqdm.auto import tqdm, trange
from sklearn.metrics import accuracy_score

from google.colab import drive
drive.mount('/content/drive')

from IPython.core.display import clear_output
import cv2
from PIL import Image

Mounted at /content/drive


In [None]:
!pip install wandb
!wandb login
import wandb

In [None]:
!unrar x "drive/MyDrive/Colab Notebooks/task_audio/train.part1.rar"
clear_output()

In [None]:
!unrar x "drive/MyDrive/Colab Notebooks/task_audio/val.rar"
clear_output()

In [45]:
ROOT_MODEL = "./drive/MyDrive/Colab Notebooks/task_audio"

## Dataset, Dataloader

In [5]:
def set_random_seed(seed):
    torch.backends.cudnn.deterministic = True
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
set_random_seed(42)

In [6]:
keys = {"noisy" : 0, "clean" : 1}

In [7]:
@dataclass(frozen=True)
class SpectogramDataset(Dataset):
    dirname: str
    transform: torchvision.transforms
    extension: str = "npy"

    @property
    @functools.lru_cache(1)
    def files(self):
        return glob(f"{self.dirname}/**/*.{self.extension}", recursive=True)

    def __len__(self):
        return len(self.files)

    def _load_raw(self, filename: str):
        return torch.from_numpy(np.load(filename)).unsqueeze(0)
    
    def _check_shape(self, img):
        if img.shape[1] < 80:
            n_repeats = 80 // img.shape[1] + 1
            img = img.repeat(1, n_repeats, 1)    
        return img
    
    def __getitem__(self, idx):
        mel_file = self.files[idx]
        mel = self._load_raw(mel_file)
        mel = self._check_shape(mel)
        if self.transform is not None:
            mel = self.transform(img=mel)
        label = keys[mel_file.split('/')[-3]]
        return mel, label

In [8]:
batch_size = 32
train_transform=Compose(
        [
            RandomCrop(size = (80, 80)),
            Resize((224, 224)),
            RandomHorizontalFlip(p = 0.5)
        ]
    )
val_transform = Compose(
        [
            RandomCrop(size = (80, 80)),
            Resize((224, 224)),
        ]
    )

train_dataset = SpectogramDataset("train", train_transform)
val_dataset = SpectogramDataset("val", val_transform)

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size = batch_size, shuffle = True)

In [9]:
next(iter(train_dataloader))[0].shape

torch.Size([32, 1, 224, 224])

In [10]:
for image, label in train_dataloader:
  if image.shape != torch.Size([batch_size,1, 224, 224]):
      print("not passed")
  break

##Model, training

In [None]:
best_accuracy = 0

In [28]:
def train_one_epoch(model, train_dataloader, criterion, optimizer, device="cuda:0", epoch = 0):

    model = model.to(device).train()
    total_loss = 0
    num_batches = 0
    all_losses = []
    total_predictions = np.array([])
    total_labels = np.array([])

    for images, labels in tqdm(train_dataloader, desc=f"Training, epoch {epoch}", leave=False):    
        # Move Batch to GPU
        images = images.to(device).float()
        labels = labels.to(device)
        predicted = model(images)

        loss = criterion(predicted, labels)
        accuracy = (predicted.argmax(1) == labels).float().mean()

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        total_loss += loss.item()
        total_predictions = np.append(total_predictions, predicted.argmax(1).cpu().detach().numpy())
        total_labels = np.append(total_labels, labels.cpu().detach().numpy())
        num_batches += 1 # ?
        all_losses.append(loss.detach().item())

    metrics = {"train_loss": total_loss / num_batches}
    metrics.update({"train_accuracy": (total_predictions == total_labels).mean()})

    wandb.log({"train_loss": metrics["train_loss"]})
    wandb.log({"train_accuracy": metrics["train_accuracy"]})



def validate(model, val_dataloader, criterion, device="cuda:0", epoch = 0):

    model = model.eval()
    total_loss = 0
    num_batches = 0
    total_predictions = np.array([])
    total_labels = np.array([])

    with torch.no_grad():
        for images, labels in tqdm(val_dataloader, desc=f"Validating, epoch {epoch}", leave=False):
               
            images = images.to(device).float()
            labels = labels.to(device)
            predicted = model(images)

            loss = criterion(predicted, labels)
            accuracy = (predicted.argmax(1) == labels).float().mean()

            total_loss += loss.item()
            total_predictions = np.append(total_predictions, predicted.argmax(1).cpu().detach().numpy())
            total_labels = np.append(total_labels, labels.cpu().detach().numpy())
            num_batches += 1

        metrics = {"loss": total_loss / num_batches}
        metrics.update({"val_accuracy": (total_predictions == total_labels).mean()})

        wandb.log({"val_loss": metrics["loss"]})
        wandb.log({"val_accuracy": metrics["val_accuracy"]})
        
        #Save best model
        if metrics["val_accuracy"] > best_accuracy:
            best_accuracy = metrics["val_accuracy"]
            filename_pth = ROOT_MODEL + '/audio_classification_accuracy_{:.4f}_epoch_{}.pth'.format(
                metrics["val_accuracy"], epoch
            )
            torch.save(model.state_dict(), filename_pth)

    return total_loss, total_predictions, total_labels



def train(model, train_dataloader, val_dataloader, criterion, optimizer, device="cuda:0", n_epochs=10, scheduler=None):
    model.to(device)
    for epoch in range(n_epochs):
        # Train, evaluate, print accuracy
        train_one_epoch(model, train_dataloader, criterion, optimizer, device, epoch)
        losses, predictions, labels = validate(model, val_dataloader, criterion, device, epoch)
        clear_output()

In [33]:
lr = 0.001
n_epochs = 10


model = resnet18(pretrained = True)

optimizer = torch.optim.Adam(model.parameters(), lr = lr) 
criterion = nn.CrossEntropyLoss()

scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.99) 
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [34]:
model.fc = nn.Linear(512, 2)

for param in model.parameters():
    param.requires_grad = True

In [24]:
model.conv1

Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

In [35]:
model.conv1 = nn.Conv2d(1, 64, kernel_size = (7,7), stride=(2, 2), padding=(3, 3), bias=False)

In [36]:
wandb_config = {
  "learning_rate": lr,
  "epochs": n_epochs,
  "batch_size": batch_size
}

wandb.init(config = wandb_config, project="Audio_classification", entity="_bro")

train(model, train_dataloader, val_dataloader, criterion, optimizer, device, n_epochs)

In [37]:
#train(model, train_dataloader, val_dataloader, criterion, optimizer, device, n_epochs)

##Check results

In [43]:
def check_result(model, val_dataloader, criterion, device="cuda:0", epoch = 0):

    model = model.eval()
    total_loss = 0
    num_batches = 0
    total_predictions = np.array([])
    total_labels = np.array([])

    with torch.no_grad():
        for images, labels in tqdm(val_dataloader, desc=f"Check results, epoch {epoch}", leave=False):
               
            images = images.to(device).float()
            labels = labels.to(device)
            predicted = model(images)

            loss = criterion(predicted, labels)
            accuracy = (predicted.argmax(1) == labels).float().mean()

            total_loss += loss.item()
            total_predictions = np.append(total_predictions, predicted.argmax(1).cpu().detach().numpy())
            total_labels = np.append(total_labels, labels.cpu().detach().numpy())
            num_batches += 1

        metrics = {"val_loss": total_loss / num_batches}
        metrics.update({"val_accuracy": (total_predictions == total_labels).mean()})
        print("val_loss: ", metrics["val_loss"])
        print("val_accuracy: ", metrics["val_accuracy"])


    return metrics["val_loss"], metrics["val_accuracy"]

In [44]:
model = model
model_link = "https://drive.google.com/file/d/1DXOOh-NsUBfiEKOEe90pEQZ-EP_T_NVc/view?usp=sharing"

#download model...

model.load_state_dict(torch.load(ROOT_MODEL + "/mel_denoising_loss_0.0301_epoch_9.pth"))
test_loss, test_accuracy = check_result(model, val_dataloader, criterion, device)

Check results, epoch 0:   0%|          | 0/125 [00:00<?, ?it/s]

val_loss:  0.09409412302821875
val_accuracy:  0.9705
