# Key estimation challenge

This challenge was about finding out which key a piece was written in. The input was a set of .midi files containing performed pieces, and the output was a file that assigned each piece a key.

To solve this problem, a recurrent neural network (RNN) was used as described by Liu et al. in ["PERFORMANCE MIDI-TO-SCORE CONVERSION BY
NEURAL BEAT TRACKING"](https://www.turing.ac.uk/sites/default/files/2022-09/midi_quantisation_paper_ismir_2022_0.pdf). This paper is also explained in a [YouTube-Video](https://www.youtube.com/watch?v=yumxXCYSgbY) and their code is openly available at [GitHub](https://github.com/cheriell/PM2S).
The architecture of the paper's neural networks is shown below.
For our purposes, the branches ending with key signature and time signature numerators were implemented, but with more classes to support all 24 keys and 2 more numerators (9 and 12).
As the original project was about midi performance to score conversion, it was not needed to find out the root of a scale, only the scale itself, e.g. they did not know whether a pieces' scale was C or Am when the notes C, D, E, F, G, H, A, B are in the scale.

![Architecture of the paper's neural networks.](img/NN.png)

We altered their code base to work with the data set given to us and to produce output in the correct format.


## Loading Data

At first, we have to load in our data in the format that is accepted by the RNN. Every piece should be transformed into a list of notes represented of a tuple (pitch, onset_sec, duration_sec, velocity). As the RNN is able to handle key signature changes, the label is not a single key, but a list of key signature changes. In our case there is always just one key signature change at the beginning of the piece.

### Data Augmentation

Before we create the data set class, we have to take a look at data augmentation.

An easy way to upscale the size of the given data set to prevent overfitting is augmenting the data. In this case, the tempo of the piece can be changed by scaling the onset and duration of every note by a given factor. Even more importantly, the piece can be transposed by changing the pitch of all notes simultaneously. When doing this, the key label also has to be changed, because a piece in B major that is transposed by a semitone upwards is now in C major. Please note that in the original code, there was an error that caused keys to be shifted incorrectly, causing the label of a piece in B major to be set to C minor instead of C major when pitching up by a semitone.

In [12]:
class DataAugmentation:
    def __init__(self,
                 tempo_change_prob=1.0,
                 tempo_change_range=(0.8, 1.2),
                 pitch_shift_prob=1.0,
                 pitch_shift_range=(-6, 6)):

        self.tempo_change_prob = tempo_change_prob
        self.tempo_change_range = tempo_change_range
        self.pitch_shift_prob = pitch_shift_prob
        self.pitch_shift_range = pitch_shift_range

    def __call__(self, note_sequence, annotations):
        # tempo change
        if random.random() < self.tempo_change_prob:
            note_sequence, annotations = self.tempo_change(note_sequence, annotations)

        # pitch shift
        if random.random() < self.pitch_shift_prob:
            note_sequence, annotations = self.pitch_shift(note_sequence, annotations)

        return note_sequence, annotations

    def tempo_change(self, note_sequence, annotations):
        tempo_change_ratio = random.uniform(*self.tempo_change_range)
        note_sequence[:, 1:3] *= 1 / tempo_change_ratio
        annotations['time_signatures'][:, 0] *= 1 / tempo_change_ratio
        annotations['key_signatures'][:, 0] *= 1 / tempo_change_ratio
        return note_sequence, annotations

    def pitch_shift(self, note_sequence, annotations):
        shift = round(random.uniform(*self.pitch_shift_range))
        note_sequence[:, 0] += shift

        for i in range(len(annotations['key_signatures'])):
            key = annotations['key_signatures'][i, 1]
            minor_offset = 12 * (key // 12)
            scale_offset = key - minor_offset

            annotations['key_signatures'][i, 1] = minor_offset + (scale_offset + shift) % 12

        return note_sequence, annotations

### Base Data Set

As we use the same RNN for the key and meter estimation challenges, we have an abstract BaseDataSet for both of them. This data set holds the data and can then generate random batches to feed to the RNN for training.

In [23]:
import os, sys

sys.path.insert(0, os.path.join(sys.path[0], '..'))
import torch
import pandas as pd
from collections import defaultdict
import random
import partitura as pt
from pathlib import Path

class BaseDataset(torch.utils.data.Dataset):

    def __init__(self, workspace, split):

        # parameters
        self.workspace = workspace
        self.feature_folder = workspace
        self.split = split

        # Get metadata by split
        self.metadata = pd.read_csv(os.path.join(self.feature_folder, 'key-meter_train_gt.txt'), delimiter=',')
        self.metadata.reset_index(inplace=True)

        # Get distinct pieces
        self.piece2row = defaultdict(list)
        for i, row in self.metadata.iterrows():
            self.piece2row[row['filename']].append(i)
        self.pieces = list(self.piece2row.keys())

        # Initialise data augmentation
        self.dataaug = DataAugmentation()

    def __len__(self):
        if self.split == 'train' or self.split == 'all':
            # constantly update 200 steps per epoch, not related to training dataset size
            # THIS WAS LOWERED TO REDUCE TRAINING TIME IN THIS NOTEBOOK
            return batch_size * 20

        elif self.split == 'valid':
            # by istinct pieces in validation set
            # THIS WAS LOWERED TO REDUCE VALIDATION TIME IN THIS NOTEBOOK
            return batch_size * 5 #len(self.piece2row) // 10  # valid dataset size

        elif self.split == 'test':
            return len(self.metadata)

    def _sample_row(self, idx):
        # Sample one row from the metadata
        if self.split == 'train' or self.split == 'all':
            piece_id = random.choice(list(self.piece2row.keys()))   # random sampling by piece
            row_id = random.choice(self.piece2row[piece_id])
        elif self.split == 'valid':
            piece_id = self.pieces[idx // batch_size]    # by istinct pieces in validation set
            row_id = self.piece2row[piece_id][idx % batch_size % len(self.piece2row[piece_id])]
        elif self.split == 'test':
            row_id = idx
        row = self.metadata.iloc[row_id]

        return row

    def _load_data(self, row):
        # Get feature
        p = pt.load_performance_midi(str(Path(self.feature_folder, row['filename'])))
        note_array = p.note_array()
        note_sequence = np.array(list(zip(note_array['pitch'], note_array['onset_sec'], note_array['duration_sec'], note_array['velocity'])))
        annotations = {
            'time_signatures': np.array([(0., row['ts_num'])]),
            'key_signatures': np.array([(0., keyName2Number[row['key']])]),
            'tempo': np.array([(0., row['tempo'])])
        }

        # Data augmentation
        if self.split == 'train' or self.split == 'all':
            note_sequence, annotations = self.dataaug(note_sequence, annotations)

        # Randomly sample a segment that is at most max_length long
        if self.split == 'train' or self.split == 'all':
            start_idx = random.randint(0, len(note_sequence)-1)
            end_idx = start_idx + max_length
        elif self.split == 'valid':
            start_idx, end_idx = 0, max_length  # validate on the segment starting with the first note
        elif self.split == 'test':
            start_idx, end_idx = 0, len(note_sequence)  # test on the whole note sequence

        if end_idx > len(note_sequence):
            end_idx = len(note_sequence)

        note_sequence = note_sequence[start_idx:end_idx]

        return note_sequence, annotations

### Key Signature Data Set

To provide key estimation specific data, the abstract BaseDataSet is derived to return the correct label for every item in the dataset.

In [24]:
import numpy as np

from PM2S.constants import *


class KeySignatureDataset(BaseDataset):

    def __init__(self, workspace, split):
        super().__init__(workspace, split)

    def __getitem__(self, idx):

        row = self._sample_row(idx)
        note_sequence, annotations = self._load_data(row)

        # Get model output data
        key_signatures = annotations['key_signatures']

        key_numbers = np.zeros(len(note_sequence)).astype(float)

        for i in range(len(note_sequence)):
            onset = note_sequence[i,1]
            for ks in key_signatures:
                if ks[0] > onset + tolerance:
                    break
                key_numbers[i] = ks[1] % keyVocabSize

        # padding
        length = len(note_sequence)
        if length < max_length:
            note_sequence = np.concatenate([note_sequence, np.zeros((max_length - length, 4))])
            key_numbers = np.concatenate([key_numbers, np.zeros(max_length - length)])

        return note_sequence, key_numbers, length

### Data module
Pytorch Lightning uses an implementation of a LightningDataModule for the training. It is needed for loading data differently during training, evaluating and testing.

In [25]:
import pytorch_lightning as pl
from PM2S.configs import *

class Pm2sDataModule(pl.LightningDataModule):

    def __init__(self, args, feature='key_signature', full_train=True):
        super().__init__()
        
        # Parameters from input arguments
        self.workspace = args.workspace
        self.feature = feature
        self.full_train = full_train

    def _get_dataset(self, split):
        if self.feature == 'key_signature':
            dataset = KeySignatureDataset(self.workspace, split)
        elif self.feature == 'time_signature':
            dataset = TimeSignatureDataset(self.workspace, split) # will be important later
        else:
            raise ValueError('Unknown feature: {}'.format(self.feature))
        return dataset

    def train_dataloader(self):
        if self.full_train:
            dataset = self._get_dataset(split='all')
        else:
            dataset = self._get_dataset(split='train')
        sampler = torch.utils.data.sampler.RandomSampler(dataset)
        dataloader = torch.utils.data.dataloader.DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            drop_last=True
        )
        return dataloader

    def val_dataloader(self):
        dataset = self._get_dataset(split='valid')
        sampler = torch.utils.data.sampler.SequentialSampler(dataset)
        dataloader = torch.utils.data.dataloader.DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            drop_last=True
        )
        return dataloader

    def test_dataloader(self):
        dataset = self._get_dataset(split='test')
        sampler = torch.utils.data.sampler.SequentialSampler(dataset)
        dataloader = torch.utils.data.dataloader.DataLoader(
            dataset,
            batch_size=1,
            sampler=sampler,
            num_workers=num_workers,
            drop_last=False
        )
        return dataloader

This concludes loading the data, now we can generate our RNN and train it!

### Recurrent Neural Network

The architecture of the RNN we use consumes a list of notes represented by tuples that gets fed through a convolutional neural network (CNN) block, a gated recurrent unit (GRU) block and then through a linear block with a dropout layer.

In [40]:
import torch.nn as nn

from PM2S.models.blocks import ConvBlock, GRUBlock, LinearOutput
from PM2S.constants import keyVocabSize
from PM2S.models.utils import get_in_features, encode_note_sequence


class RNNKeySignatureModel(nn.Module):

    def __init__(self, hidden_size=512):
        super().__init__()

        in_features = get_in_features()

        self.convs = ConvBlock(in_features=in_features)

        self.gru = GRUBlock(in_features=hidden_size)

        self.out = LinearOutput(in_features=hidden_size, out_features=keyVocabSize, activation_type='softmax')

    def forward(self, x):
        # x: (batch_size, seq_len, len(features)==4)
        x = encode_note_sequence(x)

        x = self.convs(x) # (batch_size, seq_len, hidden_size)
        x = self.gru(x) # (batch_size, seq_len, hidden_size)
        y = self.out(x) # (batch_size, seq_len, keyVocabSize)
        y = y.transpose(1, 2) # (batch_size, keyVocabSize, seq_len)

        return y

### Module

Before we can start training, we have to wrap our model in a KeySignatureModel that handles training and validation steps by calculating the loss and f1 score.

In [41]:
import pytorch_lightning as pl
import os, sys

from PM2S.modules.utils import configure_callbacks, configure_optimizers, classification_report_framewise

sys.path.insert(0, os.path.join(sys.path[0], '..'))
import torch.nn as nn

class KeySignatureModule(pl.LightningModule):

    def __init__(self):
        super().__init__()
        self.model = RNNKeySignatureModel()

    def forward(self, x):
        return self.model(x)

    def configure_optimizers(self):
        return configure_optimizers(self)

    def configure_callbacks(self):
        return configure_callbacks(monitor='val_f1')

    def training_step(self, batch, batch_size):
        # Data
        x, y, length = batch
        x = x.float()
        y = y.long()
        length = length.long()

        # Forward pass
        y_hat = self(x)

        # Mask out the padding part
        mask = torch.ones(y_hat.shape).to(y_hat.device)
        for i in range(y_hat.shape[0]):
            mask[i, length[i]:] = 0
        y_hat = y_hat * mask

        # Loss
        loss = nn.NLLLoss()(y_hat, y)

        # Logging
        logs = {
            'train_loss': loss,
        }
        self.log_dict(logs, prog_bar=True)

        return {'loss': loss, 'logs': logs}

    def validation_step(self, batch, batch_size):
        # Data
        x, y, length = batch
        x = x.float()
        y = y.long()
        length = length.long()

        # Forward pass
        y_hat = self(x)

        # Mask out the padding part
        for i in range(y_hat.shape[0]):
            y_hat[i, length[i]:] = 0

        # Loss
        loss = nn.NLLLoss()(y_hat, y)

        # Metrics
        f_macro_all = 0

        for i in range(y_hat.shape[0]):
            # get sample from batch
            y_hat_i = y_hat[i, :, :length[i]].topk(1, dim=0)[1][0]
            y_i = y[i, :length[i]]

            # get accuracies
            (
                _, _, f_macro,
                _, _, _
            ) = classification_report_framewise(y_i, y_hat_i)

            f_macro_all += f_macro

        f_macro_all /= y_hat.shape[0]

        # Logging
        logs = {
            'val_loss': loss,
            'val_f1': f_macro_all,
        }
        self.log_dict(logs, prog_bar=True)

        return {'loss': loss, 'logs': logs}


## Training

Now, let's put everything together and train the model! If you have the whole dataset at hand, change the path in the code below to its location. Please make sure the first line has no '//' at the start because the column labels will not be processed then, also "tempo(bpm)" should be replaced by "tempo".

This training will continue until you stop it.

In [42]:
import warnings
warnings.filterwarnings('ignore')

def train(args):
    # Data
    data_module = Pm2sDataModule(args, feature=args.feature, full_train=args.full_train)

    # Model
    if args.feature == 'key_signature':
        model = KeySignatureModule()
    elif args.feature == 'time_signature':
        model = TimeSignatureModule() # will be important later
    else:
        raise ValueError('Invalid feature type.')

    # Trainer
    trainer = pl.Trainer(
        default_root_dir=os.path.join(args.workspace, 'mlruns'),
        log_every_n_steps=50,
        reload_dataloaders_every_n_epochs=True
    )

    # Train
    trainer.fit(model, data_module)

In [29]:
class KeyTrainArgs:
    workspace = os.path.join('.', 'train') # set to your workspace which contains the dataset
    feature = 'key_signature'
    full_train = True

train(KeyTrainArgs())

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
The following callbacks returned in `LightningModule.configure_callbacks` will override existing callbacks passed to Trainer: ModelCheckpoint
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type                 | Params
-----------------------------------------------
0 | model | RNNKeySignatureModel | 10.5 M
-----------------------------------------------
10.5 M    Trainable params
0         Non-trainable params
10.5 M    Total params
42.011    Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

## Model saving

When the model is trained long enough, checkpoints are created in the 'mlruns' folder in your workspace (root folder by default). To export these models for predicting the key of unlabelled pieces, we have to process and save them as a .pth file.

In [38]:
import warnings
warnings.filterwarnings('ignore')


def save_model(args):
    if args.feature == 'time_signature':
        module = TimeSignatureModule.load_from_checkpoint(args.model_checkpoint_path) # will be important later
        model_save_path = './_model_state_dicts/time_signature/RNNTimeSignatureModel.pth'

    elif args.feature == 'key_signature':
        module = KeySignatureModule.load_from_checkpoint(args.model_checkpoint_path)
        model_save_path = './_model_state_dicts/key_signature/RNNKeySignatureModel.pth'

    else:
        raise ValueError('Invalid feature type.')
        
    Path.mkdir(Path(model_save_path).parent, parents=True, exist_ok=True)
    torch.save(module.model.state_dict(), model_save_path)

In [43]:
class KeySaveArgs:
    model_checkpoint_path = os.path.join('.', 'mlruns', 'lightning_logs', 'version_1', 'checkpoints', 'last.ckpt') # insert path your trained model here
    feature = 'key_signature'

save_model(KeySaveArgs())

## Prediction

The RNN we trained now can be fed data from .midi files to predict when the key of the piece changes. As our performances generally are only in a single key, we have to aggregate these outputs. To do so, we sum up all the durations of the predicted keys by subtracting their start time from the time of the next key change, or the end of the piece. Then, we take the key that has the longest duration in the piece.

### Processor

We create a class that processes a collection of unlabelled inputs. Please download the trained model from out [GitHub](https://github.com/ZekReshi/Music-Informatics) by downloading the directory "_model_state_dicts" or set the path in the code below to your own model directory.

In [44]:
import torch

from PM2S.features._processor import MIDIProcessor
from PM2S.constants import keyNumber2Name


class RNNKeySignatureProcessor(MIDIProcessor):

    def __init__(self, model_state_dict_path='_model_state_dicts/key_signature/RNNKeySignatureModel.pth', **kwargs):
        super().__init__(model_state_dict_path, **kwargs)

    def load(self, state_dict_path):
        if state_dict_path:
            self._model = RNNKeySignatureModel()
            self._model.load_state_dict(torch.load(state_dict_path))
        else:
            self._model = RNNKeySignatureModel()

    def process(self, note_seq, **kwargs):
        x = torch.tensor(note_seq).unsqueeze(0)

        # Forward pass
        key_probs = self._model(x)

        # Post-processing
        key_idx = key_probs[0].topk(1, dim=0)[1].squeeze(0).cpu().detach().numpy() # (seq_len,)

        onsets = note_seq[:, 1]
        key_signature_changes = self.pps(key_idx, onsets)

        return key_signature_changes

    def pps(self, key_idx, onsets):
        ks_prev = '0'
        ks_changes = []
        for i in range(len(key_idx)):
            ks_cur = keyNumber2Name[key_idx[i]]
            if i == 0 or ks_cur != ks_prev:
                onset_cur = onsets[i]
                ks_changes.append((onset_cur, ks_cur))
                ks_prev = ks_cur
        return ks_changes

In [48]:
import glob
import os
from pathlib import Path
import warnings
warnings.filterwarnings("ignore")

class KeyPredictionArgs:
    datadir = os.path.join('.', 'test')
    modeldir = '.' # insert path to our saved model here
    
args = KeyPredictionArgs()

midi_files = glob.glob(os.path.join(args.datadir, "*.mid"))
midi_files.sort()

# Create time and key processors
processor_key_sig = RNNKeySignatureProcessor(os.path.join(args.modeldir, '_model_state_dicts', 'key_signature', 'RNNKeySignatureModel.pth'))

results = []
# Prediction
for idx, file in enumerate(midi_files):
    p = pt.load_performance_midi(Path(file))
    note_array = p.note_array()
    note_sequence = np.array(list(zip(note_array['pitch'], note_array['onset_sec'], note_array['duration_sec'], note_array['velocity'])))

    key_signature_changes = processor_key_sig.process(note_sequence)

    length = note_sequence[-1][1] + note_sequence[-1][2]

    last_onset, last_key = key_signature_changes[0]
    durations = {}
    for onset, key in key_signature_changes[1:]:
        if last_key not in durations.keys():
            durations[last_key] = 0
        durations[last_key] += onset - last_onset
        last_onset = onset
        last_key = key
    if last_key not in durations.keys():
        durations[last_key] = 0
    durations[last_key] += length - last_onset

    best_duration, best_key = 0, 0
    for key, duration in durations.items():
        if duration > best_duration:
            best_duration = duration
            best_key = key

    print(f"{str(idx + 1)}/{str(len(midi_files))}: {file}")
    print("Prediction: " + str(best_key))

    results.append((os.path.basename(file), best_key))


1/153: .\test\mi2023_key-meter_test_000.mid
Prediction: Eb
2/153: .\test\mi2023_key-meter_test_001.mid
Prediction: D#m
3/153: .\test\mi2023_key-meter_test_002.mid
Prediction: C#m
4/153: .\test\mi2023_key-meter_test_003.mid
Prediction: Em
5/153: .\test\mi2023_key-meter_test_004.mid
Prediction: Am
6/153: .\test\mi2023_key-meter_test_005.mid
Prediction: F#
7/153: .\test\mi2023_key-meter_test_006.mid
Prediction: Dm
8/153: .\test\mi2023_key-meter_test_007.mid
Prediction: Fm
9/153: .\test\mi2023_key-meter_test_008.mid
Prediction: E
10/153: .\test\mi2023_key-meter_test_009.mid
Prediction: D#m
11/153: .\test\mi2023_key-meter_test_010.mid
Prediction: C
12/153: .\test\mi2023_key-meter_test_011.mid
Prediction: Ab
13/153: .\test\mi2023_key-meter_test_012.mid
Prediction: E
14/153: .\test\mi2023_key-meter_test_013.mid
Prediction: Ab
15/153: .\test\mi2023_key-meter_test_014.mid
Prediction: F#m
16/153: .\test\mi2023_key-meter_test_015.mid
Prediction: Ab
17/153: .\test\mi2023_key-meter_test_016.mid
Pre

## Conclusion

This model, when applied to our data, is able to solve this challenge in a satisfying manner as our team is placed number 1 on the leaderboard (as of January 8th). After a couple of hours of training (about 13 epochs), we already achieved an f-score of 0.97, but this score must be taken with a grain of salt as the verification is done with pieces from the training set. We reached an average tonal distance of 0.379, which tells us that our model is right most of the time, and if it does not output the right key it is probably not that far off, i.e. a C major key is wrongly detected as an A minor key, which uses the same notes but has a different root.

# Meter estimation challenge

Very similarly to the key estimation challenge, the meter estimation challenge is about processing performed pieces to calculate which meter the piece is written in, e.g. 4/4 or 3/4. But this challenge only takes the numerator into account because the challenge already is quite hard. The provided dataset is the same as the dataset for the key estimation challenge, and our solution also uses many components that have been seen in the above paragraphs. Therefore, it is required to execute the code above for this part to work. 

At first, we will only look at the meter, not the tempo.

## Dataset

This part is nearly a carbon copy of the one above, so we will just create the classes without further elaboration.

In [49]:
class TimeSignatureDataset(BaseDataset):

    def __init__(self, workspace, split):
        super().__init__(workspace, split)

    def __getitem__(self, idx):

        row = self._sample_row(idx)
        note_sequence, annotations = self._load_data(row)

        # Get model output data
        time_signatures = annotations['time_signatures']
        ts_numerators = np.zeros(len(note_sequence)).astype(float)

        for i in range(len(note_sequence)):
            onset = note_sequence[i, 1]
            for ts in time_signatures:
                if ts[0] > onset + tolerance:
                    break
                ts_numerators[i] = tsNume2Index[int(ts[1])] if int(ts[1]) in tsNume2Index.keys() else 0

        # padding
        length = len(note_sequence)
        if length < max_length:
            note_sequence = np.concatenate([note_sequence, np.zeros((max_length - length, 4))])
            ts_numerators = np.concatenate([ts_numerators, np.zeros(max_length - length)])

        return note_sequence, ts_numerators, length

## Recurrent Neural Network

The RNN model as well as the module wrapper is also the same as for the key estimation challenge, the only difference being the last layer, which of course has to provide different (less) classes as an output.

In [50]:
class RNNTimeSignatureModel(nn.Module):

    def __init__(self, hidden_size=512):
        super().__init__()

        in_features = get_in_features()

        self.convs = ConvBlock(in_features=in_features)
        self.gru = GRUBlock(in_features=hidden_size)
        self.out_tn = LinearOutput(in_features=hidden_size, out_features=tsNumeVocabSize, activation_type='softmax')
        
    def forward(self, x):
        # x: (batch_size, seq_len, len(features)==4)
        x = encode_note_sequence(x)

        x = self.convs(x)  # (batch_size, seq_len, hidden_size)

        x_gru = self.gru(x)  # (batch_size, seq_len, hidden_size)
        y_tn = self.out_tn(x_gru)  # (batch_size, seq_len, tsNumeVocabSize)
        y_tn = y_tn.permute(0, 2, 1)  # (batch_size, tsNumeVocabSize, seq_len)

        return y_tn

In [51]:
class TimeSignatureModule(pl.LightningModule):

    def __init__(self):
        super().__init__()
        self.model = RNNTimeSignatureModel()

    def forward(self, x):
        return self.model(x)

    def configure_optimizers(self):
        return configure_optimizers(self)

    def configure_callbacks(self):
        return configure_callbacks(monitor='val_f1')

    def training_step(self, batch, batch_size):
        # Data
        x, y_tn, length = batch
        x = x.float()
        y_tn = y_tn.long()
        length = length.long()

        # Forward pass
        y_tn_hat = self(x)

        # Mask out the padding part
        pad_mask = torch.ones((y_tn_hat.shape[0], y_tn_hat.shape[2])).to(y_tn_hat.device)
        for i in range(y_tn_hat.shape[0]):
            pad_mask[i, length[i]:] = 0
        y_tn_hat = y_tn_hat * pad_mask.unsqueeze(1)

        # Loss
        loss = nn.NLLLoss(ignore_index=0)(y_tn_hat, y_tn)

        # Logging
        logs = {
            'train_loss': loss,
        }
        self.log_dict(logs, prog_bar=True)

        return {'loss': loss, 'logs': logs}

    def validation_step(self, batch, batch_size):
        # Data
        x, y_tn, length = batch
        x = x.float()
        y_tn = y_tn.long()
        length = length.long()

        # Forward pass
        y_tn_hat = self(x)

        # Mask out the padding part
        for i in range(y_tn_hat.shape[0]):
            y_tn_hat[i, :, length[i]:] = 0

        # Loss
        loss = nn.NLLLoss(ignore_index=0)(y_tn_hat, y_tn)

        # Metrics
        fs_macro_tn = 0

        for i in range(x.shape[0]):
            # get sample from batch
            y_tn_hat_i = y_tn_hat[i, :, :length[i]].topk(1, dim=0)[1][0]
            y_tn_i = y_tn[i, :length[i]]

            # filter out ignored indexes (the same as padding)
            y_tn_hat_i = y_tn_hat_i[y_tn_i != 0]
            y_tn_i = y_tn_i[y_tn_i != 0]

            # get accuracies
            (
                _, _, f_macro_tn,
                _, _, _
            ) = classification_report_framewise(y_tn_i, y_tn_hat_i)

            fs_macro_tn += f_macro_tn

        fs_macro_tn /= x.shape[0]

        # Logging
        logs = {
            'val_loss': loss,
            'val_f1': fs_macro_tn,
        }
        self.log_dict(logs, prog_bar=True)

        return {'loss': loss, 'logs': logs}


## Training

Now, let's train the model!

In [53]:
class MeterTrainArgs:
    workspace = os.path.join('.', 'train') # set to your workspace which contains the dataset
    feature = 'time_signature'
    full_train = True

train(MeterTrainArgs())

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
The following callbacks returned in `LightningModule.configure_callbacks` will override existing callbacks passed to Trainer: ModelCheckpoint
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type                  | Params
------------------------------------------------
0 | model | RNNTimeSignatureModel | 10.5 M
------------------------------------------------
10.5 M    Trainable params
0         Non-trainable params
10.5 M    Total params
41.976    Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

### Model saving
Now, let's save the model again.

In [55]:
class MeterSaveArgs:
    model_checkpoint_path = os.path.join('.', 'mlruns', 'lightning_logs', 'version_2', 'checkpoints', 'last.ckpt') # insert path your trained model here
    feature = 'time_signature'

save_model(MeterSaveArgs())

## Prediction
Yet again, we employ the same technique as we did when solving the key estimation challenge. The RNN outputs a list of time signature changes, and we aggregate them to find the time signature that is predicted for the longest duration.

In [56]:
class RNNTimeSignatureProcessor(MIDIProcessor):

    def __init__(self, model_state_dict_path='_model_state_dicts/time_signature/RNNTimeSignatureModel.pth', **kwargs):
        super().__init__(model_state_dict_path, **kwargs)

    def load(self, state_dict_path):
        if state_dict_path:
            self._model = RNNTimeSignatureModel()
            self._model.load_state_dict(torch.load(state_dict_path))
        else:
            self._model = RNNTimeSignatureModel()

    def process(self, note_seq, **kwargs):
        x = torch.tensor(note_seq).unsqueeze(0)

        # Forward pass
        tn_probs = self._model(x)

        # Post-processing
        tn_idx = tn_probs[0].topk(1, dim=0)[1].squeeze(0).cpu().detach().numpy() # (seq_len,)

        onsets = note_seq[:, 1]
        time_signature_changes = self.pps(tn_idx, onsets)

        return time_signature_changes

    def pps(self, tn_idx, onsets):
        ts_prev = '0/0'
        ts_changes = []
        for i in range(len(tn_idx)):
            ts_cur = '{:d}'.format(tsIndex2Nume[tn_idx[i]])
            if i == 0 or ts_cur != ts_prev:
                onset_cur = onsets[i]
                ts_changes.append((onset_cur, ts_cur))
                ts_prev = ts_cur
        return ts_changes

In [58]:
class MeterPredictionArgs:
    datadir = os.path.join('.', 'test')
    modeldir = '.' # insert path to our saved model here
    
args = MeterPredictionArgs()

midi_files = glob.glob(os.path.join(args.datadir, "*.mid"))
midi_files.sort()

# Create time and key processors
processor_time_sig = RNNTimeSignatureProcessor(os.path.join(args.modeldir, '_model_state_dicts', 'time_signature', 'RNNTimeSignatureModel.pth'))

# Prediction
for idx, file in enumerate(midi_files):
    p = pt.load_performance_midi(Path(file))
    note_array = p.note_array()
    note_sequence = np.array(list(zip(note_array['pitch'], note_array['onset_sec'], note_array['duration_sec'], note_array['velocity'])))

    time_signature_changes = processor_time_sig.process(note_sequence)

    length = note_sequence[-1][1] + note_sequence[-1][2]

    last_onset, last_ts_num = time_signature_changes[0]
    durations = {}
    for onset, ts_num in time_signature_changes[1:]:
        if last_ts_num not in durations.keys():
            durations[last_ts_num] = 0
        durations[last_ts_num] += onset - last_onset
        last_onset = onset
        last_ts_num = ts_num
    if last_ts_num not in durations.keys():
        durations[last_ts_num] = 0
    durations[last_ts_num] += length - last_onset

    best_duration, best_ts_num = 0, 0
    for ts_num, duration in durations.items():
        if duration > best_duration:
            best_duration = duration
            best_ts_num = ts_num

    print(f"{str(idx + 1)}/{str(len(midi_files))}: {file}")
    print("Prediction: " + str(best_ts_num))

1/153: .\test\mi2023_key-meter_test_000.mid
Prediction: 2
2/153: .\test\mi2023_key-meter_test_001.mid
Prediction: 4
3/153: .\test\mi2023_key-meter_test_002.mid
Prediction: 2
4/153: .\test\mi2023_key-meter_test_003.mid
Prediction: 4
5/153: .\test\mi2023_key-meter_test_004.mid
Prediction: 2
6/153: .\test\mi2023_key-meter_test_005.mid
Prediction: 3
7/153: .\test\mi2023_key-meter_test_006.mid
Prediction: 4
8/153: .\test\mi2023_key-meter_test_007.mid
Prediction: 4
9/153: .\test\mi2023_key-meter_test_008.mid
Prediction: 4
10/153: .\test\mi2023_key-meter_test_009.mid
Prediction: 4
11/153: .\test\mi2023_key-meter_test_010.mid
Prediction: 4
12/153: .\test\mi2023_key-meter_test_011.mid
Prediction: 4
13/153: .\test\mi2023_key-meter_test_012.mid
Prediction: 4
14/153: .\test\mi2023_key-meter_test_013.mid
Prediction: 4
15/153: .\test\mi2023_key-meter_test_014.mid
Prediction: 4
16/153: .\test\mi2023_key-meter_test_015.mid
Prediction: 4
17/153: .\test\mi2023_key-meter_test_016.mid
Prediction: 2
18/153

## Conclusion of meter estimation

With this model, we were able to achieve an f-score of about 0.47, but again this was done on some pieces of the training set. This might not seem like much, but when we applied it so the test set, we reached a score of 0.572 on the leaderboard, just below the current best meter estimation (0.575 as of January 8th).

Now, we have to take a look at the tempo estimation!

## Tempo

To calculate the tempo, we make use of the hidden markov model (HMM) presented in the lecture with some slight alterations.

### Subbeats from durations

Initially, the HMMs tried every other option with a sub beat count of 2 or 3, but we observed a pattern in many of the train pieces: Very often, a note is played for the duration of a beat, while other notes (mostly higher notes) are played every sub beat. So we could generate a histogram of note durations, search for peaks, and then see if the first two peaks have a factor close to 2 or 3. In this case, only that option is considered when trying out different configurations with the HMM.

To show that this is indeed the case for some pieces, we will now see some pieces that show this pattern:

![3 subbeats per beat.](img/Roll1.png)

![2 subbeats per beat.](img/Roll2.png)

![2 subbeats per beat.](img/Roll3.png)

In [59]:
from scipy.signal import find_peaks

def subbeats_from_durations(note_array: np.ndarray):
    durations = note_array["duration_sec"]
    hist, bins = np.histogram(durations, bins=100)

    peaks, _ = find_peaks(hist, prominence=20)

    if len(peaks) > 1:
        candidate = bins[peaks[1]] / bins[peaks[0]]
        if 1.9 < candidate < 2.1:
            return [2]
        if 2.9 < candidate < 3.1:
            return [3]
    return [2, 3]

### Tempi from inter onset intervals

Calculating the possible tempi by using autocorrelation causes many different tempi to be considered, and most of the time their values are too 'clean', like 120.0, 240.0 or 43.43434343. We try to use the histogram of inter onset intervals (IOIs) to detect how long a beat is by finding the first valid peak and calculating the beats per minute. Then, we calculate a couple of multipliers to add to the tempi list, precisely 1, 2, 4, and 8 as the piece might have a lot of 16th notes although it is written in 4/4, for example.

The following histograms should support our claims:

![A piece with 58.9 BPM.](img/IOI1.png)

In this case, the actual tempo was 58.9 BPM.
From this, we calculate that there are 1.0187 beats per second and 0.982 seconds per beat.
The histogram shows a peak at about 0.12 seconds per beat, which is very close to 0.982 when multiplied by 8.

![A piece with 115.8 BPM.](img/IOI2.png)

Here, the tempo was 115.8 BPM, which are 1.93 beats per second and 0.518 seconds per beat.
When divided by 2, the resulting 0.259 are close to the histrogram's peak.

In [60]:
def tempi_from_iois(note_array: np.ndarray, min_tempo: float, max_tempo: float):
    IOIs = np.diff(np.sort(note_array["onset_sec"]))
    hist, bins = np.histogram(IOIs, bins=100)

    valid_from = 0
    for i in range(len(bins)):
        if bins[i] >= 1 / 16:
            valid_from = i
            break

    new_hist = []
    new_labels = []
    for i in range(valid_from - 1, len(hist) - 1):
        new_hist.append((hist[i-1] + hist[i] + hist[i+1]) / 3)
        new_labels.append((bins[i+1] + bins[i]) / 2)

    peaks, _ = find_peaks(new_hist, prominence=5)

    if len(peaks) == 0:
        return []

    beat_duration = new_labels[peaks[0]]
    beats_per_minute = 60 / beat_duration
    tempi = []
    for i in range(4):
        tempo = beats_per_minute / (2 ** i)
        if min_tempo < tempo < max_tempo:
            tempi.append(tempo)
    return tempi

### Hidden Markov Model

Now, we just take the given HMM from the lecture which was provided via Baseline_Meter.py.

In [61]:
from typing import Tuple, Iterable

from hiddenmarkov import ConstantTransitionModel, ObservationModel

FRAMERATE = 16


class MeterObservationModel(ObservationModel):
    def __init__(
        self,
        states: int = 20,
        downbeat_idx: Iterable = [0],
        beat_idx: Iterable = [50],
        subbeat_idx: Iterable = [25],
    ):
        super().__init__()
        self.states = states
        # observation 1 = note onset present, 0 = nothing present
        self.probabilities = np.ones((2, states)) / 100
        self.probabilities[0, :] = 0.99
        for idx in subbeat_idx:
            self.probabilities[:, idx] = [0.5, 0.5]
        for idx in beat_idx:
            self.probabilities[:, idx] = [0.3, 0.7]
        for idx in downbeat_idx:
            self.probabilities[:, idx] = [0.1, 0.9]
        self.db = downbeat_idx
        self.b = beat_idx
        self.sb = subbeat_idx

    def get_beat_states(self, state_sequence: np.ndarray) -> np.ndarray:
        state_encoder = np.zeros_like(state_sequence)
        for i, state in enumerate(state_sequence):
            if state in self.sb:
                state_encoder[i] = 1
            if state in self.b:
                state_encoder[i] = 2
            if state in self.db:
                state_encoder[i] = 3
        return state_encoder

    def __call__(self, observation: np.ndarray) -> np.ndarray:
        if not self.use_log_probabilities:
            return self.probabilities[observation, :]
        else:
            return np.log(self.probabilities[observation, :])


def getTransitionMatrix(
    states: int,
    distribution: Iterable = [0.1, 0.8, 0.1],
) -> np.ndarray:
    """
    Compute transition Matrix
    """
    if states == 1:
        raise ValueError("The number of states should be > 1")
    transition_matrix = (
        np.eye(states, k=0) * distribution[0]
        + np.eye(states, k=1) * distribution[1]
        + np.eye(states, k=2) * distribution[2]
        + np.ones((states, states)) / 1e7
    )

    transition_matrix[-2, 0] = distribution[2]
    transition_matrix[-1, 0] = distribution[2] + distribution[1]

    return transition_matrix


def createHMM(
    tempo: float = 50,
    frame_rate: int = FRAMERATE,  # frames_per_beat
    beats_per_measure: int = 4,
    subbeats_per_beat: int = 2,
) -> Tuple[MeterObservationModel, ConstantTransitionModel]:
    frames_per_beat = 60 / tempo * frame_rate
    frames_per_measure = frames_per_beat * beats_per_measure
    states = int(frames_per_measure)
    downbeat_idx = [0]
    beat_idx = [int(states / beats_per_measure * k) for k in range(beats_per_measure)]
    subbeat_idx = [
        int(states / (beats_per_measure * subbeats_per_beat) * k)
        for k in range(beats_per_measure * subbeats_per_beat)
    ]

    observation_model = MeterObservationModel(
        states=states,
        downbeat_idx=downbeat_idx,
        beat_idx=beat_idx,
        subbeat_idx=subbeat_idx,
    )

    transition_matrix = getTransitionMatrix(states)
    transition_model = ConstantTransitionModel(transition_matrix)

    return observation_model, transition_model

### Tempo estimation

The method to finally estimate the tempo is nearly identical to the one we received initially, but it now makes use of the sub beat extraction from durations and the tempo candidates extracted from the IOIs. When there are no tempo candidates, we fall back to the autocorrelation again.

In [62]:
from typing import Dict
from partitura.utils.misc import PathLike
from meter_estimation_utils import get_frames_chordify, compute_autocorrelation
from hiddenmarkov import HMM


def estimate_tempo(
    filename: PathLike,
    beats_per_measure: int,
    framerate: int = FRAMERATE,
    frame_threshold: float = 0.0,
    chord_spread_time: float = 1 / 12,
    max_tempo: float = 250,
    min_tempo: float = 30,
) -> float:
    # get note array
    performance = pt.load_performance_midi(filename)
    note_array = performance.note_array()
    subbeats_per_beat = subbeats_from_durations(note_array)
    tempi = tempi_from_iois(note_array, min_tempo, max_tempo)
    if len(tempi) == 0:
        tempi = "auto"

    frames = get_frames_chordify(
        note_array=note_array,
        framerate=framerate,
        chord_spread_time=chord_spread_time,
        threshold=frame_threshold,
    )

    if tempi == "auto":
        autocorr = compute_autocorrelation(frames)
        beat_period, _ = find_peaks(autocorr[1:], prominence=None)
        tempi = 60 * framerate / (beat_period + 1)
        tempi = tempi[np.logical_and(tempi <= max_tempo, tempi >= min_tempo)]

        if len(tempi) == 0:
            tempi = np.linspace(min_tempo, max_tempo, 10)

    likelihoods = []

    for sbpb in subbeats_per_beat:
        for tempo in tempi:
            observation_model, transition_model = createHMM(
                tempo=tempo,
                frame_rate=framerate,
                beats_per_measure=beats_per_measure,
                subbeats_per_beat=sbpb,
            )

            hmm = HMM(
                observation_model=observation_model,
                transition_model=transition_model,
            )

            frames[frames < 1.0] = 0
            frames[frames >= 1.0] = 1

            observations = np.array(frames, dtype=int)
            _, log_lik = hmm.find_best_sequence(observations)

            likelihoods.append((sbpb, tempo, log_lik))

    likelihoods = np.array(likelihoods)

    best_result = likelihoods[likelihoods[:, 2].argmax()]

    best_tempo = best_result[1]

    return best_tempo

def process_file(
    mfn: PathLike, file_to_fix: Dict[str, Tuple[float, float]],
) -> Tuple[str, int, float]:
    piece: str = os.path.basename(mfn)
    meter = int(file_to_fix[piece][0])
    predicted_tempo: float = estimate_tempo(filename=mfn, beats_per_measure=meter)

    return (
        piece,
        meter,
        predicted_tempo,
    )

In [63]:
def load_submission(fn: str) -> dict:
    gt = np.loadtxt(
        fn,
        dtype=str,
        delimiter=",",
        comments="//",
    )

    if gt.shape[1] > 3:
        submission = dict([(g[0], (int(g[2]), float(g[4]))) for g in gt])
    else:
        submission = dict([(g[0], (int(g[1]), float(g[2]))) for g in gt])

    return submission

In [None]:
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm


class TempoEstimationArgs:
    datadir = os.path.join('.', 'test') # insert path to your dataset here
    outfile = os.path.join('.', 'result.txt') # insert path to your out file with meters (and dummy tempos) here


args = TempoEstimationArgs()


# Adapt this part as needed!
midi_files = glob.glob(os.path.join(args.datadir, "*.mid"))
midi_files.sort()

file_to_fix = load_submission(args.outfile)

# Parallel processing with concurrent.futures
with ProcessPoolExecutor() as executor:
    # Using executor.map for parallel processing
    results_ = list(
        tqdm(
            executor.map(
                process_file,
                midi_files,
                len(midi_files) * [file_to_fix],
            ),
            total=len(midi_files),
        )
    )

results = [res[:3] for res in results_]

  0%|          | 0/153 [00:00<?, ?it/s]


## Conclusion

Calculating the tempo in this manner scored us a tempo error of just 9.691 with the test set, which ranks us 3rd on the leaderboard (as of January 8th). Combined with the meter accuracy of 0.572, we are more than happy with our results compared to the others we see on the scoreboard. The tempo estimation can surely be enhanced further, either by improving the HMM or using another approach altogether, like an RNN again, but for this instance we think that our output is sufficient given the simplicity of its calculation as well as its dependence on the (mediocre) accuracy of the meter numerator.