In [1]:
# Byte Pair Encoder
import re
from collections import defaultdict
# Read the whole corpus. The context manager closes the file handle as soon
# as the text is in memory (previously it stayed open for the kernel's life).
with open("family-guy.txt", "r") as training_data:
    text = training_data.read()
charset = "".join(sorted(set(text)))

# Pre-tokenise on punctuation / whitespace. The capture group makes re.split
# keep the delimiters as tokens of their own, so no character of `text` is lost.
tokens = re.split(r'([\(\)\[\].:,!\s])', text)
tokens = list(filter(lambda x: x != "", tokens))

# Every pre-token becomes a list of symbols (single characters to begin with).
tokens = [list(token) for token in tokens]
k = 80  # maximum number of merges to learn

token_map = defaultdict(int)

# prepare initial tokenset of purely characters
for ch in text:
    token_map[ch] += 1

# `updated_lst` is read by the next cell; bind it up front so it is defined
# (and equal to the current segmentation) even if no merge is ever performed.
updated_lst = tokens

for _ in range(k):
    # Count every adjacent symbol pair inside each pre-token. Single-symbol
    # tokens contribute nothing because range(0) is empty.
    bpe_table = defaultdict(int)
    for token in tokens:
        for i in range(len(token) - 1):
            bpe_table[token[i] + token[i + 1]] += 1

    # Nothing left to merge (every pre-token is a single symbol): stop early
    # instead of letting max() raise ValueError on an empty table.
    if not bpe_table:
        break

    # Most frequent pair; ties resolve to the pair that was seen first.
    max_bp = max(bpe_table, key=bpe_table.get)

    updated_lst = []
    for token in tokens:
        if len(token) == 1:
            updated_lst.append(token)
            continue

        temp_token = []
        i = 0
        # Greedy left-to-right scan: merge the pair when it matches and skip
        # both of its symbols, otherwise copy the current symbol through.
        while i < len(token):
            if i < len(token) - 1 and token[i] + token[i + 1] == max_bp:
                temp_token.append(max_bp)
                # One merged symbol replaces one occurrence of each part.
                token_map[max_bp] += 1
                token_map[token[i]] -= 1
                token_map[token[i + 1]] -= 1
                i += 2
            else:
                # Unmerged symbols are already counted in token_map; bumping
                # them again on every round would inflate the frequencies.
                temp_token.append(token[i])
                i += 1

        updated_lst.append(temp_token)

    # set tokens to updated_lst of merged token
    tokens = updated_lst

In [2]:
from itertools import chain

# Vocabulary: every symbol recorded during BPE training whose count is not
# negative, sorted so that token ids are stable from run to run.
# (dict keys are already unique, so no extra set() pass is needed.)
token_set = sorted(token_key for token_key in token_map if token_map[token_key] > -1)

# Reverse index for constant-time token -> id lookup.
token_to_id = {token: index for index, token in enumerate(token_set)}

# The corpus as one flat sequence of BPE symbols.
bpe_text_in = list(chain.from_iterable(updated_lst))


def encoder(text_in):
    """Map an iterable of BPE symbols to their integer ids in `token_set`.

    Raises:
        ValueError: if a symbol is not part of the vocabulary (the same
            exception type the previous `list.index` lookup produced).
    """
    try:
        return [token_to_id[s] for s in text_in]
    except KeyError as err:
        raise ValueError(f"{err.args[0]!r} is not in token_set") from None


def decoder(indices):
    """Map an iterable of integer ids back to their BPE symbols."""
    return [token_set[index] for index in indices]


device = "mps"

# token_set
# print(token_map)
# token_set[-40:]
# encoder(bpe_text_in[])

In [1]:
import torch
from random import randint
import numpy as np

# Model / training hyper-parameters.
batch_size = 64
block_size = 380            # context window length, in tokens
n_heads = 6
n_embed = 80 * n_heads      # embedding width: 80 dimensions per attention head
num_blocks = 6
dropout = 0.2
n_steps = 5000
vocab_size = len(token_set)


# Encode the whole corpus once, then hold out the final 10% for validation.
data = torch.tensor(encoder(bpe_text_in), dtype=torch.long, device=device)
n = int(0.9*len(data))
training_set, validation_set = data[:n], data[n:]


def generate_batch(split="train"):
    """Sample a random batch of contiguous token windows.

    Args:
        split: "train" (default, the previous behaviour) samples from
            `training_set`; "val" / "validation" samples from `validation_set`.

    Returns:
        (batch, targets): two tensors of shape (batch_size, block_size).
        `targets` is `batch` shifted one position to the right, i.e. the
        next-token labels for every position of `batch`.

    Raises:
        ValueError: if `split` is not a recognised name.
        RuntimeError: if the chosen split is too short to cut a window of
            `block_size + 1` tokens from it.
    """
    if split == "train":
        source = training_set
    elif split in ("val", "validation"):
        source = validation_set
    else:
        raise ValueError(f"unknown split {split!r}; expected 'train' or 'val'")

    # Exclusive upper bound for window starts: the target window ends at
    # start + block_size + 1, which must not run past the end of `source`.
    max_start = len(source) - block_size
    if max_start <= 0:
        # torch.randint would raise a RuntimeError here as well, just with a
        # message that does not point at the real cause.
        raise RuntimeError(
            f"split {split!r} has {len(source)} tokens; "
            f"need more than block_size={block_size}"
        )

    # Plain Python ints make the slicing below cheaper than 0-d tensors.
    starts = torch.randint(max_start, (batch_size,)).tolist()
    batch = torch.stack([source[i:i + block_size] for i in starts])
    targets = torch.stack([source[i + 1:i + block_size + 1] for i in starts])
    return batch, targets

# x,y = generate_batch()

NameError: name 'token_set' is not defined

In [4]:
from torch import nn
from torch.nn import functional as F


class Head(nn.Module):
    """Causal self-attention that evaluates all `n_heads` heads in one batched pass.

    Despite the name, a single instance covers every head: one fused linear
    layer produces the queries, keys and values for all heads together.

    Args:
        head_size: width of each individual attention head.

    Reads the module-level `n_embed`, `n_heads`, `block_size` and `dropout`.
    """

    def __init__(self, head_size):
        super().__init__()
        self.head_size = head_size
        # Fused Q/K/V projection: 3 * (n_heads * head_size) output features.
        self.batch_qkv_matrices = nn.Linear(n_embed, head_size * n_heads * 3, bias=False)
        # Lower-triangular causal mask. It stays a regular (persistent) buffer
        # so the state_dict keys match checkpoints that were already saved.
        self.register_buffer("tril", torch.tril(torch.ones((block_size, block_size))))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply masked multi-head attention.

        Args:
            x: tensor of shape (B, T, n_embed); T must not exceed `block_size`
               because the causal mask is only that large.

        Returns:
            Tensor of shape (B, T, n_heads * head_size): the per-head outputs
            concatenated along the last dimension.
        """
        B, T, _ = x.shape
        inner = n_heads * self.head_size

        # Split the fused projection into Q, K, V, each (B, T, inner).
        q, k, v = self.batch_qkv_matrices(x).split(inner, dim=-1)

        # (B, T, inner) -> (B, n_heads, T, head_size) so matmul batches over heads.
        q = q.view(B, T, n_heads, self.head_size).transpose(1, 2)
        k = k.view(B, T, n_heads, self.head_size).transpose(1, 2)
        v = v.view(B, T, n_heads, self.head_size).transpose(1, 2)

        # Scaled dot-product scores, (B, n_heads, T, T).
        weight_mat = q @ k.transpose(-2, -1)
        weight_mat = weight_mat * (self.head_size ** -0.5)
        # Block attention to future positions before normalising.
        weight_mat = weight_mat.masked_fill(self.tril[:T, :T] == 0, float("-inf"))
        weight_mat = F.softmax(weight_mat, dim=-1)
        # regularisation to prevent overfitting
        weight_mat = self.dropout(weight_mat)

        res = weight_mat @ v
        res = res.transpose(1, 2)  # (B, n_heads, T, head_size) -> (B, T, n_heads, head_size)
        # Merge the heads using the true concatenated width. Reshaping to the
        # input width instead only works when n_heads * head_size == n_embed.
        return res.contiguous().view(B, T, inner)

# head = Head(64)
# x = torch.randn((4, 5, n_embed))
# head(x)
# 
# 
class MHAttention(nn.Module):
    """Multi-head attention followed by an output projection and dropout.

    Args:
        num_heads: accepted for interface compatibility but not used here;
            the batched `Head` module takes its head count from the
            module-level `n_heads`.
        head_size: width of each attention head, forwarded to `Head`.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        # A single batched module stands in for the list of per-head modules.
        self.att_heads = Head(head_size=head_size)
        self.projection = nn.Linear(n_embed, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over `x`, project the result, then apply dropout."""
        attended = self.att_heads(x)
        projected = self.projection(attended)
        return self.dropout(projected)

In [5]:
class Feedforward(nn.Module):
    """Position-wise MLP: expand to 4x the width, ReLU, project back, dropout.

    Args:
        n_embed: input and output feature width.

    The dropout probability is read from the module-level `dropout`.
    """

    def __init__(self, n_embed) -> None:
        super().__init__()
        hidden_width = n_embed * 4
        layers = [
            nn.Linear(n_embed, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embed),
            nn.Dropout(dropout),
        ]
        self.ff = nn.Sequential(*layers)

    def forward(self, x):
        """Run `x` through the MLP; the output has the same shape as `x`."""
        out = self.ff(x)
        return out


class Block(nn.Module):
    """One transformer block: pre-norm attention and pre-norm MLP, each residual.

    Args:
        n_embed: feature width of the residual stream.
        n_heads: number of attention heads; each head gets
            `n_embed // n_heads` features.
    """

    def __init__(self, n_embed, n_heads) -> None:
        super().__init__()
        per_head = n_embed // n_heads
        # Sub-modules are created in the same order as before so that
        # parameter initialisation consumes the RNG identically.
        self.ff = Feedforward(n_embed)
        self.mhatt = MHAttention(n_heads, per_head)
        self.layer_norm1 = nn.LayerNorm(n_embed)
        self.layer_norm2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        """Add the attention output, then the MLP output, to the input `x`."""
        attn_out = self.mhatt(self.layer_norm1(x))
        x = x + attn_out
        mlp_out = self.ff(self.layer_norm2(x))
        return x + mlp_out
    


In [6]:
class GPT(nn.Module):
    """Decoder-only transformer language model over the BPE vocabulary.

    Architecture: token embedding + learned position embedding, a stack of
    `num_blocks` transformer blocks, a final LayerNorm and a linear head that
    produces one logit per vocabulary entry.

    Reads the module-level `vocab_size`, `n_embed`, `block_size`, `n_heads`
    and `num_blocks`.
    """

    def __init__(self):
        super().__init__()
        self.embedding_table = nn.Embedding(vocab_size, n_embed)
        self.pos_embedding_table = nn.Embedding(block_size, n_embed)

        self.blocks = nn.Sequential(*[Block(n_embed, n_heads) for _ in range(num_blocks)])

        self.layernorm = nn.LayerNorm(n_embed)

        self.lin1 = nn.Linear(n_embed, vocab_size)

    def forward(self, data, target=None):
        """Compute next-token logits and, if targets are given, the loss.

        Args:
            data: integer tensor of token ids, shape (B, T), with T no larger
                than `block_size` (the position table has that many rows).
            target: optional integer tensor of shape (B, T) holding the
                next-token labels.

        Returns:
            (logits, loss). Without `target`, logits has shape
            (B, T, vocab_size) and loss is None. With `target`, logits is
            flattened to (B*T, vocab_size) and loss is the cross-entropy.
        """
        B, T = data.shape
        token_layer = self.embedding_table(data)
        # Build the position ids on the input's own device rather than on the
        # notebook-level `device`, so the model keeps working after it has
        # been moved somewhere else.
        positions = torch.arange(T, device=data.device)
        pos_embed = self.pos_embedding_table(positions)
        total = token_layer + pos_embed
        total = self.layernorm(self.blocks(total))
        logits = self.lin1(total)

        # Identity test: `!= None` would dispatch to the tensor's comparison operator.
        if target is None:
            return logits, None

        B, T, C = logits.shape
        logits = logits.view(B * T, C)
        target = target.view(B * T)
        loss = F.cross_entropy(logits, target)
        return logits, loss

    @torch.no_grad()
    def predict(self, data, num_tokens=100):
        """Autoregressively sample `num_tokens` new tokens after `data`.

        Args:
            data: integer tensor of prompt token ids, shape (B, T).
            num_tokens: how many tokens to append to each row.

        Returns:
            Integer tensor of shape (B, T + num_tokens): prompt plus samples.

        Gradients are never needed for sampling, so tracking is disabled.
        The train/eval mode is left untouched: call `.eval()` first if
        dropout should be inactive while generating.
        """
        curr = data
        for _ in range(num_tokens):
            # Only the most recent `block_size` tokens fit in the context window.
            context = curr[:, -block_size:]
            logits, _ = self(context)

            # Distribution over the token that follows the last position.
            logits = logits[:, -1, :]
            prob_dist = F.softmax(logits, dim=-1)
            sample = torch.multinomial(prob_dist, num_samples=1)
            curr = torch.cat((curr, sample), dim=1)

        return curr

# Build the model, move it to the target device, and put it in training mode.
gpt = GPT()
gpt = gpt.to(device)
# train() returns the module itself, so this line also displays the model summary.
gpt.train()
# logits, loss = bgm(x,y)
# bgm()
# print("".join(decoder(bgm.predict(data=torch.zeros(1,1, dtype=torch.long))[0].tolist())))

BigramEmbeddingModel(
  (embedding_table): Embedding(188, 480)
  (pos_embedding_table): Embedding(380, 480)
  (blocks): Sequential(
    (0): Block(
      (ff): Feedforward(
        (ff): Sequential(
          (0): Linear(in_features=480, out_features=1920, bias=True)
          (1): ReLU()
          (2): Linear(in_features=1920, out_features=480, bias=True)
          (3): Dropout(p=0.2, inplace=False)
        )
      )
      (mhatt): MHAttention(
        (att_heads): Head(
          (batch_qkv_matrices): Linear(in_features=480, out_features=1440, bias=False)
          (dropout): Dropout(p=0.2, inplace=False)
        )
        (projection): Linear(in_features=480, out_features=480, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
      )
      (layer_norm1): LayerNorm((480,), eps=1e-05, elementwise_affine=True)
      (layer_norm2): LayerNorm((480,), eps=1e-05, elementwise_affine=True)
    )
    (1): Block(
      (ff): Feedforward(
        (ff): Sequential(
          (0): Linea

In [7]:
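# # training loop (disabled). NOTE: the code below refers to the model as `bgm`; the instance created above is named `gpt`.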

# optimiser = torch.optim.AdamW(bgm.parameters(), lr=3e-4)
# for steps in range(n_steps):
#     batch,target = generate_batch()
#     logits, loss = bgm(batch,target)
#     optimiser.zero_grad(set_to_none=True)
#     loss.backward()
    
#     optimiser.step() 
#     if steps % 50 == 0:
#         print(loss.item())


In [22]:
# Prompt given as BPE symbols; every entry has to exist in the vocabulary.
test = torch.tensor(encoder(["Peter", ":", " "]), dtype=torch.long, device=device)
K = 1  # number of independent samples to generate
# (T,) -> (K, T): one copy of the prompt per sample.
test = test.unsqueeze(0).repeat(K, 1)

# Instantiate the class. `gpt` is the model *instance*, so `gpt()` would run
# forward() without any input and raise a TypeError.
new_model = GPT().to(device)
# map_location places the stored tensors on the current device, whichever
# device the checkpoint was saved from.
# NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
new_model.load_state_dict(torch.load("family-guy-lm-BPE-C380-E80", map_location=device))
# Inference mode: turns dropout off.
new_model.eval()

with torch.no_grad():
    out = new_model.predict(test, num_tokens=400)
    print("".join(decoder(out[0].tolist())))
# # #     for i in range(K): 
# # #         family_guy_file.write("".join(decoder(out[i].tolist())))
# # #         family_guy_file.write("\n")

# # # family_guy_file.close()

Peter: Come on in,  fella. Great news Lowe husband? Come
  on!
  Stewie: Well, here we are. Chris, your father and show them who are
  still family.
  Back to Peter & Lois in the outside. And Meg is beginning to Brian
  and Stewie are still with a child big stretch
  beangst the night birthday.
  [Alarming instrumental music]
  Lois: Oh, Brian! Okay, I don't know any of this, did you take this army
  jouster?
  Brian: Thanks anyway. Devon this far before I farted, I have Calpa
  Farpet Farm.
  Chris: You don't find your insideeeeeeeeeeeeeeeeeeeeeee


In [9]:
# B,T,C = 4,8,2
# rand_tensor = torch.randn(size=(B,T,C))
# avg_tensor = torch.zeros(size=(B,T,C))
# for b in range((B)):
#     for t in range(T):
#         window = rand_tensor[b, :t+1]
#         avg_tensor[b,t] = torch.mean(window,0)
# avg_tensor[0]



In [10]:
# filter_matrix = torch.tril(torch.ones((T,T)))
# affinity = torch.zeros((T,T))
# affinity = affinity.masked_fill(filter_matrix == 0, float("-inf"))
# affinity = F.softmax(affinity, dim=1)
# affinity @ rand_tensor

In [11]:
# torch.save(bgm.state_dict(), "family-guy-lm-BPE-C380-E80")

In [12]:
# Total number of trainable parameters in the model.
sum(param.numel() for param in gpt.parameters() if param.requires_grad)

16981628