In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [2]:
device = (
    'cuda' if torch.cuda.is_available()
    else 'cpu'
)

In [3]:
batch_size = 32
block_size = 64
max_iters = 500          
learning_rate = 3e-4
eval_iters = 50        
n_embd = 128             
n_head = 4
n_layer = 3
dropout = 0.2

print(device)

cpu


In [4]:
with open('wizard_of_oz.txt', 'r', encoding='utf-8') as f:
    text = f.read()

chars = sorted(set(text))
vocab_size = len(chars)
print(chars)

['\n', ' ', '!', '&', '(', ')', ',', '-', '.', '0', '1', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '—', '‘', '’', '“', '”', '\ufeff']


In [5]:
string_to_int = {ch:i for i, ch in enumerate(chars)}
int_to_string = {i:ch for i,ch in enumerate(chars)}
encode = lambda s: [string_to_int[c] for c in s]
decode = lambda l: ''.join([int_to_string[i] for i in l])

data = torch.tensor(encode(text))

In [6]:
n = int(0.8*len(data))
train_data = data[:n]
val_data = data[n:]
print(f"Train: {len(train_data):,} tokens | Val: {len(val_data):,} tokens")

x_train = train_data[:block_size]
y_train = train_data[1:block_size+1]

Train: 166,239 tokens | Val: 41,560 tokens


In [7]:
def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    return x.to(device), y.to(device) 
    
x, y = get_batch('train')
print('inputs:')
print(x)
print('targets:')
print(y)

inputs:
tensor([[46, 55, 58,  ..., 42, 45, 45],
        [47, 41, 62,  ..., 45, 59, 49],
        [48, 45,  1,  ...,  0, 59, 48],
        ...,
        [33, 55,  6,  ..., 42, 55, 52],
        [55, 61, 59,  ..., 60, 55, 55],
        [53,  1, 48,  ...,  1, 59, 43]])
targets:
tensor([[55, 58,  1,  ..., 45, 45, 59],
        [41, 62, 45,  ..., 59, 49, 58],
        [45,  1, 59,  ..., 59, 48, 45],
        ...,
        [55,  6,  1,  ..., 55, 52, 44],
        [61, 59, 52,  ..., 55, 55, 44],
        [ 1, 48, 41,  ..., 59, 43, 58]])


In [8]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

In [9]:
# n_embd = how many numbers represent one word
# n_head = number of attention heads - how many different perspectives does the model see

class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        """
        head_size calculates how many numbers each attention head considers - data is segmented for 
        different perspectives
    
        Multi head attention helps understand the meaning of words in context. This is done by creating 3
        words for each vector - key, query, value. Calculation: dot product of key and query. If word A's
        Query matches word B's Key, they get a high score. score is turned into a percentage (softmax). 
        Model then multiplies the percentage by the values.
    
        Normalise data after the attention comnputation (self.ln1)
    
        Feed forward processes the data gathered from the multi head attention phase using a two layer
        neural network. data is multiplied by a weight matrix + bias, then an activation function is 
        applied to introduce non-linearity (e.g. ReLU), then multiplied by a second weight matrix
    
        Normalise data after the feed forward computation (self.ln2)
        """
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):     
        """
        pre-norm transformer architecture (laptop limited to CPU computation)
        
        steps 1-3: multi head attention
        steps 4-6: feed forward
        """        
        # 1. apply normnalisation to input
        norm_x1 = self.ln1(x)        
        # 2. Get the attention patterns from the normalized data (important words have high score)
        attention_out = self.sa(norm_x1)        
        # 3. Add the result back to the original highway (Residual connection)
        x = x + attention_out     
        # 4. apply normalisation to feed-forward input
        norm_x2 = self.ln2(x)        
        # 5. Let the model 'think' about the context (feed forward)
        ffwd_out = self.ffwd(norm_x2)        
        # 6. Add that back to the highway
        x = x + ffwd_out
        return x

In [10]:
class FeedForward(nn.Module):
    def __init__(self, n_embd):
        """
        self.net creates a container that runs layers in a fixed order

        nn.Linear(n_embd, 4*n_embd) projects the word vector (n_embd) into a larger space, 
        giving the model more 'room' to learn complex patterns

        ReLU turns negative values into 0, introducing non-linearity

        squash larger (4*n_embd) vector back into n_embd size, so it can be added back
        to the highway (original features)

        nn.Dropout causes some neurons (nodes) to be randomly ignored in training so the nn does
        not overfit
        """        
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4*n_embd),
            nn.ReLU(),
            nn.Linear(4*n_embd,n_embd),
            nn.Dropout(dropout),
        )        

    def forward(self, x):
        return self.net(x)

In [11]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        """
        self.heads intialises a pytorch list with random weights ready to be updated
        
        self.proj is not always required as head_size * num_heads == n_embd. This line is good
        practise if you want to change the parameters, and also introduces a bias term as a learnable
        parameter for the neural network
        
        self.dropout randomly sets 20% of nodes equal to zero        
        """        
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        loop through input data, getting an answer for each input, then concatenate all the 'answers' together

        20% of the answers are removed. If the nn is robust, the same outcome should show even with 20% missing
        """
        out = torch.cat([h(x) for h in self.heads], dim=-1) # (B, T, C) -> (B,T, [h1, h 
        out = self.dropout(self.proj(out))
        return out
    

In [12]:
class Head(nn.Module):
    """
    batch - number of samples being processed e.g. 32 sentences
    time-step - how many words in each sentence
    channel - how many features per word (384, compressed to 48 - head_size)
    """

    def __init__(self, head_size):
        """
        different weight matricies and intialised by key, query and value

        two seperate weight networks turn the input into a key and query. Key and query vectors are calculated
        by multiplying the token features by their respective weighting network. Key and query mutiplied together 
        (along with the value) to get a score
        
        nn.Linear(n_embd, head_size) compresses vectors so its smaller and easier to compute with. This is similar
        to feature extraction. Out of 384 features, pick 48 that are most influential.

        key represents vector notation of what the word represents

        query represents what the token is looking for, if key includes token about 'banana' query would be looking for 
        some sort of food word

        if key and query are similar their dot product will be higher. This gives the "attention score". 
        example: "the cat sat down" - the word 'the' is not relevant so has a low score (2%) whereas 'cat'
        has a high score (80%), so when we multiply by the input value tokens, we take 80% of the 'cat' token and
        2% of the 'the' token.    

        trill - traingle lower. prevents the model from looking into the future, attention scores only on previous words
        register_buffer means model weightings do not update automatically, enforicng model from not looking at next 
        words.

        dropout randomly deactivates 20% of neurons within a layer, preventing overfitting 

        
        """
        
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        input - (batch, time-step, channel)
        output -    (batch, time-step, head size)

        attenton scores are compued by doing dot product of relevant matricies, which
        is a common method to measure similarity

        matrix multiplication requires matricies to be the correct dimensionality, so 
        the transpose of matricies is used to turn (T * head_size) into (head_size * T)

        .shape[-1]**-0.5 scales the values so they are not too big/large by dividing by 
        the square root of the last dimension of the vector (head_size)

        fill 0 values from the matrix multiplication with negative infinity, which enforces
        probability of 0 when the softmax function is applied (no 'cheating' looking into the future)

        softmax dim=-1 ensures the probabilities within a sentence add to one

        dropout prevents overfitting

        output is a blend of the input vector multiplied by the attention (weights) placed on each token (word)


        """

        B,T,C = x.shape
        k = self.key(x) # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        v = self.value(x) # (B,T,hs)

        # compute attention scores - use transpose so the matrix dimensions allow multiplication
        

        weights = q @ k.transpose(-2,-1) * x.shape[-1]**-0.5 # (B,T,hs) @ (B,hs,T) -> (B,T,T)
        weights = weights.masked_fill(self.tril[:T,:T] == 0, float("-inf")) # (B,T,T)
        weights = F.softmax(weights, dim=-1) # (B,T,T)
        weights = self.dropout(weights)

        out = weights @ v

        return out        

In [13]:
class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        """
        map token IDs to vectors, getting a vector size n_emb for each token. This is how 
        each character is stored in the 'brain' of the nn e.g. 'z' = [0.3, 0.6, 4, 6] etc.
        
        positional embedding of tokens - where each token is in the sentence
        
        stack transformer blocks - "thinking" part that builds context
        
        normalise final hidden representations
        
        take previous calculations (context) and calculate logits for every 
        possible letter in vocab_size. Logits can be turned into probabilities with softmax later
        """
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)]) 
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)
        
        self.apply(self._init_weights) # random weights assigned initially

    def _init_weights(self, module):
        """
        isinstance - Checks if the current layer is a 'Linear' (thinking) layer

        layer weights are filled with small random numbers that are normally disctributed, mean=0, std=0.02

        set bias terms equal to zero, so you calculate weights only - estimating weights and bias at the same time
        would cause confusion

        isinstance - Checks if the current layer is an 'Embedding' (lookup table) layer.

        layer weights are filled with small random numbers that are normally disctributed, mean=0, std=0.02        

        """
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0, std=0.02)    

    def forward(self, index, targets=None):
        """
        B - how many sentences the function considers
        T - how many letters in each sentence
        C - how many possible letters exist (vocab size)

        .view takes values and inserts into a tensor
        """
        # logits = self.token_embedding_table(index) # token look-up table, vectors in table contain information about meaning of each token

        B,T = index.shape

        tok_emb = self.token_embedding_table(index)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C) vector with corresponding positioning
        x = tok_emb + pos_emb # (B,T,C), combine word meaning and word location
        x = self.blocks(x) # transformer blocks attention, word meaning understood in context
        x = self.ln_f(x) # normalise matricies
        logits = self.lm_head(x) # (B,T,vocab_size), logit predictions for each token
        

        # end of text edge case
        if targets is None:
            loss = None
        else:    
            B, T, C  = logits.shape
            logits = logits.view(B*T, C) # predict on character level, not sentence level
            targets = targets.view(B*T) # check predictions against true values
            loss = F.cross_entropy(logits, targets) # big prediction mistake = big loss

        return logits, loss

    def generate(self, index, max_new_tokens):
        """
        function writes new text one token at a time, based on what it has seen so far

        the model gives predictions for every token position, so has shape (B,T,C)
        
        after logits = logits[:,-1,:] has shape (B,C) - only last token prediction is taken for each batch

        dim=-1 applies softmax to last dimension of (B,C), so only predicts on C (probability of next token)

        we sample the next possible tokens to add randomness - choosing the highest probability every time creates loops
        
        """
        # index is the (B,T) array of indicies in the current context
        

        for _ in range(max_new_tokens):
            # use only the last 64 characters to predict the next token
            index_limiter = index[:, -block_size:]
            # get predictions
            logits, loss = self.forward(index_limiter)
            # focus on only the last time step - only want last token prediction
            logits = logits[:,-1,:]
            # apply softmax to get probability of predicted token            
            probs = F.softmax(logits, dim=-1)
            # sample from predicted token distribution
            index_next = torch.multinomial(probs, num_samples=1)
            # append sampled index to running sequence
            index = torch.cat((index, index_next), dim=1) # (B, T+1)

        return index
    

model = GPTLanguageModel(vocab_size)
m = model.to(device)
print(f"Model parameters: {sum(p.numel() for p in m.parameters()):,}")

Model parameters: 620,873


In [14]:
model.to(device)

# create optimiser
optimiser = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for itr in range(max_iters):
    if itr % eval_iters == 0:
        losses = estimate_loss()
        print(f"step: {itr}, train loss: {losses['train']:.3f}, val loss: {losses['val']:.3f}")

    # sample batch of data
    xb, yb = get_batch('train')
    

    # evaluate loss
    logits, loss = model.forward(xb, yb)
    optimiser.zero_grad(set_to_none=True)
    loss.backward()
    optimiser.step()
print(loss.item())

step: 0, train loss: 4.264, val loss: 4.263
step: 50, train loss: 2.765, val loss: 2.775
step: 100, train loss: 2.524, val loss: 2.521
step: 150, train loss: 2.422, val loss: 2.429
step: 200, train loss: 2.369, val loss: 2.379
step: 250, train loss: 2.324, val loss: 2.337
step: 300, train loss: 2.288, val loss: 2.304
step: 350, train loss: 2.257, val loss: 2.266
step: 400, train loss: 2.220, val loss: 2.232
step: 450, train loss: 2.175, val loss: 2.191
2.1514599323272705


In [16]:
prompt = "The Great Wizard said "

context = torch.tensor(encode(prompt), dtype=torch.long, device=device).unsqueeze(0)

# turn off dropout
model.eval()

generated_indices = m.generate(context, max_new_tokens=50)

output_text = decode(generated_indices[0].tolist())

print("=" * 30)
print(output_text)
print("=" * 30)

The Great Wizard said thatesad to O) ter odr
da throsand codnd  aie.
Tig
