<a href="https://colab.research.google.com/github/Vaishnavi-TCD/TCD/blob/main/FinalML_with_data_augmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from nltk.translate.bleu_score import sentence_bleu
import random

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2
# ------------

torch.manual_seed(1337)

# Load melody dataset
with open('inputMelodiesAugmented.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Define unique tokens for melody generation
chars = sorted(list(set(text)))
vocab_size = len(chars)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}
encode = lambda s: [stoi[c] for c in s]  # Encode sequence
decode = lambda l: ''.join([itos[i] for i in l])  # Decode sequence

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# Data loading
def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i + block_size] for i in ix])
    y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

# Function to compute BLEU score
def compute_bleu(reference, candidate):
    reference_tokens = list(reference)  # Tokenize reference melody
    candidate_tokens = list(candidate)  # Tokenize generated melody
    return sentence_bleu([reference_tokens], candidate_tokens)

@torch.no_grad()
def evaluate_bleu_score():
    model.eval()
    references = []
    candidates = []
    for _ in range(eval_iters):
        X, Y = get_batch('val')  # Validation batch
        logits, _ = model(X)
        generated = model.generate(X[:, :1], max_new_tokens=block_size)
        references.append(decode(Y[0].tolist()))  # Actual melody
        candidates.append(decode(generated[0].tolist()))  # Predicted melody

    # Compute BLEU scores for all sequences
    bleu_scores = [compute_bleu(ref, cand) for ref, cand in zip(references, candidates)]
    avg_bleu = sum(bleu_scores) / len(bleu_scores)
    model.train()
    return avg_bleu

class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        avg_bleu = evaluate_bleu_score()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, BLEU score {avg_bleu:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated_sequence = model.generate(context, max_new_tokens=500)
print("Generated Melody:", decode(generated_sequence[0].tolist()))

# Random baseline melody
def random_melody(length):
    notes = list(stoi.keys())
    return ''.join(random.choices(notes, k=length))

print("Random Melody:", random_melody(100))


10.74971 M parameters


KeyboardInterrupt: 

In [None]:
datasets = [
    '/content/inputMelodiesAugmented.txt',

]

for file_path in datasets:
    with open(file_path, 'r', encoding='utf-8') as f:
        dataset_text = f.read()

    chars = sorted(list(set(dataset_text)))
    vocab_size = len(chars)
    total_length = len(dataset_text)

    print(f"Dataset: {file_path}")
    print(f"Total number of characters: {total_length}")
    print(f"Vocabulary size: {vocab_size}")
    print(f"Unique characters: {chars}")
    print('-' * 50)


Dataset: /content/inputMelodiesAugmented.txt
Total number of characters: 710155
Vocabulary size: 14
Unique characters: ['\n', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'R', 'a', 'c', 'd', 'f', 'g']
--------------------------------------------------


In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from nltk.translate.bleu_score import sentence_bleu

# hyperparameters
batch_size = 64
block_size = 256
max_iters = 1000  # Reduced for faster experimentation
eval_interval = 200
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
dropout = 0.2
torch.manual_seed(1337)

file_path = '/content/inputMelodiesAugmented.txt'
with open(file_path, 'r', encoding='utf-8') as f:
    text = f.read()

chars = sorted(list(set(text)))
vocab_size = len(chars)

stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}
encode = lambda s: [stoi[c] for c in s]
decode = lambda l: ''.join([itos[i] for i in l])

data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i : i + block_size] for i in ix])
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

# Function to compute BLEU score
def compute_bleu(reference, candidate):
    reference_tokens = list(reference)
    candidate_tokens = list(candidate)
    return sentence_bleu([reference_tokens], candidate_tokens)

@torch.no_grad()
def evaluate_bleu_score():
    model.eval()
    references = []
    candidates = []
    for _ in range(eval_iters):
        X, Y = get_batch('val')
        logits, _ = model(X)
        generated = model.generate(X[:, :1], max_new_tokens=block_size)
        references.append(decode(Y[0].tolist()))
        candidates.append(decode(generated[0].tolist()))

    bleu_scores = [compute_bleu(ref, cand) for ref, cand in zip(references, candidates)]
    avg_bleu = sum(bleu_scores) / len(bleu_scores)
    model.train()
    return avg_bleu

class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        wei = q @ k.transpose(-2, -1) * k.shape[-1] ** -0.5
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        out = wei @ v
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device))
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        loss = F.cross_entropy(logits.view(-1, vocab_size), targets.view(-1)) if targets is not None else None
        return logits, loss

    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]
            logits, _ = self(idx_cond)
            probs = F.softmax(logits[:, -1, :], dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

# Downsize configurations |
configs = [
    {"n_embd": 128, "n_head": 2, "n_layer": 2},
    {"n_embd": 96, "n_head": 4, "n_layer": 5},
    {"n_embd": 128, "n_head": 3, "n_layer": 3},
    {"n_embd": 160, "n_head": 3, "n_layer": 3},
    {"n_embd": 144, "n_head": 2, "n_layer": 3},
    {"n_embd": 192, "n_head": 3, "n_layer": 3},
    {"n_embd": 256, "n_head": 2, "n_layer": 2},
    {"n_embd": 128, "n_head": 4, "n_layer": 4},
    {"n_embd": 160, "n_head": 4, "n_layer": 3},
    {"n_embd": 192, "n_head": 4, "n_layer": 2},
]

for i, config in enumerate(configs):
    n_embd = config["n_embd"]
    n_head = config["n_head"]
    n_layer = config["n_layer"]

    model = GPTLanguageModel()
    model = model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    param_count = sum(p.numel() for p in model.parameters())
    print(f"Configuration {i + 1}: {config}")
    print(f"Total Parameters: {param_count / 1e6:.6f} M")

    for iter in range(max_iters):
        if iter % eval_interval == 0 or iter == max_iters - 1:
            losses = estimate_loss()
            avg_bleu = evaluate_bleu_score()
            print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, BLEU score {avg_bleu:.4f}")
        xb, yb = get_batch('train')
        logits, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

    context = torch.zeros((1, 1), dtype=torch.long, device=device)
    generated_text = decode(model.generate(context, max_new_tokens=500)[0].tolist())
    file_name = f"generated_melody_config_{i + 1}_params_{param_count}.txt"
    # with open(file_name, "w") as f:
    #     f.write(generated_text)
    # print(f"Generated melody saved to {file_name}")


Configuration 1: {'n_embd': 128, 'n_head': 2, 'n_layer': 2}
Total Parameters: 0.432398 M


The hypothesis contains 0 counts of 4-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 3-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()


step 0: train loss 2.6544, val loss 2.6582, BLEU score 0.0307
step 200: train loss 1.8435, val loss 1.7512, BLEU score 0.1180
step 400: train loss 1.7395, val loss 1.6473, BLEU score 0.1229


The hypothesis contains 0 counts of 2-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()


step 600: train loss 1.6970, val loss 1.6175, BLEU score 0.1181
step 800: train loss 1.6439, val loss 1.5652, BLEU score 0.1055
step 999: train loss 1.5697, val loss 1.5029, BLEU score 0.1069
Configuration 2: {'n_embd': 96, 'n_head': 4, 'n_layer': 5}
Total Parameters: 0.585230 M


KeyboardInterrupt: 

In [None]:
# melody generation
import torch
import torch.nn as nn
from torch.nn import functional as F

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 1000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
# n_embd = 384
# n_head = 6
# n_layer = 6
dropout = 0.2

n_embd= 192
n_head = 4
n_layer = 2
# ------------

torch.manual_seed(1337)

# Load melody data
with open('inputMelodiesAugmented.txt', 'r', encoding='utf-8') as f:
    text = f.read()
text = text.replace(' ', '')  # Remove spaces between tokens

# Define the vocabulary for musical notes
# chars = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B", "R"]
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from notes/rests to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))


0.943502 M parameters
step 0: train loss 2.6607, val loss 2.6690
step 500: train loss 1.7032, val loss 1.6212
step 999: train loss 1.4990, val loss 1.4471

fGfDFddDcfAAAfGfGAAGGGAfRCCBAACAAAfBDRCAGfDRDDddccRfffFdRfFfFFdDEFdDdDccfGfGFdDDAfGfGAffffDGAGffEDBRGCCABAAfGfRAAfGGGfGGffGfGAfgGfGAgfREDcRDDdcBCAAC
RDEDDDFFdDDDFDFdFDDDDDFFFdFCCaDDDRFdDDdDDCdFDDDDRcaDFFDdcdDcDCaFDDDDFDCCAAFDdDDddcDdDFddDCaaAFFdCDDDDDdDCaaadAEDdDCACEDaFGaaAGGFddDdFdcRBCRCDcDdAaCCaRDDDDFDDdCDdEdEDdDFDdDCaRaCaCaCRDDFdfgfGFEEdDdddDdDDdddEDDDdDcDDCcDdDcDEdDCDCaagDEDDdDDdDCdFDDddcCaGRaCCRCDdfRDacDDCcDDdDDdCACRCCaCCDRaCCaAARDddDFGARCEGDRaRaAGGRCDdfEAFEDFFEFRFEDFFRfEDdDFaaDDdDCRfffgGRa


In [None]:
# multiple configurations
import torch
import torch.nn as nn
from torch.nn import functional as F

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 1000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
dropout = 0.2

configs = [
    {"n_embd": 128, "n_head": 2, "n_layer": 2},
    {"n_embd": 96, "n_head": 4, "n_layer": 5},
    {"n_embd": 128, "n_head": 3, "n_layer": 3},
    {"n_embd": 160, "n_head": 3, "n_layer": 3},
    {"n_embd": 144, "n_head": 2, "n_layer": 3},
    {"n_embd": 192, "n_head": 3, "n_layer": 3},
    {"n_embd": 256, "n_head": 2, "n_layer": 2},
    {"n_embd": 128, "n_head": 4, "n_layer": 4},
    {"n_embd": 160, "n_head": 4, "n_layer": 3},
    {"n_embd": 192, "n_head": 4, "n_layer": 2},
]

# ------------

torch.manual_seed(1337)

# Load melody data
with open('inputMelodiesAugmented.txt', 'r', encoding='utf-8') as f:
    text = f.read()
text = text.replace(' ', '')  # Remove spaces between tokens

# Define the vocabulary for musical notes
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from notes/rests to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self, n_embd, n_head, n_layer):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

# Testing multiple configurations
for config in configs:
    n_embd = config["n_embd"]
    n_head = config["n_head"]
    n_layer = config["n_layer"]

    print(f"Training with config: n_embd={n_embd}, n_head={n_head}, n_layer={n_layer}")
    model = GPTLanguageModel(n_embd, n_head, n_layer).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    for iter in range(max_iters):

        # every once in a while evaluate the loss on train and val sets
        if iter % eval_interval == 0 or iter == max_iters - 1:
            losses = estimate_loss()
            print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

        # sample a batch of data
        xb, yb = get_batch('train')

        # evaluate the loss
        logits, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

    print("Completed training for current configuration\n")


Training with config: n_embd=128, n_head=2, n_layer=2
step 0: train loss 2.6544, val loss 2.6582
step 500: train loss 1.7169, val loss 1.6351
step 999: train loss 1.5744, val loss 1.4997
Completed training for current configuration

Training with config: n_embd=96, n_head=4, n_layer=5
step 0: train loss 2.6553, val loss 2.6552
step 500: train loss 1.7524, val loss 1.6719
step 999: train loss 1.6870, val loss 1.6166
Completed training for current configuration

Training with config: n_embd=128, n_head=3, n_layer=3
step 0: train loss 2.6547, val loss 2.6484
step 500: train loss 1.7236, val loss 1.6430
step 999: train loss 1.6731, val loss 1.6022
Completed training for current configuration

Training with config: n_embd=160, n_head=3, n_layer=3
step 0: train loss 2.6630, val loss 2.6627
step 500: train loss 1.7061, val loss 1.6245
step 999: train loss 1.6184, val loss 1.5585
Completed training for current configuration

Training with config: n_embd=144, n_head=2, n_layer=3
step 0: train l

In [None]:
# Training with config: n_embd=256, n_head=2, n_layer=2
import torch
import torch.nn as nn
from torch.nn import functional as F

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
# n_embd = 384
# n_head = 6
# n_layer = 6
dropout = 0.2

n_embd= 256
n_head = 2
n_layer = 2
# ------------

torch.manual_seed(1337)

# Load melody data
with open('inputMelodiesAugmented.txt', 'r', encoding='utf-8') as f:
    text = f.read()
text = text.replace(' ', '')  # Remove spaces between tokens

# Define the vocabulary for musical notes
# chars = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B", "R"]
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from notes/rests to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))


1.651214 M parameters
step 0: train loss 2.6910, val loss 2.6946
step 500: train loss 1.6156, val loss 1.5390
step 1000: train loss 1.3920, val loss 1.3461
step 1500: train loss 1.3045, val loss 1.2728
step 2000: train loss 1.2738, val loss 1.2534
step 2500: train loss 1.2471, val loss 1.2409
step 3000: train loss 1.2364, val loss 1.2377
step 3500: train loss 1.2251, val loss 1.2275
step 4000: train loss 1.2065, val loss 1.2211
step 4500: train loss 1.2004, val loss 1.2238
step 4999: train loss 1.1870, val loss 1.2185

RfEEDFFEECEDCBCCCCCCEECCDCCARAffEfAfEDRfffREEDFFEEDRFDaFEDDFEECCGCCCARAAGGfRCCCCCDEFFEEDCgFRAAacFECECGCCCCCCEGCGACRGFCCAaARAAAAGGfRDaDCCCCDEFFECCEGCECEDEFECCECREEDCEFFECCECCCGCGACRACCCARGFGAaccccaEFCEEDDCEFECCGFEECGFEEDDEFCCEECCCCCDDDRBCCCCCaCCCEGCECCCCCCCCCARDCCARAAGGDCCRCCCCCCARFFcEEEDEFECCEFECREFECCgEECCCCCaaCCCGACRAGGRFCCFfEEDDEEFECCCCGFGECCCCGFEEEDCCECCCCCCCCDDARFFcBBBBBcDBDCEFECCCGFEECCCCCCDRFFcECCCGFGECGCEFECCCGFEECCCCCGFEECCCARAGGRFFcECCCCdCCGFCEECCCGFECCCCGGFECC

In [None]:
# perplexity and base line model included
import torch
import torch.nn as nn
from torch.nn import functional as F
from collections import Counter
import random

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
dropout = 0.2

n_embd= 256
n_head = 2
n_layer = 2
# ------------

torch.manual_seed(1337)

# Load melody data
with open('inputMelodiesAugmented.txt', 'r', encoding='utf-8') as f:
    text = f.read()
text = text.replace(' ', '')  # Remove spaces between tokens

# Define the vocabulary for musical notes
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from notes/rests to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

@torch.no_grad()
def calculate_perplexity(loss):
    return torch.exp(loss)

# Baseline unigram model
def unigram_model(data):
    freq = Counter(data.tolist())
    total = sum(freq.values())
    prob = {k: v / total for k, v in freq.items()}
    return prob

unigram_probs = unigram_model(train_data)

def unigram_generate(length):
    return ''.join([itos[random.choices(list(unigram_probs.keys()), list(unigram_probs.values()))[0]] for _ in range(length)])

class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        perplexity = {split: calculate_perplexity(losses[split]) for split in ['train', 'val']}
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, train perplexity {perplexity['train']:.4f}, val perplexity {perplexity['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print("Generated Melody (GPT Model):")
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))

# Generate melody using unigram baseline
print("Generated Melody (Unigram Model):")
print(unigram_generate(500))




1.651214 M parameters
step 0: train loss 2.6910, val loss 2.6946, train perplexity 14.7471, val perplexity 14.7997
step 500: train loss 1.6156, val loss 1.5390, train perplexity 5.0308, val perplexity 4.6599
step 1000: train loss 1.3920, val loss 1.3461, train perplexity 4.0230, val perplexity 3.8426
step 1500: train loss 1.3045, val loss 1.2728, train perplexity 3.6858, val perplexity 3.5708
step 2000: train loss 1.2738, val loss 1.2534, train perplexity 3.5743, val perplexity 3.5022
step 2500: train loss 1.2471, val loss 1.2409, train perplexity 3.4803, val perplexity 3.4588
step 3000: train loss 1.2364, val loss 1.2377, train perplexity 3.4434, val perplexity 3.4475
step 3500: train loss 1.2251, val loss 1.2275, train perplexity 3.4046, val perplexity 3.4126
step 4000: train loss 1.2065, val loss 1.2211, train perplexity 3.3418, val perplexity 3.3911
step 4500: train loss 1.2004, val loss 1.2238, train perplexity 3.3216, val perplexity 3.4000
step 4999: train loss 1.1870, val loss 1

In [5]:
#playing the sequence

# -*- coding: utf-8 -*-
"""
@author: Giovanni Di Liberto
See description in the assignment instructions.
"""

from pydub import AudioSegment
import numpy as np
import simpleaudio as sa

# Define note frequencies (A4 = 440 Hz)
#NOTE_FREQUENCIES = {
#    'C': 261.63,
#    'D': 293.66,
#    'E': 329.63,
#    'F': 349.23,
#    'G': 392.00,
#    'A': 440.00,
#    'B': 493.88,
#    'R': 0  # Rest (no sound)
#}

NOTE_FREQUENCIES = {
    'C': 261.63,
    'c': 277.18,  # C#
    'D': 293.66,
    'd': 311.13,  # D#
    'E': 329.63,
    'F': 349.23,
    'f': 369.99,  # F#
    'G': 392.00,
    'g': 415.30,  # G#
    'A': 440.00,
    'a': 466.16,  # A#
    'B': 493.88,
    'R': 0     # Rest
}


# Generate a sine wave for a given frequency
def generate_sine_wave(frequency, duration_ms, sample_rate=44100, amplitude=0.5):
    t = np.linspace(0, duration_ms / 1000, int(sample_rate * duration_ms / 1000), False)
    wave = 0.5 * amplitude * np.sin(2 * np.pi * frequency * t)
    wave = (wave * 32767).astype(np.int16)
    audio_segment = AudioSegment(
        wave.tobytes(),
        frame_rate=sample_rate,
        sample_width=wave.dtype.itemsize,
        channels=1
    )
    return audio_segment

# Function to create a sequence of notes
def create_sequence(note_sequence, duration_ms=500):
    song = AudioSegment.silent(duration=0)
    for note in note_sequence:
        if note == 'R':  # Handle rest
            segment = AudioSegment.silent(duration=duration_ms)
        else:
            frequency = NOTE_FREQUENCIES[note]
            segment = generate_sine_wave(frequency, duration_ms)
        song += segment
    return song

# Example sequence (You can replace this with your sequence)
#sequence = "C C G G A A G F F E E D D C G G F F E E D G G F F E E D C C G G A A G F F E E D D C".split()
# gpt sequence
# sequence = "RfEEDFFEECEDCBCCCCCCEECCDCCARAffEfAfEDRfffREEDFFEEDRFDaFEDDFEECCGCCCARAAGGfRCCCCCDEFFEEDCgFRAAacFECECGCCCCCCEGCGACRGFCCAaARAAAAGGfRDaDCCCCDEFFECCEGCECEDEFECCECREEDCEFFECCECCCGCGACRACCCARGFGAaccccaEFCEEDDCEFECCGFEECGFEEDDEFCCEECCCCCDDDRBCCCCCaCCCEGCECCCCCCCCCARDCCARAAGGDCCRCCCCCCARFFcEEEDEFECCEFECREFECCgEECCCCCaaCCCGACRAGGRFCCFfEEDDEEFECCCCGFGECCCCGFEEEDCCECCCCCCCCDDARFFcBBBBBcDBDCEFECCCGFEECCCCCCDRFFcECCCGFGECGCEFECCCGFEECCCCCGFEECCCARAGGRFFcECCCCdCCGFCEECCCGFECCCCGGFECCCCCCGFEEECCCGFEECCCCCGFEE"
# baseline sequence
sequence = "ABacfFFGEEgEFCagggcAFRDGcRDEBRGFgGBfGcfaaEgEBGFGBdCCADdfDGRCdBcgcFgFAfEEFCDfRBFRgABCfGfGfgFddRBCDgdBcDGGdgfcBRgAdfEaDaEfFdffEdgEGgEAGFgEAdDBGDdgFEAFfEAcGacfagABCCfGgGDCcBABBafcBAEcDGBcgagGFdRDCEdEFRAddEFBBFFDagBfaFDFgcGgBdfAdACfCDAFEcAaAaBgRGCfCAFdAcECadARcafdBCBDfgAfgCGGGffgAaRAEFGdECEFfFBBBcAAddFcRFEcEAffADgDDAFgdEFRGCcgAEgcdcCFfgcBfccEBGdaDgCGfgEGRRcfCDCFFcGCRAcCARaFgGCfcCBfFRDgAaCECfFcAgECfRAdGgRdfCGgAECDfFaGADRfdEdADfDfcCffARRAFEgDDFCGfEdBfEdCBGBcfddfcFddGdBGFagCAcDGFCRCCGgEDFAaEdEdGgBAdDAg"

# Create the sequence
song = create_sequence(sequence, duration_ms=500)  # 500ms per note

# Save the song to a .wav file
song.export("nursery_rhyme_baseline.wav", format="wav")

# Play the .wav file using simpleaudio
wave_obj = sa.WaveObject.from_wave_file("nursery_rhyme_baseline.wav")
play_obj = wave_obj.play()
play_obj.wait_done()


SimpleaudioError: Error opening PCM device. -- CODE: -2 -- MSG: No such file or directory

In [5]:
# normalize the dataset
# Normalization: Convert all notes to uppercase
def normalize_notes(melodies):
    normalized = []
    for melody in melodies:
        normalized.append(melody.strip().upper())  # Convert to uppercase and remove extra spaces
    return normalized

# Load melodies
with open('inputMelodiesAugmented.txt', 'r') as file:
    melodies = file.readlines()

# Apply normalization
normalized_melodies = normalize_notes(melodies)

# Save normalized melodies
with open('normalizedMelodies.txt', 'w') as file:
    for melody in normalized_melodies:
        file.write(melody + '\n')

print("Normalization completed. Saved to 'normalizedMelodies.txt'")


Normalization completed. Saved to 'normalizedMelodies.txt'


In [6]:
# introduce rhythm and duration
import random

# Define rhythmic durations
durations = ["1/4", "1/8", "1/16"]

# Add rhythm and duration to notes
def add_rhythm(melodies):
    rhythmic_melodies = []
    for melody in melodies:
        rhythmic_melody = []
        for note in melody.split():
            if note != "R":  # Only notes get durations, rests remain the same
                rhythmic_melody.append(f"{note}:{random.choice(durations)}")
            else:
                rhythmic_melody.append(note)  # Keep rests as is
        rhythmic_melodies.append(" ".join(rhythmic_melody))
    return rhythmic_melodies

# Apply rhythm
rhythmic_melodies = add_rhythm(normalized_melodies)

# Save rhythmic melodies
with open('rhythmicMelodies.txt', 'w') as file:
    for melody in rhythmic_melodies:
        file.write(melody + '\n')

print("Rhythm added. Saved to 'rhythmicMelodies.txt'")


Rhythm added. Saved to 'rhythmicMelodies.txt'


In [7]:
# ocatve expansion.. transpose melodies into lower and higher ocatves
# Define pitch-shifting function
def transpose_octave(melodies, shift):
    notes = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B", "R"]
    transposed_melodies = []
    for melody in melodies:
        transposed_melody = []
        for token in melody.split():
            if ":" in token:  # Token with duration
                note, duration = token.split(":")
                if note in notes and note != "R":
                    index = notes.index(note)
                    transposed_note = notes[(index + shift) % 12]  # Transpose note
                    transposed_melody.append(f"{transposed_note}:{duration}")
                else:
                    transposed_melody.append(token)  # Keep rests unchanged
            elif token in notes and token != "R":  # Plain note
                index = notes.index(token)
                transposed_note = notes[(index + shift) % 12]
                transposed_melody.append(transposed_note)
            else:
                transposed_melody.append(token)
        transposed_melodies.append(" ".join(transposed_melody))
    return transposed_melodies

# Apply octave expansion
expanded_melodies = []
for shift in [-1, 0, 1]:  # Shift down an octave, keep original, shift up an octave
    expanded_melodies += transpose_octave(rhythmic_melodies, shift)

# Save expanded melodies
with open('expandedMelodies.txt', 'w') as file:
    for melody in expanded_melodies:
        file.write(melody + '\n')

print("Octave expansion completed. Saved to 'expandedMelodies.txt'")


Octave expansion completed. Saved to 'expandedMelodies.txt'


In [8]:
# additional data sugmentation techniques
#Apply random pitch-shifting, inversion, and noise injection.
# Data Augmentation Techniques

# Random pitch shifting
def random_pitch_shift(melodies, semitones_range=2):
    notes = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B", "R"]
    shifted_melodies = []
    for melody in melodies:
        shifted_melody = []
        shift = random.randint(-semitones_range, semitones_range)  # Random shift within range
        for token in melody.split():
            if ":" in token:
                note, duration = token.split(":")
                if note in notes and note != "R":
                    index = notes.index(note)
                    shifted_note = notes[(index + shift) % 12]
                    shifted_melody.append(f"{shifted_note}:{duration}")
                else:
                    shifted_melody.append(token)
            elif token in notes and token != "R":
                index = notes.index(token)
                shifted_note = notes[(index + shift) % 12]
                shifted_melody.append(shifted_note)
            else:
                shifted_melody.append(token)
        shifted_melodies.append(" ".join(shifted_melody))
    return shifted_melodies

# Invert melody
def invert_melody(melodies):
    notes = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B", "R"]
    inverted_melodies = []
    for melody in melodies:
        inverted_melody = []
        for token in melody.split():
            if ":" in token:
                note, duration = token.split(":")
                if note in notes and note != "R":
                    index = notes.index(note)
                    inverted_note = notes[-(index + 1)]  # Invert by reversing the index
                    inverted_melody.append(f"{inverted_note}:{duration}")
                else:
                    inverted_melody.append(token)
            else:
                inverted_melody.append(token)
        inverted_melodies.append(" ".join(inverted_melody))
    return inverted_melodies

# Apply augmentations
pitch_shifted_melodies = random_pitch_shift(expanded_melodies)
inverted_melodies = invert_melody(expanded_melodies)

# Combine all augmented datasets
augmented_dataset = expanded_melodies + pitch_shifted_melodies + inverted_melodies

# Save the final augmented dataset
with open('finalAugmentedMelodies.txt', 'w') as file:
    for melody in augmented_dataset:
        file.write(melody + '\n')

print("Data augmentation completed. Saved to 'finalAugmentedMelodies.txt'")


Data augmentation completed. Saved to 'finalAugmentedMelodies.txt'


In [9]:
# perplexity and base line model included
import torch
import torch.nn as nn
from torch.nn import functional as F
from collections import Counter
import random

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
dropout = 0.2

n_embd= 256
n_head = 2
n_layer = 2
# ------------

torch.manual_seed(1337)

# Load melody data
with open('/content/finalAugmentedMelodies.txt', 'r', encoding='utf-8') as f:
    text = f.read()
text = text.replace(' ', '')  # Remove spaces between tokens

# Define the vocabulary for musical notes
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from notes/rests to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

@torch.no_grad()
def calculate_perplexity(loss):
    return torch.exp(loss)

# Baseline unigram model
def unigram_model(data):
    freq = Counter(data.tolist())
    total = sum(freq.values())
    prob = {k: v / total for k, v in freq.items()}
    return prob

unigram_probs = unigram_model(train_data)

def unigram_generate(length):
    return ''.join([itos[random.choices(list(unigram_probs.keys()), list(unigram_probs.values()))[0]] for _ in range(length)])

class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        perplexity = {split: calculate_perplexity(losses[split]) for split in ['train', 'val']}
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, train perplexity {perplexity['train']:.4f}, val perplexity {perplexity['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print("Generated Melody (GPT Model):")
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))

# Generate melody using unigram baseline
print("Generated Melody (Unigram Model):")
print(unigram_generate(500))




1.651727 M parameters
step 0: train loss 2.8176, val loss 2.8142, train perplexity 16.7371, val perplexity 16.6800
step 500: train loss 1.4976, val loss 1.4925, train perplexity 4.4709, val perplexity 4.4481
step 1000: train loss 1.3028, val loss 1.2995, train perplexity 3.6798, val perplexity 3.6673
step 1500: train loss 1.2237, val loss 1.2189, train perplexity 3.3997, val perplexity 3.3834
step 2000: train loss 1.2023, val loss 1.1968, train perplexity 3.3277, val perplexity 3.3094
step 2500: train loss 1.1831, val loss 1.1825, train perplexity 3.2643, val perplexity 3.2624
step 3000: train loss 1.1670, val loss 1.1629, train perplexity 3.2123, val perplexity 3.1992
step 3500: train loss 1.1454, val loss 1.1400, train perplexity 3.1436, val perplexity 3.1268
step 4000: train loss 1.1365, val loss 1.1360, train perplexity 3.1157, val perplexity 3.1142
step 4500: train loss 1.1290, val loss 1.1228, train perplexity 3.0924, val perplexity 3.0735
step 4999: train loss 1.1129, val loss 1

In [21]:
#playing the sequence

# -*- coding: utf-8 -*-
"""
@author: Giovanni Di Liberto
See description in the assignment instructions.
"""

from pydub import AudioSegment
import numpy as np
import simpleaudio as sa

# Define note frequencies (A4 = 440 Hz)
#NOTE_FREQUENCIES = {
#    'C': 261.63,
#    'D': 293.66,
#    'E': 329.63,
#    'F': 349.23,
#    'G': 392.00,
#    'A': 440.00,
#    'B': 493.88,
#    'R': 0  # Rest (no sound)
#}

NOTE_FREQUENCIES = {
    'C': 261.63,
    'c': 277.18,  # C#
    'D': 293.66,
    'd': 311.13,  # D#
    'E': 329.63,
    'F': 349.23,
    'f': 369.99,  # F#
    'G': 392.00,
    'g': 415.30,  # G#
    'A': 440.00,
    'a': 466.16,  # A#
    'B': 493.88,
    'R': 0     # Rest
}


# Generate a sine wave for a given frequency
def generate_sine_wave(frequency, duration_ms, sample_rate=44100, amplitude=0.5):
    t = np.linspace(0, duration_ms / 1000, int(sample_rate * duration_ms / 1000), False)
    wave = 0.5 * amplitude * np.sin(2 * np.pi * frequency * t)
    wave = (wave * 32767).astype(np.int16)
    audio_segment = AudioSegment(
        wave.tobytes(),
        frame_rate=sample_rate,
        sample_width=wave.dtype.itemsize,
        channels=1
    )
    return audio_segment

# Function to create a sequence of notes
def create_sequence(note_sequence, duration_ms=500):
    song = AudioSegment.silent(duration=0)
    for note in note_sequence:
        if note == 'R':  # Handle rest
            segment = AudioSegment.silent(duration=duration_ms)
        else:
            frequency = NOTE_FREQUENCIES[note]
            segment = generate_sine_wave(frequency, duration_ms)
        song += segment
    return song

# Example sequence (You can replace this with your sequence)
#sequence = "C C G G A A G F F E E D D C G G F F E E D G G F F E E D C C G G A A G F F E E D D C".split()
# gpt sequence
# sequence = "DRAGGAGAARGGGGDGFEDRDGGGEDRBBBBAGEDRDGGFEDBRDGGFEDDBRBDDCBAGAARGABCGABGFDRFBAGFGAGEGGGGGGGFFGGGFGGGFGFDFGDGGGGAFDDCDFGGGGGAGCAACCDFGFGGGFFDFGCDRAGFRDEFRFABCDDRGGGEFGFGFDFGGGGFGGGGFFDFGDDDDDDCARADFGFGAARAGAAFDFAFDDCDDCACRAAAAGFFDFGDDCDFGCCDDCARAGRDDFGFGGFFDFFDDCDCARAACAGFRGGFRFEDEGFGFGFGFFFDDBCDRCCGGGGFDFGGGGGGAFDCDRAGGFFGFGFGGGGFFFFDDCDRAGFFRAGARAARAAGAAFDDCDGDGAGCAACCDFGFFFDDFGGGGFFDFGGGGGAGAGAGGGGGGGFFFFDDGGGGGAAGCCCRGGAGAGGGGGGGGGFFFDDRDDDDRFGGGGGGGGGFFDDDCGAGCAACCRGAGGAGFDDFDCDDFGGRFFFG"
# baseline sequence
sequence = "AADDGRRAEREFDAADCFACAFDFGRCARGDFBCCBACGDFCGAFBEADDCDGGDDCBBAABRBRGGAGCDFCCCGFDGFACDBGCGAFFGFGDGFDDCGFBEBCEAAGDDARFCFBADAARDEFFRBRGFRGAECFBFCFBAADEAAECFGAARCCFGRFFACFAEBDFRAADEGFRCCBGCGAGFFDGAFRRFAGDDFCRFCCRGDREAAGDDFGCDDDRRCDGCAACFBCGARGCGDFGADARBCCFBFGFDGFAAEFFCCAGGDAAAEAAACAGEAADDBFRCFGFFDGCDCDAGGDBDFRGEEBGBCRARBGECDRCAAFRCAGERGCCFDBEGGDAABRCGADFFAARGAFGCRFAFDAAFABFAFFGRBARFDCRDDFRCFDDFARAFCRFRARFDDCBBCDREFBGBDAGFFCCDBFFGEDBFGADDGFDCGAGGAEGDFCAGDDAEGGGDFCRARCRFBEFEAGFBEAFGCAGDFGDFRED"

# Create the sequence
song = create_sequence(sequence, duration_ms=500)  # 500ms per note

# Save the song to a .wav file
song.export("nursery_rhyme_baseline_augmented.wav", format="wav")

# Play the .wav file using simpleaudio
wave_obj = sa.WaveObject.from_wave_file("nursery_rhyme_baseline_augmented.wav")
play_obj = wave_obj.play()
play_obj.wait_done()


SimpleaudioError: Error opening PCM device. -- CODE: -2 -- MSG: No such file or directory