In [21]:
import torch
import torch.nn as nn
from dataclasses import dataclass

import requests
import unicodedata

from jaxtyping import Int, Float
from collections import Counter
import numpy as np

# Classes

In [22]:
@dataclass
class Config:
    d_model: int
    d_vocab: int
    d_hidden: int
    n_context: int
    n_layers: int

In [23]:
# class Embedding(nn.Module):
#     def __init__(self):
#         super().__init__()
    
#     def forward(self):
#         pass

class Attention(nn.Module):
    def __init__(self, config: Config):
        super().__init__()
        # self.W_qk = nn.Linear(config.d_model, config.d_vocab)
        self.bilinear = nn.Bilinear(config.d_model, config.d_model, config.n_context, bias=False)
        self.M = torch.triu(torch.ones((config.n_context, config.n_context)), diagonal=1)
        self.M = self.M.masked_fill(self.M.bool(), -torch.inf)
        self.second_matmult = nn.Linear(config.d_model, config.d_model, bias=False)
        self.softmax = nn.Softmax()
    
    def forward(self, x):
        xwx = self.bilinear(x, x) # d_m x d_m
        x_masked = xwx+ self.M 
        x_softmaxed = self.softmax(x_masked)
        x_fin = x_softmaxed@x
        #multiply softmaxed by x
        #multiply that by wov
        x_fin = self.second_matmult(x_fin)
        return x_fin

class MLP(nn.Module):
    def __init__(self, config: Config):
        super().__init__()
        self.linear_up = nn.Linear(config.d_model, config.d_hidden)
        self.linear_down = nn.Linear(config.d_hidden, config.d_model)
    
    def forward(self, x):
        x = self.linear_up(x)
        x = torch.relu(x)
        x = self.linear_down(x)
        return x
    
class TransformerBlock(nn.Module):
    def __init__(self, config: Config):
        super().__init__()
        self.config = config

        self.MLP = MLP(config=self.config)
        self.Attention = Attention(config=self.config)
    
    def forward(self, x):
        return x + self.Attention(x) + self.MLP(x)
    
class Transformer(nn.Module):
    def __init__(self, config:Config):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=config.d_vocab, embedding_dim=config.d_model)
        self.transformerBlock = nn.ModuleList([TransformerBlock(config) for _ in range(config.n_layers)])

    def forward(self, x):
        x = self.embedding(x)
        for i, l in enumerate(self.transformerBlock):
            x = self.transformerBlock[i](x)
        return x

$n_c$: Context window length

$d_m$: Model Dimension

$d_v$: Vocab Dimension

In [24]:
text_sample = "The quick brown fox jumped over the lazy dog."

# Tokenization Code

In [25]:
from pathlib import Path

def get_gutenberg_book(
	id: int | None = 84,
	data_temp: Path | str = "../data/gutenberg_data",
	remove_gutenberg_meta: bool = True,
) -> str:
	
	data_temp: Path = Path(data_temp)
	data_temp.mkdir(parents=True, exist_ok=True)
	
	url: str = f"https://www.gutenberg.org/cache/epub/{id}/pg{id}.txt"
	data_path: Path = Path(data_temp) / f"{id}.txt"
	data: str
	# read from cache if it exists
	if data_path.exists():
		with open(data_path, 'r', encoding='utf-8') as file:
			data = file.read()
	else:
		# download if it doesn't exist
		response: requests.Response = requests.get(url)
		response.raise_for_status()  # Ensure that the download was successful
		data = response.text

		# save to cache
		with open(data_path, 'w', encoding='utf-8') as file:
			file.write(data)

	# remove header/footer
	if remove_gutenberg_meta:
		data = '***'.join(data.split('***')[2:])
		data = '***'.join(data.split('***')[:-1])
	
	return data

def get_many_books(
		ids: list[int],
		data_temp: Path | str = "../data/gutenberg_data",
	) -> list[str]:
	
	data: list[str] = []
	for id in ids:
		print(f"Getting book {id}...")
		item: str = get_gutenberg_book(id, data_temp)
		print(f"\t{len(item)} characters read")
		data.append(item)
	
	return data

In [26]:
def process_text(
	text: str,
	allowed_punctuation: str = "-.,;:!?()\"\\" + "".join(str(x) for x in range(10)),
	punctuation_convert: dict[str, str] = {'â€”': '-'},
) -> str:
	
	# replace some special characters which unicode won't normalize properly
	for char, replacement in punctuation_convert.items():
		text = text.replace(char, replacement)

	# if a line has ".jpg" in it, remove that line (this is specific to Don Quixote)
	text = '\n'.join(
		line 
		for line in text.split('\n')
		if '.jpg' not in line
	)

	# Normalize the string to decompose Unicode characters
	text = unicodedata.normalize('NFKD', text)

	# Encode to ASCII bytes, then decode back to string, ignoring errors
	text = text.encode('ascii', 'ignore').decode('ascii')

	# remove newlines and tabs
	text = text.replace('\n', ' ').replace('\t', ' ')


	# put spaces around allowed punctuation
	for char in allowed_punctuation:
		text = text.replace(char, f' {char} ')


	# remove leading and trailing spaces
	text = text.strip()

	# remove multiple spaces
	while '  ' in text:
		text = text.replace('  ', ' ')


	# remove all characters except (alphanumeric, allowed_punctuation, ' ')
	text = ''.join(
		(
			char 
			if (
				char.isalnum() 
				or char in allowed_punctuation 
				or char == ' '
			)
			else ' '
		)
		for char in text 
	)

	# convert to lowercase
	text = text.lower()

	text = text.strip()

	return text

In [27]:
def tokenize(
	text: str,
	process: bool = False,
) -> list[str]:
	if process:
		text = process_text(text)
	return text.split(' ')

In [28]:
# Getting books from Plato and Aristotle
DATA_RAW: list[str] = get_many_books([6762, 1497, 8438, 1600, 1656])
DATA: str = " ".join(process_text(x) for x in DATA_RAW)
DATA_TOKENIZED: list[str] = tokenize(DATA)

Getting book 6762...
	574887 characters read
Getting book 1497...
	1194507 characters read
Getting book 8438...
	636926 characters read
Getting book 1600...
	178335 characters read
Getting book 1656...
	85851 characters read


In [29]:
# sorted by frequency
VOCAB_FREQ: Counter[str] = Counter(DATA_TOKENIZED)
VOCAB_ARR: list[str] = [word for word, _ in VOCAB_FREQ.most_common()]
VOCAB_DICT: dict[str, int] = {word: i for i, word in enumerate(VOCAB_ARR)}

def encode(
	text: str | list[str],
) -> Int[np.ndarray, " n_tokens"]:
	if isinstance(text, str):
		text = tokenize(text)
	return np.array([VOCAB_DICT[word] for word in text])

def decode(
	encoded_text: Int[np.ndarray, " n_tokens"] | list[int],
) -> str:
	return ' '.join(VOCAB_ARR[i] for i in encoded_text)

DATA_ENCODED: Int[np.ndarray, " n_tokens"] = encode(DATA)

print(f"{DATA_ENCODED = }")
print(len(DATA_ENCODED))

DATA_ENCODED = array([1181,   25, 9326, ..., 4819, 4354, 1842], shape=(556819,))
556819


In [30]:
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, x):
        self.input = x[:-1]
        self.output = x[1:]
    
    def __len__(self):
        return len(self.input)
    
    def __getitem__(self, idx):
        inp = self.input[idx]
        out = self.output[idx]
        return inp, out

# Tests

In [31]:
d_model = 10
d_vocab = 10
d_hidden = 10
n_context = 5
n_layers = 10

x = torch.randn((n_context, d_model))

conf = Config(d_model, d_vocab, d_hidden, n_context, n_layers)
mlp = MLP(conf)
attention = Attention(conf)
Aoutput = attention(x)
print(Aoutput.shape)

output = mlp(x)
print(output)

torch.Size([5, 10])
tensor([[-0.5493, -0.2684, -0.0968,  0.3741,  0.0723,  0.0959, -0.2844, -0.1960,
         -0.4201,  0.3676],
        [-0.1929,  0.0076, -0.2940,  0.2168,  0.1678,  0.2253, -0.2690, -0.0567,
         -0.1951,  0.1583],
        [-0.2450, -0.0012, -0.1868,  0.3602, -0.1031, -0.1000, -0.2043,  0.0320,
         -0.2973,  0.1837],
        [ 0.0243, -0.0332, -0.1092,  0.2460,  0.1265,  0.1983, -0.2735, -0.0014,
         -0.2671,  0.1175],
        [-0.1961, -0.0961, -0.3776,  0.2605, -0.1658,  0.0275, -0.0723,  0.1392,
         -0.4074,  0.1715]], grad_fn=<AddmmBackward0>)


  return self._call_impl(*args, **kwargs)


In [32]:
# Transformer Block test

d_model = 10
d_vocab = len(VOCAB_DICT)
d_hidden = 10
n_context = 5
n_layers = 10

config = Config(
    d_model = d_model,
    d_vocab = d_vocab,
    d_hidden = d_hidden,
    n_context = n_context,
    n_layers = n_layers,
)

x = torch.randn((n_context, d_model))
conf = Config(d_model, d_vocab, d_hidden, n_context, n_layers)

tb = TransformerBlock(config)

output_x = tb(x)
output_x


tensor([[ 0.2773,  0.7253,  0.5719, -1.7431, -0.1302, -0.3975, -0.2319,  0.2165,
          1.0754, -0.0426],
        [-2.0967, -0.3822, -0.7674, -0.7428,  0.7722,  1.1141, -0.1975, -0.2895,
          3.2041, -0.6895],
        [-0.3861,  1.1865, -0.7214,  0.8240,  0.2769,  1.4384, -0.4193, -0.0209,
          1.0260,  1.0568],
        [ 0.7886, -0.1820,  0.4263,  0.3803,  0.8514,  1.6074, -0.7250,  0.4351,
          1.4699, -0.4978],
        [ 0.5343,  0.5985,  1.1189,  0.9977, -0.7954, -1.7470,  1.5594, -0.5268,
          1.3567, -0.8963]], grad_fn=<AddBackward0>)

## Training Loop

In [None]:
conf = Config(d_model = 10, 
              d_vocab = len(VOCAB_DICT), 
              d_hidden = 10, 
              n_context = 10, 
              n_layers = 2
              )

#dataset and dataloader
training_data = torch.utils.data.TensorDataset(torch.from_numpy(DATA_ENCODED[:-1]),torch.from_numpy(DATA_ENCODED[1:]))
train_dataloader = torch.utils.data.DataLoader(training_data, batch_size=64, shuffle=True)


model = Transformer(config=conf)


TypeError: embedding(): argument 'indices' (position 2) must be Tensor, not numpy.ndarray