# Self attention embedding table

Adding the path to the custom libraries

In [1]:
import sys
import os


# Resolve <cwd>/../../scripts/lib to an absolute path and make it importable
lib_location = os.path.join(os.getcwd(), os.pardir, os.pardir, "scripts/lib")
dirname = os.path.abspath(lib_location)
sys.path.append(dirname)

## Importing libraries

In [2]:
import torch
import torch.nn as nn
from torch.nn import functional as F

from utils.compile import compileFolder
from utils.tokenizer import CharTokenizer, END_CHAR
from utils.datasets import TextChunksDataset, split_dataset, get_batch, estimate_loss

In [3]:
# This module helps to quickly save the weights and load them
from transformers import Module

## Setting Hyperparameters

In [4]:
# Maximum context length (block size), in tokens
block_size = 16

# Fraction of the total data held out as the test/validation set
test_train_split_ratio = 0.1

# Embedding dimension (number of channels per token)
n_embd = 16

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Setting up the data and the tokenizer

In [5]:
# Importing the data
# NOTE(review): presumably compileFolder gathers the text of the 'tate' folder — confirm in utils.compile
raw_data = compileFolder('tate')

# Creating the tokenizer from the raw data
# NOTE(review): presumably a character-level vocabulary is derived from raw_data — confirm in utils.tokenizer
tokenizer = CharTokenizer(raw_data)

# Tokenizing and creating the dataset object (built with block_size as its context length argument)
data = TextChunksDataset(raw_data, block_size, tokenizer)

In [6]:
# Use the ratio declared with the hyperparameters instead of repeating the literal 0.1
train_data, test_data = split_dataset(data, test_train_split_ratio)

## Implementation of the self attention block

We take almost the same structure as the base-embedding structure

> Note: starting from now, we're going to use `cuda` when available

In [7]:
class BigramLanguageModel(Module):
    """Language model built from token and position embeddings only.

    The logits of each position come from the sum of its token embedding and
    its position embedding, passed through a linear head.
    """

    def __init__(self, vocab_size: int, n_embd, context_size=None):
        """
        Args:
            vocab_size: the vocabulary size as an int, or a ``CharTokenizer`` /
                ``TextChunksDataset`` from which the size is read with ``len``.
            n_embd: embedding dimension.
            context_size: maximum context length. May be omitted when
                ``vocab_size`` is a ``TextChunksDataset``; its
                ``context_length`` is then used.

        Raises:
            ValueError: if ``context_size`` is omitted and cannot be inferred.
        """
        super().__init__()
        if context_size is None:
            if not isinstance(vocab_size, TextChunksDataset):
                raise ValueError("You need to specify the context length")
            context_size = vocab_size.context_length
        self.block_size = context_size
        if isinstance(vocab_size, TextChunksDataset):
            vocab_size = len(vocab_size.tokenizer)
        elif isinstance(vocab_size, CharTokenizer):
            vocab_size = len(vocab_size)
        # one learned vector per token id and one per position in the context
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(self.block_size, n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)`` for a (B, T) batch of token ids.

        Without ``targets`` the logits are (B, T, vocab_size) and the loss is
        None; with ``targets`` the logits are flattened to (B*T, vocab_size)
        and the cross-entropy loss is returned alongside them.
        """
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_embd = self.token_embedding_table(idx) # (B,T,C)
        pos_embd = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_embd + pos_embd # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens: int):
        """Append ``max_new_tokens`` sampled tokens to the (B, T) prompt ``idx``.

        The prompt is moved to the model's device for the forward passes and
        the result is handed back on the prompt's original device, so a CPU
        prompt keeps working after the model has been moved to CUDA.
        """
        caller_device = idx.device
        idx = idx.to(self.token_embedding_table.weight.device)
        for _ in range(max_new_tokens):
            # get the predictions, cropping the context to the last block_size tokens
            logits, _ = self(idx[:, -self.block_size:])
            # focus only on the last time step
            logits = logits[:, -1, :]
            # apply softmax to get the probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled text to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx.to(caller_device)
        

In [8]:
torch.manual_seed(89)
m = BigramLanguageModel(train_data, n_embd)
m.to(device)

# Sanity-check one forward pass; the batch is moved to the model's device
xb, yb = train_data[:10]
out = m(xb.to(device), yb.to(device))

# Create the one-token prompt (token id 0) on the model's device: a CPU prompt
# fed to a CUDA model raises a device-mismatch error. Decode from a CPU copy.
start = torch.zeros((1, 1), dtype=torch.long, device=device)
sample = m.generate(idx=start, max_new_tokens=100)[0]
print(tokenizer.decodeText(sample.cpu()))


"y=Ts-R8/TugilI'8eL:hRvODUniEJXM!T*iFt dr03?cDh'r-?oJX?DieJ7"2MX0cM!f3TbFyZX?-zNWn p%O'qKXa3yZ!B'9!


The result is random characters

## Training the model

In [9]:
# create a PyTorch optimizer
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-3)

In [10]:
batch_size = 32
num_epochs = 100

# verbose
show_loss_each_epoch = 500

def train(optimizer, num_epochs=num_epochs, loss_verbose_interval=show_loss_each_epoch):
    """Run `num_epochs` optimisation steps on the global model `m`.

    Every step draws one batch with `get_batch(train_data, batch_size)` and
    applies one optimizer update. Every `loss_verbose_interval` steps the
    losses returned by `estimate_loss` are printed.
    """
    for step_no in range(1, num_epochs + 1):
        # draw a fresh batch of inputs and targets
        inputs, targets = get_batch(train_data, batch_size)

        # forward pass followed by one optimizer update
        _, loss = m(inputs, targets)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        # only report on multiples of the reporting interval
        if step_no % loss_verbose_interval != 0:
            continue
        losses = estimate_loss(m, train_data, test_data, batch_size=batch_size)
        print(f"step {step_no}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
    print('done!')

In [11]:
train(optimizer, 1500)

step 500: train loss 2.8155, val loss 2.8754
step 1000: train loss 2.6120, val loss 2.6564
step 1500: train loss 2.5659, val loss 2.6120
done!


In [12]:
# Create the prompt on the model's device and decode from a CPU copy
start = torch.zeros((1, 1), dtype=torch.long, device=device)
print(tokenizer.decodeText(m.generate(idx=start, max_new_tokens=100)[0].cpu()))


Dinestowese s tpe tothin igrst yo t jeine
AUp, meat



s. n aacy th
FAye thande.ov? t, tonce i k oke


Still the same result as the base model


## The mathematical trick to self attention

In [13]:
# Consider the following toy example

torch.manual_seed(198)
B, T, C = 4, 8, 2 # batch, time, channels
x = torch.randn(B, T, C)
x.shape

torch.Size([4, 8, 2])

In [14]:
# Goal: xbow[b, t] is the mean of x[b, i] over every i <= t
xbow = torch.zeros(B, T, C)
for b in range(B):
    for t in range(T):
        xprev = x[b, :t + 1]  # (t+1, C): tokens 0..t of example b
        xbow[b, t] = xprev.mean(dim=0)
xbow[0]

tensor([[ 0.3597,  0.1501],
        [ 0.3383,  0.7864],
        [ 0.3464,  0.5391],
        [ 0.0438,  0.4631],
        [ 0.0989,  0.1779],
        [ 0.2658,  0.2987],
        [-0.0105,  0.2283],
        [ 0.0798,  0.2066]])

In [15]:
# Second version: the same causal averaging weights, obtained with softmax
tril = torch.ones(T, T).tril()
# zeros where a token may look, -inf where it may not (future positions)
wei = torch.zeros(T, T).masked_fill(tril == 0, float('-inf'))
# softmax turns each row into uniform weights over the allowed positions
wei = F.softmax(wei, dim=-1)
wei

tensor([[1.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.5000, 0.5000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.3333, 0.3333, 0.3333, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.2500, 0.2500, 0.2500, 0.2500, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.0000, 0.0000, 0.0000],
        [0.1667, 0.1667, 0.1667, 0.1667, 0.1667, 0.1667, 0.0000, 0.0000],
        [0.1429, 0.1429, 0.1429, 0.1429, 0.1429, 0.1429, 0.1429, 0.0000],
        [0.1250, 0.1250, 0.1250, 0.1250, 0.1250, 0.1250, 0.1250, 0.1250]])

In [16]:
xbow2 = wei @ x
xbow2[0]

tensor([[ 0.3597,  0.1501],
        [ 0.3383,  0.7864],
        [ 0.3464,  0.5391],
        [ 0.0438,  0.4631],
        [ 0.0989,  0.1779],
        [ 0.2658,  0.2987],
        [-0.0105,  0.2283],
        [ 0.0798,  0.2066]])

In [17]:
# We get the same tensor
torch.allclose(xbow, xbow2)

True

It is the same result

### Implementing an attention head
Now we're going to build a third version: the attention head.

We first introduce a new parameter: the head size.

In [18]:
# Head size
head_size = 16

In [19]:
# Third version
key = nn.Linear(C, head_size, bias=False)
query = nn.Linear(C, head_size, bias=False)


In [20]:
q = query(x)  # (B, T, head_size)
k = key(x)    # (B, T, head_size)
# (B, T, head_size) @ (B, head_size, T) --> (B, T, T)
wei = q @ k.transpose(-2, -1)

# hide the future positions, then normalise every row into weights
tril = torch.ones(T, T).tril()
wei = F.softmax(wei.masked_fill(tril == 0, float('-inf')), dim=-1)

In [21]:
wei[0]

tensor([[1.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.4099, 0.5901, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.3780, 0.2279, 0.3942, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.1302, 0.5305, 0.1159, 0.2234, 0.0000, 0.0000, 0.0000, 0.0000],
        [0.2031, 0.0725, 0.2210, 0.0464, 0.4569, 0.0000, 0.0000, 0.0000],
        [0.1195, 0.0402, 0.1308, 0.3458, 0.3329, 0.0308, 0.0000, 0.0000],
        [0.0555, 0.5743, 0.0458, 0.0614, 0.0072, 0.2289, 0.0269, 0.0000],
        [0.1148, 0.0417, 0.1248, 0.1055, 0.2782, 0.0637, 0.1468, 0.1245]],
       grad_fn=<SelectBackward0>)

We can see that the weights are no longer uniform within a row: each position now gets its own data-dependent weight.

We've implemented the keys and the queries; now we'll implement the values.

In [22]:
value = nn.Linear(C, head_size, bias=False) # Same linear structure as the key and query linear models
v = value(x)
out = wei @ v

out.shape

torch.Size([4, 8, 16])

In [23]:
out[0,0] # The value of the first token for the first example

tensor([ 0.2252,  0.0816, -0.0861, -0.0980, -0.1490, -0.0824, -0.2695,  0.0722,
         0.1332, -0.1667, -0.0388, -0.2477, -0.0050,  0.1838, -0.0065,  0.1206],
       grad_fn=<SelectBackward0>)

To keep the variance of the attention scores stable (so that the softmax does not saturate), the `wei` tensor needs to be divided by $\sqrt{\text{head\_size}}$ before the softmax is applied.

In [24]:
q = query(x)  # (B, T, head_size)
k = key(x)    # (B, T, head_size)
# (B, T, head_size) @ (B, head_size, T) --> (B, T, T), scaled by 1/sqrt(head_size)
wei = (q @ k.transpose(-2, -1)) * (head_size**-0.5)

# hide the future positions, then normalise every row into weights
tril = torch.ones(T, T).tril()
wei = F.softmax(wei.masked_fill(tril == 0, float('-inf')), dim=-1)

# weighted aggregation of the values
v = value(x)
out = wei @ v

Let's create a `Head` class that implements a single head.

In [25]:
class Head(nn.Module):
    """A single causal self-attention head."""

    def __init__(self, n_embd: int, head_size: int, block_size: int) -> None:
        super().__init__()
        self.head_size = head_size
        # bias-free projections from the embedding space to the head space
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # lower-triangular mask kept as a buffer (not a trainable parameter)
        causal_mask = torch.ones(block_size, block_size).tril()
        self.register_buffer('tril', causal_mask)

    def forward(self, x):
        _, seq_len, _ = x.shape
        queries = self.query(x)  # (B, T, head_size)
        keys = self.key(x)       # (B, T, head_size)
        # scaled affinities: (B, T, head_size) @ (B, head_size, T) --> (B, T, T)
        scores = (queries @ keys.transpose(-2, -1)) * (self.head_size**-0.5)
        # a position must not attend to the positions that come after it
        is_future = self.tril[:seq_len, :seq_len] == 0
        weights = F.softmax(scores.masked_fill(is_future, float('-inf')), dim=-1)
        # weighted aggregation: (B, T, T) @ (B, T, head_size) --> (B, T, head_size)
        return weights @ self.value(x)

Now let's redefine `BigramLanguageModel`

In [26]:
class BigramLanguageModel(Module):
    """Embedding language model followed by a single self-attention head."""

    def __init__(self, vocab_size: int, n_embd, context_size=None, head_size=16):
        """
        Args:
            vocab_size: the vocabulary size as an int, or a ``CharTokenizer`` /
                ``TextChunksDataset`` from which the size is read with ``len``.
            n_embd: embedding dimension.
            context_size: maximum context length. May be omitted when
                ``vocab_size`` is a ``TextChunksDataset``; its
                ``context_length`` is then used.
            head_size: accepted but not used; the head is built with size
                ``n_embd`` so that its output matches the input of ``lm_head``.

        Raises:
            ValueError: if ``context_size`` is omitted and cannot be inferred.
        """
        super().__init__()
        if context_size is None:
            if not isinstance(vocab_size, TextChunksDataset):
                raise ValueError("You need to specify the context length")
            context_size = vocab_size.context_length
        self.block_size = context_size
        if isinstance(vocab_size, TextChunksDataset):
            vocab_size = len(vocab_size.tokenizer)
        elif isinstance(vocab_size, CharTokenizer):
            vocab_size = len(vocab_size)
        # one learned vector per token id and one per position in the context
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(self.block_size, n_embd)
        # The causal mask must span the context length, not the vocabulary size:
        # with a vocabulary smaller than the context the mask would be too small.
        self.sa_head = Head(n_embd, n_embd, self.block_size)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)`` for a (B, T) batch of token ids.

        Without ``targets`` the logits are (B, T, vocab_size) and the loss is
        None; with ``targets`` the logits are flattened to (B*T, vocab_size)
        and the cross-entropy loss is returned alongside them.
        """
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_embd = self.token_embedding_table(idx) # (B,T,C)
        pos_embd = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_embd + pos_embd # (B,T,C)
        x = self.sa_head(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens: int):
        """Append ``max_new_tokens`` sampled tokens to the (B, T) prompt ``idx``.

        The prompt is moved to the model's device for the forward passes and
        the result is handed back on the prompt's original device, so a CPU
        prompt keeps working after the model has been moved to CUDA.
        """
        caller_device = idx.device
        idx = idx.to(self.token_embedding_table.weight.device)
        for _ in range(max_new_tokens):
            # get the predictions, cropping the context to the last block_size tokens
            logits, _ = self(idx[:, -self.block_size:])
            # focus only on the last time step
            logits = logits[:, -1, :]
            # apply softmax to get the probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled text to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx.to(caller_device)
        

In [27]:

m = BigramLanguageModel(train_data, n_embd)
m.to(device)

# Sanity-check one forward pass; the batch is moved to the model's device
xb, yb = train_data[:10]
out = m(xb.to(device), yb.to(device))

# Create the one-token prompt (token id 0) on the model's device: a CPU prompt
# fed to a CUDA model raises a device-mismatch error. Decode from a CPU copy.
start = torch.zeros((1, 1), dtype=torch.long, device=device)
sample = m.generate(idx=start, max_new_tokens=100)[0]
print(tokenizer.decodeText(sample.cpu()))


:TgNh$mITng6P:6r*lv TLFVJNvb
PCe>UhPe:1B:vB$'S'toLhTF =X0JV.>6->IIEClIyEuK0IVz:hE*!hTmRlh1Dg=XVEG
Y


### The training

In [28]:
# create a PyTorch optimizer
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-4)

In [29]:
train(optimizer, 500)

step 500: train loss 3.7974, val loss 3.8229
done!


In [30]:
train(optimizer, 400)

done!


In [31]:
# Create the prompt on the model's device and decode from a CPU copy
start = torch.zeros((1, 1), dtype=torch.long, device=device)
print(tokenizer.decodeText(m.generate(idx=start, max_new_tokens=100)[0].cpu()))


n.w et.$HL5,wnU
 c zle yiw atwe8i yyuneuvv%rsouixursn iet   o et 
i
pJon
 ioebv7a tma Dna pttcetdQdf


Not yet very good!

### Let's dive into multiple head attention

Basically, we run several heads in parallel and concatenate their outputs into a single output.

In [32]:
class MultiHeadAttention(nn.Module):
    """Several self-attention heads applied side by side to the same input."""

    def __init__(self, num_heads: int, n_embd: int, head_size: int, block_size: int):
        super().__init__()
        head_list = []
        for _ in range(num_heads):
            head_list.append(Head(n_embd, head_size, block_size))
        self.heads = nn.ModuleList(head_list)

    def forward(self, x):
        # every head sees the same x; their outputs are joined on the channel axis
        per_head = tuple(head(x) for head in self.heads)
        return torch.cat(per_head, dim=-1)

Let's redefine our `BigramLanguageModel` class

In [33]:
class BigramLanguageModel(Module):
    """Embedding language model followed by multi-head self-attention."""

    def __init__(self, vocab_size: int, n_embd, context_size=None, head_size=16, num_heads=4):
        """
        Args:
            vocab_size: the vocabulary size as an int, or a ``CharTokenizer`` /
                ``TextChunksDataset`` from which the size is read with ``len``.
            n_embd: embedding dimension.
            context_size: maximum context length. May be omitted when
                ``vocab_size`` is a ``TextChunksDataset``; its
                ``context_length`` is then used.
            head_size: accepted but not used; each head gets
                ``n_embd // num_heads`` channels.
            num_heads: number of attention heads.

        Raises:
            ValueError: if ``context_size`` is omitted and cannot be inferred.
        """
        super().__init__()
        if context_size is None:
            if not isinstance(vocab_size, TextChunksDataset):
                raise ValueError("You need to specify the context length")
            context_size = vocab_size.context_length
        self.block_size = context_size
        if isinstance(vocab_size, TextChunksDataset):
            vocab_size = len(vocab_size.tokenizer)
        elif isinstance(vocab_size, CharTokenizer):
            vocab_size = len(vocab_size)
        # one learned vector per token id and one per position in the context
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(self.block_size, n_embd)
        # The causal mask must span the context length, not the vocabulary size:
        # with a vocabulary smaller than the context the mask would be too small.
        self.sa_heads = MultiHeadAttention(num_heads, n_embd, n_embd // num_heads, self.block_size)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)`` for a (B, T) batch of token ids.

        Without ``targets`` the logits are (B, T, vocab_size) and the loss is
        None; with ``targets`` the logits are flattened to (B*T, vocab_size)
        and the cross-entropy loss is returned alongside them.
        """
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_embd = self.token_embedding_table(idx) # (B,T,C)
        pos_embd = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_embd + pos_embd # (B,T,C)
        x = self.sa_heads(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens: int):
        """Append ``max_new_tokens`` sampled tokens to the (B, T) prompt ``idx``.

        The prompt is moved to the model's device for the forward passes and
        the result is handed back on the prompt's original device, so a CPU
        prompt keeps working after the model has been moved to CUDA.
        """
        caller_device = idx.device
        idx = idx.to(self.token_embedding_table.weight.device)
        for _ in range(max_new_tokens):
            # get the predictions, cropping the context to the last block_size tokens
            logits, _ = self(idx[:, -self.block_size:])
            # focus only on the last time step
            logits = logits[:, -1, :]
            # apply softmax to get the probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled text to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx.to(caller_device)
        

In [34]:
m = BigramLanguageModel(train_data, n_embd, num_heads=4)
m.to(device)

# Sanity-check one forward pass; the batch is moved to the model's device
xb, yb = train_data[:10]
out = m(xb.to(device), yb.to(device))

# Create the one-token prompt (token id 0) on the model's device: a CPU prompt
# fed to a CUDA model raises a device-mismatch error. Decode from a CPU copy.
start = torch.zeros((1, 1), dtype=torch.long, device=device)
sample = m.generate(idx=start, max_new_tokens=100)[0]
print(tokenizer.decodeText(sample.cpu()))


JnMLU$F zGqHHF7,e27M5d0DyauP.RH-YB?NBAnni,G67*ti-%i>rf:p422QBB>T4oNU
AOwy-BUk liFD8rSf3%5Cja:iHyyJf


It's working!

In [35]:
# create a PyTorch optimizer
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-4)

In [36]:
train(optimizer, 500)

step 500: train loss 3.7187, val loss 3.7363
done!


In [37]:
# Create the prompt on the model's device and decode from a CPU copy
start = torch.zeros((1, 1), dtype=torch.long, device=device)
print(tokenizer.decodeText(m.generate(idx=start, max_new_tokens=100)[0].cpu()))


wyPjuhV
IuE=6!0DsX jpp4Ofd>px2prd9*t7vlulyoee>obmk8v dDuBe0VGwBMwJCV,x'ZhfQVPn MJ*J*OQF?Bj43s…gqM?rb


Wow, it's starting to look like something. We're very close to the goal!

### Let's add the feed forward

In [38]:
class FeedForward(nn.Module):
    """Position-wise linear layer followed by a ReLU non-linearity."""

    def __init__(self, n_embd):
        super().__init__()
        # width is preserved: n_embd channels in, n_embd channels out
        layers = [nn.Linear(n_embd, n_embd), nn.ReLU()]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        activated = self.net(x)
        return activated

Let's reimplement again our `BigramLanguageModel` class

In [39]:
class BigramLanguageModel(Module):
    """Embedding language model with multi-head attention and a feed-forward layer."""

    def __init__(self, vocab_size: int, n_embd, context_size=None, head_size=16, num_heads=4):
        """
        Args:
            vocab_size: the vocabulary size as an int, or a ``CharTokenizer`` /
                ``TextChunksDataset`` from which the size is read with ``len``.
            n_embd: embedding dimension.
            context_size: maximum context length. May be omitted when
                ``vocab_size`` is a ``TextChunksDataset``; its
                ``context_length`` is then used.
            head_size: accepted but not used; each head gets
                ``n_embd // num_heads`` channels.
            num_heads: number of attention heads.

        Raises:
            ValueError: if ``context_size`` is omitted and cannot be inferred.
        """
        super().__init__()
        if context_size is None:
            if not isinstance(vocab_size, TextChunksDataset):
                raise ValueError("You need to specify the context length")
            context_size = vocab_size.context_length
        self.block_size = context_size
        if isinstance(vocab_size, TextChunksDataset):
            vocab_size = len(vocab_size.tokenizer)
        elif isinstance(vocab_size, CharTokenizer):
            vocab_size = len(vocab_size)
        # one learned vector per token id and one per position in the context
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(self.block_size, n_embd)
        # The causal mask must span the context length, not the vocabulary size:
        # with a vocabulary smaller than the context the mask would be too small.
        self.sa_heads = MultiHeadAttention(num_heads, n_embd, n_embd // num_heads, self.block_size)
        self.ffwd = FeedForward(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)`` for a (B, T) batch of token ids.

        Without ``targets`` the logits are (B, T, vocab_size) and the loss is
        None; with ``targets`` the logits are flattened to (B*T, vocab_size)
        and the cross-entropy loss is returned alongside them.
        """
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_embd = self.token_embedding_table(idx) # (B,T,C)
        pos_embd = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_embd + pos_embd # (B,T,C)
        x = self.sa_heads(x) # (B,T,C)
        x = self.ffwd(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens: int):
        """Append ``max_new_tokens`` sampled tokens to the (B, T) prompt ``idx``.

        The prompt is moved to the model's device for the forward passes and
        the result is handed back on the prompt's original device, so a CPU
        prompt keeps working after the model has been moved to CUDA.
        """
        caller_device = idx.device
        idx = idx.to(self.token_embedding_table.weight.device)
        for _ in range(max_new_tokens):
            # get the predictions, cropping the context to the last block_size tokens
            logits, _ = self(idx[:, -self.block_size:])
            # focus only on the last time step
            logits = logits[:, -1, :]
            # apply softmax to get the probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled text to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx.to(caller_device)
        

In [40]:
m = BigramLanguageModel(train_data, n_embd, num_heads=4)
m.to(device)

# Sanity-check one forward pass; the batch is moved to the model's device
xb, yb = train_data[:10]
out = m(xb.to(device), yb.to(device))

# Create the one-token prompt (token id 0) on the model's device: a CPU prompt
# fed to a CUDA model raises a device-mismatch error. Decode from a CPU copy.
start = torch.zeros((1, 1), dtype=torch.long, device=device)
sample = m.generate(idx=start, max_new_tokens=100)[0]
print(tokenizer.decodeText(sample.cpu()))


f2…o?ZeLaW=LwR!H'O…>…5l/TmM2J
veB>8'K7?OSe>70Wj0K4T>'8?xLZu1>WSdV
zfx=sxFvJ:gZ62laVMN/N373ufJtd37QE5


Working again!

In [41]:
# create a PyTorch optimizer
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-4)

> Note: to keep execution quick, only a small number of epochs (training steps) is used here by default. For good results, you should train for at least 25000 epochs.

In [42]:
train(optimizer, 1000, 1000)

step 1000: train loss 3.1368, val loss 3.2254
done!


In [43]:
# Create the prompt on the model's device and decode from a CPU copy
start = torch.zeros((1, 1), dtype=torch.long, device=device)
print(tokenizer.decodeText(m.generate(idx=start, max_new_tokens=100)[0].cpu()))


Cs?g es wt Fe buhp:Wato yoo  h.ehtimsderra d 
7hg ra sehbh cootottdLW f
eEnsa  Oedio 
fGseeh.s wzino


## Implementing **The Block** class (that we put in a sequence multiple times)

For better optimisation, we will also redefine `MultiHeadAttention` and `FeedForward` by adding a projection layer.
We also add `Dropout` layers to `Head`, `MultiHeadAttention` and `FeedForward`.

In [44]:
class Head(nn.Module):
    """One head of causal self-attention, with dropout on the attention weights."""

    def __init__(self, n_embd: int, head_size: int, block_size: int, dropout=0.2) -> None:
        """
        Args:
            n_embd: number of input channels.
            head_size: number of output channels of the head.
            block_size: maximum sequence length covered by the causal mask.
            dropout: dropout probability applied to the attention weights.
        """
        super().__init__()
        self.head_size = head_size
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # lower-triangular mask kept as a buffer (not a trainable parameter)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map x of shape (B, T, n_embd) to an output of shape (B, T, head_size)."""
        B, T, C = x.shape
        k = self.key(x)   # (B, T, head_size)
        q = self.query(x) # (B, T, head_size)
        # Compute attention score ('affinities')
        wei = q @ k.transpose(-2, -1) * (self.head_size**-0.5) # (B, T, head_size) @ (B, head_size, T) --> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1)
        # The dropout layer was created but never used; apply it to the attention
        # weights (it is the identity in eval mode)
        wei = self.dropout(wei)
        # Perform the weighted aggregation of the values
        v = self.value(x) # (B, T, head_size)
        out = wei @ v     # (B, T, T) @ (B, T, head_size) --> (B, T, head_size)
        return out        # (B, T, head_size)

class MultiHeadAttention(nn.Module):
    """Multiple heads of self-attention in parallel, followed by a projection."""

    def __init__(self, num_heads: int, n_embd: int, head_size: int, block_size: int, dropout=0.2):
        """
        Args:
            num_heads: number of parallel heads.
            n_embd: number of input channels and of output channels.
            head_size: number of channels produced by each head.
            block_size: maximum sequence length covered by each head's mask.
            dropout: dropout probability, for the heads and after the projection.
        """
        super().__init__()
        self.heads = nn.ModuleList([Head(n_embd, head_size, block_size, dropout) for _ in range(num_heads)])
        # The projection consumes the concatenated head outputs, which are
        # num_heads * head_size wide. That equals n_embd only when n_embd is
        # divisible by num_heads, so size the input from the heads instead.
        self.proj = nn.Linear(num_heads * head_size, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # join the head outputs on the channel axis, then project back to n_embd
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out


class FeedForward(nn.Module):
    """Position-wise MLP: expand to 4 * n_embd, ReLU, project back, then dropout."""

    def __init__(self, n_embd: int, dropout=0.2):
        super().__init__()
        hidden_width = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        transformed = self.net(x)
        return transformed

class Block(nn.Module):
    """Transformer block: self-attention (communication), then feed-forward (computation).

    Both sub-layers are applied to a layer-normalised input and added back to
    the residual stream.
    """

    def __init__(self, n_embd, n_heads, block_size, dropout=0.2):
        # n_embd: embedding dimension, n_heads: the number of heads we'd like
        super().__init__()
        per_head = n_embd // n_heads
        self.sa = MultiHeadAttention(n_heads, n_embd, per_head, block_size, dropout=dropout)
        self.ffwd = FeedForward(n_embd, dropout=dropout)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        # attention sub-layer with residual connection
        attended = self.sa(self.ln1(x))
        x = x + attended
        # feed-forward sub-layer with residual connection
        transformed = self.ffwd(self.ln2(x))
        return x + transformed

Reimplementing our `BigramLanguageModel` class

In [45]:
class BigramLanguageModel(Module):
    """Embedding language model followed by a stack of transformer blocks."""

    def __init__(self, vocab_size: int, n_embd, n_layers, context_size=None, head_size=16, n_heads=4, dropout=0.2):
        """
        Args:
            vocab_size: the vocabulary size as an int, or a ``CharTokenizer`` /
                ``TextChunksDataset`` from which the size is read with ``len``.
            n_embd: embedding dimension.
            n_layers: number of ``Block`` layers stacked before the final
                ``LayerNorm``.
            context_size: maximum context length. May be omitted when
                ``vocab_size`` is a ``TextChunksDataset``; its
                ``context_length`` is then used.
            head_size: accepted but not used; ``Block`` derives the head size
                from ``n_embd // n_heads``.
            n_heads: number of attention heads per block.
            dropout: dropout probability passed to every block.

        Raises:
            ValueError: if ``context_size`` is omitted and cannot be inferred.
        """
        super().__init__()
        if context_size is None:
            if not isinstance(vocab_size, TextChunksDataset):
                raise ValueError("You need to specify the context length")
            context_size = vocab_size.context_length
        self.block_size = context_size
        if isinstance(vocab_size, TextChunksDataset):
            vocab_size = len(vocab_size.tokenizer)
        elif isinstance(vocab_size, CharTokenizer):
            vocab_size = len(vocab_size)
        # one learned vector per token id and one per position in the context
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(self.block_size, n_embd)
        # n_layers used to be ignored (three blocks were hard-coded); build the
        # requested number of blocks. n_layers=3 gives the previous layout.
        blocks = [
            Block(n_embd, n_heads, self.block_size, dropout=dropout)
            for _ in range(n_layers)
        ]
        self.blocks = nn.Sequential(*blocks, nn.LayerNorm(n_embd))
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)`` for a (B, T) batch of token ids.

        Without ``targets`` the logits are (B, T, vocab_size) and the loss is
        None; with ``targets`` the logits are flattened to (B*T, vocab_size)
        and the cross-entropy loss is returned alongside them.
        """
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_embd = self.token_embedding_table(idx) # (B,T,C)
        pos_embd = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_embd + pos_embd # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens: int):
        """Append ``max_new_tokens`` sampled tokens to the (B, T) prompt ``idx``.

        The prompt is moved to the model's device for the forward passes and
        the result is handed back on the prompt's original device, so a CPU
        prompt keeps working after the model has been moved to CUDA.

        The train/eval mode is left untouched: dropout stays active while
        sampling unless the caller has put the model in eval mode.
        """
        caller_device = idx.device
        idx = idx.to(self.token_embedding_table.weight.device)
        for _ in range(max_new_tokens):
            # get the predictions, cropping the context to the last block_size tokens
            logits, _ = self(idx[:, -self.block_size:])
            # focus only on the last time step
            logits = logits[:, -1, :]
            # apply softmax to get the probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled text to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx.to(caller_device)
        

In [46]:
m = BigramLanguageModel(train_data, n_embd, n_layers=3, n_heads=4, head_size=16)
# Move the model to the selected device, as the other model cells of this notebook do
m.to(device)

# Sanity-check one forward pass; the batch is moved to the model's device
xb, yb = train_data[:10]
out = m(xb.to(device), yb.to(device))

# Create the one-token prompt (token id 0) on the model's device and decode from a CPU copy
start = torch.zeros((1, 1), dtype=torch.long, device=device)
sample = m.generate(idx=start, max_new_tokens=100)[0]
print(tokenizer.decodeText(sample.cpu()))


$!Va…whj%27vaD%qcR5PTMyXFA0zB5v8V7yk/En?z!Ik6Nf%G"L…9!"Hm=R!YLhI5M2m3c3KXGYvU"mYx!'Ccy1WuKesNO
IY=K


In [47]:
# create a PyTorch optimizer
optimizer = torch.optim.AdamW(m.parameters(), lr=3*1e-4)

In [48]:
train(optimizer, 120, 40)

step 1000: train loss 2.7266, val loss 2.7741
step 2000: train loss 2.5372, val loss 2.6025
step 3000: train loss 2.4203, val loss 2.5030
done!


In [49]:
# Create the prompt on whichever device the model's parameters live on and
# decode from a CPU copy
start = torch.zeros((1, 1), dtype=torch.long, device=next(m.parameters()).device)
print(tokenizer.decodeText(m.generate(idx=start, max_new_tokens=200)[0].cpu()))



$ oniser bun heatyx thinleale t I q aory dko t me fo I2ongot i sisriith
pisthe wout be deyenou'pns weppy a t eave s he c yoise. ngho ingicong indod lap maotepat wel y yo IHandoout t chitssreo
!t'rely


In [50]:
def autoCompletePrint(model, text, max_tokens = 300, step=1):
    """Print `text`, then stream a sampled continuation `step` tokens at a time.

    Args:
        model: model whose `generate` method is used. When None, the global
            model `m` is used, which keeps calls that pass None working.
        text: prompt; it is encoded with the global `tokenizer`.
        max_tokens: total number of new tokens to generate and print.
        step: number of tokens generated and printed per iteration.
    """
    if model is None:
        model = m
    idx = tokenizer.encode(text)
    prompt_len = len(idx)
    print(text, end='')
    idx = torch.reshape(idx, (1, prompt_len))
    for produced in range(0, max_tokens, step):
        # Generate exactly what is printed in this iteration (the previous
        # version always generated 2 tokens, which skipped tokens for step > 2),
        # and never go past max_tokens on the last iteration.
        n_new = min(step, max_tokens - produced)
        idx = model.generate(idx=idx, max_new_tokens=n_new)
        print(tokenizer.decodeText(idx[0][-n_new:]), end='')


In [51]:
# Pass the trained model explicitly instead of None
autoCompletePrint(m, "I am the best", max_tokens=400)

I am the best I'the thhengy ban ve ate he ciman iyoull ame bngora ipere y? herl
atousear
merppe bapine t' y tha to icoutsu alee'ct t maro madinillseclle s focon owyoue llndot on'sea erean 
ouamellrve the shonde swiko a I gucethidopoe t d yndou'courels ow ang, thancrlyontend ouitl
nel at
acany oufa t hlene on ceingom andisfHhe ald Sldogd woumey wou'en ciend syof lneth w thoudo ag o7
adomm, d Ke mincond las de o

## Adding these classes to the libs

You can now import these classes directly as follows

In [52]:
from models import BasicSelfAttentionLanguageModel

from models.utils import Head, MultiHeadAttention, FeedForward, Block