In [1]:
#Open the json file
import json

def read_data(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data

In [2]:
import torch
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def remove_links(text):
  lista = text.split()
  text = ""
  checker = True
  prev = None
  for element in lista:
    if element == "https" or element == "www":
      checker = False
    elif prev != "/" and prev != "" and prev != "." and prev != "://" and prev != "-" and element != "/" and element != "" and element != "." and element != "-" and element!="://":
      checker = True
    if checker:
      if element not in [".", ",", ":", "/", ";", "-", "_", "'", '"', "|", "[", "]", "+", "#", "*", "(", ")"]:
        text += " "+element
    prev = element
  return text

In [4]:
class Dataset_en(torch.utils.data.Dataset):
    def __init__(self, path, tokenizer):
        self.data = read_data(path)
        self.tokenizer = tokenizer
        self.max_len = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data[idx]['text']
        label = self.data[idx]['category']
        if label == "CONSPIRACY":
            label = 1
        else:
            label = 0
            
        text = remove_links(text)
        
        inputs = self.tokenizer(text, return_tensors='pt', max_length=self.max_len, padding='max_length', truncation=True)
        item = {key: inputs[key].squeeze(0) for key in inputs}
        item['labels'] = torch.tensor(label)

        return item

In [5]:
dataset = Dataset_en("../dataset_en_train.json", tokenizer)

print(len(dataset))

4000


In [6]:
#Train test split the dataset
from sklearn.model_selection import train_test_split

train_data, test_data = train_test_split(dataset, test_size=0.2, random_state=42)

print(len(train_data))
print(len(test_data))

3200
800


In [7]:
#Create weights for the classes of the training data
from sklearn.utils.class_weight import compute_class_weight

labels = [data['labels'].item() for data in train_data]
class_weights = compute_class_weight('balanced', classes=[0, 1], y=labels)
class_weights = torch.tensor(class_weights, dtype=torch.float)

print(class_weights)

tensor([0.7718, 1.4197])


In [8]:
from transformers import AutoModel

BERT = AutoModel.from_pretrained("bert-base-uncased")
BERT.config.output_hidden_states = True
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [9]:
class AttentionPooling(torch.nn.Module):
    def __init__(self, hidden_size):
        super(AttentionPooling, self).__init__()
        self.hidden_size = hidden_size
        self.query = torch.nn.Linear(hidden_size, hidden_size)
        self.key = torch.nn.Linear(hidden_size, hidden_size)
        self.value = torch.nn.Linear(hidden_size, hidden_size)
        
        self.f1 = torch.nn.Linear(hidden_size, hidden_size)
        
        self.softmax = torch.nn.Softmax(dim=1)
        
    def forward(self, inputs):
        #Inputs: (batch_size, seq_len, hidden_size)
        query = self.query(inputs)
        key = self.key(inputs)
        value = self.value(inputs)
        
        attention_scores = torch.bmm(query, key.transpose(1, 2)) / self.hidden_size**0.5
        attention_weights = self.softmax(attention_scores)
        
        context = torch.bmm(attention_weights, value)
        
        context = self.f1(context)
        
        pooled_output = context.mean(dim=1)
        
        return pooled_output

class BertClassifier(torch.nn.Module):
    def __init__(self, bert, num_classes):
        super(BertClassifier, self).__init__()
        self.bert = bert
        self.pooling = AttentionPooling(hidden_size=bert.config.hidden_size)
        self.dropout = torch.nn.Dropout(0.1)
        self.classifier = torch.nn.Linear(bert.config.hidden_size, num_classes)
        
        self.ffw = torch.nn.Sequential(
            torch.nn.Linear(bert.config.hidden_size, bert.config.hidden_size * 4),
            torch.nn.ReLU(),
            torch.nn.Linear(bert.config.hidden_size * 4, bert.config.hidden_size),
        )
        
    def forward(self, input_ids, attention_mask):
        x = self.bert(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        pooled_output = self.pooling(x)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        
        return logits
        

In [10]:
model = BertClassifier(BERT, num_classes=2)

print(model)

BertClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_af

In [11]:
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
criterion = torch.nn.CrossEntropyLoss(weight=torch.tensor(class_weights).to(device))

train_loader = torch.utils.data.DataLoader(train_data, batch_size=8, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=8, shuffle=False)

  criterion = torch.nn.CrossEntropyLoss(weight=torch.tensor(class_weights).to(device))


In [13]:
from tqdm import tqdm
from sklearn.metrics import f1_score, matthews_corrcoef

EPOCHS = 5

model.train()

model.to(device)

best_mcc = -1

for epoch in range(EPOCHS):
    loop = tqdm(train_loader, leave=True)
    for batch in loop:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        loop.set_description(f"Epoch {epoch+1}")
        loop.set_postfix(loss=loss.item())
        
    model.eval()
    y_true = []
    y_pred = []
    with torch.no_grad():
        for batch in tqdm(test_loader, leave=True):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask)
            _, predicted = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
            
    f1 = f1_score(y_true, y_pred)
    mcc = matthews_corrcoef(y_true, y_pred)
    print(f"F1 Score: {f1}")
    print(f"MCC: {mcc}")
    
    if mcc > best_mcc:
        best_mcc = mcc
        torch.save(model.state_dict(), "model_BERT_Attention_NOLINKS.pth")
        
    #Save the results each epoch
    with open(f"results_BERT_Attention_NOLINKS_{epoch+1}.txt", 'w') as file:
        file.write(f"F1 Score: {f1}\n")
        file.write(f"MCC: {mcc}\n")

Epoch 1: 100%|██████████| 400/400 [03:18<00:00,  2.01it/s, loss=0.000518]
100%|██████████| 100/100 [00:16<00:00,  6.23it/s]


F1 Score: 0.8582089552238807
MCC: 0.7903425533886849


Epoch 2: 100%|██████████| 400/400 [03:14<00:00,  2.06it/s, loss=0.00026] 
100%|██████████| 100/100 [00:16<00:00,  6.18it/s]


F1 Score: 0.8628230616302187
MCC: 0.7999279721499342


Epoch 3: 100%|██████████| 400/400 [03:15<00:00,  2.05it/s, loss=9.07e-5] 
100%|██████████| 100/100 [00:16<00:00,  6.21it/s]


F1 Score: 0.8644400785854617
MCC: 0.8012874437007889


Epoch 4: 100%|██████████| 400/400 [03:15<00:00,  2.04it/s, loss=0.00069] 
100%|██████████| 100/100 [00:16<00:00,  6.19it/s]


F1 Score: 0.8625
MCC: 0.8058355850492399


Epoch 5: 100%|██████████| 400/400 [03:15<00:00,  2.05it/s, loss=8.28e-5] 
100%|██████████| 100/100 [00:16<00:00,  6.22it/s]

F1 Score: 0.8560311284046692
MCC: 0.7882529169379718





In [37]:
#Load the best model
model.load_state_dict(torch.load("model_BERT_Attention.pth"))

<All keys matched successfully>

In [38]:
#Test the model using the f1 score and the mathew correlation coefficient on the test data
from sklearn.metrics import f1_score, matthews_corrcoef

model.eval()

all_labels = []
all_preds = []

with torch.no_grad():
    loop = tqdm(test_loader, leave=True)
    for batch in loop:
        with torch.no_grad():
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask)
            preds = torch.argmax(outputs, dim=1)
            
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

f1 = f1_score(all_labels, all_preds)
mcc = matthews_corrcoef(all_labels, all_preds)

print(f"F1 Score: {f1}")
print(f"Matthews Correlation Coefficient: {mcc}")

#Save the results
results = {
    "f1": f1,
    "mcc": mcc
}

with open("BERT_AttentionPooling.json", 'w') as file:
    json.dump(results, file)

100%|██████████| 100/100 [00:15<00:00,  6.40it/s]

F1 Score: 0.8702928870292888
Matthews Correlation Coefficient: 0.8177246968660349



