In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mido  # importe la bibliothèque MidO qui gère aussi RtMidi
import time  # importe le module Time Python
import pretty_midi
from tqdm.auto import tqdm
pd.set_option('display.max_rows',150)
import _pickle as pickle
import os
def save(file,name, folder = ""):
    if folder != "":
        outfile = open('./'+folder+'/'+name+'.pickle', 'wb')
    else:
        outfile = open(name+'.pickle', 'wb')
    pickle.dump(file, outfile)
    outfile.close
    
def load(name, folder = ""):
    if folder != "":
        outfile = open('./'+folder+'/'+name+'.pickle', 'rb')
    else:
        outfile = open(name+'.pickle', 'rb')
    file = pickle.load(outfile)
    outfile.close
    return file

import random
import gc

from tf_transformers2 import *
from tensorflow.keras.layers import Input, Dense, Dropout, TimeDistributed, LSTM
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

In [2]:
def play(mid):
    try:
        port = mido.open_output('Microsoft GS Wavetable Synth 0')

        # chemin absolu vers le fichier .mid, ici "blackvelvet.mid"


        # affiche chemin fichier Midi + son type + nb de pistes + nb de messages dans fichier
        print("=>", mid, "...\n... ...")

        # calcul + affiche la durée de lecture du fichier Midi en h:m:s
        print("=> Durée de lecture =", time.strftime('%Hh:%Mm:%Ss', time.gmtime(mid.length)))
        print("=> Lecture en cours...")

        for msg in mid.play():  
            port.send(msg)

        port.close()  # ferme proprement le port Midi
        print("=> Fichier MIDI lu... ARRÊT !")
    except:
        print('interrupted')
        port.close()
        
def play_from_pretty(mid):
    mid.write('temp.mid')
    mid1 = mido.MidiFile('temp.mid')
    play(mid1)

In [None]:
play(mid.tracks[0])

In [None]:
mid = pretty_midi.PrettyMIDI('./data/0/009count.mid')

In [None]:
def inst_to_seq(inst):
    
    inst_code = inst.program
    inst_type_code = inst.is_drum*1
    inst_name = inst.name
    ins = []
    inst_type = []
    name = []
    t = []
    d = []
    p = []
    v = []
    
    
    for note in inst.notes:
        name.append(inst_name)
        ins.append(inst_code)
        inst_type.append(inst_type_code)
        t.append(note.start)
        d.append(-note.start + note.end)
        p.append(note.pitch)
        v.append(note.velocity)
        
    ins = np.array(ins)
    inst_type = np.array(inst_type)
    tim = np.array(t)
    num = np.array([d, p, v]).T
    return ins, inst_type, tim, num, name
    
def mid_to_seq(mid):
    
    ins = []
    ins_type = []
    tim = []
    num = []
    name = []
    
    for inst in mid.instruments:
        i, it, t, n, na = inst_to_seq(inst)
        
        ins.append(i)
        ins_type.append(it)
        tim.append(t)
        num.append(n)
        name.append(na)
    
    ins = np.concatenate(ins).astype(int)
    ins_type = np.concatenate(ins_type).astype(float)
    tim = np.concatenate(tim).astype(float)
    num = np.concatenate(num).astype(float)
    name = np.concatenate(name).astype(str)
    
    sort_index = np.argsort(tim)
    
    ins = ins[sort_index]
    ins_type = ins_type[sort_index]
    tim = tim[sort_index]
    num = num[sort_index]
    name = name[sort_index]
    
    seq = {
        'instruments' : ins,
        'is_drum' : ins_type,
        'time' : tim,
        'num' : num,
        'name' : name
    }
    return seq

def seq_to_mid(seq):
    ins, ins_type, tim, num, name = seq['instruments'], seq['is_drum'], seq['time'], seq['num'], seq['name']
    
    instru = np.unique(ins)
    
    mid = pretty_midi.PrettyMIDI()
    for i in instru:
        is_drum = (np.unique(ins_type[ins == i])[0] == 1)
        inst_name = (np.unique(name)[0] == 1)
        
        inst = pretty_midi.Instrument(program=i, is_drum = is_drum, name = inst_name)
        
        time_start = tim[ins == i]
        duration = num[ins == i, 0]
        pitch = num[ins == i, 1]
        velocity = num[ins == i, 2]
        
        for j, t in enumerate(time_start):
            note = pretty_midi.Note(velocity=int(round(velocity[j], 0)), pitch=int(round(pitch[j], 0)), start=t, end=t +duration[j])
            inst.notes.append(note)
        mid.instruments.append(inst)
    return mid

In [None]:
seq = mid_to_seq(mid)

In [None]:
mid1 = seq_to_mid(seq)

In [None]:
play_from_pretty(mid)

## Data Prep

In [None]:
path_file = []
for path, subdirs, files in os.walk('./data'):
    for name in files:
        path_file.append(os.path.join(path, name))
#         print(os.path.join(path, name))

In [None]:
path_file_clean  =[]
for elt in path_file:
    if elt[-3:].lower() == 'mid':
        path_file_clean.append(elt.replace('\\', '/'))

In [None]:
path_file_clean

In [None]:
batch = []
not_added  =[]
count= 0
count_not_added = 0
for elt in tqdm(path_file_clean):
    try:
        mid = pretty_midi.PrettyMIDI(elt)
        seq = mid_to_seq(mid)
        batch.append(seq)
        if len(batch) == 500:
            save(batch, './batch/batch_'+str(count))
            count+=1
            batch = []
    except:
        not_added.append(elt)
        print(elt)
        print(count_not_added)
        count_not_added+=1
        
save(batch, './batch/batch_'+str(count))      

## Batch_loading

In [None]:
%%time
batch = load('batch_0', 'batch')

In [None]:
batch[0]

In [3]:
def build_sequence(x, max_len = 256, start = True):
    inst  = x['instruments']
    drum = x['is_drum']
    tim = x['time']
    num = x['num']
    
    size = len(inst)
    
    
    sort_indices = np.argsort(tim)
    
    for elt in np.unique(tim):
        indices_to_resort = sort_indices[tim == elt]
        
        s = np.argsort(inst[indices_to_resort])
        indices_to_resort = indices_to_resort[s]
        sort_indices[tim == elt] = indices_to_resort
    inst = inst[sort_indices]
    drum = drum[sort_indices]
    num = num[sort_indices]
    
    
    if size < max_len + 10:
        inst = np.concatenate([inst, np.zeros(max_len+10)+154])
        drum = np.concatenate([drum, np.zeros(max_len+10)])
        tim = np.concatenate([tim, np.zeros(max_len+10)])
        num = np.concatenate([num, np.zeros((max_len+10, 3))])
    
    if start == True:
        start_tok = 0
        first = 150
    else:
        try:
            start_tok = random.randint(10, size - max_len -10)
            first = 151
        except:
            start_tok = 0
            first = 150
    
    unique_inst = np.unique(inst)
    
    seq = [first]
    for elt in unique_inst:
        seq.append(elt)
    seq.append(152)
    
    size1 = len(seq)
    
    seq = np.array(seq)
    
    seq = np.concatenate([seq, inst[start_tok : start_tok + max_len + 1 - size1]]).astype(int)
    seq_drum = np.concatenate([np.zeros(size1), drum[start_tok : start_tok + max_len + 1 - size1]]).astype(int)
    seq_tim = np.concatenate([np.zeros(size1), tim[start_tok : start_tok + max_len + 1 - size1]]).astype('float')
    seq_num = np.concatenate([np.zeros((size1,3)), num[start_tok : start_tok + max_len + 1 - size1,:]]).astype('float')
    
    return seq, seq_drum, seq_tim, seq_num
    

In [None]:
seq, seq_drum, seq_tim, seq_num = build_sequence(batch[0], max_len = 256, start = True)

In [None]:
seq

In [None]:
seq_tim

In [None]:
seq[seq_tim == seq_tim[25]]

In [None]:
seq_num.shape

In [4]:
from tensorflow.keras.utils import Sequence

class Generator(Sequence):
    def __init__(self,file_list, batch_size = 32, start_rate = 0.2, max_len = 256, model_inst = None, model_delta = None):
        self.file_list = file_list
        self.batch_size = batch_size
        self.start_rate = start_rate
        self.max_len = max_len
        self.model_inst = model_inst
        self.model_delta = model_delta
                 
                 
    def __len__(self):
        return int(10000000)
    
    def __getitem__(self, idx):
        gc.collect()
        file = random.choice(self.file_list).split('.')[0]
        total_batch = load(file, 'batch')
        
        batch = np.random.choice(total_batch, size = self.batch_size)
        
        del total_batch
        gc.collect()
        
        seq_inst = []
        seq_drum = []
        seq_time = []
        seq_num = []
        
        for seq in batch:
            
            r = random.uniform(0,1)
            if r < self.start_rate:
                start = True
            else:
                start = False
            
            si, sd, st, sn = build_sequence(seq, max_len = self.max_len, start = start)
#             print(st.shape)
            seq_inst.append(si)
            seq_drum.append(sd)
            seq_time.append(st)
            seq_num.append(sn)
        
        del batch
        gc.collect()
        
        seq_inst = np.array(seq_inst)
        seq_drum = np.array(seq_drum)
        seq_time = np.array(seq_time)
        delta_time = seq_time[:, :-1] - seq_time[:,1:]
#         print(delta_time.shape)
#         print(seq_time.shape)
        seq_time = seq_time.reshape((seq_time.shape[0], seq_time.shape[1], 1))
#         delta_time = delta_time.reshape((delta_time.shape[0], delta_time.shape[1], 1))
        seq_num = np.array(seq_num)
        
        X = [seq_inst[:,:-1], seq_drum[:,:-1],seq_time[:,:-1],seq_num[:,:-1]]
        y = [seq_inst[:,1:], seq_drum[:,1:],delta_time,seq_num[:,1:,0],seq_num[:,1:,1],seq_num[:,1:,2]]
                 
        if self.model_inst == None:
            return X, y[0]
        
        elif (self.model_inst != None) & (self.model_delta == None):
            pred_ind = self.model_inst.predict(X)
            pred_ind = np.argmax(pred_ind, axis = -1)
            X.append(pred_ind)
            return X, y[2]
        else:
            pred_ind = self.model_inst.predict(X)
            pred_delta = self.model_delta.predict(X)
            pred_ind = np.argmax(pred_ind, axis = -1)
            pred_delta = np.argmax(pred_delta, axis = -1)
            X.append(pred_ind)
            X.append(pred_delta)
            return X, y[3:]
    

In [None]:
file_list = os.listdir('./batch')[:-1]

In [None]:
gen = Generator(file_list, batch_size = 4, start_rate = 0.2, max_len = 256)

In [None]:
%%time

X, y = gen[1]

In [None]:
y[0]

In [None]:
np.unique(X[0])

In [5]:
class GPTDecoder(tf.keras.layers.Layer):    
    def __init__(self, num_layers, d_model, num_heads, dff,
               maximum_position_encoding, num_types = 2, rate=0.1, bidirectional_decoder = False):
        super(GPTDecoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers
        
        
        self.embedding = tf.keras.layers.Embedding(155, d_model)
        self.drum_embedding = tf.keras.layers.Embedding(2, d_model)
        
        self.time_encoding = tf.keras.layers.Dense(d_model, activation = 'relu')
        
        self.num_encoding = tf.keras.layers.Dense(d_model, activation = 'relu')
        self.delta_encoding = tf.keras.layers.Dense(d_model, activation = 'relu')
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
        
        self.conc = tf.keras.layers.Concatenate()
        self.agg = tf.keras.layers.Dense(d_model, activation = 'relu')
        
        self.dec_layers = [GPTDecoderLayer(d_model, num_heads, dff, rate) 
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)
        
        self.bidirectional_decoder = bidirectional_decoder
    
    def call(self, x, training = True, drum_ids = None, time_ids = None, num_ids = None, pred_x = None, pred_delta = None):

        seq_len = tf.shape(x)[1]
        attention_weights = {}
        
        if self.bidirectional_decoder == False:
            look_ahead_mask = create_look_ahead_mask(tf.shape(x)[1])
            dec_target_padding_mask = create_padding_mask(x, pad_token = 154)
            mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
        else:
            mask = create_padding_mask(x, pad_token = 154)
        
        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        
#         c = [x]
#         c.append(self.time_encoding(time_ids))
#         c.append(self.drum_embedding(drum_ids))
#         c.append(self.num_encoding(num_ids))
        
        
#         if pred_x is not None:
#             c.append(self.embedding(pred_x))
#         if pred_delta:
#             c.append(self.delta_encoding(x))
        
#         x = self.conc(c)
#         x = self.agg(x)
        
        x = self.dropout(x, training=training)
        

        for i in range(self.num_layers):
            x, block1 = self.dec_layers[i](x, training, look_ahead_mask = mask)

            attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
#            attention_weights['decoder_layer{}_block2'.format(i+1)] = block2

        # x.shape == (batch_size, target_seq_len, d_model)
        return x, attention_weights  

In [6]:
decoder_ind = GPTDecoder(num_layers = 2, d_model = 256, num_heads = 8, dff = 512,
                   maximum_position_encoding = 1024, num_types = 2, rate=0.1, bidirectional_decoder = True)

In [7]:
max_len = 512
inputs_inst = Input(shape = (max_len,))
drum_ids = Input(shape = ( max_len,))
time_ids = Input(shape = ( max_len,1))
num_ids = Input(shape = ( max_len,3))

inputs = [inputs_inst, drum_ids, time_ids, num_ids]

encoded, _ = decoder_ind(inputs_inst, training = True, drum_ids = drum_ids, time_ids = time_ids, num_ids = num_ids)

output_inst = tf.keras.layers.Dense(156, name = 'inst')(encoded)
# output_drum = tf.keras.layers.Dense(2, activation = 'softmax', name = 'drum')(encoded)
# output_time = tf.keras.layers.Dense(1, activation = 'linear', name = 'time')(encoded)
# output_duration = tf.keras.layers.Dense(1, activation = 'linear', name = 'duration')(encoded)
# output_pitch = tf.keras.layers.Dense(130, activation = 'softmax', name = 'pitch')(encoded)
# output_velocity = tf.keras.layers.Dense(130, activation = 'softmax', name = 'velocity')(encoded)


# outputs = [output_inst]

model_ind = Model(inputs, output_inst)

In [8]:
model_ind.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 512)]        0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 512)]        0                                            
__________________________________________________________________________________________________
input_4 (InputLayer)            [(None, 512, 3)]     0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            [(None, 512, 1)]     0                                            
______________________________________________________________________________________________

In [9]:
from tensorflow.keras.optimizers import Adam, SGD

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
                    from_logits=True, reduction='none')

losses = [loss_object]

loss_classif     =  losses# find the right loss for multi-class classification
optimizer        =  Adam(3e-5, 1e-8) # find the right optimizer
metrics_classif  =  ['accuracy']

model_ind.compile(loss=loss_classif,
              optimizer=optimizer,
              metrics=metrics_classif)

In [10]:
train_list = os.listdir('./batch')[:-1]
test_list = [ os.listdir('./batch')[-1]]
## Test_set
gen = Generator(test_list, batch_size = 512, start_rate = 0.2, max_len = max_len)
x_test, y_test = gen[0]
gen = Generator(train_list, batch_size = 32, start_rate = 0.2, max_len = max_len)


from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ReduceLROnPlateau

early = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=4, verbose=1, 
                                                mode='auto', restore_best_weights=True)
reduce = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, verbose=1, 
                                                     mode='auto', min_delta=0.0001, cooldown=0, min_lr=0)

# bs = 128
n_epochs = 10
steps_per_epoch = 100
#, batch_size=bs

with tf.device('/GPU:0'):
    history = model_ind.fit(gen,  epochs=n_epochs,steps_per_epoch = steps_per_epoch, validation_data=(x_test,  y_test))#, callbacks = [early, reduce])


  ...
    to  
  ['...']
Train for 100 steps, validate on 512 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
 10/100 [==>...........................] - ETA: 6:36 - loss: 2.3197 - accuracy: 0.5621

KeyboardInterrupt: 

In [None]:
model.save_weights('./checkpoints/model_inst.h5')

In [11]:
pred = model_ind.predict(x_test, verbose = 1)



In [12]:
y_pred = np.argmax(pred, axis = -1)

In [22]:
ind = 6

In [23]:
y_pred[ind]

array([  0,  50,   0,  24,   1,  38,  60,  40,  40,  32,  50,  32,  32,
        38,   0,  50,   0,   0,  40,  50,  45,  25,  27,  24,  40,  24,
        40,  32,  60,   1,  19,  71,  24,  25,  24,   0,  24,  24,  32,
        32,  24,  24,  25,  40,  32,  24,  25,  25,  25,  25,  24,  25,
        40,  40,  40,  40,   0,  24,   0,  50,  25,  25,  25,  25,   1,
        24,  26,  26,  43,  32,  24,   0,  32,  26,  26,  48,  48,  32,
        50,  48,  32,  50,  58,  58,   0,   0,   0,  50,  40,  24,   0,
        32,  25,  32,  25,  24,  40,   1,  32,  40,   1,  32,   1,   1,
        24,   1,  24,  32,   1,   1,  32,  26,  26,  26,  26,   1,  32,
        40,  26,  26, 154,  25,  33,   1,  40,  16,  40,  40,  40,  40,
        25,  25,  48,   1,  25,  25,  25,  26,  26,  40,   0,  16,  26,
        40,  40,  40,  50,   1,  48,  24,  48,   1,  25,  25,   0,  24,
        26,  45,  50,   1,  32,  16,  40,  25,   0,  45,  24,  24,  50,
        48,  48,  24,  16,  26,   0,   0,  24,  26,  24,  32,  1

In [25]:
x_test[0][ind]

array([151,  11,  32,  45,  50,  72, 152,  50,  72,  32,  45,  72,  32,
        45,  72,  45,  72,  32,  32,  45,  72,  32,  45,  72,  32,  32,
        32,  32,  45,  72,  32,  45,  72,  32,  45,  72,  45,  72,  32,
        32,  32,  45,  72,  32,  32,  45,  72,  32,  32,  32,  45,  72,
        32,  32,  32,  45,  72,  45,  72,  45,  72,  32,  32,  32,  45,
        72,  32,  45,  72,  32,  45,  72,  32,  32,  32,  45,  72,  32,
        45,  72,  32,  45,  72,  45,  72,  32,  32,  45,  72,  45,  72,
        32,  32,  32,  32,  32,  45,  50,  32,  45,  50,  32,  45,  50,
        45,  50,  32,  32,  45,  50,  32,  45,  50,  32,  45,  50,  32,
        32,  32,  45,  50,  32,  45,  50,  32,  45,  50,  45,  50,  32,
        32,  32,  45,  50,  32,  32,  45,  50,  32,  32,  32,  45,  50,
        32,  32,  32,  45,  50,  45,  50,  45,  50,  32,  32,  32,  45,
        50,  32,  45,  50,  32,  45,  50,  32,  32,  32,  45,  50,  45,
        50,  45,  50,  45,  50,  32,  32,  45,  50,  45,  50,  4

In [None]:
y_test[ind]

In [None]:
gen = Generator(file_list, batch_size = 4, start_rate = 0.2, max_len = max_len, model_inst = model_ind)


In [None]:
%%time

X, y = gen[1]