### Please fill in the following paths.

In [None]:
# Path to the test dataset, containing "digi.json" and "libertatea.json".
# Every entry found in this folder is opened and parsed as JSON.
TEST_PATH = ""

# Path to the train dataset, containing "protv.json", "cancan.json" and "wowbiz.json".
# Every entry found in this folder is opened and parsed as JSON.
TRAIN_PATH = ""

# Path to the folder in which the model and other necessary tools are saved
FOLDER_PATH = ""

### Imports.

In [None]:
! pip install transformers==4.28.0

In [None]:
! pip install datasets

In [None]:
import torch
import numpy as np
from transformers import BertTokenizerFast, BertForSequenceClassification, Trainer, TrainingArguments
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import json 
import pandas as pd
import os
from datasets import load_dataset

from string import punctuation
import re

import torch
import torch.nn.functional as F
from torch import nn

from torch.utils.data import DataLoader

from transformers import BertModel
from torch.nn.functional import cosine_similarity

from datasets import Dataset

### Reading test and train datasets.

In [None]:
def preprocess(text):
    """Normalize a raw title/content string before tokenization.

    Steps, in order:
      1. remove '/' and newline characters (they are deleted, not replaced
         by a space);
      2. replace every run of ASCII digits with the token 'număr';
      3. collapse a word character repeated three or more times in a row
         into a single occurrence;
      4. drop any word that still contains a digit character;
      5. lowercase, delete punctuation (string.punctuation plus „ and ”)
         and squeeze whitespace.

    Args:
        text: The input string.

    Returns:
        The cleaned, lowercased string with words separated by single spaces
        and no leading or trailing whitespace.
    """
    result = text.replace('/', "").replace('\n', '')
    result = re.sub(r'[0-9]+', 'număr', result)
    result = re.sub(r'(\w)(\1{2,})', r'\1', result)
    # ASCII digits are already gone at this point; \d can still match other
    # Unicode decimal digits, so this only affects words containing those.
    result = re.sub(r'(?x)\b(?=\w*\d)\w+\s*', '', result)
    result = result.lower()
    # Delete punctuation in a single pass over the string.
    result = result.translate(str.maketrans('', '', punctuation + "„”"))
    # split/join trims both ends and collapses whitespace runs. The former
    # trailing re.sub(...) call discarded its result and was a no-op.
    return ' '.join(result.split())

In [None]:
def read_file(path, name):
    """Load one JSON news file and return its preprocessed records.

    The file must contain a JSON array whose elements have the keys
    "title", "content" and "category".

    Args:
        path: Folder that contains the file.
        name: File name inside ``path``.

    Returns:
        A list of dicts with the keys "title" and "content" (both passed
        through ``preprocess``) and "category", encoded as
        1 for "nonclickbait" and 0 for any other value (clickbait).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If an element lacks one of the expected keys.
    """
    file_path = os.path.join(path, name)

    # The context manager closes the handle even if parsing fails; the
    # encoding is pinned so diacritics decode the same on every platform.
    with open(file_path, encoding="utf-8") as reader:
        json_array = json.load(reader)

    # nonclickbait = 1
    # clickbait = 0
    return [
        {
            "title": preprocess(element["title"]),
            "content": preprocess(element["content"]),
            "category": 1 if element["category"] == "nonclickbait" else 0,
        }
        for element in json_array
    ]

In [None]:
def read_raw_data(folder_path):
    """Read every file in a folder and concatenate the parsed records.

    Entries are visited in sorted name order; each name is printed before
    it is handed to ``read_file``.

    Args:
        folder_path: Folder whose entries are all loaded.

    Returns:
        One flat list with the records of all files, in visiting order.
    """
    collected = []
    for entry in sorted(os.listdir(folder_path)):
        print(entry)
        collected += read_file(folder_path, entry)
    return collected

In [None]:
# Load both corpora; read_raw_data prints each file name as it is read, so the
# headers and separators below group that output per dataset.
print('Test files:')
test_raw_data  = read_raw_data(TEST_PATH)
print("---------------------")
print('Train files:')
train_raw_data = read_raw_data(TRAIN_PATH)
print("---------------------")

In [None]:
# One row per article, with the columns "title", "content" and "category"
# produced by read_file.
df_train = pd.DataFrame(train_raw_data)
df_test = pd.DataFrame(test_raw_data)

### Model

In [None]:
# Pretrained Romanian BERT checkpoint; the tokenizer loaded here is the one used
# to encode titles and contents in the training and evaluation loops.
model_name = "dumitrescustefan/bert-base-romanian-cased-v1"
tokenizer = BertTokenizerFast.from_pretrained(model_name)

In [None]:
class SiameseNetwork(nn.Module):
    """BERT encoder that maps a tokenized text batch to one vector per text.

    The output is the mean of the token embeddings of the last hidden layer,
    computed over the positions selected by the attention mask.

    Args:
        model_name: Identifier or path of the pretrained BERT checkpoint
            passed to ``BertModel.from_pretrained``. Defaults to the Romanian
            cased BERT that was previously hard-coded.
    """

    def __init__(self, model_name="dumitrescustefan/bert-base-romanian-cased-v1"):
        super().__init__()
        self.text_encoder = BertModel.from_pretrained(model_name)

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None):
        """Encode a batch and mean-pool the token embeddings.

        Args:
            input_ids: Token ids, forwarded to the encoder unchanged.
            attention_mask: Mask with 1 for real tokens and 0 for padding,
                forwarded to the encoder unchanged. When ``None``, every
                position is included in the pooling.
            token_type_ids: Segment ids, forwarded to the encoder unchanged.

        Returns:
            A tensor holding one pooled embedding per input sequence
            (the sequence dimension, dim 1, is averaged out).
        """
        outputs = self.text_encoder(input_ids=input_ids,
                                    attention_mask=attention_mask,
                                    token_type_ids=token_type_ids)

        last_hidden_state = outputs.last_hidden_state

        if attention_mask is None:
            # Without a mask there is nothing to exclude: pool over every
            # position instead of failing on None.unsqueeze.
            attention_mask = torch.ones(last_hidden_state.shape[:2],
                                        device=last_hidden_state.device)

        # mean pooling
        # Broadcast the mask over the hidden dimension so it can zero out the
        # embeddings of padding tokens.
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
        sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, 1)
        sum_mask = input_mask_expanded.sum(1)
        # Guard against division by zero for a fully masked sequence.
        sum_mask = torch.clamp(sum_mask, min=1e-9)
        mean_embeddings = sum_embeddings / sum_mask
        return mean_embeddings

In [None]:
class ContrastiveLoss(nn.Module):
    """Contrastive loss on the cosine distance between two embedding batches.

    Args:
        margin: Distance below which a pair labelled 0 is still penalized.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, target):
        """Return the mean contrastive loss over the batch.

        Args:
            output1: Embeddings of the first element of each pair.
            output2: Embeddings of the second element of each pair.
            target: Per-pair label; 1 marks a pair that should be similar,
                0 a pair that should be dissimilar.

        Returns:
            A scalar tensor with the batch mean of the per-pair loss.
        """
        # Cosine distance: 0 when the two vectors point the same way and
        # larger the more they differ.
        distances = 1.0 - cosine_similarity(output1, output2)

        # target == 1: the distance itself is the penalty, pulling the pair
        # together.
        similar_term = target.float() * distances

        # target == 0: only pairs closer than the margin are penalized, by
        # the amount they fall short of it; beyond the margin the term is 0.
        margin_shortfall = torch.relu(self.margin - distances).float()
        dissimilar_term = (1.0 - target).float() * margin_shortfall

        per_pair = 0.5 * (similar_term + dissimilar_term)
        return per_pair.mean()

In [None]:
# Wrap the training DataFrame as a Hugging Face Dataset and hold out 30% of it
# for validation. No seed is passed to train_test_split, so the split is not
# fixed between runs.
dataset =  Dataset.from_pandas(df_train)
hf_dataset_splits = dataset.train_test_split(test_size=0.3)
train_dataset = hf_dataset_splits['train']
validation_dataset =  hf_dataset_splits['test']

# The test DataFrame is kept whole as a separate evaluation set.
test_dataset = Dataset.from_pandas(df_test)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_epochs = 5
batch_size = 4

# A single encoder instance; the loops below apply it to both the title and
# the content of each article.
model = SiameseNetwork().to(device)
criterion = ContrastiveLoss()  # uses the default margin of 1.0
optimizer = torch.optim.Adam(model.parameters(), lr=0.000001)

# Both loaders are created with shuffle=True.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Train for num_epochs epochs; each epoch is one optimization pass over the
# training split followed by a gradient-free pass over the validation split.
# Labels come from read_file: 1 = nonclickbait, 0 = clickbait. The loss treats
# label 1 as "title and content should be similar".
for epoch in range(num_epochs):
    print(f'Starting epoch {epoch+1}/{num_epochs}')

    # Per-epoch accumulators for the training phase.
    running_loss = 0
    correct_predictions = 0
    total_predictions = 0

    model.train()

    for batch in train_dataloader:
        title, content, target = batch['title'], batch['content'], batch['category']

        # Titles and contents are tokenized separately, each padded to the
        # longest item in its own batch and truncated to 256 tokens.
        inputs_title = tokenizer(title, padding=True, truncation=True, max_length=256, return_tensors='pt').to(device)
        inputs_text = tokenizer(content, padding=True, truncation=True, max_length=256, return_tensors='pt').to(device)

        optimizer.zero_grad()

        # The same model encodes both sides of the pair.
        output1 = model(**inputs_title)
        output2 = model(**inputs_text)
        target = target.to(device)
        loss = criterion(output1, output2, target)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Predict label 1 when the cosine distance is below 0.25. The
        # embeddings used here were computed before this step's weight update.
        cos_sim = cosine_similarity(output1, output2)
        predicted_similarity = (1.0 - cos_sim) < 0.25
        # normalize=False makes accuracy_score return the number of correct
        # predictions rather than a fraction.
        correct_predictions += accuracy_score(target.cpu(), predicted_similarity.cpu().type(torch.LongTensor), normalize=False)
        total_predictions += target.shape[0]

    # Loss is averaged over batches; accuracy over individual samples.
    avg_loss_train = running_loss / len(train_dataloader)
    accuracy_train = correct_predictions / total_predictions

    print(f'Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_loss_train}, Train Accuracy: {accuracy_train}')

    # Validation phase: reset the accumulators and switch to eval mode.
    model.eval()
    running_loss = 0
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():
        for batch in validation_dataloader:
            title, content, target = batch['title'], batch['content'], batch['category']

            inputs_title = tokenizer(title, padding=True, truncation=True, max_length=256, return_tensors='pt').to(device)
            inputs_text = tokenizer(content, padding=True, truncation=True, max_length=256, return_tensors='pt').to(device)

            output1 = model(**inputs_title)
            output2 = model(**inputs_text)
            target = target.to(device)
            loss = criterion(output1, output2, target)

            running_loss += loss.item()

            # Same 0.25 cosine-distance threshold as in the training phase.
            cos_sim = cosine_similarity(output1, output2)
            predicted_similarity = (1.0 - cos_sim) < 0.25
            correct_predictions += accuracy_score(target.cpu(), predicted_similarity.cpu().type(torch.LongTensor), normalize=False)
            total_predictions += target.shape[0]

    avg_val_loss = running_loss / len(validation_dataloader)
    val_accuracy = correct_predictions / total_predictions

    print(f'Epoch {epoch+1}/{num_epochs} - Val Loss: {avg_val_loss}, Val Accuracy: {val_accuracy}')

In [None]:
# Evaluate the trained model on the test set.
# shuffle=False: sample order does not matter for evaluation, and a fixed order
# keeps the batch composition (and therefore the batch-averaged loss) and the
# order of all_predictions / all_labels the same on every run.
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

running_loss = 0.0
correct_predictions = 0
total_predictions = 0

# Per-sample predictions and labels, collected in dataset order.
all_predictions = []
all_labels = []

model.eval()

with torch.no_grad():
    for batch in test_dataloader:
        title, content, target = batch['title'], batch['content'], batch['category']
        target = target.to(device)

        # Titles and contents are tokenized separately, each padded to the
        # longest item in its own batch and truncated to 256 tokens.
        inputs_title = tokenizer(title, padding=True, truncation=True, max_length=256, return_tensors='pt').to(device)
        inputs_text = tokenizer(content, padding=True, truncation=True, max_length=256, return_tensors='pt').to(device)

        output1 = model(**inputs_title)
        output2 = model(**inputs_text)

        loss = criterion(output1, output2, target)
        running_loss += loss.item()

        # Predict label 1 when the cosine distance is below 0.25.
        predicted_similarity = (1.0 - cosine_similarity(output1, output2)) < 0.25
        # normalize=False makes accuracy_score return the number of correct
        # predictions rather than a fraction.
        correct_predictions += accuracy_score(target.cpu(), predicted_similarity.cpu().type(torch.LongTensor), normalize=False)
        total_predictions += target.shape[0]

        all_predictions.extend(predicted_similarity.cpu().numpy())
        all_labels.extend(target.cpu().numpy())

# Loss is averaged over batches; accuracy over individual samples.
avg_test_loss = running_loss / len(test_dataloader)
test_accuracy = correct_predictions / total_predictions

print(f'Test: Loss: {avg_test_loss}, Accuracy: {test_accuracy}')

In [None]:
# Save only the learned weights (the state_dict), not the whole module object.
# os.path.join inserts the path separator when FOLDER_PATH has no trailing one,
# so the file always lands inside the folder.
model_path = os.path.join(FOLDER_PATH, "model_contrastive_learning.pt")
torch.save(model.state_dict(), model_path)