# Fine Tuning Transformer for Text Classification

In [None]:
import numpy as np
import pandas as pd
from sklearn import metrics
import transformers
import torch
import torch.nn as nn
from sklearn.model_selection import KFold
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from transformers import BertTokenizer, BertModel, BertConfig
from sklearn.metrics import accuracy_score, f1_score, matthews_corrcoef
from sklearn.metrics import accuracy_score, f1_score, matthews_corrcoef, classification_report
import json


In [None]:
from torch import cuda

# Run on the GPU whenever CUDA reports one as usable; otherwise stay on the CPU.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Data

In [None]:
# Load the complete English training CSV into ``df``.
# NOTE(review): the "Balance Data" cell below reads the same file again and
# rebinds ``df``, so this cell mainly serves as a quick look at the raw data.
df = pd.read_csv(filepath_or_buffer='data/train/dataset_en_train_completed.csv')


# Balance Data

In [None]:
# Parse the training CSV once and split it by row position instead of reading
# the same file from disk twice.
# NOTE(review): presumably rows 0-3999 are the original samples and the rest
# are augmented ones; the 4000 boundary is hard-coded, so confirm it matches
# the layout of the CSV.
df_augmented = pd.read_csv('data/train/dataset_en_train_completed.csv')
# ``.copy()`` keeps both partitions independent of the parsed frame.
df = df_augmented.iloc[:4000].copy()
df_augmented = df_augmented.iloc[4000:].copy()
df_augmented

In [None]:
# Number of rows per category in ``df``.
original_counts = df['category'].value_counts()
# ``Series.get`` falls back to 0 when a category is absent from the counts.
conspiracy_count = original_counts.get('CONSPIRACY', 0)
critical_count = original_counts.get('CRITICAL', 0)

# CONSPIRACY rows of each partition.
df_conspiracy = df.loc[df['category'] == 'CONSPIRACY']
df_conspiracy_augmented = df_augmented.loc[df_augmented['category'] == 'CONSPIRACY']


In [None]:
# Candidate pool: every CONSPIRACY row of the augmented partition.
df_conspiracy = df_augmented.loc[df_augmented['category'] == 'CONSPIRACY']

# Draw 1242 random rows (fixed seed, so the draw is reproducible) to balance
# the dataset.
df_conspiracy_sampled = df_conspiracy.sample(n=1242, random_state=42)

# Stack the sampled rows under ``df`` with a fresh 0..n-1 index, then keep
# working on an independent copy.
df_combined = pd.concat([df, df_conspiracy_sampled], ignore_index=True)
df = df_combined.copy()


# Add Context

In [None]:
def add_context(df):
    """Append the emotion and moral-value labels to every text.

    The ``text`` column is overwritten in place with
    ``<text>. The text reflects the emotion: <max_emotion> and the moral
    value: <max_moral>``; the same DataFrame object is returned.
    """
    emotion_suffix = '. The text reflects the emotion: ' + df['max_emotion']
    moral_suffix = ' and the moral value: ' + df['max_moral']
    df['text'] = df['text'] + emotion_suffix + moral_suffix
    return df

# Enrich every text with its emotion/moral context, then preview the first row.
df = add_context(df)
df.loc[0, 'text']

# Transform the label from categorical to numeric

Critical = 1
Conspiracy = 0

In [None]:
# Binary target: 1 for CRITICAL rows, 0 for every other category.
df['class'] = df['category'].eq('CRITICAL').astype('int64')

# Keep only the model input and its label.
new_df = df.loc[:, ['text', 'class']].copy()
new_df

<a id='section03'></a>
# Preparing the Dataset and Dataloader




In [None]:
# Training configuration shared by the cells below.
MAX_LEN = 512  # token length each sample is padded/truncated to by CustomDataset
TRAIN_BATCH_SIZE = VALID_BATCH_SIZE = 32
EPOCHS = 3
LEARNING_RATE = 2e-5

# Tokenizer paired with the 'bert-base-uncased' checkpoint.
tokenizer = transformers.BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
class CustomDataset(Dataset):
    """Dataset that tokenizes one DataFrame row per item.

    Args:
        dataframe: frame with a ``text`` column (model input) and a ``class``
            column (numeric label).
        tokenizer: object exposing ``encode_plus`` that returns ``input_ids``,
            ``attention_mask`` and ``token_type_ids`` with a leading batch
            axis of size 1 when called with ``return_tensors='pt'``.
        max_len: length every sample is padded/truncated to.

    Each item is a dict with keys ``ids``, ``mask``, ``token_type_ids`` and
    ``targets`` (a float tensor holding the label).
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.text = dataframe['text']
        self.targets = self.data['class']
        self.max_len = max_len

    def __len__(self):
        return len(self.text)

    def __getitem__(self, index):
        # Positional access: a DataLoader hands out positions 0..len-1, which
        # only coincide with index labels when the frame has a default
        # RangeIndex. ``.iloc`` works for any index.
        text = str(self.text.iloc[index])
        # Collapse every run of whitespace into a single space.
        text = " ".join(text.split())

        # Tokenize the text
        inputs = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_token_type_ids=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        # Drop only the leading batch axis; a bare ``squeeze()`` would also
        # remove the sequence axis when max_len == 1.
        ids = inputs['input_ids'].squeeze(0)
        mask = inputs['attention_mask'].squeeze(0)
        token_type_ids = inputs["token_type_ids"].squeeze(0)

        return {
            'ids': ids,
            'mask': mask,
            'token_type_ids': token_type_ids,
            'targets': torch.tensor(self.targets.iloc[index], dtype=torch.float)
        }

In [None]:
# Disabled cell: the whole body is a bare string literal, so none of it runs.
# It holds a single 80/20 train/test split and the matching DataLoaders.
# NOTE(review): in the notebook as shown, `training_loader` and
# `testing_loader` are created only inside this disabled text, yet the
# single-split `train(epoch)` / `validation(epoch)` cells further down read
# them. Re-enable this cell before running those cells.
'''# Creating the dataset and dataloader for the neural network
train_size = 0.8
train_dataset=new_df.sample(frac=train_size,random_state=200)
test_dataset=new_df.drop(train_dataset.index).reset_index(drop=True)
train_dataset = train_dataset.reset_index(drop=True)


print("FULL Dataset: {}".format(new_df.shape))
print("TRAIN Dataset: {}".format(train_dataset.shape))
print("TEST Dataset: {}".format(test_dataset.shape))

training_set = CustomDataset(train_dataset, tokenizer, MAX_LEN)
testing_set = CustomDataset(test_dataset, tokenizer, MAX_LEN)

train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

test_params = {'batch_size': VALID_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)'''

In [None]:
# Disabled cell: the whole body is a bare string literal, so none of it runs.
# It would build a `training_loader` over the complete `new_df` (no held-out
# split).
'''
training_set = CustomDataset(new_df, tokenizer, MAX_LEN)


train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

training_loader = DataLoader(training_set, **train_params)
'''

<a id='section04'></a>
### Creating the Neural Network for Fine Tuning


In [None]:
class BERTClass(torch.nn.Module):
    """BERT encoder followed by dropout and a single-logit linear head.

    Layers:
        l1: pretrained ``bert-base-uncased`` encoder.
        l2: dropout with p=0.3.
        l3: linear projection from 768 features to one output.

    ``forward`` returns one raw logit per sample (no sigmoid applied).
    """

    def __init__(self):
        super().__init__()
        self.l1 = transformers.BertModel.from_pretrained('bert-base-uncased')
        self.l2 = torch.nn.Dropout(p=0.3)
        # One output unit, because this is a binary task.
        self.l3 = torch.nn.Linear(in_features=768, out_features=1)

    def forward(self, ids, mask, token_type_ids):
        # With return_dict=False the encoder returns a tuple; only its second
        # element is used here (the pooled output, per the Hugging Face docs).
        _sequence_output, pooled = self.l1(
            ids,
            attention_mask=mask,
            token_type_ids=token_type_ids,
            return_dict=False,
        )
        logits = self.l3(self.l2(pooled))
        # Drop the trailing size-1 feature axis: (batch, 1) -> (batch,).
        return logits.squeeze(1)

# Build the classifier and move its parameters to the selected device.
model = BERTClass().to(device)
model  # echo the module structure as the cell output

In [None]:
def loss_fn(outputs, targets):
    """Return the mean binary cross-entropy between raw logits and 0/1 targets.

    ``outputs`` are un-activated logits; the sigmoid is applied inside the loss.
    """
    return torch.nn.functional.binary_cross_entropy_with_logits(outputs, targets)

In [None]:
# Adam over every parameter of the model (encoder and classification head).
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Fine Tuning the Model


In [None]:
def train(epoch):
    """Run one training pass over the module-level ``training_loader``.

    Uses the module-level ``model``, ``optimizer``, ``loss_fn`` and ``device``.

    Args:
        epoch: epoch number, used only in the progress message.
    """
    model.train()
    for step, data in enumerate(training_loader):
        ids = data['ids'].to(device, dtype=torch.long)
        mask = data['mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        targets = data['targets'].to(device, dtype=torch.float)

        # Clear the gradients once per step, before the forward/backward pass.
        optimizer.zero_grad()
        outputs = model(ids, mask, token_type_ids)
        loss = loss_fn(outputs, targets)

        # Log the loss of the first batch and then every 5000th batch.
        if step % 5000 == 0:
            print(f'Epoch: {epoch}, Loss:  {loss.item()}')

        loss.backward()
        optimizer.step()

In [None]:
# Single-split training driver: one call to train() per epoch.
# NOTE(review): this relies on the one-argument train(epoch) defined above and
# on `training_loader`, which in the notebook as shown is only created in a
# disabled cell. A later cell redefines train() with a three-argument signature.
for epoch in range(EPOCHS):
    train(epoch)


### Validating the Model


In [None]:
from sklearn.metrics import accuracy_score, f1_score

def validation(epoch):
    """Score every batch of the module-level ``testing_loader``.

    Uses the module-level ``model`` and ``device``. ``epoch`` is accepted but
    not used.

    Returns:
        ``(fin_outputs, fin_targets)``: sigmoid probabilities and true labels,
        each as a flat Python list in loader order.
    """
    model.eval()  # switch to evaluation mode
    fin_targets, fin_outputs = [], []

    with torch.no_grad():  # no gradients are needed for scoring
        for batch in testing_loader:
            ids = batch['ids'].to(device)
            mask = batch['mask'].to(device)
            token_type_ids = batch['token_type_ids'].to(device)
            targets = batch['targets'].to(device)

            # Forward pass, then turn the logits into probabilities.
            probabilities = torch.sigmoid(model(ids, mask, token_type_ids))

            fin_targets += targets.detach().cpu().tolist()
            fin_outputs += probabilities.detach().cpu().tolist()

    return fin_outputs, fin_targets


In [None]:
# Score the held-out loader and report accuracy, F1 (micro/macro) and MCC.
# NOTE(review): nothing inside this loop updates the model, so each pass
# should report the same metrics.
for epoch in range(EPOCHS):
    outputs, targets = validation(epoch)
    # Probabilities of at least 0.5 count as the positive class.
    outputs = np.asarray(outputs) >= 0.5

    accuracy = accuracy_score(targets, outputs)
    f1_score_micro = f1_score(targets, outputs, average='micro')
    f1_score_macro = f1_score(targets, outputs, average='macro')
    mcc = matthews_corrcoef(targets, outputs)

    print(f"Accuracy Score = {accuracy}")
    print(f"F1 Score (Micro) = {f1_score_micro}")
    print(f"F1 Score (Macro) = {f1_score_macro}")
    print(f"MCC = {mcc:.4f}")

# Fine Tuning and Validating the Model (Cross Validation)

In [None]:
def train(epoch, model, train_loader):
    """Run one training pass of ``model`` over ``train_loader``.

    Relies on the module-level ``optimizer``, ``loss_fn`` and ``device``.

    Args:
        epoch: epoch number, used only in the progress message.
        model: network called as ``model(ids, mask, token_type_ids)``.
        train_loader: iterable of batches with keys ``ids``, ``mask``,
            ``token_type_ids`` and ``targets``.
    """
    model.train()  # training mode
    for step, batch in enumerate(train_loader):
        ids, mask, token_type_ids = (
            batch[key].to(device, dtype=torch.long)
            for key in ('ids', 'mask', 'token_type_ids')
        )
        targets = batch['targets'].to(device, dtype=torch.float)

        optimizer.zero_grad()  # clear gradients left from the previous step
        loss = loss_fn(model(ids, mask, token_type_ids), targets)

        # Log the loss of the first batch and then every 5000th batch.
        if step % 5000 == 0:
            print(f'Epoch: {epoch}, Loss: {loss.item()}')

        loss.backward()
        optimizer.step()

def validation(model, data_loader, device):
    """Score every batch of ``data_loader`` with ``model``.

    Args:
        model: network called as ``model(ids, mask, token_type_ids)``; it is
            switched to evaluation mode.
        data_loader: iterable of batches with keys ``ids``, ``mask``,
            ``token_type_ids`` and ``targets``.
        device: device the batch tensors are moved to.

    Returns:
        ``(fin_outputs, fin_targets)``: sigmoid probabilities and true labels,
        each as a flat Python list in loader order.
    """
    model.eval()
    fin_targets, fin_outputs = [], []

    with torch.no_grad():
        for batch in data_loader:
            ids = batch['ids'].to(device)
            mask = batch['mask'].to(device)
            token_type_ids = batch['token_type_ids'].to(device)
            targets = batch['targets'].to(device)

            # Forward pass, then a sigmoid to express the logits as probabilities.
            probabilities = torch.sigmoid(model(ids, mask, token_type_ids))

            fin_targets += targets.detach().cpu().tolist()
            fin_outputs += probabilities.detach().cpu().tolist()

    return fin_outputs, fin_targets


def evaluate_metrics(outputs, targets):
    """Compute and print binary classification metrics.

    Args:
        outputs: predicted probabilities, one per sample.
        targets: true labels, one per sample.

    Returns:
        dict with keys ``accuracy``, ``precision``, ``recall``, ``f1`` and
        ``mcc``.
    """
    # Turn probabilities into 0/1 predictions; values strictly above 0.5 are
    # the positive class.
    predicted = [1 if x > 0.5 else 0 for x in outputs]

    accuracy = accuracy_score(targets, predicted)
    precision, recall, f1, _ = precision_recall_fscore_support(targets, predicted, average='binary')
    mcc = matthews_corrcoef(targets, predicted)

    # Print the summary before returning so it is actually emitted.
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"MCC: {mcc:.4f}")

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'mcc': mcc
    }


def cross_validate_model(model, dataframe, tokenizer, title, epochs=3, batch_size=16, k_folds=5,
                         reset_each_fold=True):
    """Run K-fold cross-validation and save the per-fold metrics to CSV.

    Relies on the module-level ``optimizer`` (the one ``train`` steps),
    ``device`` and ``MAX_LEN``.

    Args:
        model: network to train and validate; it is updated in place.
        dataframe: frame with ``text`` and ``class`` columns.
        tokenizer: tokenizer handed to ``CustomDataset``.
        title: suffix of the output file ``metrics{title}.csv``.
        epochs: training epochs per fold.
        batch_size: batch size of both loaders.
        k_folds: number of folds.
        reset_each_fold: when True (default) the model weights and optimizer
            state are restored to their values at call time before every
            fold. Without this, later folds validate on rows the model was
            already trained on in earlier folds. Pass False to keep training
            the same weights across folds.

    Returns:
        DataFrame with one row of metrics per fold (also written to CSV).
    """
    import copy  # local import: only needed for the per-fold snapshots

    kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    if reset_each_fold:
        # Snapshot the starting point shared by every fold.
        initial_model_state = copy.deepcopy(model.state_dict())
        initial_optimizer_state = copy.deepcopy(optimizer.state_dict())

    metrics_list = []
    for fold, (train_idx, val_idx) in enumerate(kf.split(dataframe)):
        print(f"\nFold {fold + 1}/{k_folds}")

        if reset_each_fold:
            model.load_state_dict(initial_model_state)
            # Load a fresh copy so the optimizer cannot alias, and later
            # modify in place, the tensors kept in the snapshot.
            optimizer.load_state_dict(copy.deepcopy(initial_optimizer_state))

        # Split the rows into this fold's training and validation parts.
        train_df = dataframe.iloc[train_idx].reset_index(drop=True)
        val_df = dataframe.iloc[val_idx].reset_index(drop=True)

        train_set = CustomDataset(train_df, tokenizer, MAX_LEN)
        val_set = CustomDataset(val_df, tokenizer, MAX_LEN)

        train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

        # Train on this fold.
        for epoch in range(epochs):
            train(epoch, model, train_loader)

        # Validate on the held-out part of this fold.
        outputs, targets = validation(model, val_loader, device)

        fold_metrics = evaluate_metrics(outputs, targets)
        metrics_list.append(fold_metrics)

    metrics_df = pd.DataFrame(metrics_list)

    # Save the metrics of the run.
    metrics_df.to_csv(f'metrics{title}.csv', index=False)

    print('Cross-validation complete')
    return metrics_df

In [None]:
# Change the title to rename the metrics file that gets saved.
cross_validate_model(
    model,
    new_df,
    tokenizer,
    title='moral_emotions_512_32_2e5_bbu',
    epochs=3,
    batch_size=TRAIN_BATCH_SIZE,
    k_folds=5,
)


# ************************************************************

# Saving the Trained Model 

In [None]:
# Destination of the fine-tuned weights.
MODEL_PATH = "/content/drive/MyDrive/MASTER/NLP/model/bert_finetuned_model_en.pth"

# Only the state dict (parameters and buffers) is written, not the module object.
torch.save(obj=model.state_dict(), f=MODEL_PATH)


# Load the Saved Model

In [None]:
# Disabled cell: the whole body is a bare string literal, so none of it runs.
# It would rebuild BERTClass, load the saved state dict from MODEL_PATH, move
# the model to `device` and switch it to evaluation mode.
'''MODEL_PATH = "/content/drive/MyDrive/MASTER/NLP/model/bert_finetuned_model_en.pth"


# Initialize the model and load the saved state dict
loaded_model = BERTClass()
loaded_model.load_state_dict(torch.load(MODEL_PATH))
loaded_model.to(device)
loaded_model.eval()'''

# Test

In [None]:
# Load the held-out test set and encode its label like the training data:
# 1 for CRITICAL, 0 for every other category.
# NOTE(review): the training texts were extended by add_context(), while the
# test texts are used as-is here. Confirm this difference is intended.
test_df = pd.read_csv("data/test/dataset_en_test_cleaned.csv")
test_df['class'] = test_df['category'].eq('CRITICAL').astype('int64')
test_df = test_df.loc[:, ['text', 'class']].copy()
test_df

In [None]:
# Switch the model to evaluation mode before running the predictions below.
model.eval()

# Score a single text with the given model and tokenizer.
def predict(text, model, tokenizer):
    """Classify one text and return ``(prediction, probability)``.

    Relies on the module-level ``MAX_LEN`` and ``device``.

    Args:
        text: input text; it is coerced to ``str`` and whitespace-normalised.
        model: network called as ``model(ids, mask, token_type_ids)`` that
            returns one logit per sample.
        tokenizer: tokenizer exposing ``encode_plus``.

    Returns:
        ``prediction`` is 1 when the sigmoid probability is at least 0.5,
        otherwise 0; ``probability`` is that sigmoid value.
    """
    # Apply the same clean-up as CustomDataset.__getitem__ so inference sees
    # text prepared like the training data, and non-string values (e.g. a
    # missing CSV cell) do not break the tokenizer.
    text = " ".join(str(text).split())

    inputs = tokenizer.encode_plus(
        text,
        None,
        add_special_tokens=True,
        max_length=MAX_LEN,
        padding='max_length',
        truncation=True,
        # Requested explicitly because 'token_type_ids' is read below.
        return_token_type_ids=True,
        return_tensors='pt'
    )

    ids = inputs['input_ids'].to(device)
    mask = inputs['attention_mask'].to(device)
    token_type_ids = inputs['token_type_ids'].to(device)

    with torch.no_grad():
        outputs = model(ids, mask, token_type_ids)
        probabilities = torch.sigmoid(outputs).cpu().numpy()
        prediction = 1 if probabilities[0] >= 0.5 else 0

    return prediction, probabilities[0]


    
def test_and_evaluate(model, tokenizer, test_df, filename="test_results.json"):
    """Predict every row of ``test_df`` and write the evaluation to JSON.

    Adds ``predictions`` and ``probabilities`` columns to ``test_df`` in
    place, computes the MCC and a classification report against the ``class``
    column, dumps both to ``filename`` and prints a short summary.

    Args:
        model: network handed to ``predict``.
        tokenizer: tokenizer handed to ``predict``.
        test_df: frame with ``text`` and ``class`` columns.
        filename: path of the JSON file to write.
    """
    # One (prediction, probability) pair per text, in row order.
    scored = [predict(sample, model, tokenizer) for sample in test_df['text']]
    test_df['predictions'] = [label for label, _ in scored]
    test_df['probabilities'] = [probability for _, probability in scored]

    y_true = test_df['class']
    y_pred = test_df['predictions']
    mcc = matthews_corrcoef(y_true, y_pred)

    # NOTE(review): 'CONSPIRANCY' is misspelt but becomes a key of the saved
    # report, so it is left untouched here.
    results = classification_report(
        y_true,
        y_pred,
        target_names=['CONSPIRANCY', 'CRITICAL'],
        digits=5,
        output_dict=True
    )

    # Save both results together.
    output_data = {"mcc": mcc, "classification_report": results}
    with open(filename, "w") as f:
        json.dump(output_data, f, indent=4)

    print(f"Test results saved to {filename}")
    print(f"Test MCC = {mcc}")



In [None]:
# Change the file name to choose where the test metrics are written.
test_and_evaluate(
    model,
    tokenizer,
    test_df,
    filename="test_evaluation_results_512_2e5_emotions_morals.json",
)
