In [1]:
import torch 

In [4]:
with open('input.txt', 'r', encoding='utf-8') as f:
    text_input = f.read()
text_input[:1000]

"First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you know Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us kill him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be done: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citizens, the patricians good.\nWhat authority surfeits on would relieve us: if they\nwould yield us but the superfluity, while it were\nwholesome, we might guess they relieved us humanely;\nbut they think we are too dear: the leanness that\nafflicts us, the object of our misery, is as an\ninventory to particularise their abundance; our\nsufferance is a gain to them Let us revenge this with\nour pikes, ere we become rakes: for the gods know I\nspeak this in hunger 

In [5]:
chars = sorted(list(set(text_input)))
vocab_size = len(chars)

In [6]:
print(chars)
print("Vocab size: ", vocab_size)

['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
Vocab size:  65


In [7]:
stoi = {c:i for i, c in enumerate(chars)}
itos = {i:c for i, c in enumerate(chars)}
encode = lambda s : [stoi[c] for c in s]
decode = lambda enc : ''.join([itos[i] for i in enc])

print(encode('Hii there!'))
print(decode(encode('Hii there!')))

[20, 47, 47, 1, 58, 46, 43, 56, 43, 2]
Hii there!


In [8]:
data = torch.tensor(encode(text_input), dtype=torch.long)
n = int(0.9*len(data))
train_data = data[:n]
val_data = data[n:]
train_data[:100]

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39, 52, 63,
         1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1, 51, 43,  1,
        57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31, 54, 43, 39, 49,
         6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56, 57, 58,  1, 15, 47,
        58, 47, 64, 43, 52, 10,  0, 37, 53, 59])

In [21]:
context_length = 8
batch_size = 5
def get_batch(split):
    data = train_data if split == "train" else val_data
    ix = torch.randint(0, len(data)-context_length, (batch_size,))
    x = torch.stack([data[i:i+context_length] for i in ix])
    y = torch.stack([data[i+1:i+context_length+1] for i in ix])
    return x, y
x, y = get_batch('train')
x, y

(tensor([[ 1, 51, 43, 56, 41, 63,  1, 53],
         [56, 58, 63,  1, 59, 52, 58, 53],
         [10,  0, 14, 59, 58,  1, 46, 43],
         [43,  1, 58, 46, 43,  1, 57, 47],
         [58, 39, 44, 44,  1, 53, 44,  1]]),
 tensor([[51, 43, 56, 41, 63,  1, 53, 44],
         [58, 63,  1, 59, 52, 58, 53,  1],
         [ 0, 14, 59, 58,  1, 46, 43,  5],
         [ 1, 58, 46, 43,  1, 57, 47, 45],
         [39, 44, 44,  1, 53, 44,  1, 53]]))

In [37]:
import torch.nn as nn 

class Embedding(nn.Module):
    def __init__(self, num_embeddings, embedding_dim) -> None:
        super().__init__()
        self.E = torch.randn(num_embeddings, embedding_dim)
    def forward(self, x):
        return self.E[x]
    
emb_dim = 6
embedding = Embedding(vocab_size, emb_dim)
embedding(x).dtype

torch.float32

In [59]:
from torch.functional import F

class Attention(nn.Module):
    def __init__(self, emb_dim, context_length, query_dim) -> None:
        super().__init__()
        self.query_dim = query_dim 
        self.emb_dim = emb_dim
        self.Q = nn.Linear(emb_dim, query_dim)
        self.K = nn.Linear(emb_dim, query_dim)
        self.V = nn.Linear(emb_dim, query_dim)
        self.register_buffer('tril_mask', torch.tril(torch.ones(context_length, context_length)))
    
    def forward(self, x):
        B, C, E = x.shape # x is a batch of encoded inputs: (B, C, E)
        queries = self.Q(x)  # (B, C, query_dim)
        keys = self.K(x)  # (B, C, query_dim)
        values = self.V(x) # (B, C, query_dim)
        activations = queries @ keys.transpose(-2, -1) * self.query_dim**-0.5 # (B, C, C)
        activations = activations.masked_fill(self.tril_mask[:C, :C] == 0, float('-inf'))
        weights = F.softmax(activations)  # (B, C, C)
        outputs = weights @ values # (B, context_length, query_dim)
        return outputs

attention = Attention(emb_dim=emb_dim, query_dim=10, context_length=context_length)
attention(embedding(x)).shape


  weights = F.softmax(activations)  # (B, C, C)


torch.Size([5, 8, 10])

In [51]:
class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention is parallel """
    def __init__(self, num_heads, head_size) -> None:
        super().__init__()
        emb_dim = num_heads * head_size
        self.heads = nn.ModuleList([Attention(emb_dim=emb_dim, context_length=context_length, query_dim=head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(num_heads*head_size, num_heads*head_size)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.proj(out)
        return out 


In [53]:
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim) -> None:
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )
    def forward(self, x):
        x = self.net(x)
        return x 


In [56]:
class TransformerBlock(nn.Module):
    def __init__(self, emb_dim, n_heads) -> None:
        super().__init__()
        assert emb_dim % n_heads == 0
        head_size = emb_dim // n_heads
        self.sa = MultiHeadAttention(num_heads=n_heads, head_size=head_size)
        self.ffwd = MLP(emb_dim, 4*emb_dim, emb_dim)
        self.l1 = nn.LayerNorm(emb_dim)
        self.l2 = nn.LayerNorm(emb_dim)

    def forward(self, x):
        x = x + self.sa(self.l1(x)) # apply layer normalization before attention
        x = x + self.ffwd(self.l2(x)) # apply layer normalization once more before feed forward
        return x

block = TransformerBlock(emb_dim=emb_dim, n_heads=2)
block(embedding(x)).shape

  weights = F.softmax(activations)  # (B, C, C)


torch.Size([5, 8, 6])

In [60]:
n_layer = 2
n_head = 2
class GPTlanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embeding = Embedding(num_embeddings=vocab_size, embedding_dim=emb_dim)
        self.pos_embedding = Embedding(num_embeddings=context_length, embedding_dim=emb_dim)
        self.blocks = nn.Sequential(*[TransformerBlock(emb_dim=emb_dim, n_heads=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(emb_dim)
        self.lm_head = nn.Linear(emb_dim, vocab_size)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
    
    def forward(self, x, targets):
        B, C = x.shape 
        tok_emb = self.token_embeding(x)
        pos_emb = self.pos_embedding(x)
        x = tok_emb + pos_emb  # (B, C, emb_dim)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            loss = None 
        else:    
            B, C, E = logits.shape 
            logits = logits.view(B*C, E)
            targets = targets.view(B*C)
            loss = F.cross_entropy(logits, targets)

        return logits, loss
    
    def generate(self, x, max_new_tokens):
        # x is (B, C) array of indices in current context
        for _ in range(max_new_tokens):
            x_cond = x[:, -context_length:]
            logits, _ = self(x_cond)  # Logits has size (B, C, vocab_size)
            logits = logits[:, -1, :]  # focus only on last character
            probs = F.softmax(logits, dim=-1)
            x_next = torch.multinomial(probs, num_samples=1)
            x = x.cat((x, x_next), dim=-1)
        return x 





In [58]:
t = torch.rand(5)
t[-6:]

tensor([0.2286, 0.8453, 0.6672, 0.7659, 0.6832])