In [10]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import sys
sys.path.append("..") # Add parent directory to sys.path to access preprocessing module

from preprocessing.feature_extractor import FeatureExtractor

In [11]:
data_dir = "../../data/Composer_Dataset"
composers = ["Bach", "Beethoven", "Chopin", "Mozart"]
scalar_features, multidimensional_features = FeatureExtractor.extract_features_for_directory(data_dir, composers)

Loading existing features from ../../data/Composer_Dataset/extracted_features.pkl


In [12]:
scalar_features.head()

Unnamed: 0,max_independent_voices,avg_independent_voices,var_independent_voices,avg_simultaneity,var_simultaneity,note_density,avg_note_duration,var_note_duration,initial_tempo,time_signature_numerator,...,perfect_vertical_intervals,vertical_minor_seconds,vertical_thirds,vertical_fifths,vertical_tritones,vertical_octaves,avg_chord_duration,length,file_name,composer
0,4.0,3.899642,0.300477,3.72,1.020588,6.439026,0.621212,0.296121,143.000038,4.0,...,0.391195,0.0,0.327044,0.2,0.045283,0.122013,0.465,29.999992,042100b_.mid,Bach
1,4.0,3.986667,0.114698,3.6,1.2,7.394595,0.526316,0.250579,189.176471,4.0,...,0.400447,0.00522,0.307979,0.168531,0.042506,0.123788,0.321429,25.0,043100b_.mid,Bach
2,7.0,2.664012,1.617631,2.331998,1.816512,6.292553,0.371665,0.686803,181.905446,4.0,...,0.378734,0.006634,0.29159,0.120602,0.057594,0.158185,0.177695,923.701235,Bwv0564-Toccata-Adagio-and-Fugue.mid,Bach
3,4.0,3.970874,0.16816,3.662222,1.112213,8.775758,0.455801,0.212811,177.509434,4.0,...,0.375205,0.009852,0.312808,0.130542,0.030378,0.115764,0.321875,22.5,027400b_.mid,Bach
4,4.0,3.917323,0.275394,3.455782,1.371385,5.195127,0.713614,0.442007,153.000153,4.0,...,0.478439,0.002738,0.309377,0.206023,0.016427,0.140999,0.47037,29.333304,026400b_.mid,Bach


In [13]:
print(f"Piano Roll Shape: {multidimensional_features[0]['piano_roll'].shape}")
print(f"Chroma Piano Roll Shape: {multidimensional_features[0]['chroma_piano_roll'].shape}")
print(f"Pitch Class Histogram Shape: {multidimensional_features[0]['pitch_class_histogram'].shape}")
print(f"Pitch Class Transition Matrix Shape: {multidimensional_features[0]['pitch_class_transition_matrix'].shape}")

Piano Roll Shape: (128, 299)
Chroma Piano Roll Shape: (12, 299)
Pitch Class Histogram Shape: (12,)
Pitch Class Transition Matrix Shape: (12, 12)


## CNN Example

### Train Val Test Split

In [14]:
train_split = 0.8  # ratio of all data to use for training
val_test_split = 0.5  # ratio of holdout data to use for test set

x = multidimensional_features
y = scalar_features['composer']

label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

x_train, x_holdout, y_train, y_holdout = train_test_split(x, y, test_size=(1 - train_split), random_state=1)
x_val, x_test, y_val, y_test = train_test_split(x_holdout, y_holdout, test_size=val_test_split, random_state=1)

In [15]:
torch.manual_seed(42)

<torch._C.Generator at 0x149c0fd10>

### Chunk Into Sequences

In [16]:
def chunk_sequences(X_in, y_in, feature, sequence_length=100):
    X_out = []
    y_out = []

    # make sequences split along the time axis
    for i in range(len(X_in)):
        for j in range(0, len(X_in[i][feature][1]) - sequence_length, sequence_length):
            X_out.append(X_in[i][feature][:, j:j + sequence_length])
            y_out.append(y_in[i])

    return X_out, y_out

### Setup

### Make Dataloader

In [17]:
class PianoRollDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        X = torch.tensor(self.X[idx], dtype=torch.float32) 
        y = torch.tensor(self.y[idx], dtype=torch.long)
        return X, y

In [18]:
def get_dataloaders(use_chroma, seq_length=100, batch_size=8):
    
    feature_set = 'chroma_piano_roll' if use_chroma else 'piano_roll'

    X_train_seq, y_train_seq = chunk_sequences(x_train, y_train, feature_set, seq_length)
    X_val_seq, y_val_seq = chunk_sequences(x_val, y_val, feature_set, seq_length)
    X_test_seq, y_test_seq = chunk_sequences(x_test, y_test, feature_set, seq_length)

    train_dataset = PianoRollDataset(X_train_seq, y_train_seq)
    val_dataset = PianoRollDataset(X_val_seq, y_val_seq)
    test_dataset = PianoRollDataset(X_test_seq, y_test_seq)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

### Define CNN

In [19]:
class PianoRollCNN(nn.Module):
    def __init__(self, h_params):
        super(PianoRollCNN, self).__init__()
        # Conv1d layers: input channels = 12, height = 1, width = 50

        input_channels = 12 if h_params['use_chroma'] else 128

        self.conv1 = nn.Conv1d(input_channels, h_params['conv1_out_channels'], 
                               kernel_size=h_params['kernel_size'], stride=h_params['stride'], 
                               padding=h_params['kernel_size'] // 2)
        self.conv2 = nn.Conv1d(h_params['conv1_out_channels'], h_params['conv2_out_channels'], 
                               kernel_size=h_params['kernel_size'], stride=h_params['stride'],
                               padding=h_params['kernel_size'] // 2)
        self.pool = nn.MaxPool1d(kernel_size=h_params['pool_kernel_size'], stride=h_params['pool_stride'], padding=0)

        def conv_output_size(input_size, kernel_size, stride, padding):
            return (input_size - kernel_size + 2 * padding) // stride + 1
    
        padding = h_params['kernel_size'] // 2  # Assuming same padding

        # First conv layer output size
        conv1_output_width = conv_output_size(int(h_params['seq_length']), h_params['kernel_size'], h_params['stride'], padding)
        # First pooling layer output size
        pooled_width = conv_output_size(conv1_output_width, h_params['pool_kernel_size'], h_params['pool_stride'], 0)
        # Second conv layer output size
        conv2_output_width = conv_output_size(pooled_width, h_params['kernel_size'], h_params['stride'], padding)
        # Second pooling layer output size
        pooled_width = conv_output_size(conv2_output_width, h_params['pool_kernel_size'], h_params['pool_stride'], 0)

        self.fc1 = nn.Linear(h_params['conv2_out_channels'] * pooled_width, h_params['fc1_out'])
        self.fc2 = nn.Linear(h_params['fc1_out'], 4)  # Output of 4 classes (composers)

    def forward(self, x):
        x = x.squeeze(2) # This can potentially be removed if the input is reshaped correctly in the dataloader
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1) 
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [33]:
def train_cnn(model, train_loader, val_loader, num_epochs, criterion, optimizer):

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        report_interval = 1000
        for i, data in enumerate(train_loader):
            inputs, labels = data
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if i % report_interval == (report_interval - 1):
                print(f'[Epoch {epoch + 1}, Batch {i + 1}] loss: {running_loss / report_interval:.3f}')
                running_loss = 0.0  # Reset running loss

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for data in val_loader:
                inputs, labels = data
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        print(f'Validation loss: {val_loss / len(val_loader):.3f}, Accuracy: {100 * correct / total:.2f}%')


### Evaluate Performance

In [21]:
def evaluate_performance(model, data_loader):
    model.eval()

    all_labels = []
    all_predictions = []

    # Disable gradient computation for evaluation
    with torch.no_grad():
        for data in data_loader:
            inputs, labels = data
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)

            # Store true and predicted labels
            all_labels.extend(labels.numpy())
            all_predictions.extend(predicted.numpy())

    # Compute the classification report
    report = classification_report(all_labels, all_predictions, target_names=label_encoder.classes_)
    print(report)

    return report, all_labels, all_predictions

### Implement One Training Run

In [22]:
def training_run(h_params):

    seq_length = int(h_params['seq_length'])
    batch_size = int(h_params['batch_size'])

    train_loader, val_loader, test_loader = get_dataloaders(h_params['use_chroma'], seq_length, batch_size)

    model = PianoRollCNN(h_params)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=h_params['lr'])
    train_cnn(model, train_loader, val_loader, h_params['num_epochs'], criterion, optimizer)

    # TODO: Add model save 
    ###### Save results #####
    return model, test_loader

### Process All Hyperparameter Sets

In [35]:
# Function to process the file and train models
def process_file(file_path):
    infile = pd.read_csv(file_path)

    outputs = ['accuracy', 'bach_f1', 'beethoven_f1', 'chopin_f1', 'mozart_f1']
    floating_point_columns = ['lr']

    for index, row in infile.iterrows():
        # Create hyperparameter dictionary dynamically
        hyperparams = {col: row[col] for col in infile.columns if col not in outputs}
        # Convert int columns to int
        for col in infile.columns:
            if col not in floating_point_columns and col not in outputs:
                hyperparams[col] = int(hyperparams[col])
            elif col in floating_point_columns:
                hyperparams[col] = float(hyperparams[col])

        # Train model
        model, test_loader = training_run(hyperparams)

        # Evaluate performance
        report, all_labels, all_predictions = evaluate_performance(model, test_loader)

        # Write accuracy to data then save
        infile.at[index, 'accuracy'] = float(report.split('\n')[7].split()[1])
        infile.at[index, 'bach_f1'] = float(report.split('\n')[2].split()[3])
        infile.at[index, 'beethoven_f1'] = float(report.split('\n')[3].split()[3])
        infile.at[index, 'chopin_f1'] = float(report.split('\n')[4].split()[3])
        infile.at[index, 'mozart_f1'] = float(report.split('\n')[5].split()[3])
        infile.to_csv(file_path, index=False)


In [36]:
process_file('cnn_optimization.csv')

[Epoch 1, Batch 1000] loss: 1.295
[Epoch 1, Batch 2000] loss: 1.180
[Epoch 1, Batch 3000] loss: 1.148
[Epoch 1, Batch 4000] loss: 1.133
[Epoch 1, Batch 5000] loss: 1.107
[Epoch 1, Batch 6000] loss: 1.083
[Epoch 1, Batch 7000] loss: 1.094
[Epoch 1, Batch 8000] loss: 1.075
Validation loss: 1.085, Accuracy: 46.76%
              precision    recall  f1-score   support

        Bach       0.68      0.92      0.78      3290
   Beethoven       0.45      0.56      0.50      1814
      Chopin       0.91      0.10      0.18      1047
      Mozart       0.14      0.04      0.06       968

    accuracy                           0.59      7119
   macro avg       0.54      0.40      0.38      7119
weighted avg       0.58      0.59      0.52      7119

[Epoch 1, Batch 1000] loss: 1.310
[Epoch 1, Batch 2000] loss: 1.133
[Epoch 1, Batch 3000] loss: 1.092
Validation loss: 1.083, Accuracy: 52.11%
              precision    recall  f1-score   support

        Bach       0.66      0.91      0.76      1619


## Single Training Run

In [38]:
test_params = {}
test_params['use_chroma'] = True
test_params['seq_length'] = 50
test_params['batch_size'] = 8
test_params['lr'] = 0.001
test_params['num_epochs'] = 1

test_params['conv1_out_channels'] = 32
test_params['conv2_out_channels'] = 64
test_params['kernel_size'] = 3
test_params['stride'] = 1

test_params['pool_kernel_size'] = 2
test_params['pool_stride'] = 2

test_params['fc1_out'] = 128

model, test_dataloader = training_run(test_params)

report, _, _ = evaluate_performance(model, test_dataloader)

[Epoch 1, Batch 1000] loss: 1.299
[Epoch 1, Batch 2000] loss: 1.192
[Epoch 1, Batch 3000] loss: 1.175
[Epoch 1, Batch 4000] loss: 1.150
[Epoch 1, Batch 5000] loss: 1.108
[Epoch 1, Batch 6000] loss: 1.108
[Epoch 1, Batch 7000] loss: 1.087
[Epoch 1, Batch 8000] loss: 1.077
Validation loss: 1.067, Accuracy: 52.58%
              precision    recall  f1-score   support

        Bach       0.66      0.89      0.76      3290
   Beethoven       0.45      0.25      0.32      1814
      Chopin       0.86      0.17      0.28      1047
      Mozart       0.23      0.35      0.28       968

    accuracy                           0.55      7119
   macro avg       0.55      0.42      0.41      7119
weighted avg       0.58      0.55      0.51      7119

