In [1]:
import os
import yaml
#
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
#
import torchaudio
import torchaudio.transforms as T
import torchaudio.functional as F
#
import torchvision
from torchvision.io import read_image
from torchvision.transforms import ConvertImageDtype
#
import matplotlib.pyplot as plt
#
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

In [2]:
print(torch.__version__)
print(torchaudio.__version__)
print(torchvision.__version__)

2.2.2+cu121
2.2.2+cu121
0.17.2+cu121


In [3]:
raw_path = 'data/raw/'
data_path = 'data/processed_spectrograms/'

In [4]:
labels = yaml.safe_load(open('labels.yaml'))

In [5]:
X, y = zip(*labels.items())
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0, shuffle=False)

In [6]:
def training(model, optimizer, criterion, dataloader):
    loss = 0
    y_true, y_score = [], []
    #
    model.train()
    for X, Y in dataloader:
        X, Y = X.to(device), Y.to(device, dtype=torch.float32)
        #
        output = model(X)
        loss = criterion(output.view(1), Y)
        #
        loss += loss.item()
        y_true.append(Y.item())
        y_score.append(torch.sigmoid(output).item())
        #
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return loss, roc_auc_score(y_true, y_score)


def testing(model, optimizer, criterion, dataloader):
    loss = 0
    y_true, y_score = [], []
    #
    model.eval()
    with torch.inference_mode():
        for X, Y in dataloader:
            X, Y = X.to(device), Y.to(device, dtype=torch.float32)
            #
            output = model(X)
            loss = criterion(output.view(1), Y)
            #
            loss += loss.item()
            y_true.append(Y.item())
            y_score.append(torch.sigmoid(output).item())
    #
    return loss, roc_auc_score(y_true, y_score)

# Modelling using images

In [8]:
device = 'cpu'

In [9]:
class Podcast(Dataset):

    def __init__(self, X, y):
        self.x = X
        self.y = y
        self.cid = ConvertImageDtype(torch.float32)

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        label = self.y[idx]
        episode = self.x[idx]
        image = read_image(f'{data_path}/{episode}.png')
        return self.cid(image), int(label)

In [10]:
train = Podcast(X_train, y_train)
train_dataloader = DataLoader(train, shuffle=True)

In [11]:
test = Podcast(X_test, y_test)
test_dataloader = DataLoader(test)

In [18]:
class CNNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=4, kernel_size=(6, 3)),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(6, 3))
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=4, kernel_size=(12, 6)),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(12, 6))
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=4, kernel_size=(18, 9)),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(output_size=(64, 8))
        )
        #
        self.flatten = nn.Flatten()
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.33)
        self.linear1 = nn.Linear(2048, 128)
        self.linear2 = nn.Linear(128, 1)

    def forward(self, x):
        shapes = [x.shape]
        # print()
        x = self.layer1(x)
        shapes.append(x.shape)
        x = self.layer2(x)
        shapes.append(x.shape)
        x = self.layer3(x)
        # shapes.append(x.shape)
        #
        x = self.flatten(x)
        # print(x.shape)
        x = self.relu(self.linear1(x))
#         x = self.dropout(x, training=self.training)
        # print(x.shape, self.training)
        #
        print(shapes)
        return self.linear2(x)


model = CNNet().to(device, dtype=torch.float32)

In [19]:
model

CNNet(
  (layer1): Sequential(
    (0): Conv2d(4, 4, kernel_size=(6, 3), stride=(1, 1))
    (1): ReLU()
    (2): AvgPool2d(kernel_size=(6, 3), stride=(6, 3), padding=0)
  )
  (layer2): Sequential(
    (0): Conv2d(4, 4, kernel_size=(12, 6), stride=(1, 1))
    (1): ReLU()
    (2): AvgPool2d(kernel_size=(12, 6), stride=(12, 6), padding=0)
  )
  (layer3): Sequential(
    (0): Conv2d(4, 4, kernel_size=(18, 9), stride=(1, 1))
    (1): ReLU()
    (2): AdaptiveAvgPool2d(output_size=(64, 8))
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (relu): ReLU()
  (dropout): Dropout(p=0.33, inplace=False)
  (linear1): Linear(in_features=2048, out_features=128, bias=True)
  (linear2): Linear(in_features=128, out_features=1, bias=True)
)

In [20]:
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [22]:
# for epoch in range(10):
training(model, optimizer, criterion, train_dataloader)

[torch.Size([1, 4, 22051, 10327]), torch.Size([1, 4, 3674, 3441]), torch.Size([1, 4, 305, 572])]
[torch.Size([1, 4, 24001, 5436]), torch.Size([1, 4, 3999, 1811]), torch.Size([1, 4, 332, 301])]
[torch.Size([1, 4, 22051, 12836]), torch.Size([1, 4, 3674, 4278]), torch.Size([1, 4, 305, 712])]
[torch.Size([1, 4, 24001, 435]), torch.Size([1, 4, 3999, 144]), torch.Size([1, 4, 332, 23])]
[torch.Size([1, 4, 22051, 7198]), torch.Size([1, 4, 3674, 2398]), torch.Size([1, 4, 305, 398])]
[torch.Size([1, 4, 22051, 5888]), torch.Size([1, 4, 3674, 1962]), torch.Size([1, 4, 305, 326])]
[torch.Size([1, 4, 24001, 419]), torch.Size([1, 4, 3999, 139]), torch.Size([1, 4, 332, 22])]
[torch.Size([1, 4, 24001, 643]), torch.Size([1, 4, 3999, 213]), torch.Size([1, 4, 332, 34])]
[torch.Size([1, 4, 22051, 623]), torch.Size([1, 4, 3674, 207]), torch.Size([1, 4, 305, 33])]
[torch.Size([1, 4, 22051, 4021]), torch.Size([1, 4, 3674, 1339]), torch.Size([1, 4, 305, 222])]
[torch.Size([1, 4, 22051, 8783]), torch.Size([1, 4

(tensor(1.2643, grad_fn=<AddBackward0>), 0.5121753246753247)

In [23]:
# for epoch in range(10):
testing(model, optimizer, criterion, test_dataloader)

[torch.Size([1, 4, 22051, 6399]), torch.Size([1, 4, 3674, 2132]), torch.Size([1, 4, 305, 354])]
[torch.Size([1, 4, 24001, 694]), torch.Size([1, 4, 3999, 230]), torch.Size([1, 4, 332, 37])]
[torch.Size([1, 4, 22051, 8261]), torch.Size([1, 4, 3674, 2753]), torch.Size([1, 4, 305, 458])]
[torch.Size([1, 4, 22051, 431]), torch.Size([1, 4, 3674, 143]), torch.Size([1, 4, 305, 23])]
[torch.Size([1, 4, 22051, 9438]), torch.Size([1, 4, 3674, 3145]), torch.Size([1, 4, 305, 523])]
[torch.Size([1, 4, 22051, 743]), torch.Size([1, 4, 3674, 247]), torch.Size([1, 4, 305, 40])]
[torch.Size([1, 4, 22051, 8436]), torch.Size([1, 4, 3674, 2811]), torch.Size([1, 4, 305, 467])]
[torch.Size([1, 4, 22051, 2856]), torch.Size([1, 4, 3674, 951]), torch.Size([1, 4, 305, 157])]
[torch.Size([1, 4, 22051, 8392]), torch.Size([1, 4, 3674, 2796]), torch.Size([1, 4, 305, 465])]
[torch.Size([1, 4, 22051, 872]), torch.Size([1, 4, 3674, 290]), torch.Size([1, 4, 305, 47])]
[torch.Size([1, 4, 22051, 7519]), torch.Size([1, 4, 3

(tensor(1.2617), 0.5)

In [24]:
for epoch in range(20):
    train_loss, train_auc = training(model, optimizer, criterion, train_dataloader)
    test_loss, test_auc = testing(model, optimizer, criterion, test_dataloader)
    #
    print(f'Epoch: {epoch} | Training loss: {train_loss:.4f} | Testing loss: {test_loss:.4f} | Training AUC: {train_auc:.4f} | Testing AUC: {test_auc:.4f}')

[torch.Size([1, 4, 22051, 376]), torch.Size([1, 4, 3674, 124]), torch.Size([1, 4, 305, 19])]
[torch.Size([1, 4, 22051, 5070]), torch.Size([1, 4, 3674, 1689]), torch.Size([1, 4, 305, 280])]
[torch.Size([1, 4, 22051, 10717]), torch.Size([1, 4, 3674, 3571]), torch.Size([1, 4, 305, 594])]
[torch.Size([1, 4, 22051, 14776]), torch.Size([1, 4, 3674, 4924]), torch.Size([1, 4, 305, 819])]
[torch.Size([1, 4, 22051, 633]), torch.Size([1, 4, 3674, 210]), torch.Size([1, 4, 305, 34])]
[torch.Size([1, 4, 22051, 5888]), torch.Size([1, 4, 3674, 1962]), torch.Size([1, 4, 305, 326])]
[torch.Size([1, 4, 22051, 9611]), torch.Size([1, 4, 3674, 3203]), torch.Size([1, 4, 305, 533])]
[torch.Size([1, 4, 24001, 6543]), torch.Size([1, 4, 3999, 2180]), torch.Size([1, 4, 332, 362])]
[torch.Size([1, 4, 22051, 6260]), torch.Size([1, 4, 3674, 2086]), torch.Size([1, 4, 305, 346])]
[torch.Size([1, 4, 22051, 7198]), torch.Size([1, 4, 3674, 2398]), torch.Size([1, 4, 305, 398])]
[torch.Size([1, 4, 22051, 10032]), torch.Siz

KeyboardInterrupt: 

# Modelling `as is`

In [7]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [None]:
from utils.spectrograms import get_spectrogram

In [14]:
class Podcast(Dataset):

    def __init__(self, X, y):
        self.x = X
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        label = self.y[idx]
        episode = self.x[idx]
        #
        waveform, sample_rate = torchaudio.load(f'{raw_path}/{episode}.mp3')
        # print(sample_rate, episode)
        #
        return get_spectrogram(waveform, sample_rate), label

In [15]:
train = Podcast(X_train, y_train)
train_dataloader = DataLoader(train, shuffle=True)

In [16]:
test = Podcast(X_test, y_test)
test_dataloader = DataLoader(test)

In [17]:
class CNNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(3, 3)),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(3, 3))
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(6, 6)),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(6, 6))
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(9, 9)),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(output_size=(256, 16))
        )
        #
        self.flatten = nn.Flatten()
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.33)
        self.linear1 = nn.Linear(4096, 128)
        self.linear2 = nn.Linear(128, 1)

    def forward(self, x):
        print('.', end='')
        shapes = [x.shape]
        # print()
        x = self.layer1(x)
        shapes.append(x.shape)
        x = self.layer2(x)
        shapes.append(x.shape)
        x = self.layer3(x)
        #
        x = self.flatten(x)
        # print(x.shape)
        x = self.relu(self.linear1(x))
        # x = self.dropout(x, training=self.training)
        # print(shapes, self.training)
        #
        return self.linear2(x)


model = CNNet().to(device, dtype=torch.float32)

In [None]:
if os.exists():
    model.load_state_dict(
        torch.load('models/04_spectrogram.pt')
    )

In [18]:
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

In [None]:
for epoch in range(100):
    train_loss, train_auc = training(model, optimizer, criterion, train_dataloader)
    test_loss, test_auc = testing(model, optimizer, criterion, test_dataloader)
    #
    print(f'\nEpoch: {epoch} | Training loss: {train_loss:.4f} | Testing loss: {test_loss:.4f} | Training AUC: {train_auc:.4f} | Testing AUC: {test_auc:.4f}')
    #
    torch.save(
        model.state_dict(),
        'models/04_spectrogram.pt'
    )

........................................................................................................
Epoch: 0 | Training loss: 2.5995 | Testing loss: 1.0658 | Training AUC: 0.3304 | Testing AUC: 0.8647
.......................................

In [None]:
testing(model, optimizer, criterion, test_dataloader)