## IMPORT LIBRARIES

In [1]:
import os
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN
import IPython.display as ipd
import json
import pretty_midi
import fluidsynth
from google.cloud import storage
from io import BytesIO
import pickle

## PATH TO SAMPLE, TRAINING AND TEST DATA

In [2]:
# Local paths.
MUSIC_TEST_OUTPUT = 'output'  # directory the generated MIDI file is written to
MUSIC_SAMPLE_PATH = 'sample'  # directory holding the local sample data file
JSON_DATA_FILE = 'processed_data.json'
META_DATA_FILE = 'meta_data.json'

# Google Cloud Storage locations, all inside BUCKET_NAME.
BUCKET_NAME = "bucket-piano-data"
MUSIC_TRAIN_PATH = 'gs://{}/train'.format(BUCKET_NAME)
# The test path must address the 'test' prefix (the prefix the test blobs are
# listed under), not the metadata file.
MUSIC_TEST_PATH = 'gs://{}/test'.format(BUCKET_NAME)
MUSIC_VALIDATION_PATH = 'gs://{}/validation'.format(BUCKET_NAME)
# The metadata path must address the metadata file itself, not the train prefix.
META_DATA_PATH = 'gs://{}/{}'.format(BUCKET_NAME, META_DATA_FILE)

## MODEL PARAMETERS

In [4]:
# Network size.
RNN_CELL_SIZE = 128      # units in the SimpleRNN layer
DROPOUT_RATE = 0.2       # only referenced by a commented-out Dropout layer

# Optimisation settings.
BATCH_SIZE = 32
NUM_EPOCHS = 10
LEARNING_RATE = 0.001
DECAY_RATE = 0.00001

# Loss and reported metrics passed to model.compile().
LOSS_FN = 'categorical_crossentropy'
METRICS = ['accuracy']

## LOAD TRAINING DATA AND PARAMETERS FROM JSON FILE

In [5]:
# Open the Cloud Storage bucket that holds the data set. The bucket name comes
# from BUCKET_NAME so it is defined in exactly one place.
storage_client = storage.Client()
bucket = storage_client.get_bucket(BUCKET_NAME)

In [6]:
# Download and parse the metadata file stored at the root of the bucket.
blob = bucket.get_blob(META_DATA_FILE)
if blob is None:
    # get_blob() returns None for a missing object; fail with a clear message
    # instead of an AttributeError on the next line.
    raise FileNotFoundError(
        '{} not found in bucket {}'.format(META_DATA_FILE, BUCKET_NAME))
meta_data = json.loads(blob.download_as_string())

In [7]:
class CustomDataGenerator(keras.utils.Sequence):
    """Keras Sequence that serves batches of samples stored as bucket blobs.

    Every blob whose name starts with ``prefix`` is treated as one sample and
    must decode to a mapping with ``'input'`` and ``'output'`` entries.

    Args:
        prefix: blob-name prefix selecting the samples (e.g. ``'train'``).
        bucket: bucket object providing ``list_blobs(prefix=...)``.
        meta_data: metadata kept on the instance; not used by the generator
            itself.
        batch_size: maximum number of samples per batch.
    """

    def __init__(self, prefix, bucket, meta_data=meta_data, batch_size=BATCH_SIZE):
        super().__init__()
        self.prefix = prefix
        self.bucket = bucket
        self.batch_size = batch_size
        self.meta_data = meta_data
        # list_blobs() returns a lazy iterator that supports neither len() nor
        # slicing, so list it once and keep the result for indexing.
        self.blobs = list(bucket.list_blobs(prefix=prefix))
        self.num_samples = len(self.blobs)

    def __len__(self):
        """Return the number of batches; the last one may be partial."""
        return int(np.ceil(self.num_samples / float(self.batch_size)))

    def __getitem__(self, idx):
        """Download and return batch ``idx`` as ``(inputs, outputs)`` arrays."""
        batch_x = []
        batch_y = []
        start_idx = self.batch_size * idx
        for blob in self.blobs[start_idx:start_idx + self.batch_size]:
            # The payload is decoded twice; presumably each blob holds a JSON
            # string that itself contains JSON -- confirm against the writer.
            json_data = json.loads(json.loads(blob.download_as_string()))
            batch_x.append(json_data['input'])
            batch_y.append(json_data['output'])
        return np.array(batch_x), np.array(batch_y)

In [None]:
# Convert the training inputs/labels to arrays and pull the training metadata.
# NOTE(review): X_train, y_train and train_data are not assigned by any cell
# visible above this one, so this cell raises NameError when the notebook is
# run top to bottom -- confirm where they are meant to come from.
X_train = np.array(X_train)
y_train = np.array(y_train)
train_note_velocity_mean_var = train_data['note_velocities_mean_var']
train_token_to_notes = train_data['tokens_to_notes_dict']
print(X_train.shape, y_train.shape)

In [None]:
# Download every blob under the 'test' prefix and collect its input/output.
X_test = []
y_test = []
for blob in bucket.list_blobs(prefix='test'):
    # The payload is decoded twice; presumably each blob holds a JSON string
    # that itself contains JSON -- confirm against the writer.
    json_data = json.loads(json.loads(blob.download_as_string()))
    # Collect into the test lists; appending to X_train/y_train would mix the
    # test samples into the training data.
    X_test.append(json_data['input'])
    y_test.append(json_data['output'])

X_test = np.array(X_test)
y_test = np.array(y_test)
# NOTE(review): the test metadata is read from train_data here -- confirm
# whether the test set is meant to share the training velocity statistics and
# token table.
test_note_velocity_mean_var = train_data['note_velocities_mean_var']
test_token_to_notes = train_data['tokens_to_notes_dict']

In [None]:
# Data-set parameters read from train_data.
# NOTE(review): train_data is not assigned by any cell visible above -- confirm
# its source.
NUM_UNIQUE_NOTES = train_data['num_unique_notes']  # used as the output layer size
KEY_FPS = train_data['key_fps']  # divides frame indices when building MIDI note times
HOP_LENGTH = train_data['hop_length']
WINDOW_SIZE_IN_KEYS = train_data['windows_size_in_keys']
# Derived values; presumably audio samples per second and per analysis window
# -- TODO confirm units.
SAMPLE_RATE = KEY_FPS * HOP_LENGTH
FRAME_SIZE = WINDOW_SIZE_IN_KEYS * HOP_LENGTH
MAX_PIANO_FREQ = train_data['max_piano_freq']  # used as the model's input feature size

In [None]:
# Show the training input and label shapes, one per line.
print(X_train.shape, y_train.shape, sep='\n')

## LOAD TEST DATA

In [None]:
# Read the local sample file and use it as the test set.
test_data = None
sample_json_path = os.path.join(MUSIC_SAMPLE_PATH, JSON_DATA_FILE)
with open(sample_json_path) as sample_file:
    # The file content is decoded twice; presumably it stores a JSON string
    # that itself contains JSON -- confirm against the writer.
    test_data = json.loads(json.load(sample_file))

# Inputs and labels as arrays.
X_test = np.array(test_data['input'])
y_test = np.array(test_data['output'])

# Metadata used later when converting predictions to MIDI.
test_note_velocity_mean_var = test_data['note_velocities_mean_var']
test_token_to_notes = test_data['tokens_to_notes_dict']

In [None]:
# Show the test input and label shapes, one per line.
print(X_test.shape, y_test.shape, sep='\n')

## CONVERT OUTPUT LABEL AT EACH TIME-STEP TO ONE-HOT ENCODING

In [None]:
def convert_label_to_one_hot(output_labels, num_classes=None):
    """Convert integer class labels to one-hot vectors along a new last axis.

    Args:
        output_labels: array of non-negative integer labels, e.g. indexed
            ``[sample, time_step]``.
        num_classes: width of the one-hot axis. When omitted, the largest
            label found anywhere in ``output_labels`` plus one is used, so
            every sample gets the same width and the result is a regular
            array even when samples have different maximum labels.

    Returns:
        Float array of shape ``output_labels.shape + (num_classes,)`` with a
        1 at each label's position and 0 elsewhere.

    Raises:
        ValueError: if a label is negative or not smaller than ``num_classes``.
    """
    labels = np.asarray(output_labels)
    if num_classes is None:
        # Use the global maximum; a per-sample maximum gives ragged widths.
        num_classes = int(labels.max()) + 1 if labels.size else 0

    one_hot = np.zeros(labels.shape + (num_classes,))
    if labels.size:
        if labels.min() < 0 or labels.max() >= num_classes:
            raise ValueError(
                'labels must lie in [0, {})'.format(num_classes))
        # Write a 1 at index `label` of the new last axis for every entry.
        np.put_along_axis(one_hot, labels[..., np.newaxis], 1, axis=-1)
    return one_hot

In [None]:
# Replace the integer label arrays with their one-hot encodings.
y_train, y_test = map(convert_label_to_one_hot, (y_train, y_test))

## A SINGLE BASIC RNN CELL

In [None]:
# Display images/rnn_vanilla.png inline (presumably a diagram of a basic RNN
# cell, per the section heading).
ipd.Image("images/rnn_vanilla.png")

## SPECIFY MODEL ARCHITECTURE

In [None]:
# Recurrent layer: emits one RNN_CELL_SIZE-wide vector per time step.
# NOTE(review): stateful=True carries hidden state from one batch to the next
# and fixes the batch size to BATCH_SIZE -- confirm this is intended.
recurrent_layer = SimpleRNN(
    RNN_CELL_SIZE,
    activation='relu',
    return_sequences=True,
    input_shape=(None, MAX_PIANO_FREQ),
    stateful=True,
    batch_input_shape=(BATCH_SIZE, None, MAX_PIANO_FREQ),
)

# Output layer: softmax over the NUM_UNIQUE_NOTES tokens at every time step.
# A Dropout(DROPOUT_RATE) layer between the two is currently disabled.
output_layer = Dense(NUM_UNIQUE_NOTES, activation='softmax')

model = Sequential([recurrent_layer, output_layer])
model.build()
model.summary()

## CHOOSE OPTIMIZER AND COMPILE MODEL

In [None]:
# Adam with the configured learning rate and decay, then attach it to the model
# together with the loss and metrics.
optimizer = keras.optimizers.Adam(
    decay=DECAY_RATE,
    learning_rate=LEARNING_RATE,
)
model.compile(optimizer=optimizer, loss=LOSS_FN, metrics=METRICS)

## TRAIN MODEL

In [None]:
# Train on the in-memory training arrays.
model.fit(
    x=X_train,
    y=y_train,
    batch_size=BATCH_SIZE,
    epochs=NUM_EPOCHS,
)

## EVALUATE MODEL AND MAKE PREDICTION

In [None]:
# evaluate() returns the loss followed by the compiled metrics, so index 1 is
# the accuracy.
scores = model.evaluate(x=X_test, y=y_test, verbose=0)
accuracy_percent = scores[1]*100
print("Accuracy on test data: %.2f%%" % (accuracy_percent))

In [None]:
# Run the model on the test inputs and show the prediction array's shape.
y_predict = model.predict(x=X_test)
print(y_predict.shape)

## CONVERT PREDICTION TO MIDI

In [None]:
def convert_prediction_to_midi(indx, output, token_to_notes=test_token_to_notes, note_vel_mean_var=test_note_velocity_mean_var):
    """Turn one sample's per-frame predictions into a PrettyMIDI object.

    The predicted token of each frame is the argmax of that frame's scores.
    Consecutive frames in which the same pitch is active are merged into a
    single note.

    Args:
        indx: sample index; selects the entry of ``note_vel_mean_var`` used to
            draw note velocities.
        output: 2-D array ``(frames, NUM_UNIQUE_NOTES)`` of per-frame scores.
        token_to_notes: maps ``str(token)`` to a comma-separated string of
            pitch numbers; an empty string produces no note. The default is
            bound when the function is defined.
        note_vel_mean_var: ``note_vel_mean_var[indx][0][pitch]`` and
            ``note_vel_mean_var[indx][1][pitch]`` are passed to
            ``np.random.normal`` as its first and second argument. The default
            is bound when the function is defined.
            NOTE(review): the second argument of ``np.random.normal`` is a
            standard deviation, while the name suggests a variance -- confirm.

    Returns:
        A ``pretty_midi.PrettyMIDI`` holding one instrument (program 0, named
        'piano') whose notes are sorted by start time.
    """
    assert (output.shape[-1] == NUM_UNIQUE_NOTES)
    notes_seq = [token_to_notes[str(x)] for x in output.argmax(1)]

    # pitch -> list of [first_frame, last_frame, velocity] runs, in frame order.
    runs_by_pitch = {}
    for frame, note in enumerate(notes_seq):
        if note == '':
            continue
        for pitch in map(int, note.split(',')):
            runs = runs_by_pitch.setdefault(pitch, [])
            if runs and runs[-1][1] >= frame - 1:
                # Same pitch as in the previous frame: extend the current run.
                runs[-1][1] = frame
                continue
            # A new run starts here; draw its velocity once and keep it inside
            # 1..127 so the value is a valid, audible MIDI velocity.
            sampled = np.random.normal(
                note_vel_mean_var[indx][0][pitch],
                note_vel_mean_var[indx][1][pitch])
            runs.append([frame, frame, max(1, min(127, int(sampled)))])

    instrument = pretty_midi.Instrument(0, name='piano')
    for pitch, runs in runs_by_pitch.items():
        for first_frame, last_frame, velocity in runs:
            instrument.notes.append(pretty_midi.Note(
                velocity=velocity,
                pitch=pitch,
                start=first_frame/KEY_FPS,
                # The note sounds through its last frame, so it ends where the
                # next frame begins; this also gives single-frame notes a
                # non-zero duration.
                end=(last_frame + 1)/KEY_FPS
            ))

    instrument.notes.sort(key=lambda x: x.start)
    pm = pretty_midi.PrettyMIDI()
    pm.instruments.append(instrument)
    return pm

In [None]:
# Convert the prediction for the first test file and save it as a MIDI file.
file_indx = 0
pmidi = convert_prediction_to_midi(file_indx, y_predict[file_indx])
# Create the output directory first so the write does not fail on a fresh
# checkout.
os.makedirs(MUSIC_TEST_OUTPUT, exist_ok=True)
pmidi.write(os.path.join(MUSIC_TEST_OUTPUT, test_data['filenames'][file_indx]+'.midi'))