In [1]:
# -*- coding: utf-8 -*-
"""
TP5 — Tarefa 1: Criação de Features com TF-IDF (IMDB 50K Reviews)
-----------------------------------------------------------------
Rodar em **um único bloco** no Jupyter. Este código:
1) Carrega o arquivo "IMDB Dataset.csv" (no mesmo diretório do notebook).
2) Faz limpeza textual mínima (HTML/URLs/ruído).
3) Vetoriza com TF-IDF (uni+bi-gramas) com bom controle de vocabulário.
4) Salva artefatos para as próximas tarefas: vectorizer, X (matriz esparsa), y (rótulos) e um relatório CSV.
5) Imprime um overview rápido (n docs, n features, esparsidade, etc.).

Boas práticas: nomes claros em PT-BR, funções pequenas, configuração centralizada (Clean Code/SOLID).
"""

# ===============================
# Imports
# ===============================
import os
import re
import json
from typing import Dict, Any

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, save_npz
from sklearn.feature_extraction.text import TfidfVectorizer
import joblib

# ===============================
# 1) Configuração
# ===============================
ARQUIVO_PADRAO = "IMDB Dataset.csv"     # espera-se no mesmo diretório do notebook
ALTERNATIVO = "/mnt/data/IMDB Dataset.csv"  # fallback (útil em alguns ambientes)
DIRETORIO_SAIDA = "."                   # salvos no diretório atual

cfg_tfidf: Dict[str, Any] = {
    "preprocessor": None,                 # será injetado depois
    "token_pattern": r"(?u)\b\w\w+\b",    # tokens com >=2 chars
    "stop_words": "english",              # dataset está em inglês
    "ngram_range": (1, 2),                # uni + bi-gramas
    "max_df": 0.95,                       # remove termos muito frequentes
    "min_df": 5,                          # remove termos muito raros
    "max_features": 50000,                # controla vocabulário
    "strip_accents": "unicode",
    "lowercase": True,
    "norm": "l2",
    "sublinear_tf": True                  # tf = 1 + log(tf)
}

# ===============================
# 2) Funções utilitárias
# ===============================
def caminho_csv() -> str:
    """Retorna o caminho existente para o CSV (prioriza o local)."""
    if os.path.exists(ARQUIVO_PADRAO):
        return ARQUIVO_PADRAO
    if os.path.exists(ALTERNATIVO):
        return ALTERNATIVO
    raise FileNotFoundError(
        f'Arquivo "{ARQUIVO_PADRAO}" não encontrado. '
        f'Coloque-o no mesmo diretório do notebook ou ajuste o caminho.'
    )

def preprocessador_limpo(texto: str) -> str:
    """
    Limpeza mínima e robusta:
    - Remove HTML
    - Remove URLs
    - Normaliza aspas/backticks
    - Remove pontuação extra/ruído e colapsa espaços
    """
    if not isinstance(texto, str):
        return ""
    texto = re.sub(r"<.*?>", " ", texto)                       # HTML
    texto = re.sub(r"http\S+|www\.\S+", " ", texto)            # URLs
    texto = texto.replace("’", "'").replace("`", "'")
    texto = texto.replace("“", '"').replace("”", '"')
    texto = re.sub(r"[^A-Za-z0-9\s]", " ", texto)              # mantém letras, números e espaços
    texto = re.sub(r"\s+", " ", texto).strip()
    return texto

def validar_colunas(df: pd.DataFrame) -> None:
    """Garante a presença de 'review' e 'sentiment' (case-insensitive)."""
    df.columns = [c.strip().lower() for c in df.columns]
    esperado = {"review", "sentiment"}
    faltantes = esperado.difference(df.columns)
    if faltantes:
        raise ValueError(f"Colunas esperadas ausentes: {faltantes}. "
                         f"Encontrado: {list(df.columns)}")

def mapear_rotulos(y_series: pd.Series) -> np.ndarray:
    """Mapeia positive/negative -> 1/0; descarta valores inválidos depois."""
    mapa = {"positive": 1, "negative": 0}
    y = y_series.str.lower().map(mapa)
    return y.astype("float")

def calcular_esparsidade(matriz: csr_matrix) -> float:
    """Esparsidade = 1 - (nnz / total_de_elementos)."""
    total = matriz.shape[0] * matriz.shape[1]
    return 1.0 - (matriz.nnz / total)

def montar_relatorio_vocab(vectorizer: TfidfVectorizer, n_docs: int, top_k: int = 25) -> pd.DataFrame:
    """
    Tabela com termos, idf e DF estimado.
    Fórmula sklearn: idf = log((n_samples + 1) / (df + 1)) + 1  =>  df ~= (n+1)/exp(idf-1) - 1
    """
    vocab_inv = {idx: termo for termo, idx in vectorizer.vocabulary_.items()}
    idfs = vectorizer.idf_
    dfs_estimados = ((n_docs + 1) / np.exp(idfs - 1.0)) - 1.0
    dfs_estimados = np.clip(dfs_estimados, 0, n_docs).astype(int)

    df_vocab = pd.DataFrame({
        "indice": np.arange(len(idfs), dtype=int),
        "termo": [vocab_inv[i] for i in range(len(idfs))],
        "idf": idfs,
        "df_estimado": dfs_estimados
    })

    mais_comuns = df_vocab.sort_values(["df_estimado", "termo"], ascending=[False, True]).head(top_k).copy()
    mais_raros  = df_vocab.sort_values(["idf", "termo"], ascending=[False, True]).head(top_k).copy()
    mais_comuns["grupo"] = "Mais comuns (maior DF)"
    mais_raros["grupo"]  = "Mais raros (maior IDF)"
    relatorio = pd.concat([mais_comuns, mais_raros], axis=0, ignore_index=True)
    return relatorio[["grupo", "termo", "df_estimado", "idf"]]

# ===============================
# 3) Pipeline da Tarefa 1 (TF-IDF)
# ===============================
# 3.1 Carregar CSV
caminho = caminho_csv()
df = pd.read_csv(caminho)
validar_colunas(df)

# 3.2 Limpeza de nulos/espaços
df["review"] = df["review"].fillna("").astype(str)
df = df[df["review"].str.strip().ne("")].copy()

# 3.3 Mapear rótulos e filtrar inválidos
y = mapear_rotulos(df["sentiment"])
mask_validos = ~y.isna()
df = df.loc[mask_validos].reset_index(drop=True)
y = y.loc[mask_validos].astype(int).to_numpy()

# 3.4 Configurar e ajustar o TF-IDF
cfg_tfidf_local = dict(cfg_tfidf)
cfg_tfidf_local["preprocessor"] = preprocessador_limpo
vetorizador = TfidfVectorizer(**cfg_tfidf_local)

X_tfidf: csr_matrix = vetorizador.fit_transform(df["review"].tolist())
n_docs, n_feats = X_tfidf.shape
esparsidade = calcular_esparsidade(X_tfidf)

# ===============================
# 4) Persistência para próximas tarefas
# ===============================
os.makedirs(DIRETORIO_SAIDA, exist_ok=True)
CAM_VETORIZADOR = os.path.join(DIRETORIO_SAIDA, "tfidf_vectorizer.pkl")
CAM_X = os.path.join(DIRETORIO_SAIDA, "X_tfidf.npz")
CAM_y = os.path.join(DIRETORIO_SAIDA, "y_labels.npy")
CAM_RELATORIO = os.path.join(DIRETORIO_SAIDA, "relatorio_vocab_tfidf.csv")

joblib.dump(vetorizador, CAM_VETORIZADOR)
save_npz(CAM_X, X_tfidf)
np.save(CAM_y, y)

relatorio_vocab = montar_relatorio_vocab(vetorizador, n_docs=n_docs, top_k=25)
relatorio_vocab.to_csv(CAM_RELATORIO, index=False)

# ===============================
# 5) Overview para conferência
# ===============================
overview = {
    "documentos": int(n_docs),
    "features_tfidf": int(n_feats),
    "esparsidade": float(esparsidade),
    "nao_nulos_na_matriz": int(X_tfidf.nnz),
    "exemplo_termos": sorted(list(vetorizador.vocabulary_.keys()))[:20],
    "salvo_vectorizer": CAM_VETORIZADOR,
    "salvo_X": CAM_X,
    "salvo_y": CAM_y,
    "salvo_relatorio_vocab_csv": CAM_RELATORIO
}

print(json.dumps(overview, indent=2, ensure_ascii=False))


{
  "documentos": 50000,
  "features_tfidf": 50000,
  "esparsidade": 0.9977487532,
  "nao_nulos_na_matriz": 5628117,
  "exemplo_termos": [
    "00",
    "000",
    "000 00",
    "000 000",
    "000 years",
    "007",
    "01",
    "02",
    "05",
    "06",
    "07",
    "08",
    "10",
    "10 000",
    "10 10",
    "10 15",
    "10 20",
    "10 Bethany",
    "10 Dir",
    "10 Fiend"
  ],
  "salvo_vectorizer": ".\\tfidf_vectorizer.pkl",
  "salvo_X": ".\\X_tfidf.npz",
  "salvo_y": ".\\y_labels.npy",
  "salvo_relatorio_vocab_csv": ".\\relatorio_vocab_tfidf.csv"
}
