# Composer Identification with CNNs

In this notebook we are going to explore composer identification using convolutional neural networks.

In [None]:
# Let's start importing stuff!
import pandas as pd
import numpy as np
import partitura as pt
import os
import warnings
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
from sklearn.metrics import f1_score
from partitura.utils.music import compute_pianoroll

from sklearn.model_selection import train_test_split

from classification_utils import (
    COMPOSER_CLASSES,
    CLASSES_COMPOSER,
    encode_composer,
    segment_array,
    predict_piece,
    plot_confusion_matrix
)

# Silence warnings emitted from partitura; `module` is a regular expression
# matched against the start of the name of the module raising the warning.
warnings.filterwarnings("ignore", module="partitura")

## Data Exploration

Let's have a look at the data.

In [None]:
# CSV table with one row per score (columns used below: "Score", "Composer")
DATASET_FN = "composer_classification_training.csv"
data = pd.read_csv(filepath_or_buffer=DATASET_FN)

# Peek at the first rows to see what the table looks like
data.head(n=5)

We have scores in MusicXML format, and for each score we have the composer.

Let's look now at the distribution of the composers. Do you notice anything in particular about this distribution?

In [None]:
# Number of scores per composer (pandas sorts the counts in descending order)
value_counts = data["Composer"].value_counts()

# One bar per composer, drawn on an explicitly created figure/axes pair
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(
    value_counts.index,
    value_counts.values,
    color="firebrick",
    edgecolor="black",
)
ax.set_xlabel("Composer")
ax.set_ylabel("Count")
ax.set_title("Frequency of Composers")

# Tilt the composer names so that neighbouring labels do not overlap
ax.tick_params(axis="x", labelrotation=45)

# Fit everything inside the canvas and render
fig.tight_layout()
plt.show()

In [None]:
# Show the composer/class lookup table shipped with classification_utils
print(COMPOSER_CLASSES)

# Encode the composer of every score as a classification target. The exact
# encoding is defined by `encode_composer` (presumably integer class
# indices -- TODO confirm against classification_utils).
composer_names = data["Composer"]
composer_indices = encode_composer(composer_names)

Let us now load the scores. For this tutorial we are going to use piano rolls as a low-level representation of the music.

In [None]:
# Directory used to cache the computed piano rolls (created on demand).
# Replace this path with the location where you downloaded the files
PIANO_ROLL_DIR = "piano_rolls_numpy"
# Directory with the MusicXML scores.
# Replace this path with the location where you downloaded the files
MUSICXML_DIR = "scores_composer_identification"

# Divisions per beat
# NOTE: cached piano rolls are looked up by file name only, so delete the
# cache directory after changing `time_div`; otherwise the arrays computed
# with the old resolution are silently reloaded.
time_div = 10

loaded_pianorolls = []

for sfn in data["Score"]:

    pr_fn = os.path.join(PIANO_ROLL_DIR, f"{sfn}.npz")

    if not os.path.exists(pr_fn):
        # Load scores
        score = pt.load_musicxml(os.path.join(MUSICXML_DIR, sfn))
        # Compute piano roll; the sparse result is densified and transposed so
        # that time runs along the first axis and piano keys along the second
        pr = compute_pianoroll(
            score,
            piano_range=True,
            time_unit="beat",
            time_div=time_div,
        ).toarray().T

        # Save piano roll. The cache directory does not exist on a fresh
        # checkout, so create it (and any sub-directory implied by `sfn`).
        os.makedirs(os.path.dirname(pr_fn), exist_ok=True)
        np.savez_compressed(pr_fn, array=pr)
    else:
        # Use a context manager so that the underlying .npz file handle is
        # closed right after the array has been read into memory
        with np.load(pr_fn) as cached:
            pr = cached["array"]

    loaded_pianorolls.append(pr)

# Store the variable-length piano rolls in a 1-D object array. Filling the
# array element by element guarantees one entry per piece even if all rolls
# happen to share the same shape (`np.array(list, dtype=object)` would build
# a 3-D array in that case).
pianorolls = np.empty(len(loaded_pianorolls), dtype=object)
for i, pr in enumerate(loaded_pianorolls):
    pianorolls[i] = pr

Let's see what a piano roll for this kind of data looks like!

In [None]:
# Show the first piece; the roll is transposed so that piano keys run along
# the vertical axis and time along the horizontal one
fig, ax = plt.subplots()
ax.imshow(
    pianorolls[0].T,
    aspect="auto",
    origin="lower",
    cmap="gray",
    interpolation="nearest",
)

ax.set_ylabel("Piano key")
ax.set_xlabel("Time")
fig.tight_layout()
plt.show()

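We are going to split the dataset by piece, so that all segments of a given piece end up in exactly one of the training, validation, or test sets. Otherwise, segments of the same piece could leak between the sets.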

In [None]:
# Hold out a fraction of the pieces for testing; stratifying on the composer
# keeps the class proportions the same on both sides of the split
test_size = 0.2
pr_train_val, pr_test, ci_train_val, ci_test = train_test_split(
    pianorolls, composer_indices,
    test_size=test_size, stratify=composer_indices, random_state=42,
)

# Carve the validation pieces out of what is left: `val_size` is a fraction
# of the train+validation pool, not of the full dataset
val_size = 0.1
pr_train, pr_val, ci_train, ci_val = train_test_split(
    pr_train_val, ci_train_val,
    test_size=val_size, stratify=ci_train_val, random_state=1942,
)

# Sanity check: number of pieces (X) and of labels (Y) in every split
print(f"X_train shape: {pr_train.shape}, Y_train shape: {ci_train.shape}")
print(f"X_val shape: {pr_val.shape}, Y_val shape: {ci_val.shape}")
print(f"X_test shape: {pr_test.shape}, Y_test shape: {ci_test.shape}")

In [None]:
# Three stacked panels sharing the x axis: one per data split
fig, axes = plt.subplots(3, sharex=True, figsize=(6, 5))

unique_classes = np.array(list(COMPOSER_CLASSES.keys()))

# Class indices present in each split and how many pieces carry each of them
unique_train, counts_train = np.unique(ci_train, return_counts=True)
unique_val, counts_val = np.unique(ci_val, return_counts=True)
unique_test, counts_test = np.unique(ci_test, return_counts=True)

# Draw the three bar charts with identical styling
panels = [
    ("Training set", unique_train, counts_train),
    ("Validation set", unique_val, counts_val),
    ("Test set", unique_test, counts_test),
]
for ax, (panel_title, class_ids, class_counts) in zip(axes, panels):
    ax.bar(class_ids, class_counts, color="firebrick", edgecolor="black")
    ax.set_title(panel_title)
    ax.set_ylabel("Count")

# The x axis is shared, so only the bottom panel needs a label
axes[2].set_xlabel("Composer index")

# Tight layout avoids overlapping titles and labels
plt.tight_layout()
plt.show()

In [None]:
# Window length, in piano-roll time steps, of the segments fed to the model
pr_segment_length = 16 * 10

train_segments = []
train_targets = []

for piece_roll, composer_idx in zip(pr_train, ci_train):
    # Cut the piece into windows; the exact windowing (overlap, padding) is
    # defined by `segment_array` in classification_utils
    windows = segment_array(array=piece_roll, window_length=pr_segment_length)
    # Every window inherits the composer label of the piece it comes from
    window_targets = np.ones(len(windows)) * composer_idx

    train_segments.append(windows)
    train_targets.append(window_targets)

# Stack the per-piece results into one array of segments and one of labels
X_train = np.concatenate(train_segments)
Y_train = np.concatenate(train_targets)

In [None]:
# Window length, in piano-roll time steps (same value as for the training set)
pr_segment_length = 16 * 10

val_segments = []
val_targets = []

for piece_roll, composer_idx in zip(pr_val, ci_val):
    # Cut the piece into windows; the exact windowing (overlap, padding) is
    # defined by `segment_array` in classification_utils
    windows = segment_array(array=piece_roll, window_length=pr_segment_length)
    # Every window inherits the composer label of the piece it comes from
    window_targets = np.ones(len(windows)) * composer_idx

    val_segments.append(windows)
    val_targets.append(window_targets)

# Stack the per-piece results into one array of segments and one of labels
X_val = np.concatenate(val_segments)
Y_val = np.concatenate(val_targets)

In [None]:
# Number of segments (not pieces) available for training and for validation
n_train_examples = len(X_train)
n_val_examples = len(X_val)
print("Training examples", n_train_examples)
print("Validation examples", n_val_examples)

### Training the CNN

In [None]:
# Set device (use GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Convert numpy arrays to PyTorch tensors. `unsqueeze(1)` inserts the single
# input channel expected by the 2-D convolutions: (N, H, W) -> (N, 1, H, W).
# The targets are cast to long because CrossEntropyLoss takes class indices.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)
Y_train_tensor = torch.tensor(Y_train, dtype=torch.long)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32).unsqueeze(1)
Y_val_tensor = torch.tensor(Y_val, dtype=torch.long)

# Hyperparameters
n_classes = len(COMPOSER_CLASSES)
learning_rate = 0.001
batch_size = 64
num_epochs = 2

# Create DataLoaders for the training and validation sets
train_dataset = TensorDataset(X_train_tensor, Y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, Y_val_tensor)
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    # Dropping the incomplete last batch keeps every training step the same size
    drop_last=True,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    # Keep the last (possibly smaller) batch: validation metrics must cover
    # every example, and with fewer than `batch_size` validation segments
    # `drop_last=True` would leave the loader empty.
    drop_last=False,
)

In [None]:
class CNN(nn.Module):
    """Small convolutional classifier for single-channel 2-D inputs.

    Architecture: two (3x3 convolution -> ReLU -> 2x2 max-pooling) stages
    with 32 and 64 feature maps, followed by a 128-unit hidden linear layer
    and a linear output layer producing one logit per class.

    Parameters
    ----------
    n_classes : int
        Number of output classes (size of the logit vector).
    input_height : int, optional
        Size of the third dimension of the input batch. When omitted it is
        read from the global ``X_train`` (``X_train.shape[1]``) at
        construction time.
    input_width : int, optional
        Size of the fourth dimension of the input batch. When omitted it is
        read from the global ``X_train`` (``X_train.shape[2]``) at
        construction time.

    Notes
    -----
    The input dimensions fix the size of the first linear layer, so the
    network only accepts inputs of shape
    ``(batch_size, 1, input_height, input_width)``.
    """

    def __init__(self, n_classes=9, input_height=None, input_width=None):
        super().__init__()

        # Resolve the fallback dimensions when the model is built rather than
        # when the class is defined, so that they always reflect the current
        # `X_train` (e.g. after re-running the segmentation cells).
        if input_height is None:
            input_height = X_train.shape[1]
        if input_width is None:
            input_width = X_train.shape[2]

        self.conv1 = nn.Conv2d(
            1, 32, kernel_size=3, stride=1, padding=1
        )  # Output: (batch_size, 32, input_height, input_width)
        self.conv2 = nn.Conv2d(
            32, 64, kernel_size=3, stride=1, padding=1
        )  # Output: (batch_size, 64, input_height // 2, input_width // 2)
        self.pool = nn.MaxPool2d(
            kernel_size=2, stride=2, padding=0
        )  # Halves the spatial dimensions (rounding down)
        self.relu = nn.ReLU()

        # Compute the size after conv and pooling layers
        self.flattened_size = self._get_flattened_size(input_height, input_width)

        # Now, initialize the fully connected layers with the correct input size
        self.fc1 = nn.Linear(self.flattened_size, 128)
        self.fc2 = nn.Linear(128, n_classes)

    def _extract_features(self, x):
        """Apply both convolution -> ReLU -> pooling stages to ``x``."""
        x = self.pool(self.relu(self.conv1(x)))  # After first conv and pool
        x = self.pool(self.relu(self.conv2(x)))  # After second conv and pool
        return x

    def _get_flattened_size(self, height, width):
        """Return the number of features per example after the conv stack."""
        # Pass a dummy input through conv and pool layers to get the output size
        with torch.no_grad():
            dummy = torch.zeros(1, 1, height, width)
            flattened_size = self._extract_features(dummy).numel()
        return flattened_size

    def forward(self, x):
        """Return the class logits, shape ``(batch_size, n_classes)``."""
        x = self._extract_features(x)
        # Flatten everything but the batch dimension
        x = torch.flatten(x, start_dim=1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
# Seed PyTorch's random number generator so that weight initialisation and
# batch shuffling are repeatable across runs on the same setup
torch.manual_seed(42)

# Instantiate the model, define the loss function and the optimizer
model = CNN(n_classes=n_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Start below any reachable accuracy so that the first validation pass always
# writes a checkpoint (the next cell expects the checkpoint file to exist)
best_val_acc = float("-inf")
validate_freq = 1  # Set validation frequency (default is 1)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    progress_bar = tqdm(
        enumerate(train_loader),
        total=len(train_loader),
        desc=f"Epoch {epoch+1}/{num_epochs}",
    )
    for i, (inputs, labels) in progress_bar:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Show the mean training loss over the batches seen so far this epoch
        running_loss += loss.item()
        progress_bar.set_postfix({"Loss": running_loss / (i + 1)})

    # Perform validation every 'validate_freq' epochs
    if (epoch + 1) % validate_freq == 0:
        model.eval()
        val_loss_sum = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                n_in_batch = labels.size(0)
                # `criterion` returns the batch mean; weight it by the batch
                # size so that a smaller last batch is not over-represented
                val_loss_sum += criterion(outputs, labels).item() * n_in_batch
                predicted = outputs.argmax(dim=1)
                total += n_in_batch
                correct += (predicted == labels).sum().item()

        if total == 0:
            raise RuntimeError(
                "The validation loader yielded no samples; check the size of "
                "the validation set and the loader's batch_size/drop_last."
            )

        val_loss = val_loss_sum / total
        val_acc = 100 * correct / total
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%")

        # Save checkpoint if validation accuracy improves
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), "best_model_checkpoint.pth")
            print("Model checkpoint saved.")

In [None]:
# Restore the weights stored under the checkpoint file name used by the
# training loop; `map_location` places the tensors on the device in use
checkpoint_path = "best_model_checkpoint.pth"
best_weights = torch.load(checkpoint_path, map_location=device)

# Copy the restored parameters into the model
model.load_state_dict(best_weights)

In [None]:
# Visualize some predictions
def plot_predictions(n=5):
    """Plot validation segments with their true and predicted class indices.

    The segments are taken from the first batch of the global ``val_loader``
    and classified with the global ``model``.

    Parameters
    ----------
    n : int
        Maximum number of segments to show. Fewer panels are drawn when the
        first validation batch contains fewer than ``n`` segments.

    Raises
    ------
    ValueError
        If ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError("n must be a positive integer")

    model.eval()
    with torch.no_grad():
        inputs, labels = next(iter(val_loader))
        inputs, labels = inputs[:n].to(device), labels[:n].to(device)
        predicted = model(inputs).argmax(dim=1)

    # The batch may hold fewer than `n` segments
    n_shown = len(inputs)

    # `squeeze=False` always returns a 2-D array of axes, so indexing also
    # works when a single panel is requested
    fig, axes = plt.subplots(1, n_shown, figsize=(15, 6), squeeze=False)
    axes = axes[0]
    for i in range(n_shown):
        axes[i].imshow(
            # Drop the channel dimension and transpose the segment for display
            inputs[i].cpu().squeeze().T,
            cmap="gray",
            aspect="auto",
            interpolation="nearest",
        )
        axes[i].set_title(f"True: {labels[i].item()} Pred: {predicted[i].item()}")
        axes[i].axis("off")
    plt.show()


plot_predictions()

## Evaluation

In [None]:
# Predict one composer per test piece. How the per-segment outputs are
# combined into a piece-level decision is defined by `predict_piece`.
# The segment length has to be the one the CNN was trained with (it fixes
# the input size of the first linear layer), so reuse `pr_segment_length`
# instead of repeating the literal value.
predictions_test = np.array(
    [
        predict_piece(
            pianoroll=pr,
            segment_length=pr_segment_length,
            cnn=model,
            device=device,
        )
        for pr in pr_test
    ]
)

# Fraction of test pieces whose predicted composer matches the target
accuracy_test = np.mean(predictions_test == ci_test)

In [None]:


# Confusion matrix of the piece-level test predictions, with the keys of
# COMPOSER_CLASSES used as class names
class_names = list(COMPOSER_CLASSES.keys())
plot_confusion_matrix(
    predictions=predictions_test,
    targets=ci_test,
    class_names=class_names,
)

In [None]:

# Macro F1 averages the per-class F1 scores with equal weight, whereas
# micro F1 is computed from the pooled counts over all classes
macro_f1 = f1_score(y_true=ci_test, y_pred=predictions_test, average="macro")
micro_f1 = f1_score(y_true=ci_test, y_pred=predictions_test, average="micro")

# Report all metrics as percentages
print(f"Accuracy {accuracy_test * 100:.2f}%")
print(f"Macro F1 Score: {macro_f1 * 100:.2f}%")
print(f"Micro F1 Score: {micro_f1 * 100:.2f}%")