# Mineração de Texto - Experimentos e Treinamento do modelo de classificação

## Bibliotecas

In [None]:
# TODO: corrigir versão do pacote (incompatibilidade)
%pip install numpy==2.2.2
%pip install gensim
%pip install datasets
%pip install optuna

In [None]:
from datasets import Dataset, ClassLabel
import pandas as pd
import numpy as np
import nltk
import re
from nltk.tokenize import word_tokenize
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
from transformers import AutoTokenizer, DataCollatorWithPadding, AutoModelForSequenceClassification, AutoModel, TrainingArguments, Trainer, EarlyStoppingCallback
from sklearn.model_selection import StratifiedKFold
import string

from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support, precision_score, recall_score, f1_score
import numpy as np

nltk.download('stopwords')
nltk.download('punkt')
nltk.download('rslp')
nltk.download('punkt_tab')

from sklearn.utils.class_weight import compute_class_weight
import torch.nn as nn
import optuna
from sklearn.pipeline import Pipeline as PipelineSkt

from transformers import pipeline
import gensim
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import GridSearchCV
from collections import Counter
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import Normalizer, StandardScaler, MinMaxScaler, MaxAbsScaler
from sklearn.ensemble import RandomForestClassifier
import logging
from google.colab import drive
from datetime import datetime
import os
import gc
from sklearn.dummy import DummyClassifier
import joblib
import seaborn as sns


In [None]:
# Altere para True a variavel abaixo para salvar os experimentos no Google Drive
IS_LOGGIN_IN_DRIVE = True

In [None]:
# Altere essa variavel para testar cada estratégia de pré-processamento na etapa
# de extração de padrões
CURRENT_PROCESS_LEVEL = 'text_strategy_0'

In [None]:
# Optional persistence of the experiment log to Google Drive (Colab only).
if IS_LOGGIN_IN_DRIVE:
  drive.mount('/content/drive')

  now = datetime.now()
  # Date and time are separated by '_' and ':' is avoided so the file name
  # stays readable and portable when the Drive folder is synced elsewhere.
  formatted_time = now.strftime('%Y-%m-%d_%H-%M-%S')

  folder_path = '/content/drive/MyDrive/mba-eng-de-software/experimentos'
  # exist_ok=True makes the call idempotent (no separate existence check).
  os.makedirs(folder_path, exist_ok=True)

  log_file_path = f'{folder_path}/log_{formatted_time}.txt'

  # force=True replaces any handler already installed on the root logger, so
  # re-running this cell redirects logging to the new file.
  logging.basicConfig(filename=log_file_path, level=logging.INFO, format='%(asctime)s %(message)s', force=True)

In [None]:
def log(to_log):
  """Print a message and, when Drive logging is enabled, also write it to the log file.

  Args:
    to_log: Value to report. Strings are used as-is; any other object is
      converted with ``str()`` and, if that raises, with its ``to_string()``
      method.
  """
  out = to_log
  if not isinstance(out, str):
    try:
      out = str(to_log)
    except Exception:
      try:
        out = to_log.to_string()
      except Exception as e:
        print(f"Erro no log: {e}")
        # Use a placeholder: handing the raw object to print() below would
        # call str() again and re-raise the error that was just reported.
        out = f'<{type(to_log).__name__}>'

  if IS_LOGGIN_IN_DRIVE:
    logging.info(out)
  print(out)

In [None]:
!gdown 1hTx4dTsgFc5NfK_VOW07gbZaBQ30nK9b

In [None]:
!unzip portuguese_tweets_for_sentiment_analysis.zip

## Parte 1: Seleção da Base de Dados e análises

In [None]:
df = pd.read_csv('./TrainingDatasets/TrainTema.csv', sep=';', encoding='utf-8')
df = df[['tweet_text', 'sentiment']] # capturando apenas as colunas que irão ser utilizadas
df

In [None]:
# Target distribution of the training data (label 1 = positive, 0 = negative).
df_target_count = df.sentiment.value_counts()
df_target_percentage = df.sentiment.value_counts(normalize = True) * 100

# Count and percentage must be read with the same label; the percentages were
# previously swapped between the two classes.
log(f'Textos classificados como positivos: {df_target_count[1]} ({(df_target_percentage[1]):.2f}%)')
log(f'Textos classificados como negativos: {df_target_count[0]} ({(df_target_percentage[0]):.2f}%)')

In [None]:
# Analisando se possui algum dado nulo
df.isnull().sum()

In [None]:
# Analisando se possui dados duplicados
duplicated = df.duplicated()
log(f'Número de linhas duplicadas: {duplicated.sum()}')
log(f'dados duplicados: {df[duplicated]}')

In [None]:
# Removendo dados duplicados
df = df.drop_duplicates().reset_index(drop=True)

In [None]:
# Target distribution of the training data after duplicate removal
# (label 1 = positive, 0 = negative).
df_target_count = df.sentiment.value_counts()
df_target_percentage = df.sentiment.value_counts(normalize = True) * 100

# Count and percentage must be read with the same label; the percentages were
# previously swapped between the two classes.
log(f'Textos classificados como positivos: {df_target_count[1]} ({(df_target_percentage[1]):.2f}%)')
log(f'Textos classificados como negativos: {df_target_count[0]} ({(df_target_percentage[0]):.2f}%)')

In [None]:
# Maior texto que existe no dataset
max_text_length = df['tweet_text'].apply(len).max()

max_text_length

In [None]:
# analisando textos com contexto negativo
negation_texts = df[df['tweet_text'].str.contains(r'\b(?:não|nao|nunca|nem)\b', case=False, na=False)]
negation_texts_count = negation_texts.sentiment.value_counts()
log(negation_texts_count)
negation_texts

### Para o Dataset de teste

In [None]:
# df_test
df_test = pd.read_csv('./TestDatasets/TestTema.csv', sep=';', encoding='utf-8')
df_test = df_test[['tweet_text', 'sentiment']] # capturando apenas as colunas que irão ser utilizadas
df_test

In [None]:
# Target distribution of the test data (label 1 = positive, 0 = negative).
df_test_target_count = df_test.sentiment.value_counts()
df_test_target_percentage = df_test.sentiment.value_counts(normalize = True) * 100

# Count and percentage must be read with the same label; the percentages were
# previously swapped between the two classes.
log(f'Textos de teste classificados como positivos: {df_test_target_count[1]} ({(df_test_target_percentage[1]):.2f}%)')
log(f'Textos de teste classificados como negativos: {df_test_target_count[0]} ({(df_test_target_percentage[0]):.2f}%)')

In [None]:
# Analisando se possui algum dado nulo
df_test.isnull().sum()

In [None]:
# Report duplicated rows in the test data (count first, then the rows).
duplicated_df_test = df_test.duplicated()
log(f'Número de linhas duplicadas no teste: {duplicated_df_test.sum()}')
# Message fixed: a space was missing between "duplicados" and "no".
log(f'dados duplicados no teste: {df_test[duplicated_df_test]}')

In [None]:
# Removendo dados duplicados
df_test = df_test.drop_duplicates().reset_index(drop=True)

In [None]:
# Target distribution of the test data after duplicate removal
# (label 1 = positive, 0 = negative).
df_test_target_count = df_test.sentiment.value_counts()
df_test_target_percentage = df_test.sentiment.value_counts(normalize = True) * 100

# Count and percentage must be read with the same label; the percentages were
# previously swapped between the two classes.
log(f'Textos de teste classificados como positivos: {df_test_target_count[1]} ({(df_test_target_percentage[1]):.2f}%)')
log(f'Textos de teste classificados como negativos: {df_test_target_count[0]} ({(df_test_target_percentage[0]):.2f}%)')

## Parte 2: Pré-processamento

In [None]:
# função para fazer a remoção basica e textos específicos (pontuações, url, @, #..)
def basic_cleaning(text):
  """Normalize a raw tweet for the later text-mining steps.

  Lower-cases the text and removes, in order: line breaks, URLs, @mentions,
  #hashtags, punctuation, laughter runs ("kk...", "hh...") and digits.
  Whitespace is collapsed only after every removal, so dropping a token from
  the middle of the text never leaves a double space behind.

  Args:
    text: Raw tweet; non-string values are converted with ``str()``.

  Returns:
    The cleaned text, with single spaces between the remaining words and no
    leading or trailing whitespace.
  """
  s = str(text).lower()  # lower case
  s = s.replace('\n', ' ')  # line breaks
  s = re.sub(r'http\S+', '', s)  # URLs
  s = re.sub(r'@\w+', '', s)  # @mentions
  s = re.sub(r'#\w+', '', s)  # #hashtags
  s = re.sub(f"[{re.escape(string.punctuation)}]", " ", s)  # punctuation
  s = re.sub(r'(k{2,}|h{2,})', '', s)  # laughter
  s = re.sub(r'\d+', '', s)  # digits
  # Must run last: the removals above can leave consecutive spaces.
  return re.sub(r'\s+', ' ', s).strip()

In [None]:
# função para remover stopwords
stop_words = nltk.corpus.stopwords.words('portuguese')  # NLTK Portuguese stopword list
_STOP_WORDS_SET = frozenset(stop_words)  # constant-time membership tests

def remove_stopwords(text, domain_stopwords=(), keep_stopwords=()):
  """Remove stopwords from ``text``.

  A token is dropped when it is an NLTK Portuguese stopword or appears in
  ``domain_stopwords``, unless it is listed in ``keep_stopwords``.

  Args:
    text: Text to filter; it is tokenized with ``word_tokenize``.
    domain_stopwords: Extra tokens to remove.
    keep_stopwords: Tokens that are always kept, even if they are stopwords.

  Returns:
    The remaining tokens joined by single spaces, without a trailing space
    (the same convention ``stemming`` uses).
  """
  domain = frozenset(domain_stopwords)
  kept = frozenset(keep_stopwords)
  tokens = word_tokenize(text)
  return ' '.join(
      tok for tok in tokens
      if (tok not in _STOP_WORDS_SET and tok not in domain) or tok in kept
  )

In [None]:
# função para fazer a radicalização das palavras
stemmer = nltk.stem.RSLPStemmer()  # RSLP stemmer for Portuguese

def stemming(text):
  """Replace every token of ``text`` by its RSLP stem.

  Args:
    text: Text to stem; it is tokenized with ``word_tokenize``.

  Returns:
    The stems joined by single spaces, without surrounding whitespace.
  """
  stems = (stemmer.stem(token) for token in word_tokenize(text))
  return ' '.join(stems).strip()

In [None]:
def apply_preprocess(df):
  """Add the three pre-processing strategy columns to ``df``.

  Columns created, each derived from the previous one:
    text_strategy_0: ``tweet_text`` after ``basic_cleaning``.
    text_strategy_1: strategy 0 without stopwords ('tt' and 'rt' are also
      removed; the negation words are kept).
    text_strategy_2: strategy 1 after stemming.

  Args:
    df: DataFrame with a ``tweet_text`` column. It is modified in place.

  Returns:
    The same DataFrame object, with the new columns.
  """
  domain_stopwords = ['tt', 'rt']
  negation_words = ['não', 'nunca', 'nem', 'nao']

  df['text_strategy_0'] = df['tweet_text'].apply(basic_cleaning)
  df['text_strategy_1'] = df['text_strategy_0'].apply(
      lambda cleaned: remove_stopwords(cleaned, domain_stopwords, negation_words)
  )
  df['text_strategy_2'] = df['text_strategy_1'].apply(stemming)

  return df

In [None]:
df = apply_preprocess(df)
df

In [None]:
## Para o dataset de teste
df_test = apply_preprocess(df_test)
df_test

In [None]:
# Word clouds for strategy 1 (stopwords removed), one per class.
# NOTE(review): the `hate` / `non_hate` names look inherited from another
# notebook; here label 1 is plotted as positive and label 0 as negative
# sentiment. The names are kept because the next cell reads them.
text_hate_c = ' '.join(df[df['sentiment'] == 1]['text_strategy_1'])
wordcloud_hate_c = WordCloud(background_color="white", width=800, height=400).generate(text_hate_c)

plt.figure(figsize=(15,7))
plt.imshow(wordcloud_hate_c, interpolation='bilinear')
plt.axis('off')
plt.title('Nuvem - Sentimento positivo - Estratégia 1')
plt.show()

text_non_hate_c = ' '.join(df[df['sentiment'] == 0]['text_strategy_1'])
wordcloud_non_hate_c = WordCloud( background_color="white", width=800, height=400).generate(text_non_hate_c)

plt.figure(figsize=(15,7))
plt.imshow(wordcloud_non_hate_c, interpolation='bilinear')
plt.axis('off')
plt.title('Nuvem - Sentimento negativo - Estratégia 1')
plt.show()

# NOTE(review): these two lists are not read anywhere in the visible cells.
tokens_label_1 = []
tokens_label_0 = []

In [None]:
# verifica a ocorrencia das palavras
counter_1 = Counter(text_hate_c.split())
counter_0 = Counter(text_non_hate_c.split())

most_common_1 = counter_1.most_common(200)
most_common_0 = counter_0.most_common(200)

log(f'Classe 1 contextualizada: {most_common_1}')
log(f'Classe 0 contextualizada: {most_common_0}')

In [None]:
# Word clouds for strategy 2 (stopwords removed + stemming), one per class.
# NOTE(review): the `hate` / `non_hate` names look inherited from another
# notebook; here label 1 is plotted as positive and label 0 as negative
# sentiment. The names are kept because the next cell reads them.
text_hate_sc = ' '.join(df[df['sentiment'] == 1]['text_strategy_2'])
wordcloud_hate_sc = WordCloud(background_color="white", width=800, height=400).generate(text_hate_sc)

plt.figure(figsize=(15,7))
plt.imshow(wordcloud_hate_sc, interpolation='bilinear')
plt.axis('off')
plt.title('Nuvem - Sentimento Positivo - Estratégia 2')
plt.show()

text_non_hate_sc = ' '.join(df[df['sentiment'] == 0]['text_strategy_2'])
wordcloud_non_hate_sc = WordCloud( background_color="white", width=800, height=400).generate(text_non_hate_sc)

plt.figure(figsize=(15,7))
plt.imshow(wordcloud_non_hate_sc, interpolation='bilinear')
plt.axis('off')
plt.title('Nuvem - Sentimento Negativo - Estratégia 2')
plt.show()

# NOTE(review): these two lists are not read anywhere in the visible cells.
tokens_label_1 = []
tokens_label_0 = []

In [None]:
# verifica a ocorrencia das palavras
counter_1 = Counter(text_hate_sc.split())
counter_0 = Counter(text_non_hate_sc.split())

most_common_1 = counter_1.most_common(200)
most_common_0 = counter_0.most_common(200)

log(f'Classe 1 contextualizada: {most_common_1}')
log(f'Classe 0 contextualizada: {most_common_0}')

## Parte 3 - Extração de padrões

In [None]:
log(f'Extraindo padrões utilizando: {CURRENT_PROCESS_LEVEL}')

In [None]:
# Training frame for the selected pre-processing strategy, with the generic
# column names ('text', 'label') used by every experiment below.
df_current_process = df[[CURRENT_PROCESS_LEVEL, 'sentiment']].copy()
df_current_process.columns = ['text', 'label']
# Down-sample to at most 10000 rows to keep the experiments tractable;
# min() avoids the ValueError that sample() raises when fewer rows exist.
df_current_process = df_current_process.sample(n=min(10000, len(df_current_process)), random_state=42)
df_current_process

In [None]:
# df_test
df_test_current_process = df_test[[CURRENT_PROCESS_LEVEL, 'sentiment']].copy()
df_test_current_process.columns = ['text', 'label']
df_test_current_process

In [None]:
def knn_classifier(X_train, y_train, X_test, y_test, experiment, scaller, classes, should_save=False):
  """Tune a scaler + KNN pipeline with grid search and report it on the test data.

  Runs a 5-fold ``GridSearchCV`` over the number of neighbours, the distance
  metric and the weighting scheme, logs the best parameters with the mean and
  standard deviation of their cross-validation score, then logs the
  classification report and confusion matrix of the refitted search object on
  the test data and plots the matrix as a heatmap.

  Args:
    X_train: Training features.
    y_train: Training labels.
    X_test: Test features.
    y_test: Test labels.
    experiment: Experiment name, used in the log messages and as the file
      stem when the search object is saved.
    scaller: Scaler instance used as the first pipeline step.
    classes: Row/column labels for the confusion matrix.
    should_save: When True, dump the fitted search to ``<experiment>.joblib``.
  """
  log(f'Iniciando experimento {experiment}')

  search_space = {
      'knn__n_neighbors': [1, 3, 5, 7, 9],
      'knn__metric': ['euclidean', 'cosine'],
      'knn__weights': ['uniform', 'distance'],
  }
  estimator = PipelineSkt(steps=[('scaler', scaller), ('knn', KNeighborsClassifier())])
  search = GridSearchCV(estimator, search_space, cv=5, n_jobs=-1, verbose=2)
  search.fit(X_train, y_train)

  if should_save:
    joblib.dump(search, f'{experiment}.joblib')

  # Cross-validation score of the winning parameter combination.
  cv_accuracy = search.best_score_
  cv_std = search.cv_results_['std_test_score'][search.best_index_]

  for message in (
      f'Melhores parâmetros: {search.best_params_}',
      f'Acurácia: {cv_accuracy}',
      f'Desvio padrão: {cv_std}',
  ):
    log(message)

  predictions = search.predict(X_test)

  df_cm = pd.DataFrame(confusion_matrix(y_test, predictions), index=classes, columns=classes)
  report = classification_report(y_test, predictions)

  log('\n\nReport')
  log('\n\n' + report)
  log('\n\nMatriz de confusão')
  log('\n\n' + df_cm.to_string())
  log('\n\n' + "Acurácia: %0.4f, Desvio padrão: %0.4f" % (cv_accuracy, cv_std))

  log(f'Finalizando experimento {experiment}')

  plt.figure(figsize=(10,7))
  sns.heatmap(df_cm, annot=True, fmt='d')
  plt.ylabel('Classe verdadeira')
  plt.xlabel('Classe predita')
  plt.show()

In [None]:
def rf_classifier(X_train, y_train, X_test, y_test, experiment, classes, should_save=False):
  """Tune a Random Forest with grid search and report it on the test data.

  Runs a 5-fold ``GridSearchCV`` over the forest size, tree depth and split /
  leaf sizes (class-balanced forest, fixed seed), logs the best parameters
  with the mean and standard deviation of their cross-validation score, then
  logs the classification report and confusion matrix of the refitted search
  object on the test data and plots the matrix as a heatmap.

  Args:
    X_train: Training features.
    y_train: Training labels.
    X_test: Test features.
    y_test: Test labels.
    experiment: Experiment name, used in the log messages and as the file
      stem when the search object is saved.
    classes: Row/column labels for the confusion matrix.
    should_save: When True, dump the fitted search to ``<experiment>.joblib``.
  """
  log(f'Iniciando experimento {experiment}')

  forest = RandomForestClassifier(random_state=42, class_weight='balanced')
  estimator = PipelineSkt(steps=[('rf', forest)])
  search_space = {
      'rf__n_estimators': [100, 150],
      'rf__max_depth': [None, 10, 20],
      'rf__min_samples_split': [2, 5],
      'rf__min_samples_leaf': [1, 2],
      'rf__bootstrap': [True]
  }
  search = GridSearchCV(estimator, search_space, cv=5, n_jobs=-1, verbose=2)
  search.fit(X_train, y_train)

  if should_save:
    joblib.dump(search, f'{experiment}.joblib')

  # Cross-validation score of the winning parameter combination.
  cv_accuracy = search.best_score_
  cv_std = search.cv_results_['std_test_score'][search.best_index_]

  for message in (
      f'Melhores parâmetros: {search.best_params_}',
      f'Acurácia: {cv_accuracy}',
      f'Desvio padrão: {cv_std}',
  ):
    log(message)

  predictions = search.predict(X_test)

  df_cm = pd.DataFrame(confusion_matrix(y_test, predictions), index=classes, columns=classes)
  report = classification_report(y_test, predictions)

  log('\n\nReport')
  log('\n\n' + report)
  log('\n\nMatriz de confusão')
  log('\n\n' + df_cm.to_string())
  log('\n\n' + "Acurácia: %0.4f, Desvio padrão: %0.4f" % (cv_accuracy, cv_std))

  log(f'Finalizando experimento {experiment}')

  plt.figure(figsize=(10,7))
  sns.heatmap(df_cm, annot=True, fmt='d')
  plt.ylabel('Classe verdadeira')
  plt.xlabel('Classe predita')
  plt.show()

#### Teste 0: Ponderação TF-IDF - Com classificador Knn e Random Forest

In [None]:
log('Teste 0: Ponderação TF-IDF - Com classificador Knn e Random Forest')

In [None]:
df_train_t0 = df_current_process
df_test_t0 = df_test_current_process

In [None]:
vectorizer = TfidfVectorizer(min_df=2)

In [None]:
X_train = vectorizer.fit_transform(df_train_t0['text'].to_list())
y_train = df_train_t0['label'].to_list()

X_test = vectorizer.transform(df_test_t0['text'].to_list())
y_true = df_test_t0['label'].to_list()

##### Usando KNN

In [None]:
knn_classifier(
  X_train,
  y_train,
  X_test,
  y_true,
  "Teste 0: Ponderação TF-IDF - Com classificador Knn",
  MaxAbsScaler(),
  ['Classe 0', 'Classe 1']
)

##### Usando Random Forest

In [None]:
rf_classifier(
  X_train,
  y_train,
  X_test,
  y_true,
  "Teste 0: Ponderação TF-IDF - Com classificador Random Forest",
  ['Classe 0', 'Classe 1']
)

#### Teste 1: Word Embeddings Estática - Word2Vec - Com classificador Knn e Random Forest

In [None]:
log('Teste 1: Word Embeddings Estática - Word2Vec - Com classificador Knn e Random Forest')

In [None]:
W2V_VECTOR_SIZE = 300

In [None]:
# Tokenize both frames; the Word2Vec model is trained on the training tokens only.
df_current_process['tokens'] = df_current_process['text'].apply(word_tokenize)
df_test_current_process['tokens'] = df_test_current_process['text'].apply(word_tokenize)
# seed=42 matches the seed used by the other stochastic steps of the notebook.
# Per the gensim documentation, a fully deterministic run additionally needs
# workers=1 and a fixed PYTHONHASHSEED; workers=2 is kept here for speed.
w2v_model = gensim.models.Word2Vec(sentences=df_current_process['tokens'], vector_size=W2V_VECTOR_SIZE, window=5, min_count=1, workers=2, seed=42)

In [None]:
df_train_t1 = df_current_process
df_test_t1 = df_test_current_process

In [None]:
def get_embeddings(df):
  """Build one Word2Vec document vector per row of ``df``.

  A document vector is the mean of the vectors of its tokens that exist in
  the ``w2v_model`` vocabulary. Documents without any known token get a zero
  vector of size ``W2V_VECTOR_SIZE``.

  Args:
    df: DataFrame with a ``tokens`` column holding one token list per row.

  Returns:
    List with one 1-D numpy array per row, in row order.
  """
  doc_embeddings = []
  missing_tokens = 0
  # Iterating over the column avoids the per-row Series built by iterrows().
  for tokens in df['tokens']:
      known_vectors = [w2v_model.wv[token] for token in tokens if token in w2v_model.wv]
      missing_tokens += len(tokens) - len(known_vectors)

      if known_vectors:
          text_vec = np.mean(np.array(known_vectors), axis=0)
      else:
          text_vec = np.zeros(W2V_VECTOR_SIZE)

      doc_embeddings.append(text_vec)

  # Out-of-vocabulary tokens are expected for data the model was not trained
  # on, so they are summarized once instead of printed one line per token.
  if missing_tokens:
      print(f'{missing_tokens} tokens fora do vocabulário do Word2Vec foram ignorados')
  return doc_embeddings

In [None]:
train_embeddings = get_embeddings(df_train_t1)
test_embeddings = get_embeddings(df_test_t1)

In [None]:
X_train = np.array(train_embeddings)
y_train = df_train_t1['label'].to_list()

X_test = np.array(test_embeddings)
y_true = df_test_t1['label'].to_list()

##### Usando KNN

In [None]:
knn_classifier(
  X_train,
  y_train,
  X_test,
  y_true,
  "Teste 1: Word Embeddings Estática - Word2Vec - Com classificador Knn",
  MinMaxScaler(),
  ['Classe 0', 'Classe 1']
)

##### Usando Random Forest

In [None]:
rf_classifier(
  X_train,
  y_train,
  X_test,
  y_true,
  "Teste 1: Word Embeddings Estática - Word2Vec - Com classificador Random Forest",
  ['Classe 0', 'Classe 1']
)

#### Teste 2: Word Embeddings Contextuais - Com classificador Knn e Random Forest

In [None]:
torch.cuda.empty_cache()
gc.collect()

In [None]:
log('Teste 2: Word Embeddings Contextuais - Com classificador Knn e Random Forest')

In [None]:
MODEL_NAME = 'neuralmind/bert-base-portuguese-cased'

In [None]:
# Rebuild the train/test frames from the pre-processed data; this drops the
# 'tokens' column added for the Word2Vec experiment.
# NOTE(review): unlike the first extraction cell, no 10000-row sample is taken
# here, so this experiment and the fine-tuning below run on every row of `df`.
# Confirm this is intended, since it makes the experiments harder to compare.
df_current_process = df[[CURRENT_PROCESS_LEVEL, 'sentiment']].copy()
df_current_process.columns = ['text', 'label']
df_current_process

# Same selection for the test data.
df_test_current_process = df_test[[CURRENT_PROCESS_LEVEL, 'sentiment']].copy()
df_test_current_process.columns = ['text', 'label']
df_test_current_process

In [None]:
df_train_t2 = df_current_process
df_test_t2 = df_test_current_process

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME)
model.to('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def get_context_embeddings(texts, batch_size=32, max_length=300):
    """Encode texts with the global ``model`` and return their [CLS] embeddings.

    Args:
        texts: Sequence of strings (pandas Series, numpy array, list or tuple).
        batch_size: Number of texts encoded per forward pass.
        max_length: Maximum number of tokens per text; longer inputs are
            truncated by the tokenizer.

    Returns:
        CPU tensor with one row per text, taken from position 0 of the last
        hidden state. For empty input, a tensor with zero rows and
        ``model.config.hidden_size`` columns.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    all_embeddings = []

    # Mini-batch loop keeps memory bounded.
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        # Series / arrays expose tolist(); plain lists and tuples do not.
        batch_texts = batch.tolist() if hasattr(batch, 'tolist') else list(batch)

        inputs = tokenizer(
            batch_texts,
            padding=True,
            truncation=True,
            return_tensors='pt',
            max_length=max_length
        )
        inputs = inputs.to(device)

        with torch.no_grad():
            outputs = model(**inputs)

        # Embedding of the first token ([CLS], index 0) of each text.
        batch_embeddings = outputs.last_hidden_state[:, 0, :]
        all_embeddings.append(batch_embeddings.cpu())

    # torch.cat raises on an empty list, so empty input is handled explicitly.
    if not all_embeddings:
        return torch.empty((0, model.config.hidden_size))

    # Stack every batch into a single matrix.
    return torch.cat(all_embeddings, dim=0)

In [None]:
train_outputs = get_context_embeddings(df_train_t2['text'], batch_size=32)
test_outputs = get_context_embeddings(df_test_t2['text'], batch_size=32)

In [None]:
X_train = train_outputs
y_train = df_train_t2['label'].to_list()

X_test = test_outputs
y_true = df_test_t2['label'].to_list()

##### Usando KNN

In [None]:
# The experiment label identifies this run in the logs (and in the saved file
# name); it previously carried the "Teste 1 ... Word2Vec" label by copy-paste.
knn_classifier(
  X_train,
  y_train,
  X_test,
  y_true,
  "Teste 2: Word Embeddings Contextuais - Com classificador Knn",
  MinMaxScaler(),
  ['Classe 0', 'Classe 1']
)

##### Usando Random Forest

In [None]:
# Comentado devido ao poder computacional
# rf_classifier(
#   X_train,
#   y_train,
#   X_test,
#   y_true,
#   "Teste 2: Word Embeddings Contextuais - Com classificador Random Forest",
#   ['Classe 0', 'Classe 1']
# )

#### Teste 3: Ajuste fino em um modelo pré treinado com o Trainer do Hugging Face


In [None]:
# Drop the cached embeddings first, then collect garbage and release the CUDA
# cache; doing it in the opposite order cannot release memory that is still
# referenced. Each guard now checks the name it deletes (the second one used
# to test 'train_outputs').
if 'train_outputs' in globals():
  del train_outputs
if 'test_outputs' in globals():
  del test_outputs

gc.collect()
torch.cuda.empty_cache()

In [None]:
log('Teste 3: Ajuste fino em um modelo pré treinado com o Trainer do Hugging Face')

In [None]:
# Independent copy restricted to the two columns the fine-tuning needs. The
# previous alias + `.columns = [...]` mutated df_current_process and raised
# when the frame still carried the extra 'tokens' column.
df_lv_2_with_sw = df_current_process[['text', 'label']].copy()
df_lv_2_with_sw

In [None]:
df_lv_2_with_sw_ds =  Dataset.from_pandas(df_lv_2_with_sw.reset_index(drop=True))

In [None]:
MODEL_NAME = 'neuralmind/bert-base-portuguese-cased'
log('Modelo: ' + MODEL_NAME)

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, do_lower_case=False)

In [None]:
def preprocess_function(examples):
    """Tokenize the ``text`` field of a batch with truncation enabled."""
    batch_texts = examples["text"]
    return tokenizer(batch_texts, truncation=True)

In [None]:
df_tokenize = df_lv_2_with_sw_ds.map(preprocess_function, batched=True)
df_tokenize = df_tokenize.remove_columns(["text"])
df_tokenize.set_format("torch")

In [None]:
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# The task is binary (the models below are created with num_labels=2). The
# previous len(set(...)) ran over torch-formatted labels, whose elements hash
# by identity, so it counted rows instead of distinct classes.
df_tokenize = df_tokenize.cast_column('label', ClassLabel(num_classes=2))

#### Para o teste

In [None]:
# Independent copy restricted to the two columns the fine-tuning needs. The
# previous alias + `.columns = [...]` mutated df_test_current_process and
# raised when the frame still carried the extra 'tokens' column.
df_test_lv_2_with_sw = df_test_current_process[['text', 'label']].copy()
df_test_lv_2_with_sw

In [None]:
# Same pipeline as the training data: pandas -> Dataset -> tokenized tensors.
df_test_lv_2_with_sw_ds =  Dataset.from_pandas(df_test_lv_2_with_sw.reset_index(drop=True))
df_test_tokenize = df_test_lv_2_with_sw_ds.map(preprocess_function, batched=True)
df_test_tokenize = df_test_tokenize.remove_columns(["text"])
df_test_tokenize.set_format("torch")
# The task is binary (the models below are created with num_labels=2). The
# previous len(set(...)) ran over torch-formatted labels, whose elements hash
# by identity, so it counted rows instead of distinct classes.
df_test_tokenize = df_test_tokenize.cast_column('label', ClassLabel(num_classes=2))

In [None]:
dft_test_val = df_test_tokenize.train_test_split(test_size=0.30, seed=42, stratify_by_column='label')

In [None]:
train_ds = df_tokenize
test_ds = dft_test_val['train']
val_ds = dft_test_val['test']

In [None]:
log(f'Modelo: {MODEL_NAME}')
log(f'Dataset inicial: {str(df_tokenize.shape)}')
log(f'Dataset de treinamento: {str(train_ds.shape)}')
log(f'Dataset de teste: {str(test_ds.shape)}')
log(f'Dataset de validação: {str(val_ds.shape)}')

In [None]:
class CustomTrainer(Trainer):
    """Trainer whose loss is a class-weighted cross-entropy.

    Args:
        class_weights: 1-D float tensor with one weight per class, or None
            for an unweighted loss.
        *args, **kwargs: Forwarded unchanged to ``Trainer``.
    """

    def __init__(self, class_weights, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted cross-entropy between the logits and ``labels``.

        Extra keyword arguments passed by the Trainer are accepted and ignored.

        Returns:
            ``(loss, outputs)`` when ``return_outputs`` is True, else ``loss``.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        # The weight tensor must live on the same device as the logits; the
        # move is a no-op when it already does.
        weight = self.class_weights
        if weight is not None:
            weight = weight.to(logits.device)

        loss_fct = nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

In [None]:
def model_init():
    """Create a fresh two-label classification model from ``MODEL_NAME``."""
    fresh_model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)
    return fresh_model

def compute_metrics(pred):
    """Compute accuracy and binary precision / recall / F1 for a prediction output.

    Args:
        pred: Object exposing ``label_ids`` (true labels) and ``predictions``
            (per-class scores; the arg-max over the last axis is the
            predicted class).

    Returns:
        Dict with the keys ``accuracy``, ``precision``, ``recall`` and ``f1``.
    """
    y_true_ids = pred.label_ids
    y_pred_ids = pred.predictions.argmax(-1)

    # One call yields the three binary scores (the fourth value, support, is unused).
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true_ids, y_pred_ids, average='binary'
    )
    return {
        'accuracy': accuracy_score(y_true_ids, y_pred_ids),
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

def optuna_hp_space(trial):
    """Sample one hyper-parameter configuration from an Optuna trial.

    Args:
        trial: Object providing ``suggest_float``, ``suggest_int`` and
            ``suggest_categorical``.

    Returns:
        Dict with ``learning_rate`` (log-uniform in [1e-5, 5e-5]),
        ``num_train_epochs`` (integer in [2, 5]),
        ``per_device_train_batch_size`` (8, 16 or 32) and ``weight_decay``
        (in [0.0, 0.3]).
    """
    space = {}
    space['learning_rate'] = trial.suggest_float('learning_rate', 1e-5, 5e-5, log=True)
    space['num_train_epochs'] = trial.suggest_int('num_train_epochs', 2, 5)
    space['per_device_train_batch_size'] = trial.suggest_categorical(
        'per_device_train_batch_size', [8, 16, 32]
    )
    space['weight_decay'] = trial.suggest_float('weight_decay', 0.0, 0.3)
    return space

training_args = TrainingArguments(
    output_dir='optuna_results',
    eval_strategy="epoch",

    save_strategy="no",
    # save_strategy="epoch",
    # load_best_model_at_end=True,
    greater_is_better=True,
    metric_for_best_model="eval_f1",

    logging_strategy="no",
    seed=42,
    report_to="none",
)

train_labels = np.array(train_ds['label'])
classes = np.unique(train_labels)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=train_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float)
class_weights = class_weights.to('cuda' if torch.cuda.is_available() else 'cpu')

print(f'Quantidade de 0s e 1s: {np.bincount(train_labels)}')
print(f'Pesos para as classes: {class_weights}')

trainer = CustomTrainer(
    model_init=model_init,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    class_weights=class_weights,
)

def compute_objective(metrics):
    """Return the value the hyper-parameter search optimizes: ``metrics['eval_f1']``."""
    objective_value = metrics['eval_f1']
    return objective_value

best_trials = trainer.hyperparameter_search(
    direction="maximize",
    backend="optuna",
    hp_space=optuna_hp_space,
    n_trials=10,
    compute_objective=compute_objective,
)

In [None]:
log('\nMelhor configuração encontrada')
log(best_trials.hyperparameters)

In [None]:
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

accuracy_list = []
precision_list = []
recall_list = []
f1_list = []

train_labels = np.array(train_ds['label'])
classes = np.unique(train_labels)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=train_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float)
class_weights = class_weights.to('cuda' if torch.cuda.is_available() else 'cpu')

# Stratified k-fold cross-validation using the best hyper-parameters found by
# the search. Only the labels drive the split, so a dummy feature array is
# passed to skf.split().
# NOTE(review): `class_weights` was computed above from the labels of the whole
# training set, not from each fold's training part.
for fold, (train_indices, val_indices) in enumerate(skf.split(np.zeros(len(train_labels)), train_labels)):
    log(f"Iniciando Fold {fold + 1}/{n_splits}")

    train_dataset = train_ds.select(train_indices)
    val_dataset = train_ds.select(val_indices)

    # A new pretrained model per fold, so folds do not share fine-tuned weights.
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME, num_labels=2
    )

    # The train batch size found by the search is reused for evaluation.
    training_args = TrainingArguments(
        output_dir=f"my-model-fold-{fold}",
        learning_rate=best_trials.hyperparameters['learning_rate'],
        per_device_train_batch_size=best_trials.hyperparameters['per_device_train_batch_size'],
        per_device_eval_batch_size=best_trials.hyperparameters['per_device_train_batch_size'],
        num_train_epochs=best_trials.hyperparameters['num_train_epochs'],
        weight_decay=best_trials.hyperparameters['weight_decay'],
        eval_strategy="epoch",
        save_strategy="no",
        # save_strategy="epoch",
        # load_best_model_at_end=True,
        metric_for_best_model="eval_f1",
        save_total_limit=1,
        seed=42,
        report_to="none"
    )

    trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        class_weights=class_weights,
    )

    trainer.train()

    # Metrics of this fold's model on its validation part.
    eval_result = trainer.evaluate()

    accuracy_list.append(eval_result['eval_accuracy'])
    precision_list.append(eval_result['eval_precision'])
    recall_list.append(eval_result['eval_recall'])
    f1_list.append(eval_result['eval_f1'])

    log(f'\n Resultados do Fold {fold + 1}:')
    log(f"\n Acurácia: {eval_result['eval_accuracy']}")
    log(f"\n Precisão: {eval_result['eval_precision']}")
    log(f"\n Recall: {eval_result['eval_recall']}")
    log(f"\n F1 Score: {eval_result['eval_f1']}")

avg_accuracy = np.mean(accuracy_list)
avg_precision = np.mean(precision_list)
avg_recall = np.mean(recall_list)
avg_f1 = np.mean(f1_list)
std_accuracy = np.std(accuracy_list)

log('Resultados da Validação Cruzada:')
log(f'Acurácia Média: {avg_accuracy}')
log(f'Desvio padrão da acurácia: {std_accuracy}')
log(f'Precisão Média: {avg_precision}')
log(f'Recall Médio: {avg_recall}')
log(f'F1 Score Médio: {avg_f1}')

In [None]:
# Train the final model with the best hyper-parameters.

# Balanced class weights computed from the full training labels.
train_labels = np.array(train_ds['label'])
classes = np.unique(train_labels)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=train_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float)
class_weights = class_weights.to('cuda' if torch.cuda.is_available() else 'cpu')

log(f'Quantidade de 0s e 1s: {np.bincount(train_labels)}')
log(f'Pesos para as classes: {class_weights}')

# Start from fresh pretrained weights. Reusing the `model` variable left by the
# cross-validation cell would continue training the last fold's fine-tuned
# model instead of training the final model from the pretrained checkpoint.
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)

training_args = TrainingArguments(
    output_dir=f"melhor-modelo-{CURRENT_PROCESS_LEVEL}",
    learning_rate=best_trials.hyperparameters['learning_rate'],
    per_device_train_batch_size=best_trials.hyperparameters['per_device_train_batch_size'],
    per_device_eval_batch_size=best_trials.hyperparameters['per_device_train_batch_size'],
    num_train_epochs=best_trials.hyperparameters['num_train_epochs'],
    weight_decay=best_trials.hyperparameters['weight_decay'],
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="eval_f1",
    save_total_limit=1,
    seed=42,
    report_to="none"
)

# Stop when eval_f1 has not improved for two consecutive evaluations.
early_stopping = EarlyStoppingCallback(early_stopping_patience=2)

trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    class_weights=class_weights,
    callbacks=[early_stopping],
)

trainer.train()

In [None]:
eval_result = trainer.evaluate()
log(f'\n Resultado do melhor modelo')
log(eval_result)

In [None]:
# avaliando ele no dataset de teste
predictions_output = trainer.predict(test_ds)
pred_logits = predictions_output.predictions
y_true = predictions_output.label_ids
y_pred = np.argmax(pred_logits, axis=-1)

In [None]:
cm = confusion_matrix(y_true, y_pred)
classes = ['Classe 0', 'Classe 1']
df_cm_t3 = pd.DataFrame(cm, index=classes, columns=classes)
cr_t3 = classification_report(y_true, y_pred)

In [None]:
log(f'\n Teste 3: Ajuste fino em um modelo pré treinado com o Trainer do Hugging Face')
log(f'\n Report')
log(cr_t3)
log(f'\n Matriz de confusão')
log(df_cm_t3)

In [None]:
df_cm = pd.DataFrame(cm, index=classes, columns=classes)
plt.figure(figsize=(10,7))
sns.heatmap(df_cm, annot=True, fmt='d')
plt.ylabel('Classe verdadeira')
plt.xlabel('Classe predita')
plt.show()

In [None]:
trainer.save_model(f'melhor-modelo-{CURRENT_PROCESS_LEVEL}')
tokenizer.save_pretrained(f'melhor-modelo-{CURRENT_PROCESS_LEVEL}')

In [None]:
if IS_LOGGIN_IN_DRIVE: # se ja ta salvando logs entao tem acesso o drive, entao só salva
  !zip -r melhor-modelo-{CURRENT_PROCESS_LEVEL}.zip melhor-modelo-{CURRENT_PROCESS_LEVEL}
  !cp -r melhor-modelo-{CURRENT_PROCESS_LEVEL}.zip /content/drive/MyDrive/mba-eng-de-software/experimentos

#### Verifica o desempenho de um classificador Dummy

In [None]:
df_train_tdummy = df_current_process
df_test_tdummy = df_test_current_process

In [None]:
X_train = df_train_tdummy['text'].to_list()
y_train = df_train_tdummy['label'].to_list()

X_test = df_test_tdummy['text'].to_list()
y_true = df_test_tdummy['label'].to_list()

In [None]:
# NOTE(review): `texts` and `labels` are not read by the visible cells.
texts = df_lv_2_with_sw['text'].to_list()
labels = df_lv_2_with_sw['label'].to_list()

# The 'stratified' strategy draws random predictions from the training class
# distribution; a fixed seed makes the baseline report reproducible.
dummy_clf = DummyClassifier(strategy='stratified', random_state=42)
dummy_clf.fit(X_train, y_train)
print(classification_report(y_true, dummy_clf.predict(X_test)))