In [None]:
!pip install -q transformers

In [None]:
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import tensorflow as tf
from transformers import BertTokenizer
from nltk.corpus import stopwords
from contextlib import redirect_stdout
import keras
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from transformers import TFBertModel

In [None]:
# Read the raw tweets and attach a 1-based "id" column derived from the row index.
dataset = pd.read_csv(
    "/kaggle/input/tweet-analise-sentimento/analise_sentimento_dataset.csv"
).assign(id=lambda frame: frame.index + 1)

In [None]:
# Shuffle all rows once. A fixed seed keeps the train/test split below
# identical across "Restart & Run All" executions.
dataset = dataset.sample(frac=1, random_state=42)

In [None]:
# Number of (already shuffled) rows used for training/validation; the
# remaining rows are held out for the final evaluation.
TRAIN_ROWS = 49893

# Explicit positional slicing plus .copy(): later cells modify these frames,
# and writing into a view of `dataset` triggers SettingWithCopyWarning.
df = dataset.iloc[:TRAIN_ROWS].copy()
test = dataset.iloc[TRAIN_ROWS:].copy()

In [None]:
def _remove_chars(texts, chars=(':', ')', '(')):
    """Return ``texts`` (a string Series) with every character in ``chars`` removed.

    ``regex=False`` makes the replacement literal on every pandas version;
    '(' and ')' are not valid regular expressions on their own.
    """
    for char in chars:
        texts = texts.str.replace(char, '', regex=False)
    return texts


# Presumably this strips ':)' / ':(' style emoticons - confirm against the dataset.
# The same cleaning is applied to the held-out rows so that the text seen at
# evaluation time is preprocessed exactly like the training text.
# .assign builds new frames instead of writing into a slice of `dataset`.
df = df.assign(tweet_text=_remove_chars(df['tweet_text']))
test = test.assign(tweet_text=_remove_chars(test['tweet_text']))

In [None]:
# WordPiece tokenizer matching the 'bert-base-cased' checkpoint loaded below.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

In [None]:
# BERT base (cased) encoder loaded with its pretrained weights; it is used
# as the backbone of the classifier built in the training cell.
model = TFBertModel.from_pretrained('bert-base-cased')

In [None]:
# Pre-allocated buffers, one row per training tweet and 256 token slots each.
# Token ids and attention masks are integers, so store them as int32 (the
# dtype of the Keras Input layers) instead of numpy's float64 default.
X_input_ids = np.zeros((len(df), 256), dtype='int32')
X_attn_masks = np.zeros((len(df), 256), dtype='int32')

In [None]:
def generate_training_data(df, ids, masks, tokenizer):
    """Tokenize every tweet in ``df`` and write the encodings into ``ids`` / ``masks``.

    Args:
        df: DataFrame with a ``tweet_text`` column; the i-th tweet is written
            to row ``i`` of the output arrays.
        ids: Pre-allocated 2-D array with one row per tweet. Its second
            dimension is used as the tokenizer ``max_length``. Filled in place.
        masks: Pre-allocated array with the same shape as ``ids``. Filled in place.
        tokenizer: Object exposing ``encode_plus`` (e.g. a ``BertTokenizer``).

    Returns:
        The same ``ids`` and ``masks`` arrays, now populated.
    """
    # Derive the sequence length from the buffer so the two can never disagree.
    max_length = ids.shape[1]
    # Wrap the column itself (not the enumerate object) so tqdm knows the
    # total and can render a real progress bar with an ETA.
    for i, text in enumerate(tqdm(df['tweet_text'], total=len(df))):
        tokenized_text = tokenizer.encode_plus(
            text,
            max_length=max_length,
            truncation=True,
            padding='max_length',
            add_special_tokens=True,
            # The values are copied straight into numpy buffers, so ask for
            # numpy arrays instead of creating a TensorFlow tensor per tweet.
            return_tensors='np'
        )
        ids[i, :] = tokenized_text.input_ids
        masks[i, :] = tokenized_text.attention_mask
    return ids, masks

def prepare_data(input_text, tokenizer, max_length=256):
    """Tokenize a single text into the input dict expected by the classifier.

    Args:
        input_text: Raw text to encode.
        tokenizer: Object exposing ``encode_plus`` (e.g. a ``BertTokenizer``).
        max_length: Length every sequence is padded / truncated to. Defaults
            to 256, the length used for the training data.

    Returns:
        Dict with ``'input_ids'`` and ``'attention_mask'`` tensors, keyed by
        the names of the model's two Input layers.
    """
    token = tokenizer.encode_plus(
        input_text,
        max_length=max_length,
        truncation=True,
        padding='max_length',
        add_special_tokens=True,
        return_tensors='tf'
    )
    # Token ids and masks are integers and the model's Input layers are
    # declared as int32, so keep them integral instead of casting to float64.
    return {
        'input_ids': tf.cast(token.input_ids, tf.int32),
        'attention_mask': tf.cast(token.attention_mask, tf.int32)
    }

In [None]:
# Tokenize all training tweets; the pre-allocated buffers are filled in place
# and returned by generate_training_data.
X_input_ids, X_attn_masks = generate_training_data(df, X_input_ids, X_attn_masks, tokenizer)

In [None]:
# Target matrix: one row per training tweet, one column per class (3 classes).
labels = np.zeros((len(df), 3))
labels.shape

In [None]:
# One-hot encode the targets: for each row, set the column given by the
# tweet's integer class in df['labels'] to 1.
row_positions = np.arange(len(df))
class_columns = df['labels'].values.tolist()
labels[row_positions, class_columns] = 1

In [None]:
# Sanity check: list the distinct rows of the one-hot label matrix.
np.unique(labels, axis=0)

In [None]:
# Preview the first rows of the held-out frame.
test.head()

In [None]:
# Build a tf.data pipeline yielding (input_ids, attention_mask, label) triples.
# NOTE: this rebinds `dataset`, which held the pandas DataFrame until now.
dataset = tf.data.Dataset.from_tensor_slices((X_input_ids, X_attn_masks, labels))
dataset.take(1) # one sample (take(0) would yield an empty dataset)

In [None]:
def SentimentDatasetMapFunction(input_ids, attn_masks, labels):
    """Repack one (ids, mask, label) triple as a ``(features, target)`` pair.

    The features are a dict keyed by ``'input_ids'`` and ``'attention_mask'``;
    the target is ``labels`` passed through unchanged.
    """
    features = dict(input_ids=input_ids, attention_mask=attn_masks)
    return features, labels

In [None]:
# Convert each triple into the ({'input_ids', 'attention_mask'}, labels)
# structure produced by SentimentDatasetMapFunction.
dataset = dataset.map(SentimentDatasetMapFunction)

In [None]:
# Shuffle with a 10,000-element buffer, then form batches of 16 and drop the
# final incomplete batch.
# reshuffle_each_iteration=False is required here: the train/validation split
# below uses take()/skip() on this dataset, and with the default
# (reshuffle on every iteration) the two subsets would contain different
# examples each epoch, leaking validation batches into training.
dataset = dataset.shuffle(10000, reshuffle_each_iteration=False).batch(16, drop_remainder=True)

In [None]:
# Fraction of the batches used for training; the rest is used for validation.
p = 0.8
# len(df)//16 is the number of full batches of 16; df_size is the number of
# those batches assigned to training (counted in batches, not in rows).
df_size = int((len(df)//16)*p)

In [None]:
# First df_size batches -> training set; every remaining batch -> validation set.
# NOTE(review): take/skip only give disjoint subsets if `dataset` yields its
# batches in the same order on every iteration - verify that the upstream
# shuffle does not reshuffle per iteration.
df_dataset = dataset.take(df_size)
val_dataset = dataset.skip(df_size)

In [None]:
# Hyper-parameter grid: one dict per run, giving the width of the intermediate
# dense layer and the Adam learning rate. The commented-out lists are
# alternative configurations; only the last assignment is active.
# params = [{'dense': 32, 'learning_rate': 1e-5*16}, {'dense': 64, 'learning_rate': 1e-5*8}]
# params = [{'dense': 128, 'learning_rate': 1e-5*4},{'dense': 256, 'learning_rate': 1e-5*2}]
# params = [{'dense': 512, 'learning_rate': 1e-5}, {'dense': 1024, 'learning_rate': 1e-5/2}]
params = [{'dense': 256, 'learning_rate': 1e-5*2}]

In [None]:
# Build, train and save one classifier per entry in `params`.
# NOTE(review): `model.bert` is the same layer object on every iteration, so
# with more than one entry in `params` later runs start from BERT weights
# already fine-tuned by the earlier ones.
for param in params:
    print('Inicio do processo com o modelo de ' + str(param['dense']) + ' camadas')

    # Two input layers: token ids and attention mask, 256 positions each.
    input_ids = tf.keras.Input(shape=(256,), name='input_ids', dtype='int32')
    attn_masks = tf.keras.Input(shape=(256,), name='attention_mask', dtype='int32')

    # Element 1 of the BERT output is fed to the classification head
    # (presumably the pooled sentence representation - confirm against the
    # transformers documentation).
    bert_embds = model.bert(input_ids, attention_mask=attn_masks)[1]
    intermediate_layer = tf.keras.layers.Dense(param['dense'], activation='relu', name='intermediate_layer')(bert_embds)
    output_layer = tf.keras.layers.Dense(3, activation='softmax', name='output_layer')(intermediate_layer)

    rr_model = tf.keras.Model(inputs=[input_ids, attn_masks], outputs=output_layer)
    rr_model.summary()

    # Also persist the textual summary next to the saved model.
    with open('/kaggle/working/' + str(param['dense']) + '_modelsummary.csv', 'w') as f:
        with redirect_stdout(f):
            rr_model.summary()

    print('Summary exportado')

    optim = tf.keras.optimizers.legacy.Adam(learning_rate=param['learning_rate'], decay=1e-6)
    loss_func = tf.keras.losses.CategoricalCrossentropy()
    acc = tf.keras.metrics.CategoricalAccuracy('accuracy')

    rr_model.compile(optimizer=optim, loss=loss_func, metrics=[acc])

    print('Treinamento iniciado')
    # This call was over-indented in the original cell, which made the whole
    # cell fail with an IndentationError before anything ran.
    hist = rr_model.fit(
        df_dataset,
        validation_data=val_dataset,
        epochs=2
    )

    rr_model.save('/kaggle/working/' + str(param['dense']) + '_lia_model.h5')

In [None]:
# Classify every held-out tweet with the trained classifier `rr_model`.
# The original loop called `model`, the bare TFBertModel encoder, whose
# output is not the 3-class probability vector produced by the classifier.
out = []
for tweet in tqdm(test['tweet_text'], desc='predicting'):
    # verbose=0 stops Keras from printing one progress line per tweet.
    pred = rr_model.predict(prepare_data(tweet, tokenizer), verbose=0)
    # pred holds one row of class probabilities; keep the most likely class
    # as a plain Python int.
    out.append(int(np.argmax(pred, axis=-1)[0]))

In [None]:
# Compare the predicted classes with the ground-truth labels of the held-out rows.
y_true = test["labels"].tolist()
y_pred = out
metrics = classification_report(y_true, y_pred)
# classification_report returns a multi-line string; print it so the table is
# actually shown (it was previously computed but never displayed).
print(metrics)

In [None]:
# Fix the class order explicitly so the matrix is always 3x3 and lines up
# with the three display labels, even if a class is missing from
# y_true / y_pred.
# NOTE(review): assumes 0 = Negativo, 1 = Positivo, 2 = Neutro - confirm
# against the dataset description.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1, 2])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Negativo', 'Positivo','Neutro'])
# Trailing ';' suppresses the repr of the returned display object.
disp.plot(cmap=plt.cm.Blues);