In [1]:
import datasets
import argparse
import torch
import tqdm

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"device: {device}")

device: cuda


# DATA

## Load data

In [14]:
# load data
dataset = datasets.load_from_disk("dataset")
dataset

DatasetDict({
    train: Dataset({
        features: ['abc notation', 'control code'],
        num_rows: 214122
    })
    validation: Dataset({
        features: ['abc notation', 'control code'],
        num_rows: 2162
    })
})

## Tokenizer Encoding/Decoding

In [None]:
# encoding and decoding  
vocab_size = 100

def char_level_tokenizer(dataset):
    chars = sorted(set("\n\n".join(dataset["train"]["abc notation"]+dataset["validation"]["abc notation"])))
    vocab_size = len(chars) 
    print(f"vocab_size: {vocab_size}")
    print(f"chars: {chars}")
    chat2index = {ch:i for i, ch in enumerate(chars)}
    index2chat = {i:ch for i, ch in enumerate(chars)}
    encode = lambda x: [chat2index[c] for c in x]
    decode = lambda x: "".join([index2chat[c] for c in x])
    return encode, decode, vocab_size

def BPETokenizer(dataset):
    from tokenizers import Tokenizer, models, trainers, pre_tokenizers
    # Initialize a tokenizer
    tokenizer = Tokenizer(models.BPE())

    # Define pre-tokenization rules (split on |, :, and whitespace)
    # tokenizer.pre_tokenizer = pre_tokenizers.WhitespaceSplit()

    # Train tokenizer on your dataset
    trainer = trainers.BpeTrainer(special_tokens=["<START>", "<END>", "<PAD>"], vocab_size=vocab_size)
    tokenizer.train_from_iterator(dataset["train"]["abc notation"], trainer=trainer, )

    def encode(sequence):
        """
        Encodes an ABC notation sequence into a list of token IDs.
        
        Args:
        - tokenizer: The tokenizer object.
        - sequence: A string of ABC notation to encode.

        Returns:
        - List of token IDs.
        """
        # Add <START> and <END> tokens for sequence boundaries
        sequence_with_tokens = f"<START>{sequence}<END>"
        encoded = tokenizer.encode(sequence_with_tokens)
        return encoded.ids
    
    def _decode(token_ids):
        """
        Decodes a list of token IDs back into an ABC notation sequence.
        
        Args:
        - tokenizer: The tokenizer object.
        - token_ids: A list of token IDs to decode.

        Returns:
        - Decoded string of ABC notation.
        """
        decoded = tokenizer.decode(token_ids)
        # Remove <START> and <END> tokens if present
        return decoded.replace("<START>", "").replace("<END>", "")

    def decode(token_ids):
        return "".join([_decode([t]) for t in token_ids])
    return encode, decode, len(tokenizer.get_vocab()), tokenizer

    

In [40]:
encode, decode, vocab_size, tokenizer = BPETokenizer(dataset)
vocab = tokenizer.get_vocab()
vocab = dict(sorted(vocab.items(), key=lambda item: item[1]))
print(vocab)

{'<START>': 0, '<END>': 1, '<PAD>': 2, '\n': 3, ' ': 4, '!': 5, '"': 6, '#': 7, '$': 8, '&': 9, "'": 10, '(': 11, ')': 12, '*': 13, '+': 14, ',': 15, '-': 16, '.': 17, '/': 18, '0': 19, '1': 20, '2': 21, '3': 22, '4': 23, '5': 24, '6': 25, '7': 26, '8': 27, '9': 28, ':': 29, ';': 30, '<': 31, '=': 32, '>': 33, '?': 34, '@': 35, 'A': 36, 'B': 37, 'C': 38, 'D': 39, 'E': 40, 'F': 41, 'G': 42, 'H': 43, 'I': 44, 'J': 45, 'K': 46, 'L': 47, 'M': 48, 'N': 49, 'O': 50, 'P': 51, 'Q': 52, 'R': 53, 'S': 54, 'T': 55, 'U': 56, 'V': 57, 'W': 58, 'X': 59, 'Y': 60, 'Z': 61, '[': 62, '\\': 63, ']': 64, '^': 65, '_': 66, '`': 67, 'a': 68, 'b': 69, 'c': 70, 'd': 71, 'e': 72, 'f': 73, 'g': 74, 'h': 75, 'i': 76, 'j': 77, 'k': 78, 'l': 79, 'm': 80, 'n': 81, 'o': 82, 'p': 83, 'q': 84, 'r': 85, 's': 86, 't': 87, 'u': 88, 'v': 89, 'w': 90, 'x': 91, 'y': 92, 'z': 93, '{': 94, '|': 95, '}': 96, '~': 97, ' |': 98, ' | ': 99}


In [43]:
sample = """X:1\nL:1/8\nM:6/8\nK:Bb\n F | B2 d c2 f | edc B2 F | GAB cec | BAG FGA | B2 d c2 f | edc B2 F | Gec AGA | B2 d B2 |: \n !fermata!F | D2 F D2 F | EGB cED | C2 E C2 E | DFA Bdf | geg fdb | gab [df]bb | dba gf=e |1 \n fff f2 :|2 fgf _edc!D.C.! ||"""
# sample = """1\n A \n1"""
print(sample)
print(decode(encode(sample)))

X:1
L:1/8
M:6/8
K:Bb
 F | B2 d c2 f | edc B2 F | GAB cec | BAG FGA | B2 d c2 f | edc B2 F | Gec AGA | B2 d B2 |: 
 !fermata!F | D2 F D2 F | EGB cED | C2 E C2 E | DFA Bdf | geg fdb | gab [df]bb | dba gf=e |1 
 fff f2 :|2 fgf _edc!D.C.! ||
X:1
L:1/8
M:6/8
K:Bb
 F | B2 d c2 f | edc B2 F | GAB cec | BAG FGA | B2 d c2 f | edc B2 F | Gec AGA | B2 d B2 |: 
 !fermata!F | D2 F D2 F | EGB cED | C2 E C2 E | DFA Bdf | geg fdb | gab [df]bb | dba gf=e |1 
 fff f2 :|2 fgf _edc!D.C.! ||


## Train/Validation data envoding

In [None]:
# encode training data
# dataset = dataset.map(lambda x: {"abc notation": encode(x["abc notation"])})

training_data = torch.tensor(encode(dataset["train"]["abc notation"]))
print(f"training_data: {training_data.shape}")


validation_data = torch.tensor(encode(dataset["validation"]["abc notation"]))
print(f"validation_data: {validation_data.shape}")

training_data: torch.Size([57371649])
validation_data: torch.Size([570241])


## Data batch

In [48]:
# bach generator
def get_batch(split, block_size=8, bach_size=32):
    if split == "train":
        data = training_data
    elif split == "validation":
        data = validation_data
    else:
        raise ValueError("split must be 'train' or 'validation'")
    start_idx = torch.randint(0, data.size(0) - block_size, (bach_size,))
    x = torch.stack([data[idx:idx+block_size] for idx in start_idx]).to(device)
    y = torch.stack([data[idx+1:idx+block_size+1] for idx in start_idx]).to(device)
    return x, y

In [49]:
torch.manual_seed(42)
bach_size = 1
block_size = 48
x, y = get_batch("train", block_size=block_size, bach_size=bach_size)
for b in range(bach_size):
    for t in range(block_size):
        context = x[b, :t+1]
        target = y[b, t]
    print(context.tolist(), "->", target.item())
    print(decode(context.tolist()), "->", decode([target.item()]))

[59, 29, 27, 22, 24, 21, 63, 81, 47, 29, 20, 18, 27, 63, 81, 52, 29, 22, 18, 27, 32, 20, 19, 19, 63, 81, 48, 29, 25, 18, 27, 63, 81, 46, 29, 42, 63, 81, 4, 37, 71, 71, 4, 74, 71, 71, 99, 72] -> 37
X:8352\nL:1/8\nQ:3/8=100\nM:6/8\nK:G\n Bdd gdd | e -> B


## Estimate Loss

In [56]:
with torch.no_grad():
    def estimate_loss(model, eval_iters, block_size):
        out = {}
        model.eval()
        for split in dataset:
            losses = torch.zeros(eval_iters)
            for i in range(eval_iters):
                x, y = get_batch(split, block_size)
                _, loss = model(x, y)
                losses[i] = loss.item()
            out[split] = losses.mean()
        model.train()
        return out

## Bigram Model

In [50]:
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class BigramModel(nn.Module):
    def __init__(self, vocab_size):
        super(BigramModel, self).__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)
    def forward(self, idx, targets=None):
        logits = self.token_embedding_table(idx)
        B, T, C = logits.size()
        
        if targets is not None:
            loss = F.cross_entropy(logits.view(B*T, C), targets.view(B*T))
        else:
                loss = None
        return logits, loss
    
    def generate(self, idx, n):
        for _ in range(n):
            logits = self.token_embedding_table(idx)
            next_idx = torch.multinomial(F.softmax(logits[:, -1], dim=1), 1)
            idx = torch.cat([idx, next_idx], dim=1)
        return idx

In [None]:
x, y = get_batch("train")
m = BigramModel(vocab_size)
m.to(device)
logits, loss = m(x, y)  
print(logits.shape)
print(loss)

torch.Size([32, 8, 1000])
tensor(7.3779, device='cuda:0', grad_fn=<NllLossBackward0>)


In [None]:
idx = torch.zeros(1, 1).long().to(device)
g = m.generate(idx, 100)
decode(g[0].tolist())

'dedc ]" ged FAA gdc >^ Amin BGAG cBAB \\\'/ q e 88 Bc AFA edBd A4 3EFG ded dor Bdg ]/[ {/ ga dBA GAG dBBA f2e2 a D7 3def /). DGBG Adf 116 s cee BB e2e2 \', afa |:" FDE 44 ||", fe Tf2 AGFE Z 69 - Bd FAc 77 egdg efed 12 afdf ga cAc egdB faa geg #" 24 a3 ly gec cdc BdBG BAA AGEG cAGE eB af GFD gagf B2B2 ce 3DEF |]: BdBG nL EFA d2 c2B2 c2e2 or 77 GEC fded low F6 ac 55 Bdf ]/ >" 2 dfd'

### training

In [None]:
with torch.no_grad():
    def estimate_loss(model, eval_iters, block_size):
        out = {}
        model.eval()
        for split in dataset:
            losses = torch.zeros(eval_iters)
            for i in range(eval_iters):
                x, y = get_batch(split, block_size)
                _, loss = model(x, y)
                losses[i] = loss.item()
            out[split] = losses.mean()
        model.train()
        return out

In [None]:
n_iters = 1000
optimizer = torch.optim.Adam(m.parameters(), lr=0.001)

In [None]:
for step in range(n_iters):
    x, y = get_batch("train")
    optimizer.zero_grad()
    logits, loss = m(x, y)
    loss.backward()
    optimizer.step()
    if step % (n_iters//10) == 0:
        losses = estimate_loss(m, 100, block_size)
        print(f"step: {step}, train loss: {losses['train']:.3f}, validation loss: {losses['validation']:.3f}")

step: 0, train loss: 7.402, validation loss: 7.404
step: 100, train loss: 7.324, validation loss: 7.322
step: 200, train loss: 7.229, validation loss: 7.233
step: 300, train loss: 7.146, validation loss: 7.148
step: 400, train loss: 7.054, validation loss: 7.054
step: 500, train loss: 6.969, validation loss: 6.969
step: 600, train loss: 6.890, validation loss: 6.888
step: 700, train loss: 6.811, validation loss: 6.803
step: 800, train loss: 6.728, validation loss: 6.720
step: 900, train loss: 6.650, validation loss: 6.642


In [None]:
idx = torch.zeros(1, 1).long().to(device)
g = m.generate(idx, 500)

decode(g[0].tolist()).replace("\n", "")

'39 dc fddf ve EAA dcd efgf 3ABA Bee Bdf fdcA cBcA \'/ 69 dGBG e3 cAc cBc efg } im B3 ]) DEFG BF 42 efg A4 !> egdg adfd ,, GGG 54 dBG 88 cAB ff 48 gg fdec Bdef Cm GG c2 36 bag |"^( Gc agef 70 |] efga z4 Bf 3efg GFE BGdG f3 eaag ABG 16 FGAc ceg BcBA dAF _B "" BGBd ag Ec 17 BABc 3 ea |" C7 gedc B2d2 _e 3A gfg BGB FD FAdA cdB dcAG & EAA edef 34 BGA it al Db gef gab 26 F2A2 D6 aec eG Adde GFEF ecdB ve ||\', FGAF dBAB g6 FAdf n 35 ABde dBAG ,< ]" dg FAF 3ded edB gd fdcA ||"^ GFEF 11 cAFA >^ G2G2 SO lide 59 3BcB age GFE Gc # BGE ecBc aba 38 egdB im Ae BAF Edor 180 BF Af CF aA "{ ba ca 35 fdf age d3 - dF GEC ." S cBcA AGEF GFED BdBG 47 Ab GABc 21 s fgag d4 fa 23 im dGBG gabg efg cBc ar dcA Te GF 9 A z3 3BAG DC BAA fece cBAc I BG H fA Amin GABc dfec AFF cdB eG 3ABA on GBA 27 dE B4 g3 gfe BAF cGE wedge z f2f2 AD dcAF 3AAA eA I V g6 : & 92 GBB U ga Add 54 BdAF AcBA cAG eAcA c4 f2f2 b2 dAFA CA Gg ." baf aba s 46 FE EB Ab #" cdeg _ 56 wedge j /). 66 im AGFG DAFA DGG c2B2 ||" AGE 40 d2B2 EGB efg de

## LanguageModel

In [51]:
# Basic  transformer components
class SelfAttention(nn.Module):
    def __init__(self, embed_size, head_size, dropout=0.1, block_size=8):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.head_size = head_size
        
        self.keys = nn.Linear(self.embed_size, self.head_size, bias=False)
        self.queries = nn.Linear(self.embed_size, self.head_size, bias=False)
        self.value = nn.Linear(self.embed_size, self.head_size, bias=False)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        B, T, C = x.shape
        k = self.keys(x) # B, block_size, head_size
        q = self.queries(x) # B, block_size, head_size
        v = self.value(x)

        tril = torch.tril(torch.ones(T, T)).to(x.device)    
        wei = q @ k.transpose(-2, -1) * self.head_size**-0.5  # (B, block_size, head_size) @ (B, head_size, block_size) -> (B, block_size, block_size)
        wei = wei.masked_fill(tril == 0, float('-inf')) # B, block_size, block_size
        wei = torch.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        out = wei @ v
        return out
 
class Mlp(nn.Module):
    def __init__(self, embed_size, mlp_size, dropout=0.1):
        super(Mlp, self).__init__()
        self.embed_size = embed_size
        self.mlp_size = mlp_size
        self.mlp = nn.Sequential(
            nn.Linear(embed_size, mlp_size),
            nn.ReLU(),
            nn.Linear(mlp_size, embed_size),
            nn.Dropout(dropout)
        )
        
    def forward(self, x):
        return self.mlp(x)
    
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, head_size, n_heads, dropout=0.1, block_size=8):
        super(MultiHeadAttention, self).__init__()
        self.embed_size = embed_size
        self.head_size = head_size
        self.num_heads = n_heads
        self.attentions = nn.ModuleList([SelfAttention(embed_size, head_size, block_size=block_size) for _ in range(n_heads)])
        self.proj = nn.Linear(n_heads * head_size, embed_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        out = torch.cat([attn(x) for attn in self.attentions], dim=-1)
        out = self.proj(out)
        out = self.dropout(out)
        return out

In [52]:
# Block
class Block(nn.Module): 
    def __init__(self, embed_size, mlp_size, n_heads, dropout=0.1, block_size=8):
        super(Block, self).__init__()
        self.embed_size = embed_size
        self.head_size = embed_size // n_heads
        self.mlp_size = mlp_size
        self.n_heads = n_heads
        
        self.ln1 = nn.LayerNorm(embed_size)
        self.ln2 = nn.LayerNorm(embed_size)
        self.mha = MultiHeadAttention(embed_size, self.head_size, n_heads, dropout, block_size)
        self.mlp = Mlp(embed_size, mlp_size, dropout)
        
    def forward(self, x):
        out = self.mha(x) + self.ln1(x)
        out = self.mlp(self.ln2(out)) + out
        return out

In [53]:
class LanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_blocks=8, block_size=8, n_heads=8, dropout=0.1):
        super(LanguageModel, self).__init__()
        self.block_size = block_size
        self.token_embedding_table = nn.Embedding(vocab_size, embedding_dim)
        self.positional_embedding_table = nn.Embedding(block_size, embedding_dim)
        self.blocks = nn.Sequential(*
                                    [Block(embedding_dim,
                                           embedding_dim*4,
                                           n_heads, dropout,
                                           block_size) for _ in range(n_blocks)],
                                    nn.LayerNorm(embedding_dim)
                                    )
        self.lm_head = nn.Linear(embedding_dim, vocab_size)
        
    def forward(self, x, targets=None):
        B, T = x.shape
        token_embeddings = self.token_embedding_table(x) # B, T, C
        positional_embeddings = self.positional_embedding_table(torch.arange(T).to(device) )# T, C
        x = token_embeddings + positional_embeddings # 
        x = self.blocks(x)
        logits = self.lm_head(x)
        
        B, T, C = logits.size()
        if targets is not None:
            loss = F.cross_entropy(logits.view(B*T, C), targets.view(B*T))
        else:
                loss = None
                
        return logits, loss
    
    def generate(self, idx, n):
        for _ in range(n):
            logits, _ = self(idx[:, -self.block_size:])
            next_idx = torch.multinomial(F.softmax(logits[:, -1], dim=1), 1)
            idx = torch.cat([idx, next_idx], dim=1)
        return idx

    def save_model(self, path):
        torch.save(self.state_dict(), path)
        
    def load_model(self, path):
        self.load_state_dict(torch.load(path))
          

In [57]:
embedding_dim = 384 
n_heads = 6
head_size = 32//n_heads 
block_size = 256 # context window size
bach_size = 64
n_iters = 1000
lr = 3e-4
n_blocks = 6    
dropout = 0.2
m = LanguageModel(vocab_size=vocab_size,
                  embedding_dim=embedding_dim,
                  block_size=block_size,
                  n_heads=n_heads,
                  dropout=dropout,)
m.to(device)
optimizer = torch.optim.Adam(m.parameters(), lr=lr)

In [None]:
from tqdm import tqdm
for step in tqdm(range(n_iters+1),  total=n_iters+1, desc="Training Iterations"): 
    x, y = get_batch("train", block_size, bach_size)
    optimizer.zero_grad()
    logits, loss = m(x, y)
    loss.backward()
    optimizer.step()
    if step % (n_iters//5) == 0 and step :
        losses = estimate_loss(m, 25, block_size=block_size)
        print(f"step: {step}, train loss: {losses['train']:.3f}, validation loss: {losses['validation']:.3f}")

m.save_model(r"models/model.pth")

Training Iterations:  10%|█         | 101/1001 [01:09<1:45:14,  7.02s/it]

step: 100, train loss: 2.179, validation loss: 2.168


Training Iterations:  20%|██        | 201/1001 [02:19<1:36:04,  7.21s/it]

step: 200, train loss: 1.840, validation loss: 1.837


Training Iterations:  30%|███       | 301/1001 [03:31<1:24:00,  7.20s/it]

step: 300, train loss: 1.625, validation loss: 1.623


Training Iterations:  40%|████      | 401/1001 [04:43<1:12:42,  7.27s/it]

step: 400, train loss: 1.509, validation loss: 1.498


Training Iterations:  50%|█████     | 501/1001 [05:56<1:00:36,  7.27s/it]

step: 500, train loss: 1.401, validation loss: 1.401


Training Iterations:  60%|██████    | 601/1001 [07:10<48:26,  7.27s/it]  

step: 600, train loss: 1.314, validation loss: 1.314


Training Iterations:  70%|███████   | 701/1001 [08:21<37:01,  7.40s/it]

step: 700, train loss: 1.253, validation loss: 1.253


Training Iterations:  80%|███████▉  | 800/1001 [09:10<01:44,  1.92it/s]

In [None]:
def generate(model, prompt, n):
    encoded_prompt = torch.tensor(encode(prompt)).unsqueeze(0).to(device)
    print(encoded_prompt)
    out = model.generate(encoded_prompt, n)
    return decode(out[0].tolist())

In [None]:
prompt = """
M:4/4
C:Trad.
K:G
|:GABc dedB|dedB dedB|c2ec B2dB|c2A2 A2BA|
"""
m = LanguageModel(vocab_size=vocab_size,
                  embedding_dim=embedding_dim,
                  block_size=block_size,
                  n_heads=n_heads,
                  dropout=dropout)
# m.load_model(r"models/model.pth")
m.to(device)
print(generate(m, prompt, 64))

tensor([[  0,  46,  27,  21,  16,  21,  36,  27,  53,  83, 341,  15,  44,  27,
          40, 135, 324, 594,  93, 594, 594,  93, 103, 125, 102, 113,  93, 722,
          98,  99,  93,   1]], device='cuda:0')
M : 4 / 4 C : T r ad . K : G |: GABc dedB | dedB dedB | c2 ec B2 dB | c2A2 A2 BA | abc ADD ` C2 |"^( GABd GBAF ccc 14 94 EF edcB G3 j fdB 115 72 |[ N 34 "^/" fd EF edBd B2G2 fg 6 aga gfe F F d2d2 cBAB Q BdAF cdeg ABcA a ). G2G2 94 48 cdef GEE Bd ceg dfa AGFD GFEF k ddd cAeA C3 gef "{ Amin Ee afec ||[ c2A2 egdB BGGB dBAG c2
