In [161]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

In [162]:
# RELEVANT CONSTANTS
PATH_TO_DATASET_FOLDER = './PianoFingeringDataset_v1.2/PianoFingeringDataset_v1.2/FingeringFiles/'
PATH_TO_METADATA = './PianoFingeringDataset_v1.2/PianoFingeringDataset_v1.2/List.csv'

FINGERING_TYPE_TO_ANALYZE = "1"    # there are 8 distinct fingerings done by 8 different people


In [163]:
import pandas as pd
from collections import defaultdict

song_metadata_dir_path = PATH_TO_METADATA

print(song_metadata_dir_path)
if os.path.isfile(song_metadata_dir_path):
    song_metadata_df = pd.read_csv(song_metadata_dir_path, skiprows=1, names=["id", "composer", "piece", "num_bars", "num_notes", "num_types_of_fingerings_provided", "fingering_1", "fingering_2", "fingering_3", "fingering_4", "fingering_5", "fingering_6", "fingering_7", "fingering_8"])
    print(song_metadata_df.head(10))
else:
    print("invalid filepath!")

# Creating the dictionary mapping initials to a list of associated piece filenames
annotator_to_files_dict = {}

# Iterate over the rows to construct the mapping
for _, row in song_metadata_df.iterrows():
    piece_id = f"{int(row['id']):03d}"  # Ensuring three-digit format for IDs
    for i in range(1, 9):  # fingering_1 to fingering_8
        fingering_col = f"fingering_{i}"
        annotator = row[fingering_col]
        if pd.notna(annotator):  # Ensure it's not NaN
            filename = f"{piece_id}-{i}"
            if annotator not in annotator_to_files_dict:
                annotator_to_files_dict[annotator] = []
            annotator_to_files_dict[annotator].append(filename)

# Print or use the dictionary
print(annotator_to_files_dict)

./PianoFingeringDataset_v1.2/PianoFingeringDataset_v1.2/List.csv
   id composer                                              piece  num_bars  \
0   1     Bach                      Two-part invention in C major        22   
1   2     Bach                      Two-part invention in F major        34   
2   3     Bach  Well-Tempered Clavier, Book I, Prelude No. 23 ...        19   
3   4     Bach  Well-Tempered Clavier, Book II, Fugue No. 2 in...        14   
4   5     Bach  Well-Tempered Clavier, Book II, Fugue No. 19 i...        14   
5   6     Bach                             Partita No. 6 Corrente        38   
6   7     Bach                       French Suite No. 1 Allemande        12   
7   8     Bach                       French Suite No. 5 Sarabande        16   
8   9     Bach                        English Suite No. 3 Prelude        33   
9  10     Bach                  Goldberg Variations, Variation 13        16   

   num_notes  num_types_of_fingerings_provided fingering_1 finger

In [164]:
# This cell shows how to load in a specific fingering file and also the relevant data
directory_path = PATH_TO_DATASET_FOLDER    # modify this directory path as needed

# Sample code for loading in a the fingering dataset
for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)

    fingering_label, _ = filename.split('_')     # 001-01
    piece_id, fingering_type = fingering_label.split('-')
    piece_id = int(piece_id)
    fingering_type = int(fingering_type)

    if os.path.isfile(file_path):
        df = pd.read_table(file_path, sep="\t", skiprows=1, names=["noteID", "onset_time", "offset_time", "spelled_pitch", "onset_velocity", "offset_velocity", "channel", "finger_number"])
    else:
        print("some error in filepath name!")
        break

    # print(f"Data for {song_metadata_df.iloc[piece_id]["piece"]}")
    print(df.head(1))
    print(df.dtypes)

   noteID  onset_time  offset_time spelled_pitch  onset_velocity  \
0       0    0.004883     0.248048            E4             101   

   offset_velocity  channel  finger_number  
0               80        0              1  
noteID               int64
onset_time         float64
offset_time        float64
spelled_pitch       object
onset_velocity       int64
offset_velocity      int64
channel              int64
finger_number        int64
dtype: object
   noteID  onset_time  offset_time spelled_pitch  onset_velocity  \
0       0    0.266603     0.380373            A3              64   

   offset_velocity  channel finger_number  
0               80        1            -3  
noteID               int64
onset_time         float64
offset_time        float64
spelled_pitch       object
onset_velocity       int64
offset_velocity      int64
channel              int64
finger_number       object
dtype: object
   noteID  onset_time  offset_time spelled_pitch  onset_velocity  \
0       0         0.

Need to make sure all the fields are numbers, so we establish the dictionary mapping like follows:

In [165]:
directory_path = PATH_TO_DATASET_FOLDER

# every pitch is a combination of a note and octave
pitch_classes = ['Ab', 'A', 'A#', 'Bb', 'B', 'B#', 'Cb', 'C', 'C#', 'Db', 'D', 'D#', 'Eb', 'E', 'E#', 'Fb', 'F', 'F#', 'Gb', 'G', 'G#']
octaves = range(0, 9)
pitch_to_int_mapping = {f"{pc}{octave}": i for i, (pc, octave) in enumerate((pc, o) for o in octaves for pc in pitch_classes)}
int_to_pitch_mapping = {i: f"{pc}{octave}" for i, (pc, octave) in enumerate((pc, o) for o in octaves for pc in pitch_classes)}

fingerings = [
    '1', '2', '3', '4', '5',
    '1_', '2_', '3_', '4_', '5_', 
    '-1', '-2', '-3', '-4', '-5', 
    '1_2', '1_3', '1_4', '1_5', 
    '1_-2', '1_-3', '1_-4', '1_-5', 
    '2_1', '2_3', '2_4', '2_5', 
    '2_-1', '2_-3', '2_-4', '2_-5', 
    '3_1', '3_2', '3_4', '3_5', 
    '3_-1', '3_-2', '3_-4', '3_-5', 
    '4_1', '4_2', '4_3', '4_5', 
    '4_-1', '4_-2', '4_-3', '4_-5', 
    '5_1', '5_2', '5_3', '5_4',
    '5_-1', '5_-2', '5_-3', '5_-4',
    '-1_-2', '-1_-3', '-1_-4', '-1_-5',
    '-1_2', '-1_3', '-1_4', '-1_5', 
    '-2_-3', '-2_-4', '-2_-5', '-2_-1',
    '-2_3', '-2_4', '-2_5', '-2_1',
    '-3_-4', '-3_-5', '-3_-2', '-3_-1',
    '-3_4', '-3_5', '-3_2', '-3_1',
    '-4_-5', '-4_-3', '-4_-2', '-4_-1',
    '-4_5', '-4_3', '-4_2', '-4_1',
    '-5_-1', '-5_-2', '-5_-3', '-5_-4',
    '-5_1', '-5_2', '-5_3', '-5_4',
    '1_2_3', '1_2_4', '1_2_5', '1_3_4', '1_3_5', '1_4_5', '2_3_4', '2_3_5', '2_4_5', '3_4_5',
    '-1_1', '-2_2', '-3_3', '-4_4', '-5_5', '1_-1', '2_-2', '3_-3', '4_-4', '5_-5', '0'
]
finger_to_int_mapping = {f: i for i, f in enumerate(fingerings)}
int_to_finger_mapping = {i: f for i, f in enumerate(fingerings)}


# verify that every finger and pitch in our data can be mapped correctly
verify_spelled_pitch_values = set()
verify_fingering_map = set()
for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)
    if os.path.isfile(file_path):
        df = pd.read_table(file_path, sep="\t", skiprows=1, names=["noteID", "onset_time", "offset_time", "spelled_pitch", "onset_velocity", "offset_velocity", "channel", "finger_number"])
        verify_spelled_pitch_values.update(df['spelled_pitch'].unique())
        verify_fingering_map.update(df['finger_number'].unique())
verify_fingering_map = {str(x) for x in verify_fingering_map}

if not verify_fingering_map.issubset(set(finger_to_int_mapping.keys())):
    print("INVALID FINGER SYMBOL DETECTED: ", verify_fingering_map - set(finger_to_int_mapping.keys()))
elif not verify_spelled_pitch_values.issubset(set(pitch_to_int_mapping.keys())):
    print("INVALID PITCH SYMBOL DETECTED: ", verify_spelled_pitch_values - set(pitch_to_int_mapping.keys()))
else:
    print("pitch_to_int_mapping: ", pitch_to_int_mapping)
    print("\n")
    print("finger_to_int_mapping: ", finger_to_int_mapping)

pitch_to_int_mapping:  {'Ab0': 0, 'A0': 1, 'A#0': 2, 'Bb0': 3, 'B0': 4, 'B#0': 5, 'Cb0': 6, 'C0': 7, 'C#0': 8, 'Db0': 9, 'D0': 10, 'D#0': 11, 'Eb0': 12, 'E0': 13, 'E#0': 14, 'Fb0': 15, 'F0': 16, 'F#0': 17, 'Gb0': 18, 'G0': 19, 'G#0': 20, 'Ab1': 21, 'A1': 22, 'A#1': 23, 'Bb1': 24, 'B1': 25, 'B#1': 26, 'Cb1': 27, 'C1': 28, 'C#1': 29, 'Db1': 30, 'D1': 31, 'D#1': 32, 'Eb1': 33, 'E1': 34, 'E#1': 35, 'Fb1': 36, 'F1': 37, 'F#1': 38, 'Gb1': 39, 'G1': 40, 'G#1': 41, 'Ab2': 42, 'A2': 43, 'A#2': 44, 'Bb2': 45, 'B2': 46, 'B#2': 47, 'Cb2': 48, 'C2': 49, 'C#2': 50, 'Db2': 51, 'D2': 52, 'D#2': 53, 'Eb2': 54, 'E2': 55, 'E#2': 56, 'Fb2': 57, 'F2': 58, 'F#2': 59, 'Gb2': 60, 'G2': 61, 'G#2': 62, 'Ab3': 63, 'A3': 64, 'A#3': 65, 'Bb3': 66, 'B3': 67, 'B#3': 68, 'Cb3': 69, 'C3': 70, 'C#3': 71, 'Db3': 72, 'D3': 73, 'D#3': 74, 'Eb3': 75, 'E3': 76, 'E#3': 77, 'Fb3': 78, 'F3': 79, 'F#3': 80, 'Gb3': 81, 'G3': 82, 'G#3': 83, 'Ab4': 84, 'A4': 85, 'A#4': 86, 'Bb4': 87, 'B4': 88, 'B#4': 89, 'Cb4': 90, 'C4': 91, 'C#4'

In [166]:
# EXAMPLE CODE: Loading in the "x" features and "y" label for a specific piece, using piece number 14 as an example

specific_piece_id = 14

for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)

    fingering_label, _ = filename.split('_')     # 001-01
    piece_id, fingering_type = fingering_label.split('-')
    piece_id = int(piece_id)
    fingering_type = int(fingering_type)

    if piece_id == specific_piece_id:
        df = pd.read_table(file_path, sep="\t", skiprows=1, names=["noteID", "onset_time", "offset_time", "spelled_pitch", "onset_velocity", "offset_velocity", "channel", "finger_number"])

num_data, num_features = df.shape
x = df.iloc[:, 0:num_features - 1]
y = df.iloc[:, num_features - 1]


# convert "spelled pitch" field to a number
x['spelled_pitch'] = x['spelled_pitch'].map(pitch_to_int_mapping)

#  do same for "finger_number" label
y = y.map(finger_to_int_mapping)

x = torch.tensor(x.values.tolist(), dtype=torch.float32)
y = torch.tensor(y.values.astype(float).tolist())
y = y.unsqueeze(1)   # conver from size [289] to size [289, 1]

print(f"x's shape is {x.shape}")
print(f"y's shape is {y.shape}")
print(f"num features is {num_features} and num data is {num_data}")
# print(x)
# print(y)

x's shape is torch.Size([289, 7])
y's shape is torch.Size([289, 1])
num features is 8 and num data is 289


# Baseline Performance (Left/Right split)

In preparation we basically split the data into train/test/validation. Then we separate out the features into a giant X dataframe, and the labels into a giant Y dataframe. To do the baseline we just use logistic regression to just fit X and y. The finger annotations are split into left and right notes. Finger switches from left-to-right are categorized as left (i.e. -5_1) and vice versa. 

Logistic Regression Test Accuracy (Left): 0.3136
Logistic Regression Test Accuracy (Right): 0.2663


**NOTE 1:** There are 8 distinct fingering types, need to isolate by fingering type for accurate predictions
**NOTE 2:** this is a __multi-class__ logistic regression model since there are multiple outputs for fingers

In [167]:
import os
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

#################################
# 1) LOAD ALL DATA INTO ONE DF
#################################

directory_path = PATH_TO_DATASET_FOLDER

all_rows = []
for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)

    fingering_label, _ = filename.split('_')     # 001-01
    piece_id, fingering_type = fingering_label.split('-')

    if os.path.isfile(file_path) and FINGERING_TYPE_TO_ANALYZE == fingering_type:
        # Read the tab-delimited data, skipping the first line
        df = pd.read_table(
            file_path, sep="\t", skiprows=1,
            names=[
                "noteID", "onset_time", "offset_time", "spelled_pitch",
                "onset_velocity", "offset_velocity", "channel",
                "finger_number"
            ]
        )
        df["source_file"] = filename
        all_rows.append(df)

# Concatenate all piece-rows into a single DataFrame
all_data = pd.concat(all_rows, ignore_index=True)

print("Loaded shape:", all_data.shape)
print(all_data.head())

##################################################
# 1) SEPARATE NEGATIVE vs NON-NEGATIVE FINGERS
##################################################

is_negative = all_data["finger_number"].astype(str).str.startswith("-")
all_data_neg = all_data[is_negative].copy()
all_data_pos = all_data[~is_negative].copy()

print(f"Number of rows with negative finger_number: {len(all_data_neg)}")
print(f"Number of rows with non-negative finger_number: {len(all_data_pos)}")

##################################################
# 2) ENCODING FOR DATA
##################################################

all_data_neg["spelled_pitch_int"] = all_data["spelled_pitch"].map(pitch_to_int_mapping)    
all_data_neg["finger_int"] = all_data["finger_number"].astype(str).map(finger_to_int_mapping)

all_data_pos["spelled_pitch_int"] = all_data["spelled_pitch"].map(pitch_to_int_mapping)    
all_data_pos["finger_int"] = all_data["finger_number"].astype(str).map(finger_to_int_mapping)

##################################################
# 3) SPLIT INTO FEATURES (X) AND LABEL (y)
##################################################

feature_cols = [
    "spelled_pitch_int", "onset_time", "offset_time", "onset_velocity", "offset_velocity", "channel"
]
label_col = "finger_int"

# Negative-finger group
X_neg = all_data_neg[feature_cols].values
y_neg = all_data_neg[label_col].values

# Non-negative-finger group
X_pos = all_data_pos[feature_cols].values
y_pos = all_data_pos[label_col].values

# Split negative-finger data
X_temp_neg, X_test_neg, y_temp_neg, y_test_neg = train_test_split(X_neg, y_neg, test_size=0.2, random_state=42)
X_train_neg, X_val_neg, y_train_neg, y_val_neg = train_test_split(X_temp_neg, y_temp_neg, test_size=0.25, random_state=42)

# Split non-negative-finger data
X_temp_pos, X_test_pos, y_temp_pos, y_test_pos = train_test_split(X_pos, y_pos, test_size=0.2, random_state=42)
X_train_pos, X_val_pos, y_train_pos, y_val_pos = train_test_split(X_temp_pos, y_temp_pos, test_size=0.25, random_state=42)

##################################################
# 4A) LOGISTIC REGRESSION BASELINE (SEPARATE HANDS)
##################################################

print("\n============================")
print("LOGISTIC REGRESSION BASELINE")
print("============================")

# Train and evaluate for negative-finger (left-hand) data
clf_neg = LogisticRegression(multi_class='multinomial', max_iter=1000, random_state=42)
clf_neg.fit(X_train_neg, y_train_neg)
y_pred_test_neg = clf_neg.predict(X_test_neg)
test_acc_neg = accuracy_score(y_test_neg, y_pred_test_neg)
print(f"Left-hand (negative) Logistic Regression test accuracy = {test_acc_neg:.4f}")

# Train and evaluate for non-negative-finger (right-hand) data
clf_pos = LogisticRegression(multi_class='multinomial', max_iter=1000, random_state=42)
clf_pos.fit(X_train_pos, y_train_pos)
y_pred_test_pos = clf_pos.predict(X_test_pos)
test_acc_pos = accuracy_score(y_test_pos, y_pred_test_pos)
print(f"Right-hand (positive) Logistic Regression test accuracy = {test_acc_pos:.4f}")

Loaded shape: (45976, 9)
   noteID  onset_time  offset_time spelled_pitch  onset_velocity  \
0       0    0.004883     0.248048            E4             101   
1       1    0.004883     0.133302           G#3             101   
2       2    0.141114     0.255373            B3              96   
3       3    0.263185     0.376955            A3              96   
4       4    0.384768     0.503421           G#3              96   

   offset_velocity  channel finger_number          source_file  
0               80        0             1  129-1_fingering.txt  
1               80        1            -3  129-1_fingering.txt  
2               80        1            -1  129-1_fingering.txt  
3               80        1            -2  129-1_fingering.txt  
4               80        1            -3  129-1_fingering.txt  
Number of rows with negative finger_number: 20837
Number of rows with non-negative finger_number: 25139

LOGISTIC REGRESSION BASELINE


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


Left-hand (negative) Logistic Regression test accuracy = 0.3786
Right-hand (positive) Logistic Regression test accuracy = 0.3323


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


# Baseline RNN (Left/Right split) with full piece (Variant 0)

In [117]:
directory_path = PATH_TO_DATASET_FOLDER

all_sequences = []   # will hold raw DataFrames for each piece
all_piece_names = []

FINGERING_TYPE_TO_ANALYZE = "YI"
yi_filenames = set(annotator_to_files_dict.get(FINGERING_TYPE_TO_ANALYZE, []))


for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)
    fingering_label, _ = filename.split('_')  # e.g. "001-01"
    piece_id, fingering_type = fingering_label.split('-')

    if os.path.isfile(file_path) and fingering_label in yi_filenames:
        df = pd.read_table(
            file_path, sep="\t", skiprows=1,
            names=[
                "noteID", "onset_time", "offset_time", "spelled_pitch",
                "onset_velocity", "offset_velocity", "channel",
                "finger_number"
            ]
        )
        all_piece_names.append(filename)
        all_sequences.append(df)

print(f"Total number of pieces found: {len(all_sequences)}")
raw_encoded_sequences_variant0 = []  # STORES TUPLES OF (X_SEQ, Y_LIST_OF_FINGERINGS_FOR_EVERY_X_IN_THE_SEQ)

for df in all_sequences:
    X_list = []
    y_list = []
    for row in df.itertuples(index=False):
        spelled_pitch = row.spelled_pitch
        finger_str = str(row.finger_number)

        # encode pitch and finger
        pitch_int = pitch_to_int_mapping.get(spelled_pitch, 0)  # fallback 0 if unseen
        finger_int = finger_to_int_mapping.get(finger_str, 0)   # fallback 0 if unseen
        feature_row = [
            pitch_int,
            float(row.onset_time),
            float(row.offset_time),
            float(row.onset_velocity),
            float(row.offset_velocity),
            float(row.channel)
        ]
        X_list.append(feature_row)
        y_list.append(finger_int)

    raw_encoded_sequences_variant0.append((X_list, y_list))

Total number of pieces found: 51


In [150]:
import os
import pandas as pd

SEQUENCE_LENGTH = 30
HALF_WINDOW = SEQUENCE_LENGTH // 2  # For centering the window (15 before, 15 after)

# Define pad tokens: a pad feature row (6 features) and pad finger value.
PAD_X = [0, 0.0, 0.0, 0.0, 0.0, 0.0]  # Adjust if you wish a different pad
PAD_Y = 0  # Use a value that is not in your finger mapping

directory_path = PATH_TO_DATASET_FOLDER
all_sequences = []   # will hold raw DataFrames for each piece
all_piece_names = []
FINGERING_TYPE_TO_ANALYZE = "YI"
yi_filenames = set(annotator_to_files_dict.get(FINGERING_TYPE_TO_ANALYZE, []))

for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)
    fingering_label, _ = filename.split('_')  # e.g. "001-01"
    piece_id, fingering_type = fingering_label.split('-')

    if os.path.isfile(file_path) and fingering_label in yi_filenames:
        df = pd.read_table(
            file_path, sep="\t", skiprows=1,
            names=[
                "noteID", "onset_time", "offset_time", "spelled_pitch",
                "onset_velocity", "offset_velocity", "channel",
                "finger_number"
            ]
        )
        all_piece_names.append(filename)
        all_sequences.append(df)

print(f"Total number of pieces found: {len(all_sequences)}")

# Create fixed-length sequences (centered windows) for every note in each piece.
raw_encoded_sequences_variant1_left = []  # Each element is a tuple (X_seq, y_seq) of length 30
raw_encoded_sequences_variant1_right = []  # Each element is a tuple (X_seq, y_seq) of length 30


for df in all_sequences:
    X_list_left = []
    y_list_left = []
    
    X_list_right = []
    y_list_right = []
    # Build feature lists for the piece (noteID is used only for ordering; not added to features)
    for row in df.itertuples(index=False):
        spelled_pitch = row.spelled_pitch
        finger_str = str(row.finger_number)

        # Encode pitch and finger (using your mappings)
        pitch_int = pitch_to_int_mapping.get(spelled_pitch, 0)  # fallback 0 if unseen
        finger_int = finger_to_int_mapping.get(finger_str, 0)    # fallback 0 if unseen

        feature_row = [
            pitch_int,
            float(row.onset_time),
            float(row.offset_time),
            float(row.onset_velocity),
            float(row.offset_velocity),
            float(row.channel)
        ]
        if finger_str[0] == '-':
            X_list_left.append(feature_row)
            y_list_left.append(finger_int)
        else:
            X_list_right.append(feature_row)
            y_list_right.append(finger_int)
    
    n_left = len(X_list_left)
    n_right = len(X_list_right)
    # For every note, create a window of SEQUENCE_LENGTH with the note in the center.
    for i in range(n_left):
        # Determine window boundaries (they might be out-of-bounds)
        start_idx = i - HALF_WINDOW
        end_idx = i + HALF_WINDOW  # This gives SEQUENCE_LENGTH elements overall
        X_seq = []
        y_seq = []
        for j in range(start_idx, end_idx):
            if j < 0 or j >= n_left:
                X_seq.append(PAD_X)
                y_seq.append(PAD_Y)
            else:
                X_seq.append(X_list_left[j])
                y_seq.append(y_list_left[j])
        raw_encoded_sequences_variant1_left.append((X_seq, y_seq))

    for i in range(n_right):
        # Determine window boundaries (they might be out-of-bounds)
        start_idx = i - HALF_WINDOW
        end_idx = i + HALF_WINDOW  # This gives SEQUENCE_LENGTH elements overall
        X_seq = []
        y_seq = []
        for j in range(start_idx, end_idx):
            if j < 0 or j >= n_right:
                X_seq.append(PAD_X)
                y_seq.append(PAD_Y)
            else:
                X_seq.append(X_list_right[j])
                y_seq.append(y_list_right[j])
        raw_encoded_sequences_variant1_right.append((X_seq, y_seq))

print("Variant 1: For a sample piece, X sequence length =", len(raw_encoded_sequences_variant1_left[0][0]))
print("Variant 1: For a sample piece, X sequence length =", len(raw_encoded_sequences_variant1_right[0][0]))

Total number of pieces found: 51
Variant 1: For a sample piece, X sequence length = 30
Variant 1: For a sample piece, X sequence length = 30


In [250]:
import os
import pandas as pd

TIME_WINDOW = 4.0  # seconds
MAX_NOTES = 20     # Maximum notes per sequence

# Define pad tokens: same as before.
PAD_X = [0, 0.0, 0.0, 0.0, 0.0, 0.0]
PAD_Y = 0

directory_path = PATH_TO_DATASET_FOLDER
all_sequences = []   # will hold raw DataFrames for each piece
all_piece_names = []
FINGERING_TYPE_TO_ANALYZE = "YI"
yi_filenames = set(annotator_to_files_dict.get(FINGERING_TYPE_TO_ANALYZE, []))

# Read in all pieces.
for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)
    fingering_label, _ = filename.split('_')  # e.g. "001-01"
    piece_id, fingering_type = fingering_label.split('-')

    if os.path.isfile(file_path) and fingering_label in yi_filenames:
        df = pd.read_table(
            file_path, sep="\t", skiprows=1,
            names=[
                "noteID", "onset_time", "offset_time", "spelled_pitch",
                "onset_velocity", "offset_velocity", "channel",
                "finger_number"
            ]
        )
        all_piece_names.append(filename)
        all_sequences.append(df)

print(f"Total number of pieces found: {len(all_sequences)}")

raw_encoded_sequences_variant2_left = []  # Each element is a tuple (X_seq, y_seq) of length MAX_NOTES
raw_encoded_sequences_variant2_right = []  # Each element is a tuple (X_seq, y_seq) of length MAX_NOTES


for df in all_sequences:
    X_list_left = []
    X_list_right = []
    y_list_right = []
    y_list_left = []
    onset_times_left = []
    onset_times_right = []
    # Build feature lists for the piece.
    for row in df.itertuples(index=False):
        spelled_pitch = row.spelled_pitch
        finger_str = str(row.finger_number)

        pitch_int = pitch_to_int_mapping.get(spelled_pitch, 0)
        finger_int = finger_to_int_mapping.get(finger_str, 0)
        feature_row = [
            pitch_int,
            float(row.onset_time),
            float(row.offset_time),
            float(row.onset_velocity),
            float(row.offset_velocity),
            float(row.channel)
        ]
        
        if finger_str[0] == '-':
            X_list_left.append(feature_row)
            y_list_left.append(finger_int)
            onset_times_left.append(float(row.onset_time))
        else:
            X_list_right.append(feature_row)
            y_list_right.append(finger_int)
            onset_times_right.append(float(row.onset_time))
        
    
    
    if len(X_list_left) != 0:

            # Create time windows starting at the first note and moving forward by TIME_WINDOW.
        t_start = onset_times_left[0]
        t_end = onset_times_left[-1]
        current_window_start = t_start

        while current_window_start <= t_end:
            # Collect indices of notes whose onset time falls within [current_window_start, current_window_start + TIME_WINDOW)
            indices = [i for i, t in enumerate(onset_times_left) if current_window_start <= t < current_window_start + TIME_WINDOW]
            if len(indices) == 0:
                # No notes in this window; move to the next window.
                current_window_start += TIME_WINDOW
                continue
            
            if len(indices) > MAX_NOTES:
                center = len(indices) // 2
                half_max = MAX_NOTES // 2
                if MAX_NOTES % 2 == 0:
                    indices = indices[center - half_max : center + half_max]
                else:
                    indices = indices[center - half_max : center + half_max + 1]
            
            X_seq = [X_list_left[i] for i in indices]
            y_seq = [y_list_left[i] for i in indices]
            
            pad_len = MAX_NOTES - len(X_seq)
            print(pad_len)
            left_pad = pad_len // 2
            right_pad = pad_len - left_pad  # Ensure total padding adds up correctly
            
            X_seq = [PAD_X] * left_pad + X_seq + [PAD_X] * right_pad
            y_seq = [PAD_Y] * left_pad + y_seq + [PAD_Y] * right_pad
            
            raw_encoded_sequences_variant2_left.append((X_seq, y_seq))
            
            # Advance the window by TIME_WINDOW seconds (non-overlapping segments).
            current_window_start += TIME_WINDOW
            
    if len(X_list_right) != 0:

            # Create time windows starting at the first note and moving forward by TIME_WINDOW.
        t_start = onset_times_right[0]
        t_end = onset_times_right[-1]
        current_window_start = t_start

        while current_window_start <= t_end:
            # Collect indices of notes whose onset time falls within [current_window_start, current_window_start + TIME_WINDOW)
            indices = [i for i, t in enumerate(onset_times_right) if current_window_start <= t < current_window_start + TIME_WINDOW]
            if len(indices) == 0:
                # No notes in this window; move to the next window.
                current_window_start += TIME_WINDOW
                continue
            
            # If more than MAX_NOTES, keep only the middle MAX_NOTES symmetrically
            if len(indices) > MAX_NOTES:
                center = len(indices) // 2
                half_max = MAX_NOTES // 2
                if MAX_NOTES % 2 == 0:
                    indices = indices[center - half_max : center + half_max]
                else:
                    indices = indices[center - half_max : center + half_max + 1]
            
            X_seq = [X_list_right[i] for i in indices]
            y_seq = [y_list_right[i] for i in indices]
            
            # Pad the sequence symmetrically to ensure it has exactly MAX_NOTES tokens.
            pad_len = MAX_NOTES - len(X_seq)
            left_pad = pad_len // 2
            right_pad = pad_len - left_pad  # Ensure total padding adds up correctly
            
            X_seq = [PAD_X] * left_pad + X_seq + [PAD_X] * right_pad
            y_seq = [PAD_Y] * left_pad + y_seq + [PAD_Y] * right_pad
            
            raw_encoded_sequences_variant2_right.append((X_seq, y_seq))
            
            # Advance the window by TIME_WINDOW seconds (non-overlapping segments).
            current_window_start += TIME_WINDOW
    

print("Variant 2: For a sample window, X sequence length =", len(raw_encoded_sequences_variant2_left[0][0]))
print("Variant 2: For a sample window, X sequence length =", len(raw_encoded_sequences_variant2_right[0][0]))

Total number of pieces found: 51
0
1
5
0
0
0
12
1
8
4
7
8
0
0
10
1
1
0
2
1
11
2
11
0
6
1
0
0
2
4
6
9
7
0
0
0
0
0
0
0
0
8
2
4
12
12
7
16
15
11
13
14
6
12
14
12
11
14
15
10
16
0
0
0
0
0
0
0
0
0
0
0
7
5
4
6
9
3
4
0
16
0
11
0
4
0
0
0
0
3
4
5
7
11
8
7
12
12
12
12
4
8
4
10
2
0
0
0
3
2
2
1
0
8
0
0
0
0
0
11
14
13
12
13
8
5
15
16
0
0
0
0
0
0
0
0
4
7
7
0
0
9
13
0
0
0
0
0
0
0
0
0
0
10
10
0
0
6
8
8
4
8
8
8
6
19
9
2
1
10
2
0
1
7
8
0
10
3
0
4
4
4
4
4
18
7
3
0
0
0
0
0
0
0
0
17
12
17
16
18
16
15
15
4
0
0
0
0
0
0
0
11
11
8
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
16
15
10
15
14
12
11
7
3
12
14
8
17
15
14
11
4
12
12
12
12
7
12
10
11
19
0
0
9
1
0
0
0
0
10
16
16
17
11
11
17
7
3
8
4
4
14
3
4
12
12
15
2
2
7
6
4
0
2
0
0
5
11
3
5
5
14
16
15
14
15
14
10
15
19
18
18
16
18
15
17
18
18
16
18
18
18
16
16
18
17
18
14
16
14
13
12
13
10
18
11
11
12
11
11
12
11
11
12
11
14
15
13
13
14
14
13
14
13
14
13
14
14
11
11
9
8
14
7
14
14
8
13
18
15
17
14
15
17
18
0
0
1
0
0
3
0
5
9
0
0
6
10
2
0
0
3
12
7
8
15
18
13


In [251]:
raw_encoded_sequences_left = raw_encoded_sequences_variant2_left
raw_encoded_sequences_right = raw_encoded_sequences_variant2_right

In [252]:
print("x sequence length: ", len(raw_encoded_sequences_left[0][0]))
print("y sequence length (should match): ", len(raw_encoded_sequences_left[0][-1]))

print("x sequence length: ", len(raw_encoded_sequences_right[0][0]))
print("y sequence length (should match): ", len(raw_encoded_sequences_right[0][-1]))

# potentially do scaling/some kind of layernorm???
train_val_seqs_left, test_seqs_left = train_test_split(raw_encoded_sequences_left, test_size=0.2, random_state=42)
train_seqs_left, val_seqs_left = train_test_split(train_val_seqs_left, test_size=0.25, random_state=42)

train_val_seqs_right, test_seqs_right = train_test_split(raw_encoded_sequences_right, test_size=0.2, random_state=42)
train_seqs_right, val_seqs_right = train_test_split(train_val_seqs_right, test_size=0.25, random_state=42)

print(f"Total pieces: {len(raw_encoded_sequences_left)}")
print(f"Train pieces: {len(train_seqs_left)}")
print(f"Val pieces:   {len(val_seqs_left)}")
print(f"Test pieces:  {len(test_seqs_left)}")

x sequence length:  20
y sequence length (should match):  20
x sequence length:  20
y sequence length (should match):  20
Total pieces: 503
Train pieces: 301
Val pieces:   101
Test pieces:  101


In [253]:
# DATASET LOADER CLASS

class FingeringDataset(Dataset):
    def __init__(self, sequences):
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        X_list, y_list = self.sequences[idx]
        X_tensor = torch.tensor(X_list, dtype=torch.float32)  # (seq_len, 6)
        y_tensor = torch.tensor(y_list, dtype=torch.long)     # (seq_len,)
        return X_tensor, y_tensor

train_dataset = FingeringDataset(train_seqs_left)
val_dataset   = FingeringDataset(val_seqs_left)
test_dataset  = FingeringDataset(test_seqs_left)

train_loader_left = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader_left   = DataLoader(val_dataset,   batch_size=1, shuffle=False)
test_loader_left  = DataLoader(test_dataset,  batch_size=1, shuffle=False)


train_dataset = FingeringDataset(train_seqs_right)
val_dataset   = FingeringDataset(val_seqs_right)
test_dataset  = FingeringDataset(test_seqs_right)

train_loader_right = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader_right   = DataLoader(val_dataset,   batch_size=1, shuffle=False)
test_loader_right  = DataLoader(test_dataset,  batch_size=1, shuffle=False)

In [155]:
# training loop
def train_sequence_model(model, train_loader, val_loader, num_epochs=5, lr=1e-3):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for X_seq, y_seq in train_loader:
            # print("X_seq: ", X_seq)
            # print("y_seq: ", y_seq)

            X_seq = X_seq.to(device, non_blocking=True)
            y_seq = y_seq.to(device, non_blocking=True)

            optimizer.zero_grad()
            logits = model(X_seq)    # forward pass

            # reshape for CrossEntropy by flattening tokens => (batch_size * seq_len, num_fingers)
            logits_reshaped = logits.view(-1, logits.size(-1))
            y_seq_reshaped = y_seq.view(-1)

            loss = criterion(logits_reshaped, y_seq_reshaped)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        val_acc = evaluate_sequence_model(model, val_loader)
        print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | ValAcc: {val_acc:.4f}")

# doing inference for validation
def evaluate_sequence_model(model, data_loader):
    device = next(model.parameters()).device
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for X_seq, y_seq in data_loader:
            # X_seq: (batch_size, seq_len, num_features)
            # y_seq: (batch_size, seq_len)

            X_seq = X_seq.to(device, non_blocking=True)
            y_seq = y_seq.to(device, non_blocking=True)

            logits = model(X_seq)  # => (batch_size, seq_len, num_fingers)
            preds = torch.argmax(logits, dim=-1)  # => (batch_size, seq_len)

            all_preds.extend(preds.view(-1).cpu().numpy())
            all_labels.extend(y_seq.view(-1).cpu().numpy())

    return accuracy_score(all_labels, all_preds)

In [156]:
# RNN MODEL
class PianoRNNModel(nn.Module):
    def __init__(
        self,
        num_pitches,     # size of pitch vocab
        num_fingers,     # number of finger classes
        embed_dim=32,    # dimension for pitch embedding
        hidden_dim=64,   # dimension for RNN hidden layer
        numeric_dim=5,   # number of extra numeric features
    ):
        super().__init__()

        # have 2 embedding vectors: one for pitch and the other numeric features. We then concatenate them and that ends up being the hidden state of our RNN.
        self.embedding_pitch = nn.Embedding(num_pitches, embed_dim)        # Embedding for pitch (categorical)
        self.embedding_numeric = nn.Linear(numeric_dim, embed_dim)     # A simple linear projection for numeric features => same embed_dim so we can concatenate them with pitch embeddings

        self.rnn = nn.RNN(embed_dim * 2, hidden_dim, batch_first=True)         # RNN: input_dim = embed_dim(pitch) + embed_dim(numeric) = 2*embed_dim
        self.fc = nn.Linear(hidden_dim, num_fingers)   # Final linear layer to map hidden_dim => number of finger classes

    def forward(self, x):
        pitch_ids = x[..., 0].long()         # shape => (batch_size, seq_len)
        numeric_feats = x[..., 1:].float()   # shape => (batch_size, seq_len, 5)

        # Embed pitch and numeric separately
        pitch_emb = self.embedding_pitch(pitch_ids)  # (batch_size, seq_len, embed_dim)
        numeric_emb = self.embedding_numeric(numeric_feats)  # (batch_size, seq_len, embed_dim)

        # Concatenate along last dimension => (batch_size, seq_len, 2*embed_dim)
        combined_emb = torch.cat((pitch_emb, numeric_emb), dim=2)

        # RNN => (batch_size, seq_len, hidden_dim)
        rnn_out, _ = self.rnn(combined_emb)

        # Final linear => (batch_size, seq_len, num_fingers)
        logits = self.fc(rnn_out)
        return logits

Variant 1 Results

In [157]:
rnn_model = PianoRNNModel(
    num_pitches=len(pitch_to_int_mapping),
    num_fingers=len(finger_to_int_mapping),
    embed_dim=32,
    hidden_dim=64,
    numeric_dim=5
)

train_sequence_model(rnn_model, train_loader_left, val_loader_left, num_epochs=10, lr=1e-3)
test_acc_rnn = evaluate_sequence_model(rnn_model, test_loader_left)
print(f"RNN Test Accuracy Left = {test_acc_rnn:.4f}")

train_sequence_model(rnn_model, train_loader_right, val_loader_right, num_epochs=10, lr=1e-3)
test_acc_rnn = evaluate_sequence_model(rnn_model, test_loader_right)
print(f"RNN Test Accuracy Right = {test_acc_rnn:.4f}")

Epoch 1/10 | Loss: 1.4660 | ValAcc: 0.4189
Epoch 2/10 | Loss: 1.2396 | ValAcc: 0.5635
Epoch 3/10 | Loss: 1.0185 | ValAcc: 0.6413
Epoch 4/10 | Loss: 0.8429 | ValAcc: 0.7100
Epoch 5/10 | Loss: 0.7266 | ValAcc: 0.7451
Epoch 6/10 | Loss: 0.6386 | ValAcc: 0.7764
Epoch 7/10 | Loss: 0.5722 | ValAcc: 0.7925
Epoch 8/10 | Loss: 0.5228 | ValAcc: 0.8006
Epoch 9/10 | Loss: 0.4863 | ValAcc: 0.8245
Epoch 10/10 | Loss: 0.4593 | ValAcc: 0.8328
RNN Test Accuracy Left = 0.8314
Epoch 1/10 | Loss: 1.5657 | ValAcc: 0.4579
Epoch 2/10 | Loss: 1.2069 | ValAcc: 0.5278
Epoch 3/10 | Loss: 1.0922 | ValAcc: 0.5573
Epoch 4/10 | Loss: 1.0234 | ValAcc: 0.5827
Epoch 5/10 | Loss: 0.9766 | ValAcc: 0.6032
Epoch 6/10 | Loss: 0.9397 | ValAcc: 0.6240
Epoch 7/10 | Loss: 0.9074 | ValAcc: 0.6373
Epoch 8/10 | Loss: 0.8809 | ValAcc: 0.6409
Epoch 9/10 | Loss: 0.8614 | ValAcc: 0.6355
Epoch 10/10 | Loss: 0.8472 | ValAcc: 0.6623
RNN Test Accuracy Right = 0.6666


In [158]:
class LSTMModel(nn.Module):
    def __init__(
        self,
        num_pitches,     # size of pitch vocab
        num_fingers,     # number of finger classes
        embed_dim=32,    # dimension for pitch embedding
        hidden_dim=64,   # dimension for LSTM hidden layer
        numeric_dim=5,   # number of extra numeric features
    ):
        super().__init__()
        self.embedding_pitch = nn.Embedding(num_pitches, embed_dim)    # Embedding for pitch (categorical)
        self.embedding_numeric = nn.Linear(numeric_dim, embed_dim)     # Linear projection for numeric features
        self.lstm = nn.LSTM(embed_dim * 2, hidden_dim, batch_first=True)    # LSTM: input_dim = embed_dim(pitch) + embed_dim(numeric) = 2*embed_dim
        self.fc = nn.Linear(hidden_dim, num_fingers)     # Final linear layer to the num_fingers

    def forward(self, x):
        pitch_ids = x[..., 0].long()
        numeric_feats = x[..., 1:].float()

        # Embed pitch and fingering fields
        pitch_emb = self.embedding_pitch(pitch_ids)    # (batch_size, seq_len, embed_dim)
        numeric_emb = self.embedding_numeric(numeric_feats)  # (batch_size, seq_len, embed_dim)

        # Concatenate => (batch_size, seq_len, 2*embed_dim)
        combined_emb = torch.cat((pitch_emb, numeric_emb), dim=2)

        # LSTM => (batch_size, seq_len, hidden_dim)
        lstm_out, _ = self.lstm(combined_emb)

        # Final linear => (batch_size, seq_len, num_fingers)
        logits = self.fc(lstm_out)
        return logits

In [159]:
lstm_model = LSTMModel(
    num_pitches=len(pitch_to_int_mapping),
    num_fingers=len(finger_to_int_mapping),
    embed_dim=32,
    hidden_dim=64,
    numeric_dim=5
)

train_sequence_model(lstm_model, train_loader_left, val_loader_left, num_epochs=10, lr=1e-3)
test_acc_lstm = evaluate_sequence_model(lstm_model, test_loader_left)
print(f"LSTM Test Accuracy Left = {test_acc_lstm:.4f}")

train_sequence_model(lstm_model, train_loader_right, val_loader_right, num_epochs=10, lr=1e-3)
test_acc_lstm = evaluate_sequence_model(lstm_model, test_loader_right)
print(f"LSTM Test Accuracy Right = {test_acc_lstm:.4f}")

Epoch 1/10 | Loss: 1.4377 | ValAcc: 0.5100
Epoch 2/10 | Loss: 1.0984 | ValAcc: 0.6126
Epoch 3/10 | Loss: 0.8674 | ValAcc: 0.6921
Epoch 4/10 | Loss: 0.7146 | ValAcc: 0.7524
Epoch 5/10 | Loss: 0.5803 | ValAcc: 0.8022
Epoch 6/10 | Loss: 0.4717 | ValAcc: 0.8409
Epoch 7/10 | Loss: 0.3758 | ValAcc: 0.8768
Epoch 8/10 | Loss: 0.3049 | ValAcc: 0.9022
Epoch 9/10 | Loss: 0.2499 | ValAcc: 0.9251
Epoch 10/10 | Loss: 0.2094 | ValAcc: 0.9317
RNN Test Accuracy Left = 0.9311
Epoch 1/10 | Loss: 1.5476 | ValAcc: 0.5169
Epoch 2/10 | Loss: 1.0822 | ValAcc: 0.6131
Epoch 3/10 | Loss: 0.9150 | ValAcc: 0.6762
Epoch 4/10 | Loss: 0.7878 | ValAcc: 0.7179
Epoch 5/10 | Loss: 0.6885 | ValAcc: 0.7492
Epoch 6/10 | Loss: 0.6049 | ValAcc: 0.7682
Epoch 7/10 | Loss: 0.5354 | ValAcc: 0.8092
Epoch 8/10 | Loss: 0.4787 | ValAcc: 0.8149
Epoch 9/10 | Loss: 0.4330 | ValAcc: 0.8587
Epoch 10/10 | Loss: 0.3954 | ValAcc: 0.8481
RNN Test Accuracy Right = 0.8463


Variant 2 Results

In [254]:
rnn_model = PianoRNNModel(
    num_pitches=len(pitch_to_int_mapping),
    num_fingers=len(finger_to_int_mapping),
    embed_dim=16,
    hidden_dim=32,
    numeric_dim=5
)

train_sequence_model(rnn_model, train_loader_left, val_loader_left, num_epochs=10, lr=1e-3)
test_acc_rnn = evaluate_sequence_model(rnn_model, test_loader_left)
print(f"RNN Test Accuracy Left = {test_acc_rnn:.4f}")

train_sequence_model(rnn_model, train_loader_right, val_loader_right, num_epochs=10, lr=1e-3)
test_acc_rnn = evaluate_sequence_model(rnn_model, test_loader_right)
print(f"RNN Test Accuracy Right = {test_acc_rnn:.4f}")

Epoch 1/10 | Loss: 1.7271 | ValAcc: 0.5475
Epoch 2/10 | Loss: 0.9863 | ValAcc: 0.5475
Epoch 3/10 | Loss: 0.9654 | ValAcc: 0.5475
Epoch 4/10 | Loss: 0.9581 | ValAcc: 0.5431
Epoch 5/10 | Loss: 0.9546 | ValAcc: 0.5455
Epoch 6/10 | Loss: 0.9520 | ValAcc: 0.5431
Epoch 7/10 | Loss: 0.9500 | ValAcc: 0.5490
Epoch 8/10 | Loss: 0.9483 | ValAcc: 0.5426
Epoch 9/10 | Loss: 0.9491 | ValAcc: 0.5431
Epoch 10/10 | Loss: 0.9474 | ValAcc: 0.5470
RNN Test Accuracy Left = 0.6025
Epoch 1/10 | Loss: 2.2041 | ValAcc: 0.3965
Epoch 2/10 | Loss: 1.2710 | ValAcc: 0.4030
Epoch 3/10 | Loss: 1.2545 | ValAcc: 0.3906
Epoch 4/10 | Loss: 1.2481 | ValAcc: 0.3866
Epoch 5/10 | Loss: 1.2431 | ValAcc: 0.3975
Epoch 6/10 | Loss: 1.2416 | ValAcc: 0.3787
Epoch 7/10 | Loss: 1.2383 | ValAcc: 0.4054
Epoch 8/10 | Loss: 1.2317 | ValAcc: 0.4119
Epoch 9/10 | Loss: 1.2279 | ValAcc: 0.4262
Epoch 10/10 | Loss: 1.2155 | ValAcc: 0.4134
RNN Test Accuracy Right = 0.4470


In [256]:
lstm_model = LSTMModel(
    num_pitches=len(pitch_to_int_mapping),
    num_fingers=len(finger_to_int_mapping),
    embed_dim=16,
    hidden_dim=32,
    numeric_dim=5
)

train_sequence_model(lstm_model, train_loader_left, val_loader_left, num_epochs=10, lr=1e-3)
test_acc_lstm = evaluate_sequence_model(lstm_model, test_loader_left)
print(f"LSTM Test Accuracy Left = {test_acc_lstm:.4f}")

train_sequence_model(lstm_model, train_loader_right, val_loader_right, num_epochs=10, lr=1e-3)
test_acc_lstm = evaluate_sequence_model(lstm_model, test_loader_right)
print(f"LSTM Test Accuracy Right = {test_acc_lstm:.4f}")

Epoch 1/10 | Loss: 2.0340 | ValAcc: 0.5500
Epoch 2/10 | Loss: 0.9986 | ValAcc: 0.5574
Epoch 3/10 | Loss: 0.9627 | ValAcc: 0.5564
Epoch 4/10 | Loss: 0.9513 | ValAcc: 0.5594
Epoch 5/10 | Loss: 0.9450 | ValAcc: 0.5495
Epoch 6/10 | Loss: 0.9405 | ValAcc: 0.5520
Epoch 7/10 | Loss: 0.9378 | ValAcc: 0.5545
Epoch 8/10 | Loss: 0.9300 | ValAcc: 0.5589
Epoch 9/10 | Loss: 0.9235 | ValAcc: 0.5609
Epoch 10/10 | Loss: 0.9172 | ValAcc: 0.5510
LSTM Test Accuracy Left = 0.5950
Epoch 1/10 | Loss: 2.6474 | ValAcc: 0.3941
Epoch 2/10 | Loss: 1.3112 | ValAcc: 0.3812
Epoch 3/10 | Loss: 1.2613 | ValAcc: 0.4089
Epoch 4/10 | Loss: 1.2452 | ValAcc: 0.4005
Epoch 5/10 | Loss: 1.2389 | ValAcc: 0.4277
Epoch 6/10 | Loss: 1.2307 | ValAcc: 0.4248
Epoch 7/10 | Loss: 1.2245 | ValAcc: 0.4129
Epoch 8/10 | Loss: 1.2188 | ValAcc: 0.4297
Epoch 9/10 | Loss: 1.2099 | ValAcc: 0.4134
Epoch 10/10 | Loss: 1.2028 | ValAcc: 0.4465
LSTM Test Accuracy Right = 0.4525


REGULAR VERSION 

In [215]:
import os
import pandas as pd

TIME_WINDOW = 3.0  # seconds
MAX_NOTES = 20     # Maximum notes per sequence

# Define pad tokens: same as before.
PAD_X = [0, 0.0, 0.0, 0.0, 0.0, 0.0]
PAD_Y = 0

directory_path = PATH_TO_DATASET_FOLDER
all_sequences = []   # will hold raw DataFrames for each piece
all_piece_names = []
FINGERING_TYPE_TO_ANALYZE = "YI"
yi_filenames = set(annotator_to_files_dict.get(FINGERING_TYPE_TO_ANALYZE, []))

# Read in all pieces.
for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)
    fingering_label, _ = filename.split('_')  # e.g. "001-01"
    piece_id, fingering_type = fingering_label.split('-')

    if os.path.isfile(file_path) and fingering_label in yi_filenames:
        df = pd.read_table(
            file_path, sep="\t", skiprows=1,
            names=[
                "noteID", "onset_time", "offset_time", "spelled_pitch",
                "onset_velocity", "offset_velocity", "channel",
                "finger_number"
            ]
        )
        all_piece_names.append(filename)
        all_sequences.append(df)

print(f"Total number of pieces found: {len(all_sequences)}")

raw_encoded_sequences_variant2 = []  # Each element is a tuple (X_seq, y_seq) of length MAX_NOTES

for df in all_sequences:
    X_list = []
    y_list = []
    onset_times = []
    # Build feature lists for the piece.
    for row in df.itertuples(index=False):
        spelled_pitch = row.spelled_pitch
        finger_str = str(row.finger_number)

        pitch_int = pitch_to_int_mapping.get(spelled_pitch, 0)
        finger_int = finger_to_int_mapping.get(finger_str, 0)
        feature_row = [
            pitch_int,
            float(row.onset_time),
            float(row.offset_time),
            float(row.onset_velocity),
            float(row.offset_velocity),
            float(row.channel)
        ]
        X_list.append(feature_row)
        y_list.append(finger_int)
        onset_times.append(float(row.onset_time))
    
    if len(X_list) == 0:
        continue

        # Create time windows starting at the first note and moving forward by TIME_WINDOW.
    t_start = onset_times[0]
    t_end = onset_times[-1]
    current_window_start = t_start

    while current_window_start <= t_end:
        # Collect indices of notes whose onset time falls within [current_window_start, current_window_start + TIME_WINDOW)
        indices = [i for i, t in enumerate(onset_times) if current_window_start <= t < current_window_start + TIME_WINDOW]
        if len(indices) == 0:
            # No notes in this window; move to the next window.
            current_window_start += TIME_WINDOW
            continue
        
        # If more than MAX_NOTES, keep only the middle MAX_NOTES symmetrically
        if len(indices) > MAX_NOTES:
            center = len(indices) // 2
            half_max = MAX_NOTES // 2
            if MAX_NOTES % 2 == 0:
                indices = indices[center - half_max : center + half_max]
            else:
                indices = indices[center - half_max : center + half_max + 1]
        
        X_seq = [X_list[i] for i in indices]
        y_seq = [y_list[i] for i in indices]
        
        # Pad the sequence symmetrically to ensure it has exactly MAX_NOTES tokens.
        pad_len = MAX_NOTES - len(X_seq)
        left_pad = pad_len // 2
        right_pad = pad_len - left_pad  # Ensure total padding adds up correctly
        
        X_seq = [PAD_X] * left_pad + X_seq + [PAD_X] * right_pad
        y_seq = [PAD_Y] * left_pad + y_seq + [PAD_Y] * right_pad
        
        raw_encoded_sequences_variant2.append((X_seq, y_seq))
        
        # Advance the window by TIME_WINDOW seconds (non-overlapping segments).
        current_window_start += TIME_WINDOW

print("Variant 2: For a sample window, X sequence length =", len(raw_encoded_sequences_variant2[0][0]))

Total number of pieces found: 51
Variant 2: For a sample window, X sequence length = 20


In [216]:

SPECIFIC_VARIANT_TO_USE = raw_encoded_sequences_variant2
raw_encoded_sequences = SPECIFIC_VARIANT_TO_USE

In [217]:
print("x sequence length: ", len(raw_encoded_sequences[0][0]))
print("y sequence length (should match): ", len(raw_encoded_sequences[0][-1]))

# potentially do scaling/some kind of layernorm???
train_val_seqs, test_seqs = train_test_split(raw_encoded_sequences, test_size=0.2, random_state=42)
train_seqs, val_seqs = train_test_split(train_val_seqs, test_size=0.25, random_state=42)

print(f"Total pieces: {len(raw_encoded_sequences)}")
print(f"Train pieces: {len(train_seqs)}")
print(f"Val pieces:   {len(val_seqs)}")
print(f"Test pieces:  {len(test_seqs)}")

x sequence length:  20
y sequence length (should match):  20
Total pieces: 673
Train pieces: 403
Val pieces:   135
Test pieces:  135


In [218]:
# DATASET LOADER CLASS

class FingeringDataset(Dataset):
    def __init__(self, sequences):
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        X_list, y_list = self.sequences[idx]
        X_tensor = torch.tensor(X_list, dtype=torch.float32)  # (seq_len, 6)
        y_tensor = torch.tensor(y_list, dtype=torch.long)     # (seq_len,)
        return X_tensor, y_tensor

train_dataset = FingeringDataset(train_seqs)
val_dataset   = FingeringDataset(val_seqs)
test_dataset  = FingeringDataset(test_seqs)

train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=1, shuffle=False)
test_loader  = DataLoader(test_dataset,  batch_size=1, shuffle=False)

In [219]:
rnn_model = PianoRNNModel(
    num_pitches=len(pitch_to_int_mapping),
    num_fingers=len(finger_to_int_mapping),
    embed_dim=32,
    hidden_dim=64,
    numeric_dim=5
)


train_sequence_model(rnn_model, train_loader, val_loader, num_epochs=30, lr=1e-3)
test_acc_rnn = evaluate_sequence_model(rnn_model, test_loader)
print(f"RNN Test Accuracy = {test_acc_rnn:.4f}")

Epoch 1/30 | Loss: 2.2004 | ValAcc: 0.2919
Epoch 2/30 | Loss: 2.0089 | ValAcc: 0.2978
Epoch 3/30 | Loss: 2.0020 | ValAcc: 0.2989
Epoch 4/30 | Loss: 2.0013 | ValAcc: 0.2989
Epoch 5/30 | Loss: 1.9995 | ValAcc: 0.2937
Epoch 6/30 | Loss: 1.9952 | ValAcc: 0.3044
Epoch 7/30 | Loss: 1.9756 | ValAcc: 0.2963
Epoch 8/30 | Loss: 1.8601 | ValAcc: 0.3807
Epoch 9/30 | Loss: 1.7356 | ValAcc: 0.3978
Epoch 10/30 | Loss: 1.6594 | ValAcc: 0.3830
Epoch 11/30 | Loss: 1.5955 | ValAcc: 0.4111
Epoch 12/30 | Loss: 1.5860 | ValAcc: 0.4004
Epoch 13/30 | Loss: 1.5157 | ValAcc: 0.4096
Epoch 14/30 | Loss: 1.4820 | ValAcc: 0.4167
Epoch 15/30 | Loss: 1.4587 | ValAcc: 0.4252
Epoch 16/30 | Loss: 1.4358 | ValAcc: 0.4363
Epoch 17/30 | Loss: 1.4182 | ValAcc: 0.4363
Epoch 18/30 | Loss: 1.4180 | ValAcc: 0.4263
Epoch 19/30 | Loss: 1.3990 | ValAcc: 0.4356
Epoch 20/30 | Loss: 1.3962 | ValAcc: 0.4356
Epoch 21/30 | Loss: 1.4528 | ValAcc: 0.4274
Epoch 22/30 | Loss: 1.4511 | ValAcc: 0.4389
Epoch 23/30 | Loss: 1.3830 | ValAcc: 0.43

In [220]:
lstm_model = LSTMModel(
    num_pitches=len(pitch_to_int_mapping),
    num_fingers=len(finger_to_int_mapping),
    embed_dim=16,
    hidden_dim=32,
    numeric_dim=5
)

train_sequence_model(lstm_model, train_loader, val_loader, num_epochs=30, lr=1e-3)
test_acc_lstm = evaluate_sequence_model(lstm_model, test_loader)
print(f"LSTM Test Accuracy = {test_acc_lstm:.4f}")


Epoch 1/30 | Loss: 2.5789 | ValAcc: 0.3078
Epoch 2/30 | Loss: 2.0169 | ValAcc: 0.3107
Epoch 3/30 | Loss: 1.9918 | ValAcc: 0.3226
Epoch 4/30 | Loss: 1.9520 | ValAcc: 0.3478
Epoch 5/30 | Loss: 1.8713 | ValAcc: 0.3778
Epoch 6/30 | Loss: 1.6863 | ValAcc: 0.3911
Epoch 7/30 | Loss: 1.5510 | ValAcc: 0.4207
Epoch 8/30 | Loss: 1.4836 | ValAcc: 0.4256
Epoch 9/30 | Loss: 1.4223 | ValAcc: 0.4370
Epoch 10/30 | Loss: 1.4007 | ValAcc: 0.4596
Epoch 11/30 | Loss: 1.3724 | ValAcc: 0.4596
Epoch 12/30 | Loss: 1.3507 | ValAcc: 0.4741
Epoch 13/30 | Loss: 1.3485 | ValAcc: 0.4607
Epoch 14/30 | Loss: 1.3245 | ValAcc: 0.4733
Epoch 15/30 | Loss: 1.3183 | ValAcc: 0.4796
Epoch 16/30 | Loss: 1.2994 | ValAcc: 0.4759
Epoch 17/30 | Loss: 1.2878 | ValAcc: 0.4856
Epoch 18/30 | Loss: 1.2759 | ValAcc: 0.4889
Epoch 19/30 | Loss: 1.2706 | ValAcc: 0.4911
Epoch 20/30 | Loss: 1.2672 | ValAcc: 0.5011
Epoch 21/30 | Loss: 1.2554 | ValAcc: 0.4922
Epoch 22/30 | Loss: 1.2463 | ValAcc: 0.4985
Epoch 23/30 | Loss: 1.2419 | ValAcc: 0.49