# DSA4213 Assignment 2

### Download dataset

In [4]:
import requests

# Fetch the Tiny Shakespeare corpus and cache it in the working directory.
url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
# A timeout keeps the cell from hanging forever on a stalled connection.
response = requests.get(url, timeout=30)
# Fail loudly on an HTTP error instead of saving an error page as the dataset.
response.raise_for_status()
with open("tinyshakespeare.txt", "w", encoding="utf-8") as f:
    f.write(response.text)


### Imports and setup

In [5]:
import time
import math
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from collections import Counter
from tqdm import tqdm
import matplotlib.pyplot as plt

### Dataset preparation

In [6]:
class TextDataset(Dataset):
    """Sliding-window next-token dataset over a tokenized text.

    Item ``idx`` is the pair ``(x, y)``: ``x`` holds ``seq_len`` consecutive
    token ids starting at position ``idx`` and ``y`` is the same window
    shifted one position to the right (the next-token targets).

    Args:
        text: Sequence of tokens (e.g. characters or words). Every token
            must be a key of ``vocab``; an unknown token raises ``KeyError``.
        vocab: Mapping whose keys are the known tokens. Only the key order
            is used: ids are assigned 0..len(vocab)-1 in iteration order,
            the mapping's values are ignored.
        seq_len: Number of tokens in each input/target window.

    Attributes:
        itos: List mapping id -> token.
        stoi: Dict mapping token -> id.
        data: The whole text encoded as a list of ids.
    """

    def __init__(self, text, vocab, seq_len=128):
        self.vocab = vocab
        self.itos = list(vocab.keys())
        self.stoi = {w: i for i, w in enumerate(self.itos)}
        self.data = [self.stoi[w] for w in text]
        self.seq_len = seq_len
        # Encode once as a tensor so __getitem__ can slice it instead of
        # rebuilding two tensors from Python lists on every access.
        self._ids = torch.tensor(self.data, dtype=torch.long)

    def __len__(self):
        # Clamp at zero: a text shorter than seq_len has no full window, and
        # a negative value would make len() itself raise.
        return max(0, len(self.data) - self.seq_len)

    def __getitem__(self, idx):
        # Raising IndexError past the end lets plain iteration terminate and
        # prevents silently returning truncated or mismatched windows.
        if not 0 <= idx < len(self):
            raise IndexError(
                f"index {idx} out of range for dataset of length {len(self)}"
            )
        end = idx + self.seq_len
        # Both tensors are views into the shared id tensor.
        x = self._ids[idx:end]
        y = self._ids[idx + 1:end + 1]
        return x, y

def build_vocab(tokens, min_freq=1):
    """Build a token -> id mapping from an iterable of tokens.

    Tokens are kept when they occur at least ``min_freq`` times. Ids are
    contiguous (0..len(vocab)-1) and follow first-occurrence order, so
    ``len(vocab)`` is always a valid embedding-table size for them.

    Args:
        tokens: Iterable of hashable tokens.
        min_freq: Minimum number of occurrences for a token to be kept.

    Returns:
        Dict mapping each kept token to its integer id.
    """
    counter = Counter(tokens)
    # Filter first, then number: enumerating before filtering would leave
    # gaps in the ids whenever a token is dropped.
    kept = (word for word, freq in counter.items() if freq >= min_freq)
    return {word: i for i, word in enumerate(kept)}

def tokenize(text, level="word"):
    """Split *text* into a list of tokens.

    With ``level="word"`` the text is split on runs of whitespace; any other
    value of ``level`` yields one token per character.
    """
    return text.split() if level == "word" else list(text)

### Build RNN Model

In [7]:
class RNNModel(nn.Module):
    """Language model: token embedding -> stacked ``nn.RNN`` -> linear head.

    Args:
        vocab_size: Number of distinct token ids; size of the embedding
            table and of the output logits.
        embed_size: Dimension of the token embeddings.
        hidden_size: Dimension of the RNN hidden state.
        num_layers: Number of stacked RNN layers.
        dropout: Dropout probability passed to ``nn.RNN`` when there is more
            than one layer.
    """

    def __init__(self, vocab_size, embed_size=128, hidden_size=256, num_layers=2, dropout=0.2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        # nn.RNN only applies dropout between stacked layers; with a single
        # layer a non-zero value does nothing except emit a warning, so pass
        # 0.0 in that case.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.rnn = nn.RNN(embed_size, hidden_size, num_layers,
                          dropout=inter_layer_dropout, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Run the model on a batch of token ids.

        Args:
            x: Integer tensor of token ids, batch dimension first
                (the RNN is built with ``batch_first=True``).
            hidden: Optional initial hidden state forwarded to ``nn.RNN``;
                ``None`` lets the RNN start from its default state.

        Returns:
            Tuple ``(logits, hidden)``: per-position logits whose last
            dimension is ``vocab_size``, and the final RNN hidden state.
        """
        x = self.embed(x)
        out, hidden = self.rnn(x, hidden)
        out = self.fc(out)
        return out, hidden


### Evaluation Functions

In [8]:
def evaluate(model, loader, criterion, device):
    """Return the mean per-batch loss of *model* over *loader*.

    The model is switched to eval mode and run without gradient tracking.
    Batches are counted while iterating, so *loader* may be any iterable of
    ``(x, y)`` tensor pairs, including one that does not support ``len()``.

    Args:
        model: Callable returning ``(logits, hidden)`` for a batch ``x``.
        loader: Iterable of ``(x, y)`` batches.
        criterion: Loss taking logits flattened to ``(N, vocab)`` and
            targets flattened to ``(N,)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Sum of the per-batch loss values divided by the number of batches.

    Raises:
        ValueError: If *loader* yields no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            out, _ = model(x)
            # Flatten batch and time so every position is one prediction.
            loss = criterion(out.view(-1, out.size(-1)), y.view(-1))
            total_loss += loss.item()
            num_batches += 1
    if num_batches == 0:
        raise ValueError("evaluate() received an empty loader; there are no batches to average")
    return total_loss / num_batches

### Training loops

In [None]:
def train_model(train_loader, val_loader, model, criterion,
                optimizer, epochs=3, clip=1.0, device="cpu"):
    """Train *model* and record per-epoch train and validation losses.

    Each epoch runs one pass over *train_loader* with gradient-norm clipping,
    then scores the model on *val_loader* via ``evaluate``. After the last
    epoch the loss curves are written to ``loss_curve.png``.

    Args:
        train_loader: Iterable of ``(x, y)`` training batches; must support
            ``len()`` (used to average the epoch loss).
        val_loader: Validation batches passed to ``evaluate``.
        model: Module returning ``(logits, hidden)`` for a batch ``x``.
        criterion: Loss taking flattened logits and flattened targets.
        optimizer: Optimizer over the model's parameters.
        epochs: Number of passes over *train_loader*.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.
        device: Device the model and batches are moved to.

    Returns:
        Tuple ``(train_losses, val_losses, total_time)``: two lists with one
        mean loss per epoch, and the wall-clock training time in seconds.
    """
    model = model.to(device)
    train_losses, val_losses = [], []
    start_time = time.time()

    for epoch in range(1, epochs+1):
        model.train()
        total_loss = 0
        for x, y in tqdm(train_loader, desc=f"Epoch {epoch}"):
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            out, _ = model(x)
            # Flatten batch and time so every position is one prediction.
            loss = criterion(out.view(-1, out.size(-1)), y.view(-1))
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()
            total_loss += loss.item()

        avg_train_loss = total_loss / len(train_loader)
        avg_val_loss = evaluate(model, val_loader, criterion, device)

        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)

        print(f"Epoch {epoch}: Train Loss {avg_train_loss:.3f}, "
              f"Val Loss {avg_val_loss:.3f}, Val PPL {_safe_perplexity(avg_val_loss):.3f}")

    total_time = time.time() - start_time
    print(f"\nTraining finished in {total_time:.2f} seconds")

    _save_loss_curve(train_losses, val_losses)

    return train_losses, val_losses, total_time


def _safe_perplexity(loss):
    """Return ``exp(loss)``, or ``inf`` when the result overflows a float."""
    try:
        return math.exp(loss)
    except OverflowError:
        # A diverged loss should be reported, not abort the training run.
        return float("inf")


def _save_loss_curve(train_losses, val_losses, path="loss_curve.png"):
    """Plot per-epoch train/validation losses and save the figure to *path*."""
    # Use 1-based epoch numbers on the x-axis so the plot matches the
    # "Epoch N" lines printed during training.
    epoch_numbers = range(1, len(train_losses) + 1)
    plt.figure()
    plt.plot(epoch_numbers, train_losses, label="Train Loss")
    plt.plot(epoch_numbers, val_losses, label="Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.title("Training and Validation Loss")
    plt.savefig(path)
    plt.close()

### Text generation

In [10]:
def generate_text(model, seed_text, stoi, itos,
                  length=200, temperature=1.0, device="cpu", level=None):
    """Sample a continuation of *seed_text* from *model*.

    The seed is fed through the model once to build up the hidden state;
    afterwards one token is sampled per step and fed back as the next input.

    Args:
        model: Callable ``model(ids, hidden) -> (logits, hidden)``.
        seed_text: Prompt string.
        stoi: Dict mapping token -> id. Seed tokens missing from it are
            mapped to id 0.
        itos: Sequence mapping id -> token.
        length: Number of tokens to sample.
        temperature: Positive divisor applied to the logits before softmax;
            lower values make sampling more peaked.
        device: Device the input tensors are created on.
        level: ``"word"`` to split the seed on whitespace and join output
            tokens with spaces; any other non-``None`` value treats the seed
            character by character. ``None`` (default) infers the level from
            the vocabulary: if every token in *itos* is a single character
            it is treated as char-level, otherwise as word-level.

    Returns:
        *seed_text* followed by the sampled tokens.

    Raises:
        ValueError: If *temperature* is not positive or the seed contains no
            tokens.
    """
    if temperature <= 0:
        raise ValueError(f"temperature must be positive, got {temperature}")

    if level is None:
        # A vocabulary containing any multi-character token cannot be
        # character-level.
        level = "char" if all(len(tok) == 1 for tok in itos) else "word"
    word_level = level == "word"

    seed_tokens = seed_text.split() if word_level else list(seed_text)
    if not seed_tokens:
        raise ValueError("seed_text must contain at least one token")

    model.eval()
    input_seq = torch.tensor([[stoi.get(tok, 0) for tok in seed_tokens]],
                             dtype=torch.long, device=device)
    hidden = None
    generated = []

    with torch.no_grad():
        for _ in range(length):
            out, hidden = model(input_seq, hidden)
            # Only the last position predicts the next token.
            logits = out[:, -1, :] / temperature
            probs = torch.softmax(logits, dim=-1)
            next_idx = torch.multinomial(probs, 1).item()
            generated.append(itos[next_idx])
            # The hidden state carries the context, so only the new token
            # needs to be fed on the next step.
            input_seq = torch.tensor([[next_idx]], dtype=torch.long, device=device)

    if word_level:
        return " ".join([seed_text] + generated)
    return seed_text + "".join(generated)


### Main runner

In [None]:
if __name__ == "__main__":
    # Configs
    seq_len = 128
    tokenize_level = "word"
    batch_size = 32
    epochs = 3
    lr = 1e-3
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Using device:", device)

    # Load data
    with open("tinyshakespeare.txt", "r", encoding="utf-8") as f:
        raw_text = f.read()

    tokens = tokenize(raw_text, tokenize_level)
    vocab = build_vocab(tokens)
    # Full-corpus dataset: provides the stoi/itos tables used for generation.
    dataset = TextDataset(tokens, vocab, seq_len=seq_len)

    # Split 80/10/10 on the token stream itself, in contiguous chunks.
    # The windows overlap with stride 1, so randomly splitting *windows*
    # would put near-identical sequences in train, val and test and make the
    # validation/test losses meaningless.
    n_tokens = len(tokens)
    train_end = int(0.8 * n_tokens)
    val_end = train_end + int(0.1 * n_tokens)
    # All three datasets share `vocab`, so token ids agree across splits.
    train_ds = TextDataset(tokens[:train_end], vocab, seq_len=seq_len)
    val_ds = TextDataset(tokens[train_end:val_end], vocab, seq_len=seq_len)
    test_ds = TextDataset(tokens[val_end:], vocab, seq_len=seq_len)
    if min(len(train_ds), len(val_ds), len(test_ds)) <= 0:
        raise ValueError(
            f"corpus of {n_tokens} tokens is too short for an 80/10/10 split "
            f"with seq_len={seq_len}"
        )

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)
    test_loader = DataLoader(test_ds, batch_size=batch_size)

    # Model, loss, optimizer
    model = RNNModel(len(vocab), embed_size=128, hidden_size=256,
                     num_layers=2, dropout=0.2)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Train
    train_losses, val_losses, train_time = train_model(
        train_loader, val_loader, model, criterion, optimizer,
        epochs=epochs, clip=1.0, device=device
    )

    # Test
    test_loss = evaluate(model, test_loader, criterion, device)
    print(f"\nTest Loss: {test_loss:.3f}, Test PPL: {math.exp(test_loss):.3f}")

    # Generate samples at several temperatures
    stoi = dataset.stoi
    itos = dataset.itos
    for T in [0.7, 1.0, 1.3]:
        sample = generate_text(model, seed_text="ROMEO:", stoi=stoi, itos=itos,
                               length=300, temperature=T, device=device)
        print(f"\nSample (T={T}):\n{sample}\n")

Using device: cpu


Epoch 1:  79%|███████▊  | 3977/5064 [54:09<15:30,  1.17it/s] 

### Evaluation

In [None]:
import json
import os

def save_evaluation(train_losses, val_losses, train_time, test_loss, model, dataset, out_dir="results"):
    """Persist run metrics, model weights and vocabulary under *out_dir*.

    Writes three files, creating *out_dir* if needed:
      - ``evaluation.json``: loss histories, training time, test loss and
        test perplexity (``exp(test_loss)``).
      - ``rnn_model.pt``: the model's ``state_dict``.
      - ``vocab.pt``: the dataset's ``stoi`` and ``itos`` tables.

    Args:
        train_losses: Per-epoch training losses.
        val_losses: Per-epoch validation losses.
        train_time: Training time in seconds.
        test_loss: Loss on the test split.
        model: Module whose ``state_dict()`` is saved.
        dataset: Object exposing ``stoi`` and ``itos`` attributes.
        out_dir: Destination directory.
    """
    os.makedirs(out_dir, exist_ok=True)
    metrics_path = os.path.join(out_dir, "evaluation.json")
    weights_path = os.path.join(out_dir, "rnn_model.pt")
    vocab_path = os.path.join(out_dir, "vocab.pt")

    # Metrics go to JSON so they can be read without PyTorch.
    metrics = dict(
        train_losses=train_losses,
        val_losses=val_losses,
        train_time_sec=train_time,
        test_loss=test_loss,
        test_perplexity=math.exp(test_loss),
    )
    with open(metrics_path, "w") as fh:
        json.dump(metrics, fh, indent=4)

    # Weights and vocabulary are stored separately so the model can be
    # rebuilt and used for generation later.
    torch.save(model.state_dict(), weights_path)
    vocab_tables = {"stoi": dataset.stoi, "itos": dataset.itos}
    torch.save(vocab_tables, vocab_path)

    print(f"Evaluation and model saved to: {out_dir}/")
