In [24]:
import torch
import torch.nn as nn
from torch.nn import functional as F
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

# --- batching ---
block_size = 32   # context length: number of tokens in each training window
batch_size = 256  # number of windows sampled per optimisation step

# --- optimisation ---
max_iters = 2000      # total number of optimisation steps
learning_rate = 9e-5  # step size passed to the optimizer
eval_iters = 100      # batches averaged per loss estimate; also the reporting interval

# --- model ---
dropout = 0.2  # probability used by every nn.Dropout layer (regularisation)
n_embd = 384   # width of the token / position embedding vectors
n_head = 6     # attention heads per transformer block
n_layers = 4   # number of transformer blocks stacked in sequence

In [4]:
# Read the whole corpus into memory as a single string.
with open('sorcerers_stone.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# The vocabulary is the sorted set of distinct characters found in the corpus.
chars = sorted(set(text))
vocab_size = len(chars)

In [5]:
# encoding corresponds to converting the character to an integer

# Character-level tokenizer: every distinct character is assigned its index in `chars`.
int_to_string = dict(enumerate(chars))
string_to_int = {ch: i for i, ch in int_to_string.items()}


def encode(s):
    """Return the list of integer ids for the characters of `s`.

    Raises KeyError for a character that is not in the vocabulary.
    """
    return [string_to_int[c] for c in s]


def decode(l):
    """Return the string spelled out by the iterable of integer ids `l`."""
    return ''.join(int_to_string[i] for i in l)

In [6]:
# Encode the entire corpus once as a 1-D tensor of integer (torch.long) character ids.
data = torch.tensor(encode(text), dtype=torch.long)

In [15]:
# The first 80% of the corpus is used for training; the remaining 20% is held out for validation.
n = int(0.8*len(text))
train_data, val_data = data[:n], data[n:]

def get_batch(split):
    """Sample a random batch of (input, target) windows from one data split.

    Args:
        split: 'train' selects `train_data`; any other value selects `val_data`.

    Returns:
        Tuple `(x, y)` of tensors of shape (batch_size, block_size), moved to
        `device`. `y` holds the same windows as `x` shifted one position later
        in the source sequence, i.e. the next-token targets.
    """
    source = val_data if split != 'train' else train_data
    # one random window start per sequence in the batch
    starts = torch.randint(len(source) - block_size, (batch_size,))
    # (batch_size, 1) + (block_size,) broadcasts to a (batch_size, block_size) grid of positions
    positions = starts.unsqueeze(1) + torch.arange(block_size)
    x = source[positions]      # input windows
    y = source[positions + 1]  # targets: the same windows, one step ahead
    return x.to(device), y.to(device)

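Creating input-output pairs for a sequence-prediction task by iterating over the training data: at each position `t` of a block, the context is the first `t+1` tokens and the target is the token that follows them.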

In [16]:
# Walk through the first block to show every (context -> next token) pair it contains.
x, y = train_data[:block_size], train_data[1:block_size+1]

for t in range(block_size):
    # the context grows by one token each step; the target is the token right after it
    context, target = x[:t+1], y[t]
    print('when input is', context, 'target is ', target)

when input is tensor([32]) target is  tensor(52)
when input is tensor([32, 52]) target is  tensor(69)
when input is tensor([32, 52, 69]) target is  tensor(69)
when input is tensor([32, 52, 69, 69]) target is  tensor(76)
when input is tensor([32, 52, 69, 69, 76]) target is  tensor(2)
when input is tensor([32, 52, 69, 69, 76,  2]) target is  tensor(40)
when input is tensor([32, 52, 69, 69, 76,  2, 40]) target is  tensor(66)
when input is tensor([32, 52, 69, 69, 76,  2, 40, 66]) target is  tensor(71)
when input is tensor([32, 52, 69, 69, 76,  2, 40, 66, 71]) target is  tensor(71)
when input is tensor([32, 52, 69, 69, 76,  2, 40, 66, 71, 71]) target is  tensor(56)
when input is tensor([32, 52, 69, 69, 76,  2, 40, 66, 71, 71, 56]) target is  tensor(69)
when input is tensor([32, 52, 69, 69, 76,  2, 40, 66, 71, 71, 56, 69]) target is  tensor(2)
when input is tensor([32, 52, 69, 69, 76,  2, 40, 66, 71, 71, 56, 69,  2]) target is  tensor(52)
when input is tensor([32, 52, 69, 69, 76,  2, 40, 66,

In [17]:
@torch.no_grad()
def estimate_loss():
    """Estimate the mean loss of `model` on the train and validation splits.

    Runs with gradient tracking disabled (nothing here is back-propagated, so
    this saves memory and compute) and with the model in eval mode. The model
    is switched back to train mode before returning.

    Returns:
        dict mapping 'train' and 'val' to a 0-dim tensor holding the mean loss
        over `eval_iters` randomly sampled batches.
    """
    model.eval()
    out = {}
    for split in ('train', 'val'):
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            _, loss = model(*get_batch(split))
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

In [26]:
class Head(nn.Module):
    """ a single head of causal (masked) self-attention """

    def __init__(self, head_size):
        super().__init__()
        # Three bias-free projections from the embedding width (n_embd) down to head_size:
        #   key   - what each token offers to be matched against
        #   query - what each token is looking for
        #   value - the information a token contributes once it is attended to
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # Lower-triangular matrix of ones, stored as a buffer (not a parameter);
        # zeros above the diagonal mark the future positions a token may not attend to.
        causal_mask = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer('tril', causal_mask)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply masked self-attention.

        x:       (batch, time-step, channels)
        returns: (batch, time-step, head size)
        """
        _, T, _ = x.shape
        q = self.query(x)  # (B,T,hs)
        k = self.key(x)    # (B,T,hs)
        v = self.value(x)  # (B,T,hs)

        # scaled dot-product scores: divide by sqrt(head size) so the softmax does not saturate
        scale = k.shape[-1] ** -0.5
        scores = (q @ k.transpose(-2, -1)) * scale  # (B,T,hs) @ (B,hs,T) -> (B,T,T)

        # no look-ahead: future positions get -inf and therefore zero weight after the softmax
        is_future = self.tril[:T, :T] == 0
        scores = scores.masked_fill(is_future, float('-inf'))  # (B,T,T)

        weights = self.dropout(F.softmax(scores, dim=-1))  # (B,T,T)

        # attention-weighted sum of the values
        return weights @ v  # (B,T,T) @ (B,T,hs) -> (B,T,hs)

# Masking illustration for Head.forward (3 tokens): entries shown as 0 are future
# positions, which masked_fill replaces with -inf before the softmax.
# [1, 0, 0]
# [1, 0.6, 0]
# [1, 0.6, 0.4]
class MultiHeadAttention(nn.Module):
    """ several self-attention heads run side by side, then merged """

    def __init__(self, num_heads, head_size):
        super().__init__()
        # `num_heads` independent attention heads, each producing head_size features
        heads = [Head(head_size) for _ in range(num_heads)]
        self.heads = nn.ModuleList(heads)
        # learned mixing of the concatenated head outputs, mapped back to the embedding width
        concat_width = head_size * num_heads
        self.proj = nn.Linear(concat_width, n_embd)
        # regularisation applied to the projected output
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # run every head on the same input; each result is (B, T, head_size)
        per_head = [head(x) for head in self.heads]
        # join along the feature axis -> (B, T, num_heads * head_size)
        merged = torch.cat(per_head, dim=-1)
        # project back to (B, T, n_embd), then apply dropout
        projected = self.proj(merged)
        return self.dropout(projected)


class FeedFoward(nn.Module):
    """ two-layer MLP: expand 4x, ReLU, project back, dropout """

    def __init__(self, n_embd):
        super().__init__()
        hidden = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden),  # expand to 4x the embedding width
            nn.ReLU(),                  # ReLU(x) = max(x, 0)
            nn.Linear(hidden, n_embd),  # project back to the embedding width
            nn.Dropout(dropout),        # randomly zero activations to limit overfitting
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        out = self.net(x)
        return out

# transformer block
class Block(nn.Module):
    """ Transformer block: self-attention (communication) then feed-forward (computation) """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: number of attention heads
        super().__init__()
        # the embedding width is split evenly across the heads
        self.sa = MultiHeadAttention(n_head, n_embd // n_head)
        self.ffwd = FeedFoward(n_embd)
        # one LayerNorm after each of the two sub-layers
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        # post-norm layout: each sub-layer output is added to its input (residual)
        # and the sum is then normalised:
        #   attention -> add & norm -> feed-forward -> add & norm -> next block
        x = self.ln1(x + self.sa(x))
        return self.ln2(x + self.ffwd(x))

# inherited from nn.Module class which is a base class for neural networks
class GPTLanguageModel(nn.Module):
    """Character-level, decoder-only transformer language model.

    Token and position embeddings are summed, passed through `n_layers`
    transformer blocks and a final LayerNorm, and mapped to per-position
    logits over the vocabulary.
    """

    def __init__(self, vocab_size):
        """Build the model for a vocabulary of `vocab_size` tokens."""
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)  # learnable token vectors
        self.position_embedding_table = nn.Embedding(block_size, n_embd)  # learnable position vectors
        # `n_layers` transformer blocks applied one after another
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layers)])
        self.ln_f = nn.LayerNorm(n_embd)  # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)  # embedding -> vocabulary logits

        # re-initialise every Linear / Embedding submodule (see _init_weights)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02^2) and zero Linear biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, index, targets=None):
        """Compute logits and, when targets are given, the cross-entropy loss.

        Args:
            index: (B, T) tensor of integer token ids, with T <= block_size.
            targets: optional (B, T) tensor of integer token ids to predict.

        Returns:
            `(logits, loss)`. Without targets, logits is (B, T, vocab_size) and
            loss is None. With targets, logits is flattened to
            (B*T, vocab_size) and loss is the mean cross-entropy.
        """
        B, T = index.shape
        tok_emb = self.token_embedding_table(index)  # (B,T,C)
        # Build the position ids on the input's own device rather than the
        # notebook-level global, so the model works wherever it is placed.
        positions = torch.arange(T, device=index.device)
        pos_emb = self.position_embedding_table(positions)  # (T,C)
        x = tok_emb + pos_emb  # (B,T,C), pos_emb broadcasts over the batch
        x = self.blocks(x)  # (B,T,C)
        x = self.ln_f(x)  # (B,T,C)
        logits = self.lm_head(x)  # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            # F.cross_entropy expects (N, C) logits and (N,) targets, so flatten batch and time
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, index, max_new_tokens):
        """Autoregressively sample `max_new_tokens` tokens after `index`.

        Sampling runs in eval mode (dropout disabled) and without gradient
        tracking; the module's previous train/eval mode is restored afterwards.

        Args:
            index: (B, T) tensor of integer token ids used as the initial context.
            max_new_tokens: number of tokens to append to each sequence.

        Returns:
            (B, T + max_new_tokens) tensor: the context followed by the samples.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    # the position table only covers block_size positions, so crop the context
                    index_cond = index[:, -block_size:]
                    # call the module itself (not .forward) so registered hooks run
                    logits, _ = self(index_cond)
                    # keep only the prediction for the last time step
                    logits = logits[:, -1, :]  # (B, C)
                    probs = F.softmax(logits, dim=-1)  # (B, C)
                    # sample one next token per sequence
                    index_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
                    index = torch.cat((index, index_next), dim=1)  # (B, T+1)
        finally:
            self.train(was_training)
        return index

# Build the model and move its parameters and buffers to the selected device.
# (To resume from a saved checkpoint instead, load it here; never unpickle files from untrusted sources.)
model = GPTLanguageModel(vocab_size)
m = model.to(device)

# Report the total parameter count, in millions.
n_params = sum(p.numel() for p in m.parameters())
print(n_params/1e6, 'M parameters')

7.167055 M parameters


In [27]:
# AdamW optimizer over all model parameters
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# `step` rather than `iter`, so the builtin iter() is not shadowed for the rest of the notebook
for step in range(max_iters):

    # every `eval_iters` steps, report the averaged train / validation loss
    if step % eval_iters == 0:
        losses = estimate_loss()
        print(f"step {step} - training loss: {losses['train']:.4f} validation loss: {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # forward pass; call the module itself (not .forward) so registered hooks run
    logits, loss = model(xb, yb)
    # PyTorch accumulates gradients by default, so clear those from the previous
    # batch before computing the new ones
    optimizer.zero_grad(set_to_none=True)
    loss.backward()   # backward pass
    optimizer.step()  # parameter update

# loss of the final training batch, as a tensor and as a Python float
print()
print(loss)
print(loss.item())

step 0 - training loss: 4.4546 validation loss: 4.4554
step 100 - training loss: 2.5112 validation loss: 2.5020
step 200 - training loss: 2.2698 validation loss: 2.2579
step 300 - training loss: 2.0855 validation loss: 2.0795
step 400 - training loss: 1.9681 validation loss: 1.9649
step 500 - training loss: 1.8773 validation loss: 1.8826
step 600 - training loss: 1.8116 validation loss: 1.8190
step 700 - training loss: 1.7582 validation loss: 1.7714
step 800 - training loss: 1.7203 validation loss: 1.7315
step 900 - training loss: 1.6837 validation loss: 1.7027
step 1000 - training loss: 1.6506 validation loss: 1.6753
step 1100 - training loss: 1.6157 validation loss: 1.6474
step 1200 - training loss: 1.5966 validation loss: 1.6280
step 1300 - training loss: 1.5720 validation loss: 1.6113
step 1400 - training loss: 1.5482 validation loss: 1.5973
step 1500 - training loss: 1.5368 validation loss: 1.5783
step 1600 - training loss: 1.5173 validation loss: 1.5717
step 1700 - training loss:

In [28]:
# Prompt the trained model with a passage from the book and sample a continuation.
prompt = """Perhaps it had something to do with living in a dark cupboard, but Harry
had always been small and skinny for his age."""
cxt = encode(prompt)
# shape (1, len(prompt)): a batch holding the single prompt sequence
context = torch.tensor([cxt], dtype=torch.long, device=device)
generated = m.generate(context, max_new_tokens=500)
generated_chars = decode(generated[0].tolist())
print(generated_chars)

Perhaps it had something to do with living in a dark cupboard, but Harry
had always been small and skinny for his age. And thesday all conce keep yeh, leare that was so scat Goon featory, findor he heads. The read, and the probes
fives, thought, my ning culd like behind all.

"It's -- yee hung to be looked out toward, I maging
Toon, ockat the saw stuffully or tells, Did been hut a
gorval down. Petwing get rest them," he said, whighten, shuffly.


"7usturieh, ro siny to mesde his lackful stud
the withowy he loncevelly and hit wood ask at did this."

Great stimps thard a fire, then were mornited see
didn't wellea
