In [None]:
!pip install -U datasets

In [None]:
import numpy as np
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from datasets import load_dataset
import random
import tensorflow as tf
from tqdm import tqdm
from tensorflow.keras.utils import Sequence
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Lambda, Dense, Dropout, Concatenate, Embedding
from tensorflow.keras.utils import Sequence
import time
import string
import json
import re
import nltk
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertModel, BertTokenizer, RobertaModel, RobertaTokenizer
from torch.optim import AdamW
from tqdm import tqdm
import torch.nn.functional as F
nltk.download('punkt')
nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
import pickle
import spacy
import pandas as pd

# Train model

In [None]:
# When True, premise and hypothesis are exchanged at a random half of the
# positions, for both the training and the evaluation pairs.
swap = False
# Number of training pairs to draw at random, or the string "Full" to keep all of them.
number_of_train_samples = "Full"
# Number of evaluation pairs to draw at random, or the string "Full" to keep all of them.
number_of_test_samples = "Full"
# Number of passes over the training set (forwarded to train()).
epochs = 1
# Batch sizes handed to the training and evaluation DataLoaders.
train_batch_size = 32
test_batch_size = 32

In [None]:
def swap_items_at_random_indices(list1, list2):
    """Exchange the elements of two equally long lists at a random half of the positions.

    ``int(len(list1) / 2)`` distinct positions are drawn with ``random.sample``
    and, at each of them, ``list1[pos]`` and ``list2[pos]`` trade places.
    Both lists are modified in place.

    Args:
        list1: First mutable sequence.
        list2: Second mutable sequence; must have the same length as ``list1``.

    Returns:
        tuple: ``(list1, list2)`` -- the very same objects that were passed in.

    Raises:
        AssertionError: If the two lists differ in length.
    """
    size = len(list1)
    assert size == len(list2), "Lists must be of the same length"

    chosen_positions = random.sample(range(size), int(size / 2))
    for pos in chosen_positions:
        held = list1[pos]
        list1[pos] = list2[pos]
        list2[pos] = held

    return list1, list2

# --- Load both NLI corpora ---------------------------------------------------
snli_dataset = load_dataset("stanfordnlp/snli")
mnli_dataset = load_dataset("nyu-mll/multi_nli")


def _is_labelled(example):
    """Return True unless the example's label is -1."""
    return example["label"] != -1


def _as_list(split, column):
    """Copy one column of a dataset split into a plain Python list."""
    return list(split[column])


def _select(items, indices):
    """Return the elements of ``items`` at ``indices``, in that order."""
    return [items[i] for i in indices]


# --- Splits, with label == -1 rows removed -----------------------------------
snli_train_dataset = snli_dataset["train"].filter(_is_labelled)
snli_test_dataset = snli_dataset["test"].filter(_is_labelled)
mnli_train_dataset = mnli_dataset["train"].filter(_is_labelled)
mnli_test_dataset = mnli_dataset["validation_matched"].filter(_is_labelled)
mnli_test_mismatch_dataset = mnli_dataset["validation_mismatched"].filter(_is_labelled)

# --- Column lists per split --------------------------------------------------
snli_train_premises = _as_list(snli_train_dataset, "premise")
snli_train_hypothesis = _as_list(snli_train_dataset, "hypothesis")
snli_train_labels = _as_list(snli_train_dataset, "label")

snli_test_premises = _as_list(snli_test_dataset, "premise")
snli_test_hypothesis = _as_list(snli_test_dataset, "hypothesis")
snli_test_labels = _as_list(snli_test_dataset, "label")

mnli_train_premises = _as_list(mnli_train_dataset, "premise")
mnli_train_hypothesis = _as_list(mnli_train_dataset, "hypothesis")
mnli_train_labels = _as_list(mnli_train_dataset, "label")

mnli_test_premises = _as_list(mnli_test_dataset, "premise")
mnli_test_hypothesis = _as_list(mnli_test_dataset, "hypothesis")
mnli_test_labels = _as_list(mnli_test_dataset, "label")

mnli_test_mismatched_premises = _as_list(mnli_test_mismatch_dataset, "premise")
mnli_test_mismatched_hypothesis = _as_list(mnli_test_mismatch_dataset, "hypothesis")
mnli_test_mismatched_labels = _as_list(mnli_test_mismatch_dataset, "label")

# --- Training pool: SNLI train followed by MNLI train ------------------------
all_train_premises = snli_train_premises + mnli_train_premises
all_train_hypothesis = snli_train_hypothesis + mnli_train_hypothesis
all_train_labels = snli_train_labels + mnli_train_labels

# --- Evaluation pool: MNLI validation_matched only ---------------------------
# The SNLI test and MNLI validation_mismatched lists built above are not part
# of the evaluation pool here. These names alias the mnli_test_* lists, so an
# in-place swap below is visible through both names.
all_test_premises = mnli_test_premises
all_test_hypothesis = mnli_test_hypothesis
all_test_labels = mnli_test_labels

# Optional premise/hypothesis exchange: training pool first, then evaluation
# pool. Labels are left untouched.
if swap:
    all_train_premises, all_train_hypothesis = swap_items_at_random_indices(
        all_train_premises, all_train_hypothesis
    )
    all_test_premises, all_test_hypothesis = swap_items_at_random_indices(
        all_test_premises, all_test_hypothesis
    )

# --- Optional random subsampling (the same indices are used for all columns) --
if number_of_train_samples != "Full":
    train_indices = random.sample(range(len(all_train_premises)), number_of_train_samples)
    all_train_premises = _select(all_train_premises, train_indices)
    all_train_hypothesis = _select(all_train_hypothesis, train_indices)
    all_train_labels = _select(all_train_labels, train_indices)

if number_of_test_samples != "Full":
    test_indices = random.sample(range(len(all_test_premises)), number_of_test_samples)
    all_test_premises = _select(all_test_premises, test_indices)
    all_test_hypothesis = _select(all_test_hypothesis, test_indices)
    all_test_labels = _select(all_test_labels, test_indices)

In [None]:
# 1. Dataset class
class SNLIDataset(Dataset):
    """Map-style dataset of (premise, hypothesis, label) triples.

    Premise and hypothesis are tokenized separately, on access, so that each
    one can be fed through the encoder on its own.

    Args:
        premises: Indexable collection of premise strings.
        hypotheses: Indexable collection of hypothesis strings.
        labels: Indexable collection of integer class labels; its length
            defines the dataset length.
        tokenizer: Callable invoked as
            ``tokenizer(text, padding=..., truncation=..., max_length=..., return_tensors='pt')``
            that returns a mapping with 'input_ids' and 'attention_mask'.
        max_length: ``max_length`` value forwarded to the tokenizer.
    """

    def __init__(self, premises, hypotheses, labels, tokenizer, max_length=128):
        self.premises, self.hypotheses, self.labels = premises, hypotheses, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.labels)

    def _encode(self, text):
        """Tokenize one sentence and return ``(input_ids, attention_mask)``.

        ``squeeze(0)`` removes the leading dimension of the tokenizer output
        (presumably the size-1 batch axis added by ``return_tensors='pt'``).
        """
        encoded = self.tokenizer(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt',
        )
        return encoded['input_ids'].squeeze(0), encoded['attention_mask'].squeeze(0)

    def __getitem__(self, idx):
        """Return the tokenized pair at ``idx`` plus its label as a long tensor."""
        ids_a, mask_a = self._encode(self.premises[idx])
        ids_b, mask_b = self._encode(self.hypotheses[idx])
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return {
            'input_ids_a': ids_a,
            'attention_mask_a': mask_a,
            'input_ids_b': ids_b,
            'attention_mask_b': mask_b,
            'label': target,
        }

# 2. Siamese BERT with MLP
class SiameseBertClassifier(nn.Module):
    """Siamese sentence-pair classifier: one shared BERT encoder plus an MLP head.

    Both sentences pass through the same encoder. The hidden state at sequence
    position 0 of each is used as its sentence vector (``u`` and ``v``), and the
    head classifies the concatenation ``[u, v, |u - v|, u * v]``.

    Args:
        pretrained_model: Checkpoint name or path given to
            ``BertModel.from_pretrained``.
        num_labels: Size of the output layer (number of classes).
    """

    def __init__(self, pretrained_model='bert-base-uncased', num_labels=3):
        super().__init__()
        self.bert = BertModel.from_pretrained(pretrained_model)

        # Freeze every encoder parameter, then re-enable gradients only for
        # encoder layers 8, 9, 10 and 11. Indexing (not slicing) keeps the
        # IndexError for checkpoints with fewer than 12 layers.
        self.bert.requires_grad_(False)
        for layer_index in (8, 9, 10, 11):
            self.bert.encoder.layer[layer_index].requires_grad_(True)

        hidden_size = self.bert.config.hidden_size
        feature_dim = 4 * hidden_size  # u, v, |u - v| and u * v side by side

        # Layers are created in this exact order so the head's state_dict keys
        # (classifier.0 ... classifier.6) stay the same.
        head_layers = [
            nn.Linear(feature_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, num_labels),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def _sentence_vector(self, input_ids, attention_mask):
        """Encode one side of the pair and return the hidden state at position 0."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        return encoded.last_hidden_state[:, 0, :]

    def forward(self, input_ids_a, attention_mask_a, input_ids_b, attention_mask_b):
        """Return the classifier output (logits) for a batch of sentence pairs."""
        u = self._sentence_vector(input_ids_a, attention_mask_a)
        v = self._sentence_vector(input_ids_b, attention_mask_b)

        pair_features = torch.cat([u, v, torch.abs(u - v), u * v], dim=1)
        return self.classifier(pair_features)

In [None]:
# 3. Tokenizer and evaluation function
# The tokenizer is created here, ahead of evaluate(); it is the one passed to
# the SNLIDataset instances and reused by the inference cell. It loads the same
# "bert-base-uncased" name that SiameseBertClassifier uses as its default checkpoint.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

def evaluate(model, dataloader, device):
    """Run ``model`` over ``dataloader`` without gradients and report accuracy.

    Args:
        model: Module called as
            ``model(input_ids_a, attention_mask_a, input_ids_b, attention_mask_b)``
            that returns logits with the class axis at dim 1.
        dataloader: Iterable of batches. Each batch maps 'input_ids_a',
            'attention_mask_a', 'input_ids_b', 'attention_mask_b' and 'label'
            to tensors.
        device: Device every batch tensor is moved to before the forward pass.

    Returns:
        list: One 0-dim tensor (predicted class index) per example, in
        iteration order. The tensors are not moved off ``device``. The list is
        empty when ``dataloader`` yields no batches.

    Side effects:
        Switches ``model`` to eval mode (it is not switched back) and prints
        ``Accuracy: <value>``; the value is ``nan`` for an empty dataloader.
    """
    model.eval()
    all_preds = []
    total, correct = 0, 0
    with torch.no_grad():
        for batch in dataloader:
            input_ids_a = batch['input_ids_a'].to(device)
            attention_mask_a = batch['attention_mask_a'].to(device)
            input_ids_b = batch['input_ids_b'].to(device)
            attention_mask_b = batch['attention_mask_b'].to(device)
            labels = batch['label'].to(device)

            logits = model(input_ids_a, attention_mask_a, input_ids_b, attention_mask_b)
            preds = torch.argmax(logits, dim=1)
            # Iterating a 1-D tensor yields its 0-dim elements, so this is the
            # same as appending them one by one.
            all_preds.extend(preds)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # With no examples the accuracy is undefined; report NaN rather than
    # failing with ZeroDivisionError.
    acc = correct / total if total else float("nan")
    print(f"Accuracy: {acc:.4f}")
    return all_preds

# 4. Training function
def train(model, train_loader, test_loader, optimizer, device, epochs=3):
    """Optimise ``model`` with cross-entropy loss and evaluate after each epoch.

    Args:
        model: Module called with the four input tensors of a batch, in the
            order input_ids_a, attention_mask_a, input_ids_b, attention_mask_b.
        train_loader: Iterable of training batches (dicts holding those four
            tensors plus 'label').
        test_loader: Iterable of held-out batches in the same format.
        optimizer: Optimizer that updates the model's trainable parameters.
        device: Device the batch tensors are moved to.
        epochs: Number of passes over ``train_loader``.

    After each epoch, accuracy is printed first for ``test_loader`` and then
    for ``train_loader`` via ``evaluate``. Nothing is returned.
    """
    criterion = nn.CrossEntropyLoss()
    input_keys = ('input_ids_a', 'attention_mask_a', 'input_ids_b', 'attention_mask_b')

    for epoch in range(epochs):
        # evaluate() leaves the model in eval mode, so training mode is
        # re-enabled at the start of every epoch.
        model.train()
        print(f"Epoch {epoch + 1}")

        progress = tqdm(train_loader, leave=True)
        for batch in progress:
            inputs = [batch[key].to(device) for key in input_keys]
            targets = batch['label'].to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(*inputs), targets)
            batch_loss.backward()
            optimizer.step()

        for split_name, loader in (("Test", test_loader), ("Train", train_loader)):
            print(split_name)
            evaluate(model, loader, device)


# 5. Example usage
if __name__ == "__main__":
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    # Sentence pairs and labels prepared by the data-loading cell
    # (label ids: entailment=0, neutral=1, contradiction=2).
    train_premises, train_hypotheses, train_labels = (
        all_train_premises, all_train_hypothesis, all_train_labels
    )
    test_premises, test_hypotheses, test_labels = (
        all_test_premises, all_test_hypothesis, all_test_labels
    )

    # Datasets tokenize lazily, one pair per __getitem__ call.
    train_dataset = SNLIDataset(train_premises, train_hypotheses, train_labels, tokenizer)
    test_dataset = SNLIDataset(test_premises, test_hypotheses, test_labels, tokenizer)

    # Only the training loader is shuffled.
    train_loader = DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)
    test_loader = DataLoader(test_dataset, shuffle=False, batch_size=test_batch_size)

    # Model, plus an optimizer restricted to the parameters left trainable
    # by SiameseBertClassifier.__init__.
    model = SiameseBertClassifier(num_labels=3)
    model = model.to(device)
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = AdamW(trainable_params, lr=2e-5)

    # Train, printing test/train accuracy after every epoch.
    train(model, train_loader, test_loader, optimizer, device, epochs=epochs)


# Inference

In [None]:
import pandas as pd

# Candidate sentence pairs, one pair per spreadsheet row.
data = pd.read_excel("./Candidate_sentence_pairs.xlsx", sheet_name=0)
# The predictions are written back into `data` row by row in the next cell, so
# the sentences must come from `data` itself. (The original cell overwrote these
# lists with `all_premise` / `all_hypothesis`, names defined nowhere in the
# notebook, which fails on a fresh kernel and breaks the row alignment.)
all_i = data["sentence_i"].to_list()
all_j = data["sentence_j"].to_list()

# Make inference independent of whatever mode the model was left in:
# eval mode disables the dropout layers of the encoder and the classifier head.
model.eval()

all_i_embedding = []
all_j_embedding = []
batch_size = 2
for i in tqdm(range(0, len(all_i), batch_size)):
    selected_sentences_i = all_i[i:i+batch_size]
    selected_sentences_j = all_j[i:i+batch_size]
    selected_tokens_i = tokenizer(selected_sentences_i, return_tensors="pt", truncation=True, padding=True, max_length=128).to(device)
    selected_tokens_j = tokenizer(selected_sentences_j, return_tensors="pt", truncation=True, padding=True, max_length=128).to(device)
    with torch.no_grad():
        outputs_i = model.bert(**selected_tokens_i)
        outputs_j = model.bert(**selected_tokens_j)
    # Keep only the position-0 hidden state of each sentence, as in
    # SiameseBertClassifier.forward; store on the CPU between batches.
    all_i_embedding.append(outputs_i.last_hidden_state[:, 0, :].cpu())
    all_j_embedding.append(outputs_j.last_hidden_state[:, 0, :].cpu())

all_i_embedding = torch.cat(all_i_embedding, dim=0).to(device)
all_j_embedding = torch.cat(all_j_embedding, dim=0).to(device)

# Same feature layout as SiameseBertClassifier.forward: [u, v, |u - v|, u * v].
# The head is run under no_grad as well, so no autograd graph is built.
with torch.no_grad():
    abs_diff = torch.abs(all_i_embedding - all_j_embedding)
    elem_mult = all_i_embedding * all_j_embedding
    combined = torch.cat([all_i_embedding, all_j_embedding, abs_diff, elem_mult], dim=1)
    logits = model.classifier(combined)
    probs = F.softmax(logits, dim=-1)
all_probs = [probs]

# From here on `logits` holds the predicted class ids (argmax), as in the
# original cell.
logits = probs.argmax(dim=1).cpu()
# Names of the class ids; `labels` below stores the integer ids, so use
# to_label[class_id] if string labels are wanted instead.
to_label = {0:"entailment", 1:"neutral", 2:"contradiction"}
labels = [class_id.item() for class_id in logits]

# One prediction per spreadsheet row is required by the next cell.
assert len(labels) == len(data), "number of predictions must match number of rows"


In [None]:
# Attach the predictions as a new "label" column. `labels` holds integer class
# ids (see `to_label` in the inference cell for their names). This modifies
# `data` in place.
data["label"] = labels
# Write the labelled pairs to CSV in the working directory, without the index column.
data.to_csv("edited_data_with_label.csv", index=False)