# Environment

In [None]:
!pip install transformers



In [None]:
!pip install contractions

# Librerías

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Clean Text
import re, string, unicodedata
import nltk
import contractions
import inflect
from nltk import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
nltk.download('stopwords')

from transformers import BertTokenizer

In [None]:
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tokenizers import BertWordPieceTokenizer
from transformers import TFBertModel, BertConfig
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import OneHotEncoder
configuration = BertConfig() 

In [None]:
# Metrics
from sklearn.metrics import confusion_matrix,ConfusionMatrixDisplay, precision_recall_fscore_support, accuracy_score

# Bajar Datos

In [None]:
# Load the dataset from a CSV file expected in the working directory.
# The commented-out line names an alternative input file kept for reference.
#df = pd.read_csv("tweets_combined.csv")
df = pd.read_csv("text_audio.csv")
# Preview the first rows to sanity-check the loaded columns.
df.head(10)

In [None]:
# Remove the 'Unnamed: 0' column (presumably a leftover index column from the CSV export).
# errors='ignore' keeps this cell re-runnable: a second run no longer raises KeyError
# once the column is already gone.
df = df.drop(columns='Unnamed: 0', errors='ignore')

# Preprocesamiento de datos 

In [None]:
# Shared tokenizer for the 'bert-base-uncased' checkpoint; both preprocess() and
# bert_encode() read this module-level name.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
def replace_contractions(text):
    """Return `text` after passing it through `contractions.fix`."""
    expanded = contractions.fix(text)
    return expanded

def remove_stuff(text):
    """Strip tweet noise from a string and lowercase it.

    URLs, @mentions and digit runs are each replaced by a single space, the
    '#' symbol is deleted (the hashtag word itself is kept), and the result
    is lowercased.
    """
    # Applied in order: (pattern, replacement)
    substitutions = (
        (r'https?:/\/\S+', ' '),  # urls
        (r'@\w+', ' '),           # at mentions
        (r'#', ''),               # hashtag symbol
        (r'[0-9]+', ' '),         # numbers
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.lower()

def remove_non_ascii(words):
    """Return a new list where every word has its non-ASCII characters dropped.

    Each word is NFKD-normalized first, so accented letters decompose and keep
    their ASCII base letter; characters with no ASCII form are removed.
    """
    return [
        unicodedata.normalize('NFKD', token).encode('ascii', 'ignore').decode('utf-8', 'ignore')
        for token in words
    ]

def remove_stopwords(words):
    """Remove English stop words from a list of tokenized words.

    Args:
        words: Iterable of string tokens.

    Returns:
        A new list containing, in order, the tokens that are not in NLTK's
        English stop-word list.
    """
    # Build the lookup once per call: the stop-word list used to be re-read
    # for every single token, and set membership is O(1).
    english_stopwords = set(stopwords.words('english'))
    return [word for word in words if word not in english_stopwords]

def normalize(words):
    """Drop non-ASCII characters from each token, then filter out stop words."""
    return remove_stopwords(remove_non_ascii(words))

def preprocess(sample):
    """Clean one raw text sample and return the normalized sentence.

    Steps: strip URLs/mentions/'#'/digits and lowercase, expand contractions,
    tokenize with the shared BERT tokenizer, drop non-ASCII characters and
    stop words, then join the remaining tokens back into a single string.

    Args:
        sample: Raw text string.

    Returns:
        The cleaned sentence as a string.
    """
    sample = remove_stuff(sample)
    sample = replace_contractions(sample)
    # Tokenize
    words = tokenizer.tokenize(sample)
    words = normalize(words)
    # Return the normalized sentence: the function used to return `sample`,
    # silently discarding the tokenization and stop-word removal above.
    # convert_tokens_to_string merges '##' word pieces back into whole words so
    # the text is not re-split on the '#' characters when it is encoded later.
    return tokenizer.convert_tokens_to_string(words)

In [None]:
# Store the cleaned version of every 'text' entry in a new 'tweet' column
df['tweet'] = df['text'].apply(preprocess)

In [None]:
# Shuffle the rows with a fixed seed: the train/test split further down is purely
# positional, so an unseeded shuffle produced a different split on every run.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
df.head(10)

In [None]:
# Longest preprocessed text, measured in space-separated words.
# NOTE(review): this is a whitespace word count, not a tokenizer token count, so
# the encoded length may differ — confirm before using it as a sequence length.
Max_lenght = df['tweet'].apply(lambda x: len(x.split(' '))).max()
print(Max_lenght)

In [None]:
# Override the measured value with a fixed sequence length: bert_encode() receives it
# as its maximum length and create_model() uses it as the input shape.
Max_lenght = 80

# BERT

## Tokenización de los datos

### **Funcion para tokenizar:**
Tokenizamos el texto y extraemos los ids y las máscaras de atención

In [None]:
def bert_encode(data, maximum_length):
    """Tokenize texts with the shared BERT tokenizer.

    Every text is encoded with special tokens, then truncated or padded so
    that all sequences have exactly `maximum_length` ids.

    Args:
        data: Iterable of text strings.
        maximum_length: Length every encoded sequence is padded/truncated to.

    Returns:
        Tuple `(input_ids, attention_masks)` of two numpy arrays, one row per
        input text.
    """
    input_ids = []
    attention_masks = []

    for text in data:
        encoded = tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=maximum_length,
            # `pad_to_max_length=True` is deprecated; this is its documented replacement
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
        )
        input_ids.append(encoded['input_ids'])
        attention_masks.append(encoded['attention_mask'])

    # Return two arrays: the token ids and the matching attention masks
    return np.array(input_ids), np.array(attention_masks)

### **Dividimos en train y test:**
Dividimos el dataset entre los que vamos a utilizar para entrenar el modelo y los que utilizaremos para hacer el test

In [None]:
# Rows carrying any of these labels are removed; 'fru' is not in the list, so it is kept.
excluded_labels = ['xxx', 'exc', 'oth', 'dis', 'fea', 'sur', 'hap']
df = df[~df['label'].isin(excluded_labels)]

# Remaining labels are the classification target; print how many rows each class has
target = df['label']
print(target.value_counts())

In [None]:
# One-hot encode the labels: wrap each label in a single-element row so the
# encoder receives a 2-D input, fit it, and convert the result to a dense array.
Y = [[y] for y in target]
enc = OneHotEncoder(handle_unknown='ignore')
enc.fit(Y)
Y = enc.transform(Y).toarray()
# `target` is rebound from the label column to the dense one-hot array.
# NOTE(review): re-running this cell on its own feeds the one-hot rows back into
# the encoder — re-run the label-filtering cell first.
target = Y

In [None]:
# Display the label categories learned by the encoder.
enc.categories_
#np.unique(Y[0], return_counts=True)

In [None]:
# Definimos la proporción para la división
# Fraction of the rows that goes to the training set
split = 0.80

# Number of rows in the training set
S = int(split * len(df))

# Training portion: the first S texts and their one-hot targets
texts_train = df['tweet'].iloc[:S]
target_train = target[:S]

# Test portion: everything after the first S rows
texts_test = df['tweet'].iloc[S:]
target_test = target[S:]

### **Tokenizamos los datos:**
Pasamos los datos por la función que usa BERT para tokenizar el train y el test

In [None]:
# Obtenemos los arreglos de los ids y de las mascaras para el train
train_input_ids, train_attention_masks = bert_encode(list(texts_train),Max_lenght)
# Obtenemos los arreglos de los ids y de las mascaras para el test
test_input_ids, test_attention_masks = bert_encode(list(texts_test),Max_lenght)

## Creamos y entrenamos el modelo

#### **Definimos el modelo:**
Definimos la función para crear el modelo en donde se especifican todas las capas que va a llevar

In [None]:
def create_model():
    """Build and compile the BERT + stacked-LSTM classifier.

    Reads the module-level `Max_lenght` (input sequence length) and `enc`
    (fitted label encoder; its category count sets the number of outputs).

    Returns:
        A compiled Keras model taking `[input_ids, attention_masks]` and
        producing one probability per class.
    """
    # Pretrained BERT base (uncased) used as the encoder
    bert_model = TFBertModel.from_pretrained('bert-base-uncased')
    #bert_model.trainable = False

    input_ids = tf.keras.Input(shape=(Max_lenght,),dtype='int32')
    attention_masks = tf.keras.Input(shape=(Max_lenght,),dtype='int32')

    # Run the inputs through BERT and keep the per-token hidden states
    output = bert_model([input_ids,attention_masks]).last_hidden_state
    # First LSTM keeps the full sequence for the next recurrent layer
    output = tf.keras.layers.LSTM(64, dropout=0.1, return_sequences=True)(output)
    # Second LSTM reduces the sequence to a single vector
    output = tf.keras.layers.LSTM(32, dropout=0.1, return_sequences=False)(output)
    # One output unit per class. The targets are one-hot (exactly one class per
    # example), so softmax is used instead of independent sigmoids.
    output = tf.keras.layers.Dense(len(enc.categories_[0]),activation='softmax')(output)

    # Assemble the model
    model = tf.keras.models.Model(inputs = [input_ids,attention_masks],outputs = output)
    # categorical_crossentropy matches softmax + one-hot targets; with
    # binary_crossentropy, Keras resolved 'accuracy' to per-output binary
    # accuracy, which overstates multi-class performance.
    model.compile(Adam(learning_rate=1e-5), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

### **Nos conectamos a un TPU:**
Aquí generamos la conexión con el TPU de colab

In [None]:
# Look up the Colab TPU address. TPU_ADDRESS is set to None when no TPU is attached,
# so the name always exists instead of being left undefined on the failure path.
device_name = os.environ.get('COLAB_TPU_ADDR')
if device_name is not None:
    TPU_ADDRESS = 'grpc://' + device_name
    print('Found TPU at: {}'.format(TPU_ADDRESS))
else:
    TPU_ADDRESS = None
    print('TPU not found')

### **Creamos el modelo en el TPU:**
Esta parte solo funciona en colab y hace que se utilice una tpu en lugar de una cpu para que sea más rápido

In [None]:
# Use the TPU only when the Colab runtime exposes one. The flag was hard-coded to
# True, which made this cell fail on any runtime without a TPU.
use_tpu = 'COLAB_TPU_ADDR' in os.environ
if use_tpu:
    # Create distribution strategy
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver(
        tpu='grpc://' + os.environ['COLAB_TPU_ADDR']
    )
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)

    # Create the model inside the strategy scope so its variables live on the TPU
    with strategy.scope():
        model = create_model()
else:
    # No TPU available: build the model on the default device
    model = create_model()

model.summary()

### **Entrenar modelo**

In [None]:
# Definimos los pesos de los elementos
# Per-class weights keyed by class index.
# NOTE(review): this dict is not passed to model.fit in the visible cells, and it has
# two entries while the number of classes comes from the label encoder — confirm
# whether it is still needed.
class_weight = {0: 1.0, 1: 6.0}

In [None]:
# Entrenamos el modelo y guardamos el mejor
history = model.fit(
    [train_input_ids, train_attention_masks],
    target_train,
    validation_data=([test_input_ids, test_attention_masks],target_test), 
    epochs=40,
    batch_size=20,
    callbacks=[ModelCheckpoint(filepath='best_model.h5', monitor='val_accuracy', save_best_only=True,save_weights_only=True)]
)

Epoch 1/40


INFO:absl:TPU has inputs with dynamic shapes: [<tf.Tensor 'Const:0' shape=() dtype=int32>, <tf.Tensor 'cond_8/Identity:0' shape=(None, 80) dtype=int64>, <tf.Tensor 'cond_8/Identity_1:0' shape=(None, 80) dtype=int64>, <tf.Tensor 'cond_8/Identity_2:0' shape=(None, 4) dtype=float32>, <tf.Tensor 'cond_8/Identity_3:0' shape=(None,) dtype=float64>]














Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40

KeyboardInterrupt: ignored

# Evaluamos el desempeño del modelo

In [None]:
def plot_learning_curves(history, arr):
    """Plot pairs of training-history metrics side by side.

    Args:
        history: Object with a `.history` dict mapping metric name to its
            per-epoch values (e.g. the value returned by `model.fit`).
        arr: Sequence of `[metric, other_metric]` name pairs; one subplot is
            drawn per pair, with both curves on the same axes.
    """
    # One panel per pair instead of a hard-coded 2; squeeze=False keeps `ax`
    # two-dimensional so a single pair works too.
    fig, ax = plt.subplots(1, len(arr), figsize=(20, 5), squeeze=False)
    for axis, pair in zip(ax[0], arr):
        first, second = pair[0], pair[1]
        axis.plot(history.history[first])
        axis.plot(history.history[second])
        axis.legend([first, second],fontsize=18)
        # Meaningful axis labels replace the placeholder 'A ' / 'B' strings
        axis.set_xlabel('Epoch',fontsize=16)
        axis.set_ylabel(first,fontsize=16)
        axis.set_title(first + ' X ' + second,fontsize=16)

In [None]:
# First panel: training vs validation loss; second panel: training vs validation accuracy.
plot_learning_curves(history, [['loss', 'val_loss'],['accuracy', 'val_accuracy']])

Cargamos el mejor modelo

In [None]:
# Rebuild the architecture and load the weights that the ModelCheckpoint callback
# wrote to best_model.h5 during training.
best_model = create_model()
best_model.load_weights('best_model.h5')

Predicciones modelo

In [None]:
# Model outputs for the test split — presumably shaped (n_examples, n_classes),
# since the next cell takes argmax over axis 1.
y_pred = best_model.predict([test_input_ids, test_attention_masks])

Metricas

In [None]:
# Collapse the per-class scores to the index of the highest-scoring class
y_pred = np.argmax(y_pred, axis=1)
# Ground truth comes from the one-hot test targets. `y_true` was previously read
# before ever being assigned, which raises NameError on a fresh kernel.
y_true = np.argmax(target_test, axis=1)

In [None]:
# Histogram of the true class indices in the test split.
plt.hist(y_true)

In [None]:
# Class names from the label encoder, used as tick labels.
# NOTE(review): assumes the argmax indices follow the order of enc.categories_[0] — confirm.
labels = enc.categories_[0]

# Confusion matrix of true vs predicted class indices, printed and then plotted.
cm = confusion_matrix(y_true, y_pred)
print(cm)
disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels= labels)
fig, ax = plt.subplots(figsize=(10,10))
disp.plot(ax=ax)

In [None]:
# Macro-averaged precision / recall / F1 plus overall accuracy on the test split
Metrics = precision_recall_fscore_support(y_true, y_pred, average='macro')
precision, recall, f1 = Metrics[:3]
acc = accuracy_score(y_true, y_pred)

print(f'El accuracy en los datos de prueba es: {acc:0.4f}')
print(f'La precision en los datos de prueba es: {precision:0.4f}')
print(f'El recall en los datos de prueba es: {recall:0.4f}')
print(f'El F1 en los datos de prueba es: {f1:0.4f}')