In [1]:
import torch
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torchvision import datasets
from torch import nn
import wandb
from PIL import Image
import random
import pandas as pd
import numpy as np
import os
import torchaudio
import librosa
from torchaudio import transforms
from torch.utils.data import DataLoader, Dataset,TensorDataset
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import f1_score

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
wandb.login()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mgustavoreis[0m ([33mtropadochatgpt[0m). Use [1m`wandb login --relogin`[0m to force relogin


True

In [3]:
# Get cpu, gpu or mps device for training.
DEVICE = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {DEVICE} device")

NUM_WORKERS = 0

Using cuda device


In [4]:

class CustomAudioDataset(Dataset):
    def __init__(self, annotations_file,path_idx,label_idx, base_dir,target_sample_rate ,transform=None, target_transform=None, labe2id = None):
        self.files = pd.read_csv(annotations_file)
        self.path_idx = path_idx
        self.label_idx = label_idx
        self.base_dir = base_dir
        self.sample_rate = target_sample_rate
        self.transform = transform
        self.target_transform = target_transform
        self.label2id = {'cat':0,'dog':1}

    def get_db_spectogram(self,waveform):
        transform = torchaudio.transforms.Spectrogram(n_fft=600)
        masking = torchaudio.transforms.TimeMasking(time_mask_param=80,p=0.25)
        spectrogram = transform(waveform)
        spectrogram = transforms.AmplitudeToDB()(spectrogram)
        spectrogram = masking(spectrogram)
        return spectrogram

    def audio_padding(self, waveform,sr, max_s):
        max_len = max_s*sr
        n_rows, wav_len = waveform.shape
        if (wav_len/sr) > max_s:
            waveform = waveform[:,:max_len] # trucating
        else:
            #complete the edges of audio with zeros
            pad_begin = random.randint(0,(max_len - wav_len))
            pad_end = max_len -wav_len- pad_begin
            begin_zeros = torch.zeros(n_rows,pad_begin)
            end_zeros = torch.zeros(n_rows,pad_end)
            waveform = torch.cat((begin_zeros, waveform,end_zeros),1)
        
        return waveform
    
    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        audio_path = os.path.join(self.base_dir, self.files.iloc[idx, self.path_idx])
        waveform, sample_rate = torchaudio.load(audio_path, normalize=True)
        if sample_rate != self.sample_rate:
            transform = transforms.Resample(sample_rate, self.sample_rate)
            waveform = transform(waveform)

        waveform = self.audio_padding(waveform,sample_rate,5)
        spec = self.get_db_spectogram(waveform)

        label = self.files.iloc[idx, self.label_idx]
        label = self.label2id[label]
        if self.transform:
            waveform = self.transform(waveform)
        if self.target_transform:
            label = self.target_transform(label)
        return spec, label
   
    


    
class Data:
    def __init__(self, batch_size,dataset_train,dataset_test):
        self.batch_size = batch_size
        self.training_data = dataset_train
        self.test_data = dataset_test
    
    def get_loader(self, training: bool):
        if training:
            dataloader = DataLoader(self.training_data,batch_size=self.batch_size, shuffle=True)
        else:
            dataloader = DataLoader(self.test_data,batch_size=self.batch_size, shuffle=False)
        return dataloader


In [5]:
class Evaluator:
    def __init__(self):
        self.loss_fn = nn.CrossEntropyLoss()

    def get_loss(self, y, y_hat):
        return self.loss_fn(y_hat, y)

In [6]:
print(DEVICE)

cuda


In [7]:
# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv_layer = nn.Sequential(
            nn.Conv2d(1,128, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128,64, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64,32, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(32),
            nn.Conv2d(32,16, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)

        )
        
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(16*20*18,256),
            nn.ReLU(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.Linear(32, 4),
            nn.ReLU(),
            nn.Linear(4, 2)
        )

    def forward(self, x):
        #print(x.shape)
        x = self.conv_layer(x)
        print(x.shape)
        x = self.flatten(x)
        #print(x.shape)
        x = self.linear_relu_stack(x)
        #print(x.shape)
        return x

   
    
class Learner:
    def __init__(self):
        self.model = NeuralNetwork()
        self.model.to(DEVICE)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=2e-6)

    def predict(self, x):
        return self.model(x)

    def update(self, loss):
        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

In [8]:
import time

class Trainer:
    def __init__(self, data: Data, learner: Learner, evaluator: Evaluator):
        self.data = data
        self.learner = learner
        self.evaluator = evaluator

    def one_epoch(self, training: bool):
        self.learner.model.train(training)
        dataloader = self.data.get_loader(training)
        test_loss, correct = 0, 0
        train_loss = 0
        num_batches = len(dataloader)
        size = len(dataloader.dataset)
        for batch_idx, (X, y) in enumerate(dataloader):
            X, y = X.to(DEVICE), y.to(DEVICE)
            y_hat = self.learner.predict(X)
            loss = self.evaluator.get_loss(y, y_hat)
            if training:
                self.learner.update(loss)
                train_loss += loss.item()
                if batch_idx % 100 == 0:
                    loss, current = loss.item(), (batch_idx + 1) * len(X)
                    print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
                
            else:
                test_loss += loss.item()
                correct += (y_hat.argmax(1) == y).type(torch.float).sum().item()
            
        if not training:
            test_loss /= num_batches
            correct /= size
            test_acc = 100*correct
            print(f"Test Error: \n Accuracy: {test_acc:>0.1f}%, Avg loss: {test_loss:>8f} \n")
            return test_loss, test_acc
        else:
            train_loss /= num_batches
            return train_loss

    def run(self, n_epochs: int):
        wandb.init(project="CatXDogs", entity="gustavoreis")
        for t in range(n_epochs):
            print(f"Epoch {t+1}\n-------------------------------")
            #start = time.time()
            train_loss = self.one_epoch(training=True)
            #end = time.time()
            #print(f"time: {end - start:.2f}s")
            with torch.no_grad():
                test_loss, test_acc = self.one_epoch(training=False)
            wandb.log({"Loss/train per epoch": train_loss, "Loss/test per epoch": test_loss, "Accuracy/test": test_acc})
        print("Done!")
        wandb.finish()

In [9]:
dataset_train = CustomAudioDataset('dataset/train.csv',base_dir ='',path_idx=0,label_idx=1,target_sample_rate=16000)
dataset_test = CustomAudioDataset('dataset/test.csv',base_dir = '',path_idx=0,label_idx=1,target_sample_rate=16000)
data = Data(2, dataset_train,dataset_test)

In [10]:
learner = Learner()
evaluator = Evaluator()
trainer = Trainer(data, learner, evaluator)

In [11]:
trainer.run(150)

[34m[1mwandb[0m: Currently logged in as: [33mgustavoreis[0m. Use [1m`wandb login --relogin`[0m to force relogin


Epoch 1
-------------------------------
torch.Size([2, 16, 20, 18])
loss: 0.939274  [    2/  207]
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.Size([2, 16, 20, 18])
torch.

0,1
Accuracy/test,▁▁▃▄▅▆▇▇▇▇▇█▇█▇▇▇██▇▇███▇████▇█▇██▇█████
Loss/test per epoch,██▇▆▆▅▄▄▃▃▃▃▂▂▂▂▂▂▁▂▂▁▁▁▁▁▁▁▁▂▁▁▁▁▁▁▁▁▁▁
Loss/train per epoch,██▇▆▆▅▄▄▄▃▃▃▃▃▃▂▂▂▂▂▂▂▂▂▂▂▂▂▁▂▂▁▁▁▁▁▁▁▁▁

0,1
Accuracy/test,94.28571
Loss/test per epoch,0.27496
Loss/train per epoch,0.21374
