# Final Team Project: Music Genre and Composer Classification Using Deep Learning

## Introduction
Music is a ubiquitous art form with a rich history, and each composer has developed a distinctive style. However, identifying the composer of a particular piece can be challenging, especially for novice musicians or listeners. The proposed project aims to use deep learning techniques to accurately identify the composer of a given piece of music.

## Objective
The primary objective of this project is to develop a deep learning model that can predict the composer of a given musical score accurately. The project aims to accomplish this objective by using two deep learning techniques: Long Short-Term Memory (LSTM) and Convolutional Neural Network (CNN).

## Dataset
The project uses a dataset of musical scores from various composers.

The dataset contains MIDI files of compositions by four well-known classical composers, and each score should be labeled with its composer's name:

1. Bach
2. Beethoven
3. Chopin
4. Mozart

## Methodology
The proposed project will be implemented using the following steps:

1. **Data Collection**: Data is collected and provided to you.
2. **Data Pre-processing**: Convert the musical scores into a format suitable for deep learning models. This involves converting the musical scores into MIDI files and applying data augmentation techniques.
3. **Feature Extraction**: Extract features from the MIDI files, such as notes, chords, and tempo, using music analysis tools.
4. **Model Building**: Develop a deep learning model using LSTM and CNN architectures to classify the musical scores according to the composer.
5. **Model Training**: Train the deep learning model using the pre-processed and feature-extracted data.
6. **Model Evaluation**: Evaluate the performance of the deep learning model using accuracy, precision, and recall metrics.
7. **Model Optimization**: Optimize the deep learning model by fine-tuning hyperparameters.
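Step 2 mentions data augmentation, which the code in this notebook does not implement. One common augmentation for symbolic music is transposition: shifting every pitch by a few semitones yields new training pieces with the same rhythm and structure. The sketch below assumes each piece is an (n, 4) array of `[pitch, velocity, start, end]` rows, like the ones `create_sequence` builds; the helper names and the ±2 semitone range are hypothetical choices, not part of the project.

```python
import numpy as np

def transpose_notes(notes, semitones):
    """Return a copy of `notes` with every pitch shifted by `semitones`.

    Assumes `notes` is an (n, 4) array of [pitch, velocity, start, end] rows.
    Notes pushed outside the MIDI pitch range 0-127 are dropped rather than
    clipped, so no pitch collapses onto the range boundary.
    """
    shifted = np.asarray(notes, dtype=float).copy()
    shifted[:, 0] += semitones
    in_range = (shifted[:, 0] >= 0) & (shifted[:, 0] <= 127)
    return shifted[in_range]

def augment_by_transposition(notes, shifts=(-2, -1, 1, 2)):
    """Hypothetical helper: one transposed copy of the piece per shift."""
    return [transpose_notes(notes, s) for s in shifts]

# Toy three-note piece; the last note sits at the top of the MIDI range
piece = np.array([[60, 80, 0.0, 0.5],
                  [64, 80, 0.5, 1.0],
                  [127, 70, 1.0, 1.5]])
variants = augment_by_transposition(piece)
print([v[:, 0].tolist() for v in variants])
```

Augmented copies should be generated only from the training split; transposing a file before splitting would put near-duplicates of test pieces into training. Transposition may also blur key preferences that help identify a composer, so its benefit here is an open question.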

## Deliverables
There are **two** deliverables for this Final Project:

1. **Project Report**: A comprehensive report that describes the methodology, data pre-processing steps, feature extraction techniques, model architecture, and training process, for reproducibility and future reference. Write the technical report in APA 7 style. Submit the report in PDF format using the file naming convention DeliverableName-TeamNumber.pdf; for example, **Project_Report-Team1.pdf**
    - Your report should:
        - contain a reference list that includes any external sources, libraries, or frameworks used during the project, including proper citations or acknowledgments.
        - include a concluding section or markdown cell that summarizes the project, highlights key findings, and suggests any potential future improvements or extensions to the work.
2. **Project Notebook**: A Jupyter Notebook file (.ipynb) that contains the entire project code, including data pre-processing, feature extraction, model building, training, evaluation, and any additional analysis or visualizations performed during the project.
    - This deliverable will be exported from a Jupyter Notebook and submitted as a PDF or HTML file.

## Conclusion
The proposed project aims to use deep learning techniques to accurately predict the composer of a given musical score. The project will be implemented using LSTM and CNN architectures and will involve data pre-processing, feature extraction, model building, training, and evaluation. The final model can be used by novice musicians, listeners, and music enthusiasts to identify the composer of a musical piece accurately.

**NOTE**: Team members may not get the same grade on the Final Team Project, depending on each team member's level of contribution.

## Imports and File Reads

In [1]:
# Standard library imports
import os
import csv
import random
import warnings

# Data processing and numerical operations
import pandas as pd
import numpy as np

# Data visualization
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

# MIDI processing
import pretty_midi as pm

# Machine learning and preprocessing
from sklearn.preprocessing import StandardScaler, LabelEncoder, label_binarize
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc

# Deep learning framework
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Conv2D, MaxPooling2D, Dense, Flatten, Concatenate, BatchNormalization, Dropout
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Progress bar
from tqdm import tqdm

# Configure plotting
plt.style.use('fivethirtyeight')
sns.set_theme(style='whitegrid', palette='deep')

# Suppress warnings
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning)


In [2]:
root_dir = '../aai-511_group1/midiclassics'

In [None]:
def analyze_midi_file(file_path):
    midi_data = pm.PrettyMIDI(file_path)
    
    info = {
        'filename': os.path.basename(file_path),
        'total_duration': midi_data.get_end_time(),
        'tempo': midi_data.estimate_tempo(),
        'time_signature_changes': len(midi_data.time_signature_changes),
        'key_signature_changes': len(midi_data.key_signature_changes),
        'number_of_instruments': len(midi_data.instruments),
    }
    
    all_notes = []
    all_control_changes = []
    all_pitch_bends = []
    
    for i, instrument in enumerate(midi_data.instruments):
        info[f'instrument_{i}_name'] = instrument.name
        info[f'instrument_{i}_program'] = instrument.program
        info[f'instrument_{i}_is_drum'] = instrument.is_drum
        info[f'instrument_{i}_note_count'] = len(instrument.notes)
        
        for note in instrument.notes:
            all_notes.append({
                'track': i,
                'type': 'note',
                'start': note.start,
                'end': note.end,
                'pitch': note.pitch,
                'velocity': note.velocity
            })
        
        for cc in instrument.control_changes:
            all_control_changes.append({
                'track': i,
                'type': 'control_change',
                'start': cc.time,
                'number': cc.number,
                'value': cc.value
            })
        
        for pb in instrument.pitch_bends:
            all_pitch_bends.append({
                'track': i,
                'type': 'pitch_bend',
                'start': pb.time,
                'value': pb.pitch
            })
    
    # Tempo changes
    tempo_times, tempo_values = midi_data.get_tempo_changes()
    tempo_changes = [{
        'type': 'tempo_change',
        'start': tempo_times[i],
        'tempo': tempo_values[i]
    } for i in range(len(tempo_times))]
    
    return info, all_notes, all_control_changes, all_pitch_bends, tempo_changes

In [None]:
def process_midi_directory(root_dir, bach_limit=250, seed=42):
    data = []
    all_notes = []
    all_control_changes = []
    all_pitch_bends = []
    all_tempo_changes = []
    
    bach_files = []
    rng = random.Random(seed)  # seeded RNG so the Bach subsample is reproducible

    for composer in os.listdir(root_dir):
        composer_dir = os.path.join(root_dir, composer)
        if os.path.isdir(composer_dir):
            if composer.lower() == 'bach':
                # Bach is over-represented: collect all of Bach's files first,
                # then randomly keep at most `bach_limit` of them
                for root, _, files in os.walk(composer_dir):
                    bach_files.extend([os.path.join(root, file) for file in files if file.lower().endswith(('.mid', '.midi'))])
                if len(bach_files) > bach_limit:
                    # sort first because os.walk order depends on the filesystem
                    bach_files = rng.sample(sorted(bach_files), bach_limit)
                for file_path in tqdm(bach_files, desc=f"Processing Bach (limited to {bach_limit})"):
                    process_file(file_path, 'Bach', data, all_notes, all_control_changes, all_pitch_bends, all_tempo_changes)
            else:
                # collect and process remaining composers
                for root, _, files in os.walk(composer_dir):
                    for file in tqdm(files, desc=f"Processing {composer}"):
                        if file.lower().endswith(('.mid', '.midi')):
                            file_path = os.path.join(root, file)
                            process_file(file_path, composer, data, all_notes, all_control_changes, all_pitch_bends, all_tempo_changes)

    df_info = pd.DataFrame(data)
    df_notes = pd.DataFrame(all_notes)
    df_control_changes = pd.DataFrame(all_control_changes)
    df_pitch_bends = pd.DataFrame(all_pitch_bends)
    df_tempo_changes = pd.DataFrame(all_tempo_changes)
   
    return df_info, df_notes, df_control_changes, df_pitch_bends, df_tempo_changes

def process_file(file_path, composer, data, all_notes, all_control_changes, all_pitch_bends, all_tempo_changes):
    try:
        info, notes, control_changes, pitch_bends, tempo_changes = analyze_midi_file(file_path)
        info['composer'] = composer
        data.append(info)
        
        # unique ID for each file
        file_id = len(data) - 1 
        
        for note in notes:
            note['file_id'] = file_id
            all_notes.append(note)
        
        for cc in control_changes:
            cc['file_id'] = file_id
            all_control_changes.append(cc)
        
        for pb in pitch_bends:
            pb['file_id'] = file_id
            all_pitch_bends.append(pb)
        
        for tc in tempo_changes:
            tc['file_id'] = file_id
            all_tempo_changes.append(tc)
    
    except Exception as e:
        print(f"Error processing {file_path}: {str(e)}")


df_info, df_notes, df_control_changes, df_pitch_bends, df_tempo_changes = process_midi_directory(root_dir)

# Save to CSV for reproducibility and to avoid reprocessing the MIDI files
df_info.to_csv('midi_info.csv', index=False, escapechar='\\', quoting=csv.QUOTE_ALL)
df_notes.to_csv('midi_notes.csv', index=False, escapechar='\\', quoting=csv.QUOTE_ALL)
df_control_changes.to_csv('midi_control_changes.csv', index=False, escapechar='\\', quoting=csv.QUOTE_ALL)
df_pitch_bends.to_csv('midi_pitch_bends.csv', index=False, escapechar='\\', quoting=csv.QUOTE_ALL)
df_tempo_changes.to_csv('midi_tempo_changes.csv', index=False, escapechar='\\', quoting=csv.QUOTE_ALL)

display(df_info.head())
display(df_info.shape)
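Capping Bach at `bach_limit` files is one way to handle the class imbalance. An alternative, not used in this notebook, is to keep every file and weight the loss instead. The sketch below implements the common "balanced" heuristic, weight = n_samples / (n_classes × class_count), which is the same formula scikit-learn's `compute_class_weight('balanced', ...)` uses. It assumes integer labels such as those produced by `LabelEncoder`; `balanced_class_weights` is a hypothetical helper name.

```python
from collections import Counter

def balanced_class_weights(labels):
    """Map each integer class label to n_samples / (n_classes * count)."""
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    return {cls: n_samples / (n_classes * cnt) for cls, cnt in sorted(counts.items())}

# Toy example: class 0 (say, Bach) is three times as frequent as the others
labels = [0] * 6 + [1] * 2 + [2] * 2 + [3] * 2
weights = balanced_class_weights(labels)
print(weights)
```

Keras's `model.fit` accepts such a dict through its `class_weight` argument, e.g. `class_weight=balanced_class_weights(y_train.tolist())`. Rarer composers then contribute more to the loss per sample. Undersampling discards data; weighting keeps it at the cost of noisier gradients for the rare classes.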

In [None]:
# Add file_id to df_info
df_info['file_id'] = df_info.index

# Group notes by file_id and create features
note_features = df_notes.groupby('file_id').agg({
    'pitch': ['mean', 'std', 'min', 'max'],
    'velocity': ['mean', 'std', 'min', 'max'],
    'start': ['min', 'max'],
    'end': ['max']
}).reset_index()
note_features.columns = ['file_id'] + [f'note_{col[0]}_{col[1]}' for col in note_features.columns[1:]]

# Create features from control changes
cc_features = df_control_changes.groupby('file_id').agg({
    'number': ['nunique'],
    'value': ['mean', 'std']
}).reset_index()
cc_features.columns = ['file_id'] + [f'cc_{col[0]}_{col[1]}' for col in cc_features.columns[1:]]

# Create features from pitch bends
pb_features = df_pitch_bends.groupby('file_id').agg({
    'value': ['mean', 'std', 'min', 'max']
}).reset_index()
pb_features.columns = ['file_id'] + [f'pb_{col[0]}_{col[1]}' for col in pb_features.columns[1:]]

# Create features from tempo changes
tempo_features = df_tempo_changes.groupby('file_id').agg({
    'tempo': ['mean', 'std', 'min', 'max', 'count']
}).reset_index()
tempo_features.columns = ['file_id'] + [f'tempo_{col[0]}_{col[1]}' for col in tempo_features.columns[1:]]

# Merge all features
combined_features = df_info.merge(note_features, on='file_id', how='left')\
                           .merge(cc_features, on='file_id', how='left')\
                           .merge(pb_features, on='file_id', how='left')\
                           .merge(tempo_features, on='file_id', how='left')

# Fill NaN values (in case some files don't have certain features)
combined_features = combined_features.fillna(0)

# print(combined_features.head())
print(f'Combined Features Dataframe Shape: {combined_features.shape}')

# Save the combined_features DataFrame to a CSV file
combined_features.to_csv('combined_features.csv', index=False, escapechar='\\', quoting=csv.QUOTE_ALL)

# Print confirmation
print("DataFrame saved to 'combined_features.csv'")
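The `groupby(...).agg({...})` calls above return MultiIndex columns such as `('pitch', 'mean')`, and the list comprehension flattens them into names like `note_pitch_mean`. A toy example of the same pattern, using two hypothetical files with made-up values, shows what the resulting table looks like:

```python
import pandas as pd

# Toy note table for two files (values are made up for illustration)
toy_notes = pd.DataFrame({
    'file_id':  [0, 0, 0, 1, 1],
    'pitch':    [60, 64, 67, 48, 50],
    'velocity': [80, 90, 100, 60, 70],
})

feats = toy_notes.groupby('file_id').agg({
    'pitch': ['mean', 'min', 'max'],
    'velocity': ['mean'],
}).reset_index()

# After reset_index the first column is ('file_id', ''); the rest are (feature, stat)
feats.columns = ['file_id'] + [f'note_{col[0]}_{col[1]}' for col in feats.columns[1:]]
print(feats)
```

Each file becomes one row of summary statistics, which is why the merged `combined_features` table has exactly one row per processed MIDI file.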

In [None]:
def create_sequence(file_id, max_length=500):
    file_notes = df_notes[df_notes['file_id'] == file_id].sort_values('start')
    sequence = file_notes[['pitch', 'velocity', 'start', 'end']].values
    if len(sequence) > max_length:
        sequence = sequence[:max_length]
    else:
        padding = np.zeros((max_length - len(sequence), 4))
        sequence = np.vstack((sequence, padding))
    return sequence

# Create sequences for each file
X_lstm = np.array([create_sequence(file_id) for file_id in combined_features['file_id']])
y = combined_features['composer'].values
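`create_sequence` filters the whole `df_notes` table once per file, so the cell scans every note once for every file. The sketch below builds the same (n_files, max_length, 4) array in a single `groupby` pass. It assumes the same columns (`file_id`, `pitch`, `velocity`, `start`, `end`) and the same sort, truncate, and zero-pad rule; `build_sequences` is a hypothetical name.

```python
import numpy as np
import pandas as pd

FEATURES = ['pitch', 'velocity', 'start', 'end']

def build_sequences(notes, file_ids, max_length=500):
    """Return an array of shape (len(file_ids), max_length, 4).

    Each file's notes are sorted by start time, truncated to max_length,
    and zero-padded at the end, mirroring create_sequence.
    """
    out = np.zeros((len(file_ids), max_length, len(FEATURES)))
    # Group once, then look up each file's notes by id
    groups = {fid: g for fid, g in notes.groupby('file_id')}
    for row, fid in enumerate(file_ids):
        if fid not in groups:
            continue  # a file without notes stays all zeros
        seq = groups[fid].sort_values('start')[FEATURES].to_numpy()[:max_length]
        out[row, :len(seq)] = seq
    return out

# Toy usage: file 2 has no notes at all
toy = pd.DataFrame({
    'file_id':  [0, 0, 1],
    'pitch':    [64, 60, 72],
    'velocity': [90, 80, 70],
    'start':    [0.5, 0.0, 0.0],
    'end':      [1.0, 0.5, 2.0],
})
seqs = build_sequences(toy, file_ids=[0, 1, 2], max_length=3)
print(seqs.shape)
```

In the notebook this would be called as `build_sequences(df_notes, combined_features['file_id'])`.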

In [None]:
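def create_piano_roll(file_id, time_steps=500, pitch_range=128):
    file_notes = df_notes[df_notes['file_id'] == file_id]
    piano_roll = np.zeros((time_steps, pitch_range))
    # Each piece is stretched or squeezed onto `time_steps` bins; compute the
    # piece length once instead of once per note (NaN if the file has no notes)
    total_time = file_notes['end'].max()
    if not total_time > 0:
        return piano_roll  # no usable notes: all-zero roll
    for _, note in file_notes.iterrows():
        start = int(note['start'] * time_steps / total_time)
        end = int(note['end'] * time_steps / total_time)
        pitch = int(note['pitch'])
        piano_roll[start:end, pitch] = note['velocity']
    return piano_roll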

# Create piano rolls for each file
X_cnn = np.array([create_piano_roll(file_id) for file_id in combined_features['file_id']])
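`create_piano_roll` loops over notes with `iterrows`, which is slow for large files. The following is a vectorized variant for one file that takes plain NumPy arrays. It is a sketch, not a drop-in replacement, and differs from the loop in two deliberate ways: where notes overlap on the same pitch and bin, the larger velocity wins (the loop keeps whichever note comes last), and notes shorter than one bin still occupy one bin (the loop drops them). `piano_roll_from_arrays` is a hypothetical name.

```python
import numpy as np

def piano_roll_from_arrays(pitch, velocity, start, end, time_steps=500, pitch_range=128):
    """Vectorized (time_steps, pitch_range) piano roll for one file.

    Times are rescaled so the piece spans exactly `time_steps` bins.
    """
    roll = np.zeros((time_steps, pitch_range))
    if len(pitch) == 0:
        return roll
    pitch = np.asarray(pitch, dtype=int)
    velocity = np.asarray(velocity, dtype=float)
    total = np.max(end)
    s = (np.asarray(start, dtype=float) * time_steps / total).astype(int)
    e = (np.asarray(end, dtype=float) * time_steps / total).astype(int)
    # Give every note at least one bin, but never run past the last bin
    e = np.minimum(np.maximum(e, s + 1), time_steps)
    lengths = e - s
    # Expand each note into one (time, pitch) entry per bin it covers
    block_starts = np.cumsum(lengths) - lengths
    offsets = np.arange(lengths.sum()) - np.repeat(block_starts, lengths)
    rows = np.repeat(s, lengths) + offsets
    cols = np.repeat(pitch, lengths)
    vals = np.repeat(velocity, lengths)
    np.maximum.at(roll, (rows, cols), vals)  # overlapping notes keep the max velocity
    return roll

# Toy usage: two one-second notes and one very short note, squeezed into 4 bins
roll = piano_roll_from_arrays(pitch=[60, 64, 70], velocity=[100, 50, 30],
                              start=[0.0, 1.0, 0.1], end=[1.0, 2.0, 0.2], time_steps=4)
print(roll[:, [60, 64, 70]])
```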

In [None]:
# Normalize LSTM input
scaler_lstm = StandardScaler()
X_lstm_scaled = scaler_lstm.fit_transform(X_lstm.reshape(-1, X_lstm.shape[-1])).reshape(X_lstm.shape)

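# Standardize each pitch column of the piano rolls, then add a channel axis
# so the arrays match the Conv2D input shape (time_steps, pitch_range, 1)
scaler_cnn = StandardScaler()
X_cnn_scaled = scaler_cnn.fit_transform(X_cnn.reshape(-1, X_cnn.shape[-1])).reshape(X_cnn.shape)[..., np.newaxis]

# Normalize combined features.
# Drop file_id: IDs are assigned sequentially composer by composer,
# so keeping the column would leak the label into the features.
X_combined = combined_features.select_dtypes(include=[np.number]).drop(columns=['file_id'])
scaler_combined = StandardScaler()
X_combined_scaled = scaler_combined.fit_transform(X_combined)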
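The scalers above are fit on all files before `train_test_split`, so test-set statistics leak into the training inputs. The effect is usually small, but it can make test scores slightly optimistic. The leak-free order is to split first and fit the statistics on training rows only. In this sketch, plain NumPy stands in for `StandardScaler` (same population standard deviation, same substitution of 1 for zero-variance features). `split_then_standardize` is a hypothetical helper, and the indices are assumed to come from splitting `np.arange(n_files)`.

```python
import numpy as np

def split_then_standardize(X, train_idx, test_idx, eps=1e-8):
    """Standardize features using statistics from the training rows only.

    X may be 2-D (samples, features) or 3-D (samples, steps, features);
    statistics are computed per last-axis feature over all other axes,
    matching how the notebook reshapes to (-1, n_features) before scaling.
    """
    X = np.asarray(X, dtype=float)
    train, test = X[train_idx], X[test_idx]
    flat = train.reshape(-1, X.shape[-1])
    mean = flat.mean(axis=0)
    std = flat.std(axis=0)
    std = np.where(std < eps, 1.0, std)  # constant features: avoid dividing by zero
    return (train - mean) / std, (test - mean) / std

# Toy usage with random data and a random 80/20 index split
rng = np.random.default_rng(0)
X = rng.normal(5.0, 2.0, size=(10, 3))
idx = rng.permutation(10)
X_tr, X_te = split_then_standardize(X, idx[:8], idx[8:])
print(X_tr.mean(axis=0).round(6), X_te.shape)
```

The same index arrays would then be used to slice `X_lstm`, `X_cnn`, the combined features, and `y_encoded`, so all three inputs stay aligned.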

In [None]:
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

print("X_lstm_scaled shape:", X_lstm_scaled.shape)
print("X_cnn_scaled shape:", X_cnn_scaled.shape)
print("X_combined_scaled shape:", X_combined_scaled.shape)
print("y_encoded shape:", y_encoded.shape)
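`LabelEncoder` assigns integer codes in sorted order of the class names, so the confusion matrix, classification report, and ROC curves below index composers alphabetically. A minimal plain-Python equivalent of that mapping (`encode_labels` is a hypothetical name) makes the order explicit:

```python
def encode_labels(labels):
    """Sorted-order integer encoding, mirroring LabelEncoder.fit_transform."""
    classes = sorted(set(labels))
    index = {name: i for i, name in enumerate(classes)}
    return classes, [index[name] for name in labels]

classes, codes = encode_labels(['Mozart', 'Bach', 'Chopin', 'Bach', 'Beethoven'])
print(classes)
print(codes)
```

Sorting is case-sensitive, so if the composer directories used inconsistent capitalization (e.g. `beethoven` alongside the hard-coded `'Bach'`), the order and even the number of classes could differ from what one expects.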

In [None]:
# stratify keeps the composer proportions the same in the train and test sets
X_lstm_train, X_lstm_test, X_cnn_train, X_cnn_test, X_combined_train, X_combined_test, y_train, y_test = train_test_split(
    X_lstm_scaled, X_cnn_scaled, X_combined_scaled, y_encoded,
    test_size=0.2, random_state=42, stratify=y_encoded
)

print("After splitting:")
print("X_lstm_train shape:", X_lstm_train.shape)
print("X_cnn_train shape:", X_cnn_train.shape)
print("X_combined_train shape:", X_combined_train.shape)
print("y_train shape:", y_train.shape)

Tweaked model: ~75% accuracy (aiming higher)

In [None]:
# LSTM input
lstm_input = Input(shape=(X_lstm_train.shape[1], X_lstm_train.shape[2]))
lstm_layer1 = LSTM(128, return_sequences=True)(lstm_input)
lstm_layer1 = BatchNormalization()(lstm_layer1)
lstm_layer1 = Dropout(0.3)(lstm_layer1)
lstm_layer2 = LSTM(64)(lstm_layer1)
lstm_layer2 = BatchNormalization()(lstm_layer2)
lstm_out = Dropout(0.3)(lstm_layer2)

# CNN input
cnn_input = Input(shape=(X_cnn_train.shape[1], X_cnn_train.shape[2], 1))
conv1 = Conv2D(64, kernel_size=(3, 3), activation='relu', kernel_regularizer=l2(0.01))(cnn_input)
conv1 = BatchNormalization()(conv1)
pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
conv2 = Conv2D(128, kernel_size=(3, 3), activation='relu', kernel_regularizer=l2(0.01))(pool1)
conv2 = BatchNormalization()(conv2)
pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
flatten = Flatten()(pool2)
flatten = Dropout(0.3)(flatten)

# Combined features input
combined_input = Input(shape=(X_combined_train.shape[1],))
combined_dense = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(combined_input)
combined_dense = BatchNormalization()(combined_dense)
combined_dense = Dropout(0.3)(combined_dense)

# Merge all features
merged = Concatenate()([lstm_out, flatten, combined_dense])
merged = Dense(128, activation='relu', kernel_regularizer=l2(0.01))(merged)
merged = BatchNormalization()(merged)
merged = Dropout(0.3)(merged)

# Output layer
output = Dense(len(np.unique(y)), activation='softmax')(merged)

# Create model
model = Model(inputs=[lstm_input, cnn_input, combined_input], outputs=output)

# Compile model
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=0.00001)

# Train model
history = model.fit(
    [X_lstm_train, X_cnn_train, X_combined_train], 
    y_train, 
    validation_split=0.2, 
    epochs=100,  # Increased epochs
    batch_size=32,
    callbacks=[early_stopping, lr_scheduler]
)
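With `'valid'` padding (the Keras default for `Conv2D` and `MaxPooling2D`), each 3×3 convolution trims two rows and two columns, and each 2×2 max-pool halves both dimensions, rounding down. A 500×128 piano roll therefore still produces a very large flattened vector. The sketch below works through that arithmetic for the two CNN branches in this notebook, assuming piano rolls of shape (500, 128, 1).

```python
def conv_pool_output(height, width, n_blocks, kernel=3, pool=2):
    """Spatial size after n_blocks of Conv2D('valid', kernel) + MaxPooling2D(pool)."""
    for _ in range(n_blocks):
        height, width = height - (kernel - 1), width - (kernel - 1)
        height, width = height // pool, width // pool
    return height, width

# Tweaked model: two conv/pool blocks, the last with 128 filters
h, w = conv_pool_output(500, 128, n_blocks=2)
tweaked_flat = h * w * 128
# Original model: one conv/pool block with 32 filters
h1, w1 = conv_pool_output(500, 128, n_blocks=1)
original_flat = h1 * w1 * 32

# Dense(128) after Concatenate sees the CNN vector plus 64 LSTM + 64 dense units
dense_params = (tweaked_flat + 64 + 64) * 128 + 128
print(tweaked_flat, original_flat, dense_params)
```

By this count, the Dense(128) layer that follows `Concatenate` holds about 60 million weights, far more than the rest of the network. That may explain some of the overfitting the heavy regularization is fighting. Adding pooling before `Flatten` (or using `GlobalAveragePooling2D`) is a common way to shrink it, though we have not tested that here.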

Original model: 81-82% accuracy

In [None]:
# # LSTM input
# lstm_input = Input(shape=(X_lstm_train.shape[1], X_lstm_train.shape[2]))
# lstm_out = LSTM(64)(lstm_input)

# # CNN input
# cnn_input = Input(shape=(X_cnn_train.shape[1], X_cnn_train.shape[2], 1))
# conv1 = Conv2D(32, kernel_size=(3, 3), activation='relu')(cnn_input)
# pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
# flatten = Flatten()(pool1)

# # Combined features input
# combined_input = Input(shape=(X_combined_train.shape[1],))

# # Merge all features
# merged = Concatenate()([lstm_out, flatten, combined_input])

# # Output layer
# output = Dense(len(np.unique(y)), activation='softmax')(merged)

# # Create model
# model = Model(inputs=[lstm_input, cnn_input, combined_input], outputs=output)

# # Compile model
# model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# # Train model
# history = model.fit(
#     [X_lstm_train, X_cnn_train, X_combined_train], 
#     y_train, 
#     validation_split=0.2, 
#     epochs=50, 
#     batch_size=32
# )

In [None]:
# 1. Model Evaluation
test_loss, test_accuracy = model.evaluate([X_lstm_test, X_cnn_test, X_combined_test], y_test)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

# Get predictions
y_pred = model.predict([X_lstm_test, X_cnn_test, X_combined_test])
y_pred_classes = np.argmax(y_pred, axis=1)

# 2. Confusion Matrix
cm = confusion_matrix(y_test, y_pred_classes)
plt.figure(figsize=(18, 12))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
# Label the axes with composer names instead of integer codes
tick_marks = np.arange(len(label_encoder.classes_))
plt.xticks(tick_marks, label_encoder.classes_, rotation=45)
plt.yticks(tick_marks, label_encoder.classes_)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()

# 3. Classification Report
print(classification_report(y_test, y_pred_classes,
                            labels=np.arange(len(label_encoder.classes_)),
                            target_names=label_encoder.classes_))

# 4. ROC curves and AUC: a single curve for binary problems,
# one-vs-rest curves otherwise
if len(np.unique(y_test)) == 2:
    fpr, tpr, _ = roc_curve(y_test, y_pred[:, 1])
    roc_auc = auc(fpr, tpr)
    
    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")
    plt.show()
else:
    # Multi-class: one-vs-rest ROC curve per composer. Binarize against every
    # encoded class so column i always lines up with y_pred[:, i].
    y_test_bin = label_binarize(y_test, classes=np.arange(len(label_encoder.classes_)))
    n_classes = y_test_bin.shape[1]
    
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_pred[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
    
    plt.figure(figsize=(10, 8))
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], label=f'{label_encoder.classes_[i]} (AUC = {roc_auc[i]:.2f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multi-class ROC')
    plt.legend(loc="lower right")
    plt.show()

# 5. Learning Curves
plt.figure(figsize=(18, 12))
plt.subplot(121)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(122)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()
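`classification_report` derives per-class precision and recall from the same counts the confusion matrix shows. Precision for class k is the diagonal entry divided by its column sum (everything predicted as k). Recall is the diagonal entry divided by its row sum (everything truly k). The NumPy sketch below uses a made-up 4×4 matrix laid out like scikit-learn's `confusion_matrix` (rows = true label, columns = predicted); the counts are illustrative, not results from this project.

```python
import numpy as np

def per_class_metrics(cm):
    """Accuracy plus per-class precision/recall from a confusion matrix."""
    cm = np.asarray(cm, dtype=float)
    tp = np.diag(cm)
    predicted = cm.sum(axis=0)  # column sums: how often each class was predicted
    actual = cm.sum(axis=1)     # row sums: how often each class truly occurs
    # Report 0 instead of 0/0 for classes never predicted or absent from the labels
    precision = np.divide(tp, predicted, out=np.zeros_like(tp), where=predicted > 0)
    recall = np.divide(tp, actual, out=np.zeros_like(tp), where=actual > 0)
    accuracy = tp.sum() / cm.sum()
    return accuracy, precision, recall

# Illustrative counts only
cm_demo = [[8, 1, 0, 1],
           [2, 6, 1, 1],
           [0, 1, 9, 0],
           [1, 2, 0, 7]]
acc, prec, rec = per_class_metrics(cm_demo)
print(acc, prec.round(3), rec)
```

Macro-averaged precision and recall in the report are the plain means of these per-class arrays.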

## Production Demo
These eight files are reserved for the demo. They were removed from the MIDI dataset before any model was trained, so the models have never seen them. The final demonstration uses them to assess whether we succeeded in building a deep learning model that classifies composers.

### Bach Files
- *07 Rondo.mid*
- *022602bv.mid*

### Beethoven Files
- *Sonatina In C.mid*
- *137.MID*

### Chopin Files
- *Prelude n18 op28 "Suicide".mid*
- *Ballad op53.mid*

### Mozart Files
- *Early Pieces n9 Allegretto.mid*
- *Rondo.mid*
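The notebook does not yet include the demo step. Once the eight files have been preprocessed exactly like the training data (same sequence length, piano-roll size, feature columns, and fitted scalers) and passed through `model.predict`, the resulting probability rows could be summarized as follows. This is a sketch under stated assumptions: `summarize_demo` is hypothetical, the column order is assumed to follow `label_encoder.classes_`, and the probabilities in the example are dummy values, not model output.

```python
import numpy as np

def summarize_demo(filenames, true_composers, probs, class_names):
    """Print predicted vs. expected composer per file; return demo accuracy."""
    probs = np.asarray(probs)
    predicted = [class_names[i] for i in probs.argmax(axis=1)]
    for name, truth, pred, row in zip(filenames, true_composers, predicted, probs):
        status = 'OK  ' if pred == truth else 'MISS'
        print(f'{status} {name:<40} expected={truth:<10} predicted={pred:<10} p={row.max():.2f}')
    return float(np.mean([p == t for p, t in zip(predicted, true_composers)]))

class_names = ['Bach', 'Beethoven', 'Chopin', 'Mozart']
dummy_probs = [[0.70, 0.10, 0.10, 0.10],   # dummy values for illustration only
               [0.20, 0.30, 0.10, 0.40]]
demo_acc = summarize_demo(['07 Rondo.mid', 'Sonatina In C.mid'],
                          ['Bach', 'Beethoven'], dummy_probs, class_names)
```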