In [2]:
# import fluidsynth
import glob
import numpy as np
import pandas as pd
import pretty_midi
import collections
from pathlib import Path
from IPython import display

import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn

%matplotlib inline
from matplotlib import pyplot as plt

# scrape time

In [162]:
import requests
from bs4 import BeautifulSoup
_datadir = Path('./data/classical')
def get_artist_link(tag):
    if tag.name != 'a':
        return False
    if tag.parent.name != 'td':
        return False
    if not tag.parent.has_attr('class'):
        return False
    if tag.parent.attrs['class'][0] == 'midi':
        return True
    return False

def get_files(tag):
    if tag.name != 'a':
        return False
    if tag.parent.name != 'td':
        return False
    if not tag.parent.has_attr('class'):
        return False
    if tag.parent.attrs['class'][0] != 'midi':
        return False
    if tag.has_attr('class'):
        return False
    href = tag.attrs['href']
    if href.split('/')[0] != 'midis':
        return False
    return True

root_url = 'http://www.piano-midi.de/'
base_url = f'{root_url}/midi_files.htm'
r = requests.get(base_url)
soup = BeautifulSoup(r.content, 'html.parser')
composer_page_link = soup.find_all(get_artist_link)
for composer_link in composer_page_link:
    composer_url = f'{root_url}/{composer_link.attrs["href"]}'
    r = requests.get(composer_url)
    soup = BeautifulSoup(r.content, 'html.parser') 
    file_tags = soup.find_all(get_files)
    for file_tag in file_tags:
        href = file_tag.attrs['href']
        filename = href.split('/')[-1]
        file_url = f'{root_url}/{href}'
        r = requests.get(file_url)
        with open(_datadir / filename, 'wb') as f:
            f.write(r.content)
        print(f'{filename} downloaded')

alb_esp1.mid downloaded
alb_esp2.mid downloaded
alb_esp3.mid downloaded
alb_esp4.mid downloaded
alb_esp5.mid downloaded
alb_esp6.mid downloaded
alb_se1.mid downloaded
alb_se2.mid downloaded
alb_se3.mid downloaded
alb_se4.mid downloaded
alb_se5.mid downloaded
alb_se6.mid downloaded
alb_se7.mid downloaded
alb_se8.mid downloaded
bach_846.mid downloaded
bach_847.mid downloaded
bach_850.mid downloaded
islamei.mid downloaded
beethoven_opus10_1.mid downloaded
beethoven_opus10_2.mid downloaded
beethoven_opus10_3.mid downloaded
pathetique_1.mid downloaded
pathetique_2.mid downloaded
pathetique_3.mid downloaded
beethoven_opus22_1.mid downloaded
beethoven_opus22_2.mid downloaded
beethoven_opus22_3.mid downloaded
beethoven_opus22_4.mid downloaded
mond_1.mid downloaded
mond_2.mid downloaded
mond_3.mid downloaded
waldstein_1.mid downloaded
waldstein_2.mid downloaded
waldstein_3.mid downloaded
appass_1.mid downloaded
appass_2.mid downloaded
appass_3.mid downloaded
beethoven_les_adieux_1.mid downloade

# load metadata

In [4]:
_datadir = Path('./data/classical')
_metadata_file = _datadir / 'metadata.csv'

if not _metadata_file.exists():
    files = collections.defaultdict(list)
    for filepath in _datadir.glob('*.mid'):
        files['file'].append(str(filepath))
        composer = filepath.stem.split('_')[0]
        files['composer'].append(composer)

        pm = pretty_midi.PrettyMIDI(str(filepath))
        files['end_time'].append(pm.get_end_time())
        
        
        tempos, probabilities = pm.estimate_tempi()
        assert np.isclose(sum(probabilities), 1)
        tempo_bpm = np.dot(tempos, probabilities) # expected tempo in beats/min
        seconds_per_beat = (1/tempo_bpm)*60
        time_sig_denom = pm.time_signature_changes[0].denominator
        note_dist = seconds_per_beat / (16 / time_sig_denom)
        
        files['expected_tempo'].append(tempo_bpm)
        files['16th_note_duration'].append(note_dist)
        roll = midi_to_pianoroll(str(filepath), sample_dist=note_dist)
        files['roll_length'].append(roll.shape[1])

    df_meta = pd.DataFrame({ key: np.asarray(val) for key, val in files.items() })
    df_meta.to_csv(_metadata_file, index=False)
else:
    df_meta = pd.read_csv(_metadata_file)
    
    
df_meta.head()

Unnamed: 0,file,composer,end_time,expected_tempo,16th_note_duration,roll_length
0,data/classical/beethoven_opus22_1.mid,beethoven,399.624004,164.101873,0.091407,4371
1,data/classical/schub_d760_4.mid,schub,206.069698,240.133084,0.062465,3285
2,data/classical/mz_330_3.mid,mz,482.101321,189.367179,0.079211,6086
3,data/classical/beethoven_les_adieux_1.mid,beethoven,341.950341,214.732513,0.069854,4895
4,data/classical/burg_spinnerlied.mid,burg,102.871024,277.682898,0.054018,1904


# converting midi to audio

In [36]:
# Sampling rate for audio playback
_SAMPLING_RATE = 16000

def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
  waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
  # Take a sample of the generated waveform to mitigate kernel resets
  waveform_short = waveform[:seconds*_SAMPLING_RATE]
  return display.Audio(waveform_short, rate=_SAMPLING_RATE)

# converting midi to pianoroll

In [74]:
def midi_to_pianoroll(file, sample_dist=0.02):
    pm = pretty_midi.PrettyMIDI(file)
    sampling_rate = 1/sample_dist
    piano_roll = pm.get_piano_roll(fs=sampling_rate)
    return piano_roll

# define dataset and dataloader

In [22]:
midi_dir.is_dir()

True

In [3]:
def get_preprocessed_files(processed_data_dir):

    preprocessed_data = collections.defaultdict(list)
    for midi_dir in processed_data_dir.glob('*/'):
        if not midi_dir.is_dir():
            continue
        piece_name = midi_dir.name
        chord_file_name = f'{piece_name}_right_C_full.npy'
        data_file = midi_dir / chord_file_name
        if not data_file.exists():
            print(data_file)
            raise Exception('shit is fucked!!')

        # record piece name and the path of the file containing the full chord data
        preprocessed_data['right_full_chord_file'].append(str(data_file))
        preprocessed_data['piece_name'].append(piece_name)

        # compute the size of the file after 
        chord_roll = np.load(data_file)
        preprocessed_data['chord_roll_size'].append(chord_roll.shape[0])

    df_preprocess = pd.DataFrame({ key: np.asarray(val) for key, val in preprocessed_data.items() })
    return df_preprocess

processed_data_dir = Path('./data/chopin_processed_bin')
df_preprocess = get_preprocessed_files(processed_data_dir)

In [35]:
class FullChord(Dataset):
    
    def __init__(self, df_meta: pd.DataFrame, seq_length: int = 25, 
                 batch_size: int = 20, batch_per_file=None):
        self.df_meta = df_meta.copy()

        self.df_meta['n_batches'] = (self.df_meta['chord_roll_size'] - seq_length )*0.95 // batch_size
        self.df_meta['n_batches'] = self.df_meta['n_batches'].astype(int)
        file_idx_ends = []
        n_batches = self.df_meta['n_batches'].values
        file_idx_ends = [n_batches[0]*batch_size - 1]
        for batches_in_file in n_batches[1:]:
            file_idx_ends.append(batches_in_file*batch_size + file_idx_ends[-1] )

        self.df_meta['file_idx_ends'] = file_idx_ends

        
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.batch_per_file = batch_per_file

        if self.batch_per_file is not None:
            self.idx_per_file = self.batch_size*self.batch_per_file
        else:
            self.idx_per_file = None
        
    def __len__(self):
        if self.batch_per_file is None:
            return int(self.df_meta['n_batches'].sum()*self.batch_size)
        else:
            return self.df_meta.shape[0]*self.idx_per_file
    
    def __getitem__(self, idx):
        if self.batch_per_file is None:
            file_idx_ends = self.df_meta['file_idx_ends'].values
            for i in range(len(file_idx_ends)):
                if idx < file_idx_ends[i]:
                    file_idx = i
                    break
            if file_idx == 0:
                idx_start = 0
            else:
                idx_start = file_idx_ends[file_idx - 1]
            window_idx = int(idx - idx_start)
        else:
            file_idx = idx // self.idx_per_file
            window_idx = idx % self.idx_per_file
        
        
        seq, label = self.get_rolls(file_idx, window_idx)
        
        seq = torch.from_numpy(seq).float()
        label = torch.from_numpy(label).float()
        return seq, label
    
    def get_rolls(self, file_idx, window_idx):
        file_path = self.df_meta.iloc[file_idx]['right_full_chord_file']
        
        roll = np.load(file_path)
        roll_window = roll[window_idx:window_idx+self.seq_length+1, :]
        
        seq = roll_window[:-1]
        label = roll_window[-1]
        return seq, label
    
    
class ChordLSTM(nn.Module):
    def __init__(self, vocab_size=1848):
        super(ChordLSTM, self).__init__()
            
        hidden_size = vocab_size // 8
            
        self.lstm = nn.LSTM(input_size=vocab_size, batch_first=True, num_layers=1, hidden_size=hidden_size)
        
        self.predict_layer = nn.Sequential(
            nn.Linear(hidden_size, vocab_size),
            nn.Softmax(dim=2)
        )
    def forward(self, x):
        output, (h_n, c_n) = self.lstm(x)
         
        output = self.predict_layer(h_n)
        
        return output

In [45]:
seq_length = 100
learning_rate = 1e-3
batch_size = 8
num_workers = 0

processed_data_dir = Path('./data/chopin_processed_bin')
df_preprocess = get_preprocessed_files(processed_data_dir)

rng = np.random.default_rng(12345)
idx = np.arange(df_preprocess.shape[0])
n_train = int(0.8*idx.shape[0])
train_idx = rng.choice(idx, size=n_train, replace=False)
test_idx = idx[~np.in1d(idx, train_idx)]
df_train = df_preprocess.iloc[train_idx]
df_test = df_preprocess.iloc[test_idx]

dset_train = FullChord(df_meta=df_train, 
                    batch_size=batch_size,
                    batch_per_file=None,
                    seq_length=seq_length)


dset_test = FullChord(df_meta=df_test, 
                    batch_size=batch_size, 
                    batch_per_file=20,
                    seq_length=seq_length)


train_dataloader = DataLoader(dset_train, batch_size=batch_size, shuffle=True, num_workers=0)
test_dataloader = DataLoader(dset_test, batch_size=batch_size, shuffle=False, num_workers=0)

model = ChordLSTM()
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

metrics = collections.defaultdict(list)

iter_idx = -1
train_iterator = iter(train_dataloader)

train_losses = []

while iter_idx < 20:
    iter_idx += 1
    print(f'iter_idx = {iter_idx}', flush=True)

    try:
        features, labels = next(train_iterator)
    except StopIteration:
        train_iterator = iter(train_dataloader)
        features, labels = next(train_iterator)


    # compute prediction and loss
    pred = model(features)[0, :, :]
    loss = loss_fn(pred, labels)

    # backprop
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    train_losses.append(loss.item())


    # compute metrics every 10 iterations
    if iter_idx > 0 and iter_idx % 5 == 0:

        metrics['iter'].append(iter_idx)

        # compute train loss
        train_loss = np.mean(np.asarray(train_losses))
        metrics['train_loss'].append(train_loss)
        train_losses = []


        # test loop
        test_loss_fn = nn.CrossEntropyLoss()
        test_loss = 0
        frames_correct = 0
        num_batches = len(test_dataloader)
        model.eval()
        with torch.no_grad():
            for features, labels in test_dataloader:
                pred = model(features)[0, :, :]
                test_loss += test_loss_fn(pred, labels).item()

                pred_chords = pred.argmax(axis=1)
                label_chords = labels.argmax(axis=1) 
                equal = torch.eq(pred_chords, label_chords)
                frames_correct += torch.sum(equal)
        model.train()

        frac_frames_correct = frames_correct / (num_batches*batch_size)
        avg_test_loss = test_loss / num_batches

        metrics['test_loss'].append(avg_test_loss)
        metrics['frac_frames_correct'].append(frac_frames_correct)

        # save metrics
        df_metrics = pd.DataFrame({ key: np.asarray(val) for key, val in metrics.items() })
        # df_metrics.to_csv(metrics_file)

        for key, val in metrics.items():
            print(f'iter_idx={iter_idx}, {key}={val[-1]}')

        # save model
        # state_file = output_dir / f'model_weights_iter{iter_idx}.pth'
        # torch.save(model.state_dict(), state_file)

iter_idx = 0
iter_idx = 1
iter_idx = 2
iter_idx = 3
iter_idx = 4
iter_idx = 5
iter_idx=5, iter=5
iter_idx=5, train_loss=7.521861394246419
iter_idx=5, test_loss=7.52184399843216
iter_idx=5, frac_frames_correct=0.0
iter_idx = 6
iter_idx = 7
iter_idx = 8
iter_idx = 9
iter_idx = 10
iter_idx=10, iter=10
iter_idx=10, train_loss=7.52182788848877
iter_idx=10, test_loss=7.521803941726684
iter_idx=10, frac_frames_correct=0.19062499701976776
iter_idx = 11
iter_idx = 12
iter_idx = 13
iter_idx = 14
iter_idx = 15
iter_idx=15, iter=15
iter_idx=15, train_loss=7.521801853179932
iter_idx=15, test_loss=7.521310896873474
iter_idx=15, frac_frames_correct=0.19062499701976776
iter_idx = 16
iter_idx = 17
iter_idx = 18
iter_idx = 19
iter_idx = 20
iter_idx=20, iter=20
iter_idx=20, train_loss=7.516920948028565
iter_idx=20, test_loss=7.49155084848404
iter_idx=20, frac_frames_correct=0.19062499701976776


In [37]:
df_metrics

Unnamed: 0,iter,train_loss,test_loss,frac_frames_correct
0,5,7.521852,7.521848,0.003125
1,10,7.52185,7.521825,0.001875
2,15,7.52182,7.52177,0.19
3,20,7.521413,7.519488,0.190625


In [44]:
for features, labels in iter(test_dataloader):
    model(features).argmax(axis=2)

TypeError: 'Tensor' object is not callable