In [None]:
import numpy as np
import os.path
import partitura as pt
from basismixer.performance_codec import get_performance_codec
from matplotlib import pyplot as plt
import glob
import re
from scipy import stats
import json

import warnings
warnings.filterwarnings("ignore")

from tqdm import tqdm

In [None]:
xml_fn = glob.glob(os.path.join("asap-dataset", "**", "*.musicxml"), recursive=True)
dump_json = False
perf_parameters = []
piece_dict = {}
match_blacklist = []
all_feature_names = set()

for xml in tqdm(xml_fn):
    
    json_dict = {"matches": [], "targets":[], "matched_basis": [], "xml": xml}
    
    os.makedirs(os.path.join(os.path.dirname(xml), "modified_matches"), exist_ok=True)
    
    if not dump_json:
        piece_dict.update({os.path.dirname(xml): {"matches": [], "targets": [], "basis_idxs": [], "xml": xml}})
    
    match_fn = glob.glob(os.path.join(os.path.dirname(xml), "*.match"))

    score = pt.load_score(xml)
    score = pt.score.merge_parts(score)
    score = pt.score.unfold_part_maximal(score, update_ids=True)
    
    nid_dict = dict((n.id, i) for i, n in enumerate(score.notes_tied))
    
    pt.score.expand_grace_notes(score)
    
    basis, bf_names = pt.musicanalysis.make_note_feats(score, "all")
    all_feature_names.update(bf_names)

    for match in match_fn:

        try:
            performance, alignment = pt.load_match(match)

            parameter_names = ["velocity_trend", "beat_period"]

            pc = get_performance_codec(parameter_names)

            targets, snote_ids, unique_onset_idxs = pc.encode(
                part=score,
                ppart=performance[0],
                alignment=alignment,
                return_u_onset_idx=True
            )
            
            matched_subset_idxs = np.array([nid_dict[nid] for nid in snote_ids])
            basis_matched = basis[matched_subset_idxs]
            
            perf_parameters.append((targets, match))

        except Exception as e:
            print(e)
            print(match)
            match_blacklist.append(match)
            continue

In [None]:
def clip_parameters(array, bp=[0, 2.5], t=[-0.1, 0.1]):
    copy = array.copy()
    
    for perf, name in copy:
        np.clip(perf["beat_period"], a_max=2.5, a_min=0, out=perf["beat_period"])
        np.clip(perf["timing"], a_max=0.01, a_min=-0.01, out=perf["timing"])
        
    return copy

In [None]:
import pandas as pd

indices = [perf[1] for perf in perf_parameters]
expressiveness_params = ["beat_period", "timing", "articulation_log", "velocity_trend", "velocity_dev"]
descriptions = []
dataframes = []

perf_parameters = clip_parameters(perf_parameters, bp=[0, 3], t=[-1, 1])

for perf in perf_parameters:
    param_df = pd.DataFrame(perf[0], columns=expressiveness_params)
    dataframes.append(param_df)
    descriptions.append(param_df.describe())

In [None]:
dataframes[0]

In [None]:
def get_outliers(z_threshold=3):
    
    outlier_dict = {"piece_names": []}
    
    for param_name in expressiveness_params:

        print(f"{param_name} outliers:")

        param_df = pd.DataFrame(columns=["min", "max", "mean"], index=indices)

        param_df["min"] = [desc[param_name].T["min"] for desc in descriptions]
        param_df["max"] = [desc[param_name].T["max"] for desc in descriptions]
        param_df["mean"] = [desc[param_name].T["mean"] for desc in descriptions]
        
        min_mean = np.mean(param_df["min"])
        max_mean = np.mean(param_df["max"])
        mean_mean = np.mean(param_df["mean"])
        
        min_median = np.median(param_df["min"])
        max_median = np.median(param_df["max"])
        mean_median = np.median(param_df["mean"])

        z_min = np.abs(stats.zscore(param_df["min"]))
        z_max = np.abs(stats.zscore(param_df["max"]))
        z_mean = np.abs(stats.zscore(param_df["max"]))
        
        outliers = param_df[(z_min > z_threshold) | (z_max > z_threshold) | (z_mean > z_threshold)]
        
        print(f"""Mean values for comparison:
                    min: {min_mean}
                    max: {max_mean}
                    mean: {mean_mean}\n""")
        
        print(f"""Median values for comparison:
                    min: {min_median}
                    max: {max_median}
                    mean: {mean_median}\n""")
        
        print(f"{outliers.to_markdown()}\n\n")
        outlier_dict[param_name] = outliers
        outlier_dict["piece_names"] += [idx for idx in outliers.index]
        
    return outlier_dict

In [None]:
outlier_dict = get_outliers(8)
outlier_dict["piece_names"] += match_blacklist
outlier_dict["piece_names"] = list(set(outlier_dict["piece_names"]))

In [None]:
outlier_dict["piece_names"]

In [None]:
from matplotlib import pyplot as plt

for param_name in expressiveness_params:
    
    if len(outlier_dict[param_name]) != 0:
    
        fig, ax = plt.subplots(len(outlier_dict[param_name])+1, figsize=(10, 20))
        for i, piece_name in enumerate(outlier_dict[param_name].index):

            idx = indices.index(piece_name)
            df = dataframes[idx]

            ax[i].plot(df[param_name])
            ax[i].set(title=f"{param_name.upper()}: {piece_name}")

        fig.tight_layout()

In [None]:
valid_perf_params = [perf for perf in perf_parameters if perf[1] not in outlier_dict["piece_names"]]

print(len(perf_parameters), len(valid_perf_params))

In [None]:
all_feature_names

In [None]:
basis_list = []
feature_frame = pd.DataFrame(columns=feature_names)
feature_names = None

counter = 0
for perf, perf_name in tqdm(valid_perf_params):
    
    xml = glob.glob(os.path.join(os.path.dirname(perf_name), "*.musicxml"))[0]
    
    score = pt.load_score(xml)
    score = pt.score.merge_parts(score)
    score = pt.score.unfold_part_maximal(score, update_ids=True)
    pt.score.expand_grace_notes(score)

    basis, bf_names = pt.musicanalysis.make_note_feats(score, "all", force_fixed_size=True)
    basis = np.mean(basis, axis=0)
    basis_list.append(basis)
    
    feature_names = bf_names

In [None]:
df = pd.DataFrame(np.vstack(basis_list), columns=bf_names)
df.corr()

In [None]:
corr_matrix = df.corr().fillna(0)
drop_cols = []

for j, col in enumerate(corr_matrix.columns):
    
    if j == 0:
        i = np.argmax(np.abs(corr_matrix[col][1:]))
        corr = corr_matrix[col][i]
    else:
        i = np.argmax(np.abs(corr_matrix[col][:j:]))
        corr = corr_matrix[col][i]
        
    
    if abs(corr) > 0.85 or corr == 0:
        drop_cols.append(col)
    
    print(f"{col} has the highest correlation with {corr_matrix.index[i]}: {corr}\n")

In [None]:
new_df = df.drop(columns=drop_cols)
len(new_df.columns)

In [None]:
import seaborn as sns

sns.heatmap(df.corr())

In [None]:
sns.heatmap(new_df.corr())

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class MyDataset(Dataset):
    
    def __init__(self,
                 match_paths,
                 seq_len, param_names=["beat_period", "timing", "articulation_log", "velocity_trend", "velocity_dev"],
                 feat_names="all",
                 fixed_features=True,
                 drop_features=[]
                ):   
        
        self.data = []
        self.target_data = None
        self.param_names = param_names
        self.parameter_dict = {name: [] for name in param_names}
        
        self.feature_size = 0
        self.seq_len = seq_len
        
        print("Processing score data...")
        for match_file in tqdm(match_paths):
            
            try:
                
                xml = glob.glob(os.path.join(os.path.dirname(match_file), "*.musicxml"))[0]

                score = pt.load_score(xml)
                score = pt.score.merge_parts(score)
                score = pt.score.unfold_part_maximal(score, update_ids=True)

                nid_dict = dict((n.id, i) for i, n in enumerate(score.notes_tied))

                pt.score.expand_grace_notes(score)

                basis, bf_names = pt.musicanalysis.make_note_feats(score, feat_names, force_fixed_size=fixed_features)

                performance, alignment = pt.load_match(match_file)

                parameter_names = param_names

                pc = get_performance_codec(parameter_names)

                targets, snote_ids, unique_onset_idxs = pc.encode(
                    part=score,
                    ppart=performance[0],
                    alignment=alignment,
                    return_u_onset_idx=True
                )

                matched_subset_idxs = np.array([nid_dict[nid] for nid in snote_ids])
                basis_matched = basis[matched_subset_idxs]
                
                basis_matched = pd.DataFrame(basis_matched, columns=bf_names)
                basis_matched = basis_matched.drop(columns=drop_features).to_numpy()
                
                # clipping outliers
                
                np.clip(targets["beat_period"], a_min=0, a_max=3, out=targets["beat_period"])
                np.clip(targets["timing"], a_min=-1, a_max=1, out=targets["timing"])
                
            except Exception as e:
                print(match_file)
                print(e)
                continue
            
            padding_len = len(targets) % seq_len
            
            for name in param_names:
                new_targets = targets[name]
                t_padding_array = np.zeros(shape=seq_len - padding_len)
                new_targets = np.concatenate((new_targets, t_padding_array))
                new_targets = np.split(new_targets, len(new_targets) / seq_len)
                
                self.parameter_dict[name] += [target for target in new_targets]
            
            bm_padding_array = np.zeros(shape=(seq_len - padding_len, basis_matched.shape[1]))
            new_basis = np.vstack((basis_matched, bm_padding_array))
            new_basis = np.split(new_basis, len(new_basis) / seq_len)
            
            self.data += [basis for basis in new_basis]
            
        max_features = max([basis.shape[1] for basis in self.data])
        self.feature_size = max_features
        
        for i, basis in enumerate(self.data):
            if basis.shape[1] < max_features:
                
                difference = max_features - basis.shape[1]
                
                self.data[i] = np.hstack((basis, np.zeros(shape=(seq_len, difference))))
            
    def choose_parameter(self, parameter_name):
        self.target_data = self.parameter_dict[parameter_name]
        
    def __getitem__(self, idx):
        
        x = self.data[idx]
        y = self.target_data[idx]
        
        return x, y
    
    def __len__(self):
        return len(self.data)

In [None]:
matches = glob.glob(os.path.join("asap-dataset\Bach", "**", "*.match"), recursive=True)

custom_dataset = MyDataset(matches, seq_len=50, drop_features=drop_cols)

In [None]:
torch.save(custom_dataset, "my_data.pt")

In [None]:
import torch
import torch.nn as nn

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device).to(x.dtype)
        # Initialize cell state
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device).to(x.dtype)
        
        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)
        
        # Decode the hidden state of the last time step
        out = self.fc(self.relu(out[:, -1, :]))
        return out

In [None]:
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(GRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device).to(x.dtype)
        
        # Forward propagate LSTM
        out, _ = self.gru(x, h0)  # out: tensor of shape (batch_size, seq_length, hidden_size)
        
        # Decode the hidden state of the last time step
        out = self.fc(self.relu(out[:, -1, :]))
        return out

In [None]:
class LSTMSeq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTMSeq2Seq, self).__init__()
        
        output_size = 1

        self.hidden_size = hidden_size
        self.encoder = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.output_layer = nn.Linear(hidden_size, output_size)

    def forward(self, src):
        batch_size = src.size(0)

        # Encoder
        encoder_output, (hidden, cell) = self.encoder(src)

        # Decoder initialization with encoder's last hidden and cell state
        decoder_input = torch.zeros(batch_size, 1, self.hidden_size)

        outputs = []
        for i in range(encoder_output.size(1)):
            decoder_output, (hidden, cell) = self.decoder(decoder_input, (hidden, cell))
            output = self.output_layer(decoder_output)
            outputs.append(output)
            decoder_input = decoder_output

        return torch.cat(outputs, dim=1)

In [None]:
class SelfAttention(nn.Module):
    def __init__(self, hidden_size):
        super(SelfAttention, self).__init__()
        self.hidden_size = hidden_size
        self.query = nn.Linear(hidden_size, hidden_size)
        self.key = nn.Linear(hidden_size, hidden_size)
        self.value = nn.Linear(hidden_size, hidden_size)
        self.softmax = nn.Softmax(dim=-1)
        
    def forward(self, inputs):
        q = self.query(inputs)
        k = self.key(inputs)
        v = self.value(inputs)
        
        attention_scores = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.hidden_size, dtype=torch.float32))
        attention_probs = self.softmax(attention_scores)
        attention_output = torch.matmul(attention_probs, v)
        
        return attention_output

class AttentiveLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(AttentiveLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.self_attention = SelfAttention(hidden_size)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, hidden_size)
        )
        
    def forward(self, x):
        # LSTM layer
        lstm_out, _ = self.lstm(x)
        
        # Self-Attention layer
        self_attended = self.self_attention(lstm_out[:, -1, :])
        
        # Apply fully connected layer
        output = self.fc(self_attended)
        
        return output

In [None]:
import torch.optim as optim
from torch.utils.data import Subset

NUM_EPOCHS = 50
NUM_LAYERS = 1
HIDDEN_SIZE = 264

models = []
param_names = ["beat_period", "timing", "articulation_log", "velocity_trend", "velocity_dev"]
fig, ax = plt.subplots(5, figsize=(10, 15))

# train model for each parameter
for i, name in enumerate(param_names):
    
    print(f"Training model for {name}\n")
    
    custom_dataset.choose_parameter(name)

    rng = np.random.default_rng()
    all_indices = list(range(len(custom_dataset)))
    rng.shuffle(all_indices)

    test_indices = all_indices[:int(len(all_indices) * 0.2)]
    val_indices = all_indices[int(len(all_indices) * 0.2):int(len(all_indices) * 0.3)]
    train_indices = all_indices[int(len(all_indices) * 0.3):]
    
    # initialize model and dataloaders
    my_model = LSTMSeq2Seq(input_size=custom_dataset.feature_size, hidden_size=HIDDEN_SIZE, output_size=custom_dataset.seq_len)

    train_set = Subset(custom_dataset, indices=train_indices)
    test_set = Subset(custom_dataset, indices=test_indices)
    val_set = Subset(custom_dataset, indices=val_indices)

    train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
    val_loader = DataLoader(val_set, batch_size=64, shuffle=False)

    # Define loss function and optimizer
    criterion = nn.MSELoss()
    learning_rate = 1e-4
    optimizer = optim.Adam(my_model.parameters(), lr=learning_rate)

    # Training loop
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    my_model.to(device)

    train_losses = []
    val_losses = [np.inf]
    best_model = None
    best_val_loss = np.inf

    # Train/Validation loop
    for epoch in range(NUM_EPOCHS):

        # Training
        total_train_loss = 0
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            inputs, targets = inputs.float(), targets.float()
            
            # Forward pass
            outputs = my_model(inputs)
            loss = criterion(outputs.squeeze(), targets.squeeze())  # Assuming the output and target shapes are compatible

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()

        # Validating
        total_val_loss = 0
        for batch_idx, (inputs, targets) in enumerate(val_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            inputs, targets = inputs.float(), targets.float()

            # Forward pass
            outputs = my_model(inputs)
            loss = criterion(outputs.squeeze(), targets.squeeze())

            total_val_loss += loss.item()

        if total_val_loss / len(val_loader) < best_val_loss:
            best_model = my_model.state_dict()

        if len(val_losses) > 10 and min(val_losses) not in val_losses[-10:]:
            print("Validation loss is increasing, stopping early...")
            break

        print(f"Epoch [{epoch + 1}/{NUM_EPOCHS}], Average train Loss: {total_train_loss / len(train_loader):.4f}, Average validation Loss: {total_val_loss / len(val_loader):.4f}")
        train_losses.append(total_train_loss / len(train_loader))
        val_losses.append(total_val_loss / len(val_loader))

    print("Training finished\n")
    torch.save(best_model, f"best_model_{name}.pt")

    # Testing
    model = LSTMSeq2Seq(input_size=custom_dataset.feature_size, hidden_size=HIDDEN_SIZE, num_layers=NUM_LAYERS, output_size=custom_dataset.seq_len)
    model.load_state_dict(best_model)
    
    total_test_loss = 0
    for batch_idx, (inputs, targets) in enumerate(test_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        inputs, targets = inputs.float(), targets.float()

        # Forward pass
        outputs = my_model(inputs)
        loss = criterion(outputs.squeeze(), targets.squeeze())

        total_test_loss += loss.item()

    print(f"Average loss on test set: {total_test_loss / len(test_loader)}\n\n")
    models.append(model)
    
    ax[i].plot(train_losses)
    ax[i].plot(val_losses)
    
fig.tight_layout()

In [None]:
new_dataset = Subset(custom_dataset, list(range(20)))
new_dataloader = DataLoader(new_dataset, shuffle=False, batch_size=1)

param_names = ["beat_period", "timing", "articulation_log", "velocity_trend", "velocity_dev"]
preds = {name: [] for name in param_names}
ground_truths = {name: [] for name in param_names}

for inputs, targets in new_dataloader:
    inputs, targets = inputs.to(device), targets.to(device)
    inputs, targets = inputs.float(), targets.float()
    
    for model, name in zip(models, param_names):
        model.to(device)
        outputs = model(inputs)
        
        preds[name] += outputs.tolist()[0]

zipped_list = [(a, b, c, d, e) for a, b, c, d, e in zip(preds["beat_period"], preds["timing"], preds["articulation_log"], preds["velocity_trend"], preds["velocity_dev"])]

In [None]:
test_perf = np.array(zipped_list, dtype=perf_parameters[0][0].dtype)
test_perf["beat_period"] = np.abs(test_perf["beat_period"])
test_perf["velocity_trend"] = np.abs(test_perf["velocity_trend"])
test_perf["velocity_dev"] = np.abs(test_perf["velocity_dev"])

In [None]:
xml = glob.glob(os.path.join("asap-dataset", "Bach", "Fugue", "bwv_846", "*.musicxml"))[0]

score = pt.load_score(xml)
score = pt.score.merge_parts(score)
score = pt.score.unfold_part_maximal(score, update_ids=True)
pt.score.expand_grace_notes(score)

pc = get_performance_codec(param_names)

result = pc.decode(score, test_perf)

In [None]:
test_perf

In [None]:
pt.save_performance_midi(result, "test_midi.mid")

In [None]:
def plot_rendering(name):
    custom_dataset.choose_parameter(name)

    new_dataset = Subset(custom_dataset, list(range(20)))
    new_dataloader = DataLoader(new_dataset, shuffle=False, batch_size=1)

    ground_truths = []

    for inputs, targets in new_dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        inputs, targets = inputs.float(), targets.float()

        ground_truths += targets.tolist()[0]

    plt.plot(ground_truths)
    plt.plot(test_perf[name])

In [None]:
plot_rendering("beat_period")

In [None]:
plot_rendering("timing")

In [None]:
plot_rendering("articulation_log")

In [None]:
plot_rendering("velocity_trend")

In [None]:
plot_rendering("velocity_dev")