In [79]:
import mido
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, Dataset
from matplotlib import pyplot
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, classification_report
import pandas as pd
# NOTE(review): hard-coded, machine-specific working directory. The relative
# paths used below (example_midi, and any other relative dataset path) resolve
# against it.
os.chdir('C:/Users/damaf/Documents/Python Scripts/D7047E')
# Relative path of a single MIDI file, loaded further down for inspection.
example_midi = 'maestro-v2.0.0/2018/MIDI-Unprocessed_Chamber3_MID--AUDIO_10_R3_2018_wav--1.midi'

def list_files(startpath): #Prototype function for listing the directory
    """Print the directory tree rooted at ``startpath``.

    Every directory is printed with a trailing ``/`` and every file on its own
    line; each nesting level adds four spaces of indentation. The nesting level
    is the number of path separators left after removing ``startpath`` from the
    directory path.
    """
    step = ' ' * 4
    for folder, _subdirs, filenames in os.walk(startpath):
        depth = folder.replace(startpath, '').count(os.sep)
        print('{}{}/'.format(step * depth, os.path.basename(folder)))
        file_prefix = step * (depth + 1)
        for filename in filenames:
            print('{}{}'.format(file_prefix, filename))
            
def load_midi(path): #Load midi file
    """Open the MIDI file at ``path`` and return it as a ``mido.MidiFile``."""
    return mido.MidiFile(path)

def bpm_to_tempo(bpm): #Converts tempo in beats/minute to microseconds/beat
    """Return the MIDI tempo (microseconds per beat) for ``bpm`` beats per minute."""
    microseconds_per_minute = 60000000
    return microseconds_per_minute / bpm

def tempo_to_bpm(tempo): #Converts tempo in microseconds/beat to beats/minute
    """Return beats per minute for a MIDI ``tempo`` given in microseconds per beat."""
    microseconds_per_minute = 60000000
    return microseconds_per_minute / tempo

def ticks_to_seconds(ticks,tempo,tpb):
    """Convert a duration in MIDI ticks to seconds.

    ``tempo`` is in microseconds per beat and ``tpb`` is the number of ticks
    per beat.
    """
    seconds_per_beat = tempo / 1000000
    return seconds_per_beat * ticks / tpb
    
# Emits an empty line in the cell output.
print()

def quantize_track(track ,ticks_per): #ticks_per denotes quantization base. 12 for 32nds, 24 for 16ths and 48 for 8ths
    """Snap every message's delta time to the nearest multiple of ``ticks_per``.

    The messages are modified in place and the same ``track`` object is
    returned. Rounding uses Python's built-in ``round`` (ties go to the even
    multiple). The 12/24/48 values quoted in the signature comment correspond
    to a resolution of 96 ticks per beat.
    """
    for event in track:
        steps = round(event.time / ticks_per)
        event.time = steps * ticks_per
    return track


def print_verbose(mid,length):
    """Print a header for each track of ``mid`` followed by its first ``length`` messages."""
    for index, midi_track in enumerate(mid.tracks):
        header = 'Track {}: {}'.format(index, midi_track.name)
        print(header)
        leading_messages = midi_track[0:length]
        for message in leading_messages:
            print(message)
            
def print_track_verbose(midi_track):
    """Print every message of ``midi_track``, one per line."""
    for message in midi_track:
        print(message)
        
def generate_pitch_classes(): #Returns a list where the first index (0-11) corresponds to each musical note starting from A0
    """Group MIDI note numbers 21..108 into twelve pitch classes.

    Returns a list of 12 lists. Entry ``j`` holds, in ascending order, every
    note number in the range 21 (A0) to 108 inclusive that lies a whole number
    of octaves (12 semitones) above ``21 + j``.
    """
    lowest_note = 21   # A0
    highest_note = 108
    return [
        list(range(lowest_note + offset, highest_note + 1, 12))
        for offset in range(12)
    ]

def note_to_pitchclass(midi_note,pitch_classes): #Returns a vector with a 1 for the entered note
    """Return a 12-element 0/1 list marking which pitch class contains ``midi_note``.

    Position ``k`` is 1 when ``midi_note`` appears in ``pitch_classes[k]`` and 0
    otherwise; a note found in none of the classes yields all zeros.
    """
    return [int(midi_note in pitch_classes[slot]) for slot in range(12)]
# Load the example file and print the first 150 messages of each track.
mid = load_midi(example_midi)
print_verbose(mid,150)
# Module-level lookup table; read_txt_files reads this global when building features.
pitch_classes = generate_pitch_classes()


Track 0: 
<meta message set_tempo tempo=500000 time=0>
<meta message time_signature numerator=4 denominator=4 clocks_per_click=24 notated_32nd_notes_per_beat=8 time=0>
<meta message end_of_track time=1>
Track 1: 
program_change channel=0 program=0 time=0
control_change channel=0 control=64 value=127 time=0
note_on channel=0 note=67 velocity=52 time=755
note_on channel=0 note=72 velocity=67 time=615
note_on channel=0 note=67 velocity=0 time=20
note_on channel=0 note=72 velocity=0 time=74
control_change channel=0 control=64 value=117 time=128
control_change channel=0 control=64 value=111 time=15
control_change channel=0 control=64 value=107 time=16
control_change channel=0 control=64 value=103 time=15
note_on channel=0 note=78 velocity=65 time=11
control_change channel=0 control=64 value=98 time=3
note_on channel=0 note=71 velocity=45 time=2
control_change channel=0 control=64 value=93 time=14
note_on channel=0 note=61 velocity=39 time=5
note_on channel=0 note=67 velocity=39 time=2
cont

In [81]:
# Reload the example file so this cell starts from a freshly parsed MidiFile.
mid = load_midi(example_midi)

def group_by_time(track, group_size, ignore_tail = True, force_even = False): #Group note messages by time
    """Split a track into consecutive windows of ``group_size`` ticks.

    Every message is represented as a ``note_on``: real ``note_on`` messages
    are kept, ``note_off`` becomes ``note_on`` with velocity 0, and any other
    message becomes a silent placeholder (``note=0, velocity=0``) that only
    carries its delta time. When a message's delta time crosses the end of the
    current window, the message is placed at the window boundary and the
    remaining time opens the next window as a placeholder.

    Parameters
    ----------
    track : iterable of mido messages
        Messages with ``type`` and ``time`` attributes. The input messages are
        not modified.
    group_size : int
        Window length in ticks; must be positive.
    ignore_tail : bool
        If True, the last (incomplete) window is dropped; if False it is kept.
    force_even : bool
        If True, the last group is removed when the number of groups is odd.

    Returns
    -------
    list of list of mido.Message

    Raises
    ------
    ValueError
        If ``group_size`` is not positive.
    """
    if group_size <= 0:
        raise ValueError('group_size must be positive')

    def rest(channel, time):
        # Silent placeholder that only advances time.
        return mido.Message('note_on', note=0, velocity=0, channel=channel, time=time)

    def as_note_on(msg, time):
        # Meta messages have no channel attribute; fall back to channel 0.
        channel = getattr(msg, 'channel', 0)
        if msg.type == 'note_on':
            # Copy only when the time changes so the caller's track is never mutated.
            return msg if time == msg.time else msg.copy(time=time)
        if msg.type == 'note_off':
            return mido.Message('note_on', note=msg.note, velocity=0, channel=channel, time=time)
        return rest(channel, time)

    grouped_messages = []
    current_group = []
    time_passed = 0
    for msg in track:
        if time_passed + msg.time > group_size: #If note exceeds time-frame
            surplus_time = time_passed + msg.time - group_size
            current_group.append(as_note_on(msg, group_size - time_passed))
            grouped_messages.append(current_group)

            channel = getattr(msg, 'channel', 0)
            # A gap longer than a whole window yields windows containing only a
            # rest, so the carried-over time never exceeds group_size.
            while surplus_time > group_size:
                grouped_messages.append([rest(channel, group_size)])
                surplus_time -= group_size
            current_group = [rest(channel, surplus_time)]
            time_passed = surplus_time
        else:
            time_passed += msg.time
            current_group.append(as_note_on(msg, msg.time))

    # Keep the trailing, incomplete window only when requested.
    if not ignore_tail and current_group:
        grouped_messages.append(current_group)

    if force_even and len(grouped_messages) % 2 != 0:
        del grouped_messages[-1]

    return grouped_messages
            
'''
def group_by_nr_notes(track, group_size,ignore_tail = True, force_even = False): #Group messages by number of notes
    current_group = []
    grouped_messages = []
    current_size = 0
    surplus_time = 0
    for i, msg in enumerate(track):
        
        if not current_size == group_size: #Append msg to group
    
            if msg.type == 'note_on':
                if not surplus_time == 0:
                    current_group.append(mido.Message('note_on', note=0, velocity=0, channel=msg.channel, time= surplus_time))
                    current_size += 1
                    surplus_time = 0
                    
                current_group.append(msg)
                current_size += 1
            if msg.type == 'note_off':
                if not surplus_time == 0:
                    current_group.append(mido.Message('note_on', note=msg.note, velocity=0, channel=msg.channel, time= surplus_time))
                    current_size += 1
                    surplus_time = 0
                    
                current_group.append(mido.Message('note_on', note=msg.note, velocity=0, channel=msg.channel, time= msg.time))
                current_size += 1
            
            elif msg.type == 'program_change' or msg.type == 'control_change':
                if current_size == 0:
                    surplus_time += msg.time
                else:
                    current_group[-1].time += msg.time


        else: #If end of group is reached, start new group
            if msg.type == 'note_on':
                grouped_messages.append(current_group)
                current_group = [msg]
                current_size = 1
            elif msg.type == 'note_off':
                grouped_messages.append(current_group)
                current_group = [mido.Message('note_on', note=msg.note, velocity=0, channel=msg.channel, time= msg.time)]
                current_size = 1
            elif msg.type == 'control_change' or msg.type == 'program_change':
                current_group[-1].time += msg.time


        if not ignore_tail:#Keep/drop last group if not full
            if i == len(track)-1: #Check if end of track is reached, and if it is appends the current group to the total group 
                grouped_messages.append(current_group)
                
    if force_even:
        if not len(grouped_messages) % 2 == 0:
            del grouped_messages[-1]
    return grouped_messages
'''
def group_by_nr_notes(track, group_size,ignore_tail = True, force_even = False): #Group messages by number of notes
    """Split the note events of a track into groups of ``group_size`` events.

    ``note_on`` messages are kept (as fresh ``note_on`` messages) and
    ``note_off`` messages become ``note_on`` with velocity 0. All other
    messages are skipped, but their delta time is carried over and added to
    the next note event so the timing of later notes is preserved. The
    carried time is consumed exactly once.

    Parameters
    ----------
    track : iterable of mido messages
        The input messages are not modified.
    group_size : int
        Number of note events per group; must be at least 1.
    ignore_tail : bool
        If True, a final group with fewer than ``group_size`` events is
        dropped; if False it is kept. A final group that is exactly full is
        always kept.
    force_even : bool
        If True, the last group is removed when the number of groups is odd.

    Returns
    -------
    list of list of mido.Message

    Raises
    ------
    ValueError
        If ``group_size`` is smaller than 1.
    """
    if group_size < 1:
        raise ValueError('group_size must be at least 1')

    current_group = []
    grouped_messages = []
    surplus_time = 0
    for msg in track:
        if msg.type == 'note_on':
            velocity = msg.velocity
        elif msg.type == 'note_off':
            velocity = 0
        else:
            # Delta times are relative to the previous message of any type, so
            # a skipped message must hand its time on to the next note event.
            surplus_time += msg.time
            continue

        if len(current_group) == group_size: #If end of group is reached, start new group
            grouped_messages.append(current_group)
            current_group = []

        current_group.append(mido.Message('note_on', note=msg.note, velocity=velocity, channel=msg.channel, time= msg.time+surplus_time))
        surplus_time = 0

    if len(current_group) == group_size:
        # The last group is complete; it is not a partial tail.
        grouped_messages.append(current_group)
    elif not ignore_tail and current_group: #Keep/drop last group if not full
        grouped_messages.append(current_group)

    if force_even and len(grouped_messages) % 2 != 0:
        del grouped_messages[-1]
    return grouped_messages
#grouped_notes = group_by_nr_notes(mid.tracks[1], 10, ignore_tail = True, force_even = True)
#print(grouped_notes)

In [82]:
# Dataset folder, relative to the current working directory.
root = 'maestro-v2.0.0'

def split_files(root): #Goes thorugh the attached csv file to make the train, test and validation splits
    """Read ``<root>/maestro-v2.0.0.csv`` and return the file paths of each split.

    The split name is read from the third column and the file path from the
    fifth column (positions 2 and 4) — presumably the ``split`` and
    ``midi_filename`` columns of the dataset index; verify against the CSV
    header. Rows whose split is not ``train``, ``validation`` or ``test`` are
    ignored. The number of files per split is printed.

    Returns
    -------
    (train, val, test) : tuple of three lists of paths, in CSV row order.
    """
    df = pd.read_csv(root + '/maestro-v2.0.0.csv')
    # Explicit positional access; integer keys on a label-indexed Series
    # (row[2], row[4]) are deprecated positional lookups in pandas.
    split_column = df.iloc[:, 2]
    path_column = df.iloc[:, 4]

    train = path_column[split_column == 'train'].tolist()
    val = path_column[split_column == 'validation'].tolist()
    test = path_column[split_column == 'test'].tolist()

    print('Train length: {}, Validation length: {}, Test length: {}'.format(len(train), len(val), len(test)))
    return train, val, test

# File paths per split, as listed in the dataset CSV.
train_files, val_files, test_files = split_files(root)
def split_groups(root, nr_notes, train_files, val_files, test_files):
    """Build input/label note groups for the train, validation and test files.

    Each file is loaded, the messages of its second track (``tracks[1]``) are
    split into groups of ``nr_notes`` note events (incomplete tail dropped,
    group count forced even), and the groups are paired up: the 1st, 3rd, ...
    group of a file becomes an input and the 2nd, 4th, ... group becomes the
    label of the input preceding it.

    Parameters
    ----------
    root : str
        Dataset folder; file paths are resolved as ``root + '/' + file``.
    nr_notes : int
        Number of note events per group.
    train_files, val_files, test_files : iterable of str
        File paths relative to ``root``.

    Returns
    -------
    (train, train_labels, val, val_labels, test, test_labels) : six lists of groups.
    """
    def inputs_and_labels(files):
        # One pass over a split: alternate groups between inputs and labels.
        inputs = []
        labels = []
        for file in files:
            mid = load_midi(root + '/' + file)
            grouped_notes = group_by_nr_notes(mid.tracks[1], nr_notes, ignore_tail = True, force_even = True)
            inputs.extend(grouped_notes[0::2])
            labels.extend(grouped_notes[1::2])
        return inputs, labels

    train, train_labels = inputs_and_labels(train_files)
    val, val_labels = inputs_and_labels(val_files)
    test, test_labels = inputs_and_labels(test_files)
    return train, train_labels, val, val_labels, test, test_labels

# Build input/label groups of 100 note events each for every split.
train, train_labels, val, val_labels, test, test_labels = split_groups(root, 100, train_files, val_files, test_files)


Train length: 967, Validation length: 137, Test length: 178


In [83]:
# The two lists are index-aligned: folds[k] is written to the file fold_names[k].
folds = [train, train_labels, val, val_labels, test, test_labels] 
fold_names = ['train.txt', 'train_labels.txt', 'val.txt', 'val_labels.txt', 'test.txt', 'test_labels.txt']
def write_to_txt(folds,fold_names, data_dir = None):
    """Write each fold to a text file, one group per line.

    A message is serialised as ``note_velocity_time`` and the messages of a
    group are joined with commas; every group ends with a newline regardless
    of how many messages it contains. Empty groups are skipped.

    Parameters
    ----------
    folds : sequence of folds
        Each fold is an iterable of groups; each group is a sequence of
        messages exposing ``note``, ``velocity`` and ``time``.
    fold_names : sequence of str
        Output file name for each fold, index-aligned with ``folds``.
    data_dir : str, optional
        Output directory. Defaults to ``root + '/Data'`` using the module-level
        ``root``. The directory is created if it does not exist.
    """
    if data_dir is None:
        data_dir = root + '/Data'
    os.makedirs(data_dir, exist_ok=True)

    for groups, fold_name in zip(folds, fold_names):
        with open(data_dir + '/' + fold_name, 'w') as f:
            for group in groups:
                if not group:
                    continue
                events = ('{}_{}_{}'.format(msg.note, msg.velocity, msg.time) for msg in group)
                f.write(','.join(events) + '\n')
# Persist all six folds as text files in the dataset's Data folder.
write_to_txt(folds,fold_names)


In [86]:
def read_txt_files(data_folder):
    """Load every ``.txt`` file in ``data_folder`` into nested feature lists.

    Files are visited in sorted filename order and only names whose extension
    is exactly ``.txt`` are read. Each non-blank line is one group of
    comma-separated ``note_velocity_time`` events. Every event becomes a list
    of 15 integers: ``[note, velocity, time]`` followed by the 12-element
    pitch-class vector from ``note_to_pitchclass`` (looked up in the
    module-level ``pitch_classes``).

    Returns
    -------
    list
        One entry per file (in sorted filename order); each entry is a list of
        groups, each group a list of event feature lists.
    """
    folds = []
    for file in sorted(os.listdir(data_folder)):
        # splitext handles names without a dot and names with several dots.
        if os.path.splitext(file)[1] != '.txt': #Read only txt files
            continue
        groups = []
        with open(data_folder + '/' + file,'r') as f: #Read groups one at a time
            for line in f:
                line = line.strip()
                if not line:
                    continue
                group = []
                for event in line.split(','):
                    if not event: # tolerate a trailing comma
                        continue
                    fields = event.split('_')
                    note = int(fields[0])
                    vel = int(fields[1])
                    time = int(fields[2])
                    attr = [note, vel, time]
                    attr.extend(note_to_pitchclass(note, pitch_classes))
                    group.append(attr)
                groups.append(group)
        folds.append(groups)
    return folds
# Rebinds `folds`: it now holds the parsed feature lists, one entry per .txt
# file in sorted filename order (no longer the mido message groups).
folds = read_txt_files(root + '/Data')


In [87]:
# read_txt_files visits the data folder in sorted filename order. With only the
# six files named in fold_names present, that order is test, test_labels,
# train, train_labels, val, val_labels ('.' sorts before '_'), hence the
# indices used below.
# NOTE(review): any additional .txt file in the folder shifts these indices.
# NOTE(review): np.array assumes every group holds the same number of events so
# the result is a rectangular array — confirm.
train = np.array(folds[2])
train_labels = np.array(folds[3])
val = np.array(folds[4])
val_labels = np.array(folds[5])
test = np.array(folds[0])
test_labels = np.array(folds[1])

# Pair each input array with its label array as tensors.
train_dataset = TensorDataset(torch.from_numpy(train),torch.from_numpy(train_labels))
val_dataset = TensorDataset(torch.from_numpy(val),torch.from_numpy(val_labels))
test_dataset = TensorDataset(torch.from_numpy(test),torch.from_numpy(test_labels))


In [88]:
def array_to_midi(data,root,midi_name, ticks_per_beat = 384, tempo = 500000):
    """Write groups of note rows to ``<root>/Output/<midi_name>`` as a two-track MIDI file.

    Track 0 holds the tempo and a 4/4 time signature; track 1 holds a
    ``program_change`` followed by one ``note_on`` per row of ``data``.

    Parameters
    ----------
    data : iterable of groups
        Each group is an iterable of rows whose first three entries are
        note, velocity and delta time; they are converted with ``int()``.
        Further entries in a row are ignored.
    root : str
        Base folder. The working directory is changed to ``root`` and is still
        ``root`` when the function returns or raises.
    midi_name : str
        Output file name inside the ``Output`` folder.
    ticks_per_beat : int, optional
        Resolution of the written file (default 384).
    tempo : int, optional
        Tempo in microseconds per beat (default 500000).
    """
    os.chdir(root)
    # Created relative to root; no second chdir, so a failed save cannot leave
    # the process inside the Output folder.
    os.makedirs('Output', exist_ok=True) #Create output directory

    mid = mido.MidiFile()
    mid.ticks_per_beat = ticks_per_beat
    track0 = mido.MidiTrack()
    track1 = mido.MidiTrack()

    track0.append(mido.MetaMessage('set_tempo', tempo = tempo, time = 0)) # Meta information like tempo and time signature assigned to track 0
    track0.append(mido.MetaMessage('time_signature', numerator = 4, denominator = 4, clocks_per_click = 24, notated_32nd_notes_per_beat = 8, time = 0))
    track0.append(mido.MetaMessage('end_of_track', time = 1))
    mid.tracks.append(track0)
    track1.append(mido.Message('program_change', channel = 0, program = 0, time = 0))

    for group in data: #Write notes to track 1
        for row in group:
            # Plain Python ints, whatever scalar type the rows hold.
            note = int(row[0])
            vel = int(row[1])
            time = int(row[2])
            track1.append(mido.Message('note_on', channel = 0, note = note, velocity = vel, time = time))
    track1.append(mido.MetaMessage('end_of_track', time = 1))

    mid.tracks.append(track1)

    mid.save(os.path.join('Output', midi_name))
    
# Write the first training group to Output/test.mid under the given folder.
# NOTE(review): absolute, machine-specific path; the call also changes the
# process working directory to that folder.
array_to_midi(train[0:1], 'C:/Users/damaf/Documents/Python Scripts/D7047E/maestro-v2.0.0', 'test.mid')

    
    