# Beat SOTA

In [None]:
# IMPORTS

import os
import time

import pandas as pd
import numpy as np

import madmom
from madmom.utils import search_files

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset as Dataset
import torch.optim as optim

from sklearn.metrics import accuracy_score, precision_score, recall_score

In [None]:
# GLOBAL VARIABLES

# random seed (handed to torch.manual_seed before training and prediction)
SEED = 1

# cuda configuration: set DISABLE_CUDA to True to force CPU execution
DISABLE_CUDA = False
USE_CUDA = not DISABLE_CUDA and torch.cuda.is_available()
DEVICE = torch.device("cuda:0" if USE_CUDA else "cpu")
print("CURRENT DEVICE:", DEVICE)

# paths
CURRENT_PATH = os.getcwd()

# trained weights are stored as <MODEL_PATH>/<MODEL_NAME>.model
MODEL_NAME = 'beat_sota'
MODEL_PATH = os.path.join(CURRENT_PATH, 'models')
# exist_ok makes this a no-op when the directory already exists and avoids
# the race between a separate existence check and the creation call
os.makedirs(MODEL_PATH, exist_ok=True)

# pre-computed feature files and beat annotation files
FEATURE_EXT = '.feat.npy'
FEATURE_PATH = os.path.join(CURRENT_PATH, 'features')
ANNOTATION_EXT = '.beats'
ANNOTATION_PATH = '../../../datasets/beat_boeck'

In [None]:
# COMMAND LINE SUPPORT

# TODO: parse real command line arguments; until that exists the verbosity
# flag below is hard-coded.

VERBOSE = True # placeholder for args.verbose

# announce the start of the run when verbose output is enabled
if VERBOSE:
    print('\n---- EXECUTION STARTED ----\n')
    # print('Command line arguments:\n\n', args, '\n')

In [None]:
# LOAD FEATURES

# locate the pre-computed feature files and the beat annotation files
feat_paths = search_files(FEATURE_PATH, FEATURE_EXT)
anno_paths = search_files(ANNOTATION_PATH, ANNOTATION_EXT)

# read every file into memory, keeping the order returned by search_files
features = list(map(np.load, feat_paths))
annotations = list(map(madmom.io.load_beats, anno_paths))

if VERBOSE:
    for loaded_count, message in (
        (len(features), 'feature spectrogram files loaded'),
        (len(annotations), 'feature annotation files loaded'),
    ):
        print(loaded_count, message)

# Concatenate spectrograms along the time axis (currently disabled)
# features = np.concatenate(features, axis=1)
# print(features.shape)

In [None]:
# NETWORK PARAMETERS

# channel depth: network input, first conv stage, second conv stage
in_size, h1_size, h2_size = 1, 16, 32

# kernel sizes of the convolution and max-pooling layers
k_conv_size, k_pool_size = 5, 2

# fully connected layer: flattened input width and number of outputs
# (h2_size channels times a 4x4 map, which is what two conv(5)+pool(2)
# stages leave of a 28x28 input)
fc_size = h2_size * 4 * 4
out_size = 10

# training schedule: number of epochs and learning rate
num_epochs = 10
lr = 0.001

In [None]:
# FEATURES

# import data
mnist_train = pd.read_csv('../../../datasets/mnist_csv/mnist_train.csv')
mnist_test = pd.read_csv('../../../datasets/mnist_csv/mnist_test.csv')

# prepare data: drop incomplete rows first, then split the CLEANED frames
# into pixel columns and the 'label' column (previously the dropna() result
# was computed but the raw frames were used)
train = mnist_train.dropna()
train_feat = train.drop('label', axis=1)
train_target = train['label']

test = mnist_test.dropna()
test_feat = test.drop('label', axis=1)
test_target = test['label']

# convert to tensors
train_f = torch.tensor(train_feat.values, dtype=torch.float)
train_t = torch.tensor(train_target.values, dtype=torch.long)
test_f = torch.tensor(test_feat.values, dtype=torch.float)
test_t = torch.tensor(test_target.values, dtype=torch.long)
# every row becomes a single-channel 28x28 image: (N, 1, 28, 28)
train_f = train_f.reshape(-1,1,28,28)
test_f = test_f.reshape(-1,1,28,28)

# the first 10000 training samples are held out for validation
valid_f = train_f[:10000].numpy()
valid_t = train_t[:10000].numpy()
train_f = train_f[10000:].numpy()
train_t = train_t[10000:].numpy()

# keep the test split in the same numpy format as the other splits, because
# the dataset wrapper and predict() convert their inputs with
# torch.from_numpy, which rejects tensors
test_f = test_f.numpy()
test_t = test_t.numpy()

print(train_f.shape)
print(train_t.shape)
print(valid_f.shape)
print(valid_t.shape)
print(test_f.shape)
print(test_t.shape)

In [None]:
# BEAT NETWORK CLASS and DATA SET CLASS for DATA LOADER

class BeatNet(nn.Module):
    """
    Small convolutional network: two conv stages and one linear output layer.

    Each stage is Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d. All layer sizes
    are read from the module-level network parameters (in_size, h1_size,
    h2_size, k_conv_size, k_pool_size, fc_size, out_size).
    """

    def __init__(self):
        super().__init__()

        # stage 1: in_size -> h1_size channels
        self.l1 = nn.Sequential(
            nn.Conv2d(in_size, h1_size, kernel_size=k_conv_size),
            nn.BatchNorm2d(h1_size),
            nn.ReLU(),
            nn.MaxPool2d(k_pool_size),
        )

        # stage 2: h1_size -> h2_size channels
        self.l2 = nn.Sequential(
            nn.Conv2d(h1_size, h2_size, kernel_size=k_conv_size),
            nn.BatchNorm2d(h2_size),
            nn.ReLU(),
            nn.MaxPool2d(k_pool_size),
        )

        # output layer working on the flattened feature maps
        self.fc = nn.Linear(in_features=fc_size, out_features=out_size)

    def forward(self, x):
        """
        Run a batch through both conv stages and the linear layer.
        :return: output of the linear layer (no activation is applied)
        """
        feature_maps = self.l2(self.l1(x))
        # collapse everything but the batch axis into one vector per sample
        flat = feature_maps.reshape(feature_maps.shape[0], -1)
        return self.fc(flat)



# Dataset for beats
class BeatSet(Dataset):
    """
    Dataset pairing features with targets by index, for use with a DataLoader.

    Both containers may hold numpy data or PyTorch tensors; every item is
    handed out as a pair of tensors.
    """

    def __init__(self, feat_list, targ_list):
        """
        :param feat_list: indexable collection of features
        :param targ_list: indexable collection of targets, aligned by index
        """
        super(BeatSet, self).__init__()
        self.features = feat_list
        self.targets = targ_list
        # the number of samples is defined by the features
        self.length = len(self.features)

    def __len__(self):
        return self.length

    @staticmethod
    def _to_tensor(value):
        """Return value as a tensor; tensors pass through untouched."""
        if isinstance(value, torch.Tensor):
            # torch.from_numpy raises TypeError for tensors
            return value
        # np.asarray also wraps numpy scalars (e.g. a single label)
        return torch.from_numpy(np.asarray(value))

    def __getitem__(self, index):
        """
        Get one feature/target pair.
        :return: tuple (feature tensor, target tensor)
        """
        return self._to_tensor(self.features[index]), self._to_tensor(self.targets[index])



# helper class for arguments
class Args:
    """Empty attribute container; settings are attached as plain attributes."""

In [None]:
# train / test / predict

def train_one_epoch(args, model, device, train_loader, optimizer, epoch):
    """
    Training for one epoch.

    :param args: settings object; only args.log_interval is read here
    :param model: network to train; expected to return one row of raw class
                  scores per sample (as BeatNet does)
    :param device: device the batches are moved to
    :param train_loader: data loader yielding (data, target) batches, with
                         targets given as integer class indices
    :param optimizer: optimizer updating the model parameters
    :param epoch: epoch number, used for logging only
    """
    # set model to training mode (activate dropout / batch normalization).
    model.train()
    t = time.time()
    # iterate through all data using the loader
    for batch_idx, (data, target) in enumerate(train_loader):
        # move data to device
        data, target = data.to(device), target.to(device)

        # reset optimizer (clear previous gradients)
        optimizer.zero_grad()
        # forward pass (calculate output of network for input)
        output = model(data.float())
        # calculate loss: the network emits unnormalized scores per class and
        # the targets are class indices, which is the contract of
        # cross_entropy; binary_cross_entropy needs probabilities and a float
        # target of the same shape as the output and fails on this data
        loss = F.cross_entropy(output, target.long())
        # do a backward pass (calculate gradients using automatic differentiation and backpropagation)
        loss.backward()
        # update parameters of network using calculated gradients
        optimizer.step()

        # print logs
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}, took {:.2f}s'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item(), time.time()-t))
            t = time.time()


       
def calculate_unseen_loss(model, device, unseen_loader):
    """
    Calculate loss for unseen data (validation or testing)

    :param model: network to evaluate; expected to return one row of raw
                  class scores per sample (as BeatNet does)
    :param device: device the batches are moved to
    :param unseen_loader: data loader yielding (data, target) batches, with
                          targets given as integer class indices
    :return: average loss per sample over the whole data set
    """
    # set model to inference mode (deactivate dropout / batch normalization).
    model.eval()
    # init cumulative loss
    unseen_loss = 0
    # no gradient calculation
    with torch.no_grad():
        # iterate over test data
        for data, target in unseen_loader:
            # move data to device
            data, target = data.to(device), target.to(device)
            # forward pass (calculate output of network for input)
            output = model(data.float())
            # sum up the batch loss; cross_entropy matches raw class scores
            # with class-index targets (binary_cross_entropy rejects them)
            unseen_loss += F.cross_entropy(output, target.long(), reduction='sum').item()

    # turn the summed loss into the per-sample average and report it
    unseen_loss /= len(unseen_loader.dataset)
    print('Average loss: {:.4f}\n'.format(unseen_loss))

    return unseen_loss
  
    
    
def predict(model, device, data):
    """
    Predict beat

    :param model: network used for the forward pass
    :param device: device the input is moved to
    :param data: model input as numpy array or PyTorch tensor
    :return: prediction (model output tensor, located on device)
    """
    # set model to inference mode (deactivate dropout / batch normalization).
    model.eval()
    # as_tensor accepts numpy arrays as well as tensors, whereas
    # torch.from_numpy raises TypeError when it is handed a tensor
    data = torch.as_tensor(data)
    # move data to device
    data = data.to(device)
    # no gradient calculation
    with torch.no_grad():
        return model(data.float())

In [None]:
def run_training():
    """
    Train BeatNet with early stopping and evaluate it on the test set.

    Uses the module-level splits (train_f/train_t, valid_f/valid_t,
    test_f/test_t). Whenever the validation loss improves, the weights are
    written to <MODEL_PATH>/<MODEL_NAME>.model. The test evaluation at the
    end uses those best weights whenever a checkpoint was written in this run.
    """
    print('Training network...')

    # parameters for NN training
    args = Args()
    args.batch_size = 8 #64
    args.max_epochs = 1 #25 #1000
    args.patience = 4
    args.lr = 0.01 # 0.001, 0.0001
    args.momentum = 0.5 #UNUSED
    args.log_interval = 100 #100

    # setup pytorch
    torch.manual_seed(SEED)

    # create model and optimizer
    model = BeatNet().to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # setup our datasets for training, evaluation and testing
    kwargs = {'num_workers': 4, 'pin_memory': True} if USE_CUDA else {'num_workers': 4}
    train_loader = torch.utils.data.DataLoader(BeatSet(train_f, train_t),
                                               batch_size=args.batch_size, shuffle=True, **kwargs)
    valid_loader = torch.utils.data.DataLoader(BeatSet(valid_f, valid_t),
                                               batch_size=args.batch_size, shuffle=False, **kwargs)
    test_loader = torch.utils.data.DataLoader(BeatSet(test_f, test_t),
                                              batch_size=args.batch_size, shuffle=False, **kwargs)

    best_model_file = os.path.join(MODEL_PATH, MODEL_NAME + '.model')
    # only reload a checkpoint written by THIS run, never a stale file
    checkpoint_saved = False

    # main training loop
    # infinity instead of a magic number: any finite first loss is an improvement
    best_validation_loss = float('inf')
    cur_patience = args.patience
    for epoch in range(1, args.max_epochs + 1):
        # run one epoch of NN training
        train_one_epoch(args, model, DEVICE, train_loader, optimizer, epoch)
        # validate on validation set
        print('\nValidation Set:')
        validation_loss = calculate_unseen_loss(model, DEVICE, valid_loader)
        # check for early stopping
        if validation_loss < best_validation_loss:
            torch.save(model.state_dict(), best_model_file)
            checkpoint_saved = True
            best_validation_loss = validation_loss
            cur_patience = args.patience
        else:
            # if performance does not improve, we do not stop immediately but
            # use up the remaining patience first
            if cur_patience <= 0:
                print('Early stopping, no improvement for %d epochs...' % args.patience)
                break
            else:
                print('No improvement, patience: %d' % cur_patience)
                cur_patience -= 1

    # the model in memory holds the weights of the LAST epoch, which may be
    # worse than the saved checkpoint; restore the best weights before testing
    if checkpoint_saved:
        model.load_state_dict(torch.load(best_model_file, map_location=DEVICE))

    # testing on test data
    print('Evaluate network...')
    print('Test Set:')
    # calculate loss for test set
    calculate_unseen_loss(model, DEVICE, test_loader)

In [None]:
# run the training / validation / test cycle defined in run_training()
run_training()

In [None]:
def run_prediction(test_features):
    """
    Load the saved model and predict every sample in test_features.

    :param test_features: iterable with a length; each element is one model
                          input (numpy array or tensor). A 3-dimensional
                          element is treated as a single unbatched sample and
                          gets a leading batch axis of size one.
    :return: list with one numpy array per element (first row of the model
             output for that element)
    """
    torch.manual_seed(SEED)

    # load model; map_location lets weights saved on another device
    # (e.g. trained on GPU, loaded on a CPU-only machine) be restored
    model = BeatNet().to(DEVICE)
    state = torch.load(os.path.join(MODEL_PATH, MODEL_NAME + '.model'), map_location=DEVICE)
    model.load_state_dict(state)
    print('model loaded...')

    total = len(test_features)
    # calculate actual output for the test data
    results_cnn = []
    # iterate over test tracks
    for test_idx, cur_test_feat in enumerate(test_features):
        if test_idx % 100 == 0:
            completion = int((test_idx / total)*100)
            print(str(completion)+'% complete...')

        # hand predict() a numpy array regardless of the container type
        if isinstance(cur_test_feat, torch.Tensor):
            cur_test_feat = cur_test_feat.detach().cpu().numpy()
        sample = np.asarray(cur_test_feat)
        # the network's batch-norm layers need a batch axis, and the [0]
        # below picks the single row of a batch of one
        if sample.ndim == 3:
            sample = sample[np.newaxis]

        # run the inference method
        result = predict(model, DEVICE, sample)
        # .cpu() is required before .numpy() when the model runs on CUDA
        results_cnn.append(result.cpu().numpy()[0])

    return results_cnn

In [None]:
# predict every test sample with the saved model (one result per element of test_f)
predicted = run_prediction(test_f)