In [252]:
from utils.dataset import MagnaTagATune
from torch import nn
from typing import Union
from torch.optim.optimizer import Optimizer
from torch.nn import functional as F
import os, time, argparse, torch
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import numpy as np

In [253]:
DATA_PATH = os.path.join("..", "data", "MagnaTagATune", "samples")
TRAIN_LABELS_PATH = "../data/MagnaTagATune/annotations/train_labels.pkl"
VAL_LABELS_PATH = "../data/MagnaTagATune/annotations/val_labels.pkl"

train_dataset = MagnaTagATune(TRAIN_LABELS_PATH, DATA_PATH)
test_dataset = MagnaTagATune(VAL_LABELS_PATH, DATA_PATH)

Loading data from ../data/MagnaTagATune/annotations/train_labels.pkl...
Loading data from ../data/MagnaTagATune/annotations/val_labels.pkl...


In [254]:
print(f'Number of files: {len(train_dataset)}')
print("File name of file 0:")
print(train_dataset[0][0])
print(f'Samples of shape {train_dataset[0][1].shape}:')
print(train_dataset[0][1])
print(f'Labels of shape {train_dataset[0][2].shape}')
print(train_dataset[0][2])

Number of files: 16963
File name of file 0:
train/0/american_bach_soloists-j_s__bach__cantatas_volume_v-01-gleichwie_der_regen_und_schnee_vom_himmel_fallt_bwv_18_i_sinfonia-117-146.npy
Samples of shape torch.Size([10, 1, 34950]):
tensor([[[    0.,     0.,     0.,  ...,  -221.,  -498.,  -191.]],

        [[  209.,   366.,   494.,  ...,  -235.,  -283.,  -138.]],

        [[  -96.,  -388.,  -536.,  ...,   698.,   515.,   624.]],

        ...,

        [[-1510., -1214.,  -812.,  ...,  -416.,  -259.,   -37.]],

        [[  239.,   384.,   467.,  ..., -1524., -1423., -1371.]],

        [[-1308.,  -960.,  -854.,  ...,     0.,     0.,     0.]]])
Labels of shape torch.Size([50])
tensor([0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.])


In [255]:
train_dataset[0][1].shape

torch.Size([10, 1, 34950])

## Model Definition

In [256]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv1d(1, 1, 8, 256)
        self.initialise_layer(self.conv1)
        self.conv2 = nn.Conv1d(1, 32, 8, 1)
        self.initialise_layer(self.conv2)
        self.pool = nn.MaxPool1d(kernel_size=4)
        self.conv3 = nn.Conv1d(32, 32, 8, 1)
        self.initialise_layer(self.conv3)
        self.full1 = nn.Linear(192, 100)
        self.full2 = nn.Linear(100, 50)

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        batch_size = input.shape[0]
        x = torch.flatten(input, 0, 1)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = F.relu(self.conv3(x))
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = F.relu(self.full1(x))
        x = F.sigmoid(self.full2(x))
        x = torch.reshape(x, (batch_size, 10, 50))
        x = torch.mean(x, dim=1)
        return x

    @staticmethod
    def initialise_layer(layer):
        if hasattr(layer, "bias"):
            nn.init.zeros_(layer.bias)
        if hasattr(layer, "weight"):
            nn.init.kaiming_normal_(layer.weight)

## Trainer

In [257]:
class Trainer:
    def __init__(
        self,
        model: nn.Module,
        train_loader: MagnaTagATune,
        val_loader: MagnaTagATune,
        criterion: nn.Module,
        optimizer: Optimizer,
        summary_writer,
        device: torch.device,
    ):
        self.model = model.to(device)
        self.device = device
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.summary_writer = summary_writer
        self.step = 0

    def train(
        self,
        epochs: int,
        val_frequency: int,
        print_frequency: int = 20,
        log_frequency: int = 5,
        start_epoch: int = 0
    ):
        self.model.train()
        for epoch in range(start_epoch, epochs):
            self.model.train()
            data_load_start_time = time.time()
            print("epoch started")
            for _, batch, labels in self.train_loader:
                print(batch.shape)
                batch = batch.to(self.device)
                labels = labels.to(self.device)
                data_load_end_time = time.time()

                logits = self.model.forward(batch)
                loss = self.criterion(logits, labels)

                loss.backward()

                self.optimizer.step()
                self.optimizer.zero_grad()

                with torch.no_grad():
                    preds = logits.argmax(-1)
                    print(preds.shape)
                    accuracy = compute_accuracy(labels.argmax(-1), preds)

                data_load_time = data_load_end_time - data_load_start_time
                step_time = time.time() - data_load_end_time
                if ((self.step + 1) % log_frequency) == 0:
                    self.log_metrics(epoch, accuracy, loss, data_load_time, step_time)
                if ((self.step + 1) % print_frequency) == 0:
                    self.print_metrics(epoch, accuracy, loss, data_load_time, step_time)

                self.step += 1
                data_load_start_time = time.time()

            # self.summary_writer.add_scalar("epoch", epoch, self.step)
            if ((epoch + 1) % val_frequency) == 0:
                self.validate()
                # self.validate() will put the model in validation mode,
                # so we have to switch back to train mode afterwards
                self.model.train()

    def print_metrics(self, epoch, accuracy, loss, data_load_time, step_time):
        epoch_step = self.step % len(self.train_loader)
        print(
                f"epoch: [{epoch}], "
                f"step: [{epoch_step}/{len(self.train_loader)}], "
                f"batch loss: {loss:.5f}, "
                f"batch accuracy: {accuracy * 100:2.2f}, "
                f"data load time: "
                f"{data_load_time:.5f}, "
                f"step time: {step_time:.5f}"
        )

    def log_metrics(self, epoch, accuracy, loss, data_load_time, step_time):
        self.summary_writer.add_scalar("epoch", epoch, self.step)
        self.summary_writer.add_scalars(
                "accuracy",
                {"train": accuracy},
                self.step
        )
        self.summary_writer.add_scalars(
                "loss",
                {"train": float(loss.item())},
                self.step
        )
        self.summary_writer.add_scalar(
                "time/data", data_load_time, self.step
        )
        self.summary_writer.add_scalar(
                "time/data", step_time, self.step
        )

    def validate(self):
        results = {"preds": [], "labels": []}
        total_loss = 0
        self.model.eval()

        # No need to track gradients for validation, we're not optimizing.
        with torch.no_grad():
            for _, batch, labels in self.val_loader:
                batch = batch.to(self.device)
                labels = labels.to(self.device)
                logits = self.model(batch[0])
                loss = self.criterion(logits, labels)
                total_loss += loss.item()
                preds = logits.argmax(dim=-1).cpu().numpy()
                results["preds"].extend(list(preds))
                results["labels"].extend(list(labels.cpu().numpy()))

        accuracy = compute_accuracy(
            np.array(results["labels"]), np.array(results["preds"])
        )
        average_loss = total_loss / len(self.val_loader)

        self.summary_writer.add_scalars(
                "accuracy",
                {"test": accuracy},
                self.step
        )
        self.summary_writer.add_scalars(
                "loss",
                {"test": average_loss},
                self.step
        )
        print(f"validation loss: {average_loss:.5f}, accuracy: {accuracy * 100:2.2f}")


def compute_accuracy(
    labels: Union[torch.Tensor, np.ndarray], preds: Union[torch.Tensor, np.ndarray]
) -> float:
    """
    Args:
        labels: ``(batch_size, class_count)`` tensor or array containing example labels
        preds: ``(batch_size, class_count)`` tensor or array containing model prediction
    """
    assert len(labels) == len(preds)
    return float((labels == preds).sum()) / len(labels)


def get_summary_writer_log_dir(args: argparse.Namespace) -> str:
    """Get a unique directory that hasn't been logged to before for use with a TB
    SummaryWriter.

    Args:
        args: CLI Arguments

    Returns:
        Subdirectory of log_dir with unique subdirectory name to prevent multiple runs
        from getting logged to the same TB log directory (which you can't easily
        untangle in TB).
    """
    tb_log_dir_prefix = f'CNN_bs={args.batch_size}_lr={args.learning_rate}_run_'
    i = 0
    while i < 1000:
        tb_log_dir = args.log_dir / (tb_log_dir_prefix + str(i))
        if not tb_log_dir.exists():
            return str(tb_log_dir)
        i += 1
    return str(tb_log_dir)

## Running the Model

In [258]:
batch_size = 10
worker_count = 1
learning_rate = 0.1
epochs = 2
val_frequency = 5
print_frequency = 2
log_frequency = 2
log_dir = os.path.join(".", "logs")

In [259]:
def get_summary_writer_log_dir() -> str:
    """Get a unique directory that hasn't been logged to before for use with a TB
    SummaryWriter.

    Args:
        args: CLI Arguments

    Returns:
        Subdirectory of log_dir with unique subdirectory name to prevent multiple runs
        from getting logged to the same TB log directory (which you can't easily
        untangle in TB).
    """
    tb_log_dir_prefix = f'CNN_bs={batch_size}_lr={learning_rate}_run_'
    i = 0
    while i < 1000:
        tb_log_dir = os.path.join(log_dir, (tb_log_dir_prefix + str(i)))
        if os.path.exists(tb_log_dir):
            return str(tb_log_dir)
        i += 1
    return str(tb_log_dir)

In [260]:
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    shuffle=True,
    batch_size=batch_size,
    pin_memory=True,
    num_workers=worker_count,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    shuffle=False,
    batch_size=batch_size,
    num_workers=worker_count,
    pin_memory=True,
)

if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

model = Model()

criterion = nn.BCELoss()

optimizer = torch.optim.SGD(model.parameters(), learning_rate)

log_dir = get_summary_writer_log_dir()
print(f"Writing logs to {log_dir}")
summary_writer = SummaryWriter(
        str(log_dir),
        flush_secs=5
)

trainer = Trainer(
    model, train_loader, test_loader, criterion, optimizer, summary_writer, DEVICE
)

trainer.train(
    epochs,
    val_frequency,
    print_frequency=print_frequency,
    log_frequency=log_frequency,
)


Writing logs to ./logs/CNN_bs=10_lr=0.1_run_999
epoch started
torch.Size([10, 10, 1, 34950])


torch.Size([10])
torch.Size([10, 10, 1, 34950])
torch.Size([10])
epoch: [0], step: [1/1697], batch loss: 39.26360, batch accuracy: 20.00, data load time: 0.00607, step time: 0.09984
torch.Size([10, 10, 1, 34950])
torch.Size([10])
torch.Size([10, 10, 1, 34950])
torch.Size([10])
epoch: [0], step: [3/1697], batch loss: 34.81747, batch accuracy: 10.00, data load time: 0.00716, step time: 0.07688
torch.Size([10, 10, 1, 34950])
torch.Size([10])
torch.Size([10, 10, 1, 34950])
torch.Size([10])
epoch: [0], step: [5/1697], batch loss: 36.16635, batch accuracy: 10.00, data load time: 0.00702, step time: 0.07002
torch.Size([10, 10, 1, 34950])
torch.Size([10])
torch.Size([10, 10, 1, 34950])
torch.Size([10])
epoch: [0], step: [7/1697], batch loss: 37.60959, batch accuracy: 20.00, data load time: 0.00672, step time: 0.06633
torch.Size([10, 10, 1, 34950])
torch.Size([10])
torch.Size([10, 10, 1, 34950])
torch.Size([10])
epoch: [0], step: [9/1697], batch loss: 37.20854, batch accuracy: 0.00, data load t