In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from text_preprocessing import create_songs_for, preprocess_texts
from Midi_preprocessing import preprocess_midi, encode_midi
import nltk
nltk.download('punkt')

In [None]:
# Load the song records; each record is indexed below by 'lyrics' and 'midi_file'.
songs = create_songs_for()

# Tokenize the lyrics. The 200 is presumably the padded sequence length — TODO confirm in text_preprocessing.
text_tokenizer, text_sequences, embedding_matrix = preprocess_texts([song['lyrics'] for song in songs], 200)
# Tokenize the MIDI files. NOTE(review): the meaning of 300 and 1 is defined in Midi_preprocessing
# (300 looks like a sequence length, matching maxlen=300 used for the test set) — verify.
midi_tokenizer, midi_sequences = preprocess_midi([song['midi_file'] for song in songs], 300, 1)

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the (MIDI, lyrics) pairs for validation. MIDI sequences are the
# model input and lyric sequences the target. A fixed random_state makes the split
# identical on every Restart & Run All, so validation losses are comparable across runs.
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(
    midi_sequences, text_sequences, test_size=0.2, random_state=42)

# Show length
print(len(input_tensor_train), len(target_tensor_train), len(input_tensor_val), len(target_tensor_val))



In [None]:
# Shuffle buffer covers the whole training set.
BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 32
# Number of full batches per epoch (integer division; a partial last batch is not counted).
steps_per_epoch = len(input_tensor_train)//BATCH_SIZE
# NOTE(review): this is 0 when the validation set has fewer than BATCH_SIZE examples,
# and the training loop divides by it when reporting the validation loss.
steps_per_epoch_val = len(input_tensor_val)//BATCH_SIZE
embedding_dim = 256
units = 512
# Vocabulary sizes are the tokenizer's word count plus one
# (presumably to reserve index 0 for padding — TODO confirm).
vocab_inp_size = len(midi_tokenizer.word_index)+1
vocab_tar_size = len(text_tokenizer.word_index)+1

In [None]:
import tensorflow as tf
# Training pipeline: (input, target) pairs, shuffled with a buffer as large as the training set.
dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
# drop_remainder=True keeps every batch at exactly BATCH_SIZE rows; train_step/valid_step
# build a start-token batch of exactly BATCH_SIZE entries.
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

# Validation pipeline, built the same way (it is shuffled as well).
valid_dataset = tf.data.Dataset.from_tensor_slices((input_tensor_val, target_tensor_val)).shuffle(len(input_tensor_val))
valid_dataset = valid_dataset.batch(BATCH_SIZE, drop_remainder=True)

In [None]:
from Seq2SeqAttention import Encoder, BahdanauAttention, Decoder

# Encoder over the MIDI token vocabulary.
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)

# NOTE(review): this standalone attention layer is not referenced by any other cell
# visible in this notebook; the decoder is called directly with the encoder output.
attention_layer = BahdanauAttention(50)

# Decoder over the lyrics vocabulary; it also receives the embedding matrix produced by preprocess_texts.
decoder = Decoder(vocab_tar_size, embedding_dim, embedding_matrix, units, BATCH_SIZE)

optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per example so loss_function can mask
# positions whose target id is 0 before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    """Cross-entropy loss that ignores positions whose target id is 0.

    Args:
        real: target token ids.
        pred: decoder logits for those targets (``loss_object`` is built with
            ``from_logits=True``).

    Returns:
        The mean of the per-example losses after zeroing the entries where
        ``real == 0``. The mean is taken over all entries, masked ones included.
    """
    per_example = loss_object(real, pred)

    is_padding = tf.math.equal(real, 0)
    keep = tf.cast(tf.math.logical_not(is_padding), dtype=per_example.dtype)

    return tf.reduce_mean(per_example * keep)

In [None]:
@tf.function
def train_step(inp, targ, enc_hidden):
    """Run one optimisation step on a single batch using teacher forcing.

    Args:
        inp: batch of encoder input sequences.
        targ: batch of target sequences; column 0 is skipped because the
            decoder is primed with the '<start>' token id instead.
        enc_hidden: initial encoder hidden state.

    Returns:
        The summed per-step loss divided by ``targ.shape[1]``.
    """
    start_id = text_tokenizer.word_index['<start>']
    target_len = targ.shape[1]
    total_loss = 0

    with tf.GradientTape() as tape:
        enc_output, enc_state = encoder(inp, enc_hidden)

        # The decoder starts from the encoder's final state and a batch of start tokens.
        dec_state = enc_state
        dec_input = tf.expand_dims([start_id] * BATCH_SIZE, 1)

        for step in range(1, target_len):
            # The third decoder output is not needed during training.
            logits, dec_state, _ = decoder(dec_input, dec_state, enc_output)

            gold = targ[:, step]
            total_loss += loss_function(gold, logits)

            # Teacher forcing: the ground-truth token is the next decoder input.
            dec_input = tf.expand_dims(gold, 1)

    batch_loss = total_loss / int(target_len)

    # Gradients are taken with respect to the un-normalised total loss.
    trainable = encoder.trainable_variables + decoder.trainable_variables
    grads = tape.gradient(total_loss, trainable)
    optimizer.apply_gradients(zip(grads, trainable))

    return batch_loss

In [None]:
@tf.function
def valid_step(inp, targ, enc_hidden):
    """Compute the teacher-forced loss for one validation batch (no weight update).

    Args:
        inp: batch of encoder input sequences.
        targ: batch of target sequences; column 0 is skipped because the
            decoder is primed with the '<start>' token id instead.
        enc_hidden: initial encoder hidden state.

    Returns:
        The summed per-step loss divided by ``targ.shape[1]``.
    """
    start_id = text_tokenizer.word_index['<start>']
    target_len = targ.shape[1]
    total_loss = 0

    enc_output, enc_state = encoder(inp, enc_hidden)

    # The decoder starts from the encoder's final state and a batch of start tokens.
    dec_state = enc_state
    dec_input = tf.expand_dims([start_id] * BATCH_SIZE, 1)

    for step in range(1, target_len):
        # The third decoder output is not needed here.
        logits, dec_state, _ = decoder(dec_input, dec_state, enc_output)

        gold = targ[:, step]
        total_loss += loss_function(gold, logits)

        # Teacher forcing: the ground-truth token is the next decoder input.
        dec_input = tf.expand_dims(gold, 1)

    return total_loss / int(target_len)

In [None]:
import os
import time
EPOCHS = 50

# The loop below saves a checkpoint every second epoch. `checkpoint` and
# `checkpoint_prefix` were not defined anywhere in the notebook, so the first
# save raised NameError on a fresh kernel. Define them here.
# Assumes Encoder/Decoder are trackable TensorFlow objects (e.g. tf.keras.Model) — verify in Seq2SeqAttention.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                 encoder=encoder,
                                 decoder=decoder)

for epoch in range(EPOCHS):
    start = time.time()

    # The same freshly initialised hidden state is fed to every batch of the epoch.
    enc_hidden = encoder.initialize_hidden_state()
    train_total_loss = 0
    val_total_loss = 0

    for inp, targ in dataset.take(steps_per_epoch):
        batch_loss = train_step(inp, targ, enc_hidden)
        train_total_loss += batch_loss

    # saving (checkpoint) the model every 2 epochs
    if (epoch + 1) % 2 == 0:
        checkpoint.save(file_prefix = checkpoint_prefix)

    for inp, targ in valid_dataset.take(steps_per_epoch_val):
        batch_loss = valid_step(inp, targ, enc_hidden)
        val_total_loss += batch_loss
    print('Epoch {} Loss {:.4f}'.format(epoch + 1,
                                        train_total_loss / steps_per_epoch))
    print('Validation Epoch {} Loss {:.4f}'.format(epoch + 1,
                                        val_total_loss / steps_per_epoch_val))
    print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))


In [None]:
import random
import numpy as np
def evaluate(sentence, initial_word, max_length=200):
    """Generate text for one encoded input sequence by sampling from the decoder.

    Args:
        sentence: a single encoded input sequence (a batch axis is added here).
        initial_word: word whose token id seeds the decoder; it must exist in
            ``text_tokenizer.word_index`` (a KeyError is raised otherwise).
        max_length: maximum number of tokens to generate. Defaults to 200,
            the limit that was previously hard-coded.

    Returns:
        A ``(result, sentence)`` tuple: the generated words joined by spaces
        (with a trailing space) and the unchanged input sequence. Both the
        early-stop path and the length-limit path return this same shape.
    """
    inputs = tf.convert_to_tensor([sentence])

    result = ''

    hidden = [tf.zeros((1, units))]
    enc_out, enc_hidden = encoder(inputs, hidden)

    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([text_tokenizer.word_index[initial_word]], 0)

    for _ in range(max_length):
        # The third decoder output is not used for generation.
        predictions, dec_hidden, _unused = decoder(dec_input,
                                                   dec_hidden,
                                                   enc_out)

        # Turn the logits into a distribution and sample a token *index* from it.
        # The cast to float64 plus renormalisation keeps np.random.choice's
        # "probabilities sum to 1" check from failing on float32 rounding.
        probabilities = tf.nn.softmax(predictions[0]).numpy().astype(np.float64)
        probabilities /= probabilities.sum()
        predicted_id = int(np.random.choice(len(probabilities), p=probabilities))

        word = text_tokenizer.index_word[predicted_id]
        result += word + ' '

        # Stop as soon as the end marker is produced.
        if word == 'eos':
            return result, sentence

        # the predicted ID is fed back into the model
        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence


def translate(sentence, initial_word):
    """Generate text for ``sentence`` seeded with ``initial_word`` and print it.

    Prints the input sequence followed by the generated text. Nothing is
    returned.
    """
    generated, source = evaluate(sentence, initial_word)

    print('Input: %s' % (source))
    print('Predicted translation: {}'.format(generated))

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Load the held-out songs (train=False) and collect their MIDI file entries.
test_files = create_songs_for(train=False)

test_onlyfiles = [file['midi_file'] for file in test_files]
print(test_onlyfiles)
encoded_testmidis = encode_midi(test_onlyfiles)

# Map the encoded MIDI to token ids with the tokenizer fitted on the training data,
# then pad/truncate each sequence to 300 ids.
# NOTE(review): the name `test_sequenecs` is misspelled, but the next cell reads it, so it is kept.
test_sequenecs = pad_sequences(midi_tokenizer.texts_to_sequences(encoded_testmidis), maxlen=300)

In [None]:
# Generate lyrics for every test song, once per seed word.
for index, song in enumerate(test_sequenecs):
    for word in ['hello', 'beautiful', 'world']:
        print(test_onlyfiles[index], word, ':')
        # translate() prints its own report and returns None, so wrapping it
        # in print() only added a stray "None" line after each result.
        translate(song, word)