The objective is to develop a model that predicts the composer of a piece of classical music, represented in CSV format that was converted directly from MIDI data.

In [195]:
import numpy as np
import matplotlib.pyplot as plt
import csv
import os
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

np.random.seed(0)

All the categories in the original csv is as follows: tick, type, time, meta, track, numerator, denominator, clocks_per_click, notated_32nd_notes_per_beat, key, tempo, control, value, channel, program, note, velocity

--------------

PREPROCESSING 1:
 * NOTE: time column represents the time to wait since the last note played on that channel. This information is fully duplicated in the form of a linear timeline represented by the tick column
 * NOTE: note_off and note_on + velocity of 0 are equivalent

The only useful ones for us are: tick (temporal dimension), type (note_on and note_off are the only one that matters), channel, note (pitch), velocity (loudness)

--------------

PROPROCESSING 2:
For training, the notes will be represented as follows:
 * a note-based DS will be generated. Each row is a note containing its start-tick, duration, pitch and velocity. Shape: (NOTES, 4)
 * a chord-based DS will then be generated. All notes that have the same start-tick, duration and velocity will be grouped together, up to 4. Shape: (CHORDS, 4, 4). Zeros will be used as padding for chords less than 4 notes.

Each song piece may have a maximum CHORDS length of 300. This is just so we have consistent training and testing data shape, in reality it should theoritically work for other sizes as well.

Both will be attempted for training, I suspect the chord based method might have advantages but I'm not sure.

--------------

DATA AUGMENTATION: we can segment the data in many more ways, eg chords 0-300, 10-310, ... , 5900-6200 etc

--------------

DATA EXPLORATION:
 * statistics on chord formation, number of notes sharing the same tick, overall velocity, duration and pitch running average/std deviation
 * outliers in terms of song length, too many notes per chord, long pauses between notes

In [196]:
from dictionary_array import Dict_Array

def process_notes(notes):
    FEATURES_PER_CHORD = 12
    pending_notes = {}
    processed_notes = Dict_Array(list_type='list')

    #print(f'notes: {notes}')

    for note in notes:
        if note['type'] not in ('note_on', 'note_off'):
            continue
        note_id = f"{note['note']}+{note['channel']}"
        if note_id in pending_notes and (note['velocity'] == 0 or note['type'] == 'note_off'):
            pn = pending_notes[note_id]
            processed_note = {'tick': pn['tick'], 'duration': note['tick'] - pn['tick'], 'note': pn['note'], 'channel': pn['channel'], 'velocity': pn['velocity']}
            processed_notes.add(f"{note['tick']}+{note['channel']}+{processed_note['duration']}", processed_note)
        elif note['type'] == 'note_on' and note['velocity'] > 0:
            pending_notes.update({note_id:note})

    #print(f'processed notes: {processed_notes.dictionary}')
    # at this point, all chords are organized together within processed_notes, they just require sorting before insertion into chords
    sorted_keys = list(processed_notes.dictionary.keys())
    sorted_keys.sort()
    chords = np.zeros((len(sorted_keys), FEATURES_PER_CHORD))

    for i, key in enumerate(sorted_keys):
        notes = processed_notes.get(key)
        for j, note in enumerate(notes):
            if j == 0:
                chords[i, 0] = note['tick']
                chords[i, 1] = note['channel']
                chords[i, 2] = note['duration']
                chords[i, 3] = note['velocity']
            if 4 + j == FEATURES_PER_CHORD:
                break
            chords[i, 4 + j] = note['note']

    #print(f'chords: {chords}')

    return chords
    

# augment into multiple chord sets
def split_chords_into_sets(chords):
    FEATURES_PER_CHORD = len(chords[0])
    SKIP_SIZE = 40
    CHORD_SET_SIZE = 4
    start_index = 0
    num_chord_sets = int(max(1, np.ceil((len(chords) - CHORD_SET_SIZE) / SKIP_SIZE)) + 1)
    chord_sets = []
    for i in range(num_chord_sets):
        end_index = min(start_index + CHORD_SET_SIZE, len(chords))
        if end_index <= start_index:
            break

        chord_set = np.zeros((CHORD_SET_SIZE, FEATURES_PER_CHORD))
        chord_set[0:end_index - start_index] = chords[start_index:end_index]
        chord_sets.append(chord_set)

        start_index += SKIP_SIZE

    #print(f'chord_sets: {chord_sets}')

    return chord_sets


def to_int(x):
    if x is None or x == '':
        x = -1
    return int(x)


def import_data(data_dir, folders):
    # folders respresent the directories to import, leave as none to import all
    CATEGORIES = len(folders)

    total_items = 0
    categories = 0
    for subdir in os.listdir(data_dir):
        if folders is not None and subdir not in folders:
            continue
        subdir_path = os.path.join(data_dir, subdir)
        if not os.path.isdir(subdir_path):
            continue
        categories += 1
        total_items += len(os.listdir(subdir_path))

    X_temp = []
    Y_temp = []

    chords = []
    
    category_counter = 0
    item_counter = 0
    # List all subfolders
    for subdir in os.listdir(data_dir):
        # if folders list parameter is specified, only continue if this directory is included in it
        if folders is not None and subdir not in folders:
            continue
        subdir_path = os.path.join(data_dir, subdir)
        # ensure this subdirectory is a folder
        if not os.path.isdir(subdir_path):
            continue
        print(f'reading directory {subdir}')

        # process each file in the subdirectory
        for filename in os.listdir(subdir_path):
            file_path = os.path.join(subdir_path, filename)

            all_notes = []

            with open(file_path, 'r') as csvfile:
                csvreader = csv.reader(csvfile)
                header = next(csvreader)
                IDX_TICK = header.index('tick')
                IDX_TYPE = header.index('type')
                IDX_CHAN = header.index('channel')
                IDX_NOTE = header.index('note')
                IDX_VELO = header.index('velocity')

                for row in csvreader:
                    all_notes.append({'tick': to_int(row[IDX_TICK]), 
                                      'type': row[IDX_TYPE], 
                                      'channel': to_int(row[IDX_CHAN]), 
                                      'note': to_int(row[IDX_NOTE]), 
                                      'velocity': to_int(row[IDX_VELO])})

            chords = process_notes(all_notes)
            chord_sets = split_chords_into_sets(chords)
            X_temp.extend(chord_sets)
            
            category_one_hot = [0] * CATEGORIES
            category_one_hot[category_counter] = 1
            categories_extension = []
            for _ in range(len(chord_sets)):
                categories_extension.append(category_one_hot)
            Y_temp.extend(categories_extension)

            item_counter += 1

        category_counter += 1

    X = np.array(X_temp)
    Y = np.array(Y_temp)
    print(f'X shape: {X.shape} Y shape: {Y.shape}')
    print(f'CATEGORY STATS:')
    for i in range(Y.shape[1]):
        print(f'  {i}: {np.sum(Y[:,i])}')
    return X, Y

VISUALIZATION:

take a chordset and print it in music sheet style to see if it looks reasonable. Compare to original. This helps with outlier and anomaly detection, and catched mistakes that might have been present in the original dataset. Outliers can then be removed when necessary, such as a note that is (mistakenly) held for the entire piece.

In [197]:
from sklearn.model_selection import train_test_split

data_path = '../../Data/music/midi/'

FEATURES_PER_CHORD = 12
CATEGORIES = ["Chopin", "Vivaldi"]
n_categories = len(CATEGORIES)

X, Y = import_data(data_path, CATEGORIES)
X = X.astype(np.float32)
Y = Y.astype(np.float32)

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.1, random_state=0)
print(f'xtrain: {x_train.shape} ytrain: {y_train.shape} xtest: {x_test.shape} ytest: {y_test.shape}')

reading directory Chopin
reading directory Vivaldi
X shape: (14691, 4, 12) Y shape: (14691, 2)
CATEGORY STATS:
  0: 6328
  1: 8363
xtrain: (13221, 4, 12) ytrain: (13221, 2) xtest: (1470, 4, 12) ytest: (1470, 2)


In [198]:
#TODO: make RNN model and visualization. I have preprocessed the data into an organized, uniform format ready to be used. May need to build it custom from linear

def get_category(output):
    return torch.argmax(output)

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNN, self).__init__()

        self.hidden_size = hidden_size
        self.in2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.h2out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        #print(f'input: {input.size()}, hidden: {hidden.size()}')
        combined = torch.cat((input, hidden), 1)
        hidden = self.in2h(combined)
        output = self.h2out(hidden)
        output = self.softmax(output)
        return output, hidden

    def initHidden(self):
        #return torch.zeros(1, self.hidden_size)
        return (torch.rand(1, self.hidden_size) - 0.5) * 0.02
    
x_train = torch.from_numpy(x_train)
x_train = torch.unsqueeze(x_train, 2)
y_train = torch.from_numpy(y_train)
y_train = torch.unsqueeze(y_train, 1)
y_train = y_train.type(torch.LongTensor)

x_test = torch.from_numpy(x_test)
x_test = torch.unsqueeze(x_test, 2)
y_test = torch.from_numpy(y_test)
y_test = torch.unsqueeze(y_test, 1)
y_test = y_test.type(torch.LongTensor)

In [199]:
learning_rate = 0.001
criterion = nn.NLLLoss()
n_hidden = 128
rnn = RNN(FEATURES_PER_CHORD, n_hidden, n_categories)

def train(chord_set, category):
    hidden = rnn.initHidden()
    rnn.zero_grad()

    for i in range(chord_set.size()[0]):
        output, hidden = rnn(chord_set[i], hidden)

    #print(f'output: {output} category: {category}')
    loss = criterion(output[0], category[0])
    loss.backward()

    # Add parameters' gradients to their values, multiplied by learning rate
    for p in rnn.parameters():
        clip_value = 1e2
        p.register_hook(lambda grad: torch.clamp(grad, -clip_value, clip_value))
        #print(f'p data: {p.data} grad: {p.grad.data}')
        p.data.add_(p.grad.data, alpha=-learning_rate)

    return output, loss.item()


n_epoch = 10
all_losses = []
current_loss = 0
correct = 0

for epoch in range(n_epoch):
    plot_interval = max(1, int(len(x_train) / 20))
    for i in range(len(x_train)):
        chord_set = x_train[i]
        category = y_train[i]
        output, loss = train(chord_set, category)
        current_loss += loss

        guess = get_category(output)
        if guess == torch.argmax(category):
            correct += 1

        if i % plot_interval == 0 and i != 0:
            print(f'epoch {epoch}, iteration {i}/{len(x_train)}, loss {loss}, {correct}/{i} correct ( {correct/i} )')
            all_losses.append(current_loss/plot_interval)
            current_loss = 0

    correct = 0

epoch 0, iteration 661/13221, loss 0.0, 307/661 correct ( 0.46444780635400906 )
epoch 0, iteration 1322/13221, loss 0.0, 643/1322 correct ( 0.4863842662632375 )
epoch 0, iteration 1983/13221, loss 0.0, 957/1983 correct ( 0.4826021180030257 )
epoch 0, iteration 2644/13221, loss 1218897920.0, 1273/2644 correct ( 0.48146747352496216 )
epoch 0, iteration 3305/13221, loss 468072704.0, 1606/3305 correct ( 0.4859304084720121 )
epoch 0, iteration 3966/13221, loss 1133373440.0, 1938/3966 correct ( 0.4886535552193646 )
epoch 0, iteration 4627/13221, loss 0.0, 2285/4627 correct ( 0.4938405014047979 )
epoch 0, iteration 5288/13221, loss 370916608.0, 2600/5288 correct ( 0.491679273827534 )
epoch 0, iteration 5949/13221, loss 0.0, 2906/5949 correct ( 0.48848545974113294 )
epoch 0, iteration 6610/13221, loss 0.0, 3226/6610 correct ( 0.4880484114977307 )
epoch 0, iteration 7271/13221, loss 0.0, 3532/7271 correct ( 0.48576536927520286 )
epoch 0, iteration 7932/13221, loss 221268704.0, 3854/7932 correct

KeyboardInterrupt: 