# Important Links

### Install PyTorch on Windows:
https://saturncloud.io/blog/how-to-install-pytorch-on-windows-using-conda/

### Preprocessing:
https://towardsdatascience.com/basic-tweet-preprocessing-in-python-efd8360d529e

### XAI:
https://www.mzes.uni-mannheim.de/socialsciencedatalab/article/bert-explainable-ai/
https://towardsdatascience.com/introducing-transformers-interpret-explainable-ai-for-transformers-890a403a9470
https://github.com/cdpierse/transformers-interpret
https://levelup.gitconnected.com/huggingface-transformers-interpretability-with-captum-28e4ff4df234
https://silviatulli.com/2021/11/02/explaining-the-outputs-of-transformers-models-a-working-example/
https://brainsteam.co.uk/2022/03/14/painless-explainability-for-text-models-with-eli5/#eli5-and-transformershuggingface

# Imports

In [None]:
## install libraries
# !pip install transformers[torch]
# !pip install accelerate -U
# !pip install -U xformers
# !pip install datasets evaluate
# !pip install emoji
# !pip install scikit-learn scipy matplotlib
# !pip install openpyxl --upgrade
# !pip install wordcloud
# !pip install nltk
# !pip install tweet-preprocessor
# !pip install captum
# !pip install transformers-interpret
# !pip install eli5

In [None]:
## Imports

# General
from copy import deepcopy
from collections import Counter
import os
from numpy.random import seed
from sklearn.utils import shuffle
import string
from matplotlib.colorbar import ColorbarBase


# Data processing
import pandas as pd
import numpy as np
from tqdm import tqdm
from nltk.stem import WordNetLemmatizer

# Modeling
# import tensorflow as tf
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback, TextClassificationPipeline
from transformers.pipelines import TextClassificationPipeline
import torch
torch.set_flush_denormal(True)

# XAI
from captum.attr import LayerIntegratedGradients, TokenReferenceBase
from transformers_interpret import SequenceClassificationExplainer, MultiLabelClassificationExplainer
from eli5.lime import TextExplainer
import eli5 

# Hugging Face Dataset
from datasets import Dataset

# Model performance evaluation
import evaluate

# NLP
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer

# Visualization
import matplotlib.pyplot as plt

# from google.colab import drive
# drive.mount('/content/drive')

path = 'dataset/'

# Reading dataset

In [None]:
# Build a balanced, labelled dataframe: hate-speech rows (label 1) followed by
# an equally sized random sample of non-hate-speech rows (label 0).
dataset_type = 'racism'

original_text_column = 'data_text'
label_column = 'label'

dataset_file = 'Racism.xlsx'

# Positive class: every text in the hate-speech spreadsheet.
df_hate_speech = pd.read_excel(path+dataset_file)[original_text_column]

# Negative class: random sample with (at most) as many rows as the positive class.
df_non_hate_speech = pd.read_csv(path+'non_hate_speech.csv')[original_text_column]
df_non_hate_speech = shuffle(df_non_hate_speech, random_state=42)
df_non_hate_speech = df_non_hate_speech.head(df_hate_speech.shape[0])

# One label per row, in the same order as the concatenation below.
# Each class contributes its OWN row count: head() may return fewer rows than
# requested when the non-hate file is smaller than the hate-speech file.
label = [1]*df_hate_speech.shape[0] + [0]*df_non_hate_speech.shape[0]

hate_speech_df = pd.concat([df_hate_speech, df_non_hate_speech]).to_frame()
hate_speech_df.columns = [original_text_column]

text_column = original_text_column

hate_speech_df[label_column] = label

# Rows without text cannot be tokenized later, so drop them before shuffling.
hate_speech_df.dropna(subset=[original_text_column], inplace=True)

hate_speech_df = shuffle(hate_speech_df, random_state=42)
hate_speech_df.reset_index(drop=True, inplace=True)

hate_speech_df

# Data Handler

In [None]:
import emoji
import preprocessor as p
import re

import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
# Create a set of stop words
stop_words = set(stopwords.words('english'))

class DataHandler():
    """Clean, balance and split a labelled text dataframe.

    The handler keeps a reference to ``df`` and mutates it in place
    (rows with missing text are dropped, a processed-text column is added).

    Args:
        df: DataFrame holding at least ``text_column`` and ``label_column``.
        text_column: Name of the column with the raw text.
        label_column: Name of the column with the class labels.
        random_state: Seed used for sampling and shuffling.
    """

    def __init__(self, df, text_column, label_column, random_state=42):
        self.random_state = random_state
        self.df = df
        self.text_column = text_column
        # Set by preprocess(); None means "no processed column yet".
        self.processed_text_column = None
        self.label_column = label_column
        self.number_of_labels = df[label_column].nunique()

    def __demojize_text(self, text):
        """Replace emojis with their textual alias (they are not deleted)."""
        return emoji.demojize(text)

    def __remove_words_with_euro(self, input_string):
        """Remove every word (optionally a hashtag) that contains 'euro'."""
        pattern = r'\b\w*#?euro\w*\b'
        return re.sub(pattern, '', input_string)

    def __remove_stop_words(self, sentence):
        """Drop whitespace-separated tokens found in the module-level ``stop_words`` set."""
        return ' '.join(word for word in sentence.split() if word not in stop_words)

    def __preprocess_sentence(self, text, setup):
        """Apply the cleaning steps enabled in ``setup`` to a single text.

        Args:
            text: Raw text to clean.
            setup: Dict of boolean switches: ``lower_case``, ``remove_emojis``,
                ``remove_stop_words``, ``remove_numbers``, ``remove_users``,
                ``remove_urls`` and ``lemmatize``.

        Returns:
            The cleaned text. Removed user mentions / URLs leave an empty
            token behind, so consecutive spaces may remain.
        """
        if setup['lower_case']:
            text = text.lower()

        if setup['remove_emojis']:
            text = self.__demojize_text(text)

        if setup['remove_stop_words']:
            text = self.__remove_stop_words(text)

        if setup['remove_numbers']:
            # str.replace() treats its argument literally; a regex
            # substitution is required to actually strip digit runs.
            text = re.sub(r'\d+', '', text)

        cleaned_tokens = []
        for token in text.split(" "):
            if setup['remove_users'] and token.startswith('@') and len(token) > 1:
                token = ''
            if setup['remove_urls'] and token.startswith('http'):
                token = ''
            cleaned_tokens.append(token)

        new_text = " ".join(cleaned_tokens)

        if setup['lemmatize']:
            # NOTE(review): presumably requires the NLTK 'punkt' and 'wordnet'
            # resources to be downloaded beforehand; only 'stopwords' is
            # downloaded in this file.
            lemmatizer = WordNetLemmatizer()
            new_text = ' '.join(lemmatizer.lemmatize(word) for word in nltk.word_tokenize(new_text))

        return new_text

    def get_text_column_name(self):
        """Return the processed-text column if it exists, else the raw one."""
        if self.processed_text_column:
            return self.processed_text_column
        return self.text_column

    def get_top_words(self, n=100):
        """Return the ``n`` most frequent alphabetic, non-stop-word tokens.

        Returns:
            Dict with parallel lists ``'words'`` and ``'counters'``.
        """
        temp_text_column = self.get_text_column_name()

        # Combine all texts into a single string and tokenize it.
        all_tweets = " ".join(self.df[temp_text_column])
        words = word_tokenize(all_tweets)

        english_stop_words = set(stopwords.words('english'))
        words = [word.lower() for word in words
                 if word.isalpha() and word.lower() not in english_stop_words]

        top_words_and_count = Counter(words).most_common(n)
        top_words = [word for word, _ in top_words_and_count]
        counters = [counter for _, counter in top_words_and_count]

        return {'words': top_words, 'counters': counters}

    def get_top_words_tfidf(self, n):
        """Return up to ``n`` words ordered by summed TF-IDF score (highest first).

        Returns:
            1-D numpy array of words.
        """
        temp_text_column = self.get_text_column_name()

        tfidf_vectorizer = TfidfVectorizer(max_features=n, stop_words='english')
        tfidf_matrix = tfidf_vectorizer.fit_transform(self.df[temp_text_column])
        feature_names = tfidf_vectorizer.get_feature_names_out()

        # Summing a sparse matrix yields a 2-D (1, n_features) result; flatten
        # it so ordinary 1-D indexing can be used below.
        word_scores = np.asarray(tfidf_matrix.sum(axis=0)).ravel()
        top_indices = word_scores.argsort()[::-1][:n]

        return feature_names[top_indices]

    def preprocess(self, setup):
        """Create the ``processed_<text_column>`` column and return the dataframe.

        Rows whose raw text is missing are dropped (in place) first.

        Args:
            setup: Switches described in ``__preprocess_sentence`` plus
                ``remove_non_text_characters`` (strip non-ASCII characters).
        """
        self.df.dropna(subset=[self.text_column], inplace=True)
        self.df.reset_index(drop=True, inplace=True)

        self.processed_text_column = 'processed_'+self.text_column
        self.df[self.processed_text_column] = self.df[self.text_column].apply(
            lambda text: self.__preprocess_sentence(text, setup))

        if setup['remove_non_text_characters']:
            pattern = re.compile(r'[^\x00-\x7F]+')
            self.df[self.processed_text_column] = self.df[self.processed_text_column].apply(
                lambda text: pattern.sub('', text))

        return self.df

    def unsample(self):
        """Undersample every class to the size of the smallest one.

        Replaces ``self.df`` with a shuffled dataframe that keeps only the
        raw text, the processed text (when available) and the label columns.
        """
        columns = [self.text_column]
        if self.processed_text_column:
            # Only present after preprocess() has run.
            columns.append(self.processed_text_column)
        columns.append(self.label_column)

        grouped = self.df[columns].groupby(self.label_column)
        smallest_class_size = grouped.size().min()

        frames_of_groups = [group.sample(smallest_class_size, random_state=self.random_state)
                            for _, group in grouped]
        self.df = pd.concat(frames_of_groups)

        self.df = shuffle(self.df, random_state=self.random_state)
        self.df.reset_index(drop=True, inplace=True)

        return self.df

    def split_train_test_dataset(self, train_size=0.8):
        """Randomly split text + label columns into train and test frames.

        Args:
            train_size: Fraction of rows assigned to the training set.

        Returns:
            Tuple ``(train_data, test_data)``; the test set is every row not
            sampled into the training set.
        """
        selected = self.df[[self.get_text_column_name(), self.label_column]]

        train_data = selected.sample(frac=train_size, random_state=self.random_state)
        test_data = selected.drop(train_data.index)

        return train_data, test_data

# ----------------------------------------------------------------------------------------------------

# Switches consumed by DataHandler.preprocess(); every key must be present.
preprocessing_setup = dict(
    lower_case=True,
    remove_emojis=False,
    remove_stop_words=True,
    remove_numbers=False,
    remove_users=True,
    remove_urls=True,
    remove_non_text_characters=True,
    lemmatize=False,
)

# Clean the texts, balance the classes, then split into train / test.
data_handler = DataHandler(hate_speech_df, original_text_column, label_column)
data_handler.preprocess(preprocessing_setup)
data_handler.unsample()

# print(data_handler.get_top_words(100))
# print(data_handler.get_top_words_tfidf(100))

train_data, test_data = data_handler.split_train_test_dataset()

# Last expression of the cell: displays the balanced dataframe.
data_handler.df

# Language Model Handler

In [None]:
class LanguageModelHandler():
    """Fine-tune and run a Hugging Face sequence-classification model.

    Args:
        model_name: Hub identifier or path passed to ``from_pretrained``.
        dataset_type: Name of the positive class (e.g. ``'racism'``); the
            negative class is labelled ``'non-' + dataset_type``.
        text_column: Column of the train/test frames that holds the text.
        label_column: Column of the train/test frames that holds the label.
    """

    def __init__(self, model_name, dataset_type, text_column, label_column):
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.trainer = None
        self.pipeline = None
        self.num_labels = 0
        self.text_column = text_column
        self.label_column = label_column
        self.dataset_type = dataset_type
        # Metric objects are loaded lazily on the first evaluation.
        self.__metrics = None
        self.create_tokenizer()

        # Integer device index in the convention used by transformers.pipeline:
        # 0 = first CUDA device, -1 = CPU.
        self.device = 0 if torch.cuda.is_available() else -1

    def __torch_device(self):
        """Translate ``self.device`` into a ``torch.device``.

        A negative integer index is only meaningful to ``pipeline``; tensors
        and modules need an explicit CPU device instead.
        """
        if isinstance(self.device, int):
            if self.device >= 0:
                return torch.device('cuda', self.device)
            return torch.device('cpu')
        return torch.device(self.device)

    def test_gpu(self):
        """Print CUDA availability and, when available, the current device."""
        cuda_available = torch.cuda.is_available()
        print(f"Is CUDA supported by this system? {cuda_available}")
        print(f"CUDA version: {torch.version.cuda}")

        if not cuda_available:
            # Querying the current device raises on machines without CUDA.
            return

        cuda_id = torch.cuda.current_device()
        print(f"ID of current CUDA device: {cuda_id}")
        print(f"Name of current CUDA device: {torch.cuda.get_device_name(cuda_id)}")

    def create_tokenizer(self):
        """Load and return the tokenizer matching ``self.model_name``."""
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        return self.tokenizer

    def __tokenize_dataset(self, data):
        """Tokenize one dataset row, padding/truncating to 32 tokens."""
        return self.tokenizer(data[self.text_column], max_length= 32,
                              truncation=True,
                              padding="max_length")

    def add_new_tokens_to_tokenizer(self, new_tokens):
        """Add ``new_tokens`` to the tokenizer vocabulary.

        When a model has already been created, its embedding layer is resized
        to the new vocabulary size.

        Returns:
            The number of tokens actually added, or ``None`` when no
            tokenizer exists.
        """
        if self.tokenizer is None:
            return None

        number_of_tokens_added = self.tokenizer.add_tokens(new_tokens=new_tokens)

        if self.model is not None:
            print('### Resizing the model embeddings layer...')
            self.model.resize_token_embeddings(len(self.tokenizer))

        return number_of_tokens_added

    def prepare_training_testing_datasets(self, train_data, test_data):
        """Convert the pandas frames to tokenized Hugging Face datasets.

        Also derives ``self.num_labels`` from the distinct labels found in
        ``train_data``.

        Returns:
            Tuple ``(tokenized_train, tokenized_test)``.
        """
        self.hg_train_data = Dataset.from_pandas(train_data)
        self.hg_test_data = Dataset.from_pandas(test_data)

        self.num_labels = train_data[self.label_column].nunique()

        self.tokenized_dataset_train = self.hg_train_data.map(self.__tokenize_dataset)
        self.tokenized_dataset_test = self.hg_test_data.map(self.__tokenize_dataset)

        return self.tokenized_dataset_train, self.tokenized_dataset_test

    def create_model(self):
        """Load the classification model and move it to the selected device.

        If the first load fails (e.g. the checkpoint head has a different
        number of labels), the load is retried with
        ``ignore_mismatched_sizes=True``.
        """
        id2label = {0: 'non-'+self.dataset_type, 1: self.dataset_type}

        try:
            self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name,
                                                                            num_labels=self.num_labels,
                                                                            id2label=id2label)
        except Exception as error:
            # Not a bare except: KeyboardInterrupt / SystemExit must propagate.
            print('Error to import the model, ignore mismatched sizes')
            print(f'Original error: {error}')
            self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name,
                                                                            num_labels=self.num_labels,
                                                                            ignore_mismatched_sizes=True,
                                                                            id2label=id2label)

        # self.device may be -1 (CPU), which Module.to() does not accept.
        self.model.to(self.__torch_device())

        return self.model

    def __compute_metrics(self, eval_pred):
        """Compute accuracy, precision, recall and F1 for the Trainer.

        Returns:
            Dict mapping each metric name to the dict returned by the
            corresponding ``evaluate`` metric (e.g.
            ``{'accuracy': {'accuracy': 0.9}, ...}``).
        """
        if self.__metrics is None:
            # Loading a metric is comparatively slow; do it once, not per evaluation.
            self.__metrics = {name: evaluate.load(name)
                              for name in ('accuracy', 'precision', 'recall', 'f1')}

        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=1)

        return {name: metric.compute(predictions=predictions, references=labels)
                for name, metric in self.__metrics.items()}

    def train_evaluate_model(self, training_args, early_stopping_patience, iterations):
        """Train a copy of ``self.model`` and evaluate it on the test split.

        Args:
            training_args: ``TrainingArguments`` for the run. When early
                stopping is requested they must enable
                ``load_best_model_at_end`` (required by
                ``EarlyStoppingCallback``).
            early_stopping_patience: Number of evaluations without
                improvement before stopping; falsy disables early stopping.
            iterations: Currently unused; kept for interface compatibility.

        Returns:
            Tuple ``(results_summary, trainer)`` where ``results_summary``
            maps every evaluation metric to a one-element list.
        """
        detailed_metrics = ['eval_accuracy', 'eval_precision', 'eval_recall',  'eval_f1']

        # Train a copy so self.model keeps the pristine pretrained weights.
        model = deepcopy(self.model)

        # Callbacks must be registered with the Trainer; assigning to an
        # arbitrary attribute afterwards has no effect on training.
        callbacks = []
        if early_stopping_patience:
            callbacks.append(EarlyStoppingCallback(early_stopping_patience=early_stopping_patience))

        self.trainer = Trainer(
            model = model,
            args = training_args,
            train_dataset = self.tokenized_dataset_train,
            eval_dataset = self.tokenized_dataset_test,
            compute_metrics = self.__compute_metrics,
            callbacks = callbacks
        )

        self.trainer.train()

        results = self.trainer.evaluate(self.tokenized_dataset_test)

        results_summary = {}
        for metric, value in results.items():
            if metric in detailed_metrics:
                # compute_metrics returns nested dicts, e.g.
                # results['eval_f1'] == {'f1': 0.8}; unwrap the inner value.
                value = value[metric.replace('eval_', '')]
            results_summary[metric] = [value]

        # torch.cuda.empty_cache()

        return results_summary, self.trainer

    def __create_classification_column(self, df, classification_column='classification'):
        """Add a 0/1 column: 0 when the negative class has the higher score, else 1."""
        df[classification_column] = df.apply(lambda row: 0 if row['non-'+self.dataset_type] > row[self.dataset_type] else 1, axis=1)
        return df

    def __data_loader(self, dataframe, column=1):
        """Yield the value at positional ``column`` of every row (the text)."""
        for row in dataframe.values:
            yield row[column]

    def classify_unlabaled_datasets(self, dataset_name_file, result_file_name, batch_size_to_save):
        """Classify every row of a CSV file and write the scores to a CSV file.

        If ``result_file_name`` already exists, classification resumes after
        the rows it contains. Results are flushed to disk every
        ``batch_size_to_save`` rows and once more at the end.

        Args:
            dataset_name_file: Input CSV; the text is read from positional
                column 1 for inference and from the ``'text'`` column for the
                output.
            result_file_name: Output CSV path.
            batch_size_to_save: Number of rows between intermediate saves.
        """
        if self.pipeline is None:
            self.pipeline = pipeline('text-classification', model=self.model,
                                     tokenizer=self.tokenizer, device=self.device)

        df = pd.read_csv(dataset_name_file)#.head(4000)
        df = df.loc[:, ~df.columns.str.contains('^Unnamed')]

        if os.path.isfile(result_file_name): # if the results file exists
            df_results = pd.read_csv(result_file_name)
            df = df.tail(df.shape[0] - df_results.shape[0])
        else:
            df_results = pd.DataFrame(columns=list(self.model.config.id2label.values()))

        i = 0

        for prediction in tqdm(self.pipeline(self.__data_loader(df), batch_size=32, return_all_scores=True), total=df.shape[0]):
            # NOTE(review): `text_column` is a module-level name defined
            # outside this class, not self.text_column — confirm which output
            # column name is intended.
            result = {
                text_column: [df.iloc[i]['text']],
                prediction[0]['label']: [prediction[0]['score']],
                prediction[1]['label']: [prediction[1]['score']]
            }

            df_results = pd.concat([df_results, pd.DataFrame.from_dict(result)])

            # NOTE(review): passing self.dataset_type as the column name makes
            # the 0/1 classification overwrite the positive-class score
            # column — confirm this is intended before changing the schema.
            if i % batch_size_to_save == 0 and i > 0:
                self.__create_classification_column(df_results, self.dataset_type).to_csv(result_file_name, index=False)
            i += 1

        self.__create_classification_column(df_results, self.dataset_type).to_csv(result_file_name, index=False)#['label_match'].value_counts()

    def predict_proba(self, texts_array):
        """Return the per-class scores for every text.

        Args:
            texts_array: Sequence of texts.

        Returns:
            numpy array with one row per text and one column per label, in
            the order returned by the pipeline.
        """
        if self.pipeline is None:
            self.pipeline = pipeline('text-classification', model=self.model,
                                     tokenizer=self.tokenizer, device=self.device)

        texts = self.__data_loader(pd.DataFrame(texts_array), column=0)
        predictions_per_text = self.pipeline(texts, batch_size=32, return_all_scores=True)

        all_results = [[prediction['score'] for prediction in predictions]
                       for predictions in tqdm(predictions_per_text, total=len(texts_array))]

        return np.array(all_results)
                            
    

# --------------------------------------------------------------------------------

# Build the handler around the processed text column produced by DataHandler.
language_model_manager = LanguageModelHandler(model_name= 'bert-base-uncased', #'cardiffnlp/twitter-roberta-base-offensive'
                                              dataset_type=dataset_type,
                                              text_column=data_handler.get_text_column_name(),
                                              label_column=data_handler.label_column)

# test_gpu() prints its report itself and returns nothing, so wrapping it in
# print() would only add a stray "None" line.
language_model_manager.test_gpu()

language_model_manager.prepare_training_testing_datasets(train_data, test_data)

language_model_manager.create_model()

# Set up training arguments
# NOTE(review): logging/save/eval all use the 'epoch' strategy, so the
# corresponding *_steps values below are presumably ignored — confirm against
# the installed transformers version.
training_args = TrainingArguments(
    output_dir="./sentiment_transfer_learning_transformer/",
    logging_dir='./sentiment_transfer_learning_transformer/logs',
    logging_strategy='epoch',
    logging_steps=100,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    learning_rate=5e-6,
    save_strategy='epoch',
    save_steps=100,
    evaluation_strategy='epoch',
    eval_steps=100,
    load_best_model_at_end=True,
    num_train_epochs=10,
    # seed=42
)

results, trainer = language_model_manager.train_evaluate_model(training_args=training_args,
                                                               early_stopping_patience=2,
                                                               iterations=1) # '''


# XAI

In [None]:
class ExplainableTransformerPipeline():
    """Explain a Hugging Face text classifier with Captum, transformers-interpret and LIME.

    Args:
        model: Fine-tuned sequence-classification model.
        tokenizer: Tokenizer matching ``model``.
        device: Device argument forwarded to ``transformers.pipeline``
            (e.g. ``0`` for the first GPU, ``-1`` for CPU).
        pipeline_name: Pipeline task name.
    """

    def __init__(self, model, tokenizer, device, pipeline_name='text-classification'):

        # Attribute name of the backbone (needed to reach its embeddings in
        # explain()). Prefer the prefix declared by the model itself and fall
        # back to guessing from the class name.
        class_name = model.__class__.__name__
        prefix = getattr(model, 'base_model_prefix', None)
        if prefix and hasattr(model, prefix):
            self.__name = prefix
        elif 'Roberta' in class_name:
            self.__name = 'roberta'
        elif 'Bert' in class_name:
            self.__name = 'bert'
        else:
            # Only explain() needs the backbone; the other methods still work.
            self.__name = None

        self.__pipeline = pipeline(pipeline_name, model=model, tokenizer=tokenizer, device=device)
        self.__cls_explainer = SequenceClassificationExplainer(model, tokenizer)
        self.__device = device
        # ``device`` may be -1 (CPU), which torch does not accept for tensors;
        # use the device object resolved by the pipeline instead.
        self.__torch_device = self.__pipeline.device

    def forward_func(self, inputs, position = 0):
        """
            Wrapper around prediction method of pipeline.
            Runs the model on ``inputs`` with an all-ones attention mask and
            returns element ``position`` of its output.
        """
        pred = self.__pipeline.model(inputs, attention_mask=torch.ones_like(inputs))
        return pred[position]

    def visualize_word_importance_in_sentence(self, text:str):
        """Print the prediction and per-token attributions, then render the explainer's visualization."""
        word_attributions = self.__cls_explainer(text)

        print('Prediction:', self.__cls_explainer.predicted_class_name)
        print('Words importance:', word_attributions)

        self.__cls_explainer.visualize()

    def visualize_word_importance(self, inputs: list, attributes: list, prediction:str):
        """
            Visualization method.
            Takes the input ids and their attributions, prints the normalized
            per-token importance and shows it as a horizontal bar plot titled
            with ``prediction``.

            Returns the importance values as a pandas Series indexed by token.
        """
        # Collapse the embedding dimension, then scale to unit norm.
        attr_sum = attributes.sum(-1)
        attr = attr_sum / torch.norm(attr_sum)

        tokens = self.__pipeline.tokenizer.convert_ids_to_tokens(
            inputs.detach().cpu().numpy()[0], skip_special_tokens=False)
        word_importance = pd.Series(attr.cpu().numpy()[0], index=tokens)

        print(word_importance)

        plt.title(prediction)
        word_importance.plot.barh(figsize=(10,20))
        plt.show()

        return word_importance

    def __generate_inputs(self, text: str):
        """
            Convenience method for generation of input ids (without special
            tokens) as a tensor of shape (1, sequence_len)
        """
        return torch.tensor(self.__pipeline.tokenizer.encode(text, add_special_tokens=False),
                            device = self.__torch_device).unsqueeze(0)

    def generate_baseline(self, sequence_len: int):
        """
            Convenience method for generation of the baseline ids:
            [CLS], padding tokens, [SEP], as a tensor with a leading batch axis.
            NOTE(review): for sequence_len < 2 the baseline still has two
            tokens and will not match the input length.
        """
        tokenizer = self.__pipeline.tokenizer
        baseline_ids = ([tokenizer.cls_token_id]
                        + [tokenizer.pad_token_id] * (sequence_len - 2)
                        + [tokenizer.sep_token_id])
        return torch.tensor(baseline_ids, device = self.__torch_device).unsqueeze(0)

    def __clean_text_for_explanation(self, text):
        """Restore emojis from their ':alias:' form and replace ASCII punctuation with spaces."""
        # Remove whitespace around colons so aliases such as ': smile :'
        # become ':smile:' and can be converted back to emojis.
        text = re.sub(r'(?<=:)\s+|\s+(?=:)', '', text)
        text = emoji.emojize(text)

        # Single pass: every punctuation character becomes one space.
        punctuation_to_space = str.maketrans({punct: ' ' for punct in string.punctuation})
        text = text.translate(punctuation_to_space)

        return text.strip()

    ## LIME
    def model_adapter(self, texts):
        """Prediction function for LIME: return class probabilities for ``texts``.

        Args:
            texts: Sequence of strings.

        Returns:
            numpy array with one row of softmax probabilities per text.
        """
        all_scores = []
        batch_size = 64
        tokenizer = self.__pipeline.tokenizer
        model = self.__pipeline.model

        for start in range(0, len(texts), batch_size):

            batch = list(texts[start:start+batch_size])

            encoded_input = tokenizer(batch,
                              return_tensors='pt',
                              padding=True,
                              truncation=True,
                              max_length=model.config.max_position_embeddings-2)

            for key in encoded_input:
                encoded_input[key] = encoded_input[key].to(self.__torch_device)

            # Inference only: no gradients are needed.
            with torch.no_grad():
                output = model(**encoded_input)

            # The model returns raw logits, so apply the softmax here.
            scores = output[0].softmax(1).detach().cpu().numpy()

            all_scores.extend(scores)

        return np.array(all_scores)

    def get_most_impactful_words_lime(self, text, keyword, word_importance_results):
        """Accumulate LIME weights of positively contributing words.

        The text is only explained when the model predicts ``keyword``.
        Positive feature weights of the first explanation target are added to
        ``word_importance_results`` (mutated in place).

        Returns:
            Tuple ``(word_importance_results, explanation)``; ``explanation``
            is ``None`` when the prediction differs from ``keyword``.
        """
        prediction = self.__pipeline(text)[0]['label']

        if prediction != keyword:
            return word_importance_results, None

        print(text)
        te = TextExplainer(n_samples=500, random_state=42)
        te.fit(text, self.model_adapter)

        graphic_explanation = te.explain_prediction(target_names=list(self.__pipeline.model.config.id2label.values()))

        print(graphic_explanation.targets)

        # Only the first target is used; an empty target list still returns
        # a proper tuple instead of None.
        for element in (graphic_explanation.targets or [])[:1]:
            for f in element.feature_weights.pos:
                for word in f.feature.split():
                    word_importance_results[word] = word_importance_results.get(word, 0) + f.weight

        return word_importance_results, graphic_explanation

    ## INTEGRATED GRADIENTS
    def explain(self, text: str):
        """
            Main entry method. Passes text through series of transformations and through the model.
            Calls visualization method.

            Raises ValueError when the model backbone could not be identified.
        """
        if self.__name is None:
            raise ValueError('Unsupported model type: cannot locate the embeddings layer '
                             f'of {self.__pipeline.model.__class__.__name__}')

        prediction = self.__pipeline.predict(text)
        inputs = self.__generate_inputs(text)
        baseline = self.generate_baseline(sequence_len = inputs.shape[1])

        print('inputs', len(inputs[0]))

        lig = LayerIntegratedGradients(self.forward_func,
                                       getattr(self.__pipeline.model, self.__name).embeddings)

        # Map label name -> class index by inverting id2label.
        labels_swaped = {v: k for k, v in self.__pipeline.model.config.id2label.items()}

        attributes, delta = lig.attribute(inputs=inputs,
                                  baselines=baseline,
                                  target=labels_swaped[prediction[0]['label']],
                                  return_convergence_delta=True)

        self.visualize_word_importance(inputs, attributes, prediction)

    def join_tokens_into_words(self, token_tuples):
        """Group word-piece tokens (and their scores) back into words.

        ``[CLS]`` and ``[SEP]`` are dropped. A token containing ``'##'`` is
        appended to the group of the token right before it, unless that token
        was dropped.

        Args:
            token_tuples: Iterable of ``(token, score)`` pairs.

        Returns:
            Tuple ``(tokens_list, scores_list)`` of parallel lists of lists;
            both are empty for empty input.
        """
        self.tokens_to_exclude = ['[CLS]', '[SEP]']
        tokens_list = []
        scores_list = []

        # True when the immediately preceding token was kept, i.e. a
        # continuation piece has a group to attach to.
        previous_kept = False

        for token, score in token_tuples:
            if token in self.tokens_to_exclude:
                previous_kept = False
                continue

            if '##' in token and previous_kept:
                tokens_list[-1].append(token)
                scores_list[-1].append(score)
            else:
                tokens_list.append([token])
                scores_list.append([score])

            previous_kept = True

        return tokens_list, scores_list

    def __get_most_impactful_words_integrated_gradients(self, text_to_evaluate, threshold, keyword, results):
        """Count words whose mean attribution exceeds ``threshold``.

        Counts are only updated when the explainer predicts ``keyword``.
        ``results`` is mutated in place and returned.
        """
        word_attributions = self.__cls_explainer(text=text_to_evaluate)
        tokens_list, scores_list = self.join_tokens_into_words(word_attributions)

        if self.__cls_explainer.predicted_class_name != keyword:
            return results

        for tokens, scores in zip(tokens_list, scores_list):
            if np.mean(scores) > threshold:
                word = self.__pipeline.tokenizer.convert_tokens_to_string(tokens)
                results[word] = results.get(word, 0) + 1

        return results

    def plot_vertical_bar(self, text, word_scores):
        """Show a bar plot with one bar per whitespace-separated word of ``text``.

        Words and scores are paired positionally; surplus items on either
        side are ignored. Nothing is plotted when there is no pair.
        """
        paired = list(zip(text.split(), word_scores))
        if not paired:
            return

        words = [word for word, _ in paired]
        scores = [score for _, score in paired]

        # Numeric positions keep repeated words as separate bars (string
        # categories would merge them into one tick).
        positions = np.arange(len(words))

        fig, ax = plt.subplots(figsize=(8, 4))
        ax.bar(positions, scores, color='skyblue')

        margin = 0.02

        for position, score in zip(positions, scores):
            if score >= 0:
                ax.text(position, score + margin, f'{score:.2f}', ha='center', va='bottom', fontsize=10)
            else:
                ax.text(position, score - margin, f'{score:.2f}', ha='center', va='top', fontsize=10)

        # Rotate word labels by 45 degrees for readability
        ax.set_xticks(positions)
        ax.set_xticklabels(words, rotation=45, ha='right')

        # Set labels
        ax.set_xlabel('Words')
        ax.set_ylabel('Word Impact Scores')

        # Adjust the position of the y-axis labels
        ax.yaxis.set_label_coords(-0.1, 0.5)

        ax.set_ylim([np.min(scores)-0.1, np.max(scores)+0.1])

        # Display the plot
        plt.tight_layout()
        plt.show()

    def plot_colored_text(self, text, word_scores):
        """Show the words of ``text`` colored by their score, with a color bar.

        Words and scores are paired positionally; nothing is plotted when
        there is no pair.
        """
        paired = list(zip(text.split(), word_scores))
        if not paired:
            return

        # Colors come from the 'winter' colormap.
        cmap = plt.get_cmap('winter')

        # Normalize word scores to the range [0, 1]
        norm = plt.Normalize(min(word_scores), max(word_scores))

        # Horizontal spacing is based on the number of words in the text.
        spacing = 1.0 / len(text.split())

        # Create a figure and axis for the text
        fig, ax = plt.subplots(figsize=(10, 2))

        for i, (word, score) in enumerate(paired):
            ax.text(i * spacing, 0, word, color=cmap(norm(score)), fontsize=12, ha='center', rotation=45)

        # Add a color scale just above the text
        colorbar = ColorbarBase(ax=fig.add_axes([0.2, 0.8, 0.6, 0.02]),
                                cmap=cmap,
                                norm=norm,
                                orientation='horizontal')
        colorbar.set_label('Word Impact Scores')

        # Remove the axis and display the plot
        ax.axis('off')
        plt.show()

    def plot_word_importance(self, sentence, bar=True):
        """Plot the mean attribution of every word of ``sentence``.

        Args:
            sentence: Text to explain (cleaned before being explained).
            bar: Bar plot when True, colored text otherwise.
        """
        sentence = self.__clean_text_for_explanation(sentence)

        word_attributions = self.__cls_explainer(text=sentence)
        tokens_list, scores_list = self.join_tokens_into_words(word_attributions)

        scores_list = [np.mean(scores) for scores in scores_list]
        if bar:
            self.plot_vertical_bar(sentence, scores_list)
        else:
            self.plot_colored_text(sentence, scores_list)

    def get_most_impactful_words_for_dataset(self, dataset, column_text,
                                             threshold, keyword, method, n=20):
        """Rank the most impactful words over all texts of ``dataset``.

        Args:
            dataset: DataFrame with the texts.
            column_text: Name of the text column.
            threshold: Minimum mean attribution (integrated gradients only).
            keyword: Only texts predicted as this label contribute.
            method: ``'integrated_gradients'`` (word counts) or ``'lime'``
                (summed positive weights).
            n: Number of rows to return.

        Returns:
            DataFrame with columns ``'word'`` and ``'frequency'``, sorted in
            descending order and limited to ``n`` rows.

        Raises:
            ValueError: If ``method`` is not one of the supported values.
        """
        if method not in ('integrated_gradients', 'lime'):
            raise ValueError(f"Unknown method '{method}': expected 'integrated_gradients' or 'lime'")

        results = {}

        for i, (index, row) in enumerate(dataset.iterrows()):

            if i % 100 == 0:
                print('Processing:', i)

            text = self.__clean_text_for_explanation(row[column_text])

            if method == 'integrated_gradients':
                results = self.__get_most_impactful_words_integrated_gradients(text_to_evaluate=text,
                                                            threshold=threshold,
                                                            results=results,
                                                            keyword=keyword)
            else:
                results, graphic_explanation = self.get_most_impactful_words_lime(
                                                                    text=text,
                                                                    keyword=keyword,
                                                                    word_importance_results=results)

        ranked = sorted(results.items(), key=lambda item: item[1], reverse=True)

        return pd.DataFrame(ranked, columns=['word','frequency']).head(n)


# --------------------------------------------------------------------------------------------------------------------------------
# Wrap the trained model and its tokenizer in the explainability pipeline.
exp_model = ExplainableTransformerPipeline(
    model=language_model_manager.trainer.model,
    tokenizer=language_model_manager.tokenizer,
    device=language_model_manager.device,
    pipeline_name='text-classification',
)


####### LIME
# results_most_important_words = exp_model.get_most_impactful_words_for_dataset(
#     dataset=test_data,
#     column_text=data_handler.get_text_column_name(),
#     threshold=0, keyword='racism',
#     method='lime', n=100)


## Using lime to plot the word importance for few samples
# samples = test_data[test_data[label_column]==1].sample(n=3, random_state=42)

# for sample in samples[data_handler.get_text_column_name()]:
#     print('*** Sample:',sample)
#     word_importance_results, graphic_explanation = exp_model.get_most_impactful_words_lime(sample, 'racism', {})
#     print(word_importance_results)
#     graphic_explanation


####### INTEGRATED GRADIENTS

# Draw three test rows whose label equals 1 (fixed random_state) and plot the
# word importance of each one as a bar chart.
samples = test_data.loc[test_data[label_column] == 1].sample(n=3, random_state=42)

for sample in samples[data_handler.get_text_column_name()]:
    # Alternative column access: samples[data_handler.text_column]
    print(sample)
    # exp_model.explain(sample)
    # exp_model.visualize_word_importance_in_sentence(sample)
    exp_model.plot_word_importance(sample, bar=True)


# results = exp_model.get_most_impactful_words_for_dataset(
#     dataset=test_data,
#     column_text=data_handler.get_text_column_name(),
#     threshold=0.1,
#     keyword=dataset_type,
#     method='integrated_gradients',
#     n=50)
# results

In [None]:

# Scratch example: group (token, score) pairs into words.
# A token that starts with '##' is treated as a continuation piece of the
# word opened by the closest preceding token without that prefix.

# Alternative sample input:
# token_tuples = [
#     ('I', 0.444),
#     ('am', 0.6533),
#     ('not', 0.2342),
#     ('anti', 0.333),
#     ('##va', 0.3674232),
#     ('##x', 0.145151)
# ]

token_tuples = [
    ('euro', -0.04504043073408189),
    ('##20', -0.0012444165141832979),
    ('##20', 0.009583244537505661),
    ('##fin', 0.049853830068387804),
    ('##al', -0.019816686608616105),
    ('I', 0.444),
]

word_list = []   # one list of tokens per reconstructed word
score_list = []  # one list of scores per word, aligned with word_list

current_word_list = None
current_score_list = None

for token, score in token_tuples:

    if current_word_list is not None and token.startswith('##'):
        # Continuation piece: extend the word that is currently open.
        current_word_list.append(token)
        current_score_list.append(score)
        continue

    # A new word starts here; store the previous one first (if any).
    if current_word_list is not None:
        word_list.append(current_word_list)
        score_list.append(current_score_list)

    current_word_list = [token]
    current_score_list = [score]

# Store the word that was still open when the input ended.
if current_word_list is not None:
    word_list.append(current_word_list)
    score_list.append(current_score_list)

# Disabled earlier draft of the '##' token-merging loop. It is kept as a bare
# string literal, so none of it is executed. The draft accumulates the scores
# of tokens that are followed by '##' pieces and, at the end, rebuilds the text
# with language_model_manager.tokenizer.convert_tokens_to_string.
'''
results_tokens = []  # To store the combined tokens
results_scores = []  # To store the sum of scores for combined tokens

# Initialize variables to track combined token and its score
tokens_to_be_combined = []
acculumated_score = 0
previous_token = None
previous_score = 0

for token, score in token_tuples:

    if previous_token is None: # first token
        previous_score = score
        previous_token = token
        continue
        
    elif '##' in token:
        tokens_to_be_combined.append(previous_token)
        acculumated_score += previous_score
        
    elif '##' not in token and previous_token is not None:
        if len(tokens_to_be_combined) == 0:
            results_scores.append(previous_score)
            
        else:
            results_scores.append(acculumated_score)
            acculumated_score = 0
            tokens_to_be_combined = []
            
    previous_score = score
    previous_token = token
    print('******', token, previous_score, acculumated_score)
            
            
if '##' not in token:
    results_scores.append(score)
else:
    results_scores.append(acculumated_score+score)
    

token_tuples = [token for token, score in token_tuples]
combined_tokens = language_model_manager.tokenizer.convert_tokens_to_string(token_tuples)'''



# Experiment Manager

In [None]:
class ExperimentManager():
    """Preprocess a dataset once, then train and evaluate a list of models.

    Attributes are set in ``__init__``:
        data_handler: Object providing ``preprocess``, ``split_train_test_dataset``,
            ``unsample``, ``get_text_column_name`` and ``label_column``.
        dataset_type: Label stored in the ``Dataset`` column of the results and
            forwarded to ``LanguageModelHandler``.
        metrics: Keys looked up in the evaluation results of each model.
    """

    def __init__(self, data_handler, dataset_type):
        self.data_handler = data_handler
        self.dataset_type = dataset_type
        self.metrics = ('eval_accuracy','eval_precision','eval_recall', 'eval_f1')

    def start_experiment(self, experiment_design, preprocessing_setup):
        """Run the experiment described by ``experiment_design``.

        Args:
            experiment_design: Mapping with the keys ``'unsample'``,
                ``'model_list'``, ``'training_args'``,
                ``'early_stopping_patience'`` and ``'iterations'``. The last
                three are only read when ``'model_list'`` is non-empty.
            preprocessing_setup: Passed as ``setup`` to
                ``data_handler.preprocess``.

        Returns:
            Dict keyed by model name. Each value is a dict with ``'results'``
            (DataFrame with columns Dataset, Model, Metric, Value) and
            ``'model'`` (the ``LanguageModelHandler`` used for that model).
        """
        # Always use the handler given to the constructor. The original code
        # reached for a notebook-level ``data_handler`` global here, which
        # ignores the injected handler and fails when no such global exists.
        self.data_handler.preprocess(setup=preprocessing_setup)

        train_data, test_data = self.data_handler.split_train_test_dataset()

        if experiment_design['unsample']:
            # NOTE(review): unsample() runs after the split here, while the
            # disabled manual pipeline in this notebook calls it before the
            # split. Confirm which order is intended.
            self.data_handler.unsample()

        experiment_results = {}
        metric_labels = [metric.replace('eval_', '').capitalize() for metric in self.metrics]

        for model_name in experiment_design['model_list']:

            print('----------------------------------------')
            print('Training:', model_name)

            language_model_manager = LanguageModelHandler(
                model_name=model_name,
                dataset_type=self.dataset_type,
                text_column=self.data_handler.get_text_column_name(),
                label_column=self.data_handler.label_column)

            language_model_manager.prepare_training_testing_datasets(train_data, test_data)

            language_model_manager.create_model()

            # The trainer returned alongside the results is not used here.
            results, _trainer = language_model_manager.train_evaluate_model(
                training_args=experiment_design['training_args'],
                early_stopping_patience=experiment_design['early_stopping_patience'],
                iterations=experiment_design['iterations'])

            # One row per metric. A metric missing from ``results`` becomes NaN
            # so the Value column always matches the other columns in length.
            df_results = pd.DataFrame({
                'Dataset': [self.dataset_type] * len(self.metrics),
                'Model': [model_name] * len(self.metrics),
                'Metric': metric_labels,
                'Value': [np.mean(results[k]) if k in results else np.nan
                          for k in self.metrics],
            })

            experiment_results[model_name] = {'results': df_results, 'model': language_model_manager}

        return experiment_results

# Text-cleaning switches handed to the experiment (forwarded to the data
# handler's preprocess step).
preprocessing_setup = dict(
    lower_case=False,
    remove_emojis=False,
    remove_stop_words=True,
    remove_numbers=False,
    remove_users=True,
    remove_urls=True,
    remove_non_text_characters=False,
)

# No preprocessing
# preprocessing_setup = {key: False for key in preprocessing_setup}

# Training configuration shared by every model in the experiment.
training_args = TrainingArguments(
    # Output locations
    output_dir="./sentiment_transfer_learning_transformer/",
    logging_dir='./sentiment_transfer_learning_transformer/logs',
    # Logging / checkpointing / evaluation cadence
    logging_strategy='epoch',
    logging_steps=100,
    save_strategy='epoch',
    save_steps=100,
    evaluation_strategy='epoch',
    eval_steps=100,
    load_best_model_at_end=True,
    # Optimisation
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    learning_rate=5e-6,
    num_train_epochs=10,
    # seed=42
)

# Experiment description: which checkpoints to fine-tune and how.
experiment_design = dict(
    model_list=[
        # 'bert-base-uncased',
        # 'vinai/bertweet-base',
        'cardiffnlp/twitter-roberta-base-offensive',  # Offensive speech Roberta
        # 'Hate-speech-CNERG/dehatebert-mono-english'  # Hate speech Roberta
    ],
    unsample=True,
    early_stopping_patience=2,
    training_args=training_args,
    iterations=1,
)

data_handler = DataHandler(
    df=hate_speech_df,
    text_column=original_text_column,
    label_column=label_column,
)

experiment_manager = ExperimentManager(data_handler, dataset_type=dataset_type)
results = experiment_manager.start_experiment(
    experiment_design,
    preprocessing_setup,
)

# Show the metrics table of the first model in the list.
results[experiment_design['model_list'][0]]['results']

# Starting the Pipeline

In [None]:
# Preprocessing switches with every cleaning step enabled except number
# removal. In this cell the dict is only referenced by the disabled pipeline
# inside the string literal below.
preprocessing_setup = {
    'lower_case': True,
    'remove_emojis': True,
    'remove_stop_words': True,
    'remove_numbers': False,
    'remove_users': True,
    'remove_urls': True,
    'remove_non_text_characters': True
}

# Manual (non-ExperimentManager) pipeline, disabled by wrapping it in a bare
# string literal: preprocess -> unsample -> split -> build model -> train.
# Nothing inside the string is executed.
'''
data_handler = DataHandler(df=hate_speech_df, text_column=original_text_column, label_column=label_column)

data_handler.preprocess(setup=preprocessing_setup)

data_handler.unsample()

# print(data_handler.get_top_words(100))
# print(data_handler.get_top_words_tfidf(100))

train_data, test_data = data_handler.split_train_test_dataset()

language_model_manager = LanguageModelHandler(model_name='cardiffnlp/twitter-roberta-base-offensive',
                                              dataset_type=dataset_type,
                                              text_column=data_handler.get_text_column_name(),
                                              label_column=data_handler.label_column)

language_model_manager.prepare_training_testing_datasets(train_data, test_data)

language_model_manager.create_model()

# Set up training arguments
training_args = TrainingArguments(
    output_dir="./sentiment_transfer_learning_transformer/",
    logging_dir='./sentiment_transfer_learning_transformer/logs',
    logging_strategy='epoch',
    logging_steps=100,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    learning_rate=5e-6,
    save_strategy='epoch',
    save_steps=100,
    evaluation_strategy='epoch',
    eval_steps=100,
    load_best_model_at_end=True,
    num_train_epochs=10,
    # seed=42
)

results, trainer = language_model_manager.train_evaluate_model(training_args=training_args,
                                                               early_stopping_patience=2,
                                                               iterations=1) # '''

# Disabled: classify an unlabelled yearly dataset and save the predictions.
# year = '2008'
# dataset_path = 'dataset/euros_second/'
# dataset_name_file = dataset_path+year+'.csv'
# result_file_name = dataset_path+language_model_manager.dataset_type
# result_file_name += '/'+year+'_'+language_model_manager.model_name.split('/')[-1]+'.csv'

# language_model_manager.classify_unlabaled_datasets(dataset_name_file=dataset_name_file,
#                                                    result_file_name=result_file_name,
#                                                    batch_size_to_save=100)

# Wrap the trained model and its tokenizer in the explainability pipeline.
exp_model = ExplainableTransformerPipeline(
    model=language_model_manager.trainer.model,
    tokenizer=language_model_manager.tokenizer,
    device=language_model_manager.device,
    pipeline_name='text-classification',
)

# Explain three test rows whose label equals 1 (fixed random_state).
samples = test_data.loc[test_data[label_column] == 1].sample(n=3, random_state=42)

for sample in samples[data_handler.get_text_column_name()]:
    print(sample)
    exp_model.explain(sample)
    exp_model.visualize_word_importance_in_sentence(sample)

# results = exp_model.get_most_impactful_words_for_dataset(
#     dataset=test_data,
#     column_text=data_handler.get_text_column_name(),
#     threshold=0.1,
#     keyword=dataset_type,
#     n=50)
# results

In [None]:
# Notebook display of `results` (assigned earlier from
# experiment_manager.start_experiment unless a later cell rebinds it).
results