In [2]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from torch.nn import functional as F

In [3]:
# --- Hyperparameters ---------------------------------------------------------
# Seed torch's RNG so batch sampling, weight initialisation and text generation
# give the same results on every Restart & Run All.
seed = 1337
torch.manual_seed(seed)

# Data / batching
batch_size = 64 # B: number of sequences per batch
block_size = 8 # T: context length (tokens per sequence)
# C = 65 -> vocab length

# Model
n_embd = 64
n_head = 4
n_layer = 4
# Derived instead of hard-coded so it cannot drift from n_embd / n_head,
# which is what the attention head actually uses for its projection width.
head_size = n_embd // n_head
dropout_percent = 0.0

# Optimisation / evaluation
steps = 5000
learning_rate = 1e-3
eval_iters = 200
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [4]:
from datasets import load_dataset
# Load the "Trelis/tiny-shakespeare" dataset; `ds` is indexed later as ds['train']['Text'].
ds = load_dataset("Trelis/tiny-shakespeare")

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# Take the 'Text' column of the 'train' split. This is the only split used here;
# the train/validation partition is made later on the encoded token stream.
ds_ = ds['train']['Text']

In [6]:
# Vocabulary: every distinct character that occurs in any row, in sorted order.
chars = sorted({ch for row in ds_ for ch in row})
len(chars)

65

In [7]:
# Lookup tables between characters and integer ids; an id is the character's
# position in the sorted vocabulary.
encoder_dict = {ch: idx for idx, ch in enumerate(chars)}
decoder_dict = dict(enumerate(chars))

# Encoder, Decoder
def encode(x):
    '''Turn a string into the list of its character ids.'''
    return [encoder_dict[letter] for letter in x]

def decode(x):
    '''Turn an iterable of character ids back into a string.'''
    return ''.join(decoder_dict[token] for token in x)

encode('hello'), decode([46, 43, 50, 50, 53])

([46, 43, 50, 50, 53], 'hello')

In [8]:
# Concatenate all rows into one newline-separated corpus and encode it to ids.
# NOTE(review): the vocabulary is built from the rows alone, without this '\n'
# joiner, so encoding relies on '\n' already occurring inside some row;
# otherwise encode() would raise KeyError.
ds_all = '\n'.join(ds_)
ds_encoded = encode(ds_all)

In [9]:
# Contiguous 90/10 split of the token stream: the first 90% is used for
# training and the remaining tail is held out for validation (no shuffling).
n = int(.9*len(ds_encoded))
train_data = ds_encoded[:n]
val_data = ds_encoded[n:]

In [10]:
def get_batch(data, block_size=block_size, batch_size=batch_size):
    '''Draw a random batch of training windows from `data`.

    For each of the `batch_size` rows a random start offset is chosen and
    `block_size + 1` consecutive items are read from `data`.

    Returns:
        (xb, yb): two (batch_size x block_size) tensors, where `yb` is `xb`
        shifted one position to the right (the next-token targets).
    '''
    # one random start offset per row of the batch
    starts = torch.randint(len(data) - block_size, (batch_size,))

    windows = []
    for row in range(batch_size):
        begin = int(starts[row])
        # block_size inputs plus one extra item for the final target
        windows.append([data[pos] for pos in range(begin, begin + block_size + 1)])

    xy = torch.tensor(windows)
    xb, yb = xy[:, :block_size], xy[:, 1:]
    return xb, yb

get_batch(train_data)

(tensor([[51, 39, 49, 43,  1, 35, 47, 50],
         [42, 43, 57,  1, 39, 50, 50,  1],
         [43, 58,  1, 39, 40, 53, 39, 56],
         [21,  1, 51, 59, 57, 58,  1, 46],
         [ 1, 21,  1, 61, 47, 50, 50,  1],
         [50, 47, 52, 45,  1, 47, 52,  1],
         [43, 56, 60, 43, 57,  1, 51, 63],
         [51, 53, 56, 43,  1, 53, 44,  1],
         [43,  1, 52, 53,  1, 51, 39, 52],
         [45, 46, 39, 51,  1, 58, 53,  1],
         [54, 39, 58, 47, 43, 52, 58,  8],
         [17, 26, 15, 17, 10,  0, 27, 52],
         [43, 56, 43, 44, 53, 56, 43,  1],
         [47, 57,  1, 42, 39, 51, 52, 39],
         [14, 17, 26, 34, 27, 24, 21, 27],
         [46, 43,  1, 54, 39, 56, 50, 47],
         [ 1, 51, 43,  1, 44, 39, 58, 46],
         [ 1, 45, 47, 60, 43, 52,  1, 41],
         [43, 43, 54,  1, 39, 52, 42,  1],
         [46, 43,  1, 42, 43, 39, 58, 46],
         [57, 46, 43,  7,  7,  0,  0, 30],
         [56, 43,  1, 40, 59, 56, 52, 57],
         [42,  1, 15, 39, 51, 47, 50, 50],
         [5

In [11]:
# TODO: write code to estimate the loss on the train and validation datasets

In [12]:
class Head(nn.Module):
    '''One head of causal (masked) self-attention.

    Args:
        head_size: width of the key/query/value projections. Defaults to
            n_embd // n_head, which is the value this class used before the
            argument existed, so `Head()` behaves as it always did.

    forward(x) takes x of shape (B, T, n_embd) with T <= block_size and
    returns a tensor of shape (B, T, head_size).
    '''
    def __init__(self, head_size=None):
        super().__init__()
        if head_size is None:
            head_size = n_embd // n_head
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # Buffers are saved with the model's state but are not parameters, so
        # the optimizer never updates them. Lower-triangular ones = "may attend".
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout_percent)

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)    # (B, T, head_size)
        q = self.query(x)  # (B, T, head_size)
        v = self.value(x)  # (B, T, head_size)

        # Scaled dot-product attention: scale by sqrt of the key dimension
        # (head_size), not by the embedding width C of the input.
        wei = q @ k.transpose(-2, -1) / k.shape[-1] ** 0.5  # (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # mask future tokens
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)

        return wei @ v

In [15]:
class MultiHeadAttention(nn.Module):
    '''Several attention heads run in parallel on the same input.

    The per-head outputs are concatenated along the last dimension, passed
    through a linear projection back to n_embd, and then through dropout.

    Args:
        num_heads: number of `Head` modules to run in parallel.
    '''

    def __init__(self, num_heads):
        super().__init__()
        # Head() sizes its projections as n_embd / n_head on its own; it is
        # called without arguments so it works with the no-argument constructor.
        self.heads = nn.ModuleList([Head() for _ in range(num_heads)])
        # Width of the concatenated head outputs. Equals n_embd when
        # num_heads == n_head and n_embd is divisible by n_head.
        concat_dim = num_heads * (n_embd // n_head)
        self.proj = nn.Linear(concat_dim, n_embd) # to apply a linear transformation to concat heads
        self.dropout = nn.Dropout(dropout_percent)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        # The result must be returned; without this the module yields None.
        return out

In [16]:
class FeedForward(nn.Module):
    '''Two-layer MLP: widen to 4*n_embd, apply ReLU, project back to n_embd, then dropout.'''

    def __init__(self, n_embd):
        super().__init__()
        hidden_dim = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_embd),
            nn.Dropout(dropout_percent),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        transformed = self.net(x)
        return transformed

In [None]:
class TransformerBlock(nn.Module):
    '''Transformer block: self-attention followed by a feed-forward network.

    Both sub-layers use a pre-LayerNorm and a residual connection, so the
    output has the same shape as the input.

    NOTE(review): this cell was left unfinished (a bare `def __init__` with no
    body is a SyntaxError). The layout below is the conventional pre-norm
    block; confirm it matches the intended design.

    Args:
        n_embd: embedding width of the input. MultiHeadAttention sizes its
            heads from the module-level n_embd / n_head, so this should match
            the module-level value.
        n_head: number of attention heads.
    '''
    def __init__(self, n_embd, n_head):
        super().__init__()
        self.sa = MultiHeadAttention(n_head)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))    # attention sub-layer with residual
        x = x + self.ffwd(self.ln2(x))  # feed-forward sub-layer with residual
        return x

In [13]:
class LLM(nn.Module):
    '''Bigram language model: each token id looks up the logits for the next token.

    Args:
        vocab_size: number of distinct tokens; the embedding table is
            (vocab_size x vocab_size), one row of next-token logits per token.
    '''
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

    def forward(self, idx, targets=None):
        '''Compute next-token logits and, if targets are given, the loss.

        Args:
            idx: (B, T) integer tensor of token ids.
            targets: optional (B, T) tensor of target token ids.

        Returns:
            (logits, loss). Without targets, logits is (B, T, C) and loss is
            None. With targets, logits is flattened to (B*T, C) and loss is
            the mean cross-entropy over all B*T positions.
        '''
        logits = self.token_embedding_table(idx)
        if targets is None:
            return logits, None

        B, T, C = logits.shape
        logits = logits.view(B*T, C)
        # reshape (unlike view) accepts non-contiguous targets such as a column
        # slice of a larger tensor; .long() keeps accepting non-int64 targets.
        targets = targets.reshape(B*T).long()
        loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()  # sampling never needs gradients
    def generate(self, idx, max_tokens=100):
        '''Append `max_tokens` sampled tokens to each sequence in idx.

        Args:
            idx: (B, T) integer tensor holding the starting context.
            max_tokens: number of tokens to sample.

        Returns:
            (B, T + max_tokens) tensor: the context followed by the samples.
        '''
        for _ in range(max_tokens):
            logits, loss = self(idx)
            # logits is (B x T x C)
            logits = logits[:, -1, :] # take only the last(latest) one in T component
            probs = F.softmax(logits, dim=-1) # (B, C)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

# Build the model over the character vocabulary and print a sample from the
# freshly initialised network, starting from a single token with id 0.
llm = LLM(len(chars))
start_idx = torch.zeros((1, 1), dtype=torch.long)
sampled_ids = llm.generate(idx=start_idx)[0].tolist()
print(decode(sampled_ids))



iVivcMsXLlk HQsqrUSPfbNydYys?EuX xzE!rOWrdTid$-wwaOn?QtqxRkPBzkyWusZDo&YTNSx'GIbo-ii!GF:IGI$u,F,,gzo


In [14]:
# Train the model with AdamW on randomly sampled training batches.
# The learning rate comes from the config cell instead of a duplicated literal.
optimizer = torch.optim.AdamW(llm.parameters(), lr=learning_rate)
# The loop variable is `step`, not `steps`, so the `steps` hyperparameter
# defined in the config cell is not overwritten by this loop.
for step in range(1000): # increase number of steps for good results...

    # sample a batch of data
    xb, yb = get_batch(train_data)

    # evaluate the loss
    logits, loss = llm(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
    # if step % 100 == 0: print(loss.item())
print(loss.item())

3.677424192428589
