# Notebook content 
* Loading GMD dataset
* Playing example audio
* Dataset statistics before and after discarding samples
* Extracting MIDI onset times (observations)
* Tuning Kalman Filter parameters on training dataset

### Import packages

In [1]:
import IPython.display
import os
import sys

import matplotlib.pyplot as plt
import numpy as np

sys.path.append(os.path.join("..", "src"))

from data_loader import GmdDataLoader
from switching_kalman_filter_tracker import SwitchingKalmanFilterTracker
from tracker_evaluator import TrackerEvaluator

In [None]:
tracker = SwitchingKalmanFilterTracker(use_ideal_switch_value=True)
print(tracker.state.shape)
print(tracker.K.shape)
print(tracker.C.shape)
print(tracker.P.shape)
print(tracker.A.shape)

In [None]:
tracker = SwitchingKalmanFilterTracker()
print(tracker.state.shape)
print(tracker.K.shape)
print(tracker.C.shape)
print(tracker.P.shape)
print(tracker.A.shape)

In [None]:
tracker.C.shape

In [None]:
tracker.state = tracker.state + tracker.K @ (onset - tracker.C @ tracker.state)
tracker.P = self.P - tracker.K @ tracker.C @ tracker.P
tracker.estimated_states.append(tracker.state)

In [None]:
print(tracker.state.shape)

In [None]:
print(tracker.K.shape)

In [None]:
test_onsets = [i for i in range(100)]

In [None]:
for onset in test_onsets:
    tracker.run(onset)

### Define paths

In [2]:
data_root_path = os.path.join("..", "data")
dataset_root_path = os.path.join(data_root_path, "groove")

### Define constants 

In [3]:
min_duration = 30.0

### Create generator for audio files

In [4]:
groove_data_loader = GmdDataLoader(dataset_root_path)

In [5]:
train_data_generator = groove_data_loader.get_data(split=GmdDataLoader.TRAIN_SPLIT, min_duration=min_duration, 
                                                  get_midi_onsets=True)
val_data_generator = groove_data_loader.get_data(split=GmdDataLoader.VALIDATION_SPLIT, min_duration=min_duration, 
                                                  get_midi_onsets=True)
test_data_generator = groove_data_loader.get_data(split=GmdDataLoader.TEST_SPLIT, min_duration=min_duration, 
                                                  get_midi_onsets=True)

### Get dataset statistics

In [6]:
original_n = groove_data_loader.get_dataset_size()
original_train_n = groove_data_loader.get_dataset_size(split="train")
original_validation_n = groove_data_loader.get_dataset_size(split="validation")
original_test_n = groove_data_loader.get_dataset_size(split="test")
print(("Number of samples before discarding stage - Total samples: {} - Training samples: {} - Validation samples: {} - " + 
      "Test samples: {}").format(original_n, original_train_n, original_validation_n, original_test_n))
n = groove_data_loader.get_dataset_size(min_duration = min_duration)
train_n = groove_data_loader.get_dataset_size(split="train", min_duration = min_duration)
validation_n = groove_data_loader.get_dataset_size(split="validation", min_duration = min_duration)
test_n = groove_data_loader.get_dataset_size(split="test", min_duration = min_duration)
print(("Number of samples after discarding stage - Total samples: {} - Training samples: {} - Validation samples: {} - " + 
      "Test samples: {}").format(n, train_n, validation_n, test_n))

Number of samples before discarding stage - Total samples: 1150 - Training samples: 897 - Validation samples: 124 - Test samples: 129
Number of samples after discarding stage - Total samples: 346 - Training samples: 250 - Validation samples: 41 - Test samples: 55


### Extract all training onset times from MIDI file

In [7]:
X_train = []
meta_data_rows = []
start_times = []
for x, meta_data_row, start_time in train_data_generator:
    X_train.append(x)
    meta_data_rows.append(meta_data_row)
    start_times.append(start_time)
for x, meta_data_row, start_time in val_data_generator:
    X_train.append(x)
    meta_data_rows.append(meta_data_row)
    start_times.append(start_time)    
X_train = np.array(X_train)

  X_train = np.array(X_train)


### Extract all test onset times from MIDI file

In [8]:
X_test = []
meta_data_rows_test = []
start_times_test = []
for x, meta_data_row, start_time in test_data_generator:
    X_test.append(x)
    meta_data_rows_test.append(meta_data_row)
    start_times_test.append(start_time)
X_test = np.array(X_test)    

  X_test = np.array(X_test)


### Tune switching state model for when using the ideal switching variable value. 

In [20]:
tune_params = False

In [21]:
y_train = [x["bpm"] for x in meta_data_rows]
y_test = [x["bpm"] for x in meta_data_rows_test]

In [22]:
if tune_params:
    onset_process_noise_std_vals = [0.005, 0.01, 0.05, 0.1, 0.2]
    tempo_process_noise_std_vals = [0.005, 0.01, 0.05, 0.1, 0.2]
    measurement_noise_std_vals = [0.01, 0.1, 0.2]
    evaluator = TrackerEvaluator()
    best_accuracy = -1
    for onset_noise in onset_process_noise_std_vals:
        for tempo_noise in tempo_process_noise_std_vals:
            for measurement_noise in measurement_noise_std_vals:
                # Create argument dictionary
                kwargs = {
                    "onset_process_noise_std": onset_noise,
                    "tempo_process_noise_std": tempo_noise,
                    "measurement_noise_std": measurement_noise,
                    "use_ideal_switch_value": True
                }
                # Evaluate
                accuracy = evaluator.evaluate(SwitchingKalmanFilterTracker, X_train, y_train, **kwargs)
                # Report metric if better than any other previous results
                if accuracy > best_accuracy:
                    print("Best accuracy so far: {}".format(accuracy))
                    print("Arguments: {}".format(str(kwargs)))
                    best_accuracy = accuracy
                    best_params = kwargs

### Evaluate on test set

In [23]:
evaluator = TrackerEvaluator()
kwargs = {
    "onset_process_noise_std": 0.01,
    "tempo_process_noise_std": 0.005,
    "measurement_noise_std": 0.1,
    "use_ideal_switch_value": True
}

In [24]:
accuracy = evaluator.evaluate(SwitchingKalmanFilterTracker, X_test, y_test, **kwargs)

In [25]:
print("Test accuracy for ideal switch variable computation {}".format(accuracy))

Test accuracy for ideal switch variable computation 0.8363636363636363


### Tune PF parameters

In [None]:
if tune_params:
    onset_process_noise_std_vals = [0.005, 0.01, 0.05, 0.1, 0.2]
    tempo_process_noise_std_vals = [0.005, 0.01, 0.05, 0.1, 0.2]
    measurement_noise_std_vals = [0.01, 0.1, 0.2]
    evaluator = TrackerEvaluator()
    best_accuracy = -1
    for onset_noise in onset_process_noise_std_vals:
        for tempo_noise in tempo_process_noise_std_vals:
            for measurement_noise in measurement_noise_std_vals:
                # Create argument dictionary
                kwargs = {
                    "onset_process_noise_std": onset_noise,
                    "tempo_process_noise_std": tempo_noise,
                    "measurement_noise_std": measurement_noise,
                    "use_ideal_switch_value": False
                }
                # Evaluate
                accuracy = evaluator.evaluate(SwitchingKalmanFilterTracker, X_train, y_train, **kwargs)
                # Report metric if better than any other previous results
                if accuracy > best_accuracy:
                    print("Best accuracy so far: {}".format(accuracy))
                    print("Arguments: {}".format(str(kwargs)))
                    best_accuracy = accuracy
                    best_params = kwargs

### Report accuracy metric

In [None]:
import math

In [None]:
x = X_train[0]
tempo_tracker = SwitchingKalmanFilterTracker(use_ideal_switch_value=True)
for onset in x:
    tempo_tracker.run(onset)
# Get last tempo estimate in BPM
last_tempo_estimate = tempo_tracker.get_tempo_estimates()[-1]
last_tempo_estimate_bpm = tempo_tracker.tempo_period_to_bpm(last_tempo_estimate)

# Round tempo estimate to closest integer
rounded_tempo = math.ceil(last_tempo_estimate_bpm) \
    if last_tempo_estimate_bpm % 1 >= 0.5 else int(last_tempo_estimate_bpm)

print(tempo_tracker.tempo_period_to_bpm(last_tempo_estimate))