#  MAESTRO Data Preparation

## Installations

Install the following packages:
1. `muspy` and `pypianoroll` via `pip install`, after activating your conda environment.
2. `fluidsynth` via `conda install -c conda-forge fluidsynth`.

In [None]:
from IPython.display import clear_output, Audio, display
from ipywidgets import interact, IntSlider

import os
import os.path
import random
import datetime
import json

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn
from tqdm import tqdm # valuebar for iterations

In [None]:
import muspy
import pypianoroll
import pretty_midi

In [None]:
# One-time setup: fetch the MuseScore soundfont through muspy.
# Can be skipped once the soundfont has been downloaded.
muspy.download_musescore_soundfont() # if you didn't do it already 

In [None]:
# One-time setup: fetch the Bravura font through muspy.
# Can be skipped once the font has been downloaded.
muspy.download_bravura_font() # if you didn't do it already 

## Load data

In [None]:
# Download and extract the MAESTRO v3 dataset below ./unprepared_data/MAESTRO.
# NOTE(review): the returned dataset object is not referenced again in the cells
# shown here; the preparation cells read the MIDI files from disk instead.
MAESTRO = muspy.datasets.MAESTRODatasetV3("./unprepared_data/MAESTRO", download_and_extract=True)

## Prepare Piano-rolls

In [None]:
# preparation settings
dataset_root = "./unprepared_data/MAESTRO/maestro-v3.0.0/all_midi"

# pitch window that is kept in the prepared piano-rolls
lowest_pitch = 12 * 2  # MIDI note number of the lowest pitch
n_pitches = 12 * 6  # number of pitches

# temporal layout
beat_resolution = 4  # temporal resolution of a beat (in timesteps), 24 in data, 12 for MusiGAN
bars_per_instance = 12  # number of bars per instance in prepared data

# derived values, needed further below
sample_size = bars_per_instance * 4  # number of beats per instance created by track-cropping, 4 bars for MusiGAN
bar_resolution = beat_resolution * 4  # timesteps per bar (4 beats per bar)

In [None]:
# helper function

def plot_pianoroll(pr):
    """
    Show a single piano-roll as an image.

    pr: 2-D array; it is transposed before plotting, so the first axis of
        `pr` ends up on the horizontal image axis. The plotting cell of this
        notebook passes prepared samples laid out as Time x Pitches, which
        are therefore drawn as Pitches (rows) x Time (columns).
    """

    # open a new, wide figure for every call so successive plots do not overlap
    plt.figure(figsize = (15, 6))
    plt.imshow(pr.T)
    plt.show()

In [None]:
# helper function

def inter_bar_var(splits, std_thr = 0.04) :
    """
        Drop crops whose content barely varies over time.

        For every crop, the sample standard deviation (ddof = 1) is taken
        along axis 1 (time) separately for each pitch and then averaged over
        the last axis (pitches). This yields one score per crop; crops with a
        score below `std_thr` are removed.

        NOTE(review): despite the function name, the score is computed over
        all timesteps of a crop, not bar by bar. The previous version built a
        per-bar reshape but never used it; that dead code was removed and the
        numeric behaviour is unchanged.

        splits:  array of shape (crops, time, pitches)
        std_thr: minimum score a crop needs in order to be kept

        returns: (kept crops, scores of ALL crops including the dropped ones)
    """

    splits = np.asarray(splits)

    # one score per crop: std over time for each pitch, averaged over pitches
    variation = np.std(splits, axis = 1, ddof = 1).mean(axis = -1)
    keep_mask = variation >= std_thr

    return splits[keep_mask], variation

In [None]:
# helper function

def qualified_note_rate(pianoroll, qnr_thr) :
    """
        ratio of "qualified" notes,
        defined as notes lasting qnr_thr blips/timesteps or longer.

        also called:
            QN = "qualified note rate"

        pianoroll: 2-D array (time x pitches); every entry > 0 counts as
                   "note on", so boolean, 0/1 and velocity rolls all work
        qnr_thr:   minimum note length in timesteps

        returns:   qualified notes / all notes as a float,
                   0.0 if the piano-roll contains no notes at all
    """
    active = np.asarray(pianoroll) > 0

    # one silent timestep before and after the roll, so that notes touching
    # the first or last timestep still get a start edge and a stop edge
    padded = np.pad(active, ((1, 1), (0, 0))).astype(np.int8)

    # +1 where a note starts, -1 at the first timestep after it ended;
    # transposed so that np.nonzero lists the edges pitch by pitch in time order
    edges = np.diff(padded, axis = 0).T
    _, start_indices = np.nonzero(edges == 1)
    _, stop_indices  = np.nonzero(edges == -1)

    # within each pitch starts and stops alternate, hence the k-th start
    # belongs to the k-th stop
    note_lengths = stop_indices - start_indices
    total_notes  = note_lengths.shape[0]

    if total_notes == 0:
        # avoid 0 / 0; a roll without notes has no qualified notes
        return 0.0

    total_quali_notes = np.count_nonzero(note_lengths >= qnr_thr)

    return total_quali_notes / total_notes

In [None]:
def prepare_multitrack(multitrack, label, track_idx = 1, qnr_thr = 0.4, std_thr = 0.03):
    """
        Turn one track of a multitrack into fixed-length binary piano-roll crops.

        Steps:
          1. compute the qualified note rate of the track at its original
             resolution; if it is below qnr_thr, nothing is prepared
          2. resample to beat_resolution and pad to whole bars
          3. binarize and cut the pitch axis to
             [lowest_pitch, lowest_pitch + n_pitches)
          4. split the roll wherever empty and non-empty bars meet and keep
             only the non-empty stretches
          5. chop every stretch into crops of bars_per_instance bars and
             filter them with inter_bar_var

        multitrack: object providing .tracks, .resolution and
                    .set_resolution(...) (the calling cell passes the result
                    of pypianoroll.read)
        label:      label attached to every crop of this piece
        track_idx:  index of the track to use
        qnr_thr:    minimum qualified note rate of the track
        std_thr:    threshold forwarded to inter_bar_var

        returns: (crop_list, crop_label_list, qnr, inter_bar_stds)
                 crop_list       - list of arrays (crops x time x pitches),
                                   one per usable stretch
                 crop_label_list - one label per kept crop
                 qnr             - qualified note rate of the track
                 inter_bar_stds  - list with the inter_bar_var scores of
                                   every usable stretch
    """

    crop_list = []
    crop_label_list = []
    inter_bar_stds = []

    # check if enough notes are at least one target timestep long
    # (measured before downsampling), otherwise stop
    original_pianoroll = multitrack.tracks[track_idx].pianoroll
    resolution = multitrack.resolution
    qnr = qualified_note_rate(original_pianoroll > 0, resolution // beat_resolution)

    if qnr < qnr_thr:
        return(crop_list, crop_label_list, qnr, inter_bar_stds)

    multitrack = multitrack.set_resolution(beat_resolution, rounding = "round")

    # pad to a multiple of the bar length so the roll splits into whole bars
    piano_track = multitrack.tracks[track_idx].pad_to_multiple(bar_resolution)

    # binarize & fix pitch range (shape: time x pitches)
    pianoroll = (piano_track.pianoroll > 0)[:, lowest_pitch : lowest_pitch + n_pitches]

    # True for every bar that contains at least one note
    bar_has_notes = np.any(pianoroll.reshape(-1, bar_resolution * n_pitches), axis = 1)

    # split after every bar whose successor switches between empty and non-empty
    split_after_this_bar_numbers = np.flatnonzero(bar_has_notes[1:] != bar_has_notes[:-1])
    split_indices = (split_after_this_bar_numbers + 1) * bar_resolution
    split_list = np.split(pianoroll, split_indices)

    # empty and non-empty stretches alternate, so every second one is non-empty
    first_non_empty = 0 if np.any(split_list[0]) else 1

    n_timesteps = bars_per_instance * bar_resolution # time steps per instance
    for split in split_list[first_non_empty::2]:
        if split.shape[0] < n_timesteps: # too short for even a single crop
            continue

        # drop the trailing timesteps that do not fill a complete crop
        usable_timesteps = split.shape[0] - (split.shape[0] % n_timesteps)
        splits = split[:usable_timesteps, :].reshape((-1, n_timesteps, n_pitches)) # crops x time x pitches

        # avoid too repetitive splits
        good_splits, inter_bar_std_dev = inter_bar_var(splits, std_thr)
        inter_bar_stds.append(inter_bar_std_dev)

        # append splits & as many labels as crops were kept
        crop_list.append(good_splits)
        crop_label_list.extend([label] * good_splits.shape[0])

    return(crop_list, crop_label_list, qnr, inter_bar_stds)

In [None]:
# iterate over all MIDI files in the folder & prepare data

data_list = []
data_label_list = []
qnrs = []
inter_bar_stds_list = []

midi_file_path = "./unprepared_data/MAESTRO/maestro-v3.0.0/all_midi"
label = "classical"
n_midi_files = 0

# sorted      -> the sample order is reproducible between runs
# ext. filter -> stray non-MIDI directory entries are ignored instead of crashing the parser
midi_file_names = sorted(name for name in os.listdir(midi_file_path)
                         if name.lower().endswith((".mid", ".midi")))

for path in tqdm(midi_file_names):
    midi_filepath = os.path.join(midi_file_path, path)

    # sort out files that are not in a single 4/4 time signature
    pretty_midi_file = pretty_midi.PrettyMIDI(midi_filepath)
    time_signatures = pretty_midi_file.time_signature_changes
    if len(time_signatures) > 1:
        continue
    # the former chained test `numerator != denominator != 4` let e.g. 3/4 and 2/2 pass;
    # a file without any time signature event is kept (MIDI defaults to 4/4)
    if time_signatures and (time_signatures[0].numerator != 4 or time_signatures[0].denominator != 4):
        continue

    # NOTE: unreadable / incompatible files raise here on purpose; wrap this call
    # in a narrow try/except if such files should be skipped instead
    multitrack = pypianoroll.read(midi_filepath)

    split_list, split_label_list, qnr, inter_bar_stds = prepare_multitrack(multitrack, label, track_idx = 0)
    data_list.extend(split_list)
    data_label_list.extend(split_label_list)
    qnrs.append(qnr)
    inter_bar_stds_list.extend(inter_bar_stds)
    n_midi_files += 1

if not data_list or not inter_bar_stds_list:
    # np.concatenate would fail with a much less helpful message
    raise RuntimeError(f"No samples could be prepared from the {n_midi_files} MIDI files in {midi_file_path}")

qnrs_array = np.array(qnrs)
print("qnrs:", np.mean(qnrs_array), np.std(qnrs_array))
inter_bar_stds_array = np.concatenate(inter_bar_stds_list)
print("ibstds:", np.mean(inter_bar_stds_array), np.std(inter_bar_stds_array))

data_array = np.concatenate(data_list, axis = 0) # (shape: n_instances x n_timesteps x n_pitches)
label_array = np.array(data_label_list)
print(f"Successfully collected {len(data_array)} samples from {n_midi_files} songs")
print(f"Data shape : {data_array.shape}")

In [None]:
# show the prepared samples 40..49; slicing simply yields fewer (or no) plots
# when less than 50 samples exist instead of raising an IndexError
for sample in data_array[40:50]:
    plot_pianoroll(sample)

In [None]:
# save prepared data

## create unique file directory to save data
timestamp = datetime.datetime.now()
# explicit format without spaces or colons: str(datetime) contains both, and
# colons are not allowed in Windows paths
file_directory = f"./prepared_data/maestro_{timestamp:%Y-%m-%d_%H-%M-%S-%f}"
os.makedirs(os.path.join(file_directory, "audio_examples")) # also creates file_directory itself; audio folder is for later..

## save preparation parameters as json file
prep_pars_dict = {
    "n_pitches": n_pitches,
    "lowest_pitch": lowest_pitch,
    "beat_resolution": beat_resolution,
    "beats_per_instance": sample_size,
}
with open(file_directory + "/preparation_params.json", "w") as file:
    json.dump(prep_pars_dict, file, indent = 6)

## save data as compressed npz files
np.savez_compressed(file_directory + "/prepared_arrays.npz", data=data_array, labels=label_array)

## Evaluate Prepared Data

In [None]:
# load data

loaded_file_directory = file_directory # "./prepared_data/maestro_full_12bars" # change if wanted

# the context manager closes the underlying .npz file once both arrays are read
with np.load(loaded_file_directory + "/prepared_arrays.npz") as loaded_data:
    loaded_data_array, loaded_label_array = loaded_data["data"], loaded_data["labels"]

In [None]:
# number of loaded labels (shown as the cell output)
len(loaded_label_array)

In [None]:
# plot total pitch range

# active timesteps per pitch, summed over time (axis 1) and over all samples (axis 0)
n_pitches_array = loaded_data_array.sum(axis = 1).sum(axis = 0)

# x positions follow the loaded data instead of a hard-coded 72, so data sets
# prepared with another pitch range can be plotted as well
plt.plot(np.arange(n_pitches_array.shape[0]), n_pitches_array, "ro")
plt.ylabel("# of pitches")
plt.xlabel("pitches")
# plt.yscale("log")

In [None]:
# convert random instances of loaded data to wave (audio) file & display them

n = 5 # number of random examples
rand_idxs = np.random.randint(0, len(loaded_label_array), n)

# make sure the target folder exists, also when loaded_file_directory was
# pointed to an older data set by hand
audio_directory = os.path.join(loaded_file_directory, "audio_examples")
os.makedirs(audio_directory, exist_ok = True)

for i in tqdm(rand_idxs):
    X, y = loaded_data_array[i, :, :], loaded_label_array[i]

    # use the label stored with the sample, not the global `label` of the preparation cell
    genre_of_X = str(y)

    X_padded = np.pad(X, ((0, 0), (lowest_pitch, 128 - lowest_pitch - n_pitches))) # complete pitch range
    X_music = muspy.from_pianoroll_representation(X_padded > 0,
                resolution = beat_resolution, encode_velocity = False) # convert to muspy.music_object

    # file name without spaces / colons (str(datetime) contains both); built
    # once so that writing and playback are guaranteed to use the same file
    X_timestamp = datetime.datetime.now()
    audio_path = os.path.join(audio_directory, f"{genre_of_X}_{X_timestamp:%Y-%m-%d_%H-%M-%S-%f}.wav")
    muspy.write_audio(path = audio_path, music = X_music)

    # display audio & show pianoroll
    print(genre_of_X + ":")
    display(Audio(filename = audio_path))
    muspy.visualization.show_pianoroll(X_music)
    plt.show()

In [None]:
# convert into pytorch dataset & dataloader
import torch  # not imported by any of the import cells above

batch_size = 15

# the preparation cell stores the labels as strings (e.g. "classical"), which
# torch cannot turn into a tensor -> map every distinct label to a class index;
# labels that are already numeric are passed through untouched
if loaded_label_array.dtype.kind in "biu":
    label_names, label_ids = None, loaded_label_array
else:
    label_names, label_ids = np.unique(loaded_label_array, return_inverse = True)  # label_names[k] is the name of class k

# convert to pytorch tensor
data_tensor = torch.as_tensor(loaded_data_array, dtype=torch.float32)
label_tensor = torch.as_tensor(label_ids, dtype=torch.int)

# create pytorch dataset & dataloader
dataset = torch.utils.data.TensorDataset(data_tensor, label_tensor)
lpd5_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size)