In [189]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pretty_midi
import tensorflow as tf

from utils import plot_piano_roll

In [190]:
# TODO:
# - group note events according to bar length
# - decide whether to concatenate the hidden state in the decoder's bottom LSTM cell
# - make the feature dimension of x equal to 128 + 2

In [191]:
# CONFIG.YAML --NOT SAVED YET--

# variable declarations

# Timing constants; every *_LEN value below is a duration in seconds derived from TEMPO.
TEMPO = 120.  # beats per minute: 1 sec = 2 beats
SIXTEENTH_NOTE_BEATS = 0.25  # 1 16th note = 0.25 beats
SIXTEENTH_NOTE_LEN = SIXTEENTH_NOTE_BEATS / (TEMPO / 60.)  # 1 16th note = 0.125 sec
ONE_BAR_LEN = SIXTEENTH_NOTE_LEN * 16  # 16 16th notes = 1 bar = 2 sec
FOUR_BAR_LEN = ONE_BAR_LEN * 4  # 4 bars = 8 sec

# Number of consecutive note events per training window (see midi_data2tensor).
EVENT_SIZE = 64

In [192]:
# UTILS.PY --SAVED--

def midi_notes2notes_df(notes):
    """Convert note objects into a DataFrame of (pitch, duration, step) events.

    Args:
        notes: iterable of objects exposing ``pitch``, ``start`` and ``end``
            attributes (e.g. ``pretty_midi.Note``).

    Returns:
        pd.DataFrame with columns ``pitch``, ``duration`` and ``step``, one row
        per note, ordered by note start time. ``duration`` is ``end - start``
        and ``step`` is the start-time offset from the previous note (0 for the
        first note). An empty input yields an empty frame with the same columns.
    """
    columns = ['pitch', 'duration', 'step']

    # `step` is a delta between consecutive start times, so the notes must be
    # ordered by start; the incoming list is not guaranteed to be. sorted() is
    # stable, so notes sharing a start time keep their relative order.
    ordered_notes = sorted(notes, key=lambda note: note.start)

    processed_notes = []
    prev_start = ordered_notes[0].start if ordered_notes else 0.0
    for note in ordered_notes:
        processed_notes.append({
            'pitch': note.pitch,
            'duration': note.end - note.start,
            'step': note.start - prev_start,
        })
        prev_start = note.start

    # Passing `columns` keeps the schema stable even when there are no notes.
    return pd.DataFrame(processed_notes, columns=columns)


def notesdf2midi_notes(notes_df):
    """Rebuild a list of ``pretty_midi.Note`` objects from an event DataFrame.

    Each row's ``step`` is accumulated into an absolute onset time (starting
    from 0.0); the note ends ``duration`` after that onset. Every note is
    created with velocity 100 and the row's ``pitch`` truncated to int.
    """
    midi_notes = []
    onset = 0.0
    for _, event in notes_df.iterrows():
        # Onsets are stored as deltas, so keep a running absolute time.
        onset = onset + event['step']
        midi_notes.append(
            pretty_midi.Note(
                velocity=100,
                pitch=int(event['pitch']),
                start=onset,
                end=onset + event['duration'],
            )
        )

    return midi_notes


# reconstruction of recovered notes
def pred_df2midi_file(notes_df, output_path='../data/processed/mini_guitar.mid',
                      max_events=64, tempo=120.):
    """Write the leading events of ``notes_df`` to a single-instrument MIDI file.

    Args:
        notes_df: DataFrame with ``pitch``, ``duration`` and ``step`` columns,
            as consumed by ``notesdf2midi_notes``.
        output_path: destination of the MIDI file. Defaults to the path this
            function has always written to.
        max_events: number of leading rows to render; ``None`` renders all rows.
        tempo: initial tempo passed to ``pretty_midi.PrettyMIDI``.

    Returns:
        None. The only effect is the file written at ``output_path``.
    """
    recovered_notes = notesdf2midi_notes(notes_df.iloc[:max_events])

    # A single non-drum instrument with program number 0 carries every note.
    instrument = pretty_midi.Instrument(program=0, is_drum=False)
    instrument.notes = recovered_notes

    recovered_midi_file = pretty_midi.PrettyMIDI(initial_tempo=tempo)
    recovered_midi_file.instruments = [instrument]
    recovered_midi_file.time_signature_changes = [pretty_midi.TimeSignature(4, 4, 0.0)]
    recovered_midi_file.write(output_path)

    return

def midi_data2tensor(midi_data):
    """Cut the first instrument's notes into fixed-size windows of events.

    Args:
        midi_data: object with an ``instruments`` list whose items expose
            ``notes`` (e.g. ``pretty_midi.PrettyMIDI``). Only the first
            instrument is used.

    Returns:
        np.ndarray of shape ``(num_windows, EVENT_SIZE, 3)`` holding the
        (pitch, duration, step) rows produced by ``midi_notes2notes_df``.
        Trailing events that do not fill a whole window are dropped. When the
        file has no instrument, no notes, or fewer than EVENT_SIZE notes, the
        result has ``num_windows == 0`` instead of raising.
    """
    if not midi_data.instruments or not midi_data.instruments[0].notes:
        # Nothing to window; an empty batch still concatenates cleanly.
        return np.empty((0, EVENT_SIZE, 3))

    notes = midi_data.instruments[0].notes
    notes_df = midi_notes2notes_df(notes)
    event_values = notes_df.values

    # Floor division keeps every complete window. The previous
    # split-then-drop-last approach discarded a full window whenever the note
    # count was an exact multiple of EVENT_SIZE, and failed on short files.
    num_windows = len(event_values) // EVENT_SIZE
    usable_events = event_values[:num_windows * EVENT_SIZE]
    single_batch_of_events = usable_events.reshape(
        num_windows, EVENT_SIZE, event_values.shape[1]
    )
    return single_batch_of_events

In [349]:
# MODEL.PY --SAVED--

from tensorflow.keras.layers import Bidirectional,LSTM,Dense,LSTMCell
from tensorflow.keras import Input

class MusicVAE_Encoder(tf.keras.Model):
    """Two stacked bidirectional LSTMs followed by two dense heads.

    The second LSTM returns only its final output, which is projected by two
    separate ``latent_dim``-unit dense layers into ``mu`` and ``rho``.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim
        # First layer keeps the whole sequence so the second one can consume it.
        self.encoder_lstm_1 = Bidirectional(
            LSTM(units=512, return_sequences=True)
        )
        self.encoder_lstm_2 = Bidirectional(
            LSTM(units=512, return_sequences=False)
        )
        self.mu_dense = Dense(units=latent_dim)
        self.rho_dense = Dense(units=latent_dim)

    def call(self, x):
        """Return ``(mu, rho)`` computed from the input sequence batch ``x``."""
        sequence_features = self.encoder_lstm_1(x)
        sequence_summary = self.encoder_lstm_2(sequence_features)

        mu = self.mu_dense(sequence_summary)
        rho = self.rho_dense(sequence_summary)
        return mu, rho


class MusicVAE_Decoder(tf.keras.Model):
    """Two-level decoder: a conductor LSTM stack drives a per-step bottom LSTM cell.

    The conductor turns the latent vector into ``conductor_len`` embeddings.
    Each embedding initialises the bottom LSTM cell (as both hidden and cell
    state) for one subsequence, and the cell is then unrolled step by step.
    Every step emits three outputs: 128 pitch scores, one duration value and
    one step value.
    """

    def __init__(self,conductor_len):
        super().__init__()
        self.conductor_len = conductor_len
        self.conductor_dense = Dense(units=512,activation='tanh')
        self.conductor_lstm_1 = LSTM(units=512,return_sequences=True)
        self.conductor_lstm_2 = LSTM(units=256,return_sequences=True)
        self.bottom_lstm_input_dense = Dense(units=128,activation='tanh')
        self.bottom_lstm_cell = LSTMCell(units=128)
        self.bottom_lstm_pitch_dense = Dense(units=128)
        self.bottom_lstm_duration_dense = Dense(units=1)
        self.bottom_lstm_step_dense = Dense(units=1)

    def _project_heads(self, hidden):
        """Map a bottom-LSTM hidden state to (pitch, duration, step) outputs."""
        pitch = self.bottom_lstm_pitch_dense(hidden)
        duration = self.bottom_lstm_duration_dense(hidden)
        step = self.bottom_lstm_step_dense(hidden)
        return pitch, duration, step

    @staticmethod
    def _to_next_input(pitch, duration, step):
        """Merge the three outputs into one 3-feature row to feed the next step."""
        # think about normalization more here
        pitch_feature = tf.cast(
            tf.expand_dims(tf.argmax(pitch, axis=1), axis=1) / 128,
            dtype=tf.float32,
        )
        return tf.concat([pitch_feature, duration, step], axis=1)

    def call(self,z,x,teacher_forcing=True): # teacher_forcing = False when prediction
        """Decode ``z`` into per-step pitch, duration and step predictions.

        Args:
            z: latent batch; its leading dimension is the batch size.
            x: reference sequence batch indexed as ``x[:, time, :]``. Its time
                length sets how many steps are decoded and, with teacher
                forcing, its rows are the step inputs.
            teacher_forcing: Python bool. When True, step ``j`` of each
                subsequence receives ``x`` at step ``j - 1`` (zeros for
                ``j == 0``). When False, every step receives the merged
                prediction of the previous step (zeros for the very first).

        Returns:
            Tuple ``(pitch, duration, step)`` of tensors stacked along axis 1,
            one entry per decoded step. Only
            ``conductor_len * (x.shape[1] // conductor_len)`` steps are
            decoded, so a remainder at the end of ``x`` is ignored.

        Raises:
            ValueError: if ``x`` has fewer time steps than ``conductor_len``,
                so that no step could be decoded.
        """

        # CONDUCTOR RNN
        conductor_rnn_h0 = self.conductor_dense(z)
        # Use the dynamic shape: the static batch dimension is None when the
        # model is traced with an unknown batch size, and tf.zeros rejects None.
        batch_size = tf.shape(conductor_rnn_h0)[0]
        conductor_input = tf.zeros(shape=[batch_size, self.conductor_len, 1])
        conductor_output = self.conductor_lstm_1(inputs=conductor_input, initial_state=[conductor_rnn_h0, conductor_rnn_h0])
        conductor_output = self.conductor_lstm_2(conductor_output)
        bottom_input = self.bottom_lstm_input_dense(conductor_output)

        # BOTTOM RNN
        total_seq_len = x.shape[1]
        subseq_len = total_seq_len // self.conductor_len
        if subseq_len == 0:
            raise ValueError(
                "x has %d time steps, fewer than conductor_len=%d; nothing to decode"
                % (total_seq_len, self.conductor_len)
            )

        # Per-step outputs are collected and stacked once at the end instead of
        # growing three tensors with tf.concat on every step.
        pitch_preds, duration_preds, step_preds = [], [], []

        # Autoregressive input; carried across subsequence boundaries.
        temp_pred = tf.zeros_like(x[:, 0, :])
        for subsec_idx in range(self.conductor_len):
            bottom_rnn_h0 = bottom_input[:, subsec_idx, :]
            subseq_x = x[:, subsec_idx*subseq_len:(subsec_idx+1)*subseq_len, :]

            # The conductor embedding seeds both the hidden and the cell state.
            states = [bottom_rnn_h0, bottom_rnn_h0]
            for j in range(subseq_x.shape[1]):
                if not teacher_forcing:
                    cell_input = temp_pred
                elif j == 0:
                    cell_input = tf.zeros_like(subseq_x[:, j, :])
                else:
                    cell_input = subseq_x[:, j-1, :]

                _, (h_next, c_next) = self.bottom_lstm_cell(inputs=cell_input, states=states)
                states = [h_next, c_next]

                # 3 different outputs from model
                temp_pred_pitch, temp_pred_duration, temp_pred_step = self._project_heads(h_next)

                # save each step's 3 different outputs for loss computation
                pitch_preds.append(temp_pred_pitch)
                duration_preds.append(temp_pred_duration)
                step_preds.append(temp_pred_step)

                # The merged prediction is only consumed when decoding freely.
                if not teacher_forcing:
                    temp_pred = self._to_next_input(temp_pred_pitch, temp_pred_duration, temp_pred_step)

        global_pitch_pred = tf.stack(pitch_preds, axis=1)
        global_duration_pred = tf.stack(duration_preds, axis=1)
        global_step_pred = tf.stack(step_preds, axis=1)

        return global_pitch_pred,global_duration_pred,global_step_pred



class MusicVAE(tf.keras.Model):
    """Encoder/decoder pair joined by a sampled latent vector.

    Args:
        latent_dim: size of the latent vector produced by the encoder heads.
        conductor_len: number of subsequences the decoder splits a sequence into.
        teacher_forcing: forwarded to the decoder on every call.
    """

    def __init__(self,latent_dim,conductor_len,teacher_forcing):
        super().__init__()
        self.latent_dim = latent_dim
        self.conductor_len = conductor_len
        self.teacher_forcing = teacher_forcing
        self.encoder = MusicVAE_Encoder(latent_dim)
        self.decoder = MusicVAE_Decoder(conductor_len)

    def call(self,input_seq):
        """Encode ``input_seq``, sample ``z`` and decode it.

        Returns:
            Tuple ``(z_mu, z_rho, pitch_pred, duration_pred, step_pred)``.
        """
        z_mu,z_rho = self.encoder(input_seq)

        # Sample with the dynamic shape: z_mu.shape has a None batch dimension
        # when the model is traced with an unknown batch size, which
        # tf.random.normal cannot accept. Matching the dtype keeps the
        # arithmetic below valid if z_mu is not float32.
        epsilon = tf.random.normal(shape=tf.shape(z_mu),mean=0.0,stddev=1.0,dtype=z_mu.dtype)
        # softplus keeps the scale applied to the noise positive.
        z = z_mu + tf.math.softplus(z_rho) * epsilon

        global_pitch_pred,global_duration_pred,global_step_pred = self.decoder(z,input_seq,self.teacher_forcing)

        return z_mu,z_rho,global_pitch_pred,global_duration_pred,global_step_pred

In [195]:
# DATASET.PY --NOT SAVED YET-- or preprocessing.py outside model definition

import glob

# Sorted so the order of the collected windows does not depend on the
# filesystem's listing order.
midi_list = sorted(glob.glob("../data/maestro-v3.0.0/2018/*.midi"))
if not midi_list:
    raise FileNotFoundError("No MIDI files matched ../data/maestro-v3.0.0/2018/*.midi")

# Gather each file's windows and concatenate once at the end; concatenating
# inside the loop re-copies everything accumulated so far on every iteration.
per_file_events = []
for midi_file in midi_list:
    midi_data = pretty_midi.PrettyMIDI(midi_file)
    single_batch_of_events = midi_data2tensor(midi_data)
    per_file_events.append(single_batch_of_events)

complete_batch_of_events = np.concatenate(per_file_events, axis=0)

print(complete_batch_of_events.shape)

KeyboardInterrupt: 

In [None]:
import tensorflow as tf

# Input pipeline: slice the event windows, shuffle with a 1000-element buffer,
# group into batches of 6 and prefetch.
train_ds = (
    tf.data.Dataset.from_tensor_slices(complete_batch_of_events)
    .shuffle(1000)
    .batch(6)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Take the first batch for a quick forward-pass check. Unlike a loop-and-break,
# next() fails right here if the dataset is empty instead of leaving `temp_ds`
# undefined for later cells.
temp_ds = next(iter(train_ds))

In [350]:
# Hyperparameters for a quick forward-pass check of the full model.
latent_dim = 256
CONDUCTOR_LEN = 4

model = MusicVAE(
    latent_dim=latent_dim,
    conductor_len=CONDUCTOR_LEN,
    teacher_forcing=True,
)

# One forward pass on the sample batch.
(
    z_mu,
    z_rho,
    global_pitch_pred,
    global_duration_pred,
    global_step_pred,
) = model(temp_ds)

In [355]:
# Shape check on the step predictions: one value per decoded event for every sequence in the batch.
global_step_pred.shape

TensorShape([6, 64, 1])