In [1]:
# Import all the packages
import numpy as np
import json
from collections import Counter
from sentence_transformers import SentenceTransformer
import pickle
import faiss
from tqdm import tqdm
import nltk
import re
from datasets import Dataset
from nltk.tokenize import sent_tokenize
nltk.download('punkt')
nltk.download('punkt_tab')




[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\DELL\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\DELL\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

## Data processing

In [2]:
from transformers import BertTokenizer
from torch.utils.data import Dataset
import torch

# Load the tokenizer belonging to the "bert-base-uncased" checkpoint; it is
# passed to ClaimEvidenceDataset to encode (claim, evidence) text pairs.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Claim label string -> integer class id (ids follow the order listed here).
label2id = {
    name: class_id
    for class_id, name in enumerate(
        ("SUPPORTS", "REFUTES", "NOT_ENOUGH_INFO", "DISPUTED")
    )
}

class ClaimEvidenceDataset(Dataset):
    """Tokenised (claim, evidence) pairs labelled with the claim's class id.

    One sample is created for every evidence id listed under a claim that is
    also a key of ``evidence_dict``; evidence ids missing from
    ``evidence_dict`` are skipped.

    Args:
        claims: mapping of claim id -> dict with "claim_text", "claim_label"
            and (optionally) "evidences", a list of evidence ids.
        evidence_dict: mapping of evidence id -> evidence text.
        tokenizer: callable used as ``tokenizer(claim, evidence, ...)`` that
            returns a mapping of tensors with a leading batch dimension of 1.
        max_length: length every pair is padded / truncated to.

    Raises:
        KeyError: if a claim with usable evidence has a label that is not a
            key of the module-level ``label2id``.
    """

    def __init__(self, claims, evidence_dict, tokenizer, max_length=512):
        self.encodings = []
        self.labels = []
        for claim_data in claims.values():
            claim_text = claim_data["claim_text"]
            label_str = claim_data["claim_label"]
            for eid in claim_data.get("evidences", []):
                if eid not in evidence_dict:
                    continue
                encoded = tokenizer(
                    claim_text,
                    evidence_dict[eid],
                    padding="max_length",
                    truncation=True,
                    max_length=max_length,
                    return_tensors="pt"
                )
                # squeeze(0) removes only the batch dimension. A bare
                # squeeze() would also collapse a sequence dimension of
                # size 1 and turn the sample into a 0-d tensor.
                self.encodings.append({k: v.squeeze(0) for k, v in encoded.items()})
                self.labels.append(label2id[label_str])

    def __getitem__(self, idx):
        """Return a new dict with the encoded tensors plus a "labels" tensor."""
        # Build a fresh dict so the cached encoding is not mutated by adding
        # the "labels" key on every access.
        item = dict(self.encodings[idx])
        item["labels"] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        """Number of (claim, evidence) samples."""
        return len(self.labels)


In [4]:
import json, random
from collections import defaultdict, Counter
import torch
from torch.utils.data import Dataset
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
# Keep the tokenizer's padding and mask token ids as module-level constants.
PAD = tokenizer.pad_token_id
MASK = tokenizer.mask_token_id

# Claim label string <-> integer class id, in both directions.
label2id = dict(zip(("SUPPORTS", "REFUTES", "NOT_ENOUGH_INFO", "DISPUTED"), range(4)))
id2label = {class_id: name for name, class_id in label2id.items()}


# Dataset class for claim and evidence
class ClaimEvidenceDataset(Dataset):
    """
    Tokenised (claim, evidence) pairs labelled with the claim's class id.

    One sample is created for every evidence id listed under a claim that is
    also a key of ``evidence_dict``. With ``balance=True`` additional samples
    are generated once, at construction time, by augmenting randomly chosen
    original samples of each under-represented label.

    - balance=True  : Balance all labels to int(max_count * target_ratio)
    - augmenters    : Optional subset of {'dropout','swap','pad','cutmix'}
                      (all four when omitted or empty). Any other name is
                      accepted but leaves the copied sample unchanged.
    - aug_params    : Hyperparameters for each method; keys are dropout_prob,
                      swap_prob, pad_prob, cutmix_min, cutmix_max
    - seed          : Seeds Python's global ``random`` module and a private
                      torch generator used by token dropout
    """
    def __init__(self,
                 claims: dict,
                 evidence_dict: dict,
                 tokenizer,
                 max_length: int = 512,
                 balance: bool = True,
                 target_ratio: float = 1.0,
                 augmenters=None,
                 aug_params=None,
                 seed: int = 42):

        random.seed(seed)
        # torch's RNG is independent of `random`; a private, seeded generator
        # makes token dropout reproducible without touching torch's global state.
        self._torch_gen = torch.Generator().manual_seed(seed)
        self.tokenizer, self.max_length = tokenizer, max_length
        self.encodings, self.labels = [], []
        self.augmenters = set(augmenters or ['dropout', 'swap', 'pad', 'cutmix'])

        # Default hyperparameters
        _default = dict(dropout_prob=0.15,
                        swap_prob=0.10,
                        pad_prob=0.05,
                        cutmix_min=0.3,
                        cutmix_max=0.7)
        self.aug_params = {**_default, **(aug_params or {})}

        # --------- Original sample encoding --------- #
        for cdict in claims.values():
            claim_text = cdict["claim_text"]
            lab = label2id[cdict["claim_label"]]
            for eid in cdict.get("evidences", []):
                if eid in evidence_dict:
                    evi = evidence_dict[eid]
                    toks = tokenizer(claim_text, evi,
                                     truncation=True,
                                     padding="max_length",
                                     max_length=max_length,
                                     return_tensors="pt")
                    self.encodings.append({k: v.squeeze(0) for k, v in toks.items()})
                    self.labels.append(lab)

        # --------- Oversample with augmentation (done once, here) --------- #
        if balance:
            self._balance_dataset(target_ratio)

    # ======= Dataset API ======= #
    def __len__(self):
        """Number of samples, including augmented ones."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return cloned tensors of sample ``idx`` plus a "labels" tensor."""
        item = {k: v.clone() for k, v in self.encodings[idx].items()}
        item["labels"] = torch.tensor(self.labels[idx])
        return item

    # ======= Helpers ======= #
    def _reserved_ids(self, include_mask: bool = False) -> set:
        """Ids of the tokenizer's PAD/CLS/SEP (and optionally MASK) tokens.

        Read from ``self.tokenizer`` so the augmentations always agree with
        the tokenizer that produced the encodings.
        """
        names = ['pad_token_id', 'cls_token_id', 'sep_token_id']
        if include_mask:
            names.append('mask_token_id')
        found = {getattr(self.tokenizer, name, None) for name in names}
        found.discard(None)
        return found

    # ======= Balance samples by different labels ======= #
    def _balance_dataset(self, target_ratio: float):
        """Append augmented copies until each label has the target count."""
        by_label = defaultdict(list)
        for i, y in enumerate(self.labels):
            by_label[y].append(i)

        # Nothing was encoded (e.g. no evidence id matched): max() would raise.
        if not by_label:
            return

        max_count = int(max(len(v) for v in by_label.values()) * target_ratio)

        for lab, idx_list in by_label.items():
            need = max_count - len(idx_list)
            for _ in range(max(0, need)):
                base_idx = random.choice(idx_list)
                base_enc = self.encodings[base_idx]
                # idx_list holds only original samples of this label; it is
                # passed on so cutmix does not rescan all labels per sample.
                aug_enc = self._augment_encoding(base_enc, lab, idx_list)
                self.encodings.append(aug_enc)
                self.labels.append(lab)

    # ======= Augment a single sample ======= #
    def _augment_encoding(self, enc, lab, candidates=None):
        """Return an augmented clone of ``enc``; ``enc`` itself is untouched.

        ``candidates`` (optional) are the sample indices cutmix may mix with.
        """
        enc = {k: v.clone() for k, v in enc.items()}
        # Iterating a set of strings has no stable order across interpreter
        # runs, so sort before drawing to keep the seed meaningful.
        choice = random.choice(sorted(self.augmenters, key=str))
        if choice == 'dropout':
            self._token_dropout(enc)
        elif choice == 'swap':
            self._swap_neighbor(enc)
        elif choice == 'pad':
            self._random_pad(enc)
        elif choice == 'cutmix':
            self._cutmix(enc, lab, candidates)
        return enc

    # ----- 1. Random token dropout -----
    def _token_dropout(self, enc):
        """Replace ordinary tokens with the mask token with prob. dropout_prob."""
        ids = enc['input_ids']
        drop = torch.rand(ids.shape, generator=self._torch_gen) < self.aug_params['dropout_prob']
        # Never overwrite padding or the CLS/SEP tokens that delimit the pair.
        for special in self._reserved_ids():
            drop &= ids != special
        ids[drop] = self.tokenizer.mask_token_id
        enc['input_ids'] = ids

    # ----- 2. Swap neighbouring tokens -----
    def _swap_neighbor(self, enc):
        """Swap adjacent ordinary tokens with prob. swap_prob per position."""
        ids = enc['input_ids']
        frozen = self._reserved_ids(include_mask=True)
        swap_prob = self.aug_params['swap_prob']
        i = 1
        while i < len(ids) - 1:
            left, right = int(ids[i]), int(ids[i + 1])
            # Both neighbours must be ordinary tokens, otherwise SEP/PAD would
            # be moved to a position whose attention mask / segment id no
            # longer fits it.
            if left not in frozen and right not in frozen and random.random() < swap_prob:
                ids[i], ids[i + 1] = right, left
                i += 2  # skip the token just moved so it shifts at most one place
            else:
                i += 1
        enc['input_ids'] = ids

    # ----- 3. Random inner padding -----
    def _random_pad(self, enc):
        """Insert masked-out PAD tokens between real tokens with prob. pad_prob."""
        ids, mask = enc['input_ids'], enc['attention_mask']
        types = enc.get('token_type_ids')
        pad_id = self.tokenizer.pad_token_id
        pad_prob = self.aug_params['pad_prob']

        # Real tokens are the leading run of attention_mask == 1.
        real_len = 0
        for m in mask.tolist():
            if m == 0:
                break
            real_len += 1
        # Only as many pads as there are free slots, so no real token
        # (in particular the final SEP) is pushed past max_length.
        slack = self.max_length - real_len

        new_ids, new_mask, new_types = [], [], []
        inserted = 0
        for pos in range(real_len):
            seg = int(types[pos]) if types is not None else 0
            new_ids.append(int(ids[pos]))
            new_mask.append(1)
            new_types.append(seg)
            if inserted < slack and random.random() < pad_prob:
                new_ids.append(pad_id)
                new_mask.append(0)
                new_types.append(seg)
                inserted += 1
        # Truncate / pad with PAD at the end
        new_ids = (new_ids + [pad_id] * self.max_length)[:self.max_length]
        new_mask = (new_mask + [0] * self.max_length)[:self.max_length]
        enc['input_ids'] = torch.tensor(new_ids, dtype=torch.long)
        enc['attention_mask'] = torch.tensor(new_mask, dtype=torch.long)
        if types is not None:
            # Shift segment ids together with the tokens they belong to.
            new_types = (new_types + [0] * self.max_length)[:self.max_length]
            enc['token_type_ids'] = torch.tensor(new_types, dtype=types.dtype)

    # ----- 4. CutMix (same label) -----
    def _cutmix(self, enc, lab, candidates=None):
        """Replace the tail of ``enc`` with the tail of a same-label sample.

        ``candidates`` are the indices to draw the partner from; when omitted
        every stored sample labelled ``lab`` is eligible.
        """
        if candidates is None:
            candidates = [i for i, y in enumerate(self.labels) if y == lab]
        other = self.encodings[random.choice(candidates)]
        lam = random.uniform(self.aug_params['cutmix_min'],
                             self.aug_params['cutmix_max'])
        # NOTE(review): the cut point is relative to max_length, not to the
        # number of real tokens; if both samples are much shorter than
        # max_length the swapped tails are padding only - confirm intent.
        cut_point = int(lam * self.max_length)
        # Copy every per-token tensor (ids, attention mask, segment ids) so
        # they stay aligned after the mix.
        for key, tensor in enc.items():
            if key in other:
                tensor[cut_point:] = other[key][cut_point:]




## BERT finetuning

In [8]:
# Directory the fine-tuned classifier and its tokenizer are saved to after
# training and loaded from again for prediction.
model_path = "./model/my_bert_classifier"

In [None]:
from transformers import BertForSequenceClassification, Trainer, TrainingArguments

# Load the training claims and the evidence corpus.
# The encoding is given explicitly: without it open() falls back to the
# platform's locale encoding, which is not UTF-8 on every system.
with open('./data/train-claims.json', 'r', encoding='utf-8') as f:
    train_claims = json.load(f)
with open('./data/evidence.json', 'r', encoding='utf-8') as f:
    evidence_dict = json.load(f)


# Construct the dataset (minority labels are oversampled via augmentation)
train_dataset = ClaimEvidenceDataset(
    claims=train_claims,
    evidence_dict=evidence_dict,
    tokenizer=tokenizer,
    max_length=512,
    balance=True,
    target_ratio=1.0,
)

# Load BERT model with one output per entry of label2id
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased", num_labels=len(label2id)
)

# Set training parameters. Evaluation, checkpointing and logging are all
# disabled, so no logging interval is configured.
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=16,
    num_train_epochs=3,
    eval_strategy="no",
    save_strategy="no",
    logging_strategy="no",
)

# Initialize Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer
)

# Start training
trainer.train()

# Save the fine-tuned model and its tokenizer side by side in model_path
model.save_pretrained(model_path)
tokenizer.save_pretrained(model_path)


## Predict and save

In [10]:
import json
import torch
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from tqdm import tqdm
import torch.nn.functional as F

# 1. load the fine-tuned model and tokenizer saved in model_path
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertForSequenceClassification.from_pretrained(model_path).to(device)
tokenizer = BertTokenizer.from_pretrained(model_path)
model.eval()  # inference mode: disables dropout

# 2. load your retrieval file (contains claim_text)
# The encoding is given explicitly: without it open() falls back to the
# platform's locale encoding, which is not UTF-8 on every system.
with open("test-claims-predictions.json", "r", encoding="utf-8") as f:
    test_data = json.load(f)

# 3. load evidence content
with open("./data/evidence.json", "r", encoding="utf-8") as f:
    evidence_dict = json.load(f)

# Class id -> label string, used to turn the predicted index into a label
id2label = {
    0: "SUPPORTS",
    1: "REFUTES",
    2: "NOT_ENOUGH_INFO",
    3: "DISPUTED"
}

# 4. define the function to predict the label with average probability
def predict_label_average(claim_text, evidence_ids, batch_size=16):
    """Predict a claim label by averaging class probabilities over its evidence.

    Each evidence id found in the module-level ``evidence_dict`` is paired
    with ``claim_text`` and scored by the module-level ``model``; the softmax
    outputs of all pairs are averaged and the arg-max class is returned.

    Args:
        claim_text: text of the claim.
        evidence_ids: iterable of evidence ids; ids missing from
            ``evidence_dict`` are ignored.
        batch_size: number of (claim, evidence) pairs per forward pass.

    Returns:
        The label string from ``id2label``; "NOT_ENOUGH_INFO" when none of
        the evidence ids is known.
    """
    ev_texts = [evidence_dict[eid] for eid in evidence_ids if eid in evidence_dict]
    if not ev_texts:
        return "NOT_ENOUGH_INFO"  # fallback

    prob_chunks = []
    with torch.no_grad():
        for start in range(0, len(ev_texts), batch_size):
            chunk = ev_texts[start:start + batch_size]
            # Score the whole chunk in one forward pass and pad only to the
            # longest pair in it, instead of one 512-token pass per evidence.
            # Padded positions are excluded through the attention mask.
            inputs = tokenizer(
                [claim_text] * len(chunk),
                chunk,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=512
            ).to(device)
            logits = model(**inputs).logits
            prob_chunks.append(F.softmax(logits, dim=-1).cpu())

    avg_probs = torch.cat(prob_chunks, dim=0).mean(dim=0)
    pred_idx = int(torch.argmax(avg_probs))
    return id2label[pred_idx]

# 5. classify every test claim from its retrieved evidence
final_predictions = {}

for cid, entry in tqdm(test_data.items(), desc="Classifying"):
    claim_text, evidence_ids = entry["claim_text"], entry["evidences"]
    label = predict_label_average(claim_text, evidence_ids)

    # Output record: the retrieved evidence ids, the predicted label and the claim text
    final_predictions[cid] = dict(
        evidences=evidence_ids,
        claim_label=label,
        claim_text=claim_text,
    )

# 6. write all predictions to a single JSON file
with open("test-claims-final-predictions.json", "w") as f:
    f.write(json.dumps(final_predictions, indent=2))

print("Done: saved to test-claims-final-predictions.json")


Classifying: 100%|██████████| 153/153 [00:14<00:00, 10.54it/s]

Done: saved to test-claims-final-predictions.json



