# Data preparation
This notebook is used to analyze the used dataset and transform it to a format usable in the models.

The Beethoven Piano Sonata with Function Harmony (BPS-FH) dataset will be used. It is available here: https://github.com/Tsung-Ping/functional-harmony/tree/master.

## Imports

In [12]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.nn import Transformer

import os
import math

## Data analysis and exploration

In [13]:
df_notes = pd.read_csv('data/functional-harmony/BPS_FH_Dataset/1/notes.csv', header=None)
df_notes

Unnamed: 0,0,1,2,3,4,5
0,-1.0,60,60,1.0,0,0
1,0.0,65,63,1.0,0,1
2,1.0,68,65,1.0,0,1
3,2.0,72,67,1.0,0,1
4,3.0,77,70,1.0,0,1
...,...,...,...,...,...,...
2184,796.0,53,56,1.0,1,152
2185,796.0,65,63,1.0,0,152
2186,796.0,68,65,1.0,0,152
2187,796.0,72,67,1.0,0,152


In [14]:
df_chords = pd.read_excel('data/functional-harmony/BPS_FH_Dataset/1/chords.xlsx', header=None)
df_chords

Unnamed: 0,0,1,2,3,4,5,6
0,-1,8,f,1,m,0,i
1,8,16,f,5,D7,1,V65
2,16,20,f,1,m,0,i
3,20,24,f,5,D7,2,V43
4,24,26,f,1,m,1,i6
...,...,...,...,...,...,...,...
244,790,791,f,1,m,0,i
245,791,792,f,6,M,0,VI
246,792,794,f,2,h7,1,ii=65
247,794,796,f,5,D7,0,V7


The dataset's columns should be given descriptive names. The information about the columns is written on the dataset's Github README.md.

In [15]:
df_notes = df_notes.rename(columns={
    0: 'onset_time',
    1: 'midi_note',
    2: 'morphetic_pitch_number',
    3: 'duration',
    4: 'staff_number',
    5: 'measure'
})
df_notes

Unnamed: 0,onset_time,midi_note,morphetic_pitch_number,duration,staff_number,measure
0,-1.0,60,60,1.0,0,0
1,0.0,65,63,1.0,0,1
2,1.0,68,65,1.0,0,1
3,2.0,72,67,1.0,0,1
4,3.0,77,70,1.0,0,1
...,...,...,...,...,...,...
2184,796.0,53,56,1.0,1,152
2185,796.0,65,63,1.0,0,152
2186,796.0,68,65,1.0,0,152
2187,796.0,72,67,1.0,0,152


In [16]:
df_chords = df_chords.rename(columns={
    0: 'onset_time',
    1: 'offset_time',
    2: 'key',
    3: 'degree',
    4: 'quality',
    5: 'inversion',
    6: 'roman_numeral_notation'
})
df_chords

Unnamed: 0,onset_time,offset_time,key,degree,quality,inversion,roman_numeral_notation
0,-1,8,f,1,m,0,i
1,8,16,f,5,D7,1,V65
2,16,20,f,1,m,0,i
3,20,24,f,5,D7,2,V43
4,24,26,f,1,m,1,i6
...,...,...,...,...,...,...,...
244,790,791,f,1,m,0,i
245,791,792,f,6,M,0,VI
246,792,794,f,2,h7,1,ii=65
247,794,796,f,5,D7,0,V7


For this task, the model's input needs to be a monophonic melody (i.e. only one note played at any given moment). This means that we need to handle moments where multiple notes are played at the same time (chords). This will be done by keeping the note with the highest pitch. 

In [17]:
# Sort the notes in the dataset first by their onset time ascending, and then by the note pitch descending.
df_notes = df_notes.sort_values(by=['onset_time', 'midi_note'], ascending=[True, False])

monophonic_melody = []
current_onset = None

for _, row in df_notes.iterrows():
    onset_time = row['onset_time']
    midi_note = row['midi_note']
    
    # Since the dataset is sorted, if there are multiple notes played at the same time, we only keep the first one (the one with the highest pitch).
    if onset_time != current_onset:
        monophonic_melody.append(row)
        current_onset = onset_time

df_notes_clean = pd.DataFrame(monophonic_melody).reset_index(drop=True)
df_notes_clean

Unnamed: 0,onset_time,midi_note,morphetic_pitch_number,duration,staff_number,measure
0,-1.0,60.0,60.0,1.0,0.0,0.0
1,0.0,65.0,63.0,1.0,0.0,1.0
2,1.0,68.0,65.0,1.0,0.0,1.0
3,2.0,72.0,67.0,1.0,0.0,1.0
4,3.0,77.0,70.0,1.0,0.0,1.0
...,...,...,...,...,...,...
1278,790.0,72.0,67.0,1.0,0.0,150.0
1279,791.0,77.0,70.0,1.0,0.0,150.0
1280,792.0,77.0,70.0,1.0,0.0,151.0
1281,794.0,76.0,69.0,1.0,0.0,151.0


## Data processing

Now we will do this processing for all dataframes in the dataset and save the results in CSVs separated into directories based on the sonata number.

In [18]:
def process_notes_df(df_notes):
    df_notes = df_notes.rename(columns={
        0: 'onset_time',
        1: 'midi_note',
        2: 'morphetic_pitch_number',
        3: 'duration',
        4: 'staff_number',
        5: 'measure'
    })

    df_notes = df_notes.sort_values(by=['onset_time', 'midi_note'], ascending=[True, False])

    monophonic_melody = []
    current_onset = None

    for _, row in df_notes.iterrows():
        onset_time = row['onset_time']
        midi_note = row['midi_note']
        
        # Since the dataset is sorted, if there are multiple notes played at the same time, we only keep the first one (the one with the highest pitch).
        if onset_time != current_onset:
            monophonic_melody.append(row)
            current_onset = onset_time

    df_notes_clean = pd.DataFrame(monophonic_melody).reset_index(drop=True)
    return df_notes_clean

In [40]:
chord_quality_to_int = {
    'M': 0,
    'm': 1,
    'M7': 2,
    'm7': 3,
    'D7': 4,
    'a': 5,
    'a6': 6,
    'd': 7,
    'd7': 8,
    'h7': 9
}

scale_degree_to_int = {
    '1': 1,
    '+1': 2,
    '-2': 2,
    '2': 3,
    '+2': 4,
    '-3': 4,
    '3': 5,
    '4': 6,
    '+4': 7,
    '-5': 7,
    '5': 8,
    '+5': 9,
    '-6': 9,
    '6': 10,
    '+6': 11,
    '-7': 11,
    '7': 12
}
def process_chords_df(df_chords):
    df_chords_renamed = df_chords.rename(columns={
        0: 'onset_time',
        1: 'offset_time',
        2: 'key',
        3: 'degree',
        4: 'quality',
        5: 'inversion',
        6: 'roman_numeral_notation'
    })

    expanded_rows = []
    for i, row in df_chords_renamed.iterrows():
        if i == len(df_chords_renamed) - 1:
            break
        
        onset = int(row["onset_time"])
        offset = int(df_chords_renamed.iloc[i+1]["onset_time"])
        
        # Repeat the chord for every integer timestamp in the range [onset, offset)
        for t in range(onset, offset):
            # Handle secondary chords
            if type(row["degree"]) == str and '/' in row["degree"]:
                d1, d2 = row["degree"].split('/')[0], row["degree"].split('/')[1]
                degree = (int(d1) + int(d2)) % 7 
            else:
                degree = row["degree"]

            # Add the new row
            expanded_rows.append({
                "time": t,
                "key": row["key"],
                "degree": scale_degree_to_int[str(degree)],
                "quality": chord_quality_to_int[row["quality"]],
                "inversion": row["inversion"],
                "roman_numeral_notation": row["roman_numeral_notation"]
            })
    
    expanded_df = pd.DataFrame(expanded_rows).reset_index(drop=True)
    return expanded_df

In [41]:
for dir in os.listdir('data/functional-harmony/BPS_FH_Dataset'):
    if not os.path.isdir(os.path.join('data/functional-harmony/BPS_FH_Dataset', dir)):
        continue
    
    newpath = 'data/processed/' + dir
    if not os.path.exists(newpath):
        os.makedirs(newpath)
    
    df_notes = pd.read_csv(os.path.join('data/functional-harmony/BPS_FH_Dataset', dir, 'notes.csv'), header=None)
    df_notes_clean = process_notes_df(df_notes)
    df_notes_clean.to_csv(os.path.join('data/processed/', dir, 'notes.csv'), index=False)

    df_chords = pd.read_excel(os.path.join('data/functional-harmony/BPS_FH_Dataset', dir, 'chords.xlsx'), header=None)
    df_chords_clean = process_chords_df(df_chords)
    df_chords_clean.to_csv(os.path.join('data/processed/', dir, 'chords.csv'), index=False)


## Create transformer model

Transformers require positional encoding as an input to the encoder and decoder. For this, sinusoidal positional encoding will be used.

In [42]:
# This StackOverflow answer was used as reference: https://stackoverflow.com/a/77445896/21102779
class SinusoidalPositionalEncoding(nn.Module):
    def __init__(self, d_model):
        super(SinusoidalPositionalEncoding, self).__init__()
        self.d_model = d_model

    def forward(self, onset_times):
        seq_len = onset_times.size(0)
        
        pe = torch.zeros(seq_len, self.d_model)

        position = onset_times.unsqueeze(1)

        div_term = torch.exp( torch.arange(0, self.d_model, 2, dtype=torch.float32) *  (-math.log(10000.0) / self.d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.sin(position * div_term)

        return pe

In [43]:
class HarmonizerTransformer(nn.Module):
    def __init__(self, 
                 midi_vocab_size, 
                 chord_vocab_sizes, 
                 d_model=256, 
                 dropout=0.1,
                 nhead=8, 
                 num_encoder_layers=6, 
                 num_decoder_layers=6):
        super(HarmonizerTransformer, self).__init__()

        self.d_model = d_model

        # Define the embedding for midi notes
        self.midi_embedding = nn.Embedding(midi_vocab_size, d_model)

        # Define separate embeddings for each chord property
        self.key_embedding = nn.Embedding(chord_vocab_sizes['key'], d_model)
        self.degree_embedding = nn.Embedding(chord_vocab_sizes['degree'], d_model)
        self.quality_embedding = nn.Embedding(chord_vocab_sizes['quality'], d_model)
        self.inversion_embedding = nn.Embedding(chord_vocab_sizes['inversion'], d_model)

        # Define the positional encoding
        self.positional_encoding = SinusoidalPositionalEncoding(d_model)

        # Define the transformer architecture
        self.transformer = Transformer(
            d_model = d_model,
            nhead = nhead,
            num_encoder_layers = num_encoder_layers,
            num_decoder_layers = num_decoder_layers,
            dropout=dropout
        )

        # Define the output heads. The transformer will output four values:
        # - chord key
        # - chord degree
        # - chord quality
        # - chord inversion
        self.key_head = nn.Linear(d_model, chord_vocab_sizes['key'])
        self.degree_head = nn.Linear(d_model, chord_vocab_sizes['degree'])
        self.quality_head = nn.Linear(d_model, chord_vocab_sizes['quality'])
        self.inversion_head = nn.Linear(d_model, chord_vocab_sizes['inversion'])

    def forward(self, src_notes, src_onset_times, tgt_chords, tgt_onset_times):
        # Embed the midi notes
        src_notes_embeddings = self.midi_embedding(src_notes)
        src_positional_encodings = self.positional_encoding(src_onset_times)
        src = src_notes_embeddings + src_positional_encodings

        # Embed the target chords (each separately)
        key_emb = self.key_embedding(tgt_chords[:, :, 0])
        degree_emb = self.degree_embedding(tgt_chords[:, :, 1])
        quality_emb = self.quality_embedding(tgt_chords[:, :, 2])
        inversion_emb = self.inversion_embedding(tgt_chords[:, :, 3])

        # Combine the chord embeddings
        tgt_emb = key_emb + degree_emb + quality_emb + inversion_emb
        tgt_onset_emb = self.positional_encoding(tgt_onset_times.unsqueeze(-1))
        tgt = tgt_emb + tgt_onset_emb

        # Permute the values to have the shape (seq_len, batch_size, d_model)
        src = src.permute(1, 0, 2)
        tgt = tgt.permute(1, 0, 2)

        # Pass the midi embeddings through the encoder to get the memory
        memory = self.transformer.encoder(src)

        # Pass the target embeddings through the decoder to get the transformer's output
        output = self.transformer.decoder(tgt, memory)

        # Get the output for each head
        key = self.key_head(output)
        degree = self.degree_head(output)
        quality = self.quality_head(output)
        inversion = self.inversion_head(output)

        return key, degree, quality, inversion

In [44]:
midi_vocab_size = 128
chord_vocab_sizes = {
    'key': 12 * 2,    # 12 possible keys * 2 (major/minor)
    'degree': 7 * 3 + (7*3)**2,  # 7 possible degrees * 3 (natural, sharp, flat)
    'quality': 10,     # 10 possible chord qualities
    'inversion': 4    # 4 possible chord inversions (Root, 1st, 2nd, 3rd)
}

In [48]:
model = HarmonizerTransformer(midi_vocab_size, chord_vocab_sizes)

# Load the processed data
src_notes = torch.tensor(df_notes_clean["midi_note"].tolist(), dtype=torch.long)
src_onset_times = torch.tensor(df_notes_clean["onset_time"].tolist(), dtype=torch.long)

#tgt_chords = torch.tensor(df_chords_clean[["key", "degree", "quality", "inversion"]].values)
#tgt_onset_times = torch.tensor(df_chords_clean["time"].values, dtype=torch.long)

torch.tensor(df_chords_clean[["key", "degree", "quality", "inversion"]].values)

# Forward pass
# key, degree, quality, inversion = model(src_notes, src_onset_times, tgt_chords, tgt_onset_times)



TypeError: can't convert np.ndarray of type numpy.object_. The only supported types are: float64, float32, float16, complex64, complex128, int64, int32, int16, int8, uint64, uint32, uint16, uint8, and bool.