In [1]:
import torch
import torchaudio
import torchaudio.functional as F
import torchaudio.transforms as T
max_audio_len = 150000

In [2]:
import os
from torch.utils.data import Dataset
import numpy as np

class PetAudio(Dataset):

    def __init__(self, cat_dir, dog_dir):
        self.cat_dir = cat_dir
        self.dog_dir = dog_dir
        self.file_names = []
        self.classes = []
        for filename in os.listdir(cat_dir):
            self.file_names.append(filename)
            self.classes.append(float(0))

        for filename in os.listdir(dog_dir):
            self.file_names.append(filename)
            self.classes.append(float(1))

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, idx):
        label = self.classes[idx]
        if(label == 0):
            ad = self.cat_dir
        else:
            ad = self.dog_dir
        audio_path = os.path.join(ad, self.file_names[idx])
        signal, _ = torchaudio.load(audio_path)
        signal = signal.numpy()
        signal = np.add.reduceat(signal, np.arange(len(signal), step=2))
        signal = torch.from_numpy(signal)
        signal = torch.nn.functional.pad(signal, (0, max_audio_len - signal.shape[1]))
        return signal, label 

In [3]:
training_data = PetAudio("cats_dogs/train/cat/", "cats_dogs/train/dog/")
test_data = PetAudio("cats_dogs/test/cat/", "cats_dogs/test/dog/")

from torch.utils.data import DataLoader

batch_size = 16
train_dataloader = DataLoader(training_data, batch_size=training_data.__len__(), shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=test_data.__len__(), shuffle=False)


In [4]:
max_len = 0
for x in range(training_data.__len__()):
    item, _ = training_data.__getitem__(x)
    max_len = max(max_len, item.shape[1])

print(max_len)

150000


In [5]:
import math
from typing import Tuple

import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

In [6]:
class TransformerModel(nn.Module):

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.embed_artif = nn.Linear(max_audio_len, max_audio_len//150)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, 1)

        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor) -> Tensor:
        """
        Args:
            src: Tensor, shape [seq_len, batch_size]
            src_mask: Tensor, shape [seq_len, seq_len]

        Returns:
            output Tensor of shape [seq_len, batch_size, ntoken]
        """
        # src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.embed_artif(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src)
        output = self.decoder(output)
        # output = torch.sigmoid(output)
        return output


def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Generates an upper-triangular matrix of -inf, with zeros on diag."""
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)

class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [7]:
ntokens = 1  # size of vocabulary
emsize = max_audio_len//150  # embedding dimension
d_hid = 200  # dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 3  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 2  # number of heads in nn.MultiheadAttention
dropout = 0.3  # dropout probability
device = 'cuda'
model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout).to(device)

In [8]:
import copy
import time

criterion = nn.BCEWithLogitsLoss()
lr = 5e-2  # learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=1)
bptt = batch_size

def train(model: nn.Module) -> None:
    model.train()
    total_loss = 0
    log_interval = 20
    start_time = time.time()
    batch = 0

    for xb, yb in train_dataloader:
        xb, yb = xb.to(device), yb.type(torch.FloatTensor).to(device)
        batch+=1
        pred = model(xb).squeeze()
        loss = criterion(pred, yb)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        

        total_loss += loss.item()
        if(batch%log_interval == 0 and batch > 0):
            lr = scheduler.get_last_lr()[0]
            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / (log_interval*xb.shape[0])
            print(f'| epoch {epoch:3d} | {batch:5d} batches | '
                  f'lr {lr:02.5f} | ms/batch {ms_per_batch:5.2f} | '
                  f'loss {cur_loss:5.2f}')
            total_loss = 0
            start_time = time.time()

def evaluate(model: nn.Module, dataloader) -> float:
    model.eval()
    total_loss = 0
    correct = 0
    count = 0
    with torch.no_grad():
        for xb, yb in dataloader:
            xb, yb = xb.to(device), yb.type(torch.FloatTensor).to(device)
            pred = model(xb).squeeze()
            correct += (int)(torch.sum(1 - torch.abs(torch.round(torch.sigmoid(pred) - yb))))
            count += xb.shape[0]
            # print(xb.shape[0])
            total_loss += xb.shape[0] * criterion(pred, yb).item()
    return total_loss / count, correct, count

In [9]:
epochs = 1000
best_val_loss = 999999

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(model)
    val_loss, correct, count = evaluate(model, test_dataloader)
    train_loss, tc, tcc = evaluate(model, train_dataloader)
    elapsed = time.time() - epoch_start_time
    print('-' * 89)
    print(f'| epoch {epoch:3d} | time: {elapsed:5.2f}s | '
          f'valid loss {val_loss:5.4f} | train loss {train_loss:5.4f} | acc: {correct}/{count} = {(100.0*correct)/(1.0*count):5.3f}% | train acc : {tc}/{tcc} = {(100.0*tc)/(1.0*tcc):5.3f}%')
    print('-' * 89)

    scheduler.step()

-----------------------------------------------------------------------------------------
| epoch   1 | time:  2.65s | valid loss 0.9259 | train loss 0.7270 | acc: 35/67 = 52.239% | train acc : 126/210 = 60.000%
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
| epoch   2 | time:  1.03s | valid loss 0.9469 | train loss 0.6460 | acc: 36/67 = 53.731% | train acc : 145/210 = 69.048%
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
| epoch   3 | time:  0.99s | valid loss 0.9314 | train loss 0.3986 | acc: 35/67 = 52.239% | train acc : 172/210 = 81.905%
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
| ep

KeyboardInterrupt: 