# Supervised models


The supervised models are trained on a large number of labelled messages. 

This notebook provides the code for the comparison with a Naive Bayes model and a supervised BERT model.

## Imports

In [1]:
import pandas as pd
import time
import re
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score, classification_report, precision_score, recall_score, f1_score
from transformers import BertTokenizer, BertForSequenceClassification,Trainer, TrainingArguments
import torch
import numpy as np
import os
import statsmodels.api as sm
from statsmodels.stats.proportion import proportion_confint   


# Performance calculation functions

These functions provide standardized calculations of performance measures: precision, recall, accuracy, balanced F1, etc. 

In [None]:
# Helper functions: we first define functions to calculate model performance.

# Calculate accuracy scores.
def calculate_accuracy(S, P):
    """Return the accuracy together with a Wilson confidence interval.

    Args:
        S: True labels (the "solution"); converted to a NumPy array.
        P: Predicted labels, aligned element-wise with ``S``.

    Returns:
        Tuple ``(accuracy, confidence_lower, confidence_upper)``. The bounds
        are those returned by ``proportion_confint`` with ``method='wilson'``
        for the number of correct predictions out of ``len(S)``.
    """
    truth = np.array(S)
    predicted = np.array(P)

    score = accuracy_score(truth, predicted)

    # Element-wise comparison, then count the matches.
    n_correct = (predicted == truth).sum()
    ci_low, ci_high = proportion_confint(n_correct, len(truth), method='wilson')

    return score, ci_low, ci_high

def calculate_precision(true_labels, predicted_labels):
    """Return the precision of ``predicted_labels`` against ``true_labels``.

    Binary classification is assumed (``average='binary'``); a multi-class
    problem requires a different ``average`` setting.
    """
    precision = precision_score(true_labels, predicted_labels, average='binary')
    return precision

def calculate_recall(true_labels, predicted_labels):
    """Return the recall of ``predicted_labels`` against ``true_labels``.

    Binary classification is assumed (``average='binary'``); a multi-class
    problem requires a different ``average`` setting.
    """
    recall = recall_score(true_labels, predicted_labels, average='binary')
    return recall

def calculate_f1_score(true_labels, predicted_labels):
    """Return the F1 score of ``predicted_labels`` against ``true_labels``.

    Binary classification is assumed (``average='binary'``); a multi-class
    problem requires a different ``average`` setting.
    """
    f1 = f1_score(true_labels, predicted_labels, average='binary')
    return f1

def invert_labels(labels):
    """Swap the two binary classes: 0 becomes 1 and 1 becomes 0.

    Any label that is neither 0 nor 1 is mapped to ``None``. A new list is
    returned; the input is not modified.
    """
    flipped = []
    for label in labels:
        if label == 0:
            flipped.append(1)
        elif label == 1:
            flipped.append(0)
        else:
            flipped.append(None)
    return flipped

def estimate_accuracies(true_labels, predicted_labels):
    """Compute, print and return the performance measures for a set of predictions.

    The F1 score is computed twice: once on the labels as given and once on
    the inverted labels (classes swapped). The macro F1 is the mean of the two.

    Args:
        true_labels: Ground-truth binary labels.
        predicted_labels: Predicted binary labels, aligned with ``true_labels``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1, f1_second, macro_f1,
        lower_acc, upper_acc)``, where ``lower_acc``/``upper_acc`` bound the
        accuracy confidence interval.
    """
    accuracy, lower_acc, upper_acc = calculate_accuracy(true_labels, predicted_labels)
    precision = calculate_precision(true_labels, predicted_labels)
    recall = calculate_recall(true_labels, predicted_labels)
    f1 = calculate_f1_score(true_labels, predicted_labels)

    # F1 from the perspective of the other class: swap 0 and 1 on both sides.
    swapped_truth = invert_labels(true_labels)
    swapped_predictions = invert_labels(predicted_labels)
    f1_second = calculate_f1_score(swapped_truth, swapped_predictions)

    macro_f1 = np.mean([f1, f1_second])

    report_lines = (
        f"Accuracy: {accuracy} [{lower_acc},{upper_acc}]",
        f"Precision: {precision}",
        f"Recall: {recall}",
        f"F1 Score: {f1}",
        f"Second F1 Score: {f1_second}",
        f"Macro F1: {macro_f1}",
    )
    for line in report_lines:
        print(line)

    return accuracy, precision, recall, f1, f1_second, macro_f1, lower_acc, upper_acc


# Training supervised models

### Train Naive-bayes
This code trains and applies the Naive-Bayes model. We're using default parameters and standard code.

In [None]:
# Train the NB model on the entire training dataset.
def train_naive_bayes(texts, labels):
    """Fit a multinomial Naive Bayes classifier on bag-of-words counts.

    Args:
        texts: Training messages (strings).
        labels: Class labels aligned with ``texts``.

    Returns:
        Tuple ``(clf, vectorizer)``: the fitted ``MultinomialNB`` and the
        fitted ``CountVectorizer`` needed to transform new texts.
    """
    frame = pd.DataFrame({'text': texts, 'label': labels})

    # Lower-case every message before counting words.
    frame['text'] = frame['text'].apply(lambda message: message.lower())

    # Bag-of-words representation.
    vectorizer = CountVectorizer()
    features = vectorizer.fit_transform(frame['text'])
    targets = frame['label']

    # The train/test split is handled elsewhere, so fit on everything we got.
    clf = MultinomialNB()
    clf.fit(features, targets)

    return clf, vectorizer


### Train BERT 
This code trains and applies the BERT model. We're using default parameters and standard code.

In [None]:
# Train the bert model on the entire training dataset.
def train_bert_model(texts, labels, bert_model='bert-base-uncased'):
    """Fine-tune a pre-trained BERT sequence classifier on the given texts.

    Args:
        texts: Training messages (strings).
        labels: Class labels aligned with ``texts``; the number of distinct
            values determines the size of the classification head.
        bert_model: Name or path of the pre-trained checkpoint used for both
            the tokenizer and the model.

    Returns:
        Tuple ``(model, tokenizer)`` after training.
    """

    class TextDataset(torch.utils.data.Dataset):
        """Dataset pairing the tokenizer output with the labels."""

        def __init__(self, encodings, labels):
            self.encodings = encodings
            self.labels = labels

        def __len__(self):
            return len(self.labels)

        def __getitem__(self, idx):
            # One tensor per encoding field, plus the label under 'labels'.
            sample = {}
            for field, values in self.encodings.items():
                sample[field] = torch.tensor(values[idx])
            sample['labels'] = torch.tensor(self.labels[idx])
            return sample

    # Tokenize the whole training set at once (truncated and padded).
    tokenizer = BertTokenizer.from_pretrained(bert_model)
    train_set = TextDataset(tokenizer(texts, truncation=True, padding=True), labels)

    # Pre-trained BERT with one output per distinct label.
    n_classes = len(set(labels))
    model = BertForSequenceClassification.from_pretrained(bert_model, num_labels=n_classes)

    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=3,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=64,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
    )

    # No evaluation set here: validation is done separately.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_set,
        eval_dataset=None,
    )
    trainer.train()

    return model, tokenizer

def predict_bert(text, model, tokenizer, device=None):
    """Classify a single text with a (fine-tuned) sequence-classification model.

    Args:
        text: The message to classify.
        model: Model whose output exposes ``.logits``; it is moved to
            ``device`` and switched to evaluation mode.
        tokenizer: Tokenizer called as
            ``tokenizer(text, return_tensors="pt", truncation=True, padding=True)``;
            its result must support ``.to(device)``.
        device: Optional torch device (or device string). When omitted, the
            first available of ``"mps"`` (Apple Silicon), ``"cuda"`` and
            ``"cpu"`` is used.

    Returns:
        Tuple ``(label, probabilities)``: the index of the most probable class
        as a Python int, and the softmax probabilities as a CPU tensor.
    """
    if device is None:
        # Previously hard-coded to "mps", which fails on machines without it.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            device = "mps"
        elif torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"
    device = torch.device(device)

    model.to(device)
    # Training leaves the model in train mode; without eval() dropout stays
    # active and the predictions become noisy and non-deterministic.
    model.eval()

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        outputs = model(**inputs)

    predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
    # Move predictions back to CPU for further operations or printing.
    return torch.argmax(predictions, dim=1).item(), predictions.cpu()


# Data preparation for supervised models

To download the database of the X/Twitter messages that were used, see: 

https://figshare.com/articles/dataset/The_Twitter_Parliamentarian_Database/10120685

(The messages unfortunately cannot be shared here, as this would be a violation of X/Twitter's ToS.)

We select the two largest parties (e.g., Democrats and Republicans). We choose the largest possible number of tweets that produces a balanced dataset, capped at 5,000 messages per party. We remove retweets, replies, posts including URLs, and posts of 100 characters or fewer; @-mentions are stripped from the remaining messages. We remove any duplicates. For the training, we use all messages except those included in the testing data. 


In [None]:
# Function to remove mentions from text
def remove_mentions(text):
    """Return ``text`` with every ``@handle`` mention deleted.

    A mention is an ``@`` followed by one or more word characters. Only the
    mention itself is removed; surrounding whitespace is left untouched.
    """
    mention_pattern = re.compile(r'@\w+')
    without_mentions = mention_pattern.sub('', text)
    return str(without_mentions)

# Prepare the training data from the twitter parliamentarian database dataset
# We select the messages from the top parties
def prepare_data(df, country, test_texts, parties, max_per_party=5000, random_state=None):
    """Build a balanced two-party training sample from the tweet database.

    Args:
        df: DataFrame with at least the columns ``country``, ``created_at``,
            ``from_user_name``, ``from_user_id``, ``from_user_realname``,
            ``party``, ``region`` and ``text``.
        country: Value of the ``country`` column to keep.
        test_texts: Collection of message texts held out for testing; any
            message whose text appears here is excluded from the sample.
        parties: Party names; the sample is drawn from ``parties[0]``
            (label 0) and ``parties[1]`` (label 1).
        max_per_party: Upper bound on the number of messages sampled per
            party (default 5000, i.e. at most 10,000 messages in total).
        random_state: Optional seed passed to ``DataFrame.sample`` so the
            draw can be reproduced. ``None`` keeps the draw unseeded.

    Returns:
        Tuple ``(texts, labels)``: the mention-free message texts and their
        0/1 party labels, with the same number of messages for each party.
    """
    # Copy the selection so the column assignment below does not operate on a
    # view of the caller's frame (avoids pandas' SettingWithCopyWarning).
    dfs = df.loc[df.country == country].copy()
    dfs['text'] = [str(a) for a in dfs['text']]

    # Remove retweets, replies, messages with URLs, and messages of 100
    # characters or fewer.
    kept_columns = ['created_at', 'from_user_name', 'from_user_id', 'from_user_realname', 'party', 'region', 'text']
    is_retweet = dfs.text.str.startswith('RT @')
    is_reply = dfs.text.str.startswith('@')
    has_url = dfs.text.str.contains('http', regex=False)
    long_enough = dfs.text.str.len() > 100
    dfs = dfs.loc[~is_retweet & ~is_reply & ~has_url & long_enough, kept_columns]

    dfs = dfs.drop_duplicates(subset='text')  # Remove duplicates
    # Keep the test messages out of the training data.
    dfs = dfs.loc[~dfs.text.isin(test_texts)].copy()

    # Strip mentions into a new column, then re-check the length, since
    # removing mentions can push a message below the threshold.
    dfs['text_no_mentions'] = dfs['text'].apply(remove_mentions)
    dfs = dfs[dfs['text_no_mentions'].str.len() >= 100]
    dfs = dfs.loc[dfs['party'].isin(parties)]

    first_party = dfs.loc[dfs.party == parties[0]]
    second_party = dfs.loc[dfs.party == parties[1]]

    # Largest per-party size that keeps the sample balanced, capped at
    # max_per_party.
    selection_size = min(len(first_party), len(second_party), max_per_party)
    sample = pd.concat([
        first_party.sample(selection_size, random_state=random_state),
        second_party.sample(selection_size, random_state=random_state),
    ])

    texts = list(sample['text_no_mentions'])
    # The sample only contains the two selected parties, so labels are 0 or 1.
    labels = [0 if party == parties[0] else 1 for party in sample['party']]

    return texts, labels
    


# Train and test the supervised models

We train and test a Naive Bayes model and a BERT model on the data. 

The following code cleans the data, and uses it to train and test the performance of the two models.

In [None]:
# Configuration: country, held-out test sample, the two parties and the
# pre-trained BERT checkpoint.
country = 'United States'
val_file = "../../US_test_sample.csv"
parties = ['Democrat','Republican']
bert_model = 'bert-base-uncased'

df = pd.read_feather('full_twitter_database.feather')

print("Reading validation data...")
validation_data = pd.read_csv(val_file)

print("Preparing training data...")
# prepare_data's parameter is named `test_texts`; the previous keyword
# `texts_to_remove` raised a TypeError.
texts,labels = prepare_data(df,test_texts=set(validation_data.text),country=country,parties=parties)
# Same label encoding as the training data: 0 = parties[0], 1 = parties[1].
validation_data['solution'] = [0 if a==parties[0] else 1 if a==parties[1] else None for a in validation_data['party']]

# NOTE(review): training texts have @-mentions stripped, while the validation
# texts are used as stored in the CSV -- confirm the two are preprocessed alike.
print("Training bert model...")
model_bert,tokenizer = train_bert_model(texts,labels,bert_model=bert_model)

print("Evaluating bert model...")
validation_data['prediction_bert'] = [predict_bert(a,model_bert,tokenizer)[0] for a in validation_data['text']]
estimate_accuracies(list(validation_data['solution']),list(validation_data['prediction_bert']))

print("Training bayes model...")
model_bayes,vectorizer = train_naive_bayes(texts,labels)

print("Evaluating bayes model...")
X_val = vectorizer.transform(validation_data['text'])
y_val = model_bayes.predict(X_val)
validation_data['prediction_bayes'] = y_val
estimate_accuracies(list(validation_data['solution']),y_val)

    