# 2025 COMP90042 Project
*Make sure you rename this file so that it includes your group id.*

# Readme
*If there is something to be noted for the marker, please mention here.*

*If you are planning to implement your program in an Object Oriented Programming style, please put that code at the bottom of this ipynb file.*

# 1.DataSet Processing
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

In [None]:
import os
import json
# import random
import numpy as np
from typing import Dict, List, Tuple
import time
from tqdm import tqdm
from torch.utils.data import DataLoader
from sentence_transformers import SentenceTransformer, InputExample, losses, util
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import precision_recall_fscore_support
import faiss

# === Paths ===
TRAIN_CLAIMS_PATH = "data/train-claims.json"
DEV_CLAIMS_PATH = "data/dev-claims.json"
EVIDENCE_PATH = "data/evidence.json"
FINETUNED_MODEL_PATH = "models/bge-finetuned"  # loaded if it exists, and where the fine-tuned retriever is saved
BASE_MODEL_NAME = "BAAI/bge-base-en-v1.5"  # used when no fine-tuned checkpoint exists yet

# === Hyperparameters ===
BATCH_SIZE = 16  # batch size of the retriever's training DataLoader
EPOCHS = 1
TOP_K = 10  # neighbours requested from the FAISS index per claim
LIMIT_EVIDENCES = True  # when True, faiss_candidates searches a reduced evidence pool
SIMILARITY_THRESHOLD = 0.90 # Used for selecting evidences
MAX_RESULTS = 5 # Min 1 evidence and max 5 evidences

# === Loaders ===
def load_claims(path: str) -> Dict:
    """Load the claims JSON file at *path*.

    Args:
        path: Location of a JSON file on disk.

    Returns:
        The decoded top-level JSON value (a dict for the project's claim files).
    """
    # Read as UTF-8 explicitly: the platform default encoding (e.g. cp1252 on
    # Windows) would otherwise mangle or reject non-ASCII claim text.
    with open(path, encoding="utf-8") as f:
        return json.load(f)

def load_evidences(path: str) -> Dict:
    """Load the evidence JSON file at *path*.

    Args:
        path: Location of a JSON file on disk.

    Returns:
        The decoded top-level JSON value (a dict for the project's evidence file).
    """
    # Read as UTF-8 explicitly so the result does not depend on the platform's
    # default text encoding.
    with open(path, encoding="utf-8") as f:
        return json.load(f)

Claim-Evidence Loader for Classification

In [None]:
import json
import torch
from torch.utils.data import Dataset, DataLoader
import time


class DynamicEvidenceDataset(Dataset):
    """Dataset that pairs each claim with its retrieved evidence for label classification.

    Three JSON files are read at construction time:

    * ``eval_path``     - maps claim id -> ``{"evidences": [evidence ids]}``;
      its key order defines the item order of the dataset.
    * ``claim_path``    - maps claim id -> ``{"claim_text": ..., "claim_label": ...}``.
    * ``evidence_path`` - maps evidence id -> evidence text.

    Each item is the tokenizer output for
    ``"CLAIM: <claim> [SEP] EVIDENCE: <ev1> [SEP] <ev2> ..."`` with the leading
    batch dimension removed, plus an integer ``labels`` entry.
    """

    def __init__(self, eval_path, claim_path, evidence_path, tokenizer, max_len=512):
        self.eval_data = self.load_data(eval_path)
        self.claim_data = self.load_data(claim_path)
        self.evidence_data = self.load_data(evidence_path)
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_map = {'SUPPORTS': 0, 'REFUTES': 1, 'NOT_ENOUGH_INFO': 2, 'DISPUTED': 3}
        # Freeze the iteration order once; rebuilding this list on every
        # __getitem__ call would make a full pass over the data quadratic.
        self.claim_ids = list(self.eval_data)

    def load_data(self, path):
        """Return the decoded JSON content of the file at *path*."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def __len__(self):
        # Items are addressed through the eval file's keys, so that file (not
        # the claim file) determines how many items exist.
        return len(self.claim_ids)

    def __getitem__(self, idx):
        """Tokenize the claim at position *idx* together with its evidence.

        Raises:
            KeyError: if the claim id is missing from the claim file or its
                label is not one of the four known classes.
        """
        claim_id = self.claim_ids[idx]
        claim = self.claim_data[claim_id]
        evidences = self.eval_data[claim_id].get('evidences', [])

        # Unknown evidence ids contribute an empty string, so the number of
        # [SEP]-separated slots always equals the number of evidence ids.
        evidence_texts = [self.evidence_data.get(e_id, "") for e_id in evidences]
        evidence = " [SEP] ".join(evidence_texts)

        inputs = self.tokenizer("CLAIM: " + claim['claim_text'] + " [SEP] EVIDENCE: " + evidence,
                                truncation=True, padding='max_length',
                                max_length=self.max_len, return_tensors='pt')

        # return_tensors='pt' yields a leading batch dimension of size 1; drop
        # it so the DataLoader can stack items itself.
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}
        inputs['labels'] = self.label_map[claim['claim_label']]
        return inputs


def create_dataloader(eval_path, claim_path, evidence_path, tokenizer, batch_size=16, max_len=512):
    """Wrap a DynamicEvidenceDataset in a DataLoader that keeps the items in order.

    The three paths, the tokenizer and ``max_len`` are forwarded to
    DynamicEvidenceDataset; ``batch_size`` is the DataLoader batch size.
    Shuffling is disabled.
    """
    return DataLoader(
        DynamicEvidenceDataset(eval_path, claim_path, evidence_path, tokenizer, max_len),
        batch_size=batch_size,
        shuffle=False,
    )

# 2. Model Implementation
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

Evidence Retrieval

In [None]:
def _select_hits(indices, scores, threshold: float, max_results: int) -> List[int]:
    """Choose which ranked FAISS hits to keep for a single claim."""
    # FAISS pads the result with label -1 when it finds fewer than k
    # neighbours; as a Python index -1 would silently mean "last evidence".
    ranked = [(int(i), float(s)) for i, s in zip(indices, scores) if i >= 0]
    if not ranked:
        return []
    kept = [i for i, s in ranked if s >= threshold]
    # Guarantee at least one evidence per claim: fall back to the best hit.
    if not kept:
        kept = [ranked[0][0]]
    return kept[:max_results]


def faiss_candidates(
    claims: Dict[str, dict], evidences: Dict[str, str],
    model: SentenceTransformer, top_k: int
) -> Tuple[List[str], List[str], Dict[str, List[str]]]:
    """Retrieve candidate evidence ids for every claim with a FAISS HNSW index.

    Claims and evidences are embedded with *model* (L2-normalised, on CPU), the
    evidence vectors are indexed, and each claim keeps between 1 and
    MAX_RESULTS of its ``top_k`` nearest evidences: those whose cosine
    similarity is at least SIMILARITY_THRESHOLD, or the single best hit when
    none reaches it.

    Args:
        claims: claim id -> dict with ``"claim_text"`` and optionally ``"evidences"``.
        evidences: evidence id -> evidence text. Empty texts are ignored.
        model: sentence encoder used for both claims and evidences.
        top_k: number of neighbours requested from the index per claim.

    Returns:
        ``(claim_ids, claim_texts, cand_map)`` where ``cand_map`` maps every
        claim id to its selected evidence ids, best first.
    """
    import random  # local import: only needed for the distractor sample below

    if LIMIT_EVIDENCES:
        # NOTE(review): the reduced pool is built from the claims' own gold
        # evidences, so scores computed on it are easier than retrieval over
        # the full corpus - confirm this is only meant for quick experiments.
        used_evidence_ids = set()
        for c in claims.values():
            # Unlabelled claims (no "evidences" key) simply contribute nothing.
            used_evidence_ids.update(c.get("evidences", []))

        # Add up to 5,000 random evidence ids that are not already used.
        # Sorting plus a fixed seed makes the sample identical on every run
        # (set iteration order of strings changes between interpreter runs).
        pool = sorted(set(evidences) - used_evidence_ids)
        additional_ids = random.Random(0).sample(pool, min(5000, len(pool)))

        all_ids = sorted(used_evidence_ids) + additional_ids
        evidences = {eid: evidences[eid] for eid in all_ids if evidences.get(eid)}

    claim_ids = list(claims)
    claim_texts = [claims[cid]["claim_text"] for cid in claim_ids]
    evidence_ids = [eid for eid, txt in evidences.items() if txt]
    evidence_texts = [evidences[eid] for eid in evidence_ids]

    # Nothing to search for, or nothing to search in: no candidates.
    if not claim_ids or not evidence_ids:
        return claim_ids, claim_texts, {cid: [] for cid in claim_ids}

    emb_e = model.encode(evidence_texts, batch_size=64, show_progress_bar=True,
                         convert_to_numpy=True, normalize_embeddings=True, device='cpu')
    emb_c = model.encode(claim_texts, batch_size=64, show_progress_bar=True,
                         convert_to_numpy=True, normalize_embeddings=True, device='cpu')

    d = emb_e.shape[1]
    # Inner product on unit vectors is cosine similarity (higher = closer).
    # The default L2 metric returns squared distances (lower = closer), which
    # must not be compared against SIMILARITY_THRESHOLD with ">=".
    index = faiss.IndexHNSWFlat(d, 32, faiss.METRIC_INNER_PRODUCT)
    index.hnsw.efConstruction = 200
    index.verbose = True

    # HNSW needs no training step; vectors are added directly.
    print("Adding embeddings to FAISS index in batches")
    start = time.time()

    # A failure here is allowed to propagate: searching a partially filled
    # index would silently return wrong candidates.
    for i in range(0, len(emb_e), 100):
        batch = emb_e[i:i+100]
        index.add(batch)
        print(f"Added {i + len(batch)} / {len(emb_e)} embeddings", flush=True)

    print(f"Finished adding all vectors in {time.time() - start:.2f} seconds")

    print("Performing FAISS search (per claim)")
    start = time.time()

    # One batched call; row r of both arrays holds the hits for claim r.
    sim_scores, neighbours = index.search(emb_c, top_k)

    print(f"Search done in {time.time() - start:.2f} seconds")

    # Create dict structure for the results
    cand_map: Dict[str, List[str]] = {
        cid: [
            evidence_ids[j]
            for j in _select_hits(row_ids, row_scores, SIMILARITY_THRESHOLD, MAX_RESULTS)
        ]
        for cid, row_ids, row_scores in zip(claim_ids, neighbours, sim_scores)
    }

    return claim_ids, claim_texts, cand_map

def train_model(model: SentenceTransformer, dataloader, loss_func, output_path: str, epochs: int = 1):
    """Fine-tune *model* one epoch at a time, saving to *output_path* after each.

    Every pass calls ``model.fit`` for exactly one epoch with 100 warm-up
    steps on the single ``(dataloader, loss_func)`` objective, then saves the
    model explicitly, so a checkpoint exists after each epoch.
    """
    for epoch_number in range(1, epochs + 1):
        print(f"\n🔁 Epoch {epoch_number}/{epochs}")

        fit_options = {
            "train_objectives": [(dataloader, loss_func)],
            "epochs": 1,
            "warmup_steps": 100,
            "output_path": output_path,
            "show_progress_bar": True,
        }
        model.fit(**fit_options)

        model.save(output_path)
        print(f"✅ Finished training Epoch {epoch_number}")


Claim Classification

In [None]:
import json
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from transformers import BertTokenizer, BertForSequenceClassification
import time


def evaluate_model(model, dataloader):
    """Run *model* over *dataloader* without gradients and collect its predictions.

    Args:
        model: a torch module whose forward accepts
            ``(input_ids, attention_mask=...)`` and returns an object with a
            ``logits`` attribute of shape (batch, num_classes).
        dataloader: iterable of batches, each a mapping with tensor entries
            ``input_ids``, ``attention_mask`` and ``labels``.

    Returns:
        ``(all_preds, all_labels, inference_time)``: predicted class indices
        and gold labels as flat lists, and the wall-clock seconds spent.
    """
    model.eval()

    # Send the inputs to wherever the model lives rather than assuming CUDA,
    # so the same function works on CPU-only machines.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    all_preds = []
    all_labels = []
    start_time = time.time()

    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            outputs = model(input_ids, attention_mask=attention_mask)
            # Highest-scoring class per row.
            preds = outputs.logits.argmax(dim=1)

            all_preds.extend(preds.tolist())
            # Labels are only read back, so they never need to leave the host.
            all_labels.extend(batch['labels'].tolist())

    inference_time = time.time() - start_time

    return all_preds, all_labels, inference_time


def run_evaluation(eval_path, claim_path, evidence_path, model, tokenizer, output_path, batch_size=16, max_len=512):
    """Classify every claim in *eval_path*, report metrics and write the predictions.

    Args:
        eval_path: JSON file mapping claim id -> ``{"evidences": [...]}``.
        claim_path: JSON file with the claim texts and gold labels.
        evidence_path: JSON file mapping evidence id -> text.
        model: sequence classifier passed to ``evaluate_model``.
        tokenizer: tokenizer passed to ``create_dataloader``.
        output_path: where the ``{claim_id: {"evidences", "claim_label"}}``
            JSON is written; its parent directory is created if needed.
        batch_size: DataLoader batch size.
        max_len: maximum token length per example.

    Raises:
        ValueError: if the number of predictions differs from the number of
            claims in *eval_path*.
    """
    import os  # local import keeps this function independent of other cells

    label_map = {0: 'SUPPORTS', 1: 'REFUTES', 2: 'NOT_ENOUGH_INFO', 3: 'DISPUTED'}

    dataloader = create_dataloader(eval_path, claim_path, evidence_path, tokenizer, batch_size, max_len)
    preds, labels, inference_time = evaluate_model(model, dataloader)

    # Calculate metrics (class-frequency weighted averages)
    accuracy = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, average='weighted')
    recall = recall_score(labels, preds, average='weighted')
    f1 = f1_score(labels, preds, average='weighted')

    with open(eval_path, encoding='utf-8') as f:
        eval_data = json.load(f)

    # Predictions are matched to claim ids purely by position, so a length
    # mismatch would attach labels to the wrong claims.
    if len(preds) != len(eval_data):
        raise ValueError(
            f"Got {len(preds)} predictions for {len(eval_data)} claims in {eval_path}"
        )

    # Prepare output dictionary (key: claim_id)
    output_data = {
        claim_id: {
            "evidences": eval_data[claim_id]["evidences"],
            "claim_label": label_map[int(pred)],
        }
        for claim_id, pred in zip(eval_data, preds)
    }

    # Make sure the target folder exists so a long inference run is not lost
    # to a missing directory.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, 'w') as f:
        json.dump(output_data, f, indent=4)

    print(f"Predictions saved to {output_path}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Total Inference Time: {inference_time:.2f} seconds")



# 3.Testing and Evaluation
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

Evidence Retrieval

In [None]:
def evaluate(pred: dict, actual: dict):
    """Score predicted evidence sets against the gold sets (micro-averaged).

    Every (claim, evidence id) pair is one decision: a true positive when the
    id is both predicted and gold for that claim, a false positive when it is
    only predicted, a false negative when it is only gold. Predicted ids that
    are gold for no claim at all still count as false positives; dropping them
    would overstate precision.

    Args:
        pred: claim id -> ``{"evidences": [ids]}``; claims missing here count
            as having no predictions.
        actual: claim id -> dict with a gold ``"evidences"`` list. Only claims
            present here are scored.

    Returns:
        ``(recall, precision, f1)`` as floats; each is 0.0 when its
        denominator is zero.
    """
    tp = fp = fn = 0
    for claim_id, gold_entry in actual.items():
        gold = set(gold_entry["evidences"])
        predicted = set(pred.get(claim_id, {}).get("evidences", []))
        tp += len(gold & predicted)
        fp += len(predicted - gold)
        fn += len(gold - predicted)

    prec = tp / (tp + fp) if tp + fp else 0.0
    rec = tp / (tp + fn) if tp + fn else 0.0
    # Equivalent to 2PR/(P+R) but without dividing by zero when P = R = 0.
    f1 = 2 * tp / (2 * tp + fp + fn) if tp else 0.0

    return rec, prec, f1

def predictions(cand_map: Dict[str, List[str]]) -> Dict[str, dict]:
    """Wrap each claim's evidence id list as ``{"evidences": ids}``, keyed by claim id."""
    wrapped: Dict[str, dict] = {}
    for claim_id in cand_map:
        # The list object itself is reused, not copied.
        wrapped[claim_id] = {"evidences": cand_map[claim_id]}
    return wrapped

def compute_recall_f1(model: SentenceTransformer, dev_claims_path: str, evidence_path: str, top_k: int = 10, save_path: str = None):
    """Retrieve evidence for the claims in *dev_claims_path* and score the result.

    Loads the claims and the evidence corpus, retrieves candidates with
    ``faiss_candidates``, optionally writes the predictions as JSON to
    *save_path* (skipped when it is falsy), and returns whatever
    ``evaluate`` returns for the predictions against the loaded claims.
    """
    gold_claims = load_claims(dev_claims_path)
    corpus = load_evidences(evidence_path)

    _ids, _texts, candidates = faiss_candidates(gold_claims, corpus, model, top_k)
    pred = predictions(candidates)

    if save_path:
        with open(save_path, "w") as out_file:
            out_file.write(json.dumps(pred, indent=2))
        print(f"Saved predictions to {save_path}")

    scores = evaluate(pred, gold_claims)
    return scores

# === Prepare training data ===
print("Loading training data")
claims = load_claims(TRAIN_CLAIMS_PATH)
print("Loading evidences data")
evidences = load_evidences(EVIDENCE_PATH)
evidence_ids = list(evidences.keys())


# Build positive (claim, gold evidence) pairs only. No explicit negatives are
# generated: MultipleNegativesRankingLoss treats the other evidences in the
# same batch as negatives.
train_examples = []
for cid, cdata in tqdm(claims.items(), desc="🔧 Building training examples"):
    claim_text = cdata["claim_text"]
    for eid in cdata["evidences"]:
        # Skip gold ids that are missing from the evidence corpus.
        if eid in evidences:
            train_examples.append(InputExample(texts=[claim_text, evidences[eid]]))

print("Loading model.")
if os.path.exists(FINETUNED_MODEL_PATH):
    print(f"Continuing from: {FINETUNED_MODEL_PATH}")
    model = SentenceTransformer(FINETUNED_MODEL_PATH)
else:
    print(f"Starting from base model: {BASE_MODEL_NAME}")
    model = SentenceTransformer(BASE_MODEL_NAME)

train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=BATCH_SIZE)
train_loss = losses.MultipleNegativesRankingLoss(model)

try:
    for epoch in range(EPOCHS):
        # Exactly one epoch per outer iteration so the dev set is evaluated
        # after every epoch; passing EPOCHS here would train EPOCHS * EPOCHS
        # epochs in total.
        train_model(model, train_dataloader, train_loss, FINETUNED_MODEL_PATH, epochs=1)

        model.save(FINETUNED_MODEL_PATH)
        print(f"\nModel saved to: {FINETUNED_MODEL_PATH}")

        # Evaluate on dev set
        recall, precision, f1 = compute_recall_f1(model, DEV_CLAIMS_PATH, EVIDENCE_PATH, top_k=TOP_K, save_path="MyPredictions")
        print(f"[Epoch {epoch+1}] Recall@{TOP_K}: {recall:.4f}, Precision: {precision:.4f}, F1: {f1:.4f}")
except Exception as e:
    print(f"Error during training: {e}")
    # Re-raise so the traceback is visible and later cells do not run on a
    # stale model or a stale "MyPredictions" file.
    raise


Claim Classification with BERT model

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification

# Retrieval predictions file (claim id -> evidences) used as the classifier's input
EVAL_PATH = "MyPredictions"
# Fine-tuned BERT weights (a state dict) to load on top of bert-base-uncased
STATE_DICT_PATH = "Pretrained_model/new_bert_model_Autocast_explicitMarker_LR5e05.pt"
# Where the predicted labels are written as JSON
OUTPUT_PATH = "Prediction/bert_prediction.json"

batch_size = 16  # inference batch size
max_len = 512  # maximum token length per claim+evidence input

def load_model_and_tokenizer(state_dict_path):
    """Build a 4-class bert-base-uncased classifier and its tokenizer.

    Args:
        state_dict_path: path to a saved state dict with fine-tuned weights;
            when falsy, the model keeps the pretrained weights (with a freshly
            initialised classification head).

    Returns:
        ``(model, tokenizer)`` with the model moved to CUDA when available,
        otherwise left on the CPU.
    """
    # Fall back to CPU so the cell also runs on machines without a GPU.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=4)
    if state_dict_path:
        # map_location lets a checkpoint saved from a GPU load on a CPU host.
        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from a trusted source (or pass weights_only=True where supported).
        state_dict = torch.load(state_dict_path, map_location=device)
        model.load_state_dict(state_dict)
        print("successfully loaded finetuned BERT model")
    model.to(device)
    return model, tokenizer

# Load model and tokenizer
# (rebinds the module-level name `model`, which earlier held the SentenceTransformer retriever)
model, tokenizer = load_model_and_tokenizer(STATE_DICT_PATH)

# Run evaluation: classifies the dev claims using the evidences in EVAL_PATH,
# prints the metrics and writes the predicted labels to OUTPUT_PATH
run_evaluation(EVAL_PATH, DEV_CLAIMS_PATH, EVIDENCE_PATH, model, tokenizer,OUTPUT_PATH, batch_size, max_len)

Claim Classification with deBERTa-v3 model

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Retrieval predictions file (claim id -> evidences) used as the classifier's input
EVAL_PATH = "MyPredictions"
# Fine-tuned deBERTa-v3 weights (a state dict) to load on top of microsoft/deberta-v3-base
STATE_DICT_PATH = "Pretrained_model/deBERTa_v3_best_model.pt"
# Where the predicted labels are written as JSON
OUTPUT_PATH = "Prediction/deBERTa_prediction.json"

batch_size = 4  # inference batch size
max_len = 512  # maximum token length per claim+evidence input

def load_model_and_tokenizer(state_dict_path):
    """Build a 4-class microsoft/deberta-v3-base classifier and its tokenizer.

    Args:
        state_dict_path: path to a saved state dict with fine-tuned weights;
            when falsy, the model keeps the pretrained weights (with a freshly
            initialised classification head).

    Returns:
        ``(model, tokenizer)`` with the model moved to CUDA when available,
        otherwise left on the CPU.
    """
    # Fall back to CPU so the cell also runs on machines without a GPU.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-v3-base")
    model = AutoModelForSequenceClassification.from_pretrained("microsoft/deberta-v3-base", num_labels=4)
    if state_dict_path:
        # map_location lets a checkpoint saved from a GPU load on a CPU host.
        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from a trusted source (or pass weights_only=True where supported).
        state_dict = torch.load(state_dict_path, map_location=device)
        model.load_state_dict(state_dict)
        print("successfully loaded finetuned deBERTa model")
    model.to(device)
    return model, tokenizer

# Load model and tokenizer
# (rebinds the module-level names `model` and `tokenizer` to the deBERTa-v3 ones)
model, tokenizer = load_model_and_tokenizer(STATE_DICT_PATH)

# Run evaluation: classifies the dev claims using the evidences in EVAL_PATH,
# prints the metrics and writes the predicted labels to OUTPUT_PATH
run_evaluation(EVAL_PATH, DEV_CLAIMS_PATH, EVIDENCE_PATH, model, tokenizer,OUTPUT_PATH, batch_size, max_len)

## Object Oriented Programming codes here

*You can use multiple code snippets. Just add more if needed*