In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
import glob
import pickle
from unittest import result
import numpy as np
import random
from tqdm import tqdm
from typing import cast
from music21 import converter, instrument, note, chord, stream
from music21.stream.base import Score
import tensorflow as tf
from pathlib import Path
from tensorflow import keras
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, Dropout, LSTM, Activation, BatchNormalization, Input, concatenate
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

In [14]:
def get_songs():
    songs = []
    folder = Path('/content/drive/MyDrive/finalyearproject/LakhMIDI')
    for file in folder.rglob('*.mid'):
        songs.append(file)

    result = random.sample(songs, 100)
    # print(result)
    return result

In [4]:
def quantise_value(value, resolution=0.25):
    return round(value / resolution) * resolution


In [5]:
def get_notes():
    notes_file = Path('/content/drive/MyDrive/finalyearproject/notes/notes7.pkl')

    songs = get_songs()
    notes = []

    # load existing notes
    if notes_file.exists():
        with open(notes_file, 'rb') as f:
            notes = pickle.load(f)
        print(f" Resuming from {len(notes)} existing notes")

    batch_notes = []

    # process each song and extract note/chord data
    for i, song in enumerate(tqdm(songs, desc="Processing songs")):
        try:
            midi = converter.parse(song)
            parsing_notes = None

            try:
                ins = instrument.partitionByInstrument(midi)
                if isinstance(ins, Score):
                    parsing_notes = ins.parts[0].recurse()
                else:
                    parsing_notes = midi.flatten().notes
            except Exception as e:
                print(f" Error processing {song}: {e}")
                continue

            offsetBase = 0

            # go through each note/chord and format its represenation
            if parsing_notes is not None:
                for element in parsing_notes:
                    if isinstance(element, note.Note):
                        q_duration = quantise_value(element.duration.quarterLength, resolution=0.25)
                        q_offset = quantise_value(element.offset - offsetBase, resolution=0.25)
                        note_str = f"{element.pitch}:{q_duration}:{q_offset}" # pitch:duration:offset
                        batch_notes.append(note_str)
                        offsetBase = element.offset
                    elif isinstance(element, chord.Chord):
                        chord_components = '.'.join(str(n) for n in element.normalOrder)
                        q_duration = quantise_value(element.duration.quarterLength, resolution=0.25)
                        q_offset = quantise_value(element.offset - offsetBase, resolution=0.25)
                        chord_str = f"{chord_components}:{q_duration}:{q_offset}" # chord:duration:offset
                        batch_notes.append(chord_str)
                        offsetBase = element.offset

        except Exception as e:
            print(f" Skipping {song} due to error: {e}")
            continue

        # save and clear batch every 50 songs
        if (i + 1) % 50 == 0:
            notes.extend(batch_notes)  # add batch to full list
            with open(notes_file, 'wb') as f:
                pickle.dump(notes, f)  # save full list to file
            print(f" Checkpoint: Saved {len(notes)} notes")
            batch_notes.clear()  # Clear batch to save memory

    # final save
    if batch_notes:  # save remaining batch if not empty
        notes.extend(batch_notes)
        with open(notes_file, 'wb') as f:
            pickle.dump(notes, f)
        print(f" Final save: Saved {len(notes)} notes")

    return notes

In [None]:
get_notes()

In [6]:
def prepare_sequences(notes, n_vocab):
	sequence_length = 128

	# get total vocab of component
	total_comp = sorted(set(item for item in notes))

	 # create a dictionary to map vocab to integers
	note_index_map = dict((note, number) for number, note in enumerate(total_comp))

	network_input = []
	network_output = []

	# create input sequences and outputs
	for i in range(0, len(notes) - sequence_length, 1):
		sequence_input = notes[i:i + sequence_length]
		sequence_output = notes[i + sequence_length]
		network_input.append([note_index_map[char] for char in sequence_input])
		network_output.append(note_index_map[sequence_output])

	n_patterns = len(network_input)

	# reshape for LSTM compatibility
	network_input = np.reshape(network_input, (n_patterns, sequence_length, 1))

	# normalise input
	network_input = network_input / float(n_vocab)

	network_output = to_categorical(network_output)

	return (network_input, network_output)

In [7]:
def create_input_branch(input_data, name, lstm_units=256, dropout_rate=0.2):
    # layer for each feature branch
    input_layer = Input(shape=(input_data.shape[1], input_data.shape[2]), name=f"input_{name}")
    x = LSTM(
        lstm_units,
        return_sequences=True,
        name=f"lstm_{name}"
    )(input_layer)
    x = Dropout(dropout_rate, name=f"dropout_{name}")(x)

    return input_layer, x

In [8]:
def create_output_branch(x, n_vocab, name, dense_units=128, dropout_rate=0.3):
    # output for each branch
    x = Dense(dense_units, activation='relu', name=f"dense_{name}")(x)
    x = BatchNormalization(name=f"bn_{name}")(x)
    x = Dropout(dropout_rate, name=f"dropout_{name}")(x)
    output = Dense(n_vocab, activation='softmax', name=name)(x)

    return output

In [11]:
def create_network(network_input_notes, n_vocab_notes,
                  network_input_durations, n_vocab_durations,
                  network_input_offsets, n_vocab_offsets):

    # Create input branches
    input_notes_layer, input_notes = create_input_branch(network_input_notes, "notes")
    input_durations_layer, input_durations = create_input_branch(network_input_durations, "durations")
    input_offsets_layer, input_offsets = create_input_branch(network_input_offsets, "offsets")

    # Concatenate the three input branches
    combined = concatenate([input_notes, input_durations, input_offsets], name="combined_features")

    # Process combined features
    x = LSTM(512, return_sequences=True, name="lstm_combined_1")(combined)
    x = Dropout(0.3, name="dropout_combined_1")(x)
    x = LSTM(512, name="lstm_combined_2")(x)
    x = BatchNormalization(name="bn_combined")(x)
    x = Dropout(0.3, name="dropout_combined_2")(x)
    x = Dense(256, activation='relu', name="dense_combined")(x)

    # Create output branches
    output_notes = create_output_branch(x, n_vocab_notes, "Note")
    output_durations = create_output_branch(x, n_vocab_durations, "Duration")
    output_offsets = create_output_branch(x, n_vocab_offsets, "Offset")

    # Create and compile model
    model = Model(
        inputs=[input_notes_layer, input_durations_layer, input_offsets_layer],
        outputs=[output_notes, output_durations, output_offsets]
    )

    model.compile(loss='categorical_crossentropy', optimizer='adam')

    # load weights to continue training
    # model.load_weights(weights_path)

    return model

In [10]:
def train(model, network_input_notes, network_input_durations, network_input_offsets, network_output_notes, network_output_durations, network_output_offsets, epochs=200, batch_size=128):

  filepath = Path("/content/drive/MyDrive/finalyearproject/weights/weight-{epoch:02d}-{loss:.4f}.keras")

  checkpoint = ModelCheckpoint(
		filepath,
		monitor='loss',
		verbose=0,
		save_best_only=True,
		mode='min'
	)

  # setup early stopping callback
  early_stopping = EarlyStopping(
      monitor='loss',
      patience=10,
      min_delta=0.0005,
      verbose=1,
      restore_best_weights=True  # restore model weights from the epoch with the best value
  )


  callbacks_list = [checkpoint, early_stopping]

  model.fit([network_input_notes, network_input_durations, network_input_offsets], [network_output_notes, network_output_durations, network_output_offsets], epochs=200, batch_size=128, callbacks=callbacks_list, verbose=1)

  # train the model
  history = model.fit(
      [network_input_notes, network_input_durations, network_input_offsets],
      [network_output_notes, network_output_durations, network_output_offsets],
      epochs=epochs,
      batch_size=batch_size,
      callbacks=callbacks_list,
      verbose=1
  )

  return history


In [15]:
def train_network():
  notes_file = Path("/content/drive/MyDrive/finalyearproject/notes/notes7.pkl")
  notes = []
  with open(notes_file, "rb") as file:
      notes = pickle.load(file)

  # Get vocabs
  # Extract individual components for each note
  all_pitches = []
  all_durations = []
  all_offsets = []
  for item in notes:
      parts = item.split(":")
      all_pitches.append(parts[0])
      all_durations.append(parts[1])
      all_offsets.append(parts[2])


  pitches = sorted(set(all_pitches))
  durations = sorted(set(all_durations))
  offsets = sorted(set(all_offsets))

  # prepare for model
  n_vocab_offsets = len(offsets)
  network_input_offsets, network_output_offsets = prepare_sequences(all_offsets, n_vocab_offsets)

  n_vocab_notes = len(pitches)
  network_input_notes, network_output_notes = prepare_sequences(all_pitches, n_vocab_notes)

  n_vocab_durations = len(durations)
  network_input_durations, network_output_durations = prepare_sequences(all_durations, n_vocab_durations)

  # Create model
  model = create_network(
      network_input_notes, n_vocab_notes,
      network_input_durations, n_vocab_durations,
      network_input_offsets, n_vocab_offsets
  )

  # Train model
  train(
      model,
      network_input_notes, network_input_durations, network_input_offsets,
      network_output_notes, network_output_durations, network_output_offsets
  )

  # # Optional: Save final model
  # model.save(Path("/path"))

In [None]:
train_network()