In [None]:
!pip install transformers
!pip install datasets
!pip install torch torchvision torchaudio

In [None]:
import os
import pandas as pd
from argparse import ArgumentParser
from tqdm import tqdm
from pathlib import Path

import torch as ch
import torch.nn as nn
from torch.utils.data import DataLoader

# Huggingface imports
from datasets import load_dataset
from transformers import (
    AutoConfig,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    default_data_collator,
)

# Configuration
# Maps a GLUE task name to the names of the dataset columns that hold its first
# and second text input. The second entry is None for single-sentence tasks.
GLUE_TASK_TO_KEYS = {
    "qnli": ("question", "sentence"),
    "mnli": ("premise", "hypothesis"),
    "mrpc": ("sentence1", "sentence2"),
    "qqp": ("question1", "question2"),
    "rte": ("sentence1", "sentence2"),
    "sst2": ("sentence", None),
    "stsb": ("sentence1", "sentence2"),
    "wnli": ("sentence1", "sentence2"),
}


# Adjust dataset size as needed for Colab
# Both constants are consumed by init_loaders(), which keeps the first N rows
# of the corresponding split.
TRAIN_SET_SIZE = 50_000  # Reduced for Colab memory constraints
# NOTE(review): presumably the size of the full QNLI validation split — confirm.
VAL_SET_SIZE = 5_463


class SequenceClassificationModel(nn.Module):
    """Two-label sequence classifier built on ``google-bert/bert-base-cased``.

    Construction loads the pretrained checkpoint, puts the network in
    evaluation mode and moves it to CUDA when torch reports it as available,
    otherwise to the CPU. Calling the module returns the raw logits.

    Attributes are plain values set in ``__init__``:
      * ``config`` - the Hugging Face config used to build the model,
      * ``model``  - the wrapped ``AutoModelForSequenceClassification``,
      * ``device`` - the string ``"cuda"`` or ``"cpu"``.
    """

    def __init__(self):
        super().__init__()
        checkpoint = "google-bert/bert-base-cased"

        self.config = AutoConfig.from_pretrained(
            checkpoint,
            num_labels=2,
            finetuning_task="qnli",
            attn_implementation="eager",
        )
        self.model = AutoModelForSequenceClassification.from_pretrained(
            checkpoint,
            config=self.config,
            ignore_mismatched_sizes=False,
        )

        # Prefer the GPU whenever one is usable.
        if ch.cuda.is_available():
            self.device = "cuda"
        else:
            self.device = "cpu"

        self.model.eval()
        self.model.to(self.device)

    def forward(self, input_ids, token_type_ids, attention_mask):
        """Run the wrapped model and return only the ``logits`` of its output."""
        outputs = self.model(
            input_ids=input_ids,
            token_type_ids=token_type_ids,
            attention_mask=attention_mask,
        )
        return outputs.logits


def get_dataset(split, inds=None):
    """Load one split of GLUE QNLI and tokenize it for BERT.

    Only the requested split is tokenized, so asking for the validation data
    does not pay for tokenizing the training and test splits as well.

    Args:
        split (str): ``"train"`` selects the training split; any other value
            selects the validation split.
        inds (iterable of int, optional): Row indices to keep. They are applied
            before tokenization. ``None`` (the default) keeps the whole split.

    Returns:
        datasets.Dataset: The selected split with the tokenizer outputs added
        as columns; every example is padded or truncated to 128 tokens.
    """
    sentence1_key, sentence2_key = GLUE_TASK_TO_KEYS["qnli"]

    raw_datasets = load_dataset("glue", "qnli")
    # Anything that is not "train" falls back to the validation split.
    ds = raw_datasets["train" if split == "train" else "validation"]
    if inds is not None:
        ds = ds.select(inds)

    tokenizer = AutoTokenizer.from_pretrained(
        "google-bert/bert-base-cased", use_fast=True
    )

    def preprocess_function(examples):
        # Encode the (question, sentence) pair as one fixed-length sequence.
        args = (examples[sentence1_key], examples[sentence2_key])
        return tokenizer(*args, padding="max_length", max_length=128, truncation=True)

    return ds.map(
        preprocess_function,
        batched=True,
        desc="Running tokenizer on dataset",
    )


def init_model(ckpt_path=None):
    """Build the classifier and optionally restore weights from a checkpoint.

    Args:
        ckpt_path (str, optional): Path to a state dict for the wrapped
            Hugging Face model. When omitted, the model is returned as loaded
            from the pretrained checkpoint.

    Returns:
        SequenceClassificationModel: The model, on its own ``device``.

    Warns:
        UserWarning: If ``ckpt_path`` is given but does not exist. The model is
            still returned (without the checkpoint weights) so existing callers
            keep working, but the fallback is no longer silent.
    """
    import warnings

    model = SequenceClassificationModel()
    if not ckpt_path:
        return model

    if not os.path.exists(ckpt_path):
        warnings.warn(
            f"Checkpoint {ckpt_path!r} not found; returning the model without "
            "loading checkpoint weights.",
            stacklevel=2,
        )
        return model

    # NOTE: torch.load unpickles the file - only load checkpoints you trust.
    sd = ch.load(ckpt_path, map_location=model.device)
    model.model.load_state_dict(sd)
    return model


def init_loaders(batch_size=16):
    """Create the train and validation data loaders.

    Each loader serves the first ``TRAIN_SET_SIZE`` / ``VAL_SET_SIZE`` rows of
    its split. A size larger than the split is clamped to the split length
    instead of raising, so the constants can be tuned freely. Neither loader
    shuffles, so batch order follows dataset order.

    Args:
        batch_size (int): Number of examples per batch for both loaders.

    Returns:
        tuple: ``(train_loader, val_loader)``.
    """

    def _build_loader(split, max_examples):
        # Tokenized split, truncated to at most `max_examples` leading rows.
        ds = get_dataset(split)
        ds = ds.select(range(min(max_examples, len(ds))))
        return DataLoader(
            ds,
            batch_size=batch_size,
            shuffle=False,
            collate_fn=default_data_collator,
        )

    return (
        _build_loader("train", TRAIN_SET_SIZE),
        _build_loader("val", VAL_SET_SIZE),
    )


def process_batch(batch, device):
    """Extract the model inputs and labels from a batch and move them to ``device``.

    Args:
        batch: Mapping with the keys ``input_ids``, ``token_type_ids``,
            ``attention_mask`` and ``labels``; each value must offer ``.to()``.
        device: Target passed to every ``.to()`` call.

    Returns:
        list: ``[input_ids, token_type_ids, attention_mask, labels]``, each
        being the result of ``.to(device)``.
    """
    field_order = ("input_ids", "token_type_ids", "attention_mask", "labels")
    # Look up every field first, then transfer them in the same order.
    tensors = tuple(batch[name] for name in field_order)
    return [tensor.to(device) for tensor in tensors]


# Notebook entry point: builds the globals `model`, `train_loader` and
# `val_loader` that the evaluation cell further down relies on.
if __name__ == "__main__":
    # No checkpoint path is passed, so the model is used as loaded from the
    # pretrained checkpoint.
    model = init_model()
    train_loader, val_loader = init_loaders(batch_size=16)

In [None]:
def predict_custom_text(model, tokenizer, sentence1, sentence2=None):
    """
    Classify one custom text (or text pair) with the model.

    The text is encoded exactly like the dataset examples: padded or truncated
    to 128 tokens and returned as PyTorch tensors. Inference runs without
    gradient tracking.

    Args:
        model (SequenceClassificationModel): Classifier to query; its ``device``
            attribute decides where the encoded inputs are moved.
        tokenizer (AutoTokenizer): Tokenizer used to encode the text.
        sentence1 (str): First text input (e.g., question or sentence).
        sentence2 (str, optional): Second text input (e.g., sentence or
            hypothesis). Defaults to None.

    Returns:
        int: Predicted class label (e.g., 0 or 1).
    """
    encoded = tokenizer(
        sentence1,
        sentence2,
        padding="max_length",
        max_length=128,
        truncation=True,
        return_tensors="pt",
    )

    # Every encoded tensor has to live on the model's device.
    model_inputs = {}
    for name, tensor in encoded.items():
        model_inputs[name] = tensor.to(model.device)

    with ch.no_grad():
        scores = model(**model_inputs)
        predicted_class = ch.argmax(scores, dim=1)

    return predicted_class.item()

In [None]:
# Redundant with the `import torch as ch` in the imports cell; harmless.
import torch as ch

# Table of context rows per validation example. get_context() reads its
# `val_index`, `question`, `sentence` and `train_label` columns.
# NOTE(review): presumably TRAK attribution scores, judging by the file name — confirm.
df = pd.read_csv('bert_trak_scores.csv')
df.head()

In [None]:
def get_context(validation_index, scores=None):
    """Build the context string for one validation example.

    Args:
        validation_index (int): Value matched against the ``val_index`` column.
        scores (pandas.DataFrame, optional): Table with the columns
            ``val_index``, ``question``, ``sentence`` and ``train_label``.
            Defaults to the notebook-level ``df``.

    Returns:
        str: One line per matching row, formatted as
        ``"<question> <sentence>: <LABEL>"`` and joined with newlines. LABEL is
        ``ENTAILMENT`` when ``train_label`` equals 0 and ``NOT_ENTAILMENT``
        otherwise. The string is empty when no row matches.
    """
    if scores is None:
        scores = df

    # Filter once instead of rebuilding the same boolean mask per column.
    matches = scores[scores["val_index"] == validation_index]

    lines = [
        f"{question} {sentence}: {'ENTAILMENT' if label == 0 else 'NOT_ENTAILMENT'}"
        for question, sentence, label in zip(
            matches["question"].tolist(),
            matches["sentence"].tolist(),
            matches["train_label"].tolist(),
        )
    ]
    return "\n".join(lines)


def evaluate_model(model, data_loader):
    """Evaluate the model on a loader, with and without the retrieved context.

    For every example two predictions are recorded: the model's prediction on
    the tokenized example itself, and its prediction on the context string
    returned by ``get_context`` for that example's position in the loader.
    All rows are written to ``bert_predictions.csv``.

    The position of an example is its running offset in iteration order, so
    ``data_loader`` must iterate the validation set unshuffled for the lookup
    in ``get_context`` to hit the right rows.

    Args:
        model (SequenceClassificationModel): Classifier to evaluate.
        data_loader (DataLoader): Yields batches accepted by ``process_batch``.

    Returns:
        tuple[float, float]: ``(accuracy, accuracy_with_prompt_engineering)``.
        Both are ``0.0`` when the loader yields no examples.
    """
    correct = 0
    correct_with_prompt_engineering = 0
    total = 0

    data = {"input": [], "prediction": [], "prediction_pe": [], "label": []}

    # Loop-invariant: load the tokenizer once instead of once per batch.
    tokenizer = AutoTokenizer.from_pretrained(
        "google-bert/bert-base-cased", use_fast=True
    )

    with ch.no_grad():
        for batch in tqdm(data_loader):
            input_ids, token_type_ids, attention_mask, labels = process_batch(
                batch, model.device
            )

            logits = model(input_ids, token_type_ids, attention_mask)
            predictions = ch.argmax(logits, dim=-1)

            correct += (predictions == labels).sum().item()

            # Offset of this batch's first example; does not depend on
            # `data_loader.batch_size` being set.
            batch_start = total
            total += labels.size(0)

            input_text = tokenizer.batch_decode(input_ids, skip_special_tokens=True)
            label_values = labels.cpu().tolist()
            prediction_values = predictions.cpu().tolist()

            for i, text in enumerate(input_text):
                validation_index = batch_start + i

                data["input"].append(text)
                data["prediction"].append(prediction_values[i])
                data["label"].append(label_values[i])

                # Check if the prediction is correct with prompt engineering
                # NOTE(review): only the context string is passed to the model
                # here; the validation example's own text is not part of the
                # input. Confirm this is the intended setup.
                context = get_context(validation_index)
                prediction_pe = predict_custom_text(model, tokenizer, context)
                data["prediction_pe"].append(prediction_pe)

                if prediction_pe == label_values[i]:
                    correct_with_prompt_engineering += 1

    # Named so it does not shadow the notebook-level `df` used by get_context.
    predictions_df = pd.DataFrame(data)
    predictions_df.to_csv("bert_predictions.csv", index=False)

    # Guard against an empty loader instead of dividing by zero.
    accuracy = correct / total if total else 0.0
    accuracy_with_prompt_engineering = (
        correct_with_prompt_engineering / total if total else 0.0
    )

    print(f"\nAccuracy without prompt engineering: {accuracy:.4f}")
    print(f"Accuracy with prompt engineering: {accuracy_with_prompt_engineering:.4f}")
    return accuracy, accuracy_with_prompt_engineering


# Runs the evaluation on the validation loader. Relies on the globals `model`
# and `val_loader` created by the setup cell and on `df` from the scores cell.
if __name__ == "__main__":
    print("Evaluating validation set...")
    # evaluate_model also writes its per-example rows to bert_predictions.csv.
    val_accuracy, val_accuracy_with_prompt_engineering = evaluate_model(
        model, val_loader
    )

In [None]:
# Recompute both accuracies from the per-example rows stored on disk.
prediction_data = pd.read_csv('bert_predictions.csv')

# Accuracy = number of rows whose prediction equals the label / number of rows.
acc = prediction_data['prediction'].eq(prediction_data['label']).sum() / len(prediction_data)
acc_pe = prediction_data['prediction_pe'].eq(prediction_data['label']).sum() / len(prediction_data)

print(f"Accuracy without prompt engineering: {acc}")
print(f"Accuracy with prompt engineering: {acc_pe}")