In [None]:
from contextlib import suppress
from zipfile import ZipFile, BadZipFile
import os

#set up Colab if detected
with suppress(ModuleNotFoundError):
    from google.colab import drive
    %pip install neptune > /dev/null
    try: #assumes repo zipped (zip -r spectrofy.zip Spectrofy -x \*.git* \*ipynb* \*.mp3 \*.pth) and uploaded
        with ZipFile('spectrofy.zip') as zip:
            zip.extractall()
        os.chdir('Spectrofy')
    except (FileNotFoundError, BadZipFile): #as fallback assumes repo uploaded to Drive (training will be slower)
        drive.mount('/content/drive')
        os.chdir('drive/MyDrive/Spectrofy')

In [None]:
import neptune
from getpass import getpass

#set up Neptune
log = True #enables/disables logging; `run` is only created when this is True
if log:
    run = neptune.init_run( #the warning about interactive sessions can be ignored
        api_token = getpass('Enter your Neptune API token: '), #asked at run time so the token is never written in the notebook
        project = input('Enter your Neptune project name: '))

In [None]:
from utils.dataset import SpectrogramsDataset
from utils.preprocessing import transform
from torch.utils.data import random_split
from torch import Generator

#build the dataset from data/spec and data/features.csv, with danceability as target
spec_dir, features_path = (os.path.join('data', name) for name in ('spec', 'features.csv'))
dataset = SpectrogramsDataset(
    spec_dir,
    features_path,
    transform,
    target='danceability')

#split dataset deterministically: 80/10/10 with a fixed seed, so the subsets are the same at every run
trainset, valset, testset = random_split(
    dataset,
    lengths=[0.8, 0.1, 0.1],
    generator=Generator().manual_seed(42))
tuple(map(len, (trainset, valset, testset)))

In [None]:
from torch.utils.data import DataLoader

#configure batches (arguments shared by the three loaders)
loader_args = dict(
    batch_size=32, #TODO tune with Neptune
    num_workers=2,
    pin_memory=True,
    drop_last=True)
train_loader = DataLoader(trainset, shuffle=True, persistent_workers=True, **loader_args)
val_loader = DataLoader(valset, persistent_workers=True, **loader_args)
test_loader = DataLoader(testset, **loader_args)

#read the sample shape from one training batch, skipping the leading batch dimension
channels, height, width = next(iter(train_loader))[0].shape[1:]

#log dataset
if log:
    run['dataset'] = {
        'target': dataset.target,
        #drop_last discards the incomplete last batch, so validation and test sizes count whole batches only
        'train-validation-test': '-'.join(str(size) for size in (
            len(trainset), #whole because reshuffled at each epoch
            loader_args['batch_size'] * len(val_loader),
            loader_args['batch_size'] * len(test_loader))),
        'channels-height-width': '-'.join(str(size) for size in (channels, height, width))}
channels, height, width

In [None]:
import importlib
import inspect
import torch
from torch import nn, optim
from utils import models
importlib.reload(models) #for debugging purposes: picks up edits to utils/models.py without restarting the kernel
from utils.models import DanceabilityModel #imported after the reload, otherwise the name would stay bound to the stale class

#configure training
model_args = {}
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DanceabilityModel(**model_args).to(device) #TODO tune with Neptune
criterion = type('MSELoss', (nn.MSELoss,), {'__str__': lambda self : 'MSE'})() #MSE loss whose str() is just 'MSE'
optimizer = optim.Adam(model.parameters(), lr=1e-3) #TODO tune lr with Neptune

#log model (source of the class actually instantiated above, plus its arguments)
if log:
    run['model'] = {
        'architecture': inspect.getsource(DanceabilityModel),
        'arguments': model_args}
device

In [None]:
from copy import deepcopy
import numpy as np

train_losses, val_losses = [], [] #mean loss of each epoch
best_params = None #copy of the parameters with the lowest validation loss so far
patience = 3 #stop when the last `patience` validation losses are all higher than the one right before them
for epoch in range(999): #upper bound only: early stopping below is expected to end training sooner

    #train
    model.train()
    running_loss = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad() #resets gradient to avoid accumulation
        outputs = model(inputs.to(device))
        loss = criterion(outputs, labels.to(device)) #estimates loss of batch with current parameters
        loss.backward() #computes gradient of loss w.r.t. parameters
        optimizer.step() #updates parameters to reduce loss
        running_loss += loss.item()
    train_losses.append(running_loss/len(train_loader))
    print(f'[epoch {epoch:03}] train loss: {train_losses[-1]:.4f}, ', end='')

    #evaluate on validation set
    model.eval()
    running_loss = 0
    with torch.inference_mode(): #to save memory and computations
        for inputs, labels in val_loader:
            outputs = model(inputs.to(device))
            running_loss += criterion(outputs, labels.to(device)).item()
    val_loss = running_loss/len(val_loader)
    val_losses.append(val_loss)

    #back up best parameters (deep copy, since the state dict would otherwise keep changing with training)
    is_best = val_loss <= min(val_losses)
    if is_best:
        best_params = deepcopy(model.state_dict())
    print(f'val loss: {val_loss:.4f}', end=' (new best)\n' if is_best else '\n')

    #stop early (only checked once there are more than `patience` epochs to compare)
    if len(val_losses) > patience:
        baseline = val_losses[-patience-1]
        if all(recent > baseline for recent in val_losses[-patience:]):
            break

In [None]:
import matplotlib.pyplot as plt

#plot training history on the current axes (created on demand)
axes = plt.gca()
axes.plot(train_losses, label=f'Train {criterion}')
axes.plot(val_losses, label=f'Validation {criterion}')
axes.legend()
axes.set(xlabel='Epoch', ylabel='Mean loss')
figure = axes.figure #will be logged

In [None]:
import torch.nn.functional as F

#evaluate on test set with the best parameters found during training
model.load_state_dict(best_params)
assert not model.training
with torch.inference_mode():
    running_loss = sum(
        F.l1_loss(model(inputs.to(device)), labels.to(device)).item() #MAE for interpretability
        for inputs, labels in test_loader)
test_mae = round(running_loss/len(test_loader), ndigits=4) #mean of the batch means
print(f'Mean absolute error on test set: {test_mae}')

#log training (hyperparameters, test result and the loss history figure)
if log:
    run['training'] = {
        'batch_size': loader_args['batch_size'],
        'optimizer': str(optimizer),
        'test_mae': test_mae,
    }
    run['training/history'].upload(figure)

In [None]:
#save best parameters under utils/model_params, creating the folder if missing
param_path = os.path.join('utils', 'model_params', 'danceability.pth')
param_dir = os.path.dirname(param_path)
os.makedirs(param_dir, exist_ok=True)
torch.save(best_params, param_path)

In [None]:
#test model: load the saved parameters into a fresh CPU model and predict the first test sample
model = DanceabilityModel(**model_args)
model.load_state_dict(torch.load(param_path, map_location='cpu'))
model.eval()
spec, true = testset[0]
with torch.inference_mode(): #no gradients needed for a prediction
    pred = model(spec.unsqueeze(0)).item() #adds the batch dimension, since the model was trained on 4-D batches
print(f'[danceability] true: {true:.2f}, predicted: {pred:.2f}')

In [None]:
#stop logging: ends the Neptune run, which only exists when logging is enabled
if log:
    run.stop()