# Pré-processamento e Transformação de Dados

## Objetivos

- Compreender a importância do pré-processamento
- Implementar transformações numéricas e categóricas
- Usar StandardScaler, MinMaxScaler, RobustScaler
- Aplicar OneHotEncoder e LabelEncoder
- Trabalhar com dados faltantes

## Pré-requisitos

- Manipulação de dados com pandas
- Conceitos básicos de ML
- Pipelines em sklearn


## 1. Por que Pré-processamento?

Diferentes algoritmos têm diferentes requisitos:

- **Árvores de decisão**: Não precisam de normalização
- **SVM, KNN**: Sensíveis à escala das features
- **Redes neurais**: Requerem normalização para convergência
- **Linear models**: Beneficiam de normalização

Problemas comuns:

- Features em escalas diferentes
- Variáveis categóricas não numéricas
- Dados faltantes
- Outliers extremos


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import (
    StandardScaler,
    MinMaxScaler,
    RobustScaler,
    OneHotEncoder,
    LabelEncoder,
    OrdinalEncoder,
)
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
import warnings

warnings.filterwarnings("ignore")

# Configurar visualização
plt.style.use("default")
sns.set_palette("husl")

## 2. Demonstrando o Problema da Escala


In [None]:
# Criar dataset com features em escalas diferentes
np.random.seed(42)
n_samples = 1000

# Features em escalas muito diferentes
age = np.random.normal(35, 10, n_samples)  # Idade: 15-55
income = np.random.normal(50000, 15000, n_samples)  # Renda: 20k-80k
score = np.random.normal(0.7, 0.1, n_samples)  # Score: 0.4-1.0

# Target baseado em combinação das features
y = (
    0.3 * (age - 35) / 10
    + 0.4 * (income - 50000) / 15000
    + 0.3 * (score - 0.7) / 0.1
    + np.random.normal(0, 0.1, n_samples)
) > 0

# Criar DataFrame
df = pd.DataFrame({"age": age, "income": income, "score": score, "target": y.astype(int)})

print("Estatísticas descritivas:")
print(df.describe())

# Visualizar diferenças de escala
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
for i, col in enumerate(["age", "income", "score"]):
    axes[i].hist(df[col], bins=30, alpha=0.7, edgecolor="black")
    axes[i].set_title(f"{col.title()}\nRange: {df[col].min():.0f} - {df[col].max():.0f}")
    axes[i].set_ylabel("Frequência")
plt.tight_layout()
plt.show()

In [None]:
# Demonstrar impacto nos algoritmos
X = df[["age", "income", "score"]]
y = df["target"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Testar modelos sem normalização
models = {
    "Random Forest": RandomForestClassifier(random_state=42),
    "SVM": SVC(random_state=42),
    "Logistic Regression": LogisticRegression(random_state=42),
}

print("=== Performance SEM normalização ===")
scores_without = {}
for name, model in models.items():
    scores = cross_val_score(model, X_train, y_train, cv=5)
    scores_without[name] = scores
    print(f"{name:20}: {scores.mean():.3f} ± {scores.std():.3f}")

# Testar modelos COM normalização
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("\n=== Performance COM normalização ===")
scores_with = {}
for name, model in models.items():
    scores = cross_val_score(model, X_train_scaled, y_train, cv=5)
    scores_with[name] = scores
    print(f"{name:20}: {scores.mean():.3f} ± {scores.std():.3f}")

# Visualizar comparação
improvement = {}
for name in models.keys():
    improvement[name] = scores_with[name].mean() - scores_without[name].mean()

plt.figure(figsize=(10, 6))
bars = plt.bar(improvement.keys(), improvement.values())
plt.axhline(y=0, color="black", linestyle="--", alpha=0.5)
plt.ylabel("Melhoria na Accuracy")
plt.title("Impacto da Normalização nos Modelos")
plt.xticks(rotation=45)

# Colorir barras
for i, bar in enumerate(bars):
    if improvement[list(improvement.keys())[i]] > 0:
        bar.set_color("green")
    else:
        bar.set_color("red")

plt.tight_layout()
plt.show()

## 3. Tipos de Scalers


In [None]:
# Criar dados com outliers para demonstrar diferentes scalers
np.random.seed(42)
n_samples = 200
normal_data = np.random.normal(10, 2, n_samples)

# Adicionar outliers
outliers = np.random.choice(n_samples, size=20, replace=False)
normal_data[outliers] += np.random.normal(0, 10, 20)

data = normal_data.reshape(-1, 1)

# Aplicar diferentes scalers
scalers = {
    "Original": None,
    "StandardScaler": StandardScaler(),
    "MinMaxScaler": MinMaxScaler(),
    "RobustScaler": RobustScaler(),
}

scaled_data = {}
for name, scaler in scalers.items():
    if scaler is None:
        scaled_data[name] = data.flatten()
    else:
        scaled_data[name] = scaler.fit_transform(data).flatten()

# Visualizar resultados
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
axes = axes.flatten()

for i, (name, data_scaled) in enumerate(scaled_data.items()):
    axes[i].hist(data_scaled, bins=30, alpha=0.7, edgecolor="black")
    axes[i].set_title(f"{name}\nMédia: {np.mean(data_scaled):.2f}, Std: {np.std(data_scaled):.2f}")
    axes[i].axvline(np.mean(data_scaled), color="red", linestyle="--", label="Média")
    axes[i].legend()

plt.tight_layout()
plt.show()

# Mostrar fórmulas
print("Fórmulas dos Scalers:")
print("StandardScaler: (x - μ) / σ")
print("MinMaxScaler: (x - min) / (max - min)")
print("RobustScaler: (x - median) / IQR")
print("\nQuando usar cada um:")
print("- StandardScaler: Distribuição normal, poucos outliers")
print("- MinMaxScaler: Range específico [0,1], distribuição uniforme")
print("- RobustScaler: Muitos outliers, distribuições assimétricas")

## 4. Tratamento de Variáveis Categóricas


In [None]:
# Criar dataset com variáveis categóricas
np.random.seed(42)
n_samples = 1000

# Variáveis categóricas
cities = np.random.choice(["São Paulo", "Rio de Janeiro", "Belo Horizonte", "Salvador"], n_samples)
education = np.random.choice(["Fundamental", "Médio", "Superior", "Pós-graduação"], n_samples)
size = np.random.choice(["Pequeno", "Médio", "Grande"], n_samples)

# Variável numérica
income = np.random.normal(5000, 1500, n_samples)

# Target sintético
city_effect = {"São Paulo": 0.3, "Rio de Janeiro": 0.2, "Belo Horizonte": 0.1, "Salvador": 0}
edu_effect = {"Fundamental": 0, "Médio": 0.1, "Superior": 0.3, "Pós-graduação": 0.5}
size_effect = {"Pequeno": 0, "Médio": 0.2, "Grande": 0.4}

y_prob = (
    np.array([city_effect[c] for c in cities])
    + np.array([edu_effect[e] for e in education])
    + np.array([size_effect[s] for s in size])
    + (income - 5000) / 10000
)

y = (y_prob + np.random.normal(0, 0.1, n_samples)) > 0.5

# Criar DataFrame
df_cat = pd.DataFrame(
    {"city": cities, "education": education, "company_size": size, "income": income, "target": y.astype(int)}
)

print("Dataset com variáveis categóricas:")
print(df_cat.head())
print("\nTipos de dados:")
print(df_cat.dtypes)
print("\nValores únicos:")
for col in ["city", "education", "company_size"]:
    print(f"{col}: {df_cat[col].unique()}")

### 4.1 One-Hot Encoding


In [None]:
# One-Hot Encoding para variáveis nominais
print("=== One-Hot Encoding ===")

# Método 1: pandas get_dummies
df_dummies = pd.get_dummies(df_cat, columns=["city", "education"], prefix=["city", "edu"])
print(f"Colunas após get_dummies: {df_dummies.shape[1]}")
print("Primeiras colunas:", df_dummies.columns[:10].tolist())

# Método 2: sklearn OneHotEncoder
encoder = OneHotEncoder(drop="first", sparse_output=False)  # drop='first' evita multicolinearidade
city_encoded = encoder.fit_transform(df_cat[["city", "education"]])
feature_names = encoder.get_feature_names_out(["city", "education"])

print(f"\nShape após OneHotEncoder: {city_encoded.shape}")
print(f"Nomes das features: {feature_names}")

# Demonstrar problema da "curse of dimensionality"
print(f"\nDimensionalidade:")
print(f"Original: {df_cat.shape[1]} features")
print(f"Após One-Hot: {len(feature_names) + 2} features")  # +2 para income e company_size

# Visualizar encoding de uma variável
sample_cities = df_cat["city"].head(10)
sample_encoded = encoder.transform(df_cat[["city", "education"]].head(10))

print("\nExemplo de encoding:")
for i in range(5):
    print(f"{sample_cities.iloc[i]} -> {sample_encoded[i][:4]}...")  # Mostrar só primeiras 4 colunas

### 4.2 Label Encoding e Ordinal Encoding


In [None]:
# Label Encoding (cuidado com variáveis nominais!)
print("=== Label Encoding ===")

le_city = LabelEncoder()
city_labels = le_city.fit_transform(df_cat["city"])

print("Mapeamento cidade -> número:")
for i, city in enumerate(le_city.classes_):
    print(f"{city} -> {i}")

print(f"\nExemplos: {df_cat['city'].head().tolist()} -> {city_labels[:5].tolist()}")

# Ordinal Encoding (para variáveis ordinais)
print("\n=== Ordinal Encoding ===")

# Education tem ordem natural
education_order = [["Fundamental", "Médio", "Superior", "Pós-graduação"]]
ordinal_encoder = OrdinalEncoder(categories=education_order)
education_ordinal = ordinal_encoder.fit_transform(df_cat[["education"]])

print("Mapeamento educação (ordinal):")
for i, edu in enumerate(education_order[0]):
    print(f"{edu} -> {i}")

# Company size também tem ordem
size_order = [["Pequeno", "Médio", "Grande"]]
size_encoder = OrdinalEncoder(categories=size_order)
size_ordinal = size_encoder.fit_transform(df_cat[["company_size"]])

print("\nMapeamento tamanho empresa:")
for i, size in enumerate(size_order[0]):
    print(f"{size} -> {i}")

## 5. Tratamento de Dados Faltantes


In [None]:
# Criar dataset com valores faltantes
np.random.seed(42)
n_samples = 500

# Dataset base
age = np.random.normal(35, 10, n_samples)
income = np.random.normal(50000, 15000, n_samples)
score = np.random.normal(0.7, 0.1, n_samples)
category = np.random.choice(["A", "B", "C"], n_samples)

# Introduzir valores faltantes de forma realística
# Missing at Random (MAR) - probabilidade depende de outras variáveis
missing_prob_age = 0.1 + 0.1 * (income < 30000)  # Mais missings em renda baixa
missing_prob_income = 0.05 + 0.15 * (age > 50)  # Mais missings em idade alta
missing_prob_score = 0.08  # Missing Completely at Random (MCAR)

age[np.random.random(n_samples) < missing_prob_age] = np.nan
income[np.random.random(n_samples) < missing_prob_income] = np.nan
score[np.random.random(n_samples) < missing_prob_score] = np.nan

# Missing em categoria (Not Missing at Random - NMAR)
category[np.random.choice(n_samples, size=30, replace=False)] = None

df_missing = pd.DataFrame({"age": age, "income": income, "score": score, "category": category})

print("Dataset com valores faltantes:")
print(df_missing.info())
print("\nPercentual de missings por coluna:")
missing_pct = (df_missing.isnull().sum() / len(df_missing)) * 100
print(missing_pct)

# Visualizar padrão de missings
import missingno as msno

try:
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # Heatmap de correlação de missings
    missing_df = df_missing.isnull()
    correlation_missing = missing_df.corr()

    sns.heatmap(correlation_missing, annot=True, cmap="coolwarm", center=0, ax=axes[0])
    axes[0].set_title("Correlação entre Padrões de Missing")

    # Contagem de missings por linha
    missing_counts = missing_df.sum(axis=1)
    axes[1].hist(missing_counts, bins=5, edgecolor="black")
    axes[1].set_xlabel("Número de features faltantes por amostra")
    axes[1].set_ylabel("Frequência")
    axes[1].set_title("Distribuição de Missings por Linha")

    plt.tight_layout()
    plt.show()

except ImportError:
    print("Biblioteca missingno não instalada. Criando visualização alternativa...")

    # Visualização alternativa
    plt.figure(figsize=(10, 6))
    missing_matrix = df_missing.isnull()
    plt.imshow(missing_matrix.T, cmap="RdYlBu", aspect="auto")
    plt.yticks(range(len(df_missing.columns)), df_missing.columns)
    plt.xlabel("Amostras")
    plt.title("Padrão de Valores Faltantes\n(Azul = Presente, Vermelho = Faltante)")
    plt.colorbar()
    plt.show()

### 5.1 Estratégias de Imputação


In [None]:
# Diferentes estratégias de imputação
print("=== Estratégias de Imputação ===")

# 1. Simple Imputer - estratégias básicas
imputers = {
    "Mean": SimpleImputer(strategy="mean"),
    "Median": SimpleImputer(strategy="median"),
    "Most Frequent": SimpleImputer(strategy="most_frequent"),
    "Constant": SimpleImputer(strategy="constant", fill_value=0),
}

# Testar apenas em colunas numéricas
numeric_cols = ["age", "income", "score"]
X_missing = df_missing[numeric_cols]

imputed_results = {}
for name, imputer in imputers.items():
    if name == "Most Frequent" and any(df_missing[numeric_cols].dtypes == "object"):
        continue  # Pular se não houver categóricas

    X_imputed = imputer.fit_transform(X_missing)
    imputed_results[name] = X_imputed

    # Calcular estatísticas
    print(f"\n{name}:")
    for i, col in enumerate(numeric_cols):
        original_mean = df_missing[col].mean()
        imputed_mean = X_imputed[:, i].mean()
        print(f"  {col}: {original_mean:.2f} -> {imputed_mean:.2f}")

# 2. KNN Imputer - imputação baseada em vizinhos
print("\n=== KNN Imputer ===")
knn_imputer = KNNImputer(n_neighbors=5)
X_knn_imputed = knn_imputer.fit_transform(X_missing)

print("KNN (k=5):")
for i, col in enumerate(numeric_cols):
    original_mean = df_missing[col].mean()
    knn_mean = X_knn_imputed[:, i].mean()
    print(f"  {col}: {original_mean:.2f} -> {knn_mean:.2f}")

# Visualizar comparação das estratégias
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
axes = axes.flatten()

strategies = ["Mean", "Median", "Constant"]
strategies.append("KNN")

for i, strategy in enumerate(strategies[:4]):
    if strategy == "KNN":
        data_to_plot = X_knn_imputed[:, 0]  # Age column
    else:
        data_to_plot = imputed_results[strategy][:, 0]  # Age column

    axes[i].hist(data_to_plot, bins=30, alpha=0.7, edgecolor="black")
    axes[i].set_title(f"Age - {strategy} Imputation")
    axes[i].axvline(np.mean(data_to_plot), color="red", linestyle="--", label="Média")
    axes[i].legend()

plt.tight_layout()
plt.show()

## 6. ColumnTransformer - Pré-processamento Completo


In [None]:
# Criar dataset complexo que precisa de múltiplos tipos de pré-processamento
np.random.seed(42)
n_samples = 1000

# Features numéricas com escalas diferentes
age = np.random.normal(35, 10, n_samples)
salary = np.random.normal(50000, 20000, n_samples)
experience = np.random.normal(8, 5, n_samples)

# Features categóricas
department = np.random.choice(["IT", "Marketing", "Sales", "HR"], n_samples)
education = np.random.choice(["Bachelor", "Master", "PhD"], n_samples)
city = np.random.choice(["SP", "RJ", "BH"], n_samples)

# Introduzir alguns valores faltantes
age[np.random.choice(n_samples, 50, replace=False)] = np.nan
salary[np.random.choice(n_samples, 30, replace=False)] = np.nan

# Target
y = (
    0.3 * (age - 35) / 10
    + 0.4 * (salary - 50000) / 20000
    + 0.3 * (experience - 8) / 5
    + np.random.normal(0, 0.5, n_samples)
) > 0

df_complex = pd.DataFrame(
    {
        "age": age,
        "salary": salary,
        "experience": experience,
        "department": department,
        "education": education,
        "city": city,
        "target": y.astype(int),
    }
)

print("Dataset complexo:")
print(df_complex.info())
print("\nPrimeiras linhas:")
print(df_complex.head())

In [None]:
# Definir transformações específicas para cada tipo de coluna
print("=== ColumnTransformer Completo ===")

# Separar features por tipo
numeric_features = ["age", "salary", "experience"]
categorical_features = ["department", "education", "city"]

# Pipeline para features numéricas
numeric_transformer = Pipeline(steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())])

# Pipeline para features categóricas
categorical_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(drop="first", handle_unknown="ignore")),
    ]
)

# Combinar tudo com ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ],
    remainder="drop",  # Dropar colunas não especificadas
)

# Dividir dados
X = df_complex.drop("target", axis=1)
y = df_complex["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Aplicar transformações
X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

print(f"Shape original: {X_train.shape}")
print(f"Shape após transformação: {X_train_transformed.shape}")

# Obter nomes das features transformadas
feature_names = numeric_features + list(
    preprocessor.named_transformers_["cat"]["onehot"].get_feature_names_out(categorical_features)
)

print(f"\nFeatures finais ({len(feature_names)}):")
for i, name in enumerate(feature_names):
    print(f"{i+1:2d}. {name}")

## 7. Pipeline Completo de ML


In [None]:
# Criar pipeline completo: pré-processamento + modelo
print("=== Pipeline Completo de ML ===")

# Pipeline com Random Forest
rf_pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", RandomForestClassifier(random_state=42))])

# Pipeline com SVM
svm_pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", SVC(random_state=42))])

# Avaliar pipelines
pipelines = {"Random Forest": rf_pipeline, "SVM": svm_pipeline}

for name, pipeline in pipelines.items():
    scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring="accuracy")
    print(f"{name:15}: {scores.mean():.3f} ± {scores.std():.3f}")

# Treinar melhor modelo e avaliar
best_pipeline = rf_pipeline
best_pipeline.fit(X_train, y_train)

# Avaliar no conjunto de teste
train_score = best_pipeline.score(X_train, y_train)
test_score = best_pipeline.score(X_test, y_test)

print(f"\nMelhor modelo (Random Forest):")
print(f"Score treino: {train_score:.3f}")
print(f"Score teste:  {test_score:.3f}")
print(f"Diferença:    {train_score - test_score:.3f}")

# Mostrar importância das features
feature_importance = best_pipeline.named_steps["classifier"].feature_importances_
importance_df = pd.DataFrame({"feature": feature_names, "importance": feature_importance}).sort_values(
    "importance", ascending=False
)

print("\nTop 10 features mais importantes:")
print(importance_df.head(10))

# Visualizar importância
plt.figure(figsize=(10, 6))
top_features = importance_df.head(10)
plt.barh(range(len(top_features)), top_features["importance"])
plt.yticks(range(len(top_features)), top_features["feature"])
plt.xlabel("Importância")
plt.title("Top 10 Features Mais Importantes")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()

## 8. Validação do Pipeline


In [None]:
# Testar pipeline com novos dados
print("=== Validação do Pipeline ===")

# Criar novos dados de exemplo
new_data = pd.DataFrame(
    {
        "age": [28, 45, np.nan, 35],
        "salary": [45000, np.nan, 70000, 55000],
        "experience": [3, 15, 8, 7],
        "department": ["IT", "Marketing", "Sales", "HR"],
        "education": ["Bachelor", "Master", "PhD", "Bachelor"],
        "city": ["SP", "RJ", "BH", "SP"],
    }
)

print("Novos dados (com missings):")
print(new_data)

# Fazer predições
predictions = best_pipeline.predict(new_data)
probabilities = best_pipeline.predict_proba(new_data)

print("\nPredições:")
for i in range(len(new_data)):
    print(f"Amostra {i+1}: Classe {predictions[i]} (prob: {probabilities[i][1]:.3f})")

# Verificar se pipeline funciona com dados "sujos"
dirty_data = pd.DataFrame(
    {
        "age": [25, 60, -5, 200],  # Idades irreais
        "salary": [30000, 150000, -1000, 1000000],  # Salários extremos
        "experience": [2, 40, -2, 100],  # Experiência irreal
        "department": ["IT", "Finance", "Unknown", "IT"],  # Departamento novo
        "education": ["Bachelor", "Master", "High School", "Bachelor"],  # Educação nova
        "city": ["SP", "RJ", "NYC", "SP"],  # Cidade nova
    }
)

print("\n=== Teste com Dados 'Sujos' ===")
print(dirty_data)

try:
    dirty_predictions = best_pipeline.predict(dirty_data)
    print("\nPredições para dados sujos:")
    print(dirty_predictions)
    print("✓ Pipeline lidou bem com dados problemáticos!")
except Exception as e:
    print(f"✗ Erro ao processar dados sujos: {e}")

## 9. Resumo e Boas Práticas

### Checklist de Pré-processamento:

1. **Análise Exploratória**

   - Tipos de dados
   - Valores faltantes
   - Distribuições
   - Outliers

2. **Features Numéricas**

   - StandardScaler: distribuição normal
   - MinMaxScaler: range específico
   - RobustScaler: muitos outliers

3. **Features Categóricas**

   - OneHotEncoder: variáveis nominais
   - OrdinalEncoder: variáveis ordinais
   - Cuidado com alta cardinalidade

4. **Dados Faltantes**

   - Entender o mecanismo (MCAR, MAR, NMAR)
   - SimpleImputer: estratégias básicas
   - KNNImputer: valores baseados em similaridade

5. **Pipeline**
   - Sempre usar pipelines para evitar data leakage
   - ColumnTransformer para diferentes tipos de features
   - Testar com dados novos/sujos

### Cuidados Importantes:

- **Nunca** fazer fit do scaler em todo o dataset
- **Sempre** aplicar mesmas transformações em treino/validação/teste
- Tratar dados faltantes **antes** de dividir treino/teste
- Validar pipeline com dados "reais" e problemáticos
