In [1]:
import torch
import torch.nn as nn
import math
import urllib.request
import tiktoken

from torch.utils.data import Dataset, DataLoader

In [2]:
seq_size: int = 1
hidden_size: int = 4
batch_sz: int = 3

In [None]:
X, W_xh = torch.randn(batch_sz, seq_size), torch.randn(seq_size, hidden_size)
H, W_hh = torch.randn(batch_sz, hidden_size), torch.randn(hidden_size, hidden_size)
mul = torch.matmul(X, W_xh) + torch.matmul(H, W_hh)

print(H.shape, mul.shape)

In [2]:
class NaiveLSTM(nn.Module):
    def __init__(
            self,
            input_size: int,
            hidden_size: int
    ):
        '''
        input_size: int -> size of the embedding dimension in the transformer parlance
        hidden_size: int -> size of the hidden state
        '''
        super().__init__()
        self.input_size: int = input_size
        self.hidden_size: int = hidden_size

        # Input gate (I):
        # I_t = sigma(X_t.W_xi + H_t-1.W_hi + b_i)
        # Where:
        # X_t is X(the sequence) at time t
        # H_t-1 is H(the hidden state) at time t-1
        # W_xi is the weight matrix of X to I gate
        self.W_xi: nn.Parameter = nn.Parameter(torch.Tensor(self.input_size, self.hidden_size))

        # W_hi is the weight matrix of h to the I gate
        self.W_hi: torch.Tensor = nn.Parameter(torch.Tensor(self.hidden_size, self.hidden_size))

        # self.b_i is the bias to the I gate
        self.b_i: torch.Tensor = nn.Parameter(torch.Tensor( self.hidden_size))


        # Forget gate (F):
        # F_t = sigma(X_t.W_xf + H_t-1.W_hf + b_f)
        # Where:
        # X_t is X(the sequence) at time t
        # H_t-1 is H(the hidden state) at time t-1
        # W_xf is the weight matrix of X to F gate
        self.W_xf: nn.Parameter = nn.Parameter(torch.Tensor(self.input_size, self.hidden_size))

        # W_hf is the weight matrix of h to the F gate
        self.W_hf: torch.Tensor = nn.Parameter(torch.Tensor(self.hidden_size, self.hidden_size))

        # self.b_f is the bias to the F gate
        self.b_f: torch.Tensor = nn.Parameter(torch.Tensor( self.hidden_size))


        # Output gate (O):
        # O_t = sigma(X_t.W_xo + H_t-1.W_ho + b_o)
        # Where:
        # X_t is X(the sequence) at time t
        # H_t-1 is H(the hidden state) at time t-1
        # W_xo is the weight matrix of X to O gate
        self.W_xo: nn.Parameter = nn.Parameter(torch.Tensor(self.input_size, self.hidden_size))

        # W_ho is the weight matrix of h to the O gate
        self.W_ho: torch.Tensor = nn.Parameter(torch.Tensor(self.hidden_size, self.hidden_size))

        # self.b_o is the bias to the O gate
        self.b_o: torch.Tensor = nn.Parameter(torch.Tensor( self.hidden_size))

        
        # Cell (C):
        # C_t = sigma(X_t.W_xc + H_t-1.W_hc + b_c)
        # Where:
        # X_t is X(the sequence) at time t
        # H_t-1 is H(the hidden state) at time t-1
        # W_xc is the weight matrix of X to C cell
        self.W_xc: nn.Parameter = nn.Parameter(torch.Tensor(self.input_size, self.hidden_size))

        # W_ho is the weight matrix of h to the C cell
        self.W_hc: torch.Tensor = nn.Parameter(torch.Tensor(self.hidden_size, self.hidden_size))

        # self.b_o is the bias to the C cell
        self.b_c: torch.Tensor = nn.Parameter(torch.Tensor( self.hidden_size))

        # Initializes all weights 
        self.initialize_weights()

    def initialize_weights(self):
        stdev: float = 1.0 / math.sqrt(self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-stdev, stdev)

    def forward(self, X: torch.Tensor, states: torch.Tensor = None):
        print(f"X shape: {X.shape}")
        """
        assumes x.shape represents (batch_size, sequence_size, embedding_dimension)
        """
        bs, sequence_size, input_size = X.size()

        if states is None:
            H_t, C_t = (
                torch.zeros(bs, hidden_size).to(device=X.device),
                torch.zeros(bs, hidden_size).to(device=X.device)
            )
        else:
            H_t, C_t = states

        outputs = []
        for t in range(sequence_size):
            x = X[:, t, :]
            # I is the input gate
            I_t = torch.sigmoid(torch.matmul(x, self.W_xi) + torch.matmul(H_t, self.W_hi) + self.b_i)

            # F is the forget state
            F_t = torch.sigmoid(torch.matmul(x, self.W_xf) + torch.matmul(H_t, self.W_hf) + self.b_f)

            # O is the output state
            O_t = torch.sigmoid(torch.matmul(x, self.W_xo) + torch.matmul(H_t, self.W_ho) + self.b_o)

            # C_t, the memory (C)ell is:
            # C_t = F(.)C_t-1 + I_t(.)C_temp
            # C_temp = tanh(X_t.W_xc + H_t-1.W_hc + b_c)
            C_temp = torch.tanh(torch.matmul(x, self.W_xc) + torch.matmul(H_t, self.W_hc) + self.b_c)
            C_t = F_t * C_t + I_t * C_temp
            H_t = O_t * torch.tanh(C_t)
            outputs.append(H_t)
        return outputs, (H_t, C_t)


In [3]:
seq_size: int = 4 # the size of the sequence
hidden_size: int = 32 # the size of the internal states
batch_sz: int = 2
input_size: int = 6 # size of the embedding dimension
lstm = NaiveLSTM(input_size, hidden_size)

In [4]:
input: torch.Tensor = torch.ones((batch_sz, seq_size, input_size))
input

tensor([[[1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1.]],

        [[1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1.]]])

In [5]:
outputs, states = lstm(input)
print(outputs[0].shape)
assert len(outputs) == seq_size

X shape: torch.Size([2, 4, 6])
torch.Size([2, 32])


In [6]:
# Gets the verdict
url: str = "https://www.gutenberg.org/ebooks/67237.txt.utf-8"
urllib.request.urlretrieve(url, "books/the-verdict.txt")


('books/the-verdict.txt', <http.client.HTTPMessage at 0x75cfa693d580>)

In [7]:
raw_text: str = None
with open("books/the-verdict.txt", "r", encoding="utf-8") as file:
    raw_text = file.read()

print(f"Total number of characters: {len(raw_text)}")
print(raw_text[: 50])

Total number of characters: 357726
﻿The Project Gutenberg eBook of An open verdict
  


In [8]:
tokenizer = tiktoken.get_encoding("gpt2")

In [9]:
tokens = tokenizer.encode(raw_text, allowed_special={"<|endoftext|>"})
print(f"Number of tokens: {len(tokens)}")

Number of tokens: 97421


In [10]:
context_size: int = 4
for i in range(1, context_size+1):
    context: int = tokens[: i]
    desired: int = tokens[i]
    print(tokenizer.decode(context), "---->", tokenizer.decode([desired]))

� ----> �
� ----> �
﻿ ----> The
﻿The ---->  Project


In [11]:
class GPTDatasetV1(Dataset):
    def __init__(self, text, tokenizer, max_length, stride):
        self.input_ids = []
        self.target_ids = []

        token_ids = tokenizer.encode(text)

        for i in range(0, len(token_ids) - max_length, stride):
            input_chunk = token_ids[i:i + max_length]
            target_chunk = token_ids[i + 1: i + max_length + 1]
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))

    def __len__(self):
        return len(self.input_ids)
    
    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]

In [12]:
def create_dataloader_v1(txt, batch_size=4, max_length=256,
    stride=128, shuffle=True, drop_last=True,
    num_workers=0):

    tokenizer = tiktoken.get_encoding("gpt2")
    dataset = GPTDatasetV1(txt, tokenizer, max_length, stride)
    
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers)
    
    return dataloader

In [13]:
dataloader = create_dataloader_v1(
    txt=raw_text,
    batch_size=2,
    max_length=4,
    stride=1,
    shuffle=False,
)

data_iter = iter(dataloader)
first_batch = next(data_iter)
print(first_batch)
second_batch = next(data_iter)
print(second_batch)

[tensor([[ 171,  119,  123,  464],
        [ 119,  123,  464, 4935]]), tensor([[  119,   123,   464,  4935],
        [  123,   464,  4935, 20336]])]
[tensor([[  123,   464,  4935, 20336],
        [  464,  4935, 20336, 46566]]), tensor([[  464,  4935, 20336, 46566],
        [ 4935, 20336, 46566,   286]])]


In [14]:
torch.manual_seed(123)

<torch._C.Generator at 0x75cfd0115d90>

In [24]:
input_size: int = 50257
output_dim: int = 6
token_embedding_layer = nn.Embedding(num_embeddings=input_size, embedding_dim=output_dim)
input_tensor: torch.Tensor = torch.tensor([[171, 119, 123, 464],[119, 123, 464, 4935]])
print(f"Input tensor shape: {input_tensor.shape}")

output_embedding_layer: torch.Tensor = token_embedding_layer(input_tensor)
print(f"Output embedding layer shape: {output_embedding_layer.shape}")

Input tensor shape: torch.Size([2, 4])
Output embedding layer shape: torch.Size([2, 4, 6])


In [25]:
max_length: int = 4
dataloader = create_dataloader_v1(
    txt=raw_text,
    batch_size=2,
    max_length=max_length,
    stride=1,
    shuffle=False,
)

data_iter = iter(dataloader)
inputs, targets = next(data_iter)
print("Inputs shape:", inputs.shape)
print(f"Targets shape: {targets.shape}")

token_embeddings = token_embedding_layer(inputs)
print(f"Outout token embeddings shape shape: {token_embeddings.shape}")

Inputs shape: torch.Size([2, 4])
Targets shape: torch.Size([2, 4])
Outout token embeddings shape shape: torch.Size([2, 4, 6])


In [26]:
context_length = max_length
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)
pos_embeddings = pos_embedding_layer(torch.arange(context_length))
print(pos_embeddings.shape)

torch.Size([4, 6])


In [27]:
input_embeddings = token_embeddings + pos_embeddings
print(input_embeddings.shape)

torch.Size([2, 4, 6])


In [28]:
embedding_dim: int = 6
model = NaiveLSTM(input_size=embedding_dim, hidden_size=output_dim)
model.eval()

NaiveLSTM()

In [29]:
def generate_text_simple(model,
                         idx: torch.Tensor,
                         max_new_tokens,
                         context_size):
    
    print(f"IDX shape: {idx.shape}")
    token_embeddings = token_embedding_layer(idx)
    
    for _ in range(max_new_tokens):
        # idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(token_embeddings)

        logits = logits[:, -1, :]
        probas = torch.softmax(logits, dim=-1)
        idx_next = torch.argmax(probas, dim=-1, keepdim=True)
        idx = torch.cat((idx, idx_next), dim=1)

    return idx

In [30]:
def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)
    return encoded_tensor

def token_ids_to_text(token_ids, tokenizer):
    flat = token_ids.squeeze(0)
    return tokenizer.decode(flat.tolist())

In [31]:
start_context = "Every effort moves you"
tokenizer = tiktoken.get_encoding("gpt2")

In [32]:
token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(start_context, tokenizer),
    max_new_tokens=10,
    context_size=256
)
# print("Output text:\n", token_ids_to_text(token_ids, tokenizer))

IDX shape: torch.Size([1, 4])
X shape: torch.Size([1, 4, 6])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x32 and 6x6)