In [1]:
import os
import io
import time
import pickle
import numpy as np
from numpy.random import choice, shuffle
import tqdm.notebook
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.losses import sparse_categorical_crossentropy
from tensorflow.keras.optimizers import Nadam
from keras.layers import Dense, Activation, Dropout, Input, LeakyReLU
from keras.layers import Input, Embedding, Bidirectional, LSTM, GRU
from keras.models import Model, Sequential
from keras_self_attention import SeqSelfAttention

Using TensorFlow backend.


In [2]:
def _read_pickle(path):
    """Return the object deserialized from the pickle file at `path`."""
    # NOTE: unpickling can execute arbitrary code; only load files you trust.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Preprocessed song data and the note -> index vocabulary.
all_song = _read_pickle('pickle/all_song.pickle')
notes_index_dict = _read_pickle('pickle/notes_index_dict.pickle')

# Model architecture

In [3]:
def _self_attention_block(sequence, attention_width):
  """Run `sequence` through one multiplicative SeqSelfAttention layer and return the attended sequence."""
  # return_attention=True is kept so the layer's output structure is the same as before;
  # the attention weights themselves are not used by the model and are discarded here.
  attended, _ = SeqSelfAttention(
      return_attention=True,
      attention_activation='sigmoid',
      attention_type=SeqSelfAttention.ATTENTION_TYPE_MUL,
      attention_width=attention_width,
      kernel_regularizer=tf.keras.regularizers.l2(1e-4),
      bias_regularizer=tf.keras.regularizers.l1(1e-4),
      attention_regularizer_weight=1e-4,
  )(sequence)
  return attended


def create_model(seq_len, unique_notes, dropout=0.3, output_emb=100, rnn_unit=128, dense_unit=64,
                 attention_width=50):
  """Build the (uncompiled) next-note classifier named 'generate_scores_rnn'.

  Architecture: Embedding -> 2 x [Bidirectional GRU -> self-attention -> Dropout]
  -> Bidirectional GRU (last state only) -> Dropout -> Dense -> LeakyReLU -> softmax.

  Args:
    seq_len: number of note indices in each input window.
    unique_notes: vocabulary size; the embedding and the softmax both use
      `unique_notes + 1` entries.
    dropout: dropout rate applied after each attention stage and after the last GRU.
    output_emb: embedding dimension.
    rnn_unit: units per GRU direction.
    dense_unit: width of the hidden dense layer.
    attention_width: local attention window of the self-attention layers.

  Returns:
    A Keras `Model` mapping `(batch, seq_len)` integer inputs to a
    `(batch, unique_notes + 1)` probability distribution.
  """
  inputs = Input(shape=(seq_len,))
  # NOTE(review): the extra "+1" presumably reserves one index outside the note
  # vocabulary (e.g. padding/unknown) -- confirm against the preprocessing code.
  embedding = Embedding(input_dim=unique_notes+1, output_dim=output_emb, input_length=seq_len)(inputs)

  # Two identical recurrent + attention stages that keep the full sequence.
  forward_pass = embedding
  for _ in range(2):
    forward_pass = Bidirectional(GRU(rnn_unit, return_sequences=True))(forward_pass)
    forward_pass = _self_attention_block(forward_pass, attention_width)
    forward_pass = Dropout(dropout)(forward_pass)

  # Final recurrent stage collapses the sequence to a single vector. It uses the same
  # imported GRU class as the stages above so that the Bidirectional wrapper and the
  # wrapped layer always come from the same Keras namespace.
  forward_pass = Bidirectional(GRU(rnn_unit))(forward_pass)
  forward_pass = Dropout(dropout)(forward_pass)
  forward_pass = Dense(dense_unit)(forward_pass)
  forward_pass = LeakyReLU()(forward_pass)
  outputs = Dense(unique_notes+1, activation = "softmax")(forward_pass)

  model = Model(inputs=inputs, outputs=outputs, name='generate_scores_rnn')
  return model


In [4]:
# Window length fed to the network, and the number of distinct notes in the vocabulary.
seq_len = 50
unique_notes = len(notes_index_dict)

In [5]:
# Instantiate the network with the default layer sizes.
model = create_model(seq_len=seq_len, unique_notes=unique_notes)

In [6]:
# Print the layer-by-layer summary (output shapes and parameter counts) of the model.
model.summary()

Model: "generate_scores_rnn"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 50)]              0         
_________________________________________________________________
embedding (Embedding)        (None, 50, 100)           11601800  
_________________________________________________________________
bidirectional (Bidirectional (None, 50, 256)           176640    
_________________________________________________________________
seq_self_attention (SeqSelfA [(None, 50, 256), (None,  65537     
_________________________________________________________________
dropout (Dropout)            (None, 50, 256)           0         
_________________________________________________________________
bidirectional_1 (Bidirection (None, 50, 256)           296448    
_________________________________________________________________
seq_self_attention_1 (SeqSel [(None, 50, 256), 

# Model training

In [7]:
class TrainModel:
    """Custom training loop that feeds the model windows pooled from groups of songs.

    Attributes:
        epochs: number of passes over the song collection.
        train: sequence of songs, each an iterable of input windows.
        target: sequence of songs, each an iterable of targets aligned with `train`.
        batch_nnet_size: number of windows per gradient update.
        batch_song: number of songs whose windows are pooled together at a time.
        optimizer: optimizer exposing `apply_gradients`.
        loss_fn: callable `(targets, predictions) -> loss`.
        total_songs: number of songs to iterate over in each epoch.
        model: the Keras model being trained.
    """

    def __init__(self, epochs, train, target, batch_nnet_size, batch_song, optimizer, loss_fn,
                total_songs, model):
        self.epochs = epochs
        self.train = train
        self.target = target
        self.batch_nnet_size = batch_nnet_size
        self.batch_song = batch_song
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.total_songs = total_songs
        self.model = model

    def fit(self):
        """Run the full training loop, printing the running epoch loss every 20 steps."""
        for epoch in tqdm.notebook.tqdm(range(self.epochs),desc='epochs'):

            # Reshuffle the song order every epoch while keeping each song's
            # windows paired with its targets. Skipped when there are no songs,
            # because unpacking an empty zip would raise.
            train_target = list(zip(self.train, self.target))
            if train_target:
                shuffle(train_target)
                self.train, self.target = zip(*train_target)
            loss_total = 0
            steps_nnet = 0

            # In each epoch, iterate over all songs in groups of `batch_song`.
            for i in tqdm.notebook.tqdm(range(0,self.total_songs, self.batch_song), desc='MUSIC'):

                # Flatten the windows/targets of this group of songs into two flat arrays.
                # NOTE(review): assumes every window has the same length so that
                # np.array yields a regular array -- confirm against preprocessing.
                input_batch = [y for x in self.train[i:i+self.batch_song] for y in x]
                output_batch = [y for x in self.target[i:i+self.batch_song] for y in x]
                inputs_nnet_large = np.array(input_batch)
                outputs_nnet_large = np.array(output_batch)

                # Visit the pooled windows in random order.
                index_shuffled = np.arange(start=0, stop=len(inputs_nnet_large))
                np.random.shuffle(index_shuffled)

                # Process the windows/targets in mini-batches.
                for nnet_steps in tqdm.notebook.tqdm(range(0,len(index_shuffled),self.batch_nnet_size)):
                    current_index = index_shuffled[nnet_steps:nnet_steps+self.batch_nnet_size]

                    # Drop the trailing partial batch so the tf.function-compiled
                    # train_step always receives the same batch shape (added by the
                    # original author to avoid a TensorFlow autograph exception).
                    if len(current_index) < self.batch_nnet_size:
                        break

                    inputs_nnet = inputs_nnet_large[current_index]
                    outputs_nnet = outputs_nnet_large[current_index]

                    # Only batches that are actually trained on count as steps.
                    steps_nnet += 1
                    loss = self.train_step(inputs_nnet, outputs_nnet)
                    loss_total += tf.math.reduce_sum(loss)
                    if steps_nnet % 20 == 0:
                        print("epochs {} | Steps {} | total loss : {}".format(epoch + 1, steps_nnet, float(loss_total)))


    @tf.function
    def train_step(self, inputs, targets):
        """Apply one gradient update and return the data loss produced by `loss_fn`.

        The optimized objective is the sum of the per-sample losses (what
        differentiating the un-reduced loss amounts to) plus any regularization
        penalties the model registered during the forward pass. The returned
        value is the data loss only, so logged totals stay comparable.
        """
        with tf.GradientTape() as tape:
            # training=True enables training-time behaviour such as Dropout;
            # without it the layers run in inference mode inside a custom loop.
            prediction = self.model(inputs, training=True)
            loss = self.loss_fn(targets, prediction)
            objective = tf.math.reduce_sum(loss)
            # Layer regularizers only take effect if their penalties are added here.
            if self.model.losses:
                objective += tf.math.add_n(self.model.losses)
        gradients = tape.gradient(objective, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        return loss

In [8]:
# Split every song record into its input windows and the matching targets,
# keeping both lists in the same song order.
train = [record['train'] for record in all_song]
target = [record['target'] for record in all_song]

In [10]:
# Targets are integer class indices, hence the sparse cross-entropy.
loss_fn = sparse_categorical_crossentropy
# Nadam with its default settings.
optimizer = Nadam()

In [1]:
# Hyper-parameters for the training run.
EPOCHS = 10             # passes over the whole song collection
BATCH_SONG = 100        # songs whose windows are pooled together at a time
BATCH_NNET_SIZE = 600   # windows per gradient update
TOTAL_SONGS = len(all_song)
seq_len = 50

train_class = TrainModel(
    epochs=EPOCHS,
    train=train,
    target=target,
    batch_nnet_size=BATCH_NNET_SIZE,
    batch_song=BATCH_SONG,
    optimizer=optimizer,
    loss_fn=loss_fn,
    total_songs=TOTAL_SONGS,
    model=model,
)

train_class.fit()

# Save the model for generation

In [None]:
# Write the model to an HDF5 file (relative path, so it lands in the working directory).
# NOTE(review): the model contains the custom SeqSelfAttention layer, so reloading this
# file will presumably need that layer passed through `custom_objects` -- confirm
# against the keras-self-attention documentation.
model.save('all_100sb_10ep.h5')