In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import re
from collections import Counter
import random

In [2]:
class CustomTokenizer:
    def __init__(self, vocab):
        self.vocab = vocab
        self.pad_token = '[PAD]'
        self.unk_token = '[UNK]'
        self.cls_token = '[CLS]'
        self.mask_token = '[MASK]'

        # Ensure special tokens are in the vocabulary
        self.vocab[self.pad_token] = 0
        self.vocab[self.unk_token] = 1
        self.vocab[self.cls_token] = 2
        self.vocab[self.mask_token] = 3

        self.reverse_vocab = {v: k for k, v in self.vocab.items()}

    def encode(self, text):
        # Tokenize the text and map to token IDs
        tokens = text.split()
        token_ids = [self.vocab.get(token, self.vocab[self.unk_token]) for token in tokens]
        return torch.tensor(token_ids)

    def decode(self, token_ids):
        # Decode token ids back to text
        tokens = [self.reverse_vocab.get(id.item(), self.unk_token) for id in token_ids]
        return " ".join(tokens)

In [3]:
# BERT Model for Embedding Extraction
class BERTEmbeddingExtractor(nn.Module):
    def __init__(self, vocab_size, embedding_dim, max_len):
        super(BERTEmbeddingExtractor, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.position_embedding = nn.Embedding(max_len, embedding_dim)
        self.layer_norm = nn.LayerNorm(embedding_dim)
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=8),
            num_layers=6
        )
        self.cls_token = nn.Parameter(torch.randn(1, embedding_dim))  # CLS token for embedding extraction

    def forward(self, input_ids):
        seq_len = input_ids.size(1)
        positions = torch.arange(0, seq_len).unsqueeze(0).to(input_ids.device)
        token_embeddings = self.embedding(input_ids)
        position_embeddings = self.position_embedding(positions)
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        transformer_output = self.transformer(embeddings)
        return transformer_output  # Return embeddings directly (not logits)


In [8]:
# Dataset for MLM Training
class TextDataset(Dataset):
    def __init__(self, texts, tokenizer, max_len=50):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        token_ids = self.tokenizer.encode(text)
        # Apply padding/truncation
        token_ids = token_ids[:self.max_len]
        token_ids = torch.cat([token_ids, torch.tensor([self.tokenizer.vocab[self.tokenizer.pad_token]] * (self.max_len - len(token_ids)))])

        # Create masked version of the input for MLM
        masked_token_ids = token_ids.clone()
        mask_indices = random.sample(range(self.max_len), k=int(self.max_len * 0.15))  # Mask 15% of the tokens
        for idx in mask_indices:
            masked_token_ids[idx] = self.tokenizer.vocab[self.tokenizer.mask_token]  # Apply [MASK]

        return masked_token_ids, token_ids  # masked input, original target

In [5]:
# MLM Loss Function
def mlm_loss(predictions, targets, mask_token_id):
    mask = (targets != mask_token_id).float()  # Ignore the masked positions in the target
    loss = torch.nn.CrossEntropyLoss(reduction='none')(predictions.view(-1, predictions.size(-1)), targets.view(-1))
    loss = loss * mask.view(-1)  # Apply mask to ignore the padded/unused tokens
    return loss.sum() / mask.sum()  # Normalize the loss

# Training Loop
def train_model(model, dataloader, optimizer, tokenizer, epochs=5):
    model.train()
    mask_token_id = tokenizer.vocab[tokenizer.mask_token]
    for epoch in range(epochs):
        running_loss = 0.0
        for i, (masked_input, original_target) in enumerate(dataloader):
            optimizer.zero_grad()
            masked_input = masked_input.to(next(model.parameters()).device)
            original_target = original_target.to(next(model.parameters()).device)

            # Forward pass
            output = model(masked_input)
            predictions = output.view(-1, output.size(-1))  # Flatten the output to match the shape of the target

            # Compute loss
            loss = mlm_loss(predictions, original_target, mask_token_id)
            loss.backward()

            # Update weights
            optimizer.step()

            running_loss += loss.item()
            if (i + 1) % 100 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Step [{i+1}/{len(dataloader)}], Loss: {running_loss / 100:.4f}")
                running_loss = 0.0

In [10]:
def save_embeddings_to_csv(model, texts, tokenizer, csv_file="embeddings.csv"):
    model.eval()
    embeddings = []
    with torch.no_grad():
        for text in texts:
            input_ids = tokenizer.encode(text)  # Only need input_ids
            input_ids = input_ids.unsqueeze(0).to(next(model.parameters()).device)  # Add batch dimension
            output = model(input_ids)
            cls_embedding = output[:, 0, :].squeeze().cpu().numpy()  # [CLS] token embedding
            embeddings.append(cls_embedding)

    # Save embeddings to a CSV
    df = pd.DataFrame(embeddings)
    df.to_csv(csv_file, index=False)
    print(f"Embeddings saved to {csv_file}")

In [11]:
# Main Execution
def main():
    # Define a small vocabulary and custom tokenizer for testing
    vocab = {'the': 4, 'quick': 5, 'brown': 6, 'fox': 7, 'jumps': 8, 'over': 9, 'lazy': 10, 'dog': 11}
    tokenizer = CustomTokenizer(vocab)

    # Define the model
    model = BERTEmbeddingExtractor(vocab_size=len(vocab), embedding_dim=128, max_len=50).to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
    ["the quick brown fox", "jumps over the lazy dog", "the quick fox jumps over the dog", "lazy dogs are jumping"]
    # Create dataset and dataloader
    texts =
    dataset = TextDataset(texts, tokenizer)
    dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

    # Define optimizer
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    # Train the model
    train_model(model, dataloader, optimizer, tokenizer, epochs=5)

    # Save embeddings after training
    save_embeddings_to_csv(model, texts, tokenizer, csv_file="embeddings_after_training.csv")

if __name__ == "__main__":
    main()



Embeddings saved to embeddings_after_training.csv
