# Classification of emotions

This is a multi-label classification problem: each tweet in the data set must be 
labelled with the emotions it expresses, chosen from 11 emotions ('anger', 'anticipation', 'disgust', 'fear', 'joy', 'love', 
'optimism', 'pessimism', 'sadness', 'surprise', 'trust').

## Import of libraries

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from transformers import AutoTokenizer, AutoConfig, TFAutoModelForSequenceClassification, BertTokenizer, BertForSequenceClassification
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from transformers import TFAutoModel
import transformers
import keras
import re, string, spacy
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from configLogger import *


from sklearn.model_selection import train_test_split
# Show the full content of every column (no truncation) when displaying DataFrames
pd.set_option('display.max_colwidth', None)


## Read data

In [None]:
# Load the training data from the CSV file in the working directory
df = pd.read_csv('sem_eval_train_es.csv')

In [None]:
df.shape, df.columns

In [None]:
df.head()

## Data Processing and cleaning

In [None]:
class ProcessingCleanData():
    """Clean raw tweets and prepare their emotion label columns.

    The instance works on a private copy of ``df`` so that the caller's
    DataFrame is never modified; otherwise re-creating the object from the
    same DataFrame would find the extra ``text_clean`` column and treat it
    as a label.

    Args:
        df: DataFrame with a ``Tweet`` text column. Unless ``typ_data`` is
            ``'test'``, every column from the third one onwards is treated
            as an emotion label column.
        clean_emoji: When true, emoji in the covered Unicode ranges are
            removed by ``clean_text``.
        typ_data: ``'test'`` marks data without label columns; any other
            value is handled as labelled data.

    Attributes:
        df: The working copy of the data.
        sentiment_columns: Label column names (empty for ``'test'`` data).
        id11label: Mapping position -> label name, filled by
            ``create_dict_class``.
        label11id: Inverse mapping label name -> position.
    """

    # Patterns are compiled once for the class instead of on every call.
    _MENTION_PATTERN = re.compile(r'@[\w_]+')  # @mentions
    # Everything up to the next whitespace, so that URLs containing '-', '?',
    # '=', '%', ... are removed completely instead of leaving fragments behind.
    _URL_PATTERN = re.compile(r'https?://\S+')
    _PUNCTUATION_PATTERN = re.compile('[{}]+'.format(re.escape(string.punctuation)))
    _EMOJI_PATTERN = re.compile("["
            u"\U0001F600-\U0001F64F"  # emoticons
            u"\U0001F300-\U0001F5FF"  # symbols & pictographs
            u"\U0001F680-\U0001F6FF"  # transport & map symbols
            u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                               "]+", flags=re.UNICODE)

    def __init__(self, df:pd.DataFrame, clean_emoji:bool, typ_data:str='train'):
        self.df = df.copy()
        self.clean_emoji = clean_emoji
        self.typ_data = typ_data

        if self.typ_data == 'test':
            # Unlabelled data: keep the attribute defined, but empty.
            self.sentiment_columns = pd.Index([])
        else:
            # Captured before 'text_clean' is added, so only labels are included.
            self.sentiment_columns = self.df.columns[2:]

        self.id11label = {}
        self.label11id = {}

    def clean_text(self, text:str) -> str:
        """Return ``text`` without mentions, URLs, punctuation and (optionally) emoji.

        Removing punctuation also strips the ``#`` of hashtags while keeping
        the hashtag word. Whitespace is left untouched.
        """
        text = self._MENTION_PATTERN.sub('', text)
        text = self._URL_PATTERN.sub('', text)
        text = self._PUNCTUATION_PATTERN.sub('', text)
        if self.clean_emoji:
            text = self._EMOJI_PATTERN.sub('', text)

        return text

    def verify_row_without_clasification(self):
        """Log how many rows have no label set and drop them from ``self.df``.

        A row counts as unlabelled when every column in
        ``self.sentiment_columns`` equals ``False``. Nothing is dropped when
        there are no label columns.
        """
        label_columns = list(self.sentiment_columns)
        if not label_columns:
            return

        # Computed once and reused for both the report and the filter.
        unlabelled = self.df[label_columns].eq(False).all(axis=1)

        logger.warning(f""" En el conjunto de datos hay un total de: {int(unlabelled.sum())} mensajes que no tienen ninguna clasificación""")

        self.df = self.df[~unlabelled].reset_index(drop=True)

        logger.info(f'La longitud del dataframe con mensajes que contienen al menos una clasificación es: {len(self.df)}')

    def replace_true_false(self):
        """Replace boolean values in ``self.df`` by 1 and 0."""
        self.df = self.df.replace({True:1, False:0})

    def create_dict_class(self):
        """Build ``id11label`` (position -> label) and its inverse ``label11id``."""
        self.id11label = dict(enumerate(self.sentiment_columns))
        self.label11id = {label: position for position, label in self.id11label.items()}

    def run_all(self):
        """Run the complete pipeline: clean text, drop unlabelled rows, map labels.

        For data without label columns only the text cleaning step is run.
        """
        logger.info('Limpieza de mensajes')
        self.df['text_clean'] = self.df['Tweet'].map(self.clean_text)

        if len(self.sentiment_columns) == 0:
            # Unlabelled ('test') data: the remaining steps need label columns.
            return

        logger.info('Eliminar mensajes que no tienen ninguna clasificación.')
        self.verify_row_without_clasification()

        logger.info('Creando id11label y label11id')
        self.create_dict_class()

        logger.info('Reemplazando True y False por ceros y unos')
        self.replace_true_false()
        

### Text cleaning

In [None]:
# Cleaner for the training data; emoji removal is enabled
obj_clean = ProcessingCleanData(df=df, clean_emoji=True)

In [None]:
# Clean the text, drop rows without any label, build the label maps and turn booleans into 0/1
obj_clean.run_all()

In [None]:
obj_clean.df

In [None]:
obj_clean.id11label

### Preparing the data set

In [None]:
class ProcessingDataForModel():
    """Turn the cleaned DataFrame into train/test texts and label matrices.

    Args:
        df: DataFrame holding the text column and the label columns.
        text_column: Name of the column with the text fed to the model.
        sentiment_columns: Names of the label columns, in output order.

    Attributes:
        labels: Integer array with one row per DataFrame row and one column
            per entry of ``sentiment_columns`` (filled by ``create_labels``).
        X_train_tweets, X_test_tweets: Text splits.
        Y_train, Y_test: Label splits matching the text splits.
        MAX_SEQUENCE_LENGTH: Largest number of whitespace-separated words in
            a training text (filled by ``max_lenght_document``).
    """

    def __init__(self, df:pd.DataFrame, text_column:str, sentiment_columns:list):
        self.df = df
        self.text_column = text_column
        self.sentiment_columns = sentiment_columns
        self.labels = []
        self.X_train_tweets = np.array([])
        self.X_test_tweets = np.array([])
        self.Y_train = np.array([])
        self.Y_test = np.array([])
        self.MAX_SEQUENCE_LENGTH = np.nan

    def create_labels(self):
        """Build ``self.labels`` from the label columns.

        The matrix is rebuilt from scratch on every call, so calling the
        method (or ``run_all``) more than once is safe.
        """
        label_columns = list(self.sentiment_columns)
        # Vectorised conversion instead of a Python loop over every row.
        self.labels = self.df[label_columns].astype(int).to_numpy()

    def split_data_train_test(self):
        """Split texts and labels 70/30 into train and test with a fixed seed."""
        self.X_train_tweets, self.X_test_tweets, self.Y_train, self.Y_test = train_test_split(
            self.df[self.text_column],
            self.labels,
            test_size = 0.3,
            random_state = 0
        )

        logger.info(f'Shape X_train: {self.X_train_tweets.shape}, Shape Y_train: {self.Y_train.shape}')
        logger.info(f'Shape X_test: {self.X_test_tweets.shape}, Shape Y_test: {self.Y_test.shape}')

    def max_lenght_document(self):
        """Store and log the word count of the longest training text."""
        self.MAX_SEQUENCE_LENGTH = np.max([len(l.split()) for l in self.X_train_tweets])
        logger.info(f'longitud máxima: {self.MAX_SEQUENCE_LENGTH}')

    def run_all(self):
        """Create the labels, split the data and compute the maximum length."""
        logger.info('Creando variable labels')
        self.create_labels()

        logger.info('Dividiendo el conjunto de datos en entrenamiento y prueba')
        self.split_data_train_test()

        logger.info('Creando la variable de longitud máxima del documento')
        self.max_lenght_document()

In [None]:
# Build the label matrix and the train/test split from the cleaned training data
obj_processing_data = ProcessingDataForModel(
                            df                = obj_clean.df, 
                            text_column       = 'text_clean', 
                            sentiment_columns = obj_clean.sentiment_columns
                            )
obj_processing_data.run_all()

## Model creation and training

In [None]:
class CreateModel():
    """Fine-tune a pretrained transformer for multi-label emotion classification.

    Args:
        X_train_tweets: Training texts (must provide ``to_list()``).
        X_test_tweets: Validation texts (must provide ``to_list()``).
        Y_train: Label vectors for the training texts.
        Y_test: Label vectors for the validation texts.
        model_name: Name of the pretrained model and tokenizer to load.
        id11label: Mapping label position -> label name.
        label11id: Mapping label name -> label position.
        function_activation: Activation of the classification layer.
        n_epochs: Maximum number of training epochs.
        batch_size: Batch size used for training, validation and prediction.
    """

    # Used only when ``id11label`` is empty.
    DEFAULT_NUM_LABELS = 11

    def __init__(self, X_train_tweets: pd.Series, X_test_tweets: pd.Series, Y_train: pd.Series, Y_test: pd.Series, model_name: str, id11label: dict, label11id: dict, function_activation: str, n_epochs: int, batch_size: int):
        self.X_train_tweets = X_train_tweets
        self.X_test_tweets = X_test_tweets
        self.Y_train = Y_train
        self.Y_test = Y_test
        self.model_name = model_name
        self.id11label = id11label
        self.label11id = label11id
        self.function_activation = function_activation
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.MAX_SEQUENCE_LENGTH = 0
        self.train_encodings = transformers.tokenization_utils_base.BatchEncoding()
        self.test_encodings = transformers.tokenization_utils_base.BatchEncoding()
        self.train_dataset = None
        self.test_dataset = None
        self.model = None
        self.history = None
        self.tokenizer = None  # loaded on first use, then reused

    def _num_labels(self) -> int:
        """Return the number of output labels, derived from ``id11label``."""
        return len(self.id11label) or self.DEFAULT_NUM_LABELS

    def _get_tokenizer(self):
        """Load the tokenizer once and reuse it for every later call."""
        if getattr(self, 'tokenizer', None) is None:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        return self.tokenizer

    @staticmethod
    def _extract_logits(predictions):
        """Return the score matrix contained in a model output as an array.

        Accepts a dict-like output with a ``'logits'`` entry, an object with
        a ``logits`` attribute, or the score matrix itself.
        """
        if isinstance(predictions, dict):
            predictions = predictions['logits']
        elif hasattr(predictions, 'logits'):
            predictions = predictions.logits
        return np.asarray(predictions)

    def tokenize_encode_data(self):
        """Tokenize both splits, padding the test split to the training length."""
        tokenizer = self._get_tokenizer()
        self.train_encodings = tokenizer(self.X_train_tweets.to_list(), truncation=True, padding=True, return_tensors="tf")

        # The padded training length becomes the fixed length for all later inputs.
        self.MAX_SEQUENCE_LENGTH = self.train_encodings['input_ids'].shape[1]
        self.test_encodings = tokenizer(self.X_test_tweets.to_list(), truncation=True, padding='max_length', max_length=self.MAX_SEQUENCE_LENGTH, return_tensors="tf")

    def create_train_test_dataset(self):
        """Wrap the encodings and labels of both splits in ``tf.data`` datasets."""
        self.train_dataset = tf.data.Dataset.from_tensor_slices((
            dict(self.train_encodings),
            self.Y_train
        ))

        self.test_dataset = tf.data.Dataset.from_tensor_slices((
            dict(self.test_encodings),
            self.Y_test
        ))

    def create_model(self):
        """Load the pretrained sequence-classification model into ``self.model``."""
        n_labels = self._num_labels()
        config = AutoConfig.from_pretrained(self.model_name, hidden_dropout_prob=0.1, num_labels=n_labels, id2label=self.id11label, label2id=self.label11id)

        transformer_model = TFAutoModelForSequenceClassification.from_pretrained(self.model_name, config=config)
        transformer_model.config.num_labels = n_labels
        self.model = transformer_model

    def train_model(self):
        """Replace the classification layer, compile and fit the model.

        The best model according to validation accuracy is saved to
        ``best_model_sentiment.tf``; training stops after 5 epochs without
        improvement. Each call installs a new classification layer.
        """
        # Save only the best model seen so far
        checkpoint = ModelCheckpoint('best_model_sentiment.tf', monitor='val_accuracy', mode='max', verbose=1, save_best_only=True)
        # Stop training after 5 epochs without improvement
        early_stopping = EarlyStopping(monitor='val_accuracy', patience=5, mode='max')

        optimizer = tf.keras.optimizers.Adam(learning_rate=2e-5, epsilon=1e-08)
        self.model.classifier = tf.keras.layers.Dense(self._num_labels(), activation=self.function_activation)
        # Without an activation the layer outputs raw logits and the loss must know it.
        outputs_are_logits = self.function_activation in (None, 'linear')
        loss = tf.keras.losses.BinaryCrossentropy(from_logits=outputs_are_logits)
        metric = tf.keras.metrics.BinaryAccuracy('accuracy')
        self.model.compile(loss=loss, optimizer=optimizer, metrics=[metric])
        # The datasets are already batched, so `batch_size` must not be passed to fit().
        self.history = self.model.fit(
            self.train_dataset.batch(self.batch_size),
            epochs=self.n_epochs,
            validation_data=self.test_dataset.batch(self.batch_size),
            callbacks=[checkpoint, early_stopping],
        )

    def predict(self, new_messages: list[str]):
        """Return the highest-scoring label name for each message.

        Args:
            new_messages: Texts to classify.

        Returns:
            A list with one label name per message (empty for no messages).
        """
        if not new_messages:
            return []

        tokenizer = self._get_tokenizer()
        new_encodings = tokenizer(new_messages, truncation=True, padding='max_length', max_length=self.MAX_SEQUENCE_LENGTH, return_tensors="tf")

        new_dataset = tf.data.Dataset.from_tensor_slices((
            dict(new_encodings)
        ))

        predictions = self.model.predict(new_dataset.batch(self.batch_size))

        # The model output is dict-like; iterating over it directly would yield
        # its keys, not the per-message scores.
        logits = self._extract_logits(predictions)

        return [self.id11label[int(index)] for index in np.argmax(logits, axis=1)]

    def run_all(self, train_model=True):
        """Tokenize, build the datasets, create the model and optionally train it."""
        self.tokenize_encode_data()
        self.create_train_test_dataset()
        self.create_model()

        if train_model:
            self.train_model()


### Run model bert-base-multilingual-uncased

In [None]:
# Multi-label task: every emotion is scored independently, so the output layer
# uses a sigmoid. A softmax would force the 11 scores to sum to 1, which does
# not match the binary cross-entropy loss used for training.
obj_model = CreateModel(
            X_train_tweets      = obj_processing_data.X_train_tweets,
            X_test_tweets       = obj_processing_data.X_test_tweets,
            Y_train             = obj_processing_data.Y_train,
            Y_test              = obj_processing_data.Y_test,
            model_name          = 'bert-base-multilingual-uncased',
            id11label           = obj_clean.id11label,
            label11id           = obj_clean.label11id,
            function_activation = 'sigmoid',
            n_epochs            = 30,
            batch_size          = 25
            )

In [None]:
# Tokenize the data, build the datasets, load the pretrained model and train it
obj_model.run_all(train_model=True)

### Predictions

In [None]:
# Reload the best model saved by the ModelCheckpoint callback during training
model = load_model('best_model_sentiment.tf')

In [None]:
# Score the validation split with the reloaded model, using the training batch size
predict = model.predict(obj_model.test_dataset.batch(obj_model.batch_size))


In [None]:
# Position of the highest score per prediction and per true label vector
max_indices = np.argmax(predict['logits'], axis=1)
true_result = np.argmax(obj_model.Y_test, axis=1)

# Translate the positions into emotion names
emotions = list(map(obj_clean.id11label.__getitem__, max_indices))
emotions_true = list(map(obj_clean.id11label.__getitem__, true_result))

# Log every predicted emotion next to the expected one
for position, (emotion, emotion_true) in enumerate(zip(emotions, emotions_true), start=1):
    logger.info(f"Predicción {position}: {emotion}. Valor real: {emotion_true}")

# Number of tweets whose top predicted emotion differs from the top true emotion
count = sum(predicted != expected for predicted, expected in zip(emotions, emotions_true))
logger.info(f"El modelo predice como principal {count} resultados diferentes de {len(predict['logits'])}. No obstante no es representativo ya que puede haber más de un sentimiento por Tweet")

## Model evaluation

In [None]:
# Load the test data from the CSV file in the working directory and show its dimensions
df_test = pd.read_csv('sem_eval_test_grupo_01.csv')
df_test.shape

In [None]:
# Cleaner for the test data; typ_data='test' skips the detection of label columns
obj_clean_test = ProcessingCleanData(df=df_test, clean_emoji=True, typ_data='test')

In [None]:
# Apply only the text cleaning step to the test tweets, then display the result
obj_clean_test.df['text_clean'] = obj_clean_test.df.Tweet.map(lambda x: obj_clean_test.clean_text(x))
obj_clean_test.df

In [None]:
def predict_(df, new_messages: list[str], model):
    """Return a copy of ``df`` with one boolean column per emotion.

    For every message the emotion with the highest score is set to ``True``
    and all other emotions to ``False``. A column is created for every
    emotion in ``obj_clean.id11label``, including emotions that are never
    predicted. The input DataFrame and its other columns are left untouched.

    Args:
        df: DataFrame with one row per entry of ``new_messages``, in the
            same order.
        new_messages: Texts to classify.
        model: Model whose ``predict`` returns a mapping with a ``'logits'``
            entry.

    Returns:
        A new DataFrame with the emotion columns added.

    Raises:
        ValueError: If ``df`` and ``new_messages`` differ in length.
    """
    if len(new_messages) != len(df):
        raise ValueError(
            f'df has {len(df)} rows but {len(new_messages)} messages were given'
        )

    # Label names ordered by their id, i.e. by model output position.
    labels = [obj_clean.id11label[key] for key in sorted(obj_clean.id11label)]
    result = df.copy()
    one_hot = np.zeros((len(result), len(labels)), dtype=bool)

    if new_messages:
        # Tokenize and encode the new messages
        tokenizer = AutoTokenizer.from_pretrained(obj_model.model_name)
        new_encodings = tokenizer(new_messages, truncation=True, padding='max_length', max_length=obj_model.MAX_SEQUENCE_LENGTH, return_tensors="tf")

        # Convert the encoded messages into a Dataset
        new_dataset = tf.data.Dataset.from_tensor_slices((
            dict(new_encodings)
        ))

        # Make predictions
        predictions = model.predict(new_dataset.batch(obj_model.batch_size))

        # Mark the highest-scoring emotion of every message
        max_indices = np.argmax(predictions['logits'], axis=1)
        one_hot[np.arange(len(result)), max_indices] = True

    # Assign by position so that the result does not depend on the index of df.
    for position, label in enumerate(labels):
        result[label] = one_hot[:, position]

    return result

In [None]:
# model = load_model('best_model_sentiment.tf')

In [None]:
# Add the predicted emotion columns to the cleaned test data using the reloaded model
obj_clean_test.df = predict_(obj_clean_test.df, new_messages=obj_clean_test.df.text_clean.to_list(), model=model)

In [None]:
obj_clean_test.df.info()

In [None]:
obj_clean_test.df