Experimento Híbrido: Features Concatenadas (TF-IDF + NILC-Metrix)

Este script implementa a concatenação das 50 melhores features do TF-IDF com as 50 melhores features do NILC-Metrix para criar um vetor de 100 features e avaliar o desempenho dos modelos.

In [None]:
import nltk
nltk.download('stopwords')
from datasets import Dataset, DatasetDict
import pandas as pd
import numpy as np
import mord as m
import os
from nltk.corpus import stopwords
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, Normalizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_selection import mutual_info_regression, SelectKBest, chi2, f_classif, VarianceThreshold
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, cohen_kappa_score
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, HistGradientBoostingRegressor, GradientBoostingRegressor
from sklearn.svm import SVR, SVC
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.neighbors import KNeighborsRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import GridSearchCV
from functools import partial

# Analysis target selector:
# Values 0 to 4: ENEM competencies (C1 to C5)
# Value 5: final grade (sum of the competencies)
REFERENCE_CONCEPT = 4

# Carregamento do dataset

In [None]:
# Location and naming pattern of the parquet files, one file per split.
DATASET_PATH = "../corpus/"
DATASET_NAME = "-00000-of-00001.parquet"
DIVISIONS = ("train", "validation", "test")


def target_dataset_path(target: str):
    """Return the parquet file path for one dataset split.

    The path is built as DATASET_PATH + split name + DATASET_NAME.

    Args:
        target: name of the split; must be one of DIVISIONS.

    Raises:
        ValueError: if ``target`` is not a known split.
    """
    if target not in DIVISIONS:
        raise ValueError("ERROR: Invalid target for dataset.")
    return f"{DATASET_PATH}{target}{DATASET_NAME}"

# Read each split's parquet file and wrap it as a `datasets.Dataset`,
# failing early when a file is missing.
datasets_dict = {}

for split_name in DIVISIONS:
    parquet_path = target_dataset_path(split_name)
    if os.path.exists(parquet_path):
        split_frame = pd.read_parquet(parquet_path, engine='pyarrow')
        datasets_dict[split_name] = Dataset.from_pandas(split_frame)
    else:
        raise FileNotFoundError(f"Arquivo não encontrado: {parquet_path}")

# All splits gathered in a single DatasetDict.
dataset_filtrado = DatasetDict(datasets_dict)

In [None]:
# Competency score -> level: 0, 40, ..., 200 map to 0, 1, ..., 5
# (not applied to the final grade).
grade_mapping = {points: level for level, points in enumerate(range(0, 201, 40))}


def create_label(row):
    """Build the ``label`` field for one dataset row.

    For the final grade (REFERENCE_CONCEPT == 5) the label is the last entry
    of ``row["grades"]``. Otherwise it is the score at index REFERENCE_CONCEPT
    converted to a level through ``grade_mapping``.

    Returns:
        A dict with the single key ``"label"``.
    """
    grades = row["grades"]
    if REFERENCE_CONCEPT != 5:
        return {"label": grade_mapping[grades[REFERENCE_CONCEPT]]}
    return {"label": grades[-1]}


# Add the label to every row of every split.
dataset = dataset_filtrado.map(create_label)

In [None]:
# Load the NILC-Metrix metrics; the "id_texto" column is renamed to "id".
nome_arquivo = "metricas_redacoes_nilcmetrix.csv"
nilc_df = pd.read_csv(nome_arquivo).rename(columns={"id_texto": "id"})

# Rewrite the id column: remove 'redacao_' and append '.html'.
ids_sem_prefixo = nilc_df["id"].str.replace("redacao_", "")
nilc_df["id"] = ids_sem_prefixo + ".html"

In [None]:
# Names of the NILC metric columns (every column of nilc_df except 'id').
metric_cols = nilc_df.columns.drop('id')


def _merge_nilc_metrics(split_name):
    """Return the raw, merged and cleaned frames of one dataset split.

    The split is converted to pandas, left-merged with the NILC metrics on
    "id", and rows with a missing value in any metric column are dropped.
    """
    raw = dataset[split_name].to_pandas()
    merged = pd.merge(raw, nilc_df, on="id", how="left")
    return raw, merged, merged.dropna(subset=metric_cols)


train_df, dados_train, dados_train_clean = _merge_nilc_metrics("train")
val_df, dados_val, dados_val_clean = _merge_nilc_metrics("validation")
test_df, dados_test, dados_test_clean = _merge_nilc_metrics("test")

In [None]:
def enem_accuracy_score(true_values, predicted_values, tolerance=None):
    """Return the fraction of predictions that do not diverge from the truth.

    A prediction is non-divergent when ``abs(true - predicted)`` is at most
    the tolerance.

    Args:
        true_values: sized iterable of reference grades.
        predicted_values: sized iterable of predicted grades, same length.
        tolerance: largest accepted absolute difference. When ``None`` it is
            derived from ``REFERENCE_CONCEPT``: 80 for the final grade
            (REFERENCE_CONCEPT == 5), otherwise 2, which is expressed in the
            level units produced by the competency label mapping.

    Returns:
        The share of non-divergent predictions, between 0 and 1.

    Raises:
        ValueError: if the inputs differ in length or are empty.
    """
    if tolerance is None:
        tolerance = 80 if REFERENCE_CONCEPT == 5 else 2

    # Explicit checks instead of `assert`, which is removed under `python -O`.
    total = len(true_values)
    if total != len(predicted_values):
        raise ValueError("Mismatched length between true and predicted values.")
    if total == 0:
        raise ValueError("Cannot compute ENEM accuracy on empty input.")

    non_divergent_count = sum(
        1 for t, p in zip(true_values, predicted_values) if abs(t - p) <= tolerance
    )

    return non_divergent_count / total

In [None]:
def regression_report(y_true, y_pred):
    """Print a set of regression metrics for the given predictions.

    Reports R², MSE, MAE, RMSE, the ENEM accuracy score and the Quadratic
    Weighted Kappa (QWK). Nothing is returned.

    Args:
        y_true: reference grades.
        y_pred: predicted grades, same length as ``y_true``.
    """
    r2 = r2_score(y_true, y_pred)
    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    enem_acc = enem_accuracy_score(y_true, y_pred)

    # QWK works on categories, so the grades are rounded to integers.
    y_true_rounded = np.round(np.asarray(y_true)).astype(int)
    y_pred_rounded = np.round(np.asarray(y_pred)).astype(int)

    # cohen_kappa_score weights a disagreement by the distance between the
    # *positions* of the two labels in its label list. Without `labels` that
    # list holds only the grades that actually occur, so a missing grade
    # shrinks the distances around the gap. Passing every integer in the
    # observed range makes position distance equal grade distance.
    # NOTE: the confusion matrix side therefore equals the observed range.
    lowest = min(y_true_rounded.min(), y_pred_rounded.min())
    highest = max(y_true_rounded.max(), y_pred_rounded.max())
    grade_scale = np.arange(lowest, highest + 1)

    qwk = cohen_kappa_score(
        y_true_rounded, y_pred_rounded, labels=grade_scale, weights="quadratic"
    )

    print("Regression Report:")
    print(f"R² Score: {r2:.4f}")
    print(f"Mean Squared Error (MSE): {mse:.4f}")
    print(f"Mean Absolute Error (MAE): {mae:.4f}")
    print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")
    print(f"ENEM Accuracy Score: {enem_acc:.2f}")
    print(f"Quadratic Weighted Kappa (QWK): {qwk:.4f}")
    print()

In [None]:
# Portuguese stop-word list from NLTK.
stop_words = stopwords.words("portuguese")

# Name of the text column that holds the essays.
text_col = 'essay_text'

# Metric column names as a plain Python list ('metric_cols' is a pandas object).
nilc_cols = metric_cols.tolist()

# Features are every column except 'label'; the target is the 'label' column.
X_train, y_train = dados_train_clean.drop(columns='label'), dados_train_clean['label']
X_test, y_test = dados_test_clean.drop(columns='label'), dados_test_clean['label']

# Pré-Processamento Concatenado

In [None]:
# TF-IDF settings that differ per target; every other REFERENCE_CONCEPT value
# uses the default entry below.
_tfidf_overrides = {
    # C1 (grammar): character n-grams (stop words are kept)
    0: {'analyzer': 'char_wb', 'ngram_range': (3, 5), 'max_features': 5000},
    # C4 (cohesion): words + connectives (stop words are kept)
    3: {'ngram_range': (1, 3)},
}
# C2, C3, C5 (theme/argumentation): words without stop words
_tfidf_default = {'ngram_range': (1, 2), 'stop_words': stop_words}

params = {
    'sublinear_tf': True,
    'min_df': 5,
    **_tfidf_overrides.get(REFERENCE_CONCEPT, _tfidf_default),
}


def _build_preprocessor(text_score_func, metric_score_func):
    """Build the transformer that concatenates TF-IDF and NILC features.

    Text branch: TF-IDF -> 50 best features by ``text_score_func`` -> L2 norm.
    Metric branch: variance filter -> standard scaling -> 50 best features by
    ``metric_score_func``.
    """
    text_branch = make_pipeline(
        TfidfVectorizer(**params),
        SelectKBest(score_func=text_score_func, k=50),
        Normalizer(norm='l2'),
    )
    metric_branch = make_pipeline(
        VarianceThreshold(),
        StandardScaler(),
        SelectKBest(score_func=metric_score_func, k=50),
    )
    return ColumnTransformer(
        transformers=[
            ("tfidf", text_branch, text_col),
            ("nilc", metric_branch, nilc_cols),
        ],
        sparse_threshold=0,
    )


# Regression: both branches rank features by mutual information (fixed seed).
preprocessador_reg = _build_preprocessor(
    partial(mutual_info_regression, random_state=1),
    partial(mutual_info_regression, random_state=1),
)

# Classification: chi2 for the TF-IDF branch, ANOVA F-test for the metrics.
preprocessador_clf = _build_preprocessor(chi2, f_classif)

# Fit on the training split only, then apply the fitted transform to test.
X_train_transf_reg = preprocessador_reg.fit_transform(X_train, y_train)
X_test_transf_reg = preprocessador_reg.transform(X_test)

# The classification features are only built for the competency targets.
if REFERENCE_CONCEPT != 5:
    X_train_transf_clf = preprocessador_clf.fit_transform(X_train, y_train)
    X_test_transf_clf = preprocessador_clf.transform(X_test)

In [None]:
# Seed shared by every model that accepts one.
_SEED = 1

# Regression models
modelos_regressao = {
    "Lasso": Lasso(),
    "Regressao Linear": LinearRegression(),
    "Ridge": Ridge(),
    "Random Forest Regression": RandomForestRegressor(random_state=_SEED),
    "HistGradientBoostingRegressor": HistGradientBoostingRegressor(random_state=_SEED),
    "SVR": SVR(),
    "MLP": MLPRegressor(random_state=_SEED, max_iter=1000),
    "KNeighborsRegressor": KNeighborsRegressor(),
    "DecisionTreeRegressor": DecisionTreeRegressor(random_state=_SEED),
    "GradientBoostingRegressor": GradientBoostingRegressor(random_state=_SEED),
}

# Ordinal regression models from mord (competencies only)
modelos_ordinais = {
    "Ordinal Regression LogisticAT": m.LogisticAT(alpha=1.0),
    "Ordinal Regression LogisticIT": m.LogisticIT(alpha=1.0),
    "Ordinal Regression OrdinalRidge": m.OrdinalRidge(),
    # Like a linear regression, but uses the absolute error instead of the squared error
    "Least Absolute Deviation (LAD)": m.LAD(random_state=_SEED),
}

# Model dictionary: regression models first, then the mord models.
modelos = {**modelos_regressao, **modelos_ordinais}

In [None]:
# Header naming the target under analysis.
avaliando_nota_final = REFERENCE_CONCEPT == 5
alvo = 'NOTA FINAL' if avaliando_nota_final else f'COMPETÊNCIA {REFERENCE_CONCEPT + 1}'
print(f"++++++++++++ ANÁLISE DA {alvo} ++++++++++++")

# Fit every model on the regression features and report on the test split.
for nome, modelo in modelos.items():
    eh_modelo_mord = (
        nome.startswith("Ordinal Regression")
        or nome == "Least Absolute Deviation (LAD)"
    )
    if avaliando_nota_final and eh_modelo_mord:
        # Skipped for the final grade: mord does not work without the label mapping.
        continue

    modelo.fit(X_train_transf_reg, y_train)
    y_pred = modelo.predict(X_test_transf_reg)

    print(f"****  Modelo: {nome} ****")
    regression_report(y_test, y_pred)

    if avaliando_nota_final:
        continue

    # Competencies only: round the predictions and report them as classes.
    y_pred_round = np.round(y_pred).astype(int)
    print("Classification Report:")
    print(classification_report(y_test, y_pred_round, zero_division=0))
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred_round))
    print()

In [None]:
# SVC classification with hyper-parameter search; it runs only for the
# competency targets, never for the final grade.
if REFERENCE_CONCEPT != 5:
    print("Otimização de SVC para Classificação:")
    # Exhaustive search over kernel and C with 5-fold cross-validation on the
    # training features, using all available cores.
    param_grid = {'kernel': ['linear', 'rbf', 'poly'], 'C': [0.1, 1, 10, 100]}
    grid_search = GridSearchCV(SVC(), param_grid, cv=5, n_jobs=-1, verbose=1)
    grid_search.fit(X_train_transf_clf, y_train)

    # Predict the test split with the fitted search object.
    y_pred_clf = grid_search.predict(X_test_transf_clf)
    print("\nMelhores parâmetros:", grid_search.best_params_)
    print("\nClassification Report:")
    # zero_division=0 matches the report printed for the regression models and
    # avoids undefined-metric warnings for classes that are never predicted.
    print(classification_report(y_test, y_pred_clf, zero_division=0))