In [None]:
# !kaggle datasets download -d eswarreddy12/family-guy-dialogues-with-various-lexicon-ratings
# !mkdir -p data
# !unzip family-guy-dialogues-with-various-lexicon-ratings.zip -d data
# !rm family-guy-dialogues-with-various-lexicon-ratings.zip

In [None]:
!ls data/

In [None]:
import csv

file_path = "data/Family_Guy_Final_NRC_AFINN_BING.csv"

# Print headings
with open(file_path, "r") as f:
    csvf = csv.reader(f, delimiter=",", quotechar="'")
    headings = next(csvf)  # Read the first row (headings)
    print(", ".join(headings))

In [None]:
corpus = []

# Read dialogue and create the corpus
with open(file_path, "r") as f:
    csvf = csv.DictReader(f, delimiter=",", quotechar="'")
    for row in csvf:
        dialogue = row["Dialogue"].strip('"')  # Remove double quotes
        corpus.append(dialogue)

In [None]:
# Print the first 5 entries of the corpus
print("Corpus:")
for i in range(5):
    print(corpus[i])

print(f"Corpus line num: {len(corpus)}")

corpus_str = "\n".join(corpus)

print(f"Corpus str len: {len(corpus_str)}")

print(corpus_str[:1000])

In [None]:
# Unique chars in the corpus

chars = sorted(list(set(corpus_str)))
vocab_size = len(chars)
print("".join(chars))
print(f"Vocab size: {vocab_size}")

# TODO: would need to remove chars from corpus too
# chars = chars[:91] # remove special chars
# vocab_size = len(chars)
# print(''.join(chars))
# print(f"Vocab size: {vocab_size}")

In [None]:
# Create mapping from chars to ints

stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}

encode = lambda s: [stoi[c] for c in s]
decode = lambda l: "".join([itos[i] for i in l])

print(encode("Hi there!"))
print(decode(encode("Hi there!")))

In [None]:
import torch

data = torch.tensor(encode(corpus_str), dtype=torch.long)

In [None]:
print(data.shape)
print(data.dtype)
print(data[:100])

In [None]:
# Split dataset into train and validation
n = int(0.9 * len(corpus_str))

train_data, val_data = data[:n], data[n:]

In [None]:
block_size = 8
train_data[: block_size + 1]

In [None]:
x = train_data[:block_size]
y = train_data[1 : block_size + 1]

for t in range(block_size):
    context = x[: t + 1]
    target = y[t]
    print(f"when input is {context} the target is {target}")

In [None]:
torch.manual_seed(1337)

batch_size = 4  # how many independent sequences will we process in parallel?
block_size = 8  # what is the max context length for predictions?


def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == "train" else val_data

    ix = torch.randint(len(data) - block_size, (batch_size,))

    x = torch.stack([data[i : i + block_size] for i in ix])
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in ix])

    return x, y


(
    xb,
    yb,
) = get_batch("train")

print("inputs:")
print(xb.shape)
print(xb)
print("targets:")
print(yb.shape)
print(yb)

print("\n-----\n")

for b in range(batch_size):  # batch dim
    for t in range(block_size):  # time dim
        context = xb[b, : t + 1]
        target = yb[b, 1]
        print(f"when input is {context.tolist()} the target is {target}")

In [None]:
import torch.nn as nn
from torch.nn import functional as F


class BigramLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        # each token reads the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

    def forward(self, idx, targets=None):
        # idx and targets are both (B, T) tensors of ints
        logits = self.token_embedding_table(idx)  # (B, T, C)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)  # (B, C, T)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context

        for _ in range(max_new_tokens):
            # get predictions
            logits, loss = self(idx)
            # focus only on the last time step
            logits = logits[:, -1, :]  # ==> (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1)  # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)

        return idx

In [None]:
import math

m = BigramLanguageModel(vocab_size)
logits, loss = m(xb, yb)
print(logits.shape)
print(loss)

print("Expected loss: " + str(-math.log(1 / vocab_size)))

print("\n-----\n")

idx = torch.zeros((1, 1), dtype=torch.long)

generated = decode(m.generate(idx , max_new_tokens=100)[0].tolist())

print(generated)

In [None]:
# Train model
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-3)

In [None]:
batch_size = 32
for steps in range(10000):
    # sample a batch of data
    xb, yb = get_batch('train')

    # eval the loss
    logits, loss = m(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print(loss.item())

In [None]:
generated = decode(m.generate(idx , max_new_tokens=500)[0].tolist())
print(generated)

---

# The Mathematical Trick in Self-Attention

In [31]:
import torch

torch.manual_seed(1337)

B, T, C = 4, 8, 2 # batch, time, channels
x = torch.randn(B, T, C)
x.shape

torch.Size([4, 8, 2])

In [43]:
"""
We want:

    x[b, t] = mean_{i<=t} x[b, i]
"""
xbow = torch.zeros((B, T, C))  # bow ==> bag of words
for b in range(B):
    for t in range(T):
        xprev = x[b, : t + 1]  # (t, C)
        xbow[b, t] = torch.mean(xprev, 0)


In [44]:
x[0]

tensor([[ 0.1808, -0.0700],
        [-0.3596, -0.9152],
        [ 0.6258,  0.0255],
        [ 0.9545,  0.0643],
        [ 0.3612,  1.1679],
        [-1.3499, -0.5102],
        [ 0.2360, -0.2398],
        [-0.9211,  1.5433]])

In [None]:
xbow[0]

In [None]:
"""
Matrix multiplcation trick to get sequential averages
"""

# torch.manual_seed(42)

# a = torch.ones(3, 3)
a = torch.tril(torch.ones(3, 3))
a = a / torch.sum(a, 1, keepdim=True)
b = torch.randint(0, 10, (3, 2)).float()
c = a @ b

print(f"a =\n{a}")
print("-----")
print(f"b =\n{b}")
print("-----")
print(f"c =\n{c}")

In [45]:
weights = torch.tril(torch.ones(T, T))
weights = weights / weights.sum(1, keepdim=True)

xbow2 = weights @ x # (B, T, T) @ (B, T, C) ---> (B, T, C)

torch.allclose(xbow, xbow2)

# xbow[0], xbow2[0]

False