In [4]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000 # how many iterations to train for
eval_interval = 500 # how often to evaluate the loss
learning_rate = 3e-4 # learning rate
n_head = 6 # number of heads in the multi-head attention
n_layer = 6 # number of layers in the transformer
dropout = 0.2 # dropout rate
eval_iters = 200 # how many eval iterations to run
num_embd = 384 # embedding dimension

# device configuration
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU device: {torch.cuda.get_device_name()}")
    print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
# ------------

torch.manual_seed(1337) # set the random seed to 1337 so the numbers generaterd are the same every time

!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt # download the dataset
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()


# here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars) 

# create a mapping from characters to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

@torch.no_grad() # this means that the gradient is not computed for this function
def estimate_loss():
    out = {} # dictionary to store the losses for train and val
    model.eval()
    for split in ['train', 'val']: # loop through train and val
        losses = torch.zeros(eval_iters) # create a tensor to store the losses
        for k in range(eval_iters): # loop through the eval iterations
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out


class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)]) # list of heads
        self.proj = nn.Linear(num_embd, num_embd) # linear layer to project the output of the heads to the embedding dimension
        self.dropout = nn.Dropout(dropout) # dropout layer to prevent overfitting
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedForward(nn.Module): # feed forward network
    def __init__(self, num_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(num_embd,4*num_embd),
            nn.ReLU(),
            nn.Linear(4*num_embd, num_embd),
            nn.Dropout(dropout)
        )
    def forward(self, x):
        return self.net(x)


class Block(nn.Module): # transformer block
    def __init__(self, num_embd, n_head):
        super().__init__()
        head_size = num_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(num_embd)        
        self.ln1 = nn.LayerNorm(num_embd)
        self.ln2 = nn.LayerNorm(num_embd)
    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x



# super simple bigram model (only looks at the last token to predict the next token)
# Potential upgrade could be taking subword tokens instead of letters
class BigramLanguageModel(nn.Module):

    def __init__(self, vocab_size):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, num_embd)
        self.position_embedding_table = nn.Embedding(block_size, num_embd)
        self.blocks = nn.Sequential(*[Block(num_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(num_embd)
        self.lm_head = nn.Linear(num_embd, vocab_size)
        
    def forward(self, idx, targets=None):
        B,T = idx.shape
        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x) # (B,T,vocab_size)

        # if targets is None, we are in the training phase, so we return the logits and the loss
        if targets is None:
            loss = None
        else:
            # in the evaluation phase, we want to get the loss
            B, T, C = logits.shape
            logits = logits.view(B*T, C) # (B*T, C)
            targets = targets.view(B*T) # (B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # get the predictions
            idx_cond = idx[:, -block_size:]
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

class Head(nn.Module): # head of the multi-head attention
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(num_embd, head_size, bias=False) # linear layer to project the input to the key
        self.query = nn.Linear(num_embd, head_size, bias=False) # linear layer to project the input to the query
        self.value = nn.Linear(num_embd, head_size, bias=False) # linear layer to project the input to the value

        # We utilize the triangular matrix to mask the future tokens, if this was another application, we would not need this as the model should be able to see the future tokens     
        self.dropout = nn.Dropout(dropout)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))   

    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)
        wei = q @ k.transpose(-2,-1) * C**-0.5 # We multiply the query and key to get the attention weights, and we divide by the square root of the head size to prevent the attention weights from becoming too large
        wei = wei.masked_fill(self.tril[:T,:T] == 0, float('-inf')) # We mask the future tokens
        wei = F.softmax(wei, dim=-1) # We apply the softmax function to the attention weights
        wei = self.dropout(wei) # We apply the dropout to the attention weights
        out = wei @ v
        return out

model = BigramLanguageModel(vocab_size) # create the model
m = model.to(device) # move the model to the device (Graphics Card)

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
#scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.9)

print("Starting training...")
print(f"Using device: {device}")

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
    scheduler.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated_text = decode(m.generate(context, max_new_tokens=10000)[0].tolist())

# Save the generated text to a file
with open('output2.txt', 'w', encoding='utf-8') as f:
    f.write(generated_text)

print(f"Generated {len(generated_text)} characters and saved to output.txt")

CUDA available: True
GPU device: Tesla P100-PCIE-16GB
GPU memory: 15.9 GB
--2025-08-03 17:00:41--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.3’


2025-08-03 17:00:41 (31.6 MB/s) - ‘input.txt.3’ saved [1115394/1115394]

Starting training...
Using device: cuda
step 0: train loss 4.2849, val loss 4.2823
step 500: train loss 2.0119, val loss 2.0984
step 1000: train loss 1.5956, val loss 1.7743
step 1500: train loss 1.4398, val loss 1.6369
step 2000: train loss 1.3404, val loss 1.5664
step 2500: train loss 1.2798, val loss 1.5329
step 3000: train loss 1.2252, val loss 1.5036
step 3500: train loss 1.1840, val loss 1.4878