# Inference on Electoral Program

**Author:** [Giuseppe Tripodi](https://www.linkedin.com/in/giuseppe-tripodi-unical/)<br>
**Date created:** 2022/11/12<br>
**Description:** Run the trained model on a test set of electoral programs and analyse its predictions

# Setup

## Install packages

In [None]:
# Install the third-party packages used by this notebook.
!pip install datasets transformers
!pip install sentencepiece
!pip install sacremoses
!pip install nltk
# NOTE(review): transformers is already installed by the first command above.
!pip install transformers
!pip install evaluate
!pip install wandb

## Import Libraries

In [None]:
import json
import os
import csv
import re
import wandb
import transformers
from transformers import AutoConfig, AutoModelForSequenceClassification, TrainingArguments, Trainer, \
    EarlyStoppingCallback
from datasets import load_dataset, load_metric
from transformers import AutoTokenizer
from sklearn import preprocessing
import numpy as np
import evaluate
from transformers.integrations import TensorBoardCallback
import transformers
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
#load dataset
from datasets import load_dataset, load_metric
# tokenizer
from transformers import AutoTokenizer
from transformers import Pipeline, TextClassificationPipeline
import numpy as np
from datasets import load_dataset, load_metric
import pandas as pd
import torch
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from math import pi
from datetime import date
from sklearn.metrics import plot_confusion_matrix
from os import path, listdir
from os.path import isfile, join
import seaborn as sns

## Setup Weights & Biases and General Variables

In [None]:
# Date stamp (format "%b-%d-%Y") appended to the file names of the saved plots.
today = date.today().strftime("%b-%d-%Y")

In [None]:
# Weights & Biases environment variables; the values are left blank here.
# NOTE(review): presumably they have to be filled in before running - confirm.
%env WANDB_PROJECT=
%env WANDB_LOG_MODEL=
%env WANDB_API_KEY=

In [None]:
# Log in to Weights & Biases (needed to download the model artifact later on).
wandb.login()

## Support Functions


In [None]:
def softmax(outputs):
    """
    Numerically stable softmax over the last axis.

    :param outputs: array-like of raw scores (logits)
    :return: array of the same shape whose entries along the last axis sum to 1
    """
    # Subtracting the per-row maximum keeps np.exp from overflowing.
    peak = np.max(outputs, axis=-1, keepdims=True)
    exponentials = np.exp(outputs - peak)
    normaliser = exponentials.sum(axis=-1, keepdims=True)
    return exponentials / normaliser

In [None]:
def join_csv(input_dir, output_name):
    """
    Concatenates the CSV files found in input_dir into a single dataframe.

    Every regular file in input_dir is read with pandas.read_csv (files are
    processed in sorted name order). Each row of the result is indexed by the
    name of the file it comes from, truncated at the first "."; the index is
    called "indexes".

    :param input_dir: directory containing the CSV files
    :param output_name: path of the CSV file the concatenated frame is written to
    :return: the concatenated dataframe
    """
    # sorted() makes the row order independent of the file-system listing order
    files = sorted(f for f in listdir(input_dir) if isfile(join(input_dir, f)))
    frames = []
    indexes = []
    for file in files:
        df = pd.read_csv(join(input_dir, file), index_col=None, header=0)
        # file name up to the first dot (the whole name when it has no dot)
        stem = file.split(".", 1)[0]
        frames.append(df)
        # one index entry per row, so the column below matches the frame length
        # even when a file holds more (or fewer) than one row
        indexes.extend([stem] * len(df))

    frame = pd.concat(frames, axis=0, ignore_index=True)
    frame["indexes"] = indexes
    frame.set_index("indexes", inplace=True)

    # save the output as a csv file
    frame.to_csv(output_name)

    return frame

### TextClassification Pipeline

In [None]:
class MyTextClassificationPipeline(TextClassificationPipeline):
    """
    Custom text classification pipeline.

    For each input it returns the predicted label together with its class id,
    its softmax score and the raw logits.
    """
    def _sanitize_parameters(self, **kwargs):
        """
        Splits the keyword arguments into the three dicts that are passed to
        preprocess, _forward and postprocess.

        This pipeline defines no extra parameters: every keyword argument is
        ignored and three empty dicts are returned.
        :param kwargs: ignored
        :return: tuple of three empty dicts
        """
        return {}, {}, {}

    def preprocess(self, inputs):
        """
        Takes the input and turns it into something feedable to the model
        :param inputs: text to tokenize
        :return: output of the tokenizer, as tensors of the pipeline framework
        """
        # NOTE(review): no truncation argument is passed to the tokenizer here;
        # confirm that over-long texts cannot reach the model.
        return self.tokenizer(inputs, return_tensors=self.framework)

    def _forward(self, model_inputs):
        """
        forward step
        :param model_inputs: tokenizer output produced by preprocess
        :return: raw model output
        """
        return self.model(**model_inputs)

    def postprocess(self, model_outputs):
        """
        turns the output of the forward step into the final output
        :param model_outputs: model output exposing a ``logits`` attribute
        :return: dict with the keys "label", "best_class_code", "score" and "logits"
        """
        # only the first element of the batch is considered
        logits = model_outputs.logits[0].numpy()
        probabilities = softmax(logits)

        # converted to a plain int, like score and logits below, so that the
        # whole result is made of native Python values
        best_class = int(np.argmax(probabilities))
        label = self.model.config.id2label[best_class]
        score = probabilities[best_class].item()
        logits = logits.tolist()
        return {"label": label, "best_class_code": best_class, "score": score, "logits": logits}

### Compute Metrics

In [None]:
# Directory in which the plots are saved.
PLOT_PATH = "./"
# NOTE(review): not referenced anywhere in the visible part of this notebook.
NUM_LABELS_plot = 6
class ComputeMetrics:
    """
    A class used to compute metrics on model output and plot the results.
    ...

    Methods
    ---------
    compute_metrics()
        prints accuracy, F1, precision, recall and (outside tc2 mode) ROC AUC

    plot_consistency_for_politician()
        plots a bar plot of the correct predictions against the number of samples of each label

    confusion_matrix_plot()
        plots the confusion matrix

    misclassification_pie_chart()
        draws, for every true label, a pie chart of the labels its samples were mistaken for
    """
    def __init__(self, model_predictions, model_inputs, mapping, tc2=False, tags=""):
        """
        :param model_predictions: records convertible to a DataFrame
            output of the TextClassificationPipeline; the fields "label",
            "best_class_code" and "logits" are read
        :param model_inputs: DataFrame
            model input; its "label" column holds the true labels
        :param mapping: dict
            mapping between label and associated id, used to map input labels to ids used by models
        :param tc2: boolean
            This is true if you perform text classification on election programs. Input labels are different in text classification of election programs.
        :param tags: str
            free text inserted in the file names of the saved plots
        """
        self.model_predictions = model_predictions
        self.model_inputs = model_inputs
        self.tags = tags
        self.mapping = mapping
        self.tc2 = tc2

        # DEFINE Y_PRED AND Y_TRUE
        self.references_labels = self.model_inputs["label"].map(mapping).tolist()  # y_true
        predictions = pd.DataFrame(self.model_predictions)
        if self.tc2:
            # If tc2, the labels must be changed because "CarloCalenda" and "MatteoRenzi" have the same labels on the test set.
            mapping_prediction_label = {
                "CarloCalenda": self.mapping["AzioneItaliaviva"],
                "EnricoLetta": self.mapping["PD"],
                "GiorgiaMeloni": self.mapping["FratellidItalia"],
                "GiuseppeConte": self.mapping["Movimento5Stelle"],
                "MatteoRenzi": self.mapping["AzioneItaliaviva"],
                "MatteoSalvini": self.mapping["Lega"],
                "SilvioBerlusconi": self.mapping["ForzaItalia"],
            }
            self.predictions_labels = predictions["label"].map(mapping_prediction_label).tolist()  # y_pred
        else:
            self.predictions_labels = predictions["best_class_code"].tolist()  # y_pred

    def compute_metrics(self):
        """
        Prints the values of: Accuracy, F1, precision and recall (weighted
        averages) and, outside tc2 mode, the one-vs-rest ROC AUC.
        """
        # load and define the different metrics; the averaging strategy is
        # chosen when the metric is computed, not when it is loaded
        accuracy = evaluate.load('accuracy')
        f1 = evaluate.load('f1')
        precision = evaluate.load('precision')
        recall = evaluate.load('recall')

        # print metrics
        print(accuracy.compute(predictions=self.predictions_labels, references=self.references_labels))
        print(f1.compute(predictions=self.predictions_labels, references=self.references_labels, average='weighted'))
        print(precision.compute(predictions=self.predictions_labels, references=self.references_labels, average='weighted'))
        print(recall.compute(predictions=self.predictions_labels, references=self.references_labels, average='weighted'))

        # ROC AUC
        if self.tc2:
            # In tc2 mode the model labels are remapped in __init__ (two of them
            # collapse onto the same id), so the logit columns do not line up
            # with the reference ids and the score would be meaningless.
            print("ROC AUC skipped in tc2 mode: the logits cannot be aligned with the reference label ids")
            return

        roc_auc_score = evaluate.load("roc_auc", "multiclass")
        try:
            # one list of class probabilities per prediction
            pred_scores = [
                softmax(np.asarray(logits)).tolist()
                for logits in pd.DataFrame(self.model_predictions)["logits"]
            ]
            # one label id per score column
            labels = list(range(len(pred_scores[0])))
            print(roc_auc_score.compute(references=self.references_labels, prediction_scores=pred_scores, multi_class='ovr', labels=labels))
        except Exception as err:
            # ROC AUC stays best-effort, but the reason of a failure is reported
            print(f"ROC AUC could not be computed: {err}")

    def plot_consistency_for_politician(self):
        """
        plots a bar plot of true positive and total prediction number
        """
        politician = list(self.mapping.keys())
        # compute the confusion matrix (row = true label, column = predicted label)
        matrix = confusion_matrix(self.references_labels, self.predictions_labels, labels=np.arange(len(politician)))
        # takes only the TP
        diagonal = matrix.diagonal()
        # number of samples of each true label (sum over the row)
        tot_ele = matrix.sum(axis=1)

        # plot the results
        X_axis = np.arange(len(politician))

        plt.figure(figsize=(10, 5))
        # creating the bar plot
        plt.bar(X_axis - 0.2, diagonal, color="maroon", width=0.4, label="Correct predictions")
        plt.bar(X_axis + 0.2, tot_ele, color="#E5BABA", width=0.4, label="Total number of predictions")

        plt.xticks(X_axis, politician)
        plt.xlabel("Electoral programs")
        plt.ylabel("number of predictions")
        plt.title("Italian Electoral Programs Accuracy", fontsize=12)
        plt.legend()
        plt.savefig(f"{PLOT_PATH}/accuracy_for_politician_{'tc2' if self.tc2 else 'tc1'}_{self.tags}_{today}.png")

        # print the percentage
        for name, correct, total in zip(politician, diagonal, tot_ele):
            print(f"Politico: {name}")
            print(f"predizioni corrette:{correct}\npredizioni totali: {total}")
            # a label without samples has no defined accuracy
            ratio = correct / total if total else float("nan")
            print(f"Accuracy: {ratio}")
            print("\n")

    def confusion_matrix_plot(self):
        """
        Plots the confusion matrix
        """
        politician = list(self.mapping.keys())
        disp = ConfusionMatrixDisplay.from_predictions(
            y_true=self.references_labels,
            y_pred=self.predictions_labels,
            labels=np.arange(len(politician)),
            display_labels=politician,
            cmap=plt.cm.Blues,
        )
        fig = disp.ax_.get_figure()
        fig.set_figwidth(15)
        fig.set_figheight(10)
        plt.title("Confusion Matrix", fontsize=14)
        plt.savefig(f"{PLOT_PATH}/confusion_matrix_{'tc2' if self.tc2 else 'tc1'}_{self.tags}_{today}.png")

    def misclassification_pie_chart(self):
        """
        Draws one pie chart per true label showing which labels its
        misclassified samples were assigned to.
        """
        y_pred = np.array(self.predictions_labels)
        y_true = np.array(self.references_labels)

        # takes only the misclassified element
        wrong = y_pred != y_true
        politician_names = list(self.mapping.keys())
        matrix = confusion_matrix(y_true[wrong], y_pred[wrong], labels=np.arange(len(politician_names)))

        colors = plt.get_cmap('Blues')(np.linspace(0.2, 0.7, len(politician_names)))
        # two charts per row; at least the 4x2 grid, more rows if there are more labels
        ncols = 2
        nrows = max(4, -(-len(politician_names) // ncols))
        fig, axs = plt.subplots(nrows=nrows, ncols=ncols, figsize=(15, 15 * nrows / 4))
        for i, ax in enumerate(axs.ravel()):
            if i >= len(politician_names):
                # unused cell of the grid
                ax.axis("off")
                continue
            ax.set_title(politician_names[i], fontsize=15)
            row = matrix[i]
            if row.sum() == 0:
                # nothing to draw: a pie cannot be built from an all-zero row
                ax.text(0.5, 0.5, "no misclassifications", ha="center", va="center", transform=ax.transAxes)
                ax.axis("off")
                continue
            # only the labels that actually occur get a caption
            label = [name if count != 0 else None for name, count in zip(politician_names, row)]
            ax.pie(row, colors=colors, labels=label)
        fig.tight_layout()
        plt.savefig(f"{PLOT_PATH}/politician_misclassification_{'tc2' if self.tc2 else 'tc1'}_{self.tags}_{today}.png")

In [None]:
def define_structure_for_line_plots(dataset_input, eval_predict) -> pd.DataFrame:
    """
    creates the dataframe that will be used to generate the per-category plots

    The predictions are matched to the input rows by position: the i-th
    prediction belongs to the i-th row of dataset_input, whatever the index of
    dataset_input is (e.g. after rows have been dropped).

    :param dataset_input: input dataframe; its "label" column is renamed to
        "original_label" in the result (dataset_input itself is not modified)
    :param eval_predict: pipeline output, one record per row of dataset_input
    :return: dataframe with the input columns plus "assigned_label" and "score"
    :raises ValueError: if the number of predictions differs from the number of input rows
    """
    predictions = pd.DataFrame(eval_predict).drop(["logits", "best_class_code"], axis=1)
    if len(predictions) != len(dataset_input):
        raise ValueError(
            f"got {len(predictions)} predictions for {len(dataset_input)} input rows"
        )
    predictions["label"] = predictions["label"].map({
            "CarloCalenda": "AzioneItaliaviva",
            "MatteoRenzi": "AzioneItaliaviva",
            "EnricoLetta": "PD",
            "GiorgiaMeloni": "FratellidItalia",
            "GiuseppeConte": "Movimento5Stelle",
            "MatteoSalvini": "Lega",
            "SilvioBerlusconi": "ForzaItalia"
        })
    predictions = predictions.rename(columns={"label": "assigned_label"})
    # concat(axis=1) aligns on the index: give the predictions the index of the
    # input so that rows are paired by position and no NaN rows are created
    predictions.index = dataset_input.index

    # renamed copy of the input dataframe
    df_input = dataset_input.rename(columns={"label": "original_label"})

    # concat the dataframe
    return pd.concat([df_input, predictions], axis=1)

In [None]:
def plot_bar_category_score_by_program(df: pd.DataFrame, title: str, tags):
    """
    Draws a bar plot of the prediction score per category, one bar colour per
    original label, and saves it as a PNG file in PLOT_PATH.

    :param df: dataframe with the columns "category", "score" and "original_label"
    :param title: title of the plot, also used in the name of the saved file
    :param tags: free text inserted in the name of the saved file
    """
    figure, axes = plt.subplots(figsize=(15, 8))
    sns.barplot(data=df, x="category", y="score", hue="original_label", ax=axes)
    axes.set_title(title, fontsize=15)
    plt.xticks(rotation=30)
    output_file = f"{PLOT_PATH}/{title}_{tags}_{today}.png"
    plt.savefig(output_file)

# Inference: Programs divided by Category

## Read test set

In [None]:
# load the electoral-program test set from the Kaggle input directory
df_programs = pd.read_csv("/kaggle/input/text-classification-2/it/programs_by_index_by_nltk.csv")
arguments = df_programs.columns

# label -> id mapping for the text classification 2
mapping = {
    "AzioneItaliaviva": 0,
    "PD": 1,
    "FratellidItalia": 2,
    "Movimento5Stelle": 3,
    "Lega": 4,
    "ForzaItalia": 5,
}

In [None]:
# distinct values of the "label" column of the test set
df_programs.label.unique()

In [None]:
# preview of the first rows of the test set
df_programs.head()

## Download Model

In [None]:
# start a Weights & Biases run; the model artifact is fetched through it
run = wandb.init()
# model artifact used for inference
artifact = run.use_artifact('giusetrip98/ItalianPoliticianConsistency/alberto_tc2_new_speech_and_tweets:v0', type='model')
# alternative artifact:
#artifact = run.use_artifact('giusetrip98/ItalianPoliticianConsistency/gilberto_tc2_new_speech_and_tweets:v0', type='model')

# local directory the artifact files are downloaded to
artifact_dir = artifact.download()

# checkpoint the tokenizer is loaded from; the commented lines are alternatives
# NOTE(review): presumably it has to match the artifact selected above - confirm
#checkpoint = "Musixmatch/umberto-wikipedia-uncased-v1"
#checkpoint = "idb-ita/gilberto-uncased-from-camembert"
checkpoint = "m-polignano-uniba/bert_uncased_L-12_H-768_A-12_italian_alb3rt0"

In [None]:
# Load Hugging Face model from that folder using the same model class
# number of output classes of the classification head
NUM_LABELS = 7
# the model weights come from the downloaded artifact directory
model = AutoModelForSequenceClassification.from_pretrained(artifact_dir, num_labels=NUM_LABELS)

# NOTE(review): padding/truncation are given to from_pretrained here, while the
# pipeline's preprocess calls the tokenizer without them - confirm they have
# the intended effect.
tokenizer = AutoTokenizer.from_pretrained(checkpoint, padding=True, truncation=False)
my_pipeline = MyTextClassificationPipeline(model=model, tokenizer=tokenizer)


## Prediction and Plotting

In [None]:
#df_programs.drop([1150, 2908], inplace=True)
# The triple-quoted string below is disabled debugging code: it is a plain
# string literal, so the loop inside it is never executed.
"""
# Test to find the problem
l = df_programs["text"].tolist()
for i in range(len(l)):
    print(f"last computed {i}: {l[i][:12]}")
    my_pipeline(l[i])
"""

In [None]:
# run the pipeline on every text of the test set
eval_predict = my_pipeline(df_programs["text"].tolist())

In [None]:
# sanity check: classify the first text of the test set on its own
test = df_programs.iloc[0]["text"]
# NOTE(review): this bare expression is not the last line of the cell, so with
# the default notebook display settings it shows nothing.
test
my_pipeline(test)

In [None]:
# Metric computation
# Metric computation
# tag inserted in the file names of the plots saved by ComputeMetrics
tags = "by_index_extraction2"
cm = ComputeMetrics(eval_predict, df_programs, mapping, tc2=True, tags=tags)
# compute_metrics() prints the metrics itself and returns nothing, so it is
# called directly: wrapping it in print() only added a spurious "None"
cm.compute_metrics()

In [None]:
# bar plot of the correct predictions for each label (also saved as PNG)
cm.plot_consistency_for_politician()

In [None]:
# confusion matrix of the predictions (also saved as PNG)
cm.confusion_matrix_plot()

In [None]:
# pie charts of the misclassified elements of each label (also saved as PNG)
cm.misclassification_pie_chart()

## Analysis by category

In [None]:
# tag inserted in the file names of the plots of the per-category analysis
tags = "by_category_extraction2"

In [None]:
#define the new structure 
df_index = define_structure_for_line_plots(df_programs, eval_predict)
df_index.head()

### Accuracy Analysis for category

In [None]:
# number of elements of every (program, category) pair
for_pol = df_index.groupby(["original_label", "category"]).count()
# correct predictions of every pair; pairs without any correct prediction are
# reindexed in with 0, so their accuracy is 0 instead of NaN
for_pol["TP"] = (
    df_index[df_index['assigned_label'] == df_index['original_label']]
    .groupby(["original_label", "category"])["score"]
    .count()
    .reindex(for_pol.index, fill_value=0)
)
for_pol["Acc"] = for_pol["TP"] / for_pol["score"]
for_pol["Acc"]

In [None]:
# number of elements of every category, regardless of the program
general = df_index.groupby(by="category").agg({"score": "count"})
# correct predictions of every category; categories without any correct
# prediction are reindexed in with 0, so their accuracy is 0 instead of NaN
general["TP"] = (
    df_index[df_index['assigned_label'] == df_index['original_label']]
    .groupby(by="category")["score"]
    .count()
    .reindex(general.index, fill_value=0)
)
general["acc"] = general["TP"] / general["score"]
general

### Score Analysis for category

In [None]:
# bar plot of the prediction score per category over all the elements
title= "Prediction Score by Category"
plot_bar_category_score_by_program(df_index,title, tags)

#### Misclassified Elements

In [None]:
# plot the misclassified element score
df_index_mis = df_index[df_index["original_label"] != df_index["assigned_label"]]
df_index_mis.to_csv("dataframe_con_solo_misclassificazioni.csv")
print(df_index_mis.info())
title= "Prediction Score by Category - Misclassified Element"
plot_bar_category_score_by_program(df_index_mis, title, tags)

In [None]:
# compute avarage score of misclassification
grouped_single_tweets = df_index_mis.groupby('original_label').agg({'score': ['mean', 'max', 'min', 'count']})
print(grouped_single_tweets)

#### Correct Predictions

In [None]:
# plot the correct element score
df_index_corr = df_index[df_index["original_label"] == df_index["assigned_label"]]
print(df_index_corr.info())
title= "Prediction Score by Category - True Positive"
plot_bar_category_score_by_program(df_index_corr, title, tags)

In [None]:
# compute avarage score of correct predictions
grouped_single_tweets = df_index_corr.groupby('original_label').agg({'score': ['mean', 'max', 'min', 'count']})
print(grouped_single_tweets)