In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import random
import numpy as np
%matplotlib inline

In [None]:
# Read the corpus: one name per line. The encoding is spelled out so the result
# does not depend on the platform's default locale encoding.
with open("../data/names.txt", "r", encoding="utf-8") as file:
    words = file.read().splitlines()

words[:8]

In [None]:
# Vocabulary: every distinct character that occurs in the corpus, in sorted order.
chars = sorted(set("".join(words)))

# Real characters are numbered from 1; index 0 is reserved for the '.' marker.
stoi = dict(zip(chars, range(1, len(chars) + 1)))
stoi['.'] = 0

# Reverse lookup (index -> character), derived from the forward table.
itos = dict((idx, ch) for ch, idx in stoi.items())
vocab_size = len(stoi)

print(stoi)
print(itos)
print("vocab_size: ", vocab_size)

In [None]:
block_size = 3  # how many preceding characters form one input context

X, Y = [], []

# Walk the first few words and print every (context ---> next character) pair.
for name in words[:3]:
    window = [0] * block_size  # start from an all-zero context
    for target in (stoi[c] for c in name + '.'):
        X.append(window)
        Y.append(target)
        print("".join(itos[i] for i in window), "--->", itos[target])
        # Slide the window: drop the oldest index, append the one just seen.
        window = window[1:] + [target]

In [None]:
# build the dataset
block_size = 3  # default context length used when build_dataset is not given one

def build_dataset(words, context_len=None, char_to_ix=None):
    """Turn a list of words into (context, next-character) training tensors.

    Every word is terminated with '.', and each character (terminator included)
    yields one example: the indices of the `context_len` characters that precede
    it, with positions before the start of the word filled with 0.

    Args:
        words: iterable of strings to encode.
        context_len: number of context indices per example. When omitted, the
            module-level `block_size` is read at call time.
        char_to_ix: mapping from character to integer index. When omitted, the
            module-level `stoi` is used.

    Returns:
        (X, Y): X has one row of `context_len` indices per example and Y holds
        the index of the character following each context.

    Raises:
        ValueError: if the context length is smaller than 1.
        KeyError: if a character is missing from the mapping.
    """
    n_ctx = block_size if context_len is None else context_len
    vocab = stoi if char_to_ix is None else char_to_ix
    if n_ctx < 1:
        # With an empty window the slide below would grow the context instead
        # of keeping its length fixed, producing ragged rows.
        raise ValueError(f"context length must be >= 1, got {n_ctx}")

    X, Y = [], []
    for w in words:
        context = [0] * n_ctx
        for ch in w + '.':
            ix = vocab[ch]
            X.append(context)
            Y.append(ix)
            # Builds a new list each time, so rows already stored in X are not mutated.
            context = context[1:] + [ix]

    X = torch.tensor(X)
    Y = torch.tensor(Y)
    print(X.shape, X.dtype, Y.shape, Y.dtype)
    return X, Y

# Seeded shuffle so the split is the same on every fresh-kernel run.
# NOTE: the shuffle mutates `words` in place, so re-running this cell permutes
# the already-shuffled list again and yields a different split.
random.seed(42)
random.shuffle(words)

# Cut points for an 80% / 10% / 10% train / dev / test split, by word.
n1, n2 = (int(frac * len(words)) for frac in (0.8, 0.9))

(Xtr, Ytr), (Xdev, Ydev), (Xte, Yte) = (
    build_dataset(part) for part in (words[:n1], words[n1:n2], words[n2:])
)


In [None]:
# Show the first seven training examples, first as indices and then decoded.
for row in range(7):
    ctx, tgt = Xtr[row].tolist(), Ytr[row].item()
    print("Input: ", ctx, " Target: ", tgt)
    print("Input: ", "".join(itos[k] for k in ctx), " Target: ", itos[tgt])


In [None]:

class MLP(nn.Module):
    """Character-level MLP: an embedding lookup followed by a stack of tanh layers.

    The network is `n_layers` repetitions of Linear -> (LayerNorm | Identity) -> Tanh,
    followed by an output Linear -> (LayerNorm | Identity) that produces one
    score per vocabulary entry.

    Args:
        vocab_size: number of distinct tokens (embedding rows and output width).
        n_embd: embedding dimension per token.
        block_size: number of context tokens per example.
        n_hidden: width of every hidden layer.
        use_norm_layer: if True, apply LayerNorm after each Linear; otherwise an
            Identity placeholder is inserted so module indices stay the same.
        n_layers: number of hidden Linear/Norm/Tanh blocks (default 5).

    Raises:
        ValueError: if `n_layers` is smaller than 1.
    """

    def __init__(self, vocab_size, n_embd, block_size, n_hidden, use_norm_layer=True, n_layers=5):
        super().__init__()
        if n_layers < 1:
            raise ValueError(f"n_layers must be >= 1, got {n_layers}")

        # Create the embedding table
        self.embedding_table = nn.Embedding(vocab_size, n_embd)

        def make_norm(width):
            # Identity keeps the Sequential layout fixed whether or not norm is on.
            return nn.LayerNorm(width) if use_norm_layer else nn.Identity()

        # Hidden stack: the first block consumes the flattened embeddings,
        # every later block maps n_hidden -> n_hidden.
        layers = []
        in_features = n_embd * block_size
        for _ in range(n_layers):
            layers += [nn.Linear(in_features, n_hidden), make_norm(n_hidden), nn.Tanh()]
            in_features = n_hidden

        # Output projection to vocabulary scores (no activation afterwards).
        layers += [nn.Linear(n_hidden, vocab_size), make_norm(vocab_size)]

        # Create a sequential container
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        """Map token indices of shape (batch, block_size) to scores of shape (batch, vocab_size)."""
        # Forward pass through the embedding table
        x = self.embedding_table(x)  # shape: (batch_size, block_size, n_embd)
        # Flatten the embeddings
        x = x.view(x.size(0), -1)  # shape: (batch_size, block_size * n_embd)
        # Forward pass through the MLP layers
        x = self.layers(x)  # shape: (batch_size, vocab_size)
        return x

# Define parameters
vocab_size = len(stoi)  # derived from the character table rather than hard-coded
n_embd = 10          # embedding dimension per character
block_size = 4       # number of input characters
n_hidden = 200       # size of the hidden layer
use_norm_layer = True  # flag to use normalization layer
lr_init = 0.01       # initial AdamW learning rate

# Seed torch's RNG so weight initialisation and minibatch sampling are repeatable
torch.manual_seed(42)

# Check for CUDA
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Create an instance of the MLP class
model = MLP(vocab_size, n_embd, block_size, n_hidden, use_norm_layer).to(device)

# Define the optimizer (it holds references to this model's parameters)
optimizer = optim.AdamW(model.parameters(), lr=lr_init, weight_decay=0.01, betas=(0.9, 0.999), eps=1e-08)

# Make datasets: block_size was reassigned above, so the splits are regenerated
# here to give the inputs the matching context length.
Xtr, Ytr = build_dataset(words[:n1])
Xdev, Ydev = build_dataset(words[n1:n2])
Xte, Yte = build_dataset(words[n2:])

In [None]:
import plotly.graph_objects as go

# Parameters for the learning rate finder
lr_min = 1e-7
lr_max = 1e-1
num_iters = 1000
batch_size = 32
log_lrs = []   # log10 of the learning rate used at each step (filled by find_lr)
losses = []    # minibatch loss observed at each step (filled by find_lr)

# Define a function to test different learning rates
def find_lr(model, Xtr, Ytr, lr_min, lr_max, num_iters, batch_size):
    """Sweep the learning rate exponentially from `lr_min` towards `lr_max`.

    Trains `model` in place for `num_iters` minibatch steps with AdamW while
    the learning rate grows geometrically, recording for every step the
    learning rate that was applied and the loss of that minibatch.

    Args:
        model: module mapping a batch of inputs to class scores; it is updated in place.
        Xtr, Ytr: input and target tensors; minibatch indices are drawn on `Xtr.device`.
        lr_min: learning rate used for the first step.
        lr_max: learning rate the schedule reaches after `num_iters` scheduler steps.
        num_iters: number of optimisation steps.
        batch_size: examples per minibatch.

    Returns:
        (log_lrs, losses): the module-level lists this function appends to.
    """
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr_min)
    growth = lr_max / lr_min
    scheduler = torch.optim.lr_scheduler.LambdaLR(
        optimizer, lambda step: growth ** (step / num_iters)
    )

    for i in range(num_iters):
        # Minibatch (indices are created on the same device as the data)
        ix = torch.randint(0, Xtr.shape[0], (batch_size,), device=Xtr.device)
        Xb, Yb = Xtr[ix], Ytr[ix]

        # Forward pass
        optimizer.zero_grad()
        logits = model(Xb)
        loss = F.cross_entropy(logits, Yb)

        # Backward pass
        loss.backward()
        # Read the learning rate BEFORE the scheduler advances, so each loss is
        # recorded against the rate of its own step rather than the next one.
        step_lr = optimizer.param_groups[0]['lr']
        optimizer.step()
        scheduler.step()

        # Track learning rates and losses
        log_lrs.append(np.log10(step_lr))
        losses.append(loss.item())

        # Print progress
        if (i + 1) % 100 == 0:
            print(f"Step {i + 1}/{num_iters}, Loss: {loss.item()}, LR: {step_lr}")

    return log_lrs, losses

# Run the sweep on a throwaway network. Rebinding `model` here would leave
# `optimizer` (constructed from the earlier model's parameters) pointing at a
# different network than the one trained afterwards, and the sweep itself
# changes the weights of whatever model it is given.
lr_probe_model = MLP(vocab_size, n_embd, block_size, n_hidden, use_norm_layer).to(device)

# Move datasets to the appropriate device
Xtr, Ytr = Xtr.to(device), Ytr.to(device)

# Run the learning rate finder
find_lr(lr_probe_model, Xtr, Ytr, lr_min, lr_max, num_iters, batch_size)

# Plot the learning rate finder results with Plotly:
# one trace of loss against log10(learning rate).
fig = go.Figure(
    data=[
        go.Scatter(x=log_lrs, y=losses, mode='lines+markers', name='Loss'),
    ]
)

# Titles and hover behaviour are applied after the figure is built.
fig.update_layout(
    hovermode='x',
    title='Learning Rate Finder',
    xaxis_title='Learning Rate (log scale)',
    yaxis_title='Loss',
)

fig.show()


In [None]:
max_steps = 400000
batch_size = 32
lossi = []

# Build the network and its optimizer together. An optimizer only updates the
# parameter tensors it was constructed with, so reusing one created for an
# earlier `model` object would leave the model below untrained.
model = MLP(vocab_size, n_embd, block_size, n_hidden, use_norm_layer).to(device)
optimizer = optim.AdamW(model.parameters(), lr=lr_init, weight_decay=0.01, betas=(0.9, 0.999), eps=1e-08)

Xtr, Ytr = Xtr.to(device), Ytr.to(device)

# Step at which the learning rate is cut to a tenth of its initial value
lr_decay_step = int(0.67 * max_steps)

for i in range(max_steps):
    # Minibatch
    ix = torch.randint(0, Xtr.shape[0], (batch_size,), device=device)
    Xb, Yb = Xtr[ix], Ytr[ix]

    # Forward pass
    optimizer.zero_grad()
    logits = model(Xb)
    loss = F.cross_entropy(logits, Yb)

    # Backward pass
    loss.backward()

    # Update
    if i == lr_decay_step:
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr_init / 10
    optimizer.step()

    # Track stats
    lossi.append(loss.item())
    if i % 5000 == 0:
        # Average over the losses actually available: at i == 0 there is only
        # one, so dividing by a fixed 100 would under-report the mean.
        recent = lossi[-100:]
        mean_loss = sum(recent) / len(recent)
        print(f"{i:7d} / {max_steps:7d}: Mean Loss: {mean_loss:.4f}")

In [None]:
# Function to calculate moving average
def moving_average(values, window):
    """Trailing moving average of `values` with a warm-up at the start.

    Element i of the result is the mean of the last `window` values ending at
    index i; while fewer than `window` values are available, the mean of all
    values seen so far is used instead.

    Args:
        values: indexable sequence of numbers.
        window: maximum number of trailing values to average (>= 1).

    Returns:
        A list of floats with the same length as `values`.

    Raises:
        ValueError: if `window` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    moving_avgs = []
    # Keep a running window sum instead of re-summing the slice at every index,
    # so the cost is linear in len(values) regardless of the window size.
    # For float input the result may differ from a fresh sum in the last bits.
    running = 0.0
    for i, value in enumerate(values):
        running += value
        if i >= window:
            running -= values[i - window]  # drop the value that left the window
        moving_avgs.append(running / min(i + 1, window))
    return moving_avgs

# Smooth the per-step losses over a trailing window of 100 steps
smoothed_loss = moving_average(lossi, window=100)

# Draw the smoothed curve, then label it
plt.plot(smoothed_loss)
plt.title('Training Loss Curve')
plt.xlabel('Steps')
plt.ylabel('Loss')
plt.show()


In [None]:
Xdev, Ydev = Xdev.to(device), Ydev.to(device)

@torch.no_grad()
def split_loss(split):
    """Print the cross-entropy loss of `model` on one whole data split.

    `split` is one of "train", "val" or "test"; any other key raises KeyError.
    The full split is moved to `device` and evaluated in a single forward pass
    with gradient tracking disabled.
    """
    splits = {
        "train": (Xtr, Ytr),
        "val": (Xdev, Ydev),
        "test": (Xte, Yte)
    }
    inputs, targets = splits[split]
    inputs = inputs.to(device=device)
    targets = targets.to(device=device)

    # Forward pass and loss over the entire split
    loss = F.cross_entropy(model(inputs), targets)
    print(f"{split}: {loss.item():.4f}")

split_loss("train")
split_loss("val")

In [None]:
# Scatter the learned character embeddings; only drawable when they are 2-D.
if n_embd == 2:
    # Take the embedding matrix from the model (one row per vocabulary index),
    # detached and moved to host memory so matplotlib can read it on any device.
    C = model.embedding_table.weight.detach().cpu()
    plt.figure(figsize=(8,8))
    plt.scatter(C[:,0].numpy(), C[:,1].numpy(), s=200)
    for i in range(C.shape[0]):
        # Label each point with the character it represents
        plt.text(C[i,0].item(), C[i,1].item(), itos[i], ha="center", va="center", color="white")
    # The original passed the string "minor" positionally, where it acted as a
    # truthy visibility flag; state the intent (grid on) explicitly.
    plt.grid(True)

In [None]:
# Sampling: draw 20 names from the model, one character at a time.
model.eval()
with torch.no_grad():
    for _ in range(20):
        sampled = []
        context = [0] * block_size  # begin from an all-zero context
        ix = None
        # Keep sampling until index 0 is drawn; it is included in the output.
        while ix != 0:
            logits = model(torch.tensor([context]).to(device=device))
            probs = F.softmax(logits, dim=1)
            ix = torch.multinomial(probs, num_samples=1).item()
            # Slide the context window forward by the sampled index
            context = context[1:] + [ix]
            sampled.append(ix)
        print("".join(itos[i] for i in sampled))