## Package Setup

In [None]:
!pip install pretty_midi

In [None]:
import os
import requests
from zipfile import ZipFile

import pretty_midi
import numpy as np
import glob

import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader,random_split
import matplotlib.pyplot as plt

In [None]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Download Dataset

In [None]:
def download_dataset(dataset_url, save_path):
    """Download a zip archive into `save_path` and extract it there.

    Both steps are skipped when their result is already present, so the
    function can be re-run safely.

    Args:
        dataset_url: URL of the zip archive. The last path component is used
            as the local file name.
        save_path: Directory that receives the archive and its contents. It is
            created if it does not exist.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        zipfile.BadZipFile: If the local archive is not a valid zip file.
    """
    os.makedirs(save_path, exist_ok=True)

    file_name = dataset_url.split('/')[-1]
    zip_path = os.path.join(save_path, file_name)

    if os.path.exists(zip_path):
        print("Dataset already downloaded.")
    else:
        print("Downloading dataset...")
        # Stream to a temporary name and rename at the end, so an interrupted
        # download never leaves a truncated file that looks complete.
        partial_path = zip_path + '.part'
        with requests.get(dataset_url, stream=True, timeout=60) as response:
            # Fail loudly instead of saving an HTML error page as the archive.
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    f.write(chunk)
        os.replace(partial_path, zip_path)

    with ZipFile(zip_path, 'r') as zip_ref:
        # Derive the expected top-level entries from the archive itself rather
        # than from a hard-coded folder name, so the check works for any zip.
        top_level = {name.split('/')[0] for name in zip_ref.namelist()}
        top_level.discard('')
        already_extracted = bool(top_level) and all(
            os.path.exists(os.path.join(save_path, entry)) for entry in top_level
        )
        if already_extracted:
            print("Dataset already extracted.")
        else:
            print("Extracting dataset...")
            zip_ref.extractall(save_path)

    print("Dataset downloaded and extracted successfully.")

In [None]:
# MIDI-only archive of the MAESTRO v3.0.0 dataset.
data_URL = "https://storage.googleapis.com/magentadata/datasets/maestro/v3.0.0/maestro-v3.0.0-midi.zip"
# Local directory that receives both the zip archive and its extracted contents.
data_path = "./maestro_dataset"
download_dataset(data_URL, data_path)

Downloading dataset...
Extracting dataset...
Dataset downloaded and extracted successfully.


In [None]:
# Collect every file under the dataset directory whose name contains ".mid"
# (this covers both the .mid and .midi extensions).
# glob returns matches in arbitrary order, so sort them to keep the song order
# (and everything derived from it) stable between runs and machines.
musicFile = sorted(glob.glob(os.path.join(data_path, '**/*.mid*'), recursive=True))
print('Number of files:', len(musicFile))

Number of files: 1276


## Data Processing

In [None]:
def process_midi(path_to_midi):
    """Read a MIDI file and return its non-drum notes ordered by start time.

    Args:
        path_to_midi: Path of the MIDI file to load with pretty_midi.

    Returns:
        A list of `(start, end, velocity, pitch)` tuples, one per note of
        every non-drum instrument, sorted by `start`.
    """
    score = pretty_midi.PrettyMIDI(path_to_midi)

    # Flatten the notes of all pitched (non-percussion) instruments.
    events = [
        (n.start, n.end, n.velocity, n.pitch)
        for track in score.instruments
        if not track.is_drum
        for n in track.notes
    ]

    # The sort is stable, so notes with equal start keep their original order.
    return sorted(events, key=lambda event: event[0])

def preprocess_dataset(midi_files):
    """Convert each MIDI file into its note list.

    Args:
        midi_files: Iterable of MIDI file paths.

    Returns:
        A list with one `process_midi` result per input path, in input order.
    """
    return [process_midi(midi_path) for midi_path in midi_files]

In [None]:
# Parse every MIDI file found above into its time-sorted note list (one list per file).
processed_data = preprocess_dataset(musicFile)

In [None]:
# print(processed_data[0])
# Sanity check: one processed note list is expected per MIDI file.
print(len(processed_data))

1276


## Extract Main Melody & Split data

In [None]:
class Skyline(Dataset):
    """Sliding-window dataset that marks the highest-pitched note(s) per window.

    Every song is cut into overlapping windows of `window_size` consecutive
    notes (stride 1). Within a window, each note whose pitch equals the
    window's maximum pitch is labelled 1 and every other note 0; ties all
    receive 1.

    Args:
        notes: Iterable of songs. Each song is a sequence of note tuples whose
            last element is the pitch.
        window_size: Number of consecutive notes per example; must be >= 1.

    Raises:
        ValueError: If `window_size` is smaller than 1.
    """

    def __init__(self, notes, window_size=10):
        # Reject invalid sizes up front: zero would otherwise surface as an
        # opaque max() error and negative values would build malformed windows.
        if window_size < 1:
            raise ValueError(f"window_size must be at least 1, got {window_size}")
        self.notes = notes
        self.window_size = window_size
        self.data = self.load_data()

    def load_data(self):
        """Build the list of `(window, label)` pairs for all songs."""
        data = []
        for song in self.notes:
            # Songs shorter than the window produce an empty range and are skipped.
            for i in range(len(song) - self.window_size + 1):
                window = song[i:i+self.window_size]
                max_pitch = max(note[-1] for note in window)
                label = [1 if note[-1] == max_pitch else 0 for note in window]
                data.append((window, label))
        return data

    def __len__(self):
        """Return the total number of windows across all songs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return window `idx` as a float tensor and its labels as a bool tensor."""
        window, label = self.data[idx]

        return torch.tensor(window, dtype=torch.float), torch.tensor(label, dtype=torch.bool)

In [None]:
# Number of consecutive notes in each example.
window_size = 10

# Build the sliding-window dataset over all processed songs.
dataset = Skyline(processed_data, window_size = window_size)

In [None]:
# Total number of sliding windows across all songs.
print(len(dataset))

7028680


In [None]:
# Dataset split: 80% of the windows for training, the rest for validation.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Seed the split so the train/validation partition is identical on every run.
# NOTE(review): the split is done per window, so overlapping windows of the
# same song can end up on both sides; consider splitting by song if the
# validation score must reflect unseen pieces — confirm.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size], generator=split_generator
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

## GRU Model

In [None]:
class BasicGRU(nn.Module):
    """Two stacked GRU layers followed by a per-time-step sigmoid classifier.

    Args:
        input_shape: Number of features per time step.
        hidden_size: Width of both GRU layers (default 128).
        dropout: Dropout probability applied after each GRU layer (default 0.2).

    The forward pass maps a batch-first input of shape
    `(batch, seq_len, input_shape)` to probabilities of shape
    `(batch, seq_len, 1)`.
    """

    def __init__(self, input_shape, hidden_size=128, dropout=0.2):
        super().__init__()
        self.gru1 = nn.GRU(input_shape, hidden_size, batch_first=True)
        self.dropout1 = nn.Dropout(dropout)
        self.gru2 = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.dropout2 = nn.Dropout(dropout)
        # One linear layer shared across time steps: nn.Linear acts on the last dimension.
        self.time_distributed = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one probability per time step for the batch `x`."""
        # Only the per-step outputs are used; the final hidden states are discarded.
        x, _ = self.gru1(x)
        x = self.dropout1(x)
        x, _ = self.gru2(x)
        x = self.dropout2(x)
        x = self.time_distributed(x)
        x = self.sigmoid(x)
        return x

In [None]:
# Each note is described by 4 features: (start, end, velocity, pitch).
model = BasicGRU(4).to(device)

# The model's forward pass already ends in a sigmoid, so BCELoss receives probabilities.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

## Training Part

In [None]:
# Per-epoch metrics; train_and_validate appends one value to each list per epoch.
history = {'train_loss': [], 'val_loss': [], 'train_accuracy': [], 'val_accuracy': []}

In [None]:
def _run_epoch(loader, description, prefix, training):
    """Run one pass over `loader` and return `(mean_loss, accuracy)`.

    Args:
        loader: DataLoader yielding `(inputs, labels)` batches.
        description: Text shown in front of the progress bar.
        prefix: Metric name prefix for the progress bar ("Train" or "Val").
        training: When True, back-propagate and update the weights per batch.

    Returns:
        The loss averaged over the samples seen and the fraction of
        individual note labels predicted correctly.
    """
    loss_sum = 0.0
    correct = 0
    sample_count = 0  # number of windows seen; denominator for the loss
    label_count = 0   # number of individual note labels; denominator for accuracy

    progress = tqdm(loader, desc=description, unit="batch")
    for inputs, labels in progress:
        inputs, labels = inputs.to(device), labels.to(device).float()

        if training:
            optimizer.zero_grad()
        # Drop the trailing size-1 dimension so outputs line up with labels.
        outputs = model(inputs).squeeze(-1)
        loss = criterion(outputs, labels)
        if training:
            loss.backward()
            optimizer.step()

        # The criterion returns a batch mean; weight it by batch size so the
        # running average stays correct when the last batch is smaller.
        loss_sum += loss.item() * inputs.size(0)
        predictions = (outputs > 0.5).float()
        correct += (predictions == labels).sum().item()
        sample_count += labels.size(0)
        # Correct predictions are counted per label, so the total must be too;
        # dividing by the number of samples would inflate the accuracy by the
        # window length.
        label_count += labels.numel()

        progress.set_postfix({
            f"{prefix} Loss": loss_sum / sample_count,
            f"{prefix} Accuracy": correct / label_count,
        })

    return loss_sum / sample_count, correct / label_count


def train_and_validate(epochs):
    """Train the global `model` for `epochs` epochs, validating after each.

    Uses the module-level `model`, `optimizer`, `criterion`, `train_loader`,
    `val_loader` and `device`, and appends the per-epoch loss and accuracy of
    both phases to `history`.

    Args:
        epochs: Number of full passes over the training data.
    """
    for epoch in range(epochs):
        model.train()
        train_loss, train_accuracy = _run_epoch(
            train_loader, f"Epoch {epoch+1}/{epochs}", "Train", training=True
        )
        history['train_loss'].append(train_loss)
        history['train_accuracy'].append(train_accuracy)

        model.eval()
        # No gradients are needed for validation.
        with torch.no_grad():
            val_loss, val_accuracy = _run_epoch(
                val_loader, f"Validation {epoch+1}/{epochs}", "Val", training=False
            )
        history['val_loss'].append(val_loss)
        history['val_accuracy'].append(val_accuracy)

        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss}, Train Accuracy: {train_accuracy}, Val Loss: {val_loss}, Val Accuracy: {val_accuracy}')

In [None]:
# Train for 10 epochs, validating after each one.
train_and_validate(10)

In [None]:
# Plot the training curves side by side: loss on the left, accuracy on the right.
# Each row: (train key, validation key, train legend, validation legend, y-axis label).
panels = [
    ('train_loss', 'val_loss', 'Train Loss', 'Validation Loss', 'Loss'),
    ('train_accuracy', 'val_accuracy', 'Train Accuracy', 'Validation Accuracy', 'Accuracy'),
]

plt.figure(figsize=(12, 5))
for position, (train_key, val_key, train_label, val_label, y_label) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history[train_key], label=train_label)
    plt.plot(history[val_key], label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
plt.show()