<a href="https://colab.research.google.com/github/Enkrumah14/mannyNkrumahGenAi/blob/main/Problem1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
# Import Libraries
import os
import glob
import time
from fractions import Fraction
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models, losses, callbacks
import music21

# Parameters
PARSE_MIDI_FILES = True
PARSED_DATA_PATH = "/content/output"
DATASET_REPETITIONS = 1
SEQ_LEN = 50
EMBEDDING_DIM = 256
KEY_DIM = 256
N_HEADS = 5
DROPOUT_RATE = 0.3
FEED_FORWARD_DIM = 256
LOAD_MODEL = False
EPOCHS = 100
BATCH_SIZE = 256
GENERATE_LEN = 50

# Function to parse MIDI files
def parse_midi_files(file_list, parser, seq_len, parsed_data_path):
    notes, durations = [], []
    for file in file_list:
        try:
            score = parser.parse(file)
            flat_score = score.flat.notes
            file_notes, file_durations = [], []
            for element in flat_score:
                if isinstance(element, music21.note.Note):
                    if element.nameWithOctave:  # Ensure note is valid
                        file_notes.append(element.nameWithOctave)
                        file_durations.append(str(element.quarterLength))
                elif isinstance(element, music21.note.Rest):
                    file_notes.append("rest")
                    file_durations.append(str(element.quarterLength))
            notes.extend(file_notes[:seq_len])
            durations.extend(file_durations[:seq_len])
        except Exception as e:
            print(f"Error parsing {file}: {e}")

    # Filter out invalid entries and replace `[UNK]` with defaults
    notes = [note for note in notes if note.strip() and note != "[UNK]"]
    durations = [
        duration if duration.strip() and duration != "[UNK]" else "0.25"
        for duration in durations
    ]
    return notes, durations

# Function to generate a MIDI note
def get_midi_note(note, duration):
    try:
        # Convert fractional durations to float
        duration_value = float(Fraction(duration))
    except ValueError:
        print(f"Invalid duration: {duration}, defaulting to 0.25")
        duration_value = 0.25  # Default to a quarter note if conversion fails

    if note == "rest":
        return music21.note.Rest(quarterLength=duration_value)
    elif note.strip() and note != "[UNK]":  # Ensure note is valid and not `[UNK]`
        try:
            return music21.note.Note(note, quarterLength=duration_value)
        except music21.pitch.PitchException:
            print(f"Invalid note: {note}, skipping.")
            return None
    else:
        print(f"Empty or unknown note encountered, skipping.")
        return None

# Sine positional encoding
class SinePositionEncoding(layers.Layer):
    def __init__(self):
        super().__init__()

    def call(self, inputs):
        seq_len = tf.shape(inputs)[1]
        embed_dim = tf.shape(inputs)[2]
        pos = tf.cast(tf.range(seq_len)[..., tf.newaxis], tf.float32)
        i = tf.cast(tf.range(embed_dim)[tf.newaxis, ...], tf.float32)
        angles = pos / tf.pow(10000.0, (2 * (i // 2)) / tf.cast(embed_dim, tf.float32))
        encoding = tf.where(tf.cast(i % 2, tf.bool), tf.cos(angles), tf.sin(angles))
        return inputs + encoding

# Data Preparation
file_list = glob.glob(f"{PARSED_DATA_PATH}/*.mid")
print(f"Found {len(file_list)} MIDI files")

if PARSE_MIDI_FILES:
    notes, durations = parse_midi_files(file_list, music21.converter, SEQ_LEN + 1, PARSED_DATA_PATH)
else:
    notes, durations = [], []

# Tokenization
def create_dataset(elements):
    elements = [str(e) for e in elements]
    ds = (
        tf.data.Dataset.from_tensor_slices(elements)
        .batch(BATCH_SIZE, drop_remainder=True)
        .shuffle(1000)
    )
    vectorize_layer = layers.TextVectorization(standardize=None, output_mode="int")
    vectorize_layer.adapt(ds)
    vocab = vectorize_layer.get_vocabulary()
    return ds, vectorize_layer, vocab

notes_seq_ds, notes_vectorize_layer, notes_vocab = create_dataset(notes)
durations_seq_ds, durations_vectorize_layer, durations_vocab = create_dataset(durations)
seq_ds = tf.data.Dataset.zip((notes_seq_ds, durations_seq_ds))

# Dataset Preparation for Training
def prepare_inputs(notes, durations):
    notes = tf.expand_dims(notes, -1)
    durations = tf.expand_dims(durations, -1)
    tokenized_notes = notes_vectorize_layer(notes)
    tokenized_durations = durations_vectorize_layer(durations)
    x = (tokenized_notes[:, :-1], tokenized_durations[:, :-1])
    y = (tokenized_notes[:, 1:], tokenized_durations[:, 1:])
    return x, y

ds = seq_ds.map(prepare_inputs).repeat(DATASET_REPETITIONS)

# Causal Attention Mask
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    i = tf.range(n_dest)[:, None]
    j = tf.range(n_src)
    mask = i >= j - n_src + n_dest
    mask = tf.cast(mask, dtype)
    mask = tf.reshape(mask, [1, n_dest, n_src])
    mult = tf.concat([tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0)
    return tf.tile(mask, mult)

# Transformer Block
class TransformerBlock(layers.Layer):
    def __init__(self, num_heads, key_dim, embed_dim, ff_dim, name, dropout_rate=DROPOUT_RATE):
        super().__init__(name=name)
        self.attn = layers.MultiHeadAttention(num_heads, key_dim, output_shape=embed_dim)
        self.dropout_1 = layers.Dropout(dropout_rate)
        self.ln_1 = layers.LayerNormalization(epsilon=1e-6)
        self.ffn_1 = layers.Dense(ff_dim, activation="relu")
        self.ffn_2 = layers.Dense(embed_dim)
        self.dropout_2 = layers.Dropout(dropout_rate)
        self.ln_2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, seq_len = input_shape[0], input_shape[1]
        causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        attention_output, attention_scores = self.attn(
            inputs, inputs, attention_mask=causal_mask, return_attention_scores=True
        )
        attention_output = self.dropout_1(attention_output)
        out1 = self.ln_1(inputs + attention_output)
        ffn_output = self.dropout_2(self.ffn_2(self.ffn_1(out1)))
        return self.ln_2(out1 + ffn_output), attention_scores

# Token and Position Embedding
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim, embeddings_initializer="he_uniform"
        )
        self.pos_emb = SinePositionEncoding()

    def call(self, x):
        token_embeddings = self.token_emb(x)
        positions = self.pos_emb(token_embeddings)
        return token_embeddings + positions

# Build the Model
note_inputs = layers.Input(shape=(None,), dtype=tf.int32)
durations_inputs = layers.Input(shape=(None,), dtype=tf.int32)
note_embeddings = TokenAndPositionEmbedding(len(notes_vocab), EMBEDDING_DIM // 2)(note_inputs)
duration_embeddings = TokenAndPositionEmbedding(len(durations_vocab), EMBEDDING_DIM // 2)(durations_inputs)
embeddings = layers.Concatenate()([note_embeddings, duration_embeddings])
x, attention_scores = TransformerBlock(
    N_HEADS, KEY_DIM, EMBEDDING_DIM, FEED_FORWARD_DIM, name="attention"
)(embeddings)
note_outputs = layers.Dense(len(notes_vocab), activation="softmax", name="note_outputs")(x)
duration_outputs = layers.Dense(len(durations_vocab), activation="softmax", name="duration_outputs")(x)
model = models.Model(inputs=[note_inputs, durations_inputs], outputs=[note_outputs, duration_outputs])
model.compile(optimizer="adam", loss=[
    losses.SparseCategoricalCrossentropy(), losses.SparseCategoricalCrossentropy()
])
model.summary()

# Music Generator Callback
class MusicGenerator(callbacks.Callback):
    def __init__(self, notes_vocab, durations_vocab, output_path="/content/output"):
        self.index_to_note = {idx: note for idx, note in enumerate(notes_vocab)}
        self.index_to_duration = {idx: duration for idx, duration in enumerate(durations_vocab)}
        self.output_path = output_path
        os.makedirs(output_path, exist_ok=True)

    def generate(self, start_notes, start_durations, max_tokens=GENERATE_LEN, temperature=0.5):
        notes = [self.index_to_note.get(int(x), "rest") for x in start_notes]
        durations = [self.index_to_duration.get(int(x), "0.25") for x in start_durations]

        midi_stream = music21.stream.Stream()
        midi_stream.append(music21.clef.BassClef())

        for note, duration in zip(notes, durations):
            midi_note = get_midi_note(note, duration)
            if midi_note:
                midi_stream.append(midi_note)

        for _ in range(max_tokens):
            x1 = tf.convert_to_tensor([start_notes])
            x2 = tf.convert_to_tensor([start_durations])
            pred_notes, pred_durations = model.predict([x1, x2], verbose=0)

            pred_note_idx = tf.random.categorical(tf.math.log(pred_notes[0]), num_samples=1).numpy()[0, 0]
            pred_duration_idx = tf.random.categorical(tf.math.log(pred_durations[0]), num_samples=1).numpy()[0, 0]

            start_notes.append(pred_note_idx)
            start_durations.append(pred_duration_idx)

            note = self.index_to_note.get(pred_note_idx, "rest")
            duration = self.index_to_duration.get(pred_duration_idx, "0.25")
            midi_note = get_midi_note(note, duration)
            if midi_note:
                midi_stream.append(midi_note)

        return midi_stream

    def on_epoch_end(self, epoch, logs=None):
        start_notes = [int(notes_vectorize_layer(["C4"]).numpy().item())]
        start_durations = [int(durations_vectorize_layer(["0.25"]).numpy().item())]

        midi_stream = self.generate(start_notes, start_durations)
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        file_path = os.path.join(self.output_path, f"output-{timestamp}-epoch{epoch}.mid")
        midi_stream.write("midi", fp=file_path)
        print(f"Exported MIDI file for epoch {epoch}: {file_path}")

# Initialize the Music Generator Callback
music_generator = MusicGenerator(notes_vocab, durations_vocab)

# Train the Model with the Callback
model.fit(ds, epochs=EPOCHS, callbacks=[music_generator])


Found 36 MIDI files


Epoch 1/100
[1m1/7[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m25s[0m 4s/step - duration_outputs_loss: nan - loss: nan - note_outputs_loss: nanInvalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: , defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: , defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Invalid duration: [UNK], defaulting to 0.25
Exported MIDI file for epoch 0: /content/output/output-20241207-023436-epoch0.mid
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m208s[0m 34s/step - d

<keras.src.callbacks.history.History at 0x7ca4ec5bc250>

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


/content/bach_cello_suites.zip