In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertModel
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.model_selection import train_test_split
# import wic
import warnings
from tqdm import tqdm
from torch.cuda.amp import GradScaler, autocast
from datasets import Dataset
import math
import pandas as pd
import numpy as np
warnings.filterwarnings("ignore", category=DeprecationWarning) 

In [None]:
from torch.utils.data import Dataset

class WSDTripletDataset(Dataset):
    """Map-style dataset that yields tokenized (anchor, positive, negative) triplets.

    Rows are fetched with ``.iloc``, so despite the parameter name ``hf_dataset``
    the backing store has to be a pandas DataFrame (or expose the same ``.iloc``
    accessor) whose rows provide ``anchor``, ``positive``, ``negative`` and
    ``target_word`` entries.

    Args:
        hf_dataset: Row container indexed positionally through ``.iloc``.
        tokenizer: Callable accepting ``padding``, ``truncation``, ``max_length``
            and ``return_tensors`` keywords and returning a mapping that holds
            ``input_ids`` and ``attention_mask`` tensors.
        max_length: Length forwarded to the tokenizer as ``max_length``.
    """

    # Text columns, listed in the order they are tokenized and returned.
    _TEXT_FIELDS = ('anchor', 'positive', 'negative')

    def __init__(self, hf_dataset, tokenizer, max_length):
        self.dataset = hf_dataset
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.dataset)

    def _encode(self, text):
        """Tokenize one text and drop the leading batch axis from both tensors."""
        encoded = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        )
        return {
            key: encoded[key].squeeze(0)
            for key in ('input_ids', 'attention_mask')
        }

    def __getitem__(self, idx):
        """Return the tokenized triplet plus the raw ``target_word`` for row ``idx``."""
        row = self.dataset.iloc[idx]  # positional lookup on the DataFrame
        sample = {field: self._encode(row[field]) for field in self._TEXT_FIELDS}
        sample['target_word'] = row['target_word']
        return sample

In [None]:
from transformers import AutoModel

class TripletBERT(nn.Module):
    """Pretrained encoder followed by a linear projection to 256 dimensions.

    Args:
        model_name: Identifier passed to ``AutoModel.from_pretrained``.
    """

    def __init__(self, model_name):
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        # The projection input width follows the encoder's configured hidden size.
        self.hidden_size = self.bert.config.hidden_size
        self.fc = nn.Linear(self.hidden_size, 256)

    def get_embedding(self, input_ids, attention_mask):
        """Encode a batch and project the hidden state at sequence position 0.

        Returns:
            Output of ``self.fc`` applied to ``last_hidden_state[:, 0, :]``.
        """
        encoder_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        first_token_state = encoder_out.last_hidden_state[:, 0, :]
        return self.fc(first_token_state)

    def forward(self, anchor, positive, negative):
        """Embed the three sides of a triplet.

        Each argument is a mapping with ``input_ids`` and ``attention_mask``
        entries. Returns a 3-tuple ``(anchor, positive, negative)`` of embeddings.
        """
        return tuple(
            self.get_embedding(side["input_ids"], side["attention_mask"])
            for side in (anchor, positive, negative)
        )


In [None]:
class TripletLoss(nn.Module):
    """Module wrapper around ``nn.TripletMarginLoss`` with the p=2 norm.

    Args:
        margin: Margin forwarded to ``nn.TripletMarginLoss`` (default 1.0).
    """

    def __init__(self, margin=1.0):
        super().__init__()
        criterion = nn.TripletMarginLoss(margin=margin, p=2)
        self.loss_fn = criterion

    def forward(self, anchor, positive, negative):
        """Return the triplet margin loss for the given embedding batches."""
        triplet = (anchor, positive, negative)
        return self.loss_fn(*triplet)

In [None]:
def collate_fn(batch):
    """Stack per-sample triplet tensors into batch tensors.

    Args:
        batch: Sequence of samples, each a dict with ``anchor``, ``positive``
            and ``negative`` sub-dicts (holding ``input_ids`` and
            ``attention_mask`` tensors) plus a ``target_word`` entry.

    Returns:
        A dict with the same three sub-dicts, where every tensor is the
        ``torch.stack`` of the per-sample tensors, and ``target_word`` as a
        plain list in sample order.
    """
    collated = {}
    for role in ('anchor', 'positive', 'negative'):
        collated[role] = {
            field: torch.stack([sample[role][field] for sample in batch])
            for field in ('input_ids', 'attention_mask')
        }
    collated['target_word'] = [sample['target_word'] for sample in batch]
    return collated


In [None]:
import torch
import math
from tqdm import tqdm
from torch.cuda.amp import autocast, GradScaler

def _triplet_batch_loss(model, batch, loss_fn, device):
    """Move one collated triplet batch to ``device``, embed each side, return the loss."""
    embeddings = [
        model.get_embedding(
            batch[role]["input_ids"].to(device),
            batch[role]["attention_mask"].to(device),
        )
        for role in ("anchor", "positive", "negative")
    ]
    return loss_fn(*embeddings)


def train_triplet(model, trainloader, testloader, optimizer, loss_fn, device, num_epochs=5,
                  checkpoint_path="shared_weights_triplet_tiny.pth"):
    """Train ``model`` with a triplet loss and checkpoint the best encoder weights.

    Every epoch runs one pass over ``trainloader`` (with gradient scaling and
    autocast when ``device`` is a CUDA device), then computes the mean loss over
    ``testloader`` under ``torch.no_grad()``. Whenever that validation loss
    improves on the best value seen so far, ``model.bert.state_dict()`` is
    written to ``checkpoint_path``. Only ``model.bert`` is saved; other
    submodules of ``model`` are not part of the checkpoint.

    Args:
        model: Module exposing ``get_embedding(input_ids, attention_mask)`` and
            a ``bert`` submodule.
        trainloader: Sized iterable of collated triplet batches (dicts with
            ``anchor``/``positive``/``negative`` sub-dicts).
        testloader: Sized iterable of validation batches in the same format.
        optimizer: Optimizer over the parameters to update.
        loss_fn: Callable ``(anchor, positive, negative) -> scalar loss tensor``.
        device: ``torch.device`` or device string to run on.
        num_epochs: Number of passes over ``trainloader``.
        checkpoint_path: Destination for the best encoder state dict. Defaults
            to the previously hard-coded file name.

    Raises:
        ValueError: If either loader is empty. Checked up front so an empty
            validation loader does not fail only after a full training epoch.
    """
    if len(trainloader) == 0 or len(testloader) == 0:
        raise ValueError("trainloader and testloader must each contain at least one batch")

    # Mixed precision only makes sense on CUDA; off-GPU the scaler and autocast
    # are explicitly disabled and the step degrades to a plain backward/step.
    use_amp = torch.device(device).type == "cuda"

    model.to(device)
    scaler = GradScaler(enabled=use_amp)
    best_loss = math.inf

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        progress_bar = tqdm(trainloader, desc=f"Epoch {epoch+1}/{num_epochs}")

        for batch in progress_bar:
            optimizer.zero_grad()

            with autocast(enabled=use_amp):
                loss = _triplet_batch_loss(model, batch, loss_fn, device)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Read the scalar once; each .item() call forces a device sync.
            batch_loss = loss.item()
            total_loss += batch_loss
            progress_bar.set_postfix({"Train Loss": batch_loss})

        avg_train_loss = total_loss / len(trainloader)

        # Validation (full precision, no gradient tracking)
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch in testloader:
                val_loss += _triplet_batch_loss(model, batch, loss_fn, device).item()

        avg_val_loss = val_loss / len(testloader)

        # Save the best model based on validation loss
        if avg_val_loss < best_loss:
            best_loss = avg_val_loss
            torch.save(model.bert.state_dict(), checkpoint_path)

        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")


In [None]:
# Load the triplet CSV, shorten its column names, and derive the target word
# from the leading "<word>:" prefix of the positive gloss.
data_triplet = (
    pd.read_csv("/kaggle/input/context-positive-negative/context_gloss_triplets_all_negatives.csv")
    .rename(columns={
        'Context Sentence (Anchor)': 'anchor',
        'Gloss Definition (Positive)': 'positive',
        'Gloss Definition (Negative)': 'negative',
    })
    .assign(target_word=lambda frame: frame['positive'].str.extract(r'^(\w+)\s*:'))
)

# Keep only complete rows (including rows where the prefix regex matched),
# then cast every column to str.
triplet_df = (
    data_triplet
    .dropna(subset=['anchor', 'positive', 'negative', 'target_word'])
    .astype(str)
)

# 80/20 split with a fixed seed; each part gets a fresh positional index.
train_data, test_data = (
    part.reset_index(drop=True)
    for part in train_test_split(triplet_df, test_size=0.2, random_state=42)
)

In [None]:
# Model setup: use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model_name = "huawei-noah/TinyBERT_General_4L_312D"
model = TripletBERT(model_name=model_name)

# Initialise the encoder from a previously saved state dict
model.bert.load_state_dict(
    torch.load(
        "/kaggle/input/gloss-hypernymy-tinybert-pretrained/pytorch/default/1/shared_weights_gloss_hypernym_tiny.pth",
        map_location=device,
    )
)

# AdamW over all model parameters
optimizer = torch.optim.AdamW(params=model.parameters(), lr=5e-5)

# TripletLoss defaults to margin=1.0; this run uses a wider margin of 5.0
loss_fn = TripletLoss(margin=5.0)

In [None]:
# Tokenizer loaded from the same checkpoint name as the encoder
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Wrap both splits with identical tokenization settings
train_dataset, test_dataset = (
    WSDTripletDataset(split, tokenizer, max_length=128)
    for split in (train_data, test_data)
)

# Both loaders share batch size and collation; only the training loader shuffles
loader_options = {"batch_size": 32, "collate_fn": collate_fn}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)


In [None]:
# # Model setup
# model_name = "huawei-noah/TinyBERT_General_4L_312D"
# model = TripletBERT(model_name=model_name)
# optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
# loss_fn = TripletLoss(margin=1)
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
# Run training for 30 epochs; arguments follow train_triplet's positional order
# (model, trainloader, testloader, optimizer, loss_fn, device).
train_triplet(model, train_loader, test_loader, optimizer, loss_fn, device, num_epochs=30)


# Evaluation


In [None]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# model_name = "huawei-noah/TinyBERT_General_4L_312D"
# tokenizer = AutoTokenizer.from_pretrained(model_name)

# # Reinitialize model and load state_dict
# model = TripletBERT(model_name=model_name).to(device)
# model.load_state_dict(torch.load("/kaggle/input/triplet_tinybert_30_0.7m/pytorch/default/1/triplet_tinybert_25_0.7M.pth", map_location=device))
# model.eval()


In [None]:
import torch
import torch.nn.functional as F
from sklearn.metrics.pairwise import cosine_similarity

# Sentence pair used for a quick similarity check
anchor_sentence = "The cat is on the mat."
positive_sentence = "A cat is sitting on a mat."

# Tokenize each sentence (max_length padding, truncation at 128) and move the
# resulting tensors to the model's device in one pass
encode_options = dict(padding="max_length", truncation=True, max_length=128, return_tensors="pt")
anchor_tokens = {
    name: tensor.to(device)
    for name, tensor in tokenizer(anchor_sentence, **encode_options).items()
}
positive_tokens = {
    name: tensor.to(device)
    for name, tensor in tokenizer(positive_sentence, **encode_options).items()
}

# Embed both sentences in evaluation mode without tracking gradients, and bring
# the results back as numpy arrays for scikit-learn
model.eval()
with torch.no_grad():
    anchor_embed = model.get_embedding(
        anchor_tokens['input_ids'], anchor_tokens['attention_mask']
    ).cpu().numpy()
    positive_embed = model.get_embedding(
        positive_tokens['input_ids'], positive_tokens['attention_mask']
    ).cpu().numpy()

# cosine_similarity returns a matrix; the single pair's score sits at [0][0]
cosine_sim = cosine_similarity(anchor_embed, positive_embed)
print(f"Cosine Similarity between anchor and positive sentence: {cosine_sim[0][0]:.4f}")


In [None]:
# Unrelated sentence to contrast against the anchor
negative_sentence = "A dog is barking outside."

# Tokenize it and move the tensors to the model's device
negative_tokens = {
    name: tensor.to(device)
    for name, tensor in tokenizer(
        negative_sentence,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt",
    ).items()
}

# Embed in evaluation mode without gradients, returning a numpy array
model.eval()
with torch.no_grad():
    negative_embed = model.get_embedding(
        negative_tokens['input_ids'], negative_tokens['attention_mask']
    ).cpu().numpy()

# Compare the anchor (embedded in the previous cell) with the positive and the
# negative sentence
cosine_sim_pos, cosine_sim_neg = (
    cosine_similarity(anchor_embed, other_embed)
    for other_embed in (positive_embed, negative_embed)
)

print(f"Cosine Similarity between anchor and positive sentence: {cosine_sim_pos[0][0]:.4f}")
print(f"Cosine Similarity between anchor and negative sentence: {cosine_sim_neg[0][0]:.4f}")
