In [8]:
import sys
sys.path.append('..')

In [None]:
import os, pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import GRU, Input, Embedding, LSTM, Dense, Concatenate, TimeDistributed, Dropout, BatchNormalization, LayerNormalization
from tensorflow.keras.models import Model
import glob
import numpy as np
from tqdm import tqdm
import config

In [10]:
# Toggle for running on the GPU; set to False to force CPU execution.
use_gpu = True

if not use_gpu:
    # An empty visible-device list for the 'GPU' type hides every GPU from TensorFlow.
    tf.config.set_visible_devices([], 'GPU')

In [11]:
# Collect every preprocessed CSV. glob returns matches in arbitrary (filesystem-dependent)
# order, so the list is sorted to make the song order - and therefore the seeded shuffle
# applied to the dataset later on - reproducible across machines and runs.
file_paths = sorted(glob.glob(os.path.join(config.MidiFiles.preprocessed_csv_files, "*.csv")))

In [12]:
def preprocess_file(df):
    """Turn one raw note table into the feature table used for training.

    The input frame is left untouched; all work happens on a copy, so calling
    this function twice on the same frame gives the same result.

    Args:
        df: DataFrame with at least the columns ``delta_time``, ``duration``
            and ``pitch``.

    Returns:
        A new DataFrame in which
        * ``delta_time`` and ``duration`` are ``log1p``-transformed,
        * ``pitch`` is replaced by ``note`` (``pitch % 12``) and
          ``octave`` (``pitch // 12``),
        * ``zero_delta_time`` flags rows whose transformed delta time is 0,
        * zero delta times are replaced by the previous non-zero value
          (forward fill); leading zeros with nothing to fill from stay 0.
    """
    # Work on a copy so the caller's frame is not modified in place.
    df = df.copy()

    df['delta_time'] = np.log1p(df['delta_time'])
    df['duration'] = np.log1p(df['duration'])

    df['note'] = df['pitch'] % 12
    df['octave'] = df['pitch'] // 12

    # log1p(0) == 0, so the comparison still identifies the original zeros.
    is_zero = df['delta_time'] == 0
    df['zero_delta_time'] = is_zero

    # mask() blanks the zeros with NaN while keeping a float dtype (replacing
    # with pd.NA would turn the column into object dtype), then the gaps are
    # forward-filled and any leading gap falls back to 0.
    df['delta_time'] = df['delta_time'].mask(is_zero).ffill().fillna(0)

    return df.drop(columns=['pitch'])

In [13]:
# Opt in to pandas' future "no silent downcasting" behaviour before any file is processed.
pd.set_option("future.no_silent_downcasting", True)

# Read and preprocess every CSV; a file that fails at either stage is reported and skipped
# so that one bad file does not abort the whole load.
songs = []
for p in tqdm(file_paths):
    try:
        songs.append(preprocess_file(pd.read_csv(p)))
    except Exception as e:
        print("Skipping", p, e)

100%|██████████| 454/454 [00:02<00:00, 157.47it/s]


In [14]:
# Training hyper-parameters, all taken from the project's config module.

# Number of note shifts (0 .. transpose_offset-1) the sequence generator iterates over.
transpose_offset = config.LstmParameters.transpose_offset

# Window length (time steps per input sequence) and feature count per time step;
# both are used in the dataset's output signature.
seq_len = config.LstmParameters.seq_len
num_features = config.LstmParameters.num_features

batch_size = config.LstmParameters.batch_size
epochs = config.LstmParameters.epochs

In [15]:
def sequence_generator(songs, seq_len):
    """Yield sliding-window training pairs for every song and every note shift.

    For each shift in ``range(transpose_offset)`` (a notebook-level variable)
    and each song, the ``note`` column is shifted modulo 12 and a feature
    matrix is built: the columns that remain after removing ``note`` and
    ``octave``, followed by a 12-wide one-hot of the note and a 10-wide
    one-hot of the octave. The input frames themselves are not modified.

    Args:
        songs: iterable of DataFrames containing ``note`` and ``octave``
            columns plus the remaining numeric feature columns.
        seq_len: number of consecutive rows in each input window.

    Yields:
        ``(X, targets)`` where ``X`` holds ``seq_len`` consecutive feature
        rows and ``targets`` is a dict cut from the row right after the
        window: columns 0, 1 and 2 become ``out_delta``, ``out_duration``
        and ``out_zero_delta``; columns 3-14 become ``out_note`` and
        columns 15-24 become ``out_octave``.
    """
    # One-hot lookup tables: row k is the encoding of class k.
    # NOTE(review): the octave table has 10 rows, so octave values must stay
    # within 0..9 - confirm the pitch range of the input data.
    note_table = np.eye(12, dtype=np.float32)
    octave_table = np.eye(10, dtype=np.float32)

    for shift in range(transpose_offset):
        for song in songs:
            # Only the note class is shifted; the octave column is used as stored.
            shifted_notes = ((song["note"] + shift) % 12).astype(int).values
            octave_ids = song["octave"].astype(int).values

            # Everything except note/octave is kept as plain float columns.
            scalar_columns = song.drop(columns=["note", "octave"]).values.astype(np.float32)
            features = np.hstack(
                [scalar_columns, note_table[shifted_notes], octave_table[octave_ids]]
            )

            # `stop` is the index of the target row; the window is the seq_len rows before it.
            for stop in range(seq_len, len(features)):
                target = features[stop]
                yield features[stop - seq_len:stop], {
                    'out_delta': target[0:1],
                    'out_duration': target[1:2],
                    'out_zero_delta': target[2:3],
                    'out_note': target[3:15],
                    'out_octave': target[15:25],
                }

In [16]:
# Wrap the Python generator in a tf.data pipeline. The lambda lets TensorFlow create a
# fresh generator every time the dataset is iterated (e.g. once per epoch).
dataset = tf.data.Dataset.from_generator(
    lambda: sequence_generator(songs, seq_len),
    output_signature=(
        # Input window: seq_len time steps with num_features values each.
        tf.TensorSpec(shape=(seq_len, num_features), dtype=tf.float32),
        # Targets: one entry per key of the dict yielded by the generator.
        {
            'out_delta': tf.TensorSpec(shape=(1,), dtype=tf.float32),
            'out_duration': tf.TensorSpec(shape=(1,), dtype=tf.float32),
            'out_zero_delta': tf.TensorSpec(shape=(1,), dtype=tf.float32),
            'out_note': tf.TensorSpec(shape=(12,), dtype=tf.float32),
            'out_octave': tf.TensorSpec(shape=(10,), dtype=tf.float32),
        }
    )
)

# Shuffle within a 10000-element buffer (fixed seed), batch, and prefetch.
# NOTE(review): the generator emits windows song by song, so a 10000-element buffer only
# mixes neighbouring windows - confirm this is enough randomisation for training.
dataset = dataset.shuffle(10000, seed=42).batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [17]:
# The network architecture lives in the project module src.lstm.
from src.lstm import get_model

# Build the model (presumably compiled inside get_model, since fit() is called on it
# later without a compile step in this notebook - TODO confirm).
model = get_model()

model.summary()

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping
import re

# Directory that receives one weights file per finished epoch.
output_path = config.MidiFiles.weights_path

# os.path.join accepts both str and pathlib.Path, so this works regardless of how the
# config stores the directory. {epoch} and {loss} are filled in by ModelCheckpoint.
checkpoint_filepath = os.path.join(output_path, 'lstm-{epoch:02d}-{loss:.4f}.weights.h5')

# --- Resume support -------------------------------------------------------------------
# Look for checkpoints from an earlier run and continue after the highest epoch found.
last_epoch = 0
files = [f for f in os.listdir(output_path) if f.endswith(".weights.h5")]

# Map epoch number -> file name. Files that do not follow the naming scheme are ignored
# instead of crashing on a failed regex match; if several files share an epoch number,
# the first one listed wins.
checkpoint_by_epoch = {}
for f in files:
    m = re.search(r"lstm-(\d+)-", f)
    if m:
        checkpoint_by_epoch.setdefault(int(m.group(1)), f)

if checkpoint_by_epoch:
    # NOTE: the epoch numbers are deliberately NOT stored in a variable called `epochs`;
    # that name holds the total epoch count from the config and is passed to fit() below.
    last_epoch = max(checkpoint_by_epoch)

    # Pick last checkpoint
    last_checkpoint = checkpoint_by_epoch[last_epoch]
    last_checkpoint_path = os.path.join(output_path, last_checkpoint)

    print(f"Resuming from checkpoint: {last_checkpoint_path}")

    model.load_weights(last_checkpoint_path)

# Save the weights after every epoch. The files are named *.weights.h5 and are read back
# with model.load_weights(), so they must be written as weights-only files.
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    save_best_only=False,
    monitor='loss',
    mode='min',     # Mode for the monitor metric ('min' for loss, 'max' for accuracy)
    save_freq='epoch' # Save after each epoch
)

# Stop when the training loss has not improved for 5 epochs and roll back to the best weights.
early_stopping_callback = EarlyStopping(
    monitor='loss',
    patience=5,
    restore_best_weights=True
)

# initial_epoch continues the epoch numbering (and therefore the checkpoint file names)
# from the checkpoint that was loaded; with no checkpoint it is 0.
history = model.fit(
    dataset,
    epochs=epochs,
    callbacks=[checkpoint_callback, early_stopping_callback],
    initial_epoch=last_epoch
)

## Generating data

In [16]:
import re

# Every entry in the weights directory; only names matching `pattern` are considered.
models = os.listdir(config.MidiFiles.weights_path)

# lstm-<epoch>-<loss>.weights.h5 - the dots are escaped and the whole file name has to
# match, so look-alike names (backup copies, other extensions) are not picked up and the
# captured loss is always a well-formed decimal number.
pattern = r"lstm-(\d+)-(\d+(?:\.\d+)?)\.weights\.h5"

min_loss = float('inf')
best_file = None

# Keep the file whose name carries the smallest loss; the first one wins on ties.
for filename in models:
    match = re.fullmatch(pattern, filename)
    if match is None:
        continue
    loss = float(match.group(2))
    if loss < min_loss:
        min_loss = loss
        best_file = filename

print("Best weights file:", best_file)
print("Minimum loss:", min_loss)

Best weights file: lstm-03-2.5557.weights.h5
Minimum loss: 2.5557


In [17]:
# Fail with a clear message when the search above found no checkpoint at all.
if best_file is None:
    raise FileNotFoundError(
        f"No 'lstm-<epoch>-<loss>.weights.h5' file found in {config.MidiFiles.weights_path}"
    )

# os.path.join works whether weights_path is a str or a pathlib.Path.
model.load_weights(os.path.join(config.MidiFiles.weights_path, best_file))

  saveable.load_own_variables(weights_store.get(inner_path))


In [None]:
# Pull a single batch from the training dataset to use as seed material for generation.
for batch in dataset.take(1):  # take one batch
    # Each batch is an (inputs, targets) pair.
    X_seed, y_seed = batch

In [126]:
# Use the window at index 10 of the seed batch as the starting sequence
# (this requires the batch to hold more than 10 windows).
seed_sequence = X_seed[10] 
# Add a leading batch axis of size 1 so the window can be passed to the model.
seed_sequence = tf.expand_dims(seed_sequence, 0)

In [128]:
# Autoregressive generation: start from the seed window and repeatedly append the
# model's prediction for the next time step.
generated_sequence = tf.identity(seed_sequence)
sequence_length = seed_sequence.shape[1]
num_steps_to_generate = 250

for _ in range(num_steps_to_generate):
    # Only the most recent `sequence_length` steps are fed back into the model.
    input_seq = generated_sequence[:, -sequence_length:, :]
    
    # NOTE(review): six outputs (including a velocity) are unpacked here, whereas the
    # training generator in this notebook yields five targets and no velocity. The
    # order is also assumed to match the model's output order - confirm both against
    # get_model() before relying on this cell.
    pred_delta, pred_velocity, pred_duration, pred_note, pred_octave, pred_zero_delta = model(input_seq)

    # Optionally sample instead of taking raw predictions
    # For categorical note output: sample from softmax distribution
    # (pred_note is treated as probabilities: its log is passed to the sampler as logits).
    note_probs = tf.squeeze(pred_note)  # shape (12,)
    note_index = tf.random.categorical(tf.math.log([note_probs]), 1)
    note_onehot = tf.one_hot(tf.squeeze(note_index), depth=12)

    # Concatenate all outputs into one step vector
    # NOTE(review): the shape comments below assume every scalar head is (batch, 1),
    # which gives 5 + 12 = 17 values per step. The rows produced by the training
    # generator are laid out differently (other columns, then a 12-wide note one-hot,
    # then a 10-wide octave one-hot); appending to the seed along axis=1 requires the
    # last dimensions to be equal - confirm the layouts agree.
    next_step = tf.concat([
        tf.cast(pred_delta, tf.float32),       # (batch, 1)
        tf.cast(pred_velocity, tf.float32),    # (batch, 1)
        tf.cast(pred_duration, tf.float32),    # (batch, 1)
        tf.cast(pred_octave, tf.float32),      # (batch, 1)
        tf.cast(pred_zero_delta, tf.float32),   # (batch, 1)
        tf.cast(note_onehot[tf.newaxis, :], tf.float32),  # (1, 12)
    ], axis=-1)  # shape (batch, 17)

    # Append to the sequence
    generated_sequence = tf.concat(
        [generated_sequence, next_step[:, tf.newaxis, :]], axis=1
    )

In [130]:
# Drop the batch axis: (1, total_steps, feature_dim) -> (total_steps, feature_dim).
seq = generated_sequence[0].numpy()
start = 0  # first time step to export

# Columns 0-4 hold the scalar features in the order the generation loop concatenated
# them; columns 5-16 hold the one-hot encoded note.
# NOTE(review): with start = 0 the seed rows are exported too; they come from the
# training dataset - confirm they use this same column layout.
delta_time, velocity, duration, octave, zero_delta_time = (
    seq[start:, column] for column in range(5)
)
note_onehot = seq[start:, 5:17]  # 12 columns

In [132]:
# Collapse each one-hot row to the index of its largest entry (the note class).
note = note_onehot.argmax(axis=1)

# One row per time step, one column per feature.
df = pd.DataFrame(dict(
    delta_time=delta_time,
    velocity=velocity,
    duration=duration,
    note=note,
    octave=octave,
    zero_delta_time=zero_delta_time,
))

In [None]:
def reverse_preprocess_file(df):
    """Convert a table of model-space features back into raw note events.

    Args:
        df: DataFrame with the columns ``delta_time``, ``duration``, ``note``,
            ``octave`` and ``zero_delta_time``. It is not modified.

    Returns:
        A new DataFrame with the columns ``delta_time``, ``pitch``,
        ``velocity`` and ``duration`` (in that order):
        * ``delta_time`` / ``duration``: ``expm1`` of the input, rounded to int;
          ``delta_time`` is forced to 0 wherever ``zero_delta_time > 0.5``.
        * ``pitch``: rounded octave * 12 + note.
        * ``velocity``: the constant 127 for every row (any velocity in the
          input is ignored).
    """
    restored = pd.DataFrame(index=df.index)

    # Undo the log1p transform and snap to whole numbers.
    restored["delta_time"] = np.expm1(df["delta_time"]).round().astype(int)
    # Rows flagged as "zero delta" get an exact 0.
    restored.loc[df["zero_delta_time"] > 0.5, "delta_time"] = 0

    # Recombine octave and note class into a single pitch number.
    restored["pitch"] = df["octave"].round().astype(int) * 12 + df["note"]

    # Fixed velocity for every event.
    restored["velocity"] = np.full(len(df), 127, dtype=int)

    restored["duration"] = np.expm1(df["duration"]).round().astype(int)

    return restored


In [139]:
# Convert the generated feature table back into raw note events.
reversed_df = reverse_preprocess_file(df)

In [None]:
from IPython.display import display

# Lift pandas' row limit so the whole generated table is rendered.
# Note: this changes the option for the rest of the session, not just for this cell.
pd.options.display.max_rows = None
display(reversed_df)

In [141]:
# Write the generated events to a CSV file in the current working directory, without the index column.
reversed_df.to_csv("generated_music.csv", index=False)