In [3]:
# imports and data
import os
import math
import random
import pickle
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
from google.colab import files
import os

# Upload your files interactively (Colab file picker); the call's result is
# kept in `uploaded` but is not read again by the cells shown in this notebook.
uploaded = files.upload()
# After upload, the files will be in the current working directory,
# so the corpus splits and the BPE encoder are referenced by bare file name.
TRAIN_FILE = "Shakespeare_clean_train.txt"
VALIDATION_FILE = "Shakespeare_clean_valid.txt"
TEST_FILE = "Shakespeare_clean_test.txt"

Saving bpe_encoder.pkl to bpe_encoder.pkl


In [4]:
# load BPE Encoder
def load_bpe_encoder(path='bpe_encoder.pkl'):
    """Unpickle and return the BPE encoder object stored at `path`.

    Args:
        path: Location of the pickle file; defaults to 'bpe_encoder.pkl'.

    Returns:
        Whatever object was pickled into the file (the rest of the notebook
        iterates it as an ordered collection of merge pairs).

    Note:
        Unpickling can execute arbitrary code, so only load files you trust.
    """
    with open(path, mode='rb') as handle:
        encoder = pickle.load(handle)
    return encoder

# tokenize Text using BPE
def bpe_tokenize_word(word, merges):
    """Split one word into BPE tokens by replaying `merges` in order.

    The word starts as its individual characters followed by the end-of-word
    marker '</w>'. Each merge pair is then applied once, scanning left to
    right and fusing every non-overlapping adjacent occurrence of the pair.

    Args:
        word: The word to tokenize.
        merges: Iterable of merge pairs; an adjacent pair of symbols is fused
            only when the tuple of the two symbols compares equal to the pair.

    Returns:
        List of token strings; the last one carries the '</w>' marker.
    """
    symbols = [*word, '</w>']
    for merge_pair in merges:
        fused = ''.join(merge_pair)
        rebuilt = []
        final_index = len(symbols) - 1
        consumed = False  # True when the current symbol was already merged
        for pos, symbol in enumerate(symbols):
            if consumed:
                consumed = False
                continue
            if pos < final_index and (symbol, symbols[pos + 1]) == merge_pair:
                rebuilt.append(fused)
                consumed = True
            else:
                rebuilt.append(symbol)
        symbols = rebuilt
    return symbols

def tokenize_text(text, merges):
    """Tokenize whitespace-separated text into a flat list of BPE tokens.

    Every word is lower-cased and split with `bpe_tokenize_word`. Replaying
    the whole merge list is expensive, so the tokens of each distinct word are
    computed once and reused for later occurrences; the output is the same as
    tokenizing every occurrence independently.

    Args:
        text: Raw text; words are separated by any run of whitespace.
        merges: Merge pairs, passed through to `bpe_tokenize_word`.

    Returns:
        List of token strings for all words, in reading order.
    """
    tokens = []
    pieces_by_word = {}  # lower-cased word -> its BPE tokens
    for raw_word in text.split():
        word = raw_word.lower()
        pieces = pieces_by_word.get(word)
        if pieces is None:
            pieces = bpe_tokenize_word(word, merges)
            pieces_by_word[word] = pieces
        tokens.extend(pieces)
    return tokens

In [5]:
# Build vocab and dataset
def build_vocab(tokens):
    """Assign a dense integer id to every distinct token.

    Ids follow the sorted order of the distinct tokens, starting at 0. Only
    tokens present in `tokens` get an entry; no special symbols are added.

    Args:
        tokens: Iterable of token strings.

    Returns:
        (token_to_id, id_to_token): two dicts that are inverses of each other.
    """
    token_to_id = {}
    id_to_token = {}
    for index, token in enumerate(sorted(set(tokens))):
        token_to_id[token] = index
        id_to_token[index] = token
    return token_to_id, id_to_token

def create_trigrams(token_ids):
    """Turn a token-id sequence into (two-token context, next token) pairs.

    Args:
        token_ids: Sequence of token ids (must support slicing).

    Returns:
        (inputs, targets) as NumPy arrays: row i of `inputs` holds ids i and
        i+1, and `targets[i]` is id i+2. Sequences shorter than three ids
        yield two empty arrays.
    """
    contexts = []
    next_ids = []
    for first, second, third in zip(token_ids, token_ids[1:], token_ids[2:]):
        contexts.append([first, second])
        next_ids.append(third)
    return np.array(contexts), np.array(next_ids)


In [6]:
# trigram feedforward model
class NeuralTrigramModel:
    """Feed-forward trigram language model written in plain NumPy.

    The two context token ids are looked up in an embedding table, the two
    embeddings are concatenated, passed through one tanh hidden layer and
    projected to one logit per vocabulary entry. `backward` computes the
    cross-entropy gradients and applies an SGD step in place.
    """

    # Learnable arrays, in the order they are written by `save`.
    _PARAM_NAMES = ('embeddings', 'W1', 'b1', 'W2', 'b2')

    def __init__(self, vocab_size, embedding_dim=32, hidden_dim=128):
        """Create a model with small random weights and zero biases.

        Args:
            vocab_size: Number of distinct token ids (rows of the embedding
                table and width of the output layer).
            embedding_dim: Size of each token embedding.
            hidden_dim: Width of the tanh hidden layer.
        """
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim

        # Initialize weights (drawn from NumPy's global random state)
        self.embeddings = np.random.randn(vocab_size, embedding_dim) * 0.01
        self.W1 = np.random.randn(2 * embedding_dim, hidden_dim) * 0.01
        self.b1 = np.zeros((hidden_dim,))
        self.W2 = np.random.randn(hidden_dim, vocab_size) * 0.01
        self.b2 = np.zeros((vocab_size,))

    def _embed_and_hide(self, x):
        """Return (flattened context embeddings, hidden activations) for `x`."""
        emb = self.embeddings[x].reshape(x.shape[0], -1)  # (batch, 2*emb_dim)
        h = np.tanh(emb @ self.W1 + self.b1)              # (batch, hidden_dim)
        return emb, h

    def forward(self, x):
        """Compute next-token logits.

        Args:
            x: Integer array of shape (batch_size, 2) with the context ids.

        Returns:
            Array of shape (batch_size, vocab_size) with unnormalized scores.
        """
        _, h = self._embed_and_hide(x)
        return h @ self.W2 + self.b2

    def compute_loss(self, logits, targets):
        """Mean cross-entropy of `logits` against `targets`.

        The log-probabilities are computed directly from the shifted logits
        (log-softmax) rather than as log(softmax), so a target whose
        probability underflows to 0 contributes a large finite loss instead
        of infinity.

        Args:
            logits: Array of shape (batch_size, vocab_size).
            targets: Integer target ids, one per row of `logits`.

        Returns:
            (loss, probs): the mean negative log-likelihood and the softmax
            probabilities (same shape as `logits`).
        """
        shifted = logits - np.max(logits, axis=1, keepdims=True)
        exp_shifted = np.exp(shifted)
        normalizer = exp_shifted.sum(axis=1, keepdims=True)
        probs = exp_shifted / normalizer
        log_probs = shifted - np.log(normalizer)
        loss = -log_probs[np.arange(len(targets)), targets].mean()
        return loss, probs

    def backward(self, x, targets, probs, lr=0.1):
        """Back-propagate the cross-entropy loss and apply one SGD step.

        Args:
            x: Integer array of shape (batch_size, 2) with the context ids.
            targets: Integer target ids, one per row of `x`.
            probs: Softmax probabilities for this batch, as returned by
                `compute_loss`.
            lr: Learning rate of the SGD update.
        """
        batch_size = x.shape[0]
        one_hot = np.zeros_like(probs)
        one_hot[np.arange(batch_size), targets] = 1
        # Gradient of the mean cross-entropy with respect to the logits.
        dlogits = (probs - one_hot) / batch_size

        # Recompute the intermediate activations with the current parameters.
        emb, h = self._embed_and_hide(x)

        # Gradients
        dW2 = h.T @ dlogits
        db2 = dlogits.sum(axis=0)

        dh = dlogits @ self.W2.T
        dh_raw = dh * (1 - h**2)  # derivative of tanh

        dW1 = emb.T @ dh_raw
        db1 = dh_raw.sum(axis=0)

        demb = dh_raw @ self.W1.T
        demb = demb.reshape(batch_size, 2, self.embedding_dim)

        # Update embeddings; np.add.at accumulates contributions when the same
        # token id occurs several times in the batch.
        for i in range(2):
            np.add.at(self.embeddings, x[:, i], -lr * demb[:, i])

        # Update weights
        self.W1 -= lr * dW1
        self.b1 -= lr * db1
        self.W2 -= lr * dW2
        self.b2 -= lr * db2

    def save(self, path='neural_trigram_model.pkl'):
        """Pickle the learnable arrays to `path` as a dict keyed by name."""
        with open(path, 'wb') as f:
            pickle.dump({name: getattr(self, name) for name in self._PARAM_NAMES}, f)
        print(f"✅ Neural model saved to: {path}")

    def load(self, path='neural_trigram_model.pkl'):
        """Replace the learnable arrays with those pickled at `path`.

        Unpickling can execute arbitrary code, so only load files you trust.
        """
        with open(path, 'rb') as f:
            params = pickle.load(f)
        for name in self._PARAM_NAMES:
            setattr(self, name, params[name])
        print(f"✅ Neural model loaded from: {path}")


In [7]:
# model save and load
def save(self, path='neural_trigram_model.pkl'):
    """Pickle the model's learnable arrays to `path`.

    This is a module-level function: pass the model as the first argument,
    e.g. `save(model)`.

    Args:
        self: Object exposing `embeddings`, `W1`, `b1`, `W2` and `b2`.
        path: Destination file; it is overwritten if it already exists.
    """
    with open(path, 'wb') as out_file:
        # Built inside the `with` so attributes are read after the file opens.
        snapshot = {
            name: getattr(self, name)
            for name in ('embeddings', 'W1', 'b1', 'W2', 'b2')
        }
        pickle.dump(snapshot, out_file)
    print(f"✅ Neural model saved to: {path}")

def load(self, path='neural_trigram_model.pkl'):
    """Restore the model's learnable arrays from the pickle at `path`.

    This is a module-level function: pass the model as the first argument,
    e.g. `load(model)`. The file must contain a dict with the keys
    'embeddings', 'W1', 'b1', 'W2' and 'b2'.

    Args:
        self: Object whose parameter attributes are overwritten.
        path: Pickle file to read. Unpickling can execute arbitrary code, so
            only load files you trust.
    """
    with open(path, 'rb') as in_file:
        stored = pickle.load(in_file)
        for name in ('embeddings', 'W1', 'b1', 'W2', 'b2'):
            setattr(self, name, stored[name])
    print(f"✅ Neural model loaded from: {path}")


In [8]:
# perplexity score
def compute_perplexity(model, tokens, token_to_id, n=3, batch_size=512):
    """Perplexity of `model` on a token sequence.

    Every position with two preceding tokens is scored, and the result is
    exp of the negative mean log-probability of the observed next tokens.
    Contexts are evaluated in batches, and log-probabilities come from a
    log-softmax of the logits, so no epsilon is needed to avoid log(0).

    Args:
        model: Object with `forward(x)` that maps an integer array of shape
            (batch, 2) to logits of shape (batch, vocab_size).
        tokens: Sequence of token strings to evaluate.
        token_to_id: Vocabulary mapping. Tokens missing from it are scored as
            the '<unk>' entry when the vocabulary has one, otherwise as id 0.
        n: Unused; only the two-token-context (trigram) setup is implemented.
            Kept for call compatibility.
        batch_size: Number of contexts per forward pass.

    Returns:
        The perplexity, or float('inf') when `tokens` has fewer than three
        items and nothing can be scored.
    """
    unk_id = token_to_id.get('<unk>', 0)
    token_ids = np.array([token_to_id.get(t, unk_id) for t in tokens], dtype=np.int64)

    count = len(token_ids) - 2
    if count <= 0:
        return float('inf')

    # Row i is the context (id[i], id[i+1]); its target is id[i+2].
    contexts = np.stack([token_ids[:-2], token_ids[1:-1]], axis=1)
    targets = token_ids[2:]

    total_log_prob = 0.0
    for start in range(0, count, batch_size):
        batch_targets = targets[start:start + batch_size]
        logits = model.forward(contexts[start:start + batch_size])
        shifted = logits - np.max(logits, axis=1, keepdims=True)
        log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
        total_log_prob += log_probs[np.arange(len(batch_targets)), batch_targets].sum()

    return np.exp(-total_log_prob / count)


In [9]:
# train the model
def train_model(model, inputs, targets, epochs=5, batch_size=512, lr=0.1):
    """Train `model` with mini-batch SGD and print the mean loss per epoch.

    Each epoch draws a fresh random permutation of the examples from NumPy's
    global random state, then walks through it in consecutive batches.

    Args:
        model: Object providing `forward`, `compute_loss` and `backward`.
        inputs: Array of context id pairs, one row per example.
        targets: Array of target ids aligned with `inputs`.
        epochs: Number of passes over the data.
        batch_size: Maximum number of examples per update.
        lr: Learning rate handed to `model.backward`.
    """
    n_examples = len(inputs)
    for epoch_index in range(epochs):
        order = np.random.permutation(n_examples)
        epoch_inputs, epoch_targets = inputs[order], targets[order]

        running_loss = 0.0
        for start in range(0, n_examples, batch_size):
            stop = start + batch_size
            xb = epoch_inputs[start:stop]
            yb = epoch_targets[start:stop]

            batch_loss, batch_probs = model.compute_loss(model.forward(xb), yb)
            model.backward(xb, yb, batch_probs, lr)
            # Weight by batch length so a short final batch is averaged fairly.
            running_loss += batch_loss * len(xb)

        print(f"Epoch {epoch_index + 1}: Loss = {running_loss / n_examples:.4f}")


In [10]:
# text generation
def generate_text(model, token_to_id, id_to_token, start_tokens=['the'], length=20):
    """Sample `length` tokens from the model and return them as readable text.

    Args:
        model: Object with `forward(x)` that maps an integer array of shape
            (1, 2) to logits of shape (1, vocab_size).
        token_to_id: Vocabulary mapping from token string to id.
        id_to_token: Inverse mapping from id to token string.
        start_tokens: Up to two seed tokens (extra entries are ignored). A seed
            that is not a vocabulary entry but is one once '</w>' is appended
            (i.e. a plain whole word) is replaced by that entry. The argument
            is never modified.
        length: Number of tokens to sample after the seed.

    Returns:
        The seed and sampled tokens glued back into words: sub-word pieces are
        concatenated and every '</w>' end-of-word marker becomes a space.
    """
    # Same fallback id the '<unk>' padding resolves to.
    unk_id = token_to_id.get('<unk>', 0)

    def _resolve(token):
        # Whole words are stored with the end-of-word marker attached.
        if token not in token_to_id and token + '</w>' in token_to_id:
            return token + '</w>'
        return token

    result = [_resolve(t) for t in list(start_tokens)[:2]]

    # Left-pad with the unknown id so there are always two context ids.
    context_ids = [token_to_id.get(t, unk_id) for t in result]
    context_ids = [unk_id] * (2 - len(context_ids)) + context_ids

    for _ in range(length):
        x = np.array([context_ids[-2:]])
        logits = model.forward(x)[0]
        weights = np.exp(logits - np.max(logits))
        probs = weights / weights.sum()
        next_token_id = int(np.random.choice(len(probs), p=probs))
        result.append(id_to_token[next_token_id])
        context_ids.append(next_token_id)

    # BPE pieces belong to the same word until a '</w>' marker is reached.
    return ''.join(result).replace('</w>', ' ').strip()


In [12]:
# Load the training corpus and lower-case it
with open(TRAIN_FILE, 'r', encoding='utf-8') as f:
    train_text = f.read().lower()

# Load the BPE merges from the default path ('bpe_encoder.pkl')
merges = load_bpe_encoder()

# Tokenize the corpus, build the vocabulary from those same tokens, and map
# each token to its id (direct indexing is safe: every token is in the vocab).
tokens = tokenize_text(train_text, merges)
token_to_id, id_to_token = build_vocab(tokens)
token_ids = [token_to_id[t] for t in tokens]

# Prepare data: (two-token context, next token) training pairs
inputs, targets = create_trigrams(token_ids)

# Initialize the model with its default embedding/hidden sizes and train it
# for 100 epochs at learning rate 0.05 (batch size left at train_model's default)
model = NeuralTrigramModel(vocab_size=len(token_to_id))
train_model(model, inputs, targets, epochs=100, lr=0.05)

# Generate text: sample the default number of tokens from the seed 'the'
print("\nGenerated Text:")
print(generate_text(model, token_to_id, id_to_token, start_tokens=['the']))


Epoch 1: Loss = 8.6497
Epoch 2: Loss = 8.5778
Epoch 3: Loss = 8.4671
Epoch 4: Loss = 8.1451
Epoch 5: Loss = 7.7698
Epoch 6: Loss = 7.5780
Epoch 7: Loss = 7.4707
Epoch 8: Loss = 7.4028
Epoch 9: Loss = 7.3591
Epoch 10: Loss = 7.3301
Epoch 11: Loss = 7.3099
Epoch 12: Loss = 7.2953
Epoch 13: Loss = 7.2846
Epoch 14: Loss = 7.2763
Epoch 15: Loss = 7.2700
Epoch 16: Loss = 7.2651
Epoch 17: Loss = 7.2611
Epoch 18: Loss = 7.2579
Epoch 19: Loss = 7.2553
Epoch 20: Loss = 7.2531
Epoch 21: Loss = 7.2514
Epoch 22: Loss = 7.2498
Epoch 23: Loss = 7.2485
Epoch 24: Loss = 7.2474
Epoch 25: Loss = 7.2464
Epoch 26: Loss = 7.2456
Epoch 27: Loss = 7.2448
Epoch 28: Loss = 7.2442
Epoch 29: Loss = 7.2437
Epoch 30: Loss = 7.2431
Epoch 31: Loss = 7.2427
Epoch 32: Loss = 7.2422
Epoch 33: Loss = 7.2418
Epoch 34: Loss = 7.2415
Epoch 35: Loss = 7.2411
Epoch 36: Loss = 7.2408
Epoch 37: Loss = 7.2405
Epoch 38: Loss = 7.2402
Epoch 39: Loss = 7.2399
Epoch 40: Loss = 7.2396
Epoch 41: Loss = 7.2393
Epoch 42: Loss = 7.2390
E

In [None]:
# compute perplexity on validation
# Read the validation split named by VALIDATION_FILE (declared next to
# TRAIN_FILE) instead of a separate hard-coded file name.
with open(VALIDATION_FILE, 'r', encoding='utf-8') as f:
    valid_text = f.read().lower()

# Tokenize with the same merges that were used for the training corpus
valid_tokens = tokenize_text(valid_text, merges)

# Compute perplexity
neural_ppl = compute_perplexity(model, valid_tokens, token_to_id)
# Left disabled: `ngram_perplexities` is not defined by any earlier cell.
#ngram_perplexities['Neural Trigram'] = neural_ppl
print(f"🔍 Neural Trigram Model Perplexity: {neural_ppl:.2f}")


In [None]:
# n-gram vs. neural n-gram perplexity comparison
import matplotlib.pyplot as plt

# `ngram_perplexities` (label -> perplexity from the count-based n-gram models)
# is not defined anywhere in this notebook. Use it when an earlier session
# provided it, otherwise start empty so the cell still runs on a fresh kernel.
# Working on a copy keeps the cell idempotent when it is re-run.
try:
    perplexities = dict(ngram_perplexities)
except NameError:
    perplexities = {}
perplexities['Neural Trigram'] = neural_ppl

labels = list(perplexities.keys())
values = list(perplexities.values())

plt.figure(figsize=(8, 5))
bars = plt.bar([str(k) for k in labels], values, color='skyblue')
plt.ylabel('Perplexity')
plt.title('Perplexity Comparison: N-gram vs Neural Trigram')
plt.grid(axis='y')

# Add value labels just above each bar
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2.0, yval + 0.1, f'{yval:.2f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()
