# Character-Level GPT from Scratch
Implementation of a small GPT-like language model in PyTorch, trained on Hemingway text.

In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# Hyperparameters
batch_size = 64 
block_size = 128 
max_iters = 5000 

eval_interval = 500
learning_rate = 3e-4

device = 'cuda' if torch.cuda.is_available() else 'cpu'

eval_iters = 200
d_model = 384
n_head = 6 # Number of attention heads
n_layer = 6 # Number of transformer layers
dropout = 0.2
# ------------

torch.manual_seed(1337)

<torch._C.Generator at 0x7b82e04eb7d0>

In [2]:
!gdown 'https://drive.google.com/uc?export=download&id=1RlmRmXiWVKpZq98ftdtOIdM2lsA1uw3j'

Downloading...
From: https://drive.google.com/uc?export=download&id=1RlmRmXiWVKpZq98ftdtOIdM2lsA1uw3j
To: /content/hemingway.txt
  0% 0.00/133k [00:00<?, ?B/s]100% 133k/133k [00:00<00:00, 118MB/s]


In [3]:
!ls

hemingway.txt  sample_data


In [4]:
with open('hemingway.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
# Create a mapping from characters to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # Encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # Decoder: take a list of integers, output a string

# Train and Test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # First 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

In [5]:
# Data loading
def get_batch(split):
    # Generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    # Randomly select batch_size rows from data's row indices
    ix = torch.randint(0, len(data) - block_size, (batch_size,))
    # Select batch_size chuncks of text each of size block_size; stack them
    xb = torch.stack([data[i : i+block_size] for i in ix])

    yb = torch.stack([data[i+1 : i+block_size+1] for i in ix])
    xb, yb = xb.to(device), yb.to(device)

    return xb, yb

In [6]:
@torch.no_grad()
def estimate_loss(model):
    out = {}

    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            # Get a batch of data
            xb, yb = get_batch(split)
            # Get the mean and loss
            logits, loss = model(xb, yb)
            # Get the loss for this batch
            losses[k] = loss.item()

        out[split] = losses.mean().item()

    return out

In [7]:
class Head(nn.Module):
    """
    Single head of masked self-attention (decoder-only setup).
    """

    def __init__(self, d_head):
        super().__init__()
        self.d_head = d_head
        # Map each key, query, or value in to a d_head dimensional model.
        # Each should be matrices from d_model to d_head
        self.W_K = nn.Linear(d_model, d_head, bias=False)
        self.W_Q = nn.Linear(d_model, d_head, bias=False)
        self.W_V = nn.Linear(d_model, d_head, bias=False)
        self.d_head = d_head
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,d = x.shape
        # Get the key and query representations from the embedding x
        k = self.W_K(x)

        q = self.W_Q(x)

        v = self.W_V(x)


        scores = q @ k.transpose(1, 2) / (self.d_head ** 0.5)

        # Apply a mask to scores, making all scores above the diagonal -inf
        scores = scores.masked_fill(self.tril[:T, :T] == 0, float('-inf'))

        # Apply softmax to the final dimension of scores
        a =  F.softmax(scores, dim=-1)

        # Apply dropout
        a = self.dropout(a)
        # Perform the weighted aggregation of the values
        out = a @ v
        # For each token, return the weighted sum of the values
        return out

class MultiHeadAttention(nn.Module):

    def __init__(self, num_heads, d_head):
        super().__init__()
        self.heads = nn.ModuleList([Head(d_head) for _ in range(num_heads)])
        # This is to project back to the dimension of d_model. In this case, it is just a learned linear map
        self.W_O = nn.Linear(num_heads * d_head, num_heads * d_head)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Concatenate the different representations per head along the last dimension
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        # Project the concatenation and apply dropout; this is the W_O in "Attention is all you need"
        out = self.W_O(out)
        out = self.dropout(out)
        return out


In [8]:
class FeedFoward(nn.Module):
    """
    A simple linear layer followed by a non-linearity; this is applied at the token level
    """

    def __init__(self, d_model):
        super().__init__()
        d_ff = 4 * d_model
        
        self.ff = nn.Sequential(
            nn.Linear(d_model, 4 * d_model),
            nn.ReLU(),
            nn.Linear(4 * d_model, d_model),
            nn.Dropout(dropout)
        )
    def forward(self, x):
        return self.ff(x)

In [9]:
class DecoderBlock(nn.Module):
    """
    Transformer decoder block: communication followed by computation
    These are stacked on top of each other one after another
    """

    def __init__(self, d_model, n_head):
        super().__init__()
        # Each head gets a smaller dimensional representation of the data
        # Assume each head gets a representation of dimension d_head and d_model is divisible by n_head
        d_head = d_model // n_head
        self.sa = MultiHeadAttention(n_head, d_head)
        self.ff = FeedFoward(d_model)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """
        Transformer decoder block: self-attention + feedforward with residuals and layer norm
        """

        x = x + self.sa(self.ln1(x))
        x = x + self.ff(self.ln2(x))
        return x


In [10]:
class GPT(nn.Module):
    def __init__(self):
        super().__init__()

        self.token_embedding_table = nn.Embedding(vocab_size, d_model)
        # Position embeddings are from block_size (T) to d_model
        self.position_embedding_table = nn.Embedding(block_size, d_model)

        self.blocks = nn.Sequential(*[DecoderBlock(d_model, n_head) for _ in range(n_layer)])
         # Final layer norm
        self.ln = nn.LayerNorm(d_model)
        self.ff = nn.Linear(d_model, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device))

        x = tok_emb + pos_emb


        x = self.blocks(x)

        # Apply layer norm
        x = self.ln(x)

        # Apply the final linear map, to get to dimension vocab_size
        logits = self.ff(x)

        if targets is None:
            loss = None
        else:
            B, T, V = logits.shape
            logits = logits.view(B*T, V)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):

        self.eval()
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]

            # Get the predictions
            logits, loss = self(idx_cond)

            # Focus only on the last time step, get the logits
            logits = logits[:, -1, :]

            # Apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1)

            # Sample from the distribution proporttional to probs
            idx_next = torch.multinomial(probs, num_samples=1)

            # Append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1)
        self.train()
        return idx




In [11]:
class EarlyStopping:
    def __init__(self, tolerance=5, min_delta=0):

        self.tolerance = tolerance
        self.min_delta = min_delta
        self.counter = 0
        self.early_stop = False

    def __call__(self, train_loss, validation_loss):
        if (validation_loss - train_loss) / train_loss > self.min_delta:
            self.counter += 1
            if self.counter >= self.tolerance:
                self.early_stop = True

In [17]:
model = GPT().to(device)
# Print the number of parameters in the model
print(f"Number of parameters: {sum(p.numel() for p in model.parameters())}")

# Create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)
early_stopping = EarlyStopping(tolerance=5, min_delta=0)

for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        if iter:
          scheduler.step()
        losses = estimate_loss(model)
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        early_stopping(losses['train'], losses['val'])
        if early_stopping.early_stop:
          print("We stop at epoch {}".format(iter))
          break


    # Sample a batch of data
    xb, yb = get_batch('train')

    # Evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()


Number of parameters: 10737470
step 0: train loss 4.2316, val loss 4.2343
step 500: train loss 1.4925, val loss 1.5491
step 1000: train loss 1.1821, val loss 1.3861
step 1500: train loss 1.0072, val loss 1.3924
step 2000: train loss 0.8604, val loss 1.4523
We stop at epoch 2000


In [18]:
# This is technically doing generations in batches, but here we have a batch size of 1 and 1 element to start in the batch
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(model.generate(context, max_new_tokens=100)[0].tolist()))
open('fake_hemingway.txt', 'w').write(decode(model.generate(context, max_new_tokens=100)[0].tolist()))



On the sea and picked a hurricane with his left hand had all to worked the dark and take the tuna o


101

In [19]:
torch.save(model.state_dict(), 'gpt.pt')

In [20]:
!ls

fake_hemingway.txt  gpt.pt  hemingway.txt  sample_data


In [21]:
from google.colab import files

# Download each file
files.download('fake_hemingway.txt')
files.download('gpt.pt')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>