 # **Political & Sentiment Classification**

---


 *Introduction*
 
> The goal of this notebook is to:
1.   Train a model for sentiment analysis based on the title and text description of YouTube videos.
2.   Train a model for political classification based on the title and text description of YouTube videos.

*Summary*

> Since we are dealing with an NLP problem, we decided to use a pretrained BERT model, more precisely [PolitBERT](https://huggingface.co/maurice/PolitBERT) (a BERT flavour specialized on political speeches, interviews and press briefings of English-speaking politicians), since we only consider videos from the News&Politics category.

> We then fine-tuned the model for our specific task, using various datasets to capture the specificity of what we were looking for.



> ***Sentiment Analysis Model***

> For the Sentiment Analysis Model, we used the 
[Twitter and Reddit Sentimental analysis Dataset](https://www.kaggle.com/datasets/cosmos98/twitter-and-reddit-sentimental-analysis-dataset), in order to capture how people are talking online.



> ***Political Classification***

> For the Political Classification, we used the 
[Democrat Vs. Republican Tweets](https://www.kaggle.com/datasets/kapastor/democratvsrepublicantweets), in order to capture how political ideologies and affiliations are expressed online.







# Imports

In [1]:
!pip install -q transformers
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
import re
import copy
from tqdm.notebook import tqdm
import gc

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader

from transformers import (
    AutoTokenizer, 
    AutoModel,
    get_linear_schedule_with_warmup
)

from sklearn.metrics import (
    accuracy_score,
    f1_score
)

from sklearn.model_selection import train_test_split

from transformers import BertTokenizer

[K     |████████████████████████████████| 5.5 MB 7.5 MB/s 
[K     |████████████████████████████████| 7.6 MB 54.6 MB/s 
[K     |████████████████████████████████| 182 kB 54.1 MB/s 
[?25h

# Process the text

In [None]:
# Pre-process the text.
# Lower-case it, collapse every run of whitespace into a single space and
# surround the punctuation marks . , ! ? ( ) with spaces.
def clean_and_parse_text(text):
    """Normalise one raw text value.

    Args:
        text: The value to clean. Anything that is not a string (for
            example the NaN pandas uses for missing cells) is treated as
            empty text.

    Returns:
        The lower-cased text with whitespace runs (spaces, tabs, newlines)
        collapsed to single spaces and each of ``. , ! ? ( )`` padded with one
        space on both sides. Returns ``''`` for non-string input.
    """
    # isinstance (rather than an exact type check) keeps str subclasses such
    # as numpy.str_ from being silently replaced by an empty string.
    if not isinstance(text, str):
        return ''

    # str.split() with no argument already drops tabs/newlines and strips the
    # pieces, so no further whitespace replacement is needed.
    text = ' '.join(word.lower() for word in text.split())
    return re.sub(r'([.,!?()])', r' \1 ', text)

# Clean every entry of a pandas object and return the results as a list.
def get_texts(df):
    """Apply ``clean_and_parse_text`` to each element and collect the output.

    Args:
        df: Object exposing ``.apply``; the callers in this file pass a pandas
            Series of raw texts.

    Returns:
        A plain Python list with the cleaned texts, in the original order.
    """
    cleaned = df.apply(clean_and_parse_text)
    return cleaned.values.tolist()

In [None]:
# Define the Data used.
class TransformerDataset(Dataset):
    """Dataset that cleans the texts up front and tokenizes them on access.

    Args:
        df: Raw texts; passed to ``get_texts`` for cleaning.
        labels: Indexable collection with one label row per text. Only stored
            when ``set_type`` is not ``'test'``.
        set_type: Pass ``'test'`` to get items without a ``'labels'`` entry.

    The tokenizer and the maximum sequence length are read from the
    module-level ``config`` object.
    """

    def __init__(self, df, labels=None, set_type=None):
        super().__init__()

        self.texts = get_texts(df)

        self.set_type = set_type
        if self.set_type != 'test':
            self.labels = labels

        self.tokenizer = config.TOKENIZER
        self.max_length = config.MAX_LENGTH

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, index):
        """Tokenize the text at ``index``.

        Returns:
            A dict with ``'input_ids'`` and ``'attention_mask'`` (long
            tensors) and, unless ``set_type == 'test'``, ``'labels'`` as a
            float tensor.
        """
        tokenized = self.tokenizer.encode_plus(
            self.texts[index],
            max_length=self.max_length,
            # `pad_to_max_length=True` is deprecated; this is its replacement.
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_token_type_ids=False,
            return_tensors='pt',
        )
        # return_tensors='pt' adds a leading batch axis of size 1. Squeeze
        # only that axis so a sequence axis of length 1 is not dropped too.
        input_ids = tokenized['input_ids'].squeeze(0)
        attention_mask = tokenized['attention_mask'].squeeze(0)

        item = {
            'input_ids': input_ids.long(),
            'attention_mask': attention_mask.long(),
        }

        # For training, we also need the labels.
        if self.set_type != 'test':
            # torch.tensor copies the data; the legacy torch.Tensor(...)
            # constructor interprets a bare integer argument as a size.
            item['labels'] = torch.tensor(self.labels[index], dtype=torch.float)

        return item

# Model Configurations

In [None]:
# Used parameters to train the models.
class Config:
    """Container for the settings shared by both models in this notebook."""

    def __init__(self):
        super().__init__()

        # Reproducibility and checkpoint identity.
        self.SEED = 42
        self.MODEL_PATH = 'maurice/PolitBERT'

        # Width of the output layer for each task.
        self.NUMBER_POLITICAL_PARTIES = 2
        self.NUMBER_EMOTIONS = 3

        # Data handling: the tokenizer is loaded from the same checkpoint
        # as the model.
        self.TOKENIZER = AutoTokenizer.from_pretrained(self.MODEL_PATH)
        self.MAX_LENGTH = 320
        self.BATCH_SIZE = 16
        self.VALIDATION_SPLIT = 0.25

        # Model / optimisation: use CUDA when torch reports it as available.
        use_cuda = torch.cuda.is_available()
        self.DEVICE = torch.device('cuda' if use_cuda else 'cpu')
        self.FULL_FINETUNING = True
        self.LR = 3e-5
        self.OPTIMIZER = 'AdamW'
        self.CRITERION = 'BCEWithLogitsLoss'
        self.N_VALIDATE_DUR_TRAIN = 3
        self.N_WARMUP = 0
        self.SAVE_BEST_ONLY = True
        self.EPOCHS = 50


# Single shared instance read by the dataset and the training helpers.
config = Config()

# Define the generic model

In [None]:
# Define the layers of the model. 
class TransformerModel(nn.Module):
    """Transformer encoder followed by a single linear classification layer.

    Args:
        transformer_model: The loaded encoder module (for example the object
            returned by ``AutoModel.from_pretrained``).
        classes: Number of output nodes of the linear layer.
    """

    def __init__(self, transformer_model: nn.Module, classes: int):
        super().__init__()
        self.transformer = transformer_model

        # Take the encoder width from its config when it exposes one, so that
        # encoders that are not 768 wide also work; 768 stays the fallback.
        encoder_config = getattr(transformer_model, 'config', None)
        hidden_size = getattr(encoder_config, 'hidden_size', 768)
        self.output = nn.Linear(hidden_size, classes)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        """Return the raw (un-activated) logits of the linear layer.

        With ``return_dict=False`` the encoder returns a tuple; it is unpacked
        into two elements and only the second one is fed to the linear layer
        (for BERT-style models that is presumably the pooled output).
        """
        _, pooled = self.transformer(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=False,
        )
        return self.output(pooled)

# Generic functions for training the model

In [None]:
# Device selected in Config (CUDA when available, otherwise CPU); read as a
# global by val() and train().
device = config.DEVICE
# Bare expression so the notebook cell displays the selected device.
device

In [None]:
# Evaluate the current model
def val(model, val_dataloader, criterion):
    """Score ``model`` on ``val_dataloader`` and print the metrics.

    Args:
        model: Module called with ``input_ids`` and ``attention_mask``.
        val_dataloader: Yields dicts with ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``.
        criterion: Loss applied to ``(logits, labels)``.

    Returns:
        The micro-averaged F1 score over the whole loader.
    """
    # Evaluation mode for the whole pass; the model is left in this mode.
    model.eval()

    running_loss = 0
    targets, predictions = [], []

    # No gradients are needed while evaluating.
    with torch.no_grad():
        for batch in val_dataloader:
            # Move the batch to the device (cuda or cpu).
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            logits = model(input_ids=input_ids, attention_mask=attention_mask)
            running_loss += criterion(logits, labels).item()

            # The loss works on logits, so sigmoid is applied here before
            # rounding each output to a 0/1 prediction.
            probabilities = torch.sigmoid(logits)

            # sklearn's metrics need numpy data on the cpu.
            predictions.extend(np.round(probabilities.cpu().numpy()))
            targets.extend(labels.cpu().numpy())

    avg_val_loss = running_loss / len(val_dataloader)
    print('Eval Val loss:', avg_val_loss)
    print('Eval Val accuracy:', accuracy_score(targets, predictions))

    val_micro_f1_score = f1_score(targets, predictions, average='micro')
    print('Eval Val micro f1 score:', val_micro_f1_score)
    return val_micro_f1_score

def train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, epoch):
    """Train ``model`` for one pass over ``train_dataloader``.

    Validation (via ``val``) is triggered at a few intermediate steps, and the
    average training loss of the epoch is printed at the end. Nothing is
    returned.

    Args:
        model: Module called with ``input_ids`` and ``attention_mask``.
        train_dataloader: Yields dicts with ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``.
        val_dataloader: Loader handed to ``val`` for the intermediate checks.
        criterion: Loss applied to ``(logits, labels)``.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch.
        epoch: Epoch number, only used for the progress-bar label.
    """
    # Spread config.N_VALIDATE_DUR_TRAIN validation points over the epoch.
    n_checks = config.N_VALIDATE_DUR_TRAIN
    interval = len(train_dataloader) // n_checks

    # Large intervals are snapped down to a multiple of 100 steps.
    if interval > 100:
        interval -= interval % 100
    validate_at_steps = [interval * k for k in range(1, n_checks + 1)]
    # NOTE(review): `step` below is 0-based, so a checkpoint equal to
    # len(train_dataloader) is never reached.

    running_loss = 0
    progress = tqdm(train_dataloader, desc=f'Epoch {epoch}')
    for step, batch in enumerate(progress):
        # val() switches the model to eval mode, so training mode is
        # re-enabled at every step.
        model.train()

        # Move the batch to the device (cuda or cpu).
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Drop the gradients left over from the previous step.
        optimizer.zero_grad()

        logits = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = criterion(logits, labels)
        running_loss += loss.item()

        # Backpropagate, then update the weights and the learning rate.
        loss.backward()
        optimizer.step()
        scheduler.step()

        if step in validate_at_steps:
            print(f'-- Step: {step}')
            val(model, val_dataloader, criterion)

    avg_train_loss = running_loss / len(train_dataloader)
    print('Training loss:', avg_train_loss)


In [None]:
def run(model, train_dataloader, val_dataloader, model_name):
    """Fine-tune ``model`` for ``config.EPOCHS`` epochs and keep the best one.

    After each epoch the model is validated; whenever the micro F1 score
    improves, a deep copy of the model is kept and its weights are written to
    ``model_name + '.pt'``. When ``config.SAVE_BEST_ONLY`` is false the
    current weights are additionally written after every epoch, overwriting
    the same file.

    Args:
        model: Model to train (modified in place).
        train_dataloader: Loader with the training batches.
        val_dataloader: Loader with the validation batches.
        model_name: Checkpoint path without the ``.pt`` suffix.

    Returns:
        ``(best_model, best_val_micro_f1_score)``. ``best_model`` is ``None``
        and the score is ``-inf`` if no epoch produced a comparable score
        (for example when ``config.EPOCHS`` is 0).
    """
    # Setting a seed ensures reproducible results.
    # Seed may affect the performance too.
    torch.manual_seed(config.SEED)

    criterion = nn.BCEWithLogitsLoss()

    if config.FULL_FINETUNING:
        # Apply weight decay to everything except biases and LayerNorm
        # parameters.
        no_decay = ("bias", "LayerNorm.bias", "LayerNorm.weight")
        decay_params, no_decay_params = [], []
        for name, param in model.named_parameters():
            if any(nd in name for nd in no_decay):
                no_decay_params.append(param)
            else:
                decay_params.append(param)
        optimizer_parameters = [
            {"params": decay_params, "weight_decay": 0.001},
            {"params": no_decay_params, "weight_decay": 0.0},
        ]
    else:
        # Without full fine-tuning, optimise only what the caller left
        # trainable. Previously no optimizer was created on this path and
        # the function failed with a NameError.
        optimizer_parameters = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.AdamW(optimizer_parameters, lr=config.LR)

    num_training_steps = len(train_dataloader) * config.EPOCHS
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        # Honour the configured warm-up instead of a hard-coded 0.
        num_warmup_steps=config.N_WARMUP,
        num_training_steps=num_training_steps,
    )

    # Initialised up front so the return statement is always valid.
    best_model = None
    best_val_micro_f1_score = float('-inf')
    for epoch in range(config.EPOCHS):
        train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, epoch)
        val_micro_f1_score = val(model, val_dataloader, criterion)
        print("Epoch " + str(epoch) + "/" + str(config.EPOCHS) + ": F1 Score " + str(val_micro_f1_score))

        improved = val_micro_f1_score > best_val_micro_f1_score
        if improved:
            # The tracked quantity is the micro F1 score, not a loss.
            print(f'--- Best Model. Val micro F1: {best_val_micro_f1_score} -> {val_micro_f1_score}')
            best_model = copy.deepcopy(model)
            best_val_micro_f1_score = val_micro_f1_score

        if improved or not config.SAVE_BEST_ONLY:
            torch.save(model.state_dict(), model_name + '.pt')

    return best_model, best_val_micro_f1_score


# Sentiment Analysis

## Get the training data

In [None]:
# Load the Reddit data: the `clean_comment` column is the model input and the
# `category` column (wrapped in its own DataFrame) is the target.
df = pd.read_csv("Reddit_Data.csv")
X_sent = df['clean_comment']
y_sent = pd.DataFrame(df['category'])

## Pre-process the training data

In [None]:
# Create one 0/1 indicator column per sentiment (-1, 0, 1).
# Each sentiment is mapped to an individual node in the final layer.
# The comparison yields a boolean Series; it is cast element-wise with
# astype(int) because int(<Series>) raises a TypeError for multi-row data.
for sentiment in (-1, 0, 1):
    y_sent[str(sentiment)] = (y_sent['category'] == sentiment).astype(int)
y_sent = y_sent.drop('category', axis=1)

## Split dataset into train and test

In [None]:
# Hold out 20% of the sentiment data for evaluation (fixed seed).
X_train_sent, X_test_sent, y_train_sent, y_test_sent = train_test_split(
    X_sent,
    y_sent,
    test_size=0.2,
    random_state=42,
)

## Train the model

In [None]:
# Load the pretrained encoder and put a head with one node per sentiment on it.
transformer_weights = AutoModel.from_pretrained(config.MODEL_PATH)
sentiment_analysis = TransformerModel(
    transformer_weights,
    config.NUMBER_EMOTIONS,
)

In [None]:
# Move the model to the configured device (GPU when available, otherwise CPU).
sentiment_analysis.to(device);

In [None]:
# Labels are cast to floating point for the BCEWithLogitsLoss used in run().
# The builtin `float` is used because the `np.float` alias was removed from
# NumPy (1.24).
train_data_sent = TransformerDataset(X_train_sent, np.vstack(y_train_sent.values).astype(float))
val_data_sent = TransformerDataset(X_test_sent, np.vstack(y_test_sent.values).astype(float))

train_dataloader_sent = DataLoader(train_data_sent, batch_size=config.BATCH_SIZE)
val_dataloader_sent = DataLoader(val_data_sent, batch_size=config.BATCH_SIZE)

best_model, best_val_micro_f1_score = run(sentiment_analysis, train_dataloader_sent, val_dataloader_sent, 'sentiment_model')

# Political Analysis

## Get the training data

In [None]:
# Load the tweets: the `Tweet` column is the model input and the `Party`
# column (wrapped in its own DataFrame) is the target.
df = pd.read_csv('./ExtractedTweets.csv')
X_pol = df['Tweet']
y_pol = pd.DataFrame(df['Party'])

## Pre-process the training data

In [None]:
# Create one 0/1 indicator column per party.
# Each party is mapped to an individual node in the final layer.
# The comparison yields a boolean Series; it is cast element-wise with
# astype(int) because int(<Series>) raises a TypeError for multi-row data.
for party in ('Democrat', 'Republican'):
    y_pol[party] = (y_pol['Party'] == party).astype(int)
y_pol = y_pol.drop('Party', axis=1)

## Split dataset into train and test

In [None]:
# Hold out 20% of the political data for evaluation (fixed seed).
X_train_pol, X_test_pol, y_train_pol, y_test_pol = train_test_split(
    X_pol,
    y_pol,
    test_size=0.2,
    random_state=42,
)

## Train the model

In [None]:
# Load a fresh pretrained encoder and put a head with one node per party on it.
transformer_weights = AutoModel.from_pretrained(config.MODEL_PATH)
political_model = TransformerModel(
    transformer_weights,
    config.NUMBER_POLITICAL_PARTIES,
)

In [None]:
# Move the model to the configured device (GPU when available, otherwise CPU).
political_model.to(device);

In [None]:
# Labels are cast to floating point for the BCEWithLogitsLoss used in run().
# The builtin `float` is used because the `np.float` alias was removed from
# NumPy (1.24).
train_data_pol = TransformerDataset(X_train_pol, np.vstack(y_train_pol.values).astype(float))
val_data_pol = TransformerDataset(X_test_pol, np.vstack(y_test_pol.values).astype(float))

train_dataloader_pol = DataLoader(train_data_pol, batch_size=config.BATCH_SIZE)
val_dataloader_pol = DataLoader(val_data_pol, batch_size=config.BATCH_SIZE)

best_model, best_val_micro_f1_score = run(political_model, train_dataloader_pol, val_dataloader_pol, 'political_model')

**Comment**: For both models, the test accuracy and F1-score are quite high: the sentiment analysis model reaches an accuracy of 0.94 and the political classification model about 0.82.