# Projeto Final de NLP - Sentiment Classification de tweets

Neste trabalho final da disciplina de Processamento de Linguagem Natural, foi proposta aos alunos a elaboração de um projeto de tema livre, com o objetivo de aplicar os modelos e técnicas aprendidos em algum problema real. O tópico escolhido pelo autor foi uma aplicação de sentiment classification voltada para comentários em redes sociais. Para isto, foi escolhido o dataset tweet_eval, disponível no Hugging Face, que possui quase 60 mil frases separadas em tweets positivos, negativos e neutros.

## Instalando Dependências

A biblioteca datasets é fornecida pelo Hugging Face como uma interface para facilitar o acesso dos usuários aos seus datasets.

In [None]:
!pip install datasets

## Manipulação do Dataset

### Classe tweetDataset

Esta é a classe responsável pela manipulação dos dados. Ela realiza a tokenização dos dados e separa as entradas e labels em 3 grupos para serem usados na aplicação, sendo eles: Train, Validation e Test.

In [None]:
#Class Dataset

from datasets import load_dataset
import tensorflow as tf
import nltk
import random
import copy
from nltk.tokenize import TweetTokenizer
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Download the required NLTK resources (this can be skipped if they were already downloaded)
nltk.download('punkt')

class tweetDataset:
    """Load a ``tweet_eval`` configuration and turn it into padded id sequences.

    Typical use: build the object, call :meth:`tokenizer` once to fit the
    vocabulary and create ``self.sequence`` / ``self.label`` /
    ``self.vocab_size``, then call :meth:`split_dataset` to obtain the
    train / validation / test partitions.
    """

    def __init__(self, datasetName):
        """Download the dataset and merge its three original splits.

        Args:
            datasetName: configuration name forwarded to
                ``load_dataset('tweet_eval', ...)``.
        """
        self.dataset = load_dataset('tweet_eval', datasetName)
        splits = ('train', 'validation', 'test')
        # Copy each column into a plain list before concatenating the splits.
        self.text_list = [
            text for split in splits for text in list(self.dataset[split]['text'])
        ]
        self.label_list = [
            label for split in splits for label in list(self.dataset[split]['label'])
        ]
        # Kept under its own attribute name: storing it as ``self.tokenizer``
        # would overwrite the ``tokenizer`` method below and make a second
        # call to that method fail.
        self._tweet_tokenizer = TweetTokenizer()

    def tokenizer(self, maxlen):
        """Tokenize every tweet and build the padded integer sequences.

        Sets ``self.new_tokenizer`` (fitted Keras ``Tokenizer``),
        ``self.sequence`` (sequences padded/truncated to ``maxlen`` with
        ``padding='post'``), ``self.label`` and ``self.vocab_size``.

        Args:
            maxlen: length every sequence is padded or truncated to.
        """
        tokenized_tweets = [
            self._tweet_tokenizer.tokenize(tweet) for tweet in self.text_list
        ]
        self.new_tokenizer = Tokenizer()
        self.new_tokenizer.fit_on_texts(tokenized_tweets)
        sequence = self.new_tokenizer.texts_to_sequences(tokenized_tweets)
        self.sequence = pad_sequences(sequence, maxlen=maxlen, padding='post')
        self.label = self.label_list
        # +1 leaves room for id 0, which create_mask treats as padding.
        self.vocab_size = len(self.new_tokenizer.word_index) + 1

    def create_mask(self, sequence):
        """Return a copy of ``sequence`` with up to two tokens per row masked.

        Two positions are drawn (with replacement) among the first ``k``
        slots of each row, where ``k`` is the number of non-zero ids in that
        row, and overwritten with ``self.vocab_size`` as the mask id. Rows
        that contain only zeros are returned unchanged instead of raising.

        NOTE(review): this assumes the non-zero ids sit at the start of the
        row (post-padding, as produced by ``tokenizer``) -- confirm for other
        inputs.

        Args:
            sequence: iterable of mutable id rows; it is deep-copied, never
                modified in place.

        Returns:
            list with one masked row per input row.
        """
        mask_seq = []
        for seq in copy.deepcopy(sequence):
            real_nums = sum(1 for token_id in seq if token_id != 0)
            if real_nums > 0:
                for _ in range(2):
                    seq[random.randint(0, real_nums - 1)] = self.vocab_size
            mask_seq.append(seq)
        return mask_seq

    def tokenize_phrase(self, text: str, maxlen):
        """Convert one raw phrase into a padded id sequence.

        Requires :meth:`tokenizer` to have been called first, because it
        reuses the vocabulary stored in ``self.new_tokenizer``.

        Args:
            text: phrase to encode.
            maxlen: length the sequence is padded or truncated to.

        Returns:
            The ``pad_sequences`` result for a batch containing only ``text``.
        """
        tokens = [self._tweet_tokenizer.tokenize(text)]
        sequence = self.new_tokenizer.texts_to_sequences(tokens)
        return pad_sequences(sequence, maxlen=maxlen, padding='post')

    def split_dataset(self, train_perc=0.8, val_perc=0.1, test_perc=0.1):
        """Split the encoded samples, in order, into train/validation/test.

        The cut points are fractions of the number of samples (not of the
        vocabulary size). When the three fractions add up to 1 the test
        partition receives every remaining sample, so rounding never drops
        data; when they add up to less than 1 the test partition is capped
        at ``test_perc`` of the samples.

        Args:
            train_perc: fraction of samples placed in the training set.
            val_perc: fraction of samples placed in the validation set.
            test_perc: fraction of samples placed in the test set.

        Returns:
            Tuple ``(train, train_label, validation, val_label, test,
            test_label, vocab_size)`` where the first six items are lists.
        """
        size = len(self.sequence)
        train_limit = min(size, int(size * train_perc))
        val_limit = min(size, train_limit + int(size * val_perc))
        # Small tolerance so that e.g. 0.8 + 0.1 + 0.1 counts as "everything".
        if train_perc + val_perc + test_perc >= 1.0 - 1e-9:
            test_limit = size
        else:
            test_limit = min(size, val_limit + int(size * test_perc))

        train = list(self.sequence[:train_limit])
        train_label = list(self.label[:train_limit])
        validation = list(self.sequence[train_limit:val_limit])
        val_label = list(self.label[train_limit:val_limit])
        test = list(self.sequence[val_limit:test_limit])
        test_label = list(self.label[val_limit:test_limit])

        return train, train_label, validation, val_label, test, test_label, self.vocab_size

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


### Captura e separação dos dados

In [None]:
# Load the 'sentiment' configuration, encode every tweet as a sequence of
# 87 ids and split the encoded data into train / validation / test.
SEQUENCE_LENGTH = 87
tweet = tweetDataset('sentiment')
tweet.tokenizer(SEQUENCE_LENGTH)
(train_tweet, train_label,
 val_tweet, val_label,
 test_tweet, test_label,
 vocab_size) = tweet.split_dataset()


## Modelo para Classificação

O modelo implementado para este problema foi uma rede composta por um Transformer Encoder que tem a sua saída ligada em uma MLP que será responsável pela classificação nas classes previamente explicadas.

### Classe Transformer

Esta classe foi implementada seguindo algumas especificações disponíveis no site do Keras. Regularizações, normalizações e dropout foram adicionados à medida que se mostraram necessários.

In [None]:
from tensorflow import keras

class TransformerBlock(keras.layers.Layer):
    """Self-attention block followed by a small feed-forward network.

    Both sub-layers are followed by dropout, a residual addition and layer
    normalization. The constructor arguments are stored and exposed through
    ``get_config`` so that a model containing this layer can be saved and
    re-created with ``load_model(..., custom_objects=...)``.

    Args:
        embed_dim: size of the token vectors; also used as the attention
            ``key_dim`` and as the output width of the feed-forward network.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward network.
        layer_rate: ``epsilon`` given to both ``LayerNormalization`` layers.
        l2_reg: L2 factor applied to the kernels of both Dense layers.
        rate: dropout rate used inside and after the sub-layers.
        **kwargs: standard ``keras.layers.Layer`` arguments (``name``,
            ``dtype``, ...), needed when the layer is rebuilt from a config.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, layer_rate, l2_reg, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep the hyper-parameters so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.layer_rate = layer_rate
        self.l2_reg = l2_reg
        self.rate = rate

        self.att = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [keras.layers.Dense(ff_dim, activation="relu", kernel_regularizer=keras.regularizers.L2(l2_reg)),
             keras.layers.BatchNormalization(),
             keras.layers.Dropout(rate),
             keras.layers.Dense(embed_dim, kernel_regularizer=keras.regularizers.L2(l2_reg)),
        ])
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=layer_rate)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=layer_rate)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Apply attention and the feed-forward network to ``inputs``.

        Args:
            inputs: tensor of token vectors; assumed to be
                (batch, sequence, embed_dim) -- TODO confirm against callers.
            training: Keras training flag, forwarded to every sub-layer whose
                behaviour depends on it (dropout, batch normalization).

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        # NOTE(review): use_causal_mask=True is kept so that already trained
        # weights keep their meaning; confirm that causal masking is really
        # wanted for a classification encoder.
        attn_output = self.att(inputs, inputs, use_causal_mask=True, training=training)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1, training=training)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer."""
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'layer_rate': self.layer_rate,
            'l2_reg': self.l2_reg,
            'rate': self.rate,
        })
        return config




### Classe Embedding

Esta classe também segue padrões estabelecidos pelo Keras, tendo outros aspectos sido adicionados posteriormente para melhorar a rede.

In [None]:
class TokenAndPositionEmbedding(keras.layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Dropout is applied to the token embedding only, before the position
    embedding is added. The constructor arguments are stored and exposed
    through ``get_config`` so that a model containing this layer can be
    saved and re-created with ``load_model(..., custom_objects=...)``.

    Args:
        maxlen: number of rows of the position embedding table.
        vocab_size: number of rows of the token embedding table.
        embed_dim: width of both embeddings.
        dropout_rate: dropout rate applied to the token embedding.
        l2_reg: L2 factor applied to both embedding tables.
        **kwargs: standard ``keras.layers.Layer`` arguments (``name``,
            ``dtype``, ...), needed when the layer is rebuilt from a config.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, dropout_rate, l2_reg, **kwargs):
        super().__init__(**kwargs)
        # Keep the hyper-parameters so get_config() can report them.
        self.maxlen = maxlen
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.dropout_rate = dropout_rate
        self.l2_reg = l2_reg

        self.token_emb = keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim, embeddings_regularizer=keras.regularizers.L2(l2_reg))
        self.pos_emb = keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim, embeddings_regularizer=keras.regularizers.L2(l2_reg))
        self.dropout = keras.layers.Dropout(rate=dropout_rate)

    def call(self, x, training=None):
        """Embed the token ids in ``x`` and add the position embedding.

        Args:
            x: tensor of token ids; assumed to be (batch, sequence) -- TODO
                confirm against callers.
            training: Keras training flag, forwarded to the dropout layer.

        Returns:
            Token embedding (after dropout) plus position embedding.
        """
        # Positions 0 .. length-1, taken from the last axis of the input.
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        x = self.dropout(x, training=training)
        return x + positions

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer."""
        config = super().get_config()
        config.update({
            'maxlen': self.maxlen,
            'vocab_size': self.vocab_size,
            'embed_dim': self.embed_dim,
            'dropout_rate': self.dropout_rate,
            'l2_reg': self.l2_reg,
        })
        return config


### Classe tweetModel

Modelo de rede completo: nele recebemos uma entrada de tamanho determinado, que passa pela camada de embedding e pelo Transformer, cuja saída é ligada a uma rede MLP em formato de funil, na qual o tamanho das camadas vai diminuindo de maneira contínua.

In [None]:
class tweetModel:
    """Three-class tweet classifier: embedding, Transformer block and MLP head.

    The Keras model is available as ``self.model``.

    Args:
        embed_dim: width of the token/position embeddings.
        num_heads: number of attention heads of the Transformer block.
        ff_dim: hidden width of the Transformer feed-forward network.
        input_length: number of token ids per input sample.
        rate: dropout rate used throughout the network.
        vocab_size: vocabulary size reported by the dataset.
        l2_reg: L2 regularization factor for the embedding and Transformer.
        layer_norm: epsilon of the Transformer layer normalizations.
    """

    # Widths of the funnel-shaped MLP head, from widest to narrowest.
    _HIDDEN_UNITS = (64, 32, 16, 8)

    def __init__(self, embed_dim, num_heads, ff_dim, input_length, rate, vocab_size, l2_reg, layer_norm=1e-6):
        inputs = keras.layers.Input(shape=(input_length,))
        # One extra embedding row so that the id ``vocab_size`` is valid too
        # (presumably the mask id used by tweetDataset.create_mask -- confirm).
        embedding_layer = TokenAndPositionEmbedding(input_length, vocab_size + 1, embed_dim, rate, l2_reg)
        x = embedding_layer(inputs)
        transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim, layer_norm, l2_reg, rate)
        x = transformer_block(x)
        x = keras.layers.GlobalAveragePooling1D()(x)
        x = keras.layers.Dropout(rate)(x)
        for units in self._HIDDEN_UNITS:
            x = keras.layers.Dense(units, activation='relu')(x)
            x = keras.layers.Dropout(rate)(x)
        # Softmax (not sigmoid): the three classes are mutually exclusive and
        # sparse_categorical_crossentropy expects a probability distribution.
        outputs = keras.layers.Dense(3, activation='softmax')(x)
        self.model = keras.Model(inputs=inputs, outputs=outputs)

    def compile(self):
        """Compile with Adam, sparse categorical cross-entropy and accuracy."""
        self.model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    def summary(self):
        """Print the Keras summary of the underlying model."""
        self.model.summary()

    def fitModel(self, train, train_label, validation, batch_size, epochs):
        """Train the model and return what ``keras.Model.fit`` returns.

        Args:
            train: training inputs.
            train_label: integer class labels for ``train``.
            validation: value forwarded as ``validation_data``.
            batch_size: mini-batch size.
            epochs: number of passes over the training data.
        """
        return self.model.fit(train, train_label, batch_size=batch_size, epochs=epochs, validation_data=validation)

    def save(self, filename):
        """Save the underlying Keras model to ``filename``."""
        self.model.save(filename)

    def predict(self, input):
        """Return the model output for ``input``."""
        return self.model.predict(input)

### Compilando o modelo

In [None]:
import numpy as np

# Build, compile and describe the classifier; the input length is taken
# from the first encoded training tweet.
twModel = tweetModel(
    embed_dim=12,
    num_heads=6,
    ff_dim=32,
    input_length=len(train_tweet[0]),
    rate=0.1,
    vocab_size=vocab_size,
    l2_reg=1e-7,
    layer_norm=1e-7,
)
twModel.compile()
twModel.summary()

Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_5 (InputLayer)        [(None, 87)]              0         
                                                                 
 token_and_position_embeddi  (None, 87, 12)            747552    
 ng_6 (TokenAndPositionEmbe                                      
 dding)                                                          
                                                                 
 transformer_block_6 (Trans  (None, 87, 12)            4672      
 formerBlock)                                                    
                                                                 
 global_average_pooling1d_3  (None, 12)                0         
  (GlobalAveragePooling1D)                                       
                                                                 
 dropout_43 (Dropout)        (None, 12)                0   

### Treinamento

In [None]:
# Convert the Python lists to arrays once, then train for three epochs
# while monitoring the validation partition.
train_inputs = np.array(train_tweet)
train_targets = np.array(train_label)
validation_pair = (np.array(val_tweet), np.array(val_label))
twModel.fitModel(
    train=train_inputs,
    train_label=train_targets,
    validation=validation_pair,
    batch_size=12,
    epochs=3,
)

Epoch 1/3
Epoch 2/3
Epoch 3/3


### Salvando modelo

In [None]:
# Persist the trained network in HDF5 format.
model_path = 'tweet_sentimental.h5'
twModel.save(model_path)

## Subindo modelos para Testes

In [None]:
!git clone -b tweet_eval https://github.com/TarcizioLafaiete/Faraday.git

Cloning into 'Faraday'...
remote: Enumerating objects: 20090, done.[K
remote: Counting objects: 100% (95/95), done.[K
remote: Compressing objects: 100% (70/70), done.[K
remote: Total 20090 (delta 27), reused 90 (delta 22), pack-reused 19995[K
Receiving objects: 100% (20090/20090), 326.27 MiB | 28.10 MiB/s, done.
Resolving deltas: 100% (27/27), done.


## Teste e Estatísticas de predição

In [None]:
import numpy as np

def array_to_label(array):
    """Return the index of the largest value in the first row of ``array``.

    When the maximum appears more than once, the first position wins.
    """
    first_row = array[0]
    values = first_row.tolist()
    return values.index(max(first_row))

# Load the trained network; the custom layers must be passed explicitly so
# Keras can rebuild them from the saved file.
testModel = tf.keras.models.load_model('/content/Faraday/tweet_sentimental.h5',
                                       custom_objects={'TokenAndPositionEmbedding':TokenAndPositionEmbedding,
                                                       'TransformerBlock':TransformerBlock})

# NOTE(review): the last test sample is left out on purpose here, because the
# reporting cell divides by ``len(test_tweet) - 1``; confirm whether skipping
# it is intended and, if not, change both places together.
eval_count = max(len(test_tweet) - 1, 0)

correct_predicts = 0
wrong_predicts = 0
pair_labels = []
if eval_count > 0:
    # One batched predict call instead of one call per tweet.
    probabilities = testModel.predict(np.array(test_tweet[:eval_count]))
    # argmax returns the first maximum, like list.index(max(...)).
    predicted_labels = np.argmax(probabilities, axis=1)
    # Each entry is [predicted label, real label]; zip stops at eval_count.
    pair_labels = [[int(predicted), real]
                   for predicted, real in zip(predicted_labels, test_label)]
    correct_predicts = sum(1 for predicted, real in pair_labels if predicted == real)
    wrong_predicts = len(pair_labels) - correct_predicts


In [None]:
# Number of evaluated samples, shared by both report lines.
total_predicts = len(test_tweet) - 1
print("wrong_predicts: ", wrong_predicts,
      " correct_predict: ", correct_predicts,
      " total of predicts: ", total_predicts)
print("Accurancy: ", correct_predicts / total_predicts)

wrong_predicts:  1410  correct_predict:  2502  total of predicts:  3912
Accurancy:  0.6395705521472392


In [None]:
def diff_mean(diff_list):
    """Return the arithmetic mean of the values in ``diff_list``.

    Raises ``ZeroDivisionError`` when ``diff_list`` is empty.
    """
    count = len(diff_list)
    return sum(diff_list) / count

def calc_metrics(pair_list, label_ref):
    """Compute one-vs-rest precision, recall and F1 for ``label_ref``.

    Any ratio whose denominator is zero (class never predicted, class never
    present, or empty ``pair_list``) is reported as ``0.0`` instead of
    raising ``ZeroDivisionError``.

    Args:
        pair_list: iterable of ``[predicted_label, real_label]`` pairs.
        label_ref: the label treated as the positive class.

    Returns:
        dict with the keys ``'ref'``, ``'precision'``, ``'recall'``,
        ``'f1_score'`` and ``'diff_mean'``; the last one is the mean absolute
        difference between predicted and real labels over all pairs, so it
        does not depend on ``label_ref``.
    """
    t_pos = 0
    f_pos = 0
    f_neg = 0
    diff_list = []
    for pair in pair_list:
        predicted, real = pair[0], pair[1]
        diff_list.append(abs(predicted - real))
        predicted_is_ref = predicted == label_ref
        real_is_ref = real == label_ref
        if predicted_is_ref and real_is_ref:
            t_pos += 1
        elif predicted_is_ref:
            f_pos += 1
        elif real_is_ref:
            f_neg += 1
        # True negatives are not needed by any of the reported metrics.

    def safe_ratio(numerator, denominator):
        # A metric with an empty denominator is reported as 0.0.
        return numerator / denominator if denominator else 0.0

    precision = safe_ratio(t_pos, t_pos + f_pos)
    recall = safe_ratio(t_pos, t_pos + f_neg)
    f1_score = safe_ratio(2 * precision * recall, precision + recall)

    return {
        'ref': label_ref,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'diff_mean': safe_ratio(sum(diff_list), len(diff_list)),
    }

def print_statitics(labels):
    """Print the metrics dict produced by ``calc_metrics`` on one line.

    ``labels['ref']`` (0, 1 or 2) is shown as its sentiment name.
    """
    sentiment_names = {0: 'negative', 1: 'neutral', 2: 'positive'}
    pieces = [sentiment_names[labels['ref']]]
    pieces += ["metrics: precision: ", labels['precision']]
    pieces += [" recall: ", labels['recall']]
    pieces += [" f1_score: ", labels['f1_score']]
    pieces += [" diff_mean: ", labels['diff_mean']]
    print(*pieces)

In [None]:
# Compute the one-vs-rest metrics of each class, then print them in the
# order neutral, negative, positive.
neutral_label = calc_metrics(pair_labels, 1)
negative_label = calc_metrics(pair_labels, 0)
positive_label = calc_metrics(pair_labels, 2)
for class_metrics in (neutral_label, negative_label, positive_label):
    print_statitics(class_metrics)

neutral metrics: precision:  0.6231884057971014  recall:  0.7155126140633387  f1_score:  0.6661669165417291  diff_mean:  0.37934560327198363
negative metrics: precision:  0.6536170212765957  recall:  0.5840304182509506  f1_score:  0.6168674698795181  diff_mean:  0.37934560327198363
positive metrics: precision:  0.6705685618729097  recall:  0.5463215258855586  f1_score:  0.6021021021021021  diff_mean:  0.37934560327198363


## Teste de Novas Frases

In [None]:
# Load the already trained model; the custom layers are passed explicitly so
# Keras can rebuild them from the saved file.
custom_layers = {'TokenAndPositionEmbedding': TokenAndPositionEmbedding,
                 'TransformerBlock': TransformerBlock}
promptModel = tf.keras.models.load_model('/content/Faraday/tweet_sentimental.h5',
                                         custom_objects=custom_layers)

In [None]:
def predict_prhase(text):
    """Classify ``text`` with ``promptModel`` and return the winning class index.

    The phrase is encoded by the global ``tweet`` dataset object as a
    sequence of 87 ids before being passed to the model.
    """
    encoded = tweet.tokenize_phrase(text, 87)
    scores = promptModel.predict([encoded[0].tolist()])[0]
    return scores.tolist().index(max(scores))

# Ask for a phrase, classify it and show the predicted sentiment by name.
classification = dict(enumerate(('negative', 'neutral', 'positive')))
user_input = input("Publish a new tweet: ")
label = predict_prhase(user_input)
predicted_sentiment = classification[label]
print("Your prhase was classified as", predicted_sentiment)