In [1]:
import torch
import torch.nn as nn

class Embeddings(nn.Module):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, d_model)  # Token embeddings
        self.pos_emb = nn.Embedding(1000, d_model)          # Positional embeddings (max sequence length = 1000)
        self.d_model = d_model

    def forward(self, x):
        # x: (batch_size, seq_length)
        seq_len = x.size(1)
        positions = torch.arange(seq_len, device=x.device).unsqueeze(0)  # (1, seq_length)
        token_emb = self.token_emb(x)  # (batch_size, seq_length, d_model)
        pos_emb = self.pos_emb(positions)  # (1, seq_length, d_model)
        return token_emb + pos_emb  # Add and return

In [2]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.wq = nn.Linear(d_model, d_model)  # Query
        self.wk = nn.Linear(d_model, d_model)  # Key
        self.wv = nn.Linear(d_model, d_model)  # Value
        self.fc = nn.Linear(d_model, d_model)  # Final projection

    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)

        # Linear projections
        q = self.wq(q).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.wk(k).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.wv(v).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

        # Scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / (self.head_dim ** 0.5)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attn = torch.softmax(scores, dim=-1)
        out = torch.matmul(attn, v)  # (batch_size, num_heads, seq_length, head_dim)

        # Concatenate heads and project
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.fc(out)

In [3]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        return self.fc2(self.dropout(torch.relu(self.fc1(x))))

In [4]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention with residual
        attn_out = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_out))
        # Feed-forward with residual
        ffn_out = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_out))
        return x

In [5]:
class MiniGPT(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_len):
        super().__init__()
        self.embeddings = Embeddings(vocab_size, d_model)
        self.layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        x = self.embeddings(x)  # (batch_size, seq_length, d_model)
        for layer in self.layers:
            x = layer(x, mask)
        return self.fc(x)  # (batch_size, seq_length, vocab_size)

In [6]:
import json

# Load vocabulary
with open("/home/itachi/Mini-GPT/data/processed/vocab.json", "r") as f:
    vocab = json.load(f)
    char_to_idx = vocab["char_to_idx"]  # Now this is defined!

# Hyperparameters
vocab_size = len(char_to_idx)  # Correctly set using the loaded vocabulary
d_model = 128                  
num_layers = 4                 
num_heads = 8                  
d_ff = 512                     
dropout = 0.1                  
max_seq_len = 64               

# Initialize model
model = MiniGPT(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_len)
print(model)

MiniGPT(
  (embeddings): Embeddings(
    (token_emb): Embedding(85, 128)
    (pos_emb): Embedding(1000, 128)
  )
  (layers): ModuleList(
    (0-3): 4 x DecoderLayer(
      (self_attn): MultiHeadAttention(
        (wq): Linear(in_features=128, out_features=128, bias=True)
        (wk): Linear(in_features=128, out_features=128, bias=True)
        (wv): Linear(in_features=128, out_features=128, bias=True)
        (fc): Linear(in_features=128, out_features=128, bias=True)
      )
      (ffn): FeedForward(
        (fc1): Linear(in_features=128, out_features=512, bias=True)
        (fc2): Linear(in_features=512, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (fc): Linear(in_features=128, out_features=85, bias=True)
  (dropout): Dropout(p=0.1, inplace=False)
)


In [7]:
def generate_mask(seq_len):
    return torch.triu(torch.ones(seq_len, seq_len) == 1).transpose(0, 1)

# Example
mask = generate_mask(10)
print(mask)

tensor([[ True, False, False, False, False, False, False, False, False, False],
        [ True,  True, False, False, False, False, False, False, False, False],
        [ True,  True,  True, False, False, False, False, False, False, False],
        [ True,  True,  True,  True, False, False, False, False, False, False],
        [ True,  True,  True,  True,  True, False, False, False, False, False],
        [ True,  True,  True,  True,  True,  True, False, False, False, False],
        [ True,  True,  True,  True,  True,  True,  True, False, False, False],
        [ True,  True,  True,  True,  True,  True,  True,  True, False, False],
        [ True,  True,  True,  True,  True,  True,  True,  True,  True, False],
        [ True,  True,  True,  True,  True,  True,  True,  True,  True,  True]])


In [8]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.nn import CrossEntropyLoss
from torch.optim import AdamW
import numpy as np
import os

# Hyperparameters (adjust as needed)
BATCH_SIZE = 32
LEARNING_RATE = 3e-4
EPOCHS = 1
SEQ_LENGTH = 64  # Must match preprocessing sequence length
MODEL_SAVE_PATH = "/home/itachi/Mini-GPT/models/mini_gpt.pth"

# Load preprocessed data
train_data = torch.load("/home/itachi/Mini-GPT/data/processed/train_sequences.pt")
val_data = torch.load("/home/itachi/Mini-GPT/data/processed/val_sequences.pt")

# Create DataLoader for batching
train_dataset = TensorDataset(train_data["inputs"], train_data["targets"])
val_dataset = TensorDataset(val_data["inputs"], val_data["targets"])

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

# Initialize model, optimizer, and loss function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MiniGPT(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_len).to(device)
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)
loss_fn = CrossEntropyLoss()

# Create causal mask
def generate_mask(seq_len):
    return torch.triu(torch.ones(seq_len, seq_len, device=device) == 1).transpose(0, 1)

mask = generate_mask(SEQ_LENGTH)

# Training loop
for epoch in range(EPOCHS):
    model.train()
    total_train_loss = 0

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        
        # Forward pass
        outputs = model(inputs, mask=mask)
        loss = loss_fn(outputs.view(-1, vocab_size), targets.view(-1))

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Prevent exploding gradients
        optimizer.step()

        total_train_loss += loss.item()

        # Log progress
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch+1}/{EPOCHS} | Batch {batch_idx}/{len(train_loader)} | Loss: {loss.item():.4f}")

    # Validation
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs, mask=mask)
            loss = loss_fn(outputs.view(-1, vocab_size), targets.view(-1))
            total_val_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)
    avg_val_loss = total_val_loss / len(val_loader)
    print(f"Epoch {epoch+1} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

    # Save model checkpoint
    torch.save({
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "loss": avg_val_loss,
    }, MODEL_SAVE_PATH)

print("Training complete!")

Epoch 1/1 | Batch 0/2809 | Loss: 4.7416
Epoch 1/1 | Batch 100/2809 | Loss: 2.5177
Epoch 1/1 | Batch 200/2809 | Loss: 2.3827
Epoch 1/1 | Batch 300/2809 | Loss: 2.2945
Epoch 1/1 | Batch 400/2809 | Loss: 2.1976
Epoch 1/1 | Batch 500/2809 | Loss: 2.2060
Epoch 1/1 | Batch 600/2809 | Loss: 2.0961
Epoch 1/1 | Batch 700/2809 | Loss: 2.0302
Epoch 1/1 | Batch 800/2809 | Loss: 1.9742
Epoch 1/1 | Batch 900/2809 | Loss: 1.9529
Epoch 1/1 | Batch 1000/2809 | Loss: 1.9492
Epoch 1/1 | Batch 1100/2809 | Loss: 1.8726
Epoch 1/1 | Batch 1200/2809 | Loss: 1.7702
Epoch 1/1 | Batch 1300/2809 | Loss: 1.8927
Epoch 1/1 | Batch 1400/2809 | Loss: 1.7962
Epoch 1/1 | Batch 1500/2809 | Loss: 1.7667
Epoch 1/1 | Batch 1600/2809 | Loss: 1.7359
Epoch 1/1 | Batch 1700/2809 | Loss: 1.7419
Epoch 1/1 | Batch 1800/2809 | Loss: 1.7145
Epoch 1/1 | Batch 1900/2809 | Loss: 1.7221
Epoch 1/1 | Batch 2000/2809 | Loss: 1.6721
Epoch 1/1 | Batch 2100/2809 | Loss: 1.6228
Epoch 1/1 | Batch 2200/2809 | Loss: 1.6427
Epoch 1/1 | Batch 2300/

In [13]:
import torch
import json
import numpy as np

# Load vocabulary
with open("/home/itachi/Mini-GPT/data/processed/vocab.json", "r") as f:
    vocab = json.load(f)
    idx_to_char = {int(k): v for k, v in vocab["idx_to_char"].items()}
    char_to_idx = vocab["char_to_idx"]

# Load trained model
MODEL_SAVE_PATH = "/home/itachi/Mini-GPT/models/mini_gpt.pth"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = MiniGPT(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_len).to(device)
checkpoint = torch.load(MODEL_SAVE_PATH, map_location=device)
model.load_state_dict(checkpoint["model_state_dict"])
model.eval()

# Generate causal mask
def generate_mask(seq_len):
    return torch.triu(torch.ones(seq_len, seq_len, device=device) == 1).transpose(0, 1)

# Decoding strategies
def greedy_search(model, seed_text, max_length=100):
    input_seq = torch.tensor([char_to_idx[char] for char in seed_text], device=device).unsqueeze(0)
    generated_text = seed_text

    for _ in range(max_length):
        with torch.no_grad():
            output = model(input_seq, mask=generate_mask(input_seq.size(1)))
            next_token = torch.argmax(output[:, -1, :], dim=-1).item()
            generated_text += idx_to_char[next_token]
            input_seq = torch.cat([input_seq, torch.tensor([[next_token]], device=device)], dim=1)

    return generated_text

def top_k_sampling(model, seed_text, max_length=100, k=5):
    input_seq = torch.tensor([char_to_idx[char] for char in seed_text], device=device).unsqueeze(0)
    generated_text = seed_text

    for _ in range(max_length):
        with torch.no_grad():
            output = model(input_seq, mask=generate_mask(input_seq.size(1)))
            probs = torch.softmax(output[:, -1, :], dim=-1)

            # Extract top-k tokens and their probabilities
            top_k_probs, top_k_indices = torch.topk(probs, k)

            # Normalize top-k probabilities to sum to 1
            top_k_probs = top_k_probs / top_k_probs.sum()

            # Convert tensors to numpy
            top_k_probs = top_k_probs.squeeze().cpu().numpy()
            top_k_indices = top_k_indices.squeeze().cpu().numpy()

            # Sample next token
            next_token = np.random.choice(top_k_indices, p=top_k_probs)

            # Append generated character
            generated_text += idx_to_char[next_token]
            input_seq = torch.cat([input_seq, torch.tensor([[next_token]], device=device)], dim=1)

    return generated_text


# Generate poetry
seed_text = "<start>"
print("Greedy Search:")
print(greedy_search(model, seed_text, max_length=200))
print("\nTop-k Sampling (k=5):")
print(top_k_sampling(model, seed_text, max_length=200, k=5))

Greedy Search:
<start> The worst of that which gives thee releasing: My bonds in wanofex t my wenomellle wee wne waromeexthele warare belealeat weramo be bllexlat beshat bextin s sthen fononest bexpele bupllin blexelin st 

Top-k Sampling (k=5):
<start> Thee and thy love's might: O let my looks be then the elocenatfrreathemeencanarin thedaruthemeveyexchathindevevelouroffreyouthanouremofrouthearisusthacthinaritous, blinaved, Thed, mpas I dof susthedo
