In [1]:
import nltk
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from nltk.tokenize import word_tokenize
from collections import Counter

nltk.download('gutenberg')
nltk.download('punkt')
nltk.download('punkt_tab')

import re
import random
import math
import time
import os


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.4.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/opt/anaconda3/envs/pytorch_cpu/lib/python3.12/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/opt/anaconda3/envs/pytorch_cpu/lib/python3.12/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
  File "/opt/anaconda3/envs/pytorch_cpu/lib/python3.12/site-packages/ipykernel/kernelapp.py", line 758, in start
    self.io_loop.

`nltk.corpus.gutenberg` is a corpus reader in the Natural Language Toolkit (NLTK) for Python. It gives access to a small curated selection of 18 classic, public-domain texts from the Project Gutenberg electronic text archive, totaling more than 2 million words. The texts are not shipped with the library itself; they are fetched once with `nltk.download('gutenberg')`.

# Load data

In [None]:
from nltk.corpus import gutenberg

# List the identifiers of the texts available through the Gutenberg corpus reader.
gutenberg_file_ids = gutenberg.fileids()
print(gutenberg_file_ids)

# Preprocessing data

In [None]:
# For limitation of resources, we are using only one book from gutenberg archive
corpus_raw = gutenberg.raw('shakespeare-hamlet.txt')

# Clean the corpus: remove metadata, stage directions and speaker names.
# Each entry is (pattern, flags); patterns are applied in order and every match
# is deleted. The cleanup always starts from `corpus_raw`, so re-running this
# cell yields the same `corpus`.
CLEANUP_PATTERNS = [
    (r'\[.*?\]', 0),                  # bracketed metadata
    (r'Actus.*?\.', re.DOTALL),       # act headers
    # Scene headers; also accepts the "Scoena"/"Scaena" spellings, which a
    # literal "Scena" pattern would leave behind.
    (r'Sc[oa]?ena.*?\.', re.DOTALL),
    (r'Enter.*?\.', re.DOTALL),       # entrances
    (r'Exit.*?\.', re.DOTALL),        # exits
    (r'Exeunt.*?\.', re.DOTALL),
    # Speaker labels such as 'Ham.': a single capitalised word followed by a
    # period at the START of a line. The previous pattern r'\w+\.' matched any
    # word followed by a period, which deleted the last word of every sentence.
    # NOTE(review): assumes speaker labels always open a line - verify against
    # the raw text.
    (r'^[ \t]*[A-Z][A-Za-z]*\.', re.MULTILINE),
]

corpus = corpus_raw
for pattern, flags in CLEANUP_PATTERNS:
    corpus = re.sub(pattern, '', corpus, flags=flags)

# Normalize whitespace last: the speaker-label pattern above needs the original
# line breaks, and this step collapses them (newlines included) to single spaces.
corpus = re.sub(r'\s+', ' ', corpus).strip()

In [None]:
# Peek at the first 100 characters of the cleaned text.
corpus[:100]

In [None]:
# Corpus size. len(corpus) counts characters, not words, so the word count is
# taken from a whitespace split.
print(f"Total characters: {len(corpus)} | Total words: {len(corpus.split())}")

In [None]:
# Tokenize corpus and add sentence boundaries.
# Replacing "\n" with " <eos> " cannot work here: the cleaned corpus has no
# newlines left (whitespace was already collapsed), and word_tokenize would
# split the literal "<eos>" into "<", "eos", ">". Instead, split into sentences
# first and append the marker as a ready-made token after each sentence.
# `corpus` itself is left unmodified so this cell can be re-run safely.
EOS_TOKEN = "<eos>"
MIN_FREQ = 2  # words seen fewer times than this are mapped to <unk>

tokens = []
for sentence in nltk.sent_tokenize(corpus):
    tokens.extend(word_tokenize(sentence.lower()))
    tokens.append(EOS_TOKEN)


# Build vocabulary
counter = Counter(tokens)

# special tokens
specials = ["<unk>", "<pad>", "<sos>", "<eos>"]

# keep words appearing at least MIN_FREQ times; specials are listed first so
# they get the lowest, fixed indices and are never duplicated
vocab_list = specials + [
    w for w, c in counter.items() if c >= MIN_FREQ and w not in specials
]
# Maps between words and indices
word2idx = {w: i for i, w in enumerate(vocab_list)}
idx2word = {i: w for w, i in word2idx.items()}

pad_idx = word2idx["<pad>"]
unk_idx = word2idx["<unk>"]


# Any word not in vocab becomes <unk>
numerical_tokens = [word2idx.get(tok, unk_idx) for tok in tokens]

print(f"Vocab size: {len(word2idx)} | Total tokens: {len(numerical_tokens)}")


# Create sliding windows
def create_sequences(tokens, seq_len):
    """Build next-token training pairs from a token-id stream.

    Window ``i`` takes ``tokens[i : i + seq_len]`` as input and the same span
    shifted one step right, ``tokens[i + 1 : i + seq_len + 1]``, as target.
    Windows advance one token at a time.

    Args:
        tokens: sequence of integer token ids.
        seq_len: number of tokens per window (must not be negative).

    Returns:
        ``(inputs, targets)``, two tensors of shape
        ``(len(tokens) - seq_len, seq_len)``. When there are not enough tokens
        for a single window, both are empty long tensors of shape
        ``(0, seq_len)`` (previously: 1-D float32 tensors of shape ``(0,)``).

    Raises:
        ValueError: if ``seq_len`` is negative.
    """
    if seq_len < 0:
        raise ValueError(f"seq_len must be non-negative, got {seq_len}")

    num_windows = len(tokens) - seq_len
    if num_windows <= 0:
        # Not enough tokens for one window: keep shape and dtype well-defined.
        return (
            torch.empty((0, seq_len), dtype=torch.long),
            torch.empty((0, seq_len), dtype=torch.long),
        )

    stream = torch.as_tensor(tokens)
    # Each row holds seq_len + 1 consecutive tokens: the input window plus the
    # one extra token needed for the shifted target.
    windows = stream.unfold(0, seq_len + 1, 1)
    # unfold returns overlapping views of `stream`; copy so the two results own
    # their memory like the original independently built tensors did.
    inputs = windows[:, :-1].contiguous()
    targets = windows[:, 1:].contiguous()
    return inputs, targets


SEQ_LEN = 50
BATCH_SIZE = 32
inputs, targets = create_sequences(numerical_tokens, SEQ_LEN)

print("Total sequences:", len(inputs))
# Split sequences into train / val / test (85% / 10% / 5%, in corpus order)
n = len(inputs)
train_end = int(0.85 * n)
val_end = int(0.95 * n)

# Windows advance one token at a time, so window j spans tokens j .. j + SEQ_LEN
# (input plus shifted target). The last SEQ_LEN windows before a split boundary
# therefore share tokens with the first windows after it. Dropping them keeps
# train, val and test text disjoint, so evaluation never sees training tokens.
train_stop = max(train_end - SEQ_LEN, 0)
val_stop = max(val_end - SEQ_LEN, train_end)

# slice
train_inputs = inputs[:train_stop]
train_targets = targets[:train_stop]

val_inputs = inputs[train_end:val_stop]
val_targets = targets[train_end:val_stop]

test_inputs = inputs[val_end:]
test_targets = targets[val_end:]

print("Train:", train_inputs.shape)
print("Val:  ", val_inputs.shape)
print("Test: ", test_inputs.shape)

# DataLoaders: only the training set is shuffled; val/test keep corpus order.
train_loader = DataLoader(
    TensorDataset(train_inputs, train_targets), batch_size=BATCH_SIZE, shuffle=True
)

val_loader = DataLoader(
    TensorDataset(val_inputs, val_targets), batch_size=BATCH_SIZE, shuffle=False
)

test_loader = DataLoader(
    TensorDataset(test_inputs, test_targets), batch_size=BATCH_SIZE, shuffle=False
)

# Model Definition

Long Short-Term Memory (LSTM) networks are gated recurrent neural networks that learn long-range dependencies by keeping a dedicated *cell state* (c_t) and controlling information flow with gates. This architecture helps gradients survive across many time steps.


---

## Core equations (standard LSTM)

Notation:

* $x_t\in\mathbb{R}^d$ — input at time $t$
* $h_t\in\mathbb{R}^h$ — hidden state / output at time $t$
* $c_t\in\mathbb{R}^h$ — cell state at time $t$
* $W_\bullet\in\mathbb{R}^{h\times d},\; U_\bullet\in\mathbb{R}^{h\times h},\; b_\bullet\in\mathbb{R}^h$ — parameters

At each step:
$$
\begin{aligned}
i_t &= \sigma(W_i x_t + U_i h_{t-1} + b_i) \\
f_t &= \sigma(W_f x_t + U_f h_{t-1} + b_f) \\
\tilde{c}_t &= \tanh(W_c x_t + U_c h_{t-1} + b_c) \\
c_t &= f_t \odot c_{t-1} + i_t \odot \tilde{c}_t \\
o_t &= \sigma(W_o x_t + U_o h_{t-1} + b_o) \\
h_t &= o_t \odot \tanh(c_t)
\end{aligned}
$$

* $\sigma(\cdot)$ is the sigmoid, with outputs in $(0, 1)$; $\tanh(\cdot)$ is used for the candidate and final squashing.
* $\odot$ denotes elementwise product.

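**Compact (concatenated) form:** put the four gate pre-activations into one vector:
$$
z_t = W x_t + U h_{t-1} + b \in\mathbb{R}^{4h},
$$
where $W\in\mathbb{R}^{4h\times d}$, $U\in\mathbb{R}^{4h\times h}$ and $b\in\mathbb{R}^{4h}$ stack the per-gate parameters. Then split $z_t$ into $[i_{\text{pre}}, f_{\text{pre}}, o_{\text{pre}}, g_{\text{pre}}]$ and apply the activations to recover $i_t, f_t, o_t, \tilde{c}_t$. This is what most frameworks do for efficiency.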

---

## Intuition (what each gate does)

* **Forget gate $f_t$**: how much of the past cell state $c_{t-1}$ to keep.
* **Input gate $i_t$**: how much new information $\tilde{c}_t$ to write into the cell.
* **Cell $c_t$**: long-term memory that accumulates retained past + new info.
* **Output gate $o_t$**: how much of the cell (after $\tanh$) to expose as hidden state $h_t$.

The gates let the model *learn* to remember or forget over long ranges, which mitigates vanishing gradients.


In [2]:
class LSTMModel(nn.Module):
    """Word-level language model: embedding -> stacked LSTM -> dropout -> linear.

    Args:
        vocab_size: number of distinct token ids (embedding rows and output logits).
        embed_size: dimensionality of the token embeddings.
        hidden_size: dimensionality of each LSTM layer's hidden state.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability, used between LSTM layers (when there is
            more than one) and on the LSTM outputs before the classifier.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.3):
        super().__init__()

        # Inter-layer dropout is only requested for stacked LSTMs; a single
        # layer gets 0.0.
        inter_layer_dropout = 0.0 if num_layers <= 1 else dropout

        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        # Regularizes the LSTM outputs regardless of depth.
        self.dropout = nn.Dropout(dropout)
        # Projects every hidden state to one logit per vocabulary entry.
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        """Map token ids of shape (B, T) to next-token logits of shape (B, T, V)."""
        embedded = self.embedding(x)            # (B, T, E)
        hidden_states, _ = self.lstm(embedded)  # (B, T, H); final (h, c) is discarded
        return self.fc(self.dropout(hidden_states))  # (B, T, V)

In [None]:
# Prefer the GPU when one is visible to PyTorch.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Architecture settings collected in one place; the checkpoint-loading cell
# rebuilds the model with the same values.
model_kwargs = dict(
    vocab_size=len(word2idx),
    embed_size=256,
    hidden_size=256,
    num_layers=2,
    dropout=0.3,
)
model = LSTMModel(**model_kwargs).to(device)

# Adam with a small L2 penalty.
optimizer = optim.Adam(model.parameters(), lr=3e-4, weight_decay=1e-5)

# Targets equal to <pad> do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=word2idx["<pad>"])

In [None]:
# Echo the model so the notebook displays its layer structure.
model

# Training

In [None]:
def calc_loss_and_ppl(model, loader, vocab_size, pad_idx, device):
    """Compute the average per-token loss and perplexity of ``model`` on ``loader``.

    The model is switched to eval mode for the duration of the call and put
    back into train mode afterwards if that is how it was found, even when an
    exception interrupts the evaluation.

    Args:
        model: module mapping a (B, T) batch of token ids to (B, T, V) logits.
        loader: iterable of ``(input_batch, target_batch)`` tensor pairs.
        vocab_size: size V of the last logits dimension.
        pad_idx: target id excluded from both the loss and the token count.
        device: device the batches are moved to before the forward pass.

    Returns:
        ``(avg_loss, ppl)``: the summed cross-entropy divided by the number of
        non-pad target tokens, and ``exp(avg_loss)``. Both are ``nan`` when
        there are no non-pad target tokens (e.g. an empty loader); ``ppl`` is
        ``inf`` when ``exp`` would overflow a float.
    """
    was_training = model.training
    model.eval()

    # Sum (not mean) so batches of different sizes are weighted per token.
    criterion = nn.CrossEntropyLoss(ignore_index=pad_idx, reduction="sum")

    total_loss = 0.0
    total_tokens = 0

    try:
        with torch.no_grad():  # disable grad
            for input_batch, target_batch in loader:
                input_batch = input_batch.to(device)
                target_batch = target_batch.to(device)

                logits = model(input_batch)  # (B, T, V)

                # reshape (unlike view) also accepts non-contiguous tensors
                loss = criterion(
                    logits.reshape(-1, vocab_size),
                    target_batch.reshape(-1),
                )  # Token-level negative log-likelihood (sum)

                total_loss += loss.item()
                # Count non-pad tokens
                total_tokens += (target_batch != pad_idx).sum().item()
    finally:
        if was_training:
            model.train()

    if total_tokens == 0:
        # Nothing was scored: the average is undefined, so report nan instead
        # of raising ZeroDivisionError.
        return float("nan"), float("nan")

    avg_loss = total_loss / total_tokens
    try:
        ppl = math.exp(avg_loss)
    except OverflowError:
        ppl = float("inf")

    return avg_loss, ppl


In [None]:
def train_model(epochs, train_loader, val_loader, pad_idx):
    """Train the notebook-level ``model`` and record per-epoch metrics.

    Uses the module-level ``model``, ``optimizer``, ``criterion``, ``device``
    and ``word2idx``. After each epoch's optimisation pass, both loaders are
    re-scored with ``calc_loss_and_ppl``; a summary line is printed on every
    second epoch.

    Args:
        epochs: number of passes over ``train_loader``.
        train_loader: batches of ``(inputs, targets)`` used for optimisation.
        val_loader: batches of ``(inputs, targets)`` used for validation.
        pad_idx: padding id forwarded to ``calc_loss_and_ppl``.

    Returns:
        ``(train_losses, val_losses, train_ppls, val_ppls, total_time)``: four
        lists with one entry per epoch, plus the elapsed wall-clock seconds
        (measured with ``time.perf_counter`` around the whole loop, evaluation
        included).
    """
    # Per-epoch history
    train_losses, val_losses = [], []
    train_ppls, val_ppls = [], []

    started_at = time.perf_counter()

    for epoch in range(epochs):

        # ---- optimisation pass ----
        model.train()
        for batch_inputs, batch_targets in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()  # drop gradients left over from the last step

            batch_logits = model(batch_inputs)  # (B, T, V)

            # Flatten batch and time so every token is one classification example.
            flat_logits = batch_logits.view(-1, len(word2idx))
            flat_targets = batch_targets.view(-1)
            batch_loss = criterion(flat_logits, flat_targets)

            batch_loss.backward()
            # Cap the gradient norm at 1.0 to guard against exploding gradients.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        # ---- evaluation pass (no grad) ----
        model.eval()
        with torch.no_grad():
            train_loss, train_ppl = calc_loss_and_ppl(
                model, train_loader,
                vocab_size=len(word2idx), pad_idx=pad_idx, device=device,
            )
            val_loss, val_ppl = calc_loss_and_ppl(
                model, val_loader,
                vocab_size=len(word2idx), pad_idx=pad_idx, device=device,
            )

        train_losses.append(train_loss)
        train_ppls.append(train_ppl)
        val_losses.append(val_loss)
        val_ppls.append(val_ppl)

        # Report on every second epoch only.
        if (epoch + 1) % 2 == 0:
            print(
                f"Epoch {epoch+1:3d}/{epochs} | "
                f"Train Loss: {train_loss:.2f} | "
                f"Train PPL: {train_ppl:.2f} | "
                f"Val Loss: {val_loss:.2f} | "
                f"Val PPL: {val_ppl:.2f}"
            )

    elapsed = time.perf_counter() - started_at
    return train_losses, val_losses, train_ppls, val_ppls, elapsed


In [None]:
# Number of training epochs; the summary cell below also reads this name.
num_epoches = 10

# res = (train_losses, val_losses, train_ppls, val_ppls, total_time)
res = train_model(num_epoches, train_loader, val_loader, word2idx["<pad>"])

<img src="tr.png"  width="500"/>

In [None]:
# Unpack the training history returned by train_model.
train_losses, val_losses, train_ppls, val_ppls, total_seconds = res

# The table reports each metric AVERAGED over all epochs (not the final-epoch
# value), followed by the total wall-clock time in milliseconds.
mean_train_loss = sum(train_losses) / num_epoches
mean_train_ppl = sum(train_ppls) / num_epoches
mean_val_loss = sum(val_losses) / num_epoches
mean_val_ppl = sum(val_ppls) / num_epoches

header_row = f"{'Epoch':<15}{'Train loss':<15}{'Train PPL':<15}{'Val loss':<15}{'Val PPL':<15}{'Training time(ms)'}"
values_row = f"{num_epoches:<15}{mean_train_loss:<15.2f}{mean_train_ppl:<15.2f}{mean_val_loss:<15.2f}{mean_val_ppl:<15.2f}{total_seconds * 1000:.2f}"

print(header_row)
print(values_row)

<img src="fl.png"  width="500"/>

In [None]:
# test
# Score the trained model on the test split.
test_metrics = calc_loss_and_ppl(
    model,
    test_loader,
    vocab_size=len(word2idx),
    pad_idx=pad_idx,
    device=device,
)
test_loss, test_ppl = test_metrics

print(f"{'Test loss':<15}{'Test PPL'}")
print(f"{test_loss:<15.2f}{test_ppl:.2f}")

<img src="tl.png"  width="500"/>

# Save model

In [None]:
saved_path = os.path.join(os.getcwd(), "app", "saved_model")
# torch.save does not create missing directories; make sure the target exists.
os.makedirs(saved_path, exist_ok=True)

# Everything needed to resume training or run inference: weights, optimizer
# state and both vocabulary mappings.
checkpoint = {
    "model_state": model.state_dict(),
    "optimizer_state": optimizer.state_dict(),
    "word2idx": word2idx,
    "idx2word": idx2word,
}
# NOTE(review): this writes "checkpoint1.pt", while the loading cell reads
# "checkpoint.pt" - confirm which file is intended.
torch.save(checkpoint, os.path.join(saved_path, "checkpoint1.pt"))

# Load model

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
saved_path = os.path.join(os.getcwd(), "app", "saved_model")

# NOTE(review): the saving cell writes "checkpoint1.pt" but this cell reads
# "checkpoint.pt" - confirm which file is intended.
# weights_only=True restricts unpickling to tensors and plain Python containers,
# which is all this checkpoint holds, so loading the file cannot execute
# arbitrary pickled code.
ckpt = torch.load(
    os.path.join(saved_path, "checkpoint.pt"),
    map_location=device,
    weights_only=True,
)

# The vocabulary stored in the checkpoint defines the embedding/output size.
word2idx = ckpt["word2idx"]
idx2word = ckpt["idx2word"]

# Architecture hyperparameters must match the ones used when the checkpoint was
# trained, otherwise load_state_dict fails on mismatched shapes.
model = LSTMModel(
    vocab_size=len(word2idx),
    embed_size=256,
    hidden_size=256,
    num_layers=2,
    dropout=0.3
).to(device)

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=3e-4,
    weight_decay=1e-5
)

model.load_state_dict(ckpt["model_state"])
optimizer.load_state_dict(ckpt["optimizer_state"])

# Inference mode (disables dropout); as the last expression this also echoes
# the model structure.
model.eval()


LSTMModel(
  (embedding): Embedding(1745, 256)
  (lstm): LSTM(256, 256, num_layers=2, batch_first=True, dropout=0.3)
  (dropout): Dropout(p=0.3, inplace=False)
  (fc): Linear(in_features=256, out_features=1745, bias=True)
)

# Generate text

In [None]:
def generate_text(
    model, word2idx, idx2word, seed_text,
    length=50, temperature=0.8, sequence_length=50
):
    """Continue ``seed_text`` by sampling ``length`` tokens from ``model``.

    Args:
        model: module mapping a (1, T) tensor of token ids to (1, T, V) logits.
        word2idx: token -> id mapping; must contain ``"<unk>"``.
        idx2word: id -> token mapping.
        seed_text: prompt; it is lower-cased and tokenized with ``word_tokenize``,
            and unknown tokens are mapped to ``<unk>``.
        length: number of tokens to generate.
        temperature: divides the logits before the softmax; lower values make
            sampling more conservative. Values <= 0 select the most likely
            token at every step (greedy decoding).
        sequence_length: maximum number of most recent tokens fed to the model.

    Returns:
        The seed tokens followed by the generated tokens, joined by spaces.
    """
    model.eval()

    # Build inputs on the device the model's parameters live on instead of
    # relying on a global `device`.
    try:
        model_device = next(model.parameters()).device
    except StopIteration:
        model_device = torch.device("cpu")

    unk_idx = word2idx["<unk>"]
    tokens = word_tokenize(seed_text.lower())
    indices = [word2idx.get(token, unk_idx) for token in tokens]
    generated = indices.copy()

    with torch.no_grad():
        for _ in range(length):
            # Feed only the real context. Short prompts used to be left-padded
            # with <pad>, but the training windows contain no padding, so the
            # pad positions were untrained inputs that distorted the state.
            # The LSTM accepts any sequence length. An empty prompt falls back
            # to a single <unk> as context.
            current_seq = generated[-sequence_length:] or [unk_idx]

            input_tensor = torch.tensor(
                [current_seq], dtype=torch.long, device=model_device
            )

            # Logits for the token following the last context position.
            logits = model(input_tensor)[0, -1]

            if temperature <= 0:
                # Dividing by a non-positive temperature is undefined; treat it
                # as greedy decoding.
                next_token = int(torch.argmax(logits).item())
            else:
                probs = torch.softmax(logits / temperature, dim=0)
                next_token = torch.multinomial(probs, 1).item()

            generated.append(next_token)

    return " ".join(idx2word.get(idx, "<unk>") for idx in generated)


In [5]:
seed = "he was"

# Sample one 25-token continuation of the same seed at each temperature.
for temperature in (0.5, 0.6, 0.7, 0.8):
    print(temperature)
    print("--"*10)
    generated_sequence = generate_text(
        model, word2idx, idx2word, seed, temperature=temperature, length=25
    )
    print(f'Generated sequence from "{seed}":\n{generated_sequence}')
    print('\n')

0.5
--------------------
Generated sequence from "he was":
he was but for i haue seene <unk> , as 'twere may go the <unk> through the time of my defence but of all youth to earth


0.6
--------------------
Generated sequence from "he was":
he was a guildensterne ? now , i could the <unk> <unk> : for so capitall i <unk> my husband flye to : but that she speakes


0.7
--------------------
Generated sequence from "he was":
he was for your good polonius good friends , and may ( not <unk> i know my <unk> , let want <unk> note , and you do


0.8
--------------------
Generated sequence from "he was":
he was by a great lose , make <unk> the first : he comes to the <unk> , a <unk> vulgar , the pardon that was a


