In [None]:
%pip install pretty_midi
%pip install tensorflow
%pip install music21
%pip uninstall fluidsynth -y
%pip install --upgrade fluidsynth 

In [None]:
import pandas as pd
import numpy as np
import pretty_midi
import pathlib
import collections
import datetime
import glob
import music21
import tensorflow as tf
import fluidsynth


# Music Gen using LSTM model

This project uses a 4-layer Keras LSTM model to predict notes. The model is trained on captures of famous jazz recordings that were converted to MIDI.

The notebook has 3 main sections.
* Training Data Preparation
* Model Definition and fit
* Predictions based on sample input

### Helper Functions

Some conversion and database I/O functions were split into a separate Python file to help the readability of this notebook. They are loaded here.

In [None]:
import my_functions

### This cell defines global constants that are used throughout the notebook 

In [None]:
# Global settings used throughout the notebook

# Seed both TensorFlow and NumPy so runs are repeatable
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

# Length (in notes) of each training sequence
seq_length = 20

# Size of the pitch vocabulary.
# NOTE(review): this value is only passed to create_sequences(), which does not
# read it, and the model's 'interval' head is sized 49 independently of it.
# Confirm which value is intended.
vocab_size = 25

# Columns extracted into the training set, in this order. These are the inputs to the model!
key_order = ['interval', 'step', 'duration', 'tempo', 'instrument_num', 'key_num']

# Songs (melid values) to leave out of training
skip_list = [3, 130, 222]

# Whether to trim each song so its note count is a multiple of seq_length
trim_song_ends = True



### This cell will load training data using a SQL query and calculate some differential values
* see my_functions.extract_notes() for the sql query to the wjazzd.db
* interval is the difference in pitch between successive notes
* contour generates an abstraction based on interval

In [None]:
# Load every note in the dataset (the SQL query lives in my_functions.extract_notes()).
pitchInst = my_functions.extract_notes()

# NOTE(review): the differences below are taken over the whole table, not per
# solo, so presumably the first note of each solo is compared with the last
# note of the previous one. Negative steps produced that way are replaced with
# the median further down; confirm whether a per-'melid' diff is wanted.

# Gap between the start times of consecutive notes.
# The filled Series is assigned back to the column. Calling
# fillna(inplace=True) on a column selection is chained assignment, which is
# deprecated and no longer updates the parent frame under pandas Copy-on-Write.
pitchInst['step'] = pitchInst['start'].diff()
pitchInst['step'] = pitchInst['step'].fillna(pitchInst['step'].median())

# Interval (pitch difference) between successive notes; the first row has no
# predecessor and gets an interval of 0.
pitchInst['interval'] = pitchInst['pitch'].diff().fillna(0)

# Apply the contour abstraction to each interval
pitchInst['contour'] = pitchInst['interval'].apply(my_functions.contour)

# Fix out-of-bound steps: negative steps are replaced with the median step size
median = pitchInst['step'].median()
pitchInst.loc[pitchInst['step'] < 0.0, 'step'] = median

# Bind the interval to two octaves in either direction so the number of
# distinct interval classes stays fixed
pitchInst['interval'] = pitchInst['interval'].clip(-24.0, +24.0)

### Turn alphanumeric labels into numbers for training input
* instrument_num maps to the type of instrument used in the solo
* key_num maps to the musical key that the tune was in (Bb-maj for example)

In [None]:
from sklearn.preprocessing import LabelEncoder

# A single encoder is refit for each text column in turn; after this cell it
# holds the mapping of the last column processed ('key').
le = LabelEncoder()
for label_col, code_col in (('instrument', 'instrument_num'), ('key', 'key_num')):
    # Integer codes are stored as floats so they can sit alongside the other model inputs
    pitchInst[code_col] = le.fit_transform(pitchInst[label_col]).astype(float)


### trim off extraneous notes from the sequences
Make sure the training set is an integer multiple of seq_length

In [None]:

# Build the training matrix. Songs listed in skip_list are held out for test
# generation in BOTH branches (previously the untrimmed branch still trained on them).
is_held_out = pitchInst['melid'].isin(skip_list)

if trim_song_ends:
    # Keep, for every remaining solo, the largest prefix whose length is a
    # multiple of seq_length. The pieces are collected in a list and
    # concatenated once, which avoids quadratic repeated pd.concat calls and
    # concatenating onto an empty object-dtype frame.
    kept_solos = []
    for melid, solo in pitchInst[~is_held_out].groupby('melid'):
        usable = len(solo) - len(solo) % seq_length
        if usable:
            kept_solos.append(solo.iloc[:usable])
    if kept_solos:
        train_subset = pd.concat(kept_solos, ignore_index=True)
    else:
        # Nothing left to train on: keep an empty frame with the right columns
        train_subset = pitchInst.iloc[0:0]
else:
    # Only trim the tail of the whole (held-out-free) table
    candidates = pitchInst[~is_held_out]
    usable = len(candidates) - len(candidates) % seq_length
    train_subset = candidates.head(usable)

# NOTE(review): the sliding windows built later run over this flat array with
# shift=1, so individual windows can still straddle two solos.

# n_notes will be used later to build batches
n_notes = len(train_subset)
train_notes = np.stack([train_subset[key] for key in key_order], axis=1)

# Shift the interval column from [-24, 24] to [0, 48] so it can be used as a
# class index by the sparse categorical loss.
train_notes[:, 0] = train_notes[:, 0] + 24
notes_ds = tf.data.Dataset.from_tensor_slices(train_notes)
notes_ds.element_spec

In [None]:
# Adapted from the TensorFlow music generation tutorial
def create_sequences(
    dataset: tf.data.Dataset,
    seq_length: int,
    vocab_size: int,
) -> tf.data.Dataset:
  """Turn a dataset of single notes into (input window, next-note labels) pairs.

  Args:
    dataset: dataset yielding one note (a 1-D feature vector) per element.
    seq_length: number of notes in each input window.
    vocab_size: accepted for signature compatibility; not read by this function.

  Returns:
    A dataset of `(inputs, labels)` where `inputs` holds `seq_length`
    consecutive notes and `labels` maps every name in the global `key_order`
    to the matching feature of the note that follows the window.
  """
  # One extra note per window: it becomes the label
  window_size = seq_length + 1

  windowed = dataset.window(window_size, shift=1, stride=1,
                            drop_remainder=True)

  def to_tensor(window):
    # Each window is itself a dataset; batching it yields a single tensor
    return window.batch(window_size, drop_remainder=True)

  def split_labels(sequence):
    context, target = sequence[:-1], sequence[-1]
    labels = {}
    for position, name in enumerate(key_order):
      labels[name] = target[position]
    return context, labels

  # `flat_map` flattens the "dataset of datasets" into a dataset of tensors
  flattened = windowed.flat_map(to_tensor)
  return flattened.map(split_labels, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
seq_ds = create_sequences(notes_ds, seq_length, vocab_size)
# Show the structure of one (inputs, labels) example. The bare attribute
# `seq_ds.__len__` only displayed a bound method, and the window/flat_map
# pipeline does not expose a known length, so the element spec is shown instead.
seq_ds.element_spec

In [None]:
batch_size = 64
buffer_size = n_notes - seq_length  # the number of items in the dataset

# Cache BEFORE shuffling: a cache placed after shuffle/batch replays the
# batches recorded during the first pass, so later epochs would all see the
# same order. Caching the raw sequences first lets shuffle() reshuffle on
# every epoch while still avoiding recomputation of the windows.
train_ds = (seq_ds
            .cache()
            .shuffle(buffer_size)
            .batch(batch_size, drop_remainder=True)
            .prefetch(tf.data.AUTOTUNE))

In [None]:
# Inspect the structure of one batch of (inputs, labels) in the training dataset
train_ds.element_spec

In [None]:
def mse_with_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
  """Mean squared error plus a penalty that pushes predictions to be non-negative.

  Args:
    y_true: target values.
    y_pred: predicted values.

  Returns:
    The mean, over all elements, of the squared error plus ten times the
    magnitude of any negative prediction.
  """
  squared_error = (y_true - y_pred) ** 2
  # Zero for non-negative predictions, 10 * |y_pred| for negative ones
  negative_penalty = 10 * tf.maximum(-y_pred, 0.0)
  per_element = squared_error + negative_penalty
  return tf.reduce_mean(per_element)

In [None]:
# Model definition. The number of input features per timestep is tied to the
# length of the key_order global.
input_shape = (seq_length, len(key_order))
learning_rate = 0.010

# Input layer: one window of seq_length notes, each with len(key_order) features
inputs = tf.keras.Input(input_shape)
# Hidden layers: three stacked LSTMs. The first two return the full sequence so
# the following LSTM receives one vector per timestep.
x = tf.keras.layers.LSTM(64, return_sequences=True)(inputs)
#x = tf.keras.layers.Dropout(0.50, seed=seed)(x) #dropout layer
x = tf.keras.layers.LSTM(8, return_sequences=True)(x)
#x = tf.keras.layers.Dropout(0.50, seed=seed)(x) #dropout layer
x = tf.keras.layers.LSTM(8, return_sequences=False)(x) # last LSTM: emits only the final state, which feeds the output heads



# Output heads. 'interval' is a 49-way softmax: 49 matches intervals clipped to
# [-24, 24] and shifted by +24 elsewhere in this notebook. Because of the
# softmax, this head returns class probabilities, not logits.
# 'step' and 'duration' are single-value regression heads.
outputs = {
  'interval': tf.keras.layers.Dense(49, activation='softmax', name='interval')(x),
  'step': tf.keras.layers.Dense(1, name='step')(x),
  'duration': tf.keras.layers.Dense(1, name='duration')(x),
}

model = tf.keras.Model(inputs, outputs)

# One loss per output head; from_logits=False matches the softmax activation above
loss = {
      'interval':tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
      'step': mse_with_positive_pressure,
      'duration': mse_with_positive_pressure,
}

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# NOTE(review): the model is compiled again in the following cell with explicit loss_weights
model.compile(loss=loss, optimizer=optimizer)

model.summary()

In [None]:
# Recompile with an explicit, equal weight for each of the model's three outputs
loss_weights = {head: 1.0 for head in ('interval', 'step', 'duration')}

model.compile(loss=loss, loss_weights=loss_weights, optimizer=optimizer)

In [None]:
# Baseline: losses of the model on the training dataset before any call to fit()
model.evaluate(train_ds, return_dict=True)

In [None]:
# Save the weights (only) after every epoch, one file per epoch number
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath='./training_checkpoints/ckpt_{epoch}.weights.h5',
    save_weights_only=True,
)

# Stop once the training loss has not improved for 4 epochs (monitoring starts
# from epoch 2) and roll back to the best weights seen
early_stop_cb = tf.keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=4,
    verbose=1,
    start_from_epoch=2,
    restore_best_weights=True,
)

callbacks = [checkpoint_cb, early_stop_cb]

In [None]:
%%time
epochs = 8

history = model.fit(
    train_ds,
    epochs=epochs,
    callbacks=callbacks,
)

In [None]:
def predict_next_note(
    notes: np.ndarray,
    init_pitch: float,
    model: tf.keras.Model,
    temperature: float = 1.0,
    interval_offset: int = 24,
    pitch_bounds: tuple[int, int] = (30, 120)) -> tuple[int, int, float, float]:
  """Generate the next note with a trained sequence model.

  Args:
    notes: the current input window, one row per note, without a batch dimension.
    init_pitch: pitch of the previous note; the sampled interval is added to it.
    model: model whose `predict` returns a dict with 'interval' (class
      probabilities), 'step' and 'duration' entries.
    temperature: sampling temperature (> 0). Values below 1 sharpen the
      interval distribution, values above 1 flatten it.
    interval_offset: amount subtracted from the sampled class index to turn it
      back into a signed interval.
    pitch_bounds: inclusive (lowest, highest) pitch the result is clamped to.

  Returns:
    A tuple `(pitch, interval, step, duration)`. `interval` is the signed
    interval that was sampled (it is not adjusted when `pitch` is clamped).
    `step` and `duration` are clamped to be non-negative.
  """

  assert temperature > 0

  # Add batch dimension
  inputs = tf.expand_dims(notes, 0)

  # verbose=0: this is called once per generated note, so keep the progress bar quiet
  predictions = model.predict(inputs, verbose=0)

  # The 'interval' head ends in a softmax, so it yields probabilities.
  # tf.random.categorical expects unnormalized log-probabilities, so the log is
  # taken first (floored to avoid log(0)) and the temperature applied in log
  # space. Feeding raw probabilities would make the sampling nearly uniform.
  interval_probs = np.asarray(predictions['interval'], dtype=np.float32)
  interval_logits = np.log(np.maximum(interval_probs, 1e-9)) / temperature
  sampled_class = tf.random.categorical(interval_logits, num_samples=1)

  # Work with plain Python scalars from here on to avoid mixing float pitches
  # with integer tensors.
  interval = int(sampled_class[0, 0]) - interval_offset
  step = float(np.asarray(predictions['step']).reshape(-1)[0])
  duration = float(np.asarray(predictions['duration']).reshape(-1)[0])

  # `step` and `duration` values should be non-negative
  step = max(0.0, step)
  duration = max(0.0, duration)

  # Keep the pitch inside the allowed range
  lowest, highest = pitch_bounds
  pitch = min(highest, max(lowest, init_pitch + interval))

  return int(pitch), int(interval), float(step), float(duration)

In [None]:
temperature = 1.0
num_predictions = 40
# The model was trained on intervals shifted by +24 (train_notes[:, 0] + 24),
# so every interval fed to it here must be shifted the same way.
interval_offset = 24

# Generate a continuation for every held-out song. Everything up to and
# including the MIDI export happens per song; after the loop the variables
# (test_notes, generated_notes, full_sequence, ...) hold the last song's data.
for song in skip_list:
  test_notes = pitchInst[pitchInst['melid'] == song].reset_index()
  if len(test_notes) < seq_length:
    print(f"Skipping song {song}: only {len(test_notes)} notes, need {seq_length} to seed the model")
    continue

  sample_notes = np.stack([test_notes[key] for key in key_order], axis=1)

  # Seed window, with the interval column shifted like the training data
  input_notes = sample_notes[:seq_length].astype(float)
  input_notes[:, 0] += interval_offset

  # Read the song-level values from the LAST note of the seed window. The
  # first predicted interval is relative to that note, so its pitch is the
  # correct starting pitch (index seq_length would be one note too far and
  # fails for songs with exactly seq_length notes).
  last_seed = seq_length - 1
  tempo = test_notes['tempo'].iloc[last_seed]
  inst = test_notes['instrument_num'].iloc[last_seed]
  key = test_notes['key_num'].iloc[last_seed]
  title = test_notes['title'].iloc[last_seed]
  performer = test_notes['performer'].iloc[last_seed]
  instrument_name = test_notes['instrument'].iloc[last_seed]
  pitch = test_notes['pitch'].iloc[last_seed]

  generated_rows = []
  prev_start = 0
  for _ in range(num_predictions):
    pitch, interval, step, duration = predict_next_note(input_notes, pitch, model, temperature)
    start = prev_start
    end = start + duration

    # Row recorded for the output table: signed interval, in key_order order,
    # followed by pitch/start/end
    generated_rows.append((interval, step, duration, tempo, inst, key, pitch, start, end))

    # Row fed back into the model: it must have one field per entry of
    # key_order and use the shifted interval the model was trained on
    model_row = np.array(
        [[interval + interval_offset, step, duration, tempo, inst, key]], dtype=float)
    input_notes = np.concatenate([input_notes[1:], model_row], axis=0)
    prev_start = start + step

  generated_notes = pd.DataFrame(
      generated_rows, columns=(*key_order, 'pitch', 'start', 'end'))

  test_notes = test_notes[:seq_length]
  start_df = test_notes.drop(['interval'], axis=1)

  # String together the seed notes and the generated notes
  full_sequence = pd.concat([start_df, generated_notes], ignore_index=True)

  example_file = f"Song-{song}_seq-{seq_length}-{performer}-{title}.midi"  # adds a prefix to the sample filename
  example_pm = my_functions.notes_to_midi(
      full_sequence[['pitch', 'step', 'duration', 'tempo']],
      out_file=example_file,
      instrument_name='Acoustic Grand Piano')



In [None]:
# Plot the seed notes followed by the generated notes
# (presumably rendered as a piano roll; see my_functions.plot_piano_roll)
my_functions.plot_piano_roll(full_sequence)

In [None]:
# Same plot for the generated notes only
my_functions.plot_piano_roll(generated_notes)

In [None]:
# Plot distributions for the generated notes (which features are plotted is decided in my_functions.plot_distributions)
my_functions.plot_distributions(generated_notes)

In [None]:
# Plot the same distributions for the seed notes. At this point test_notes has
# been truncated to its first seq_length rows, so this is not the whole song.
my_functions.plot_distributions(test_notes)