In [1]:
# Props to this sensei
# https://www.youtube.com/watch?v=kCc8FmEb1nY&list=PLAqhIrjkxbuWI23v9cThsA9GvCAUhRvKZ&index=8

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm # progress bar

## Hyper-parameters

In [3]:
batch_size = 64
text_file = "tiny-shakespeare.txt"



## Reading Data

In [4]:
# read file
with open(text_file, "r") as f:
    text = f.read()
text[:100]

'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'

In [5]:
char_list = sorted(list(set(text)))
char_size = len(char_list)
print(f"All the characters in the text: {''.join(char_list)}")
print(f"Length of the characters: {char_size}")

All the characters in the text: 
 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
Length of the characters: 65


## Tokenizer (character based, index/ascii)

In [6]:
class MyTokenizer:
    def __init__(self):
        self.char_to_index = None
        self.index_to_char = None

    def fit(self, char_list):  
        self.char_to_index = {char: idx for idx, char in enumerate(char_list)}
        self.index_to_char = {idx: char for char, idx in self.char_to_index.items()}

    def encode_index(self, input_str):
        return [self.char_to_index[char] for char in input_str]

    def decode_index(self, encoded_list):
        return ''.join([self.index_to_char[idx] for idx in encoded_list])

    @staticmethod
    def ascii_tokenizer(char):
        return ord(char)

    @staticmethod
    def ascii_decoder(ascii_value):
        return chr(ascii_value)

    def encode_combined(self, input_str, use_ascii=False):
        if use_ascii:
            return [self.ascii_tokenizer(char) for char in input_str]
        else:
            return self.encode_index(input_str)

    def decode_combined(self, encoded_list, use_ascii=False):
        if use_ascii:
            return ''.join([self.ascii_decoder(ascii_value) for ascii_value in encoded_list])
        else:
            return self.decode_index(encoded_list)

In [7]:
# Example usage:
tokenizer = MyTokenizer()
tokenizer.fit(char_list)

input_str = "Hello there"
encoded_list_ascii = tokenizer.encode_combined(input_str, use_ascii=True)
decoded_str_ascii = tokenizer.decode_combined(encoded_list_ascii, use_ascii=True)

encoded_list_index = tokenizer.encode_combined(input_str, use_ascii=True)
decoded_str_index = tokenizer.decode_combined(encoded_list_index, use_ascii=True)

print("Original String:", input_str)
print("Encoded List (ASCII):", encoded_list_ascii)
print("Decoded String (ASCII):", decoded_str_ascii)

print("Encoded List (Index):", encoded_list_index)
print("Decoded String (Index):", decoded_str_index)


Original String: Hello there
Encoded List (ASCII): [72, 101, 108, 108, 111, 32, 116, 104, 101, 114, 101]
Decoded String (ASCII): Hello there
Encoded List (Index): [72, 101, 108, 108, 111, 32, 116, 104, 101, 114, 101]
Decoded String (Index): Hello there


In [8]:
# Encode all the data 
encoded_data = tokenizer.encode_combined(text) 
encoded_data[:10]

[18, 47, 56, 57, 58, 1, 15, 47, 58, 47]

## Data Loader

In [9]:
data = torch.tensor(encoded_data)
data.shape[0]

1115393

In [10]:
class MyDataset(Dataset):
    def __init__(self, encoded_data):
        self.encoded_data = encoded_data

    def __len__(self):
        return len(self.encoded_data)

    def __getitem__(self, idx):
        return self.encoded_data[idx]
        
dataset = MyDataset(data)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# printing the first batch
for batch in dataloader: 
    print(batch)
    break

tensor([50,  1, 54, 39, 56, 14,  1, 30,  1,  1, 53, 31, 58, 53, 57,  1, 61,  6,
        10, 56, 26, 50, 59,  1,  1, 56, 61, 57,  1, 53, 57, 56, 43, 43, 39, 21,
        54,  8,  1, 47, 49, 41, 58, 57,  2, 16, 46, 57, 23, 47,  1, 58, 63, 53,
        47, 40, 52, 53,  6,  6, 57, 46, 54, 51])


## GPT and language models
https://github.com/iVishalr/GPT/tree/main

In [11]:
# https://medium.com/@mingzehe/implement-transformer-via-pytorch-step-by-step-part-2-69f020d580c6

#attention 
def attention(k,q,v):
    # q dim [batch_size,n_heads,length,d_tensor]
    d_tensor = q.size(-1) 
    # assume dim of query/key/value vector should be same 
    # and it should be to make below calculation happen      
    k_t = k.transpose(-2,-1) #[batch_size,n_heads,d_tensor,length]
    score = (q @ k_t)/math.sqrt(d_tensor)
    v= torch.softmax(score,dim=-1) @ v
    return v,score

import copy
def clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_head):
        super(MultiHeadAttention, self).__init__()
  # reduced dim for each Q,K,V, but added up to d_model
        self.d_k = d_model // n_head 
        self.n_head = n_head
        self.attn = None
  # use the attention class defined above
        self.attention = attention() 

  # 3 for K,Q,V, the forth layer is on the top for final attention score
        self.linears = clones(nn.Linear(d_model, d_model), 4) 

    def forward(self, q, k, v):
        samples = q.size(0) #q init as 512x512
    # split tensor by number of heads
        q, k, v = [   lin(x).view(samples, -1, self.n_head, self.d_k).transpose(1, 2)
    # [512,512] => [512,1,8,64] => [512,8,1,64] now we have 8 heads, 
    #length 1 since conv of size 1, dim of 64 for each q,k,v, 
    #ready for input to attention [batch_size, head, length, d_tensor]
            for lin, x in zip(self.linears, (q, k, v)) 
    # we only used 3 first linear layers since zip would 
        ]
        
    # calculate the attention score 
        x, self.attn = attention(q, k, v)

    # concat by view func [512, 8, 1, 64] => [512,1,512] add it back to 512
        x = (x.transpose(1, 2).contiguous().view(samples, -1, self.n_head * self.d_k))
    # now apply the final linear layer copy
        return self.linears[-1](x) 
   

class EncoderLayer(nn.Module):
    def __init__(self,n_head,d_model,hidden):
        super(Encoder_layer, self).__init__()
        self.norm = nn.LayerNorm(layer.size)
        self.attention_layer= MultiHeadAttention(d_model, n_head)
        self.feed_forward_layer= FeedForwardLayer(d_model, hidden)

    def forward(self, x):
        # we make a copy for later residue adding
        _x = x
        # use multi-head attention we defined in part 1
        atten = self.attention_layer(x)
        # add residue and normalize layer
        _atten = _x + self.norm(atten)
        # feed forward layer which we will define later 
        x = self.feed_forward_layer(x)
        return self.norm(x)+_atten

class FeedForwardLayer(nn.Module):
    def __init__(self, d_model, hidden):
        super(FeedForwardLayer, self).__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        return x

class Encoder(nn.Module):
    def __init__(self, d_model, hidden, n_head, n_copy):
        super().__init__()
        # n_copy = 6 
        self.layers = clones(EncoderLayer(d_model,hidden,n_head), n_copy)

    def forward(self, x):
        x = layer(x)
        return x


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len):
        super(PositionalEncoding, self).__init__()
        # init as 515x512 matrix to make adding pos with input possible
        self.encoding = torch.zeros(max_len, d_model)
        # produce 0 to 511 pos index 
        pos = torch.arange(0, max_len)
        # convert to 512x1 size
        pos = pos.float().unsqueeze(dim=1)
        # pick 0,2,4...etc 256 even numbers, 
        # _2i refers to the index in above formula
        _2i = torch.arange(0, d_model, step=2).float()
        # pos index (512,1) divide by _2i (256)
        # broadcasting to (512,256), so every even column apply sin func
        self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / d_model)))
        # odd column go through cos func
        self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / d_model)))
        
    def forward(self, x):
        batch_size, seq_len = x.size() 
        #now to apply encoding
        return self.encoding[:seq_len, :]
        


In [12]:
class GPTModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_heads, num_layers):
        super(GPTModel, self).__init__()

        # Token embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # Positional encoding
        self.positional_encoding = self.create_positional_encoding(embedding_dim)

        # Transformer layers
        self.transformer_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=hidden_dim)
            for _ in range(num_layers)
        ])

        # Fully connected layer for prediction
        self.fc = nn.Linear(embedding_dim, vocab_size)

    def forward(self, x):
        # Token embedding
        embedded = self.embedding(x)

        # Add positional encoding
        positional_encoded = embedded + self.positional_encoding[:embedded.size(0), :]

        # Transformer layers
        transformer_output = positional_encoded
        for layer in self.transformer_layers:
            transformer_output = layer(transformer_output)

        # Fully connected layer for prediction
        output = self.fc(transformer_output)

        return output

    def create_positional_encoding(self, d_model, max_len=512):
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(torch.log(torch.tensor(10000.0)) / d_model))
        positional_encoding = torch.zeros((max_len, d_model))
        positional_encoding[:, 0::2] = torch.sin(position * div_term)
        positional_encoding[:, 1::2] = torch.cos(position * div_term)
        return positional_encoding

# Example usage:
vocab_size = 10000  # replace with your vocabulary size
embedding_dim = 256
hidden_dim = 512
num_heads = 8
num_layers = 6

model = GPTModel(vocab_size, embedding_dim, hidden_dim, num_heads, num_layers)

# Print the model architecture
print(model)


GPTModel(
  (embedding): Embedding(10000, 256)
  (transformer_layers): ModuleList(
    (0-5): 6 x TransformerEncoderLayer(
      (self_attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
      )
      (linear1): Linear(in_features=256, out_features=512, bias=True)
      (dropout): Dropout(p=0.1, inplace=False)
      (linear2): Linear(in_features=512, out_features=256, bias=True)
      (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (dropout1): Dropout(p=0.1, inplace=False)
      (dropout2): Dropout(p=0.1, inplace=False)
    )
  )
  (fc): Linear(in_features=256, out_features=10000, bias=True)
)


In [19]:

training_data = ["This is an example sentence.", "Another example here."]
training_data = text[:1000]

# Tokenize the dataset
# tokenized_data = encoded_data

tokenized_data = tokenizer.encode_combined(training_data) 

# Create an instance of the GPT model
model = GPTModel(vocab_size, embedding_dim, hidden_dim, num_heads, num_layers)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    # Convert tokenized_text to a PyTorch tensor
    input_tensor = torch.tensor(tokenized_data[:-1]).unsqueeze(0)  # Input sequence (excluding the last token)
    target_tensor = torch.tensor(tokenized_data[1:])  # Target sequence (excluding the first token)

    # Forward pass
    output_logits = model(input_tensor)

    # Calculate loss
    loss = criterion(output_logits.view(-1, vocab_size), target_tensor)

    # Backward pass and optimization
    loss.backward()
    optimizer.step()

    average_loss = total_loss / len(tokenized_data)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

    print(f"Epoch {epoch + 1}, Loss: {average_loss:.4f}")


Epoch [1/10], Loss: 9.4068
Epoch 1, Loss: 0.0000
Epoch [2/10], Loss: 7.5305
Epoch 2, Loss: 0.0000
Epoch [3/10], Loss: 6.8457
Epoch 3, Loss: 0.0000
Epoch [4/10], Loss: 6.4180
Epoch 4, Loss: 0.0000
Epoch [5/10], Loss: 5.9814
Epoch 5, Loss: 0.0000
Epoch [6/10], Loss: 5.5259
Epoch 6, Loss: 0.0000
Epoch [7/10], Loss: 5.1399
Epoch 7, Loss: 0.0000
Epoch [8/10], Loss: 4.7227
Epoch 8, Loss: 0.0000
Epoch [9/10], Loss: 4.3529
Epoch 9, Loss: 0.0000
Epoch [10/10], Loss: 4.0297
Epoch 10, Loss: 0.0000


In [27]:
print(training_data)

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hunger for bread, not in thirst for revenge.



In [26]:
# Inference after training
model.eval()
with torch.no_grad():
    # Generate text from a seed input
    seed_text = ''
    seed_tokenized = tokenizer.encode_combined(seed_text)
    input_tensor = torch.tensor(seed_tokenized).unsqueeze(0)
    generated_indices = torch.argmax(model(input_tensor), dim=-1).squeeze().tolist()
    generated_text = tokenizer.decode_combined(generated_indices) 

    print("Generated Text:", ''.join(generated_text))

Generated Text: 
tr  r  


In [None]:
class SimpleLanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):
        super(SimpleLanguageModel, self).__init__()
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # LSTM layers
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        # Output layer
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden):
        # Embedding layer
        embedded = self.embedding(x)
        # LSTM layers
        output, hidden = self.lstm(embedded, hidden)
        # Output layer
        output = self.fc(output)
        return output, hidden

# Example usage:
# Set your vocabulary size, embedding dimension, hidden dimension, and number of LSTM layers
vocab_size = 100  # replace with the actual size of your vocabulary
embedding_dim = 64
hidden_dim = 128
num_layers = 2

# Create an instance of the SimpleLanguageModel
model = SimpleLanguageModel(vocab_size, embedding_dim, hidden_dim, num_layers)

# Print the model architecture
print(model)


In [None]:
device = ("cuda" if torch.cuda.is_available() else "mps"
          if torch.backends.mps.is_available() else "cpu")
print(f"Using {device} device")