In [3]:
! pip3 install tensorflow-gpu
import tensorflow as tf
print(tf.__version__)

import os

import numpy as np

! pip3 install tqdm
from tqdm import tqdm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
2.11.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# **Parameter**

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Original Data Info.
n_pitch = 53
n_feature = 43

# Model Parameter
input_size = n_feature
target_size = n_pitch + 4 # +4 (addition) is oneset + start correction + end correction + velocity

# Training Parameter
BATCH_SIZE = 32          # Batch Sizes
VALIDATION_RATIO = 0.1  # Validation Ratio to Input Data
learning_rate = 0.001   # Learning Rate
n_hidden = 2            # Hidden Units number

# Data Parameter
PIECE_LEN = 200          # (auto set by data) Extra +1 for START timestamp
is_onset_index = 53
start_correction_index = 54
end_correction_index = 55
velocity_index = 56

ROOT = r'/content/drive/MyDrive/ML'

In [6]:
os.chdir(ROOT)
print(os.getcwd())

/content/drive/MyDrive/ML


In [7]:
from zipfile import ZipFile
# Delete incomplete unzipped folder and run this 
if 'irealpro_dataset_v2' not in os.listdir():
    print('Creating Folder \'irealpro_dataset_v2\'')
    os.mkdir('../irealpro_dataset_v2')
    print('Extract to Folder \'irealpro_dataset_v2\'')
    with ZipFile(r"/content/drive/MyDrive/ML/irealpro_dataset_v2.zip", 'r') as zObject:
        zObject.extractall(path=r"/content/drive/MyDrive/ML/irealpro_dataset_v2")

In [8]:
np.random.seed(10000)

# **Data Preparation**

`MAX_DATA`: control how many songs files as the input (assign with -1 to use all songs)

In [9]:
MAX_DATA = 50

In [70]:
dec_init_input = np.zeros((1, target_size))
dec_init_input[0, start_correction_index] = 1.

count_pad = 0 
count_spilled = 0
def load_npy_data(x_path, y_path, offset):
    global count_pad, count_spilled, dec_init_input
    x = np.load(x_path)
    y = np.load(y_path)

    if offset == 0:
        Z_ = dec_init_input
    else:
        Z_ = y[offset-1].astype(np.float32)
        Z_ = np.expand_dims(Z_, axis=0)
        count_spilled += 1

    if x.shape[0] >= offset+PIECE_LEN:
        X_ = x[offset:offset+PIECE_LEN].astype(np.float32)
        Y_ = y[offset:offset+PIECE_LEN].astype(np.float32)
    else:
        pad_count = offset + PIECE_LEN - x.shape[0]
        X_ = np.pad(x[offset:], ((0, pad_count), (0, 0)), 'constant', constant_values=-1).astype(np.float32)
        Y_ = np.pad(y[offset:], ((0, pad_count), (0, 0)), 'constant', constant_values=-1).astype(np.float32)
        count_pad += 1
    try:
        assert X_.shape == (PIECE_LEN, input_size)
        assert Z_.shape == (1, target_size)
        assert Y_.shape == (PIECE_LEN, target_size)
    except:
        print('You got',X_.shape, Z_.shape, Y_.shape)
        raise ValueError

    return X_, Z_, Y_

def generate_dataset(input_dir: str, data_size=-1):
    # using tf.data.Dataset API to create dataset
    x_paths = [] # input path
    y_paths = [] # ans file path
    offsets = [] # starting point of a piece
    data_cnt = 0
    for file_name in os.listdir(input_dir): # 'Scan Files'
        if data_cnt==data_size-1: break
        if file_name.endswith(".ans.npy"):
            data_cnt+=1
            y_path = str(os.path.join(input_dir, file_name))
            x_path = str(os.path.join(input_dir, file_name[:-8] + ".npy"))
            assert os.path.exists(x_path), f"corresponding input file {x_path} doesn't exist"

            # split and pad data into PIECE_LEN
            y_content = np.load(y_path)
            for offset in range(0, y_content.shape[0], PIECE_LEN):
                y_paths.append(y_path)
                x_paths.append(x_path)
                offsets.append(offset)

    dataset = []
    for i in zip(x_paths, y_paths, offsets): # 'Read Files'
        dataset.append(load_npy_data(*i))
    print(len(dataset))
    dataset = list(zip(*dataset))
    # print(len(dataset[0]))
    # print(len(dataset[1]))
    # print(len(dataset[2]))
    encoder_input_data = np.array(dataset[0])
    decoder_input_data = np.array(dataset[1])
    decoder_target_data = np.array(dataset[2])
    
    perm_id = np.random.shuffle(np.arange(encoder_input_data.shape[0]))
    encoder_input_data = encoder_input_data[perm_id]
    decoder_input_data = decoder_input_data[perm_id]
    decoder_target_data = decoder_target_data[perm_id]
    
    print(encoder_input_data.shape)
    print(decoder_input_data.shape)
    print(decoder_target_data.shape)

    return encoder_input_data, decoder_input_data, decoder_target_data

encoder_input_data, decoder_input_data, decoder_target_data = generate_dataset(ROOT+"/irealpro_dataset_v2", MAX_DATA)

print('Data Padded:', count_pad)
print('Data with Non Start Decoder Input:', count_spilled)
print('Data PIECE_LEN:', PIECE_LEN)

449
(1, 449, 200, 43)
(1, 449, 1, 57)
(1, 449, 200, 57)
Data Padded: 49
Data with Non Start Decoder Input: 400
Data PIECE_LEN: 200


In [None]:
# np.save(ROOT + '/encoder_input_data.npy', encoder_input_data)
# np.save(ROOT + '/decoder_input_data.npy', decoder_input_data)
# np.save(ROOT + '/decoder_target_data.npy', decoder_target_data)

In [None]:
# encoder_input_data = np.load(ROOT + '/encoder_input_data.npy', allow_pickle=True)
# decoder_input_data = np.load(ROOT + '/decoder_input_data.npy', allow_pickle=True)
# decoder_target_data = np.load(ROOT + '/decoder_target_data.npy', allow_pickle=True)

# **Model**

## *Define Loss, Accuracy, and Optimizer*

In [40]:
# Define Loss, Accuracy, and Optimizer
def masked_loss_function(y_true, y_pred):
    mask = tf.math.not_equal(tf.reduce_sum(y_true[:,:,:n_pitch], axis=2), -1*n_pitch)  # false if it is a padding time step
    pitch_loss = tf.losses.categorical_crossentropy(y_true[:,:,:n_pitch], y_pred[:,:,:n_pitch])
    onset_loss = tf.losses.binary_crossentropy(y_true[:,:,n_pitch:n_pitch+1], y_pred[:,:,n_pitch:n_pitch+1])
    start_loss = tf.square(y_true[:,:,n_pitch+1] - y_pred[:,:,n_pitch+1])
    end_loss = tf.square(y_true[:,:,n_pitch+2] - y_pred[:,:,n_pitch+2])
    velocity_loss = tf.square(y_true[:,:,n_pitch+3] - y_pred[:,:,n_pitch+3])
    total_loss = tf.reduce_sum([pitch_loss, onset_loss, start_loss, end_loss, velocity_loss], axis=0)
    total_loss *= tf.cast(mask, total_loss.dtype)
    return tf.reduce_mean(total_loss)

def masked_accuracy(y_true, y_pred):
    mask = tf.math.not_equal(tf.reduce_sum(y_true[:,:,:n_pitch], axis=2), -1*n_pitch)  # false if it is a padding time step
    mask = tf.cast(mask, tf.float32)
    pitch_acc = tf.reduce_mean(mask * tf.metrics.categorical_accuracy(y_true[:,:,:n_pitch], y_pred[:,:,:n_pitch]))
    onset_acc = tf.reduce_mean(mask * tf.metrics.binary_accuracy(y_true[:,:,n_pitch:n_pitch+1], y_pred[:,:,n_pitch:n_pitch+1]))
    start_loss = tf.reduce_mean(mask * tf.square(y_true[:,:,n_pitch+1] - y_pred[:,:,n_pitch+1]))
    end_loss = tf.reduce_mean(mask * tf.square(y_true[:,:,n_pitch+2] - y_pred[:,:,n_pitch+2]))
    velocity_loss = tf.reduce_mean(mask * tf.square(y_true[:,:,n_pitch+3] - y_pred[:,:,n_pitch+3]))

    return pitch_acc, onset_acc, start_loss, end_loss, velocity_loss

optimizer = tf.keras.optimizers.Adam()

In [41]:
checkpoint_filepath = './checkpoint'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    )

## Reinjection Technique

In [65]:
from keras import Model
from keras import backend as K
from keras.layers import Input, Concatenate, Dense, LSTM, Embedding, GRU, Attention, Lambda, Permute, Flatten

# Encoder
encoder_inputs = Input(shape=(PIECE_LEN, input_size), name='Input_Encoder')
encoder = LSTM(n_hidden, return_sequences=True, return_state=True, name='Encoder')

encoder_outputs, state_h, state_c = encoder(encoder_inputs)
states = [state_h, state_c]

attention = Attention(n_hidden, name='Attention')

decoder_inputs = Input(shape=(1, target_size), name='Input_Decoder')
decoder_lstm = LSTM(n_hidden, return_sequences=True, return_state=True, name='Decoder')
decoder_dense = Dense(target_size, activation='softmax', name='Dense_Timestamp_Output')

all_outputs = []
all_attention = []
inputs = decoder_inputs
for _ in tqdm(range(PIECE_LEN)):
    # Attention Update
    # print(states.shape) 
    # print(encoder_outputs.shape)
    context_vector, attention_weights = attention([tf.expand_dims([state_h, state_c],1), encoder_outputs], return_attention_scores=True)
    all_attention.append(attention_weights)
    # context_vector = Permute((2, 1))(context_vector)
    # context_vector = Flatten()(context_vector)
    # print(inputs.shape)
    # print(context_vector.shape)
    context_vector_h, context_vector_c = Lambda(lambda x: x[:,0,:,:], output_shape=(1,) + context_vector.shape[2:], name='Split_Context')(context_vector)
    # print(context_vector_h.shape)
    # print(context_vector_c.shape)
    state_h = Concatenate(axis=-1, name='Concat_StateH_ContextH')([state_h,context_vector_h])
    state_c = Concatenate(axis=-1, name='Concat_StateC_ContextC')([state_c,context_vector_c])
    # Decoder Reinject
    outputs, state_h, state_c = decoder_lstm(inputs, initial_state=states)
    # Output
    outputs = decoder_dense(outputs)
    all_outputs.append(outputs)
    # Update Decoder Input & State
    inputs = outputs
    states = [state_h, state_c]

# Concatenate all predictions
decoder_outputs = Lambda(lambda x: K.concatenate(x, axis=1), name='Concat_Output')(all_outputs)

# Define and compile model as previously
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

100%|██████████| 200/200 [01:06<00:00,  3.02it/s]


In [98]:
model.compile(optimizer='rmsprop', 
              loss=masked_loss_function, 
            #   metrics={masked_accuracy},
              )

In [None]:
model.summary()
tf.keras.utils.plot_model(
    model,
    to_file='seq2seq_lstm_attention.png',
    show_shapes=True,
    show_layer_names=True,
    rankdir='LR',
    expand_nested=False,
    dpi=96,
    show_layer_activations=True
)

In [100]:
print(encoder_input_data.shape)
print(decoder_input_data.shape)
print(decoder_target_data.shape)

if encoder_input_data.shape[0] == 1:
    encoder_input_data = tf.reshape(encoder_input_data, encoder_input_data.shape[1:])
    decoder_input_data = tf.reshape(decoder_input_data, decoder_input_data.shape[1:])
    decoder_target_data = tf.reshape(decoder_target_data, decoder_target_data.shape[1:])
    print(encoder_input_data.shape)
    print(decoder_input_data.shape)
    print(decoder_target_data.shape)

inputs = tf.data.Dataset.zip(tuple([tf.data.Dataset.from_tensor_slices((encoder_input_data)), tf.data.Dataset.from_tensor_slices((decoder_input_data))]))
outputs = tf.data.Dataset.from_tensor_slices((decoder_target_data))

train_dataset = tf.data.Dataset.zip((inputs, outputs))
train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(10)

model.fit(train_dataset,
          batch_size=BATCH_SIZE,
          epochs=10,
          verbose=1,
        )

(449, 200, 43)
(449, 1, 57)
(449, 200, 57)
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7fc87a57b940>

### Predict Function

In [115]:
def predict(enc_inputs):
    # Init Decoder Input
    dec_state = np.zeros((1, target_size))
    dec_state[0, start_correction_index] = 1.
    all_outputs = []
    for enc_input in enc_inputs:
        # Format Model Input: (Encoder, Decoder)
        format_enc_input = np.expand_dims(enc_input, 0)
        format_dec_state = np.expand_dims(dec_state, 0)
        # print(format_enc_input.shape)
        # print(format_dec_state.shape)

        # Model Output
        outputs = model.predict([format_enc_input, format_dec_state])
        outputs = np.squeeze(outputs)
        # print('output:', outputs.shape)
        all_outputs.append(np.squeeze(outputs))
        dec_state = outputs[-1]
        dec_state = np.expand_dims(dec_state, 0)
        # print(dec_state.shape)
    return np.concatenate(all_outputs, axis=0)

# **Midi Format Output**

In [113]:
def generate_dataset(input_dir: str, data_size=-1, batch_per_files=True):
    # using tf.data.Dataset API to create dataset
    x_paths = [] # input path
    y_paths = [] # ans file path
    offsets = [] # starting point of a piece
    titles = []
    data_cnt = 0
    for file_name in os.listdir(input_dir): # 'Scan Files'
        if data_cnt==data_size-1: break
        if file_name.endswith(".ans.npy"):
            data_cnt+=1
            y_path = str(os.path.join(input_dir, file_name))
            x_path = str(os.path.join(input_dir, file_name[:-8] + ".npy"))
            assert os.path.exists(x_path), f"corresponding input file {x_path} doesn't exist"

            # split and pad data into PIECE_LEN
            y_content = np.load(y_path)
            for offset in range(0, y_content.shape[0], PIECE_LEN):
                y_paths.append(y_path)
                x_paths.append(x_path)
                offsets.append(offset)
                titles.append(file_name[:-8])

    paths = list(zip(x_paths, y_paths, offsets))
    i = 0
    while(i < len(paths)): # Batch per Files Handler
        dataset = []
        temp_dataset = []
        flag = False
        while(i < len(paths)):
            if not batch_per_files:
                dataset.append(load_npy_data(*paths[i]))
            else:
                offset = paths[i][2]
                if offset == 0:
                    temp_dataset = load_npy_data(*paths[i])
                    flag = True
                    i += 1
                    break
                elif flag:
                    dataset = []    # Reset Batch
                    dataset.append(temp_dataset)
                    dataset.append(load_npy_data(*paths[i]))
                else:
                    dataset.append(load_npy_data(*paths[i]))
            i += 1
        
        # Skip kickstart dataset+
        if len(dataset) == 0: continue

        # Package
        dataset = list(zip(*dataset))
        encoder_input_data = np.array(dataset[0])
        decoder_input_data = np.array(dataset[1])
        decoder_target_data = np.array(dataset[2])
        print(encoder_input_data.shape)
        print(decoder_input_data.shape)
        print(decoder_target_data.shape)

        # Shuffle
        perm_id = np.random.shuffle(np.arange(encoder_input_data.shape[0]))
        encoder_input_data = encoder_input_data[perm_id]
        decoder_input_data = decoder_input_data[perm_id]
        decoder_target_data = decoder_target_data[perm_id]

        yield encoder_input_data, decoder_input_data, decoder_target_data, titles[i-1]
    yield encoder_input_data, decoder_input_data, decoder_target_data, titles[-1]

In [118]:
# import midi_np_translation.output2midi as output2midi

for test_encoder_input_data, _, _, title in generate_dataset(ROOT+"/irealpro_dataset_v2", 5, batch_per_files=True):
    test_encoder_input_data = np.reshape(test_encoder_input_data, test_encoder_input_data.shape[1:])
    # print(test_encoder_input_data.shape)
    print(f'Generating: {title}')
    pred_result = predict(test_encoder_input_data)
    # output2midi.output_to_midi(bass_ndarr=test_result.reshape(-1,52), ref_midi_path=f"input_midi/irealpro_transposed/{title}.mid", output_path=f"{title}_al.mid")

(11, 200, 43)
(11, 1, 57)
(11, 200, 57)
Generating: I Hear Music_d1.mid
(8, 200, 43)
(8, 1, 57)
(8, 200, 57)
Generating: If I Should Lose You_d3.mid
(7, 200, 43)
(7, 1, 57)
(7, 200, 57)
Generating: If I Should Lose You_p3.mid
(7, 200, 43)
(7, 1, 57)
(7, 200, 57)
Generating: If I Should Lose You_p3.mid
Generating: If I Should Lose You_p3.mid
