# Chapter 12: A Language Model from Scratch

In [None]:
import requests
import gzip
import pandas as pd

# URL of the gzipped text file (each number on its own line)
url = "https://github.com/lsb/human-numbers/blob/trunk/one-hundred-thousand-numbers.txt.gz?raw=true"

# Download the archive. An explicit timeout is passed because requests waits
# indefinitely by default, which would hang this cell on a stalled connection.
response = requests.get(url, timeout=30)
response.raise_for_status()  # Raises requests.HTTPError if the download failed

# Decompress the gzip payload in memory and decode it as UTF-8 text
content = gzip.decompress(response.content).decode('utf-8')

# One list entry per line of the file
numbers = content.splitlines()

In [None]:
# Flatten every line into one space-separated string, then cut it back up on
# single spaces to obtain the flat token stream.
text = ' '.join(numbers)
tokens = text.split(' ')

tokens[:10]

In [None]:
# Vocabulary: the distinct tokens, in sorted order
vocab = sorted(set(tokens))
len(vocab)

In [None]:
# Map each vocabulary word to its position in `vocab`
word2idx = dict(zip(vocab, range(len(vocab))))
# Encode the whole token stream as integer ids
nums = [word2idx[token] for token in tokens]
nums[:10]

In [None]:
# Sanity check: show the first 25 ids next to the vocabulary entry each one indexes
for idx in nums[:25]:
    print(f"{idx} {vocab[idx]}")

### Dataset preparation

In [None]:
# Toy sequence used to illustrate the sliding-window (context -> target) pairing
dummy_tokens = list('abcdefghijk')

# Advance one position at a time; each four-token slice splits into a
# three-token context and the single token that follows it.
for i in range(len(dummy_tokens) - 3):
    *three_tokens, next_token = dummy_tokens[i:i + 4]
    print(f"Three tokens: {three_tokens}, Next token: {next_token}, Step: {i}")

In [None]:
# Preview of the training pairs: predict the next token from the three previous tokens.
# The book uses a step size of 3 (no overlap between windows); a step of 1 is used here.
# The range is sliced *before* the comprehension runs, so only windows 2000-2019 are built
# instead of materialising every window in the corpus just to display twenty of them.
[(tokens[i:i+3], tokens[i+3]) for i in range(0, len(tokens)-3, 1)[2000:2020]]

In [None]:
import torch

# Use Apple's Metal (MPS) backend when this machine supports it; otherwise fall back
# to the CPU so that moving the model and batches to this device still works.
# The variable keeps its original name because other cells refer to it.
mps_device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')

In [None]:
# Create the dataset Karpathy-style: every run of three consecutive token ids is an
# input, and the id that immediately follows it is the label.
# The bound is taken from `nums`, the sequence that is actually being indexed.
xs = [torch.tensor(nums[i:i+3]) for i in range(len(nums) - 3)]  # three-id context windows
ys = [torch.tensor(nums[i+3]) for i in range(len(nums) - 3)]    # the id following each window

In [None]:
import torch
import torch.nn as nn 
import torch.nn.functional as F 

class LLMModel1(nn.Module):
    """Next-token predictor that folds a fixed context into one hidden state.

    Layers:
        i_h: embedding, token id -> hidden vector.
        h_h: linear hidden -> hidden, reused at every position.
        h_o: linear hidden -> vocabulary logits.
    """

    def __init__(self, vocab_size, n_hidden):
        """Create the layers.

        Args:
            vocab_size: number of embedding rows and of output logits.
            n_hidden: width of the embedding and of the hidden state.
        """
        super().__init__()  # Initialize the superclass
        self.i_h = nn.Embedding(vocab_size, n_hidden)  # vocab to hidden
        self.h_h = nn.Linear(n_hidden, n_hidden)  # hidden to hidden
        self.h_o = nn.Linear(n_hidden, vocab_size)  # hidden to vocab (logits)

    def forward(self, x):
        """Return vocabulary logits for the token that follows each context.

        The hidden state is accumulated: at every position after the first, the
        embedding of that position's token is added to the running hidden state
        before it goes through the shared linear layer and ReLU.

        Args:
            x: integer token ids indexed as ``x[:, position]``; any number of
               positions >= 1 is accepted (the notebook feeds three).

        Returns:
            The output of ``h_o`` applied to the final hidden state.
        """
        # First hidden state from the first token: embed --> linear --> relu
        h = F.relu(self.h_h(self.i_h(x[:, 0])))

        # Every later token: add its embedding to the running state, then
        # linear --> relu. With three positions this is the same arithmetic as
        # writing the second and third steps out by hand.
        for position in range(1, x.shape[1]):
            h = F.relu(self.h_h(h + self.i_h(x[:, position])))

        return self.h_o(h)

In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Collapse the per-window tensor lists into one inputs tensor and one targets tensor
X, Y = torch.stack(xs), torch.stack(ys)
dataset = TensorDataset(X, Y)

# 80% of the samples go to training, the remainder to validation
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Ordered split (no shuffling, unlike random_split): the leading `train_size`
# samples train the model and the trailing ones validate it.
train_dataset = TensorDataset(*(t[:train_size] for t in (X, Y)))
val_dataset = TensorDataset(*(t[train_size:] for t in (X, Y)))

# Both loaders serve batches of 64 in the stored order
train_loader, val_loader = (
    DataLoader(split, batch_size=64, shuffle=False)
    for split in (train_dataset, val_dataset)
)


In [None]:
from tqdm import tqdm

def _run_epoch(model, loader, criterion, device, desc, optimizer=None):
    """Make one pass over ``loader``; train if ``optimizer`` is given, else evaluate.

    Returns:
        ``(mean of the per-batch losses, accuracy in percent)`` for the pass.
    """
    is_training = optimizer is not None
    model.train(is_training)  # train(False) is the same as eval()

    loss_sum = 0.0
    correct = 0
    seen = 0
    progress_bar = tqdm(loader, desc=desc)
    # Gradients are only tracked while training
    with torch.set_grad_enabled(is_training):
        for batches_done, (inputs, labels) in enumerate(progress_bar, start=1):
            inputs, labels = inputs.to(device), labels.to(device)

            if is_training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            predicted = outputs.argmax(dim=1)
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()

            # loss_sum adds up per-batch losses, so the running average divides
            # by the number of batches (not samples); this keeps the value shown
            # on the bar on the same scale as the epoch loss returned below.
            progress_bar.set_postfix(
                loss=loss_sum / batches_done,
                accuracy=f'{100 * correct / seen:.2f}%',
            )

    return loss_sum / len(loader), 100 * correct / seen


def train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs, device):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Args:
        model: module called as ``model(inputs)`` to produce per-class scores.
        train_loader: iterable of ``(inputs, labels)`` batches used for training.
        val_loader: iterable of ``(inputs, labels)`` batches used for validation.
        optimizer: optimizer stepped once per training batch.
        criterion: loss called as ``criterion(outputs, labels)``.
        num_epochs: number of train + validation passes to run.
        device: device every batch is moved to before the forward pass.

    Returns:
        ``(train_losses, val_losses, train_accuracies, val_accuracies)``: four
        lists with one entry per epoch. Losses are the mean of the per-batch
        losses; accuracies are percentages of correctly predicted labels.
    """
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(num_epochs):
        # Training phase
        train_loss, train_accuracy = _run_epoch(
            model, train_loader, criterion, device,
            desc=f'Epoch {epoch+1}/{num_epochs} [Train]',
            optimizer=optimizer,
        )
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # Validation phase (no optimizer: evaluation mode, gradients disabled)
        val_loss, val_accuracy = _run_epoch(
            model, val_loader, criterion, device,
            desc=f'Epoch {epoch+1}/{num_epochs} [Validation]',
        )
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

    return train_losses, val_losses, train_accuracies, val_accuracies


In [None]:
# Model dimensions: vocabulary size taken from `vocab`, 64 hidden units
vocab_size, n_hidden = len(vocab), 64
device = mps_device

model = LLMModel1(vocab_size, n_hidden)
model.to(device)  # Move model to MPS device

# Cross-entropy over the vocabulary logits, Adam with its default settings
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

num_epochs = 3

train_losses, val_losses, train_accuracies, val_accuracies = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    criterion=criterion,
    num_epochs=num_epochs,
    device=device,
)

In [None]:
import matplotlib.pyplot as plt

def plot_training_history(train_losses, val_losses, train_accuracies=None, val_accuracies=None):
    """Plot per-epoch loss curves and, when both are supplied, accuracy curves.

    Args:
        train_losses: training loss per epoch; its length sets the x axis.
        val_losses: validation loss per epoch.
        train_accuracies: optional training accuracy per epoch.
        val_accuracies: optional validation accuracy per epoch.

    The loss panel is always drawn in the left slot of a 1x2 grid; the accuracy
    panel fills the right slot only if both accuracy sequences are non-empty.
    """
    epochs = range(1, len(train_losses) + 1)

    plt.figure(figsize=(12, 4))

    # One row per panel: (train series, val series, train label, val label, title, y label)
    panels = [
        (train_losses, val_losses,
         'Training loss', 'Validation loss',
         'Training and Validation Loss', 'Loss'),
    ]
    if train_accuracies and val_accuracies:
        panels.append(
            (train_accuracies, val_accuracies,
             'Training accuracy', 'Validation accuracy',
             'Training and Validation Accuracy', 'Accuracy')
        )

    for slot, (train_series, val_series, train_label, val_label, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.plot(epochs, train_series, 'bo-', label=train_label)
        plt.plot(epochs, val_series, 'ro-', label=val_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()

    plt.show()


In [None]:
# Draw the loss and accuracy curves collected during training
plot_training_history(
    train_losses,
    val_losses,
    train_accuracies=train_accuracies,
    val_accuracies=val_accuracies,
)