In [1]:
from pathlib import Path
import mido

train_path = Path("data/train")

# Collect all Mozart MIDI files.
# The glob result is sorted because directory iteration order depends on the
# filesystem; the pieces are later concatenated into one token stream and split
# positionally into train/val, so a stable order keeps that split reproducible.
mozart_midis = []
for midi_file in sorted(train_path.glob("*.mid")):
    # Selection is by file name only (case-insensitive substring match).
    if "mozart" not in midi_file.name.lower():
        continue
    try:
        midi_obj = mido.MidiFile(midi_file)
    except Exception as e:
        # Deliberately best-effort: an unreadable file is reported and skipped
        # so one bad file does not abort the whole load.
        print(f"Could not load {midi_file}: {e}")
    else:
        mozart_midis.append(midi_obj)

print(f"Loaded {len(mozart_midis)} Mozart MIDI files from train/")


Could not load data\train\mozart-piano_sonatas-nueva_carpeta-k281_piano_sonata_n03_3mov.mid: Could not decode key with 2 flats and mode 2
Could not load data\train\unknown_artist-i_o-mozart_k550.mid: MThd not found. Probably not a MIDI file
Loaded 231 Mozart MIDI files from train/


In [8]:
import midi_conversion
import importlib

# Re-import midi_conversion so edits made to the module since the first import
# are picked up without restarting the kernel.
importlib.reload(midi_conversion)

# Boundary markers placed around every encoded piece.
SEQ_SOS = "<SOS>"
SEQ_EOS = "<EOS>"

total = len(mozart_midis)
mozart_texts = []
for i, mid in enumerate(mozart_midis, start=1):
    piece_text = midi_conversion.midi_to_text(mid, "mozart")
    mozart_texts.append(piece_text)
    # "\r" returns to the start of the line so the counter is overwritten in place.
    print(f"Processed {i}/{total} files", end="\r")

# One training sequence per piece: "<SOS> ...tokens... <EOS>".
seqs = []
for txt in mozart_texts:
    seqs.append(f"{SEQ_SOS} {txt} {SEQ_EOS}")

print("Mozart text processing completed.")

Mozart text processing completed.


In [9]:
from collections import Counter
import torch, math
import torch.nn as nn
import torch.nn.functional as F

# Tokenise: sequences are whitespace-delimited, so str.split() yields the tokens.
all_tokens = [tok for seq in seqs for tok in seq.split()]

# Vocabulary = sorted unique tokens, with lookup tables in both directions
# (itos: id -> token, stoi: token -> id).
vocab = sorted(set(all_tokens))
itos = dict(enumerate(vocab))
stoi = {tok: idx for idx, tok in itos.items()}
vocab_size = len(vocab)
print("vocab_size:", vocab_size)

def encode(text: str):
    """Map a whitespace-separated token string to a list of vocabulary ids.

    Looks every token up in the module-level ``stoi`` table; a token that is
    not in the table raises ``KeyError``.
    """
    tokens = text.split()
    return [stoi[token] for token in tokens]

def decode(ids):
    """Turn an iterable of vocabulary ids back into a space-joined token string.

    Each id is passed through ``int()`` before the lookup in the module-level
    ``itos`` table, so 0-d tensor elements are accepted as well as plain ints.
    """
    tokens = [itos[int(token_id)] for token_id in ids]
    return " ".join(tokens)

# Every piece back-to-back as a single 1-D stream of token ids.
ids = torch.tensor([stoi[tok] for tok in all_tokens], dtype=torch.long)

# Positional split: the first 90% of the stream trains, the remainder validates.
n = int(0.9 * len(ids))
train_data, val_data = ids[:n], ids[n:]


vocab_size: 606


In [10]:
# Batch geometry read by get_batch (and block_size also by the model and generate).
block_size = 128   # sequence length: number of tokens in each sampled window
batch_size = 32    # number of windows drawn per batch

def get_batch(split):
    """Sample a random batch of (input, target) windows from one data split.

    ``split == "train"`` reads ``train_data``; any other value reads
    ``val_data``. ``batch_size`` start offsets are drawn uniformly, and each
    target row is its input row shifted one token to the right. Both tensors
    have shape ``(batch_size, block_size)`` and are moved to the module-level
    ``device``.
    """
    if split == "train":
        data = train_data
    else:
        data = val_data

    starts = torch.randint(len(data) - block_size, (batch_size,))
    # (batch_size, 1) + (block_size,) broadcasts to one row of indices per window.
    window_idx = starts.unsqueeze(1) + torch.arange(block_size)
    x = data[window_idx].to(device)
    y = data[window_idx + 1].to(device)
    return x, y


In [11]:
class MozartTransformer(nn.Module):
    """Causally masked Transformer language model over token ids.

    Token embeddings plus learned positional embeddings are fed through a stack
    of ``nn.TransformerEncoderLayer`` blocks under a causal attention mask, and
    a linear head maps the result to per-position vocabulary logits.

    Args:
        vocab_size: Number of distinct token ids.
        d_model: Embedding / hidden width.
        n_head: Attention heads per layer.
        n_layer: Number of encoder layers.
        dim_ff: Width of each layer's feed-forward sub-block.
        block_size: Maximum sequence length (size of the positional table).
    """

    def __init__(self, vocab_size, d_model=256,
                 n_head=4, n_layer=6, dim_ff=512, block_size=512):
        super().__init__()
        self.block_size = block_size
        self.tok_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(block_size, d_model)

        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_head,
            dim_feedforward=dim_ff,
            batch_first=True,
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=n_layer)
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Return next-token logits for a batch of token-id sequences.

        Args:
            x: Integer tensor of shape ``(B, T)`` with ``T <= block_size``.

        Returns:
            Tensor of shape ``(B, T, vocab_size)``.

        Raises:
            ValueError: If ``T`` exceeds ``block_size``.
        """
        _, T = x.shape
        # Explicit check rather than `assert`: asserts vanish under `python -O`,
        # after which an over-long input fails inside the positional embedding
        # with a much less helpful index error.
        if T > self.block_size:
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.block_size}"
            )

        # (T, d_model) positional embeddings broadcast over the batch dimension.
        pos = torch.arange(T, device=x.device)
        h = self.tok_emb(x) + self.pos_emb(pos)

        # causal mask: -inf strictly above the diagonal, 0 elsewhere, so a
        # position cannot attend to later positions.
        mask = torch.full((T, T), float("-inf"), device=x.device).triu(diagonal=1)
        h = self.encoder(h, mask=mask)
        return self.lm_head(h)


In [12]:
# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")

# The model's positional table is sized to the batch window length.
model = MozartTransformer(vocab_size, block_size=block_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

Device: cuda


In [13]:
def estimate_loss(eval_iters=10):
    """Estimate mean loss and next-token accuracy on the train and val splits.

    Draws ``eval_iters`` random batches per split with ``get_batch`` and scores
    them with the module-level ``model`` in eval mode under ``torch.no_grad()``.
    The model's previous train/eval mode is restored afterwards, even if a
    batch raises.

    Args:
        eval_iters: Number of batches averaged per split (must be >= 1).

    Returns:
        dict mapping ``"train"`` and ``"val"`` to ``(avg_loss, accuracy)``.

    Raises:
        ValueError: If ``eval_iters`` is less than 1.
    """
    if eval_iters < 1:
        raise ValueError(f"eval_iters must be >= 1, got {eval_iters}")

    was_training = model.training
    model.eval()
    out = {}
    try:
        with torch.no_grad():
            for split in ["train", "val"]:
                losses = []
                correct = 0
                total = 0
                for _ in range(eval_iters):
                    xb, yb = get_batch(split)
                    logits = model(xb)
                    # Flatten (B, T, V) -> (B*T, V); V is taken from the logits
                    # themselves rather than from a global.
                    loss = F.cross_entropy(
                        logits.view(-1, logits.size(-1)),
                        yb.view(-1)
                    )
                    losses.append(loss.item())

                    # Accuracy: fraction of positions whose argmax equals the target.
                    preds = torch.argmax(logits, dim=-1)
                    correct += (preds == yb).sum().item()
                    total += yb.numel()

                avg_loss = sum(losses) / len(losses)
                accuracy = correct / total
                out[split] = (avg_loss, accuracy)
    finally:
        # Put the model back in whichever mode the caller had it in.
        model.train(was_training)
    return out

max_iters = 6000      # total number of optimizer steps
eval_interval = 250   # report train/val metrics every this many steps

for step in range(max_iters):
    # One optimisation step on a freshly sampled training batch.
    xb, yb = get_batch("train")
    logits = model(xb)
    loss = F.cross_entropy(
        logits.view(-1, vocab_size),
        yb.view(-1)
    )

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Metrics are measured after the update of this step. The last step is
    # always reported too, so the final trained state is evaluated even when
    # max_iters - 1 is not a multiple of eval_interval.
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss()
        train_loss, train_acc = losses["train"]
        val_loss, val_acc = losses["val"]
        print(f"step {step}: train loss {train_loss:.3f}, acc {train_acc:.3f} | val loss {val_loss:.3f}, acc {val_acc:.3f}")


step 0: train loss 6.206, acc 0.030 | val loss 6.144, acc 0.049
step 250: train loss 2.197, acc 0.432 | val loss 2.022, acc 0.485
step 500: train loss 1.775, acc 0.517 | val loss 1.613, acc 0.562
step 750: train loss 1.667, acc 0.538 | val loss 1.501, acc 0.577
step 1000: train loss 1.557, acc 0.568 | val loss 1.396, acc 0.599
step 1250: train loss 1.465, acc 0.587 | val loss 1.326, acc 0.620
step 1500: train loss 1.426, acc 0.595 | val loss 1.287, acc 0.633
step 1750: train loss 1.412, acc 0.597 | val loss 1.263, acc 0.635
step 2000: train loss 1.373, acc 0.605 | val loss 1.299, acc 0.621
step 2250: train loss 1.352, acc 0.608 | val loss 1.229, acc 0.641
step 2500: train loss 1.354, acc 0.611 | val loss 1.219, acc 0.644
step 2750: train loss 1.294, acc 0.623 | val loss 1.194, acc 0.655
step 3000: train loss 1.344, acc 0.617 | val loss 1.223, acc 0.647
step 3250: train loss 1.332, acc 0.618 | val loss 1.189, acc 0.654
step 3500: train loss 1.268, acc 0.633 | val loss 1.165, acc 0.659
s

In [30]:
# Vocabulary ids of the sequence boundary markers: generate() uses SOS_ID as
# its default seed and stops sampling once EOS_ID is produced.
SOS_ID = stoi[SEQ_SOS]
EOS_ID = stoi[SEQ_EOS]

@torch.no_grad()
def generate(start_tokens=None, max_new_tokens=200):
    """Sample a token-id sequence autoregressively from the module-level model.

    Args:
        start_tokens: Sequence of token ids used as the prompt. ``None`` or an
            empty sequence starts from a single ``<SOS>`` token.
        max_new_tokens: Upper bound on the number of sampled tokens.

    Returns:
        list[int]: the prompt followed by the sampled ids. Sampling stops early
        once ``<EOS>`` is drawn; that ``<EOS>`` id is included in the result.
    """
    # An empty prompt would give the model a zero-length context, so treat it
    # like "no prompt" and start from <SOS>.
    if start_tokens is None or len(start_tokens) == 0:
        x = torch.tensor([[SOS_ID]], dtype=torch.long, device=device)
    else:
        x = torch.tensor([start_tokens], dtype=torch.long, device=device)

    was_training = model.training
    model.eval()
    try:
        for _ in range(max_new_tokens):
            # The model only accepts up to block_size positions: keep the tail.
            x_cond = x[:, -block_size:]
            logits = model(x_cond)
            logits = logits[:, -1, :]  # distribution for the next position only
            probs = torch.softmax(logits, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)  # sample
            x = torch.cat([x, next_id], dim=1)

            # stop at EOS
            if int(next_id[0, 0]) == EOS_ID:
                break
    finally:
        # Restore the caller's train/eval mode so that resuming training after
        # sampling does not silently run with the model still in eval mode.
        model.train(was_training)

    return x[0].tolist()

# Seed with first few tokens from a real piece
seed_tokens = encode(mozart_texts[0])[:50]
generated_ids = generate(seed_tokens, max_new_tokens=800)

# <SOS>/<EOS> are wrapped around each piece only after midi_to_text has run, so
# they are not part of the converter's own token format. Drop them before
# decoding so the stop marker does not end up in the text handed on for MIDI
# conversion.
special_ids = {SOS_ID, EOS_ID}
generated_text = decode(
    [token_id for token_id in generated_ids if token_id not in special_ids]
)
print('First 200 chars of generated text:\n')
print(generated_text[:200])

First 200 chars of generated text:

COMPOSER_mozart KEY_D TIME_SIGNATURE_4/4 TEMPO_BPM_36 MEASURE BEAT POS_0 NOTE_71 DUR_50 VEL_4 BEAT POS_0 NOTE_66 DUR_25 VEL_4 POS_24 NOTE_74 DUR_21 VEL_4 BEAT POS_0 NOTE_53 DUR_95 VEL_6 NOTE_59 DUR_50


In [31]:
from midi_conversion import text_to_midi
import importlib
import os

# Reload the module, then re-resolve text_to_midi through it. A name bound by
# `from midi_conversion import text_to_midi` before the reload keeps pointing at
# the pre-reload function object, so calling it unchanged would ignore the reload.
importlib.reload(midi_conversion)
text_to_midi = midi_conversion.text_to_midi

mid = text_to_midi(generated_text)

# Create output directory if it doesn't exist
os.makedirs("generated", exist_ok=True)

# Save to path
output_path = os.path.join("generated", "mozart_output.mid")
mid.save(output_path)