In [None]:
# Transformer-based Bach Cello Suite music generation
# Run this notebook in Google Colab for best experience.

!pip install music21 tensorflow==2.15.0 --quiet

import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
from music21 import converter, instrument, note, chord, duration, stream
from IPython.display import Audio, display
from google.colab import files

# Directory the MIDI corpus is read from; created up front so files can be
# uploaded into it before the parsing step runs.
MIDI_DIR = "/content/bach_cello_midi"
os.makedirs(MIDI_DIR, exist_ok=True)
print("Upload Bach Cello Suite MIDI files into:", MIDI_DIR)

# Hyperparameters.
SEQ_LEN = 128      # tokens per encoded sequence (including the START token)
BATCH_SIZE = 16    # examples per training batch
EPOCHS = 10        # passes over the dataset in model.fit
EMBED_DIM = 256    # width of the token/position embeddings and transformer blocks
NUM_HEADS = 4      # attention heads per transformer block
FF_DIM = 512       # hidden width of each block's feed-forward network
NUM_LAYERS = 4     # number of stacked transformer blocks

def parse_midi_file(path):
    """Parse a MIDI file into a list of ``(pitch_name, quarter_length)`` events.

    When the score can be partitioned by instrument, only the first part is
    read; otherwise the notes of the flattened score are used. Single notes
    contribute their own pitch; chords are reduced to their root.

    Args:
        path: Path of the MIDI file to parse.

    Returns:
        A list of ``(pitch, dur)`` tuples where ``pitch`` is a name with
        octave and ``dur`` is the duration in quarter lengths as a float.
    """
    midi = converter.parse(path)
    parts = instrument.partitionByInstrument(midi)
    if parts:
        notes_to_parse = parts.parts[0].recurse()
    else:
        # Stream.flat is deprecated in music21 v7+; flatten() is its replacement.
        notes_to_parse = midi.flatten().notes

    seq = []
    for elem in notes_to_parse:
        if isinstance(elem, note.Note):
            pitch_obj = elem.pitch
        elif isinstance(elem, chord.Chord):
            # Collapse a chord to one monophonic event by keeping its root.
            pitch_obj = elem.root()
        else:
            # Rests, clefs, tempo marks, etc. carry no pitch and are skipped.
            continue
        seq.append((pitch_obj.nameWithOctave, float(elem.duration.quarterLength)))
    return seq

# Parse every MIDI file found in MIDI_DIR; files that fail to parse are
# reported and skipped so one bad file does not abort the whole load.
all_sequences = []
midi_filenames = [
    entry for entry in os.listdir(MIDI_DIR)
    if entry.lower().endswith((".mid", ".midi"))
]
for fname in midi_filenames:
    try:
        seq = parse_midi_file(os.path.join(MIDI_DIR, fname))
        if seq:
            all_sequences.append(seq)
            print(f"Parsed {fname} with {len(seq)} events.")
    except Exception as e:
        print("Error parsing", fname, ":", e)
print("Total sequences loaded:", len(all_sequences))

# Collect the distinct pitches and (2-decimal rounded) durations in the corpus.
all_notes = sorted({pitch for seq in all_sequences for pitch, _ in seq})
all_durs = sorted({round(dur, 2) for seq in all_sequences for _, dur in seq})

# Special tokens; PAD is placed first so that it maps to index 0.
START_NOTE = "<START_NOTE>"
START_DUR = "<START_DUR>"
PAD_TOKEN = "<PAD>"

note_vocab = [PAD_TOKEN, START_NOTE, *all_notes]
dur_vocab = [PAD_TOKEN, START_DUR, *(str(d) for d in all_durs)]

# Forward (token -> id) and reverse (id -> token) lookup tables.
note2idx = {token: index for index, token in enumerate(note_vocab)}
idx2note = dict(enumerate(note_vocab))
dur2idx = {token: index for index, token in enumerate(dur_vocab)}
idx2dur = dict(enumerate(dur_vocab))

NOTE_VOCAB_SIZE = len(note_vocab)
dur_VOCAB_SIZE = len(dur_vocab)

def encode_sequence(seq, max_len=SEQ_LEN):
    """Turn ``(pitch, dur)`` events into two fixed-length id arrays.

    The output starts with the START tokens, is truncated to ``max_len``
    entries and right-padded with PAD ids. Events whose pitch or rounded
    duration is missing from the vocabularies are dropped.

    Returns:
        ``(note_ids, dur_ids)`` as two ``np.int32`` arrays.
    """
    note_ids = [note2idx[START_NOTE]]
    dur_ids = [dur2idx[START_DUR]]

    for pitch, dur in seq:
        note_id = note2idx.get(pitch)
        if note_id is None:
            continue
        dur_id = dur2idx.get(str(round(dur, 2)))
        if dur_id is None:
            continue
        note_ids.append(note_id)
        dur_ids.append(dur_id)

    # Truncate first, then fill whatever room is left with PAD ids.
    del note_ids[max_len:]
    del dur_ids[max_len:]
    missing = max_len - len(note_ids)
    note_ids.extend([note2idx[PAD_TOKEN]] * missing)
    dur_ids.extend([dur2idx[PAD_TOKEN]] * missing)

    return np.array(note_ids, dtype=np.int32), np.array(dur_ids, dtype=np.int32)

# Encode every parsed piece into fixed-length id arrays.
# Fail early with an actionable message: with no sequences the arrays below
# would be 1-D and the later 2-D slicing would raise an obscure IndexError.
if not all_sequences:
    raise RuntimeError(
        f"No MIDI sequences were loaded from {MIDI_DIR}; "
        "upload .mid/.midi files there and re-run."
    )

encoded_notes = []
encoded_durs = []
for seq in all_sequences:
    n_ids, d_ids = encode_sequence(seq, SEQ_LEN)
    encoded_notes.append(n_ids)
    encoded_durs.append(d_ids)
# Stack into arrays with one row per piece and SEQ_LEN columns.
encoded_notes = np.array(encoded_notes)
encoded_durs = np.array(encoded_durs)

def create_inputs_targets(notes, durs):
    """Split 2-D id arrays into next-token inputs and targets.

    Inputs drop the last column and targets drop the first, so the target at
    column ``t`` is the token that follows the input at column ``t``.

    Returns:
        ``((in_notes, in_durs), (out_notes, out_durs))``.
    """
    all_but_last = (slice(None), slice(None, -1))
    all_but_first = (slice(None), slice(1, None))
    inputs = (notes[all_but_last], durs[all_but_last])
    targets = (notes[all_but_first], durs[all_but_first])
    return inputs, targets

# Build the training pipeline: (notes, durations) inputs -> shifted targets.
(X_notes, X_durs), (Y_notes, Y_durs) = create_inputs_targets(encoded_notes, encoded_durs)
dataset = tf.data.Dataset.from_tensor_slices(((X_notes, X_durs), (Y_notes, Y_durs)))
# Keep the final partial batch: there is one example per MIDI file, so with
# drop_remainder=True a corpus smaller than BATCH_SIZE would produce an empty
# dataset, and larger corpora would silently lose examples every epoch.
# The shuffle buffer covers the whole (small) dataset for a full shuffle.
dataset = dataset.shuffle(buffer_size=max(len(X_notes), 1)).batch(
    BATCH_SIZE, drop_remainder=False
)

class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: Size of the position-embedding table, i.e. the longest
            sequence length this layer can embed.
        vocab_size: Number of distinct token ids.
        embed_dim: Width of both embeddings.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_emb = layers.Embedding(vocab_size, embed_dim)
        self.pos_emb = layers.Embedding(maxlen, embed_dim)
        self.maxlen = maxlen

    def call(self, x):
        """Embed token ids ``x`` and add the matching position embeddings."""
        # Use the actual input length rather than self.maxlen so inputs
        # shorter than maxlen still broadcast against the position table.
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

class TransformerBlock(layers.Layer):
    """Post-norm transformer block: self-attention followed by a feed-forward net.

    Args:
        embed_dim: Width of the block's input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after attention and after the feed-forward net.
        causal: When True (default), each position may only attend to itself
            and earlier positions. This is required for next-token training:
            without it a position can see the very token it has to predict.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, causal=True, **kwargs):
        super().__init__(**kwargs)
        # key_dim is the per-head projection size; kept at embed_dim as before.
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim)]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.causal = causal

    def call(self, inputs, training=False, mask=None):
        """Apply attention and feed-forward sub-layers with residual connections.

        Args:
            inputs: Sequence of embeddings to transform.
            training: Whether dropout is active.
            mask: Optional attention mask passed to the attention layer; it is
                combined with the causal mask when ``causal`` is True.
        """
        attn_output = self.att(
            inputs,
            inputs,
            attention_mask=mask,
            use_causal_mask=self.causal,
            training=training,
        )
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

def build_music_transformer(maxlen, note_vocab_size, dur_vocab_size, embed_dim, num_heads, ff_dim, num_layers):
    """Assemble the two-input, two-output note/duration transformer.

    Note ids and duration ids (each of length ``maxlen - 1``) are embedded
    separately, concatenated, projected back to ``embed_dim`` and passed
    through ``num_layers`` transformer blocks. Two dense heads then emit
    per-position logits over the note and duration vocabularies.

    Returns:
        A Keras ``Model`` with inputs ``[notes_in, durs_in]`` and outputs
        ``[note_logits, dur_logits]``.
    """
    input_len = maxlen - 1

    notes_in = layers.Input(shape=(input_len,), name="notes_in")
    durs_in = layers.Input(shape=(input_len,), name="durs_in")

    note_embedder = TokenAndPositionEmbedding(input_len, note_vocab_size, embed_dim)
    dur_embedder = TokenAndPositionEmbedding(input_len, dur_vocab_size, embed_dim)
    embedded = [note_embedder(notes_in), dur_embedder(durs_in)]

    # Fuse the two streams and bring the width back down to embed_dim.
    hidden = layers.Concatenate(axis=-1)(embedded)
    hidden = layers.Dense(embed_dim)(hidden)

    blocks_remaining = num_layers
    while blocks_remaining > 0:
        hidden = TransformerBlock(embed_dim, num_heads, ff_dim)(hidden)
        blocks_remaining -= 1

    heads = [
        layers.Dense(note_vocab_size, name="note_logits")(hidden),
        layers.Dense(dur_vocab_size, name="dur_logits")(hidden),
    ]
    return Model(inputs=[notes_in, durs_in], outputs=heads)

# Instantiate the model from the notebook-level hyperparameters.
model_config = {
    "maxlen": SEQ_LEN,
    "note_vocab_size": NOTE_VOCAB_SIZE,
    "dur_vocab_size": len(dur_vocab),
    "embed_dim": EMBED_DIM,
    "num_heads": NUM_HEADS,
    "ff_dim": FF_DIM,
    "num_layers": NUM_LAYERS,
}
model = build_music_transformer(**model_config)

# Both heads emit raw logits and share one loss object, weighted equally.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
output_names = ("note_logits", "dur_logits")
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model.compile(
    optimizer=optimizer,
    loss={name: loss_fn for name in output_names},
    loss_weights={name: 1.0 for name in output_names},
)

history = model.fit(dataset, epochs=EPOCHS)

def top_k_sample(logits, k=8, temperature=0.85):
    """Sample one index from the ``k`` highest-scoring entries of ``logits``.

    The logits are divided by ``temperature``, the top ``k`` are kept and
    turned into probabilities with a softmax, and one index is drawn using
    NumPy's global random state.

    Args:
        logits: 1-D array-like of unnormalized scores.
        k: Number of candidates to keep. Clamped to the number of logits so a
            small vocabulary does not raise.
        temperature: Positive softmax temperature; lower values make the
            choice more greedy.

    Returns:
        The sampled index into ``logits`` as a Python ``int``.

    Raises:
        ValueError: If ``temperature`` is not positive, ``k`` is below 1, or
            ``logits`` is empty.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    if k < 1:
        raise ValueError("k must be at least 1")

    scaled = np.asarray(logits, dtype=np.float64) / temperature
    if scaled.size == 0:
        raise ValueError("logits must not be empty")

    # Never ask for more candidates than there are vocabulary entries.
    k = min(int(k), scaled.size)
    top_indices = np.argsort(scaled)[::-1][:k]
    top_values = scaled[top_indices]

    # Subtract the max before exponentiating for numerical stability.
    top_values = top_values - top_values.max()
    probs = np.exp(top_values)
    probs = probs / probs.sum()

    return int(np.random.choice(top_indices, p=probs))

def generate_music(model, max_len=SEQ_LEN, seed_notes=None, seed_durs=None):
    """Autoregressively sample note and duration ids from ``model``.

    Args:
        model: Model taking ``[notes, durs]`` id arrays of length
            ``max_len - 1`` and returning per-position note and duration
            logits. NOTE(review): ``max_len`` is assumed to match the length
            the model was built with; confirm when passing a custom value.
        max_len: Generation stops once this many tokens (seed included) exist.
        seed_notes: Optional list of note ids to start from. When None,
            generation starts from the START tokens and ``seed_durs`` is
            ignored.
        seed_durs: Duration ids paired with ``seed_notes``.

    Returns:
        ``(notes, durs)`` as two lists of ids, including the seed.

    Raises:
        ValueError: If ``seed_notes`` is given without a ``seed_durs`` of the
            same, non-zero length.
    """
    if seed_notes is None:
        seed_notes = [note2idx[START_NOTE]]
        seed_durs = [dur2idx[START_DUR]]
    elif seed_durs is None or len(seed_durs) != len(seed_notes) or len(seed_notes) == 0:
        raise ValueError(
            "seed_notes and seed_durs must be non-empty and have the same length"
        )

    notes = list(seed_notes)
    durs = list(seed_durs)
    pad_note = note2idx[PAD_TOKEN]
    pad_dur = dur2idx[PAD_TOKEN]
    window = max_len - 1

    while len(notes) < max_len:
        context_notes = notes[-window:]
        context_durs = durs[-window:]
        # Index of the last real token; its output is the next-token prediction.
        # Reading index -1 instead would return the prediction that follows
        # the trailing PAD tokens of the right-padded input.
        last_pos = len(context_notes) - 1

        in_notes = np.full((1, window), pad_note, dtype=np.int32)
        in_durs = np.full((1, window), pad_dur, dtype=np.int32)
        in_notes[0, : len(context_notes)] = context_notes
        in_durs[0, : len(context_durs)] = context_durs

        note_logits, dur_logits = model.predict([in_notes, in_durs], verbose=0)
        next_note_id = top_k_sample(note_logits[0, last_pos, :], k=8, temperature=0.85)
        next_dur_id = top_k_sample(dur_logits[0, last_pos, :], k=8, temperature=0.85)

        # Both heads emitting PAD is treated as end-of-sequence.
        if next_note_id == pad_note and next_dur_id == pad_dur:
            break
        notes.append(next_note_id)
        durs.append(next_dur_id)
    return notes, durs

# Sample a sequence and map ids back to (pitch, duration) events, dropping
# any position where either stream holds a special token.
gen_notes_ids, gen_durs_ids = generate_music(model, max_len=SEQ_LEN)

special_notes = (PAD_TOKEN, START_NOTE)
special_durs = (PAD_TOKEN, START_DUR)

gen_sequence = []
for nid, did in zip(gen_notes_ids, gen_durs_ids):
    pitch = idx2note.get(nid, PAD_TOKEN)
    dur_str = idx2dur.get(did, "0.25")
    if pitch not in special_notes and dur_str not in special_durs:
        gen_sequence.append((pitch, float(dur_str)))

print("Generated sequence (first 20 events):")
print(gen_sequence[:20])

def sequence_to_midi(seq, out_path, tempo_bpm=120):
    """Write ``(pitch, dur)`` events to a MIDI file at ``out_path``.

    A metronome mark of ``tempo_bpm`` is placed first, followed by one note
    per event with the given duration in quarter lengths.
    """
    from music21 import tempo as m21tempo

    score = stream.Stream()
    score.append(m21tempo.MetronomeMark(number=tempo_bpm))
    for pitch_name, quarter_length in seq:
        new_note = note.Note(pitch_name)
        new_note.duration = duration.Duration(quarter_length)
        score.append(new_note)
    score.write("midi", fp=out_path)

# Render the generated events to a MIDI file and hand it to the browser via
# the Colab download helper.
out_midi_path = "/content/generated_bach_transformer_sample.mid"
sequence_to_midi(gen_sequence, out_midi_path)
files.download(out_midi_path)
