In [1]:
import random

def lower_processing(data, text_type):
    '''Lowercase the text held in ``data`` in place and return ``data``.

    Args:
        data: dict to process. When ``text_type`` is ``"claim_text"`` every
            value must be a mapping that contains a ``"claim_text"`` string;
            for any other ``text_type`` every value must itself be a string.
        text_type: ``"claim_text"`` to lowercase the ``"claim_text"`` field of
            each record; any other value lowercases the dict values directly.

    Returns:
        The same ``data`` object, after modification.
    '''
    if text_type == "claim_text":
        for record in data.values():
            # Assign back into the record's text field. A bare comparison
            # (``==``) here would discard the result and leave the text
            # untouched; assigning to ``data[key]`` instead would replace the
            # whole record and drop its other fields.
            record[text_type] = record[text_type].lower()
    else:
        for key in data:
            data[key] = data[key].lower()
    return data

def stopwords_func(stop_words, text_type, text_data):
    '''Remove stop words from claim or evidence text in place.

    Text is split on whitespace; a word is dropped when its lowercase form is
    in ``stop_words``; the surviving words are re-joined with single spaces.

    Args:
        stop_words: container of lowercase stop words (membership-tested).
        text_type: ``"evidence"`` when the values of ``text_data`` are the
            sentences themselves; any other value means each value is a
            record whose ``"claim_text"`` entry holds the sentence.
        text_data: dict to filter.

    Returns:
        The same ``text_data`` object, after modification.
    '''
    def _strip(sentence):
        # Keep only the words whose lowercase form is not a stop word.
        kept = (token for token in sentence.split() if token.lower() not in stop_words)
        return " ".join(kept)

    if text_type != "evidence":
        for record in text_data.values():
            record["claim_text"] = _strip(record["claim_text"])
        return text_data

    for key in text_data:
        text_data[key] = _strip(text_data[key])
    return text_data

def pick_random_keys(dictionary, excluded_keys, num_keys):
    '''Sample ``num_keys`` distinct keys of ``dictionary``, skipping excluded ones.

    Args:
        dictionary: mapping whose keys form the candidate pool.
        excluded_keys: container of keys that must not be returned.
        num_keys: how many keys to draw.

    Returns:
        A list of ``num_keys`` keys drawn without replacement by
        ``random.sample``.

    Raises:
        ValueError: raised by ``random.sample`` when ``num_keys`` exceeds the
            number of non-excluded keys.
    '''
    candidates = []
    for key in dictionary.keys():
        if key in excluded_keys:
            continue
        candidates.append(key)
    return random.sample(candidates, num_keys)

In [None]:
import json
from nltk.corpus import stopwords

## Read in data
def _load_json(path):
    '''Parse the JSON file at ``path``, decoding it explicitly as UTF-8.'''
    # Without an explicit encoding, open() falls back to the platform default,
    # which can mis-decode non-ASCII text on systems whose locale is not UTF-8.
    with open(path, 'r', encoding='utf-8') as json_file:
        return json.load(json_file)

# Read in training data (claim)
tclaim_data = _load_json('../project-data/train-claims.json')

# Read in development data (claim)
dclaim_data = _load_json('../project-data/dev-claims.json')

# Read in test data (claim)
uclaim_data = _load_json('../project-data/test-claims-unlabelled.json')

# Read in evidence data
evi_data = _load_json('../project-data/evidence.json')

## Preprocessing - lowercase the claim and evidence text
tclaim_data = lower_processing(tclaim_data, "claim_text")
dclaim_data = lower_processing(dclaim_data, "claim_text")
uclaim_data = lower_processing(uclaim_data, "claim_text")
evi_data = lower_processing(evi_data, 'evidence')

# ## Remove stopwords from claims and evidence (optional)
# stop_words = set(stopwords.words('english'))
# tclaim_data = stopwords_func(stop_words, "claim", tclaim_data)
# dclaim_data = stopwords_func(stop_words, "claim", dclaim_data)
# uclaim_data = stopwords_func(stop_words, "claim", uclaim_data)
# evi_data = stopwords_func(stop_words, "evidence", evi_data)

## Create claim-evidence pairs from the training set
# Positive samples (label 1): each claim paired with every evidence id listed
# under its "evidences" key; the evidence text is stored as a one-element list.
train_pairs = []
for i in tclaim_data.values():
    for j in i["evidences"]:
        train_pairs.append({"claim": i["claim_text"], "evidence": [evi_data[j]], "label": 1})

## Insert negative samples into the training set
# Negative samples (label 0): for each claim, draw as many random evidence ids
# as it has positives, excluding the claim's own evidences.
# NOTE(review): pick_random_keys rebuilds the candidate list from every key of
# evi_data on each call, so this loop may be slow for a large evidence file.
for i in tclaim_data.values():
    excluded_keys = i["evidences"]
    random_keys = pick_random_keys(evi_data, excluded_keys, len(excluded_keys))
    for j in random_keys:
        train_pairs.append({"claim": i["claim_text"], "evidence": [evi_data[j]], "label": 0})

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel, AdamW

# Custom dataset of claim/evidence pairs
class TextMatchingDataset(Dataset):
    '''Dataset that encodes (claim, evidence) pairs for a BERT classifier.

    Each element of ``pairs`` is a dict with keys ``'claim'`` (str),
    ``'evidence'`` (a str, or a list/tuple of str) and ``'label'`` (int).
    '''

    def __init__(self, pairs):
        '''Store ``pairs`` and load the ``bert-base-uncased`` tokenizer.'''
        self.pairs = pairs
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def __len__(self):
        '''Return the number of pairs.'''
        return len(self.pairs)

    def __getitem__(self, index):
        '''Encode the pair at ``index``.

        Returns:
            dict with ``'input_ids'`` and ``'attention_mask'`` (the tokenizer
            output with its leading batch dimension removed) and ``'label'``
            (a ``torch.long`` scalar tensor).
        '''
        pair = self.pairs[index]
        claim_text = pair['claim']
        evidence_texts = pair['evidence']
        label = pair['label']

        # The tokenizer documents a list of strings as an *already tokenized*
        # sequence, so a list holding a whole sentence would be looked up as a
        # single vocabulary token instead of being tokenized. Join the
        # evidence sentence(s) into one plain string first.
        if isinstance(evidence_texts, (list, tuple)):
            evidence_text = " ".join(evidence_texts)
        else:
            evidence_text = evidence_texts

        # Encode claim and evidence together as one sequence pair
        inputs = self.tokenizer.encode_plus(
            claim_text,
            evidence_text,
            add_special_tokens=True,
            max_length=128,  # adjust the maximum length to the data
            padding='max_length',  # replaces the deprecated pad_to_max_length=True
            truncation=True,
            return_tensors='pt'
        )

        return {
            # squeeze(0) removes only the batch dimension added by return_tensors='pt'
            'input_ids': inputs['input_ids'].squeeze(0),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'label': torch.tensor(label, dtype=torch.long)
        }

# BERT-based text-matching model
class TextMatchingModel(nn.Module):
    '''``bert-base-uncased`` encoder followed by dropout and a 2-way linear head.'''

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.dropout = nn.Dropout(p=0.1)
        # Two output units: one per class (match / no match)
        self.fc = nn.Linear(in_features=768, out_features=2)

    def forward(self, input_ids, attention_mask):
        '''Return the classification logits for a batch of encoded pairs.

        The encoder's ``pooler_output`` is passed through dropout and then the
        linear layer.
        '''
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        return self.fc(self.dropout(encoded.pooler_output))

# Wrap the training pairs in a dataset and serve them in shuffled batches
train_dataset = TextMatchingDataset(train_pairs)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=2)

# Create the model and its optimizer
model = TextMatchingModel()
optimizer = AdamW(model.parameters(), lr=1e-5)

# Train on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
model = model.to(device)

# Training loop
model.train()
for epoch in range(10):  # number of passes over the training set; tune as needed
    for batch in train_dataloader:
        # Move the three tensors of the batch onto the training device
        input_ids, attention_mask, labels = (
            batch[field].to(device)
            for field in ('input_ids', 'attention_mask', 'label')
        )

        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        loss = nn.functional.cross_entropy(input=logits, target=labels)
        loss.backward()
        optimizer.step()

# Persist the whole model object (not only its state dict)
torch.save(model, '/content/drive/MyDrive/Colab Notebooks/NLP_Project/Code/bert_model.pth')

In [None]:
# Test-set data (built from the development claims)

# Load the saved model.
# - map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime.
# - weights_only=False is needed because the file holds a whole pickled model
#   object, not just a state dict; PyTorch releases where weights_only
#   defaults to True refuse such files.
# SECURITY: unpickling can execute arbitrary code - only load checkpoints you
# created yourself or otherwise trust.
model = torch.load(
    '/content/drive/MyDrive/Colab Notebooks/NLP_Project/Code/bert_model.pth',
    map_location=device,
    weights_only=False,
)
model.to(device)

# Every development claim paired with each of its listed evidences (all label 1)
test_pairs = []
for i in dclaim_data.values():
    for j in i["evidences"]:
        test_pairs.append({"claim": i["claim_text"], "evidence": [evi_data[j]], "label": 1})

test_dataset = TextMatchingDataset(test_pairs)
# shuffle=False keeps batches in the same order as test_pairs, which the
# index arithmetic below relies on.
test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=False)

model.eval()
pair_offset = 0  # index in test_pairs of the first example of the current batch
with torch.no_grad():
    for batch in test_dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        logits = model(input_ids, attention_mask)
        probabilities = torch.softmax(logits, dim=1)
        predictions = torch.argmax(probabilities, dim=1)

        for i, prediction in enumerate(predictions):
            # `i` is only the position inside this batch; add the running
            # offset to get the matching entry of test_pairs. Indexing with
            # `i` alone would report the first pairs again for every batch.
            pair = test_pairs[pair_offset + i]
            claim = pair['claim']
            evidence = pair['evidence']
            label = pair['label']
            predicted_label = prediction.item()

            print(f"Claim: {claim}")
            print(f"Evidence: {evidence}")
            print(f"True Label: {label}")
            print(f"Predicted Label: {predicted_label}")
            print()

        pair_offset += len(predictions)
