In [1]:
# =====================================================
# Phase 4 - BiLSTM Language Model
# =====================================================

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import pickle
import numpy as np

In [2]:
# ----------------------------
# 1. Load dataset
# ----------------------------
# Read the cleaned dataset (path is relative to the notebook's working directory).
df = pd.read_csv("../data/clean_dataset.csv")

# Use smaller subset for training speed.
# The sample size is capped at the number of available rows: DataFrame.sample
# (without replacement) raises ValueError when asked for more rows than exist.
SAMPLE_SIZE = 5000
sample_df = df.sample(n=min(SAMPLE_SIZE, len(df)), random_state=42)

In [3]:
# ----------------------------
# 2. Tokenizer
# (character-level stand-in for the BPE tokenizer;
#  load or re-train a real BPE tokenizer here if needed)
# ----------------------------
class BPETokenizer:
    """Placeholder tokenizer exposing an ``encode`` / ``decode`` pair.

    Despite the name, no BPE merges are applied: text is split into its
    individual characters and joined back verbatim.
    """

    def __init__(self):
        """Nothing to configure; the tokenizer keeps no state."""

    def encode(self, text):
        """Return the characters of ``text`` as a list of tokens."""
        return [ch for ch in text]   # simple char-level for demo

    def decode(self, tokens):
        """Concatenate ``tokens`` back into a single string."""
        separator = ""
        return separator.join(tokens)

# Tokenise every code sample (non-string cells are stringified first).
bpe = BPETokenizer()
sequences = [bpe.encode(c) for c in sample_df['code'].astype(str)]

# Build vocab
all_tokens = [t for seq in sequences for t in seq]
# Sort the unique tokens so the token -> id mapping is identical on every run.
# Iterating a plain set of strings follows hash order, which Python randomises
# per process, so ids (and anything trained or saved against them) would
# otherwise change after each kernel restart.
vocab = sorted(set(all_tokens))
word_to_idx = {w: i for i, w in enumerate(vocab)}
idx_to_word = {i: w for w, i in word_to_idx.items()}

print("Vocab size:", len(vocab))

Vocab size: 677


In [4]:
# ----------------------------
# 3. Dataset & DataLoader
# ----------------------------
class TextDataset(Dataset):
    """Sliding-window next-token dataset built from tokenised sequences.

    Every sequence is mapped to ids through ``word_to_idx`` (tokens missing
    from the mapping are dropped). Each run of ``seq_len`` consecutive ids
    becomes an input, paired with the same run shifted one position to the
    right as its target. Sequences with at most ``seq_len`` ids contribute
    no windows.
    """

    def __init__(self, sequences, word_to_idx, seq_len=20):
        # All (input_ids, target_ids) pairs are materialised up front as lists.
        self.data = []
        for tokens in sequences:
            ids = [word_to_idx[tok] for tok in tokens if tok in word_to_idx]
            window_count = len(ids) - seq_len
            self.data.extend(
                (ids[start:start + seq_len], ids[start + 1:start + seq_len + 1])
                for start in range(window_count)
            )

    def __len__(self):
        """Number of (input, target) windows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``idx``-th window as a pair of tensors."""
        inputs, targets = self.data[idx]
        return torch.tensor(inputs), torch.tensor(targets)

# Build every training window (default window length), then serve them in
# shuffled mini-batches of 64.
dataset = TextDataset(sequences=sequences, word_to_idx=word_to_idx)
loader = DataLoader(dataset=dataset, batch_size=64, shuffle=True)


In [5]:
# ----------------------------
# 4. Define BiLSTM Model
# ----------------------------
class BiLSTMLM(nn.Module):
    """Token embedding -> stacked bidirectional LSTM -> linear layer over the vocab.

    ``forward`` takes a batch-first tensor of token ids and returns one vector
    of ``vocab_size`` logits per input position.

    NOTE(review): because the LSTM is bidirectional, the backward direction at
    position t has already read the tokens after t. With targets that are the
    inputs shifted by one position (as the dataset in this notebook builds
    them), the model can see the token it is asked to predict — confirm this
    is intended.
    """

    def __init__(self, vocab_size, embed_dim=100, hidden_dim=128, num_layers=2):
        super().__init__()
        directions = 2  # forward + backward outputs are concatenated
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim * directions, out_features=vocab_size)

    def forward(self, x):
        embedded = self.embed(x)
        # Only the per-position outputs are used; the final (h, c) state is discarded.
        hidden_states, _ = self.lstm(embedded)
        return self.fc(hidden_states)

# Prefer the GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# One output logit per vocabulary entry; all other hyper-parameters use defaults.
model = BiLSTMLM(vocab_size=len(vocab)).to(device)


In [6]:
# ----------------------------
# 5. Training Loop
# ----------------------------
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

EPOCHS = 3

# Fail early with a clear message: an empty loader would otherwise surface as
# a ZeroDivisionError in the perplexity computation at the end of epoch 1.
if len(loader) == 0:
    raise ValueError("loader yields no batches; nothing to train on")

for epoch in range(EPOCHS):
    # The generation cell switches the model to eval mode; re-running this
    # cell afterwards must put it back into training mode.
    model.train()
    total_loss = 0.0
    for X, Y in loader:
        X, Y = X.to(device), Y.to(device)
        optimizer.zero_grad()
        output = model(X)
        # Collapse the batch and sequence axes so every position is one
        # classification example. The class count is read from the logits
        # themselves rather than from the global vocab.
        loss = criterion(output.reshape(-1, output.size(-1)), Y.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # "Loss" is the sum over batches; perplexity uses the per-batch mean.
    print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}, Perplexity: {np.exp(total_loss/len(loader)):.2f}")


KeyboardInterrupt: 

In [None]:
# ----------------------------
# 6. Text Generation
# ----------------------------
def generate_text(model, start_token="d", length=50, context_len=20):
    """Greedily generate ``length`` tokens following ``start_token``.

    Args:
        model: module mapping a ``(1, seq)`` tensor of token ids to per-position
            logits; the logits of the last position pick the next token.
        start_token: token that seeds the sequence. If it is not in
            ``word_to_idx``, id 0 is used instead.
        length: number of tokens to generate after the start token.
        context_len: how many of the most recent tokens are fed to the model at
            each step (default 20, matching the default training window).

    Returns:
        The start token followed by the generated tokens, joined into a string.

    Raises:
        ValueError: if ``context_len`` is smaller than 1.
    """
    if context_len < 1:
        # tokens[-0:] would silently feed the whole history instead of nothing.
        raise ValueError("context_len must be at least 1")

    was_training = model.training
    model.eval()
    tokens = [word_to_idx.get(start_token, 0)]
    try:
        with torch.no_grad():
            for _ in range(length):
                inp = torch.tensor(tokens[-context_len:], dtype=torch.long).unsqueeze(0).to(device)
                out = model(inp)
                # Greedy decoding: take the highest-scoring id at the last position.
                next_token = torch.argmax(out[0, -1]).item()
                tokens.append(next_token)
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)
    return "".join([idx_to_word[i] for i in tokens])

# Show one greedy sample seeded with the character "d".
print("\nGenerated text sample:")
sample_text = generate_text(model, start_token="d")
print(sample_text)