# import text8 

In [1]:
# Read the whole text8 corpus into memory as one string.
# The encoding is passed explicitly so the result does not depend on the
# platform's default locale encoding.
with open("text8", "r", encoding="utf-8") as f:
    text = f.read()

FileNotFoundError: [Errno 2] No such file or directory: 'text8'

In [None]:
 # tokens are separated by whitespace
# split() without arguments splits on any run of whitespace and yields no empty strings.
tokens = text.split()
# Show the first 20 tokens as a quick sanity check of the load.
print(tokens[:20])

In [None]:
from collections import Counter

import torch
from torch import nn

## Import query and passages

## Take most common 

In [None]:
# Keep only the 30,000 most frequently occurring tokens as the vocabulary.
token_counts = Counter(tokens)
most_common = token_counts.most_common(30000)
vocab_list = [entry[0] for entry in most_common]

# Simple lookup tables in both directions; a token's index is its position in vocab_list.
# token -> index: for turning raw text into training data.
token_to_index = dict(zip(vocab_list, range(len(vocab_list))))
# index -> token: the reverse index, for interpreting model outputs/predictions.
index_to_token = dict(enumerate(vocab_list))

# Define CBOW 

In [None]:

class CBOW(nn.Module):
    """Continuous bag-of-words model.

    The embeddings of the input tokens are averaged and a single linear
    layer maps that average to one score per vocabulary entry.
    """

    def __init__(self, vocab_size, embedding_dim):
        """Create the embedding table and the output projection.

        Args:
            vocab_size: Number of rows in the embedding table and number of
                output scores.
            embedding_dim: Width of each embedding vector.
        """
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear = nn.Linear(embedding_dim, vocab_size)

    def forward(self, inputs):
        """Return one score per vocabulary entry for each input row.

        Args:
            inputs: Integer index tensor. The embedded result is averaged
                over dim 1, so that dimension must hold the context tokens.
        """
        averaged_context = self.embeddings(inputs).mean(dim=1)
        return self.linear(averaged_context)

In [None]:
from torch.utils.data import DataLoader

## generate pairs 

In [None]:
def generate_cbow_pairs(tokens, window_size, vocab=None):
    """Build (context, target) index pairs for CBOW training.

    For every position that has ``window_size`` tokens on both sides, the
    context is the ``window_size`` tokens before it plus the ``window_size``
    tokens after it, and the target is the token at that position. A pair is
    kept only when the target and every context token are in the vocabulary.

    Args:
        tokens: List of string tokens in corpus order.
        window_size: Number of context tokens taken on each side of the target.
        vocab: Optional token -> index mapping. When omitted, the module-level
            ``token_to_index`` is used, so existing two-argument calls keep
            working unchanged.

    Returns:
        A list of ``(context_indices, target_index)`` tuples, where
        ``context_indices`` is a list of ``2 * window_size`` ints.
    """
    if vocab is None:
        vocab = token_to_index

    # Map every token to its index exactly once; None marks an
    # out-of-vocabulary token. This replaces the per-window membership tests
    # and lookups, and removes the need to rebuild a set of the vocabulary.
    token_ids = [vocab.get(token) for token in tokens]

    pairs = []
    for i in range(window_size, len(token_ids) - window_size):
        target = token_ids[i]
        if target is None:
            continue
        context = token_ids[i - window_size:i] + token_ids[i + 1:i + window_size + 1]
        # Skip windows that contain any out-of-vocabulary token.
        if None in context:
            continue
        pairs.append((context, target))
    return pairs

## define class 

In [None]:
class CBOWDataset(torch.utils.data.Dataset):
    """Dataset over pre-computed CBOW (context, target) index pairs."""

    def __init__(self, pairs):
        """Keep a reference to ``pairs``; each entry must unpack into (context, target)."""
        self.pairs = pairs

    def __len__(self):
        """Return the number of stored pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return the pair at ``idx`` as a (context tensor, target tensor) tuple."""
        context_ids, target_id = self.pairs[idx]
        context_tensor = torch.tensor(context_ids)
        target_tensor = torch.tensor(target_id)
        return context_tensor, target_tensor

In [None]:
import random
from sklearn.model_selection import train_test_split

embedding_dim = 100  # 50-100 for 10k-30k tokens
window_size = 2  # either side of centre token

# Generate CBOW pairs from the full dataset
cbow_pairs = generate_cbow_pairs(tokens, window_size)

# Subsample while splitting: 20% of the pairs are used for training and 2%
# for testing, so the remaining 78% are dropped. The fixed random_state makes
# the split reproducible between runs.
train_pairs, test_pairs = train_test_split(
    cbow_pairs,
    train_size=0.20,
    test_size=0.02,
    random_state=42
)


In [None]:
# Training dataset: wrap the training pairs and serve them in shuffled mini-batches of 128.
dataset = CBOWDataset(train_pairs)
dataloader = DataLoader(dataset, batch_size=128, shuffle=True)

In [None]:
# The vocabulary size sets both the embedding table height and the number of output scores.
vocab_size = len(token_to_index)  # 30k

# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CBOW(vocab_size, embedding_dim).to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10
for epoch in range(epochs):
    total_loss = 0
    for context_batch, target_batch in dataloader:
        # Each batch has to live on the same device as the model.
        context_batch, target_batch = context_batch.to(device), target_batch.to(device)

        # Clear the gradients left over from the previous step before computing new ones.
        optimizer.zero_grad()
        output = model(context_batch)
        loss = loss_fn(output, target_batch)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean of the per-batch losses for this epoch.
    average_epoch_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch+1}, Avg Loss: {average_epoch_loss:.4f}")

## testing

In [None]:

# Evaluate on the held-out pairs; batches are read in order (no shuffling).
test_dataset = CBOWDataset(test_pairs)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)

model.eval()
test_loss = 0.0
correct = 0
total = 0

# Gradients are not needed while scoring the test set.
with torch.no_grad():
    for context_batch, target_batch in test_dataloader:
        context_batch, target_batch = context_batch.to(device), target_batch.to(device)

        output = model(context_batch)
        loss = loss_fn(output, target_batch)
        test_loss += loss.item()

        # The prediction for each row is the index of its highest score.
        predicted = torch.max(output, dim=1).indices
        correct += (predicted == target_batch).sum().item()
        total += target_batch.size(0)

# Loss is averaged over batches; accuracy is a percentage over all test samples.
average_test_loss = test_loss / len(test_dataloader)
accuracy = correct / total * 100

print(f"Test Loss: {average_test_loss:.4f}, Accuracy: {accuracy:.2f}%")

# similar words test function

In [None]:


import torch.nn.functional as F

def get_similar_words(query_word, word_to_ix, ix_to_word, embeddings, top_k=5):
    """Return the words whose embeddings are most cosine-similar to a query word.

    Args:
        query_word: Word to look up.
        word_to_ix: Mapping from word to its row index in ``embeddings``.
        ix_to_word: Mapping from row index back to the word.
        embeddings: 2-D tensor with one embedding per row.
        top_k: Maximum number of neighbours to return.

    Returns:
        A list of ``(word, similarity)`` tuples ordered from most to least
        similar. The query word itself is never included. The list is empty
        (and a message is printed) when ``query_word`` is not in ``word_to_ix``.
    """
    if query_word not in word_to_ix:
        print(f"'{query_word}' not in vocabulary.")
        return []

    query_idx = word_to_ix[query_word]
    query_embedding = embeddings[query_idx]

    # Compute cosine similarity with all embeddings
    similarities = F.cosine_similarity(query_embedding.unsqueeze(0), embeddings)

    # Exclude the query by index rather than by assuming it sorts first: when
    # another row ties with the query, the sort order between them is not
    # defined, and dropping "rank 0" could drop the wrong word.
    candidates = similarities.detach().clone()
    candidates[query_idx] = float("-inf")

    # Never ask for more neighbours than there are other words.
    k = min(top_k, candidates.numel() - 1)
    if k <= 0:
        return []

    top_scores, top_indices = candidates.topk(k)
    return [
        (ix_to_word[idx], score)
        for idx, score in zip(top_indices.tolist(), top_scores.tolist())
    ]

# normalize embeddings

In [None]:

# Scale every embedding row to unit length.
# detach() returns the weights without autograd tracking (preferred over .data).
embedding_weights = model.embeddings.weight.detach()  # shape: [vocab_size, embedding_dim]
norms = embedding_weights.norm(dim=1, keepdim=True)
# Clamp the divisor so an all-zero row stays zero instead of becoming NaN (0 / 0).
normalized_embeddings = embedding_weights / norms.clamp_min(1e-12)

## function to test with a word 

In [None]:
# Look up the nearest neighbours of one query word in the normalized embeddings.
word = "three"
similar = get_similar_words(word, token_to_index, index_to_token, normalized_embeddings, top_k=5)
print(f"Words similar to {word}:")
# A separate loop name keeps the query in `word` from being overwritten by the results.
for similar_word, score in similar:
    print(f"{similar_word} ({score:.4f})")