# An√°lise de Machine Learning com Spark - Previs√£o de Situa√ß√£o Cadastral (Dataset Silver)

Este notebook apresenta uma an√°lise explorat√≥ria completa (EDA) e o processo de treinamento do modelo de ML usando **Apache Spark** para processamento distribu√≠do de dados e prever a situa√ß√£o cadastral de empresas baseado em dados temporais enriquecidos com vari√°veis macroecon√¥micas e dados de empresas.

## Objetivo
Prever a `situacao_cadastral` de empresas usando dados hist√≥ricos temporais (cnpj + ano_mes) incluindo:
- Dados cadastrais (CNAE, UF, natureza jur√≠dica, capital social, porte)
- Vari√°veis macroecon√¥micas (SELIC, IPCA, c√¢mbio, desemprego)
- Dados de PGFN (d√≠vidas fiscais)
- Hist√≥rico temporal da situa√ß√£o cadastral

## Diferen√ßas da vers√£o Spark
- Usa **PySpark** para processamento distribu√≠do de dados
- Processamento escal√°vel para grandes volumes
- Modelo final salvo em formato **pickle** para f√°cil deploy


In [None]:
# Importa√ß√µes
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
import pickle
import os
warnings.filterwarnings('ignore')

# Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Modelos e m√©tricas (usaremos sklearn para o modelo final)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import (
    classification_report, 
    confusion_matrix, 
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve
)
from sklearn.ensemble import RandomForestClassifier
try:
    import xgboost as xgb
    XGBOOST_AVAILABLE = True
except ImportError:
    XGBOOST_AVAILABLE = False
    print("XGBoost n√£o dispon√≠vel. Usando RandomForest.")

# Inicializa Spark Session
spark = SparkSession.builder \
    .appName("SituacaoCadastralPrediction") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Configura√ß√£o de visualiza√ß√£o
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 10

print("‚úÖ Bibliotecas importadas com sucesso!")
print(f"‚úÖ Spark Session criada: {spark.version}")


## 1. Carregamento e Explora√ß√£o Inicial dos Dados


In [None]:
# Carrega os dados usando Spark
df_spark = spark.read.csv(
    'dataset_metrics_silver.csv',
    header=True,
    inferSchema=True
)

# Remove coluna de √≠ndice se existir
if 'Unnamed: 0' in df_spark.columns:
    df_spark = df_spark.drop('Unnamed: 0')

# Converte ano_mes para date
df_spark = df_spark.withColumn(
    'ano_mes',
    F.to_date(F.col('ano_mes'), 'yyyy-MM')
)

# Ordena por cnpj e data
df_spark = df_spark.orderBy(['cnpj', 'ano_mes'])

# Cache para melhor performance
df_spark.cache()

# Estat√≠sticas b√°sicas
total_registros = df_spark.count()
cnpjs_unicos = df_spark.select('cnpj').distinct().count()
periodo_min = df_spark.agg(F.min('ano_mes').alias('min_date')).collect()[0]['min_date']
periodo_max = df_spark.agg(F.max('ano_mes').alias('max_date')).collect()[0]['max_date']

print(f"üìä Dados carregados: {total_registros:,} registros")
print(f"üè¢ CNPJs √∫nicos: {cnpjs_unicos:,}")
print(f"üìÖ Per√≠odo: {periodo_min.strftime('%Y-%m')} a {periodo_max.strftime('%Y-%m')}")
print(f"\nColunas dispon√≠veis ({len(df_spark.columns)}):")
print(df_spark.columns)

# Mostra primeiras linhas
print(f"\nPrimeiras linhas:")
df_spark.show(5, truncate=False)


In [None]:
# Informa√ß√µes gerais do dataset
print("="*60)
print("INFORMA√á√ïES GERAIS DO DATASET")
print("="*60)

print(f"\nShape: ({df_spark.count()}, {len(df_spark.columns)})")
print(f"\nTipos de dados:")
df_spark.printSchema()

# Valores faltantes
print(f"\nValores faltantes:")
missing_stats = []
for col in df_spark.columns:
    missing_count = df_spark.filter(F.col(col).isNull()).count()
    if missing_count > 0:
        missing_pct = (missing_count / total_registros) * 100
        missing_stats.append({
            'Coluna': col,
            'Faltantes': missing_count,
            'Percentual': missing_pct
        })

if missing_stats:
    missing_df = pd.DataFrame(missing_stats).sort_values('Faltantes', ascending=False)
    print(missing_df.to_string(index=False))
else:
    print("   Nenhum valor faltante encontrado!")

# Estat√≠sticas descritivas (converte para pandas para visualiza√ß√£o)
print(f"\nEstat√≠sticas descritivas:")
df_spark.describe().show()


## 2. An√°lise da Vari√°vel Target (Situa√ß√£o Cadastral)


In [None]:
# An√°lise da distribui√ß√£o da situa√ß√£o cadastral
print("="*60)
print("AN√ÅLISE DA VARI√ÅVEL TARGET")
print("="*60)

# Mapeamento das situa√ß√µes cadastrais
situacao_map = {
    1: 'NULA',
    2: 'ATIVA',
    3: 'SUSPENSA',
    4: 'INAPTA',
    8: 'BAIXADA'
}

# Distribui√ß√£o usando Spark
distribuicao_spark = df_spark.groupBy('situacao_cadastral').count().orderBy('situacao_cadastral')
distribuicao_pd = distribuicao_spark.toPandas()
distribuicao_pd['percentual'] = (distribuicao_pd['count'] / distribuicao_pd['count'].sum()) * 100

print("\nüìä Distribui√ß√£o da Situa√ß√£o Cadastral:")
print("-" * 60)
for _, row in distribuicao_pd.iterrows():
    situacao = int(row['situacao_cadastral'])
    count = int(row['count'])
    pct = row['percentual']
    nome = situacao_map.get(situacao, f'Desconhecida ({situacao})')
    print(f"  {situacao} - {nome:12s}: {count:6,} registros ({pct:5.2f}%)")

# Visualiza√ß√£o (converte para pandas)
distribuicao_pd = distribuicao_pd.set_index('situacao_cadastral')

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Gr√°fico de barras
distribuicao_pd['count'].plot(kind='bar', ax=axes[0], color='steelblue', edgecolor='black')
axes[0].set_title('Distribui√ß√£o da Situa√ß√£o Cadastral (Contagem)', fontsize=14, fontweight='bold')
axes[0].set_xlabel('Situa√ß√£o Cadastral', fontsize=12)
axes[0].set_ylabel('Quantidade de Registros', fontsize=12)
axes[0].set_xticklabels([f"{k}\n{situacao_map.get(k, '?')}" for k in distribuicao_pd.index], rotation=0)
axes[0].grid(axis='y', alpha=0.3)

# Gr√°fico de pizza
distribuicao_pd['percentual'].plot(kind='pie', ax=axes[1], autopct='%1.1f%%', startangle=90, 
                      colors=['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#ff99cc'])
axes[1].set_title('Distribui√ß√£o da Situa√ß√£o Cadastral (%)', fontsize=14, fontweight='bold')
axes[1].set_ylabel('')

plt.tight_layout()
plt.show()

print(f"\n‚ö†Ô∏è  Observa√ß√£o: Classes desbalanceadas detectadas!")
minority_classes = distribuicao_pd[distribuicao_pd['count'] < 100]
if len(minority_classes) > 0:
    for situacao, row in minority_classes.iterrows():
        nome = situacao_map.get(situacao, f'Desconhecida ({situacao})')
        print(f"   Classe {situacao} ({nome}) tem apenas {int(row['count'])} registros ({row['percentual']:.2f}%)")


In [None]:
# An√°lise temporal da situa√ß√£o cadastral
print("="*60)
print("AN√ÅLISE TEMPORAL DA SITUA√á√ÉO CADASTRAL")
print("="*60)

# Agrupa por ano_mes e situa√ß√£o cadastral usando Spark
temp_analysis_spark = df_spark.groupBy(
    F.year('ano_mes').alias('ano'),
    F.month('ano_mes').alias('mes'),
    'situacao_cadastral'
).count().orderBy('ano', 'mes', 'situacao_cadastral')

# Converte para pandas para visualiza√ß√£o
temp_analysis_pd = temp_analysis_spark.toPandas()
temp_analysis_pd['ano_mes'] = pd.to_datetime(
    temp_analysis_pd['ano'].astype(str) + '-' + temp_analysis_pd['mes'].astype(str).str.zfill(2)
)
temp_analysis_pd = temp_analysis_pd.pivot_table(
    index='ano_mes', 
    columns='situacao_cadastral', 
    values='count', 
    fill_value=0
)

# Visualiza√ß√£o temporal
fig, axes = plt.subplots(2, 1, figsize=(14, 10))

# Gr√°fico de linha temporal
for col in temp_analysis_pd.columns:
    axes[0].plot(temp_analysis_pd.index, temp_analysis_pd[col], 
                marker='o', label=f"{col} - {situacao_map.get(col, '?')}", linewidth=2, markersize=4)
axes[0].set_title('Evolu√ß√£o Temporal da Situa√ß√£o Cadastral', fontsize=14, fontweight='bold')
axes[0].set_xlabel('Per√≠odo (Ano-M√™s)', fontsize=12)
axes[0].set_ylabel('Quantidade de Registros', fontsize=12)
axes[0].legend(loc='best', fontsize=10)
axes[0].grid(alpha=0.3)
axes[0].tick_params(axis='x', rotation=45)

# Gr√°fico de √°rea empilhada (propor√ß√µes)
temp_pct = temp_analysis_pd.div(temp_analysis_pd.sum(axis=1), axis=0) * 100
axes[1].stackplot(temp_pct.index, *[temp_pct[col] for col in temp_pct.columns],
                  labels=[f"{col} - {situacao_map.get(col, '?')}" for col in temp_pct.columns],
                  alpha=0.7)
axes[1].set_title('Propor√ß√£o Temporal da Situa√ß√£o Cadastral (%)', fontsize=14, fontweight='bold')
axes[1].set_xlabel('Per√≠odo (Ano-M√™s)', fontsize=12)
axes[1].set_ylabel('Propor√ß√£o (%)', fontsize=12)
axes[1].legend(loc='upper left', fontsize=9)
axes[1].grid(alpha=0.3)
axes[1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()


## 3. Cria√ß√£o de Features para o Modelo


In [None]:
# Fun√ß√£o para criar features temporais usando Spark
def create_temporal_features_spark(df):
    """Cria features temporais a partir de ano_mes usando Spark"""
    df = df.withColumn('ano', F.year('ano_mes'))
    df = df.withColumn('mes', F.month('ano_mes'))
    df = df.withColumn('trimestre', F.quarter('ano_mes'))
    df = df.withColumn('semestre', F.when(F.col('mes') <= 6, 1).otherwise(2))
    df = df.withColumn('mes_sin', F.sin(2 * F.pi() * F.col('mes') / 12))
    df = df.withColumn('mes_cos', F.cos(2 * F.pi() * F.col('mes') / 12))
    
    # N√∫mero de meses desde o in√≠cio
    min_date = df.agg(F.min('ano_mes').alias('min_date')).collect()[0]['min_date']
    df = df.withColumn(
        'meses_desde_inicio',
        (F.year('ano_mes') - F.year(F.lit(min_date))) * 12 + 
        (F.month('ano_mes') - F.month(F.lit(min_date)))
    )
    return df

# Fun√ß√£o para criar features de lag usando Spark Window Functions
def create_lag_features_spark(df, lag_periods=[1, 2, 3, 6, 12]):
    """Cria features de lag para cada empresa usando Spark"""
    window_spec = Window.partitionBy('cnpj').orderBy('ano_mes')
    
    for lag in lag_periods:
        df = df.withColumn(
            f'situacao_cadastral_lag_{lag}',
            F.lag('situacao_cadastral', lag).over(window_spec)
        )
    
    return df

# Fun√ß√£o para criar features de rolling usando Spark Window Functions
def create_rolling_features_spark(df, windows=[3, 6, 12]):
    """Cria features de rolling statistics por empresa usando Spark"""
    window_spec = Window.partitionBy('cnpj').orderBy('ano_mes')
    
    for window in windows:
        rolling_window = window_spec.rowsBetween(-window + 1, 0)
        
        df = df.withColumn(
            f'situacao_cadastral_rolling_mean_{window}',
            F.avg('situacao_cadastral').over(rolling_window)
        )
        df = df.withColumn(
            f'situacao_cadastral_rolling_std_{window}',
            F.stddev('situacao_cadastral').over(rolling_window)
        )
    
    # Preenche NaN com 0
    for window in windows:
        df = df.fillna(0, subset=[f'situacao_cadastral_rolling_mean_{window}',
                                  f'situacao_cadastral_rolling_std_{window}'])
    
    return df

# Fun√ß√£o para criar features agregadas usando Spark
def create_aggregated_features_spark(df):
    """Cria features agregadas por empresa usando Spark"""
    empresa_stats = df.groupBy('cnpj').agg(
        F.avg('situacao_cadastral').alias('situacao_cadastral_mean_empresa'),
        F.stddev('situacao_cadastral').alias('situacao_cadastral_std_empresa'),
        F.min('situacao_cadastral').alias('situacao_cadastral_min_empresa'),
        F.max('situacao_cadastral').alias('situacao_cadastral_max_empresa'),
        F.count('situacao_cadastral').alias('situacao_cadastral_count_empresa'),
        F.first('tempo_atividade_anos').alias('tempo_atividade_anos_first'),
        F.first('capital_social').alias('capital_social_first')
    )
    
    df = df.join(empresa_stats, on='cnpj', how='left')
    
    # Posi√ß√£o temporal
    window_spec = Window.partitionBy('cnpj').orderBy('ano_mes')
    df = df.withColumn('posicao_temporal', F.row_number().over(window_spec) - 1)
    
    # Total de registros por empresa
    total_registros_empresa = df.groupBy('cnpj').agg(
        F.count('*').alias('total_registros_empresa')
    )
    df = df.join(total_registros_empresa, on='cnpj', how='left')
    
    # Posi√ß√£o relativa
    df = df.withColumn(
        'posicao_relativa',
        F.col('posicao_temporal') / F.col('total_registros_empresa')
    )
    
    return df

print("‚úÖ Fun√ß√µes de cria√ß√£o de features definidas!")


In [None]:
# Cria todas as features usando Spark
print("Criando features temporais...")
df_features = create_temporal_features_spark(df_spark)

print("Criando features de lag...")
df_features = create_lag_features_spark(df_features)

print("Criando features de rolling...")
df_features = create_rolling_features_spark(df_features)

print("Criando features agregadas...")
df_features = create_aggregated_features_spark(df_features)

# Remove cache antigo e recache
df_spark.unpersist()
df_features.cache()

print(f"\n‚úÖ Features criadas!")
print(f"   Colunas: {len(df_features.columns)}")
print(f"   Registros: {df_features.count():,}")

# Mostra algumas das novas features
new_features = [col for col in df_features.columns if col not in df_spark.columns]
print(f"\nüìã Novas features criadas ({len(new_features)}):")
for i, feat in enumerate(new_features[:25], 1):
    print(f"   {i:2d}. {feat}")
if len(new_features) > 25:
    print(f"   ... e mais {len(new_features) - 25} features")


## 4. Prepara√ß√£o dos Dados para Treinamento


In [None]:
# Seleciona features para o modelo
feature_cols = [
    # Temporais
    'ano', 'mes', 'trimestre', 'semestre', 
    'mes_sin', 'mes_cos', 'meses_desde_inicio',
    
    # Categ√≥ricas (ser√£o codificadas)
    'cnae_fiscal_principal', 'uf', 'natureza_juridica', 'porte_empresa',
    
    # Num√©ricas base
    'tempo_atividade_anos',
    'situacao_cadastral_t_minus_1',
    
    # Vari√°veis macroecon√¥micas
    'selic_meta_mensal_t_minus_1',
    'ipca_acumulado_12m_t_minus_1',
    'ipca_mensal_t_minus_1',
    'cambio_dolar_media_mensal_t_minus_1',
    'taxa_desemprego_t_minus_1',
    
    # Dados de empresa
    'capital_social',
    
    # Dados PGFN
    'pgfn_fgts_valor_acumulado_t_minus_1',
    'pgfn_naoprev_valor_acumulado_t_minus_1',
    'pgfn_prev_valor_acumulado_t_minus_1',
    'pgfn_fgts_ajuizados_t_minus_1',
    
    # Lag features
    'situacao_cadastral_lag_1', 'situacao_cadastral_lag_2', 
    'situacao_cadastral_lag_3', 'situacao_cadastral_lag_6',
    
    # Rolling features
    'situacao_cadastral_rolling_mean_3',
    'situacao_cadastral_rolling_mean_6',
    'situacao_cadastral_rolling_mean_12',
    'situacao_cadastral_rolling_std_3',
    'situacao_cadastral_rolling_std_6',
    
    # Agregadas
    'posicao_relativa',
    'situacao_cadastral_mean_empresa',
    'situacao_cadastral_std_empresa',
]

# Filtra apenas colunas que existem
available_cols = df_features.columns
feature_cols = [col for col in feature_cols if col in available_cols]

# Remove linhas com NaN nas features selecionadas
df_clean = df_features.select(feature_cols + ['situacao_cadastral', 'cnpj', 'ano_mes'])

# Remove NaN
for col in feature_cols:
    df_clean = df_clean.filter(F.col(col).isNotNull())

# Recache
df_clean.cache()

total_clean = df_clean.count()
print(f"üìä Dados ap√≥s limpeza:")
print(f"   Registros: {total_clean:,}")
print(f"   Features selecionadas: {len(feature_cols)}")
print(f"\nüìã Features selecionadas:")
for i, feat in enumerate(feature_cols, 1):
    print(f"   {i:2d}. {feat}")


In [None]:
# Separa√ß√£o treino/valida√ß√£o/teste (temporal) usando Spark
print("="*60)
print("SEPARA√á√ÉO TREINO/VALIDA√á√ÉO/TESTE (TEMPORAL)")
print("="*60)

# Adiciona um √≠ndice de linha ordenado para fazer a divis√£o temporal
# Usa Window Function para criar um √≠ndice sequencial mantendo ordem temporal
window_spec = Window.orderBy('ano_mes', 'cnpj')
df_clean = df_clean.withColumn('row_index', F.row_number().over(window_spec))

# Calcula o total de registros
total_count = df_clean.count()
split_train_idx = int(total_count * 0.7)
split_val_idx = int(total_count * 0.9)

print(f"üìä Total de registros: {total_count:,}")
print(f"   √çndice de corte treino: {split_train_idx:,} (70%)")
print(f"   √çndice de corte valida√ß√£o: {split_val_idx:,} (90%)")

# Divide os dados usando Spark
df_train = df_clean.filter(F.col('row_index') <= split_train_idx)
df_val = df_clean.filter((F.col('row_index') > split_train_idx) & (F.col('row_index') <= split_val_idx))
df_test = df_clean.filter(F.col('row_index') > split_val_idx)

# Remove a coluna row_index
df_train = df_train.drop('row_index')
df_val = df_val.drop('row_index')
df_test = df_test.drop('row_index')

# Cache os DataFrames divididos
df_train.cache()
df_val.cache()
df_test.cache()

# Conta os registros
train_count = df_train.count()
val_count = df_val.count()
test_count = df_test.count()

print(f"\nüìä Divis√£o dos dados:")
print(f"   Treino:     {train_count:,} registros ({train_count/total_count*100:.1f}%)")
print(f"   Valida√ß√£o:  {val_count:,} registros ({val_count/total_count*100:.1f}%)")
print(f"   Teste:      {test_count:,} registros ({test_count/total_count*100:.1f}%)")

# Distribui√ß√£o do target usando Spark
print(f"\nüìä Distribui√ß√£o do target:")
print(f"\n   TREINO:")
train_dist = df_train.groupBy('situacao_cadastral').count().orderBy('situacao_cadastral').show()

print(f"\n   VALIDA√á√ÉO:")
val_dist = df_val.groupBy('situacao_cadastral').count().orderBy('situacao_cadastral').show()

print(f"\n   TESTE:")
test_dist = df_test.groupBy('situacao_cadastral').count().orderBy('situacao_cadastral').show()


In [None]:
# Codifica features categ√≥ricas usando Spark StringIndexer
print("="*60)
print("CODIFICA√á√ÉO DE FEATURES CATEG√ìRICAS")
print("="*60)

categorical_cols = ['cnae_fiscal_principal', 'uf', 'natureza_juridica', 'porte_empresa']
categorical_cols = [col for col in categorical_cols if col in feature_cols]

# Prepara dados para obter todos os valores √∫nicos (usando union)
all_data_for_encoding = df_train.select(categorical_cols).union(
    df_val.select(categorical_cols)
).union(
    df_test.select(categorical_cols)
).distinct()

# Cria StringIndexers para cada coluna categ√≥rica
indexers = {}
indexer_models = {}
indexed_cols = []

for col in categorical_cols:
    # Pega todos os valores √∫nicos usando Spark
    unique_values = all_data_for_encoding.select(col).distinct().orderBy(col).collect()
    unique_list = [str(row[col]) for row in unique_values]
    
    # Cria StringIndexer
    indexer = StringIndexer(
        inputCol=col,
        outputCol=col + '_encoded',
        handleInvalid='keep'
    )
    
    indexers[col] = indexer
    indexed_cols.append(col + '_encoded')
    
    print(f"‚úÖ {col} preparado para codifica√ß√£o: {len(unique_list)} valores √∫nicos")

# Aplica os indexers
print("\nüîÑ Aplicando codifica√ß√£o...")

# Treina os indexers usando todos os dados e salva os modelos
for col in categorical_cols:
    indexer = indexers[col]
    # Ajusta usando todos os dados para garantir consist√™ncia
    indexer_model = indexer.fit(all_data_for_encoding.select(col))
    indexer_models[col] = indexer_model  # Salva o modelo treinado
    
    # Aplica nos tr√™s conjuntos
    df_train = indexer_model.transform(df_train)
    df_val = indexer_model.transform(df_val)
    df_test = indexer_model.transform(df_test)
    
    # Remove coluna original
    df_train = df_train.drop(col)
    df_val = df_val.drop(col)
    df_test = df_test.drop(col)

# Atualiza feature_cols
for col in categorical_cols:
    feature_cols = [c for c in feature_cols if c != col] + [col + '_encoded']

# Preenche NaN com 0
df_train = df_train.fillna(0, subset=feature_cols)
df_val = df_val.fillna(0, subset=feature_cols)
df_test = df_test.fillna(0, subset=feature_cols)

# Seleciona apenas as features necess√°rias
df_train_features = df_train.select(feature_cols + ['situacao_cadastral'])
df_val_features = df_val.select(feature_cols + ['situacao_cadastral'])
df_test_features = df_test.select(feature_cols + ['situacao_cadastral'])

# Recache
df_train_features.cache()
df_val_features.cache()
df_test_features.cache()

print(f"\n‚úÖ Features finais: {len(feature_cols)}")
print(f"   Treino:     {df_train_features.count():,} registros")
print(f"   Valida√ß√£o:  {df_val_features.count():,} registros")
print(f"   Teste:      {df_test_features.count():,} registros")


## 5. Treinamento do Modelo


In [None]:
# Treina modelo Random Forest
print("="*60)
print("TREINAMENTO DO MODELO")
print("="*60)

# Converte para pandas apenas para o treinamento (sklearn requer pandas/numpy)
# Para datasets muito grandes, considere usar Spark MLlib ao inv√©s de sklearn
print("üîÑ Convertendo dados de treino para pandas...")
train_pd = df_train_features.toPandas()
X_train = train_pd[feature_cols].copy()
y_train = train_pd['situacao_cadastral'].copy()

print("üîÑ Convertendo dados de valida√ß√£o para pandas...")
val_pd = df_val_features.toPandas()
X_val = val_pd[feature_cols].copy()
y_val = val_pd['situacao_cadastral'].copy()

print("üîÑ Convertendo dados de teste para pandas...")
test_pd = df_test_features.toPandas()
X_test = test_pd[feature_cols].copy()
y_test = test_pd['situacao_cadastral'].copy()

# Libera mem√≥ria do Spark
df_train_features.unpersist()
df_val_features.unpersist()
df_test_features.unpersist()

# Calcula pesos das classes para balanceamento
class_counts = y_train.value_counts()
total = len(y_train)
class_weights = {cls: total / (len(class_counts) * count) for cls, count in class_counts.items()}

print(f"\n‚öñÔ∏è  Pesos das classes para balanceamento:")
for cls, weight in sorted(class_counts.items()):
    nome = situacao_map.get(cls, f'Desconhecida ({cls})')
    print(f"   Classe {cls} ({nome:12s}): {class_counts[cls]:6,} registros, peso: {class_weights[cls]:.4f}")

model = RandomForestClassifier(
    n_estimators=200,
    max_depth=15,
    min_samples_split=10,
    min_samples_leaf=5,
    class_weight=class_weights,
    random_state=42,
    n_jobs=-1,
    verbose=1
)

print("\nüîÑ Treinando modelo...")
model.fit(X_train, y_train)
print("‚úÖ Modelo treinado!")

# Faz previs√µes em valida√ß√£o e teste
print("\nüìä Fazendo previs√µes...")
y_pred_val = model.predict(X_val)
y_pred_proba_val = model.predict_proba(X_val)

y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)

print(f"   Previs√µes em valida√ß√£o: {len(y_pred_val)} registros")
print(f"   Previs√µes em teste:     {len(y_pred)} registros")


## 6. Avalia√ß√£o do Modelo


In [None]:
# Calcula m√©tricas
print("="*60)
print("M√âTRICAS DE AVALIA√á√ÉO")
print("="*60)

# M√©tricas no conjunto de VALIDA√á√ÉO
print(f"\nüìä M√âTRICAS NO CONJUNTO DE VALIDA√á√ÉO:")
print("-" * 60)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_macro_val = f1_score(y_val, y_pred_val, average='macro')
f1_weighted_val = f1_score(y_val, y_pred_val, average='weighted')
precision_macro_val = precision_score(y_val, y_pred_val, average='macro')
recall_macro_val = recall_score(y_val, y_pred_val, average='macro')

print(f"   Acur√°cia:        {accuracy_val:.4f} ({accuracy_val*100:.2f}%)")
print(f"   F1-Score (macro): {f1_macro_val:.4f}")
print(f"   F1-Score (weighted): {f1_weighted_val:.4f}")
print(f"   Precis√£o (macro): {precision_macro_val:.4f}")
print(f"   Recall (macro):   {recall_macro_val:.4f}")

# M√©tricas no conjunto de TESTE
print(f"\nüìä M√âTRICAS NO CONJUNTO DE TESTE:")
print("-" * 60)
accuracy = accuracy_score(y_test, y_pred)
f1_macro = f1_score(y_test, y_pred, average='macro')
f1_weighted = f1_score(y_test, y_pred, average='weighted')
precision_macro = precision_score(y_test, y_pred, average='macro')
recall_macro = recall_score(y_test, y_pred, average='macro')

print(f"   Acur√°cia:        {accuracy:.4f} ({accuracy*100:.2f}%)")
print(f"   F1-Score (macro): {f1_macro:.4f}")
print(f"   F1-Score (weighted): {f1_weighted:.4f}")
print(f"   Precis√£o (macro): {precision_macro:.4f}")
print(f"   Recall (macro):   {recall_macro:.4f}")

# Classification Report - VALIDA√á√ÉO
print(f"\nüìã Classification Report - VALIDA√á√ÉO:")
print("-" * 60)
print(classification_report(y_val, y_pred_val, 
                            target_names=[f"{k}-{situacao_map.get(k, '?')}" for k in sorted(y_val.unique())]))

# Classification Report - TESTE
print(f"\nüìã Classification Report - TESTE:")
print("-" * 60)
print(classification_report(y_test, y_pred, 
                            target_names=[f"{k}-{situacao_map.get(k, '?')}" for k in sorted(y_test.unique())]))

# Confusion Matrix - VALIDA√á√ÉO
cm_val = confusion_matrix(y_val, y_pred_val)
print(f"\nüìä Matriz de Confus√£o - VALIDA√á√ÉO:")
print("-" * 60)
print(cm_val)

# Confusion Matrix - TESTE
cm = confusion_matrix(y_test, y_pred)
print(f"\nüìä Matriz de Confus√£o - TESTE:")
print("-" * 60)
print(cm)


In [None]:
# Visualiza√ß√£o das Matrizes de Confus√£o
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# VALIDA√á√ÉO - Normalizada
cm_val_normalized = cm_val.astype('float') / cm_val.sum(axis=1)[:, np.newaxis]
sns.heatmap(cm_val_normalized, annot=True, fmt='.2%', cmap='Blues', 
            xticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_val.unique())],
            yticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_val.unique())],
            ax=axes[0, 0], cbar_kws={'label': 'Propor√ß√£o'})
axes[0, 0].set_title('Matriz de Confus√£o Normalizada - VALIDA√á√ÉO', fontsize=14, fontweight='bold')
axes[0, 0].set_xlabel('Predi√ß√£o', fontsize=12)
axes[0, 0].set_ylabel('Valor Real', fontsize=12)

# VALIDA√á√ÉO - Valores Absolutos
sns.heatmap(cm_val, annot=True, fmt='d', cmap='Blues',
            xticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_val.unique())],
            yticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_val.unique())],
            ax=axes[0, 1], cbar_kws={'label': 'Quantidade'})
axes[0, 1].set_title('Matriz de Confus√£o (Valores Absolutos) - VALIDA√á√ÉO', fontsize=14, fontweight='bold')
axes[0, 1].set_xlabel('Predi√ß√£o', fontsize=12)
axes[0, 1].set_ylabel('Valor Real', fontsize=12)

# TESTE - Normalizada
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='Blues', 
            xticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_test.unique())],
            yticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_test.unique())],
            ax=axes[1, 0], cbar_kws={'label': 'Propor√ß√£o'})
axes[1, 0].set_title('Matriz de Confus√£o Normalizada - TESTE', fontsize=14, fontweight='bold')
axes[1, 0].set_xlabel('Predi√ß√£o', fontsize=12)
axes[1, 0].set_ylabel('Valor Real', fontsize=12)

# TESTE - Valores Absolutos
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_test.unique())],
            yticklabels=[f"{k}\n{situacao_map.get(k, '?')}" for k in sorted(y_test.unique())],
            ax=axes[1, 1], cbar_kws={'label': 'Quantidade'})
axes[1, 1].set_title('Matriz de Confus√£o (Valores Absolutos) - TESTE', fontsize=14, fontweight='bold')
axes[1, 1].set_xlabel('Predi√ß√£o', fontsize=12)
axes[1, 1].set_ylabel('Valor Real', fontsize=12)

plt.tight_layout()
plt.show()


## 7. An√°lise de Import√¢ncia das Features


In [None]:
# Import√¢ncia das features
print("="*60)
print("IMPORT√ÇNCIA DAS FEATURES")
print("="*60)

feature_importance = pd.DataFrame({
    'feature': feature_cols,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)

print(f"\nüìä Top 30 Features Mais Importantes:")
print("-" * 70)
for i, row in feature_importance.head(30).iterrows():
    print(f"   {i+1:2d}. {row['feature']:45s}: {row['importance']:.4f}")

# Visualiza√ß√£o
fig, ax = plt.subplots(figsize=(12, 12))
top_n = 30
top_features = feature_importance.head(top_n)

ax.barh(range(len(top_features)), top_features['importance'], color='steelblue', edgecolor='black')
ax.set_yticks(range(len(top_features)))
ax.set_yticklabels(top_features['feature'], fontsize=10)
ax.set_xlabel('Import√¢ncia', fontsize=12)
ax.set_title(f'Top {top_n} Features Mais Importantes', fontsize=14, fontweight='bold')
ax.grid(axis='x', alpha=0.3)
ax.invert_yaxis()

plt.tight_layout()
plt.show()

# An√°lise por categoria de features
print(f"\nüìä Import√¢ncia por Categoria de Features:")
print("-" * 70)

categories = {
    'Temporais': ['ano', 'mes', 'trimestre', 'semestre', 'mes_sin', 'mes_cos', 'meses_desde_inicio'],
    'Categ√≥ricas': [c for c in feature_cols if '_encoded' in c],
    'Macroecon√¥micas': ['selic_meta_mensal_t_minus_1', 'ipca_acumulado_12m_t_minus_1', 
                        'ipca_mensal_t_minus_1', 'cambio_dolar_media_mensal_t_minus_1', 
                        'taxa_desemprego_t_minus_1'],
    'PGFN': ['pgfn_fgts_valor_acumulado_t_minus_1', 'pgfn_naoprev_valor_acumulado_t_minus_1',
             'pgfn_prev_valor_acumulado_t_minus_1', 'pgfn_fgts_ajuizados_t_minus_1'],
    'Lag': [c for c in feature_cols if 'lag' in c],
    'Rolling': [c for c in feature_cols if 'rolling' in c],
    'Empresa': ['tempo_atividade_anos', 'capital_social', 'situacao_cadastral_t_minus_1'],
    'Agregadas': [c for c in feature_cols if 'empresa' in c or 'posicao' in c]
}

for category, features in categories.items():
    cat_features = [f for f in features if f in feature_importance['feature'].values]
    if cat_features:
        cat_importance = feature_importance[feature_importance['feature'].isin(cat_features)]['importance'].sum()
        print(f"   {category:20s}: {cat_importance:.4f} ({len(cat_features)} features)")


## 8. Salvamento do Modelo em Pickle


In [None]:
# Salva o modelo e os indexers em pickle
print("="*60)
print("SALVAMENTO DO MODELO")
print("="*60)

# Cria diret√≥rio para modelos se n√£o existir
model_dir = 'models_pickle'
os.makedirs(model_dir, exist_ok=True)

# Salva o modelo
model_path = os.path.join(model_dir, 'random_forest_model.pkl')
with open(model_path, 'wb') as f:
    pickle.dump(model, f)
print(f"‚úÖ Modelo salvo em: {model_path}")

# Salva os mapeamentos dos indexers do Spark
# Nota: StringIndexer models do Spark n√£o s√£o diretamente serializ√°veis com pickle
# Vamos salvar um mapeamento dos valores √∫nicos para recriar os indexers
indexers_path = os.path.join(model_dir, 'spark_indexers_mapping.pkl')
indexers_mapping = {}

for col in categorical_cols:
    # Obt√©m os valores √∫nicos e seus √≠ndices do modelo treinado
    # O StringIndexer ordena por frequ√™ncia, ent√£o precisamos obter o mapeamento correto
    unique_values = all_data_for_encoding.select(col).distinct().orderBy(col).collect()
    unique_list = [str(row[col]) for row in unique_values]
    # Cria um mapeamento valor -> √≠ndice (o StringIndexer usa ordem de frequ√™ncia, mas salvamos ordem alfab√©tica)
    indexers_mapping[col] = {val: idx for idx, val in enumerate(unique_list)}

with open(indexers_path, 'wb') as f:
    pickle.dump(indexers_mapping, f)
print(f"‚úÖ Mapeamento dos indexers do Spark salvo em: {indexers_path}")

# Salva a lista de features
features_path = os.path.join(model_dir, 'feature_columns.pkl')
with open(features_path, 'wb') as f:
    pickle.dump(feature_cols, f)
print(f"‚úÖ Lista de features salva em: {features_path}")

# Salva informa√ß√µes do modelo
model_info = {
    'accuracy_val': accuracy_val,
    'accuracy_test': accuracy,
    'f1_macro_val': f1_macro_val,
    'f1_macro_test': f1_macro,
    'f1_weighted_val': f1_weighted_val,
    'f1_weighted_test': f1_weighted,
    'n_features': len(feature_cols),
    'n_train': len(X_train),
    'n_val': len(X_val),
    'n_test': len(X_test),
    'model_type': 'RandomForestClassifier',
    'n_estimators': 200,
    'max_depth': 15,
    'categorical_cols': categorical_cols
}

info_path = os.path.join(model_dir, 'model_info.pkl')
with open(info_path, 'wb') as f:
    pickle.dump(model_info, f)
print(f"‚úÖ Informa√ß√µes do modelo salvas em: {info_path}")

print(f"\nüì¶ Todos os arquivos salvos no diret√≥rio: {model_dir}/")
print(f"   - random_forest_model.pkl")
print(f"   - spark_indexers_mapping.pkl")
print(f"   - feature_columns.pkl")
print(f"   - model_info.pkl")


## 9. Exemplo de Carregamento e Uso do Modelo


In [None]:
# Exemplo de como carregar e usar o modelo salvo
print("="*60)
print("EXEMPLO DE CARREGAMENTO DO MODELO")
print("="*60)

# Carrega o modelo
with open(model_path, 'rb') as f:
    loaded_model = pickle.load(f)

# Carrega os mapeamentos dos indexers
with open(indexers_path, 'rb') as f:
    loaded_indexers_mapping = pickle.load(f)

# Carrega as features
with open(features_path, 'rb') as f:
    loaded_features = pickle.load(f)

print("‚úÖ Modelo e componentes carregados com sucesso!")

# Exemplo de predi√ß√£o com dados de teste
print(f"\nüìä Testando predi√ß√£o com {len(X_test)} amostras do conjunto de teste...")
sample_predictions = loaded_model.predict(X_test.head(10))
sample_proba = loaded_model.predict_proba(X_test.head(10))

print(f"\n   Primeiras 10 predi√ß√µes:")
for i, (pred, proba) in enumerate(zip(sample_predictions, sample_proba)):
    proba_max = proba.max()
    situacao_nome = situacao_map.get(pred, f'Desconhecida ({pred})')
    print(f"   Amostra {i+1}: Predi√ß√£o = {pred} ({situacao_nome}), Confian√ßa = {proba_max:.4f}")

print(f"\n‚úÖ Modelo funcionando corretamente!")


## 10. Resumo Final


In [None]:
# Resumo final
print("="*80)
print("                    RESUMO FINAL DA AN√ÅLISE                     ")
print("="*80)

print(f"\nüìä Dataset:")
print(f"   Total de registros: {total_registros:,}")
print(f"   CNPJs √∫nicos: {cnpjs_unicos:,}")
print(f"   Per√≠odo: {periodo_min.strftime('%Y-%m')} a {periodo_max.strftime('%Y-%m')}")
print(f"   Registros ap√≥s limpeza: {total_clean:,}")

print(f"\nüìä Divis√£o dos Dados:")
print(f"   Treino:     {len(X_train):,} registros (70.0%)")
print(f"   Valida√ß√£o:  {len(X_val):,} registros (20.0%)")
print(f"   Teste:      {len(X_test):,} registros (10.0%)")

print(f"\nüîß Features:")
# Calcula n√∫mero de features criadas (colunas adicionais)
n_features_created = len(df_features.columns) - 19  # 19 √© o n√∫mero original de colunas do dataset
print(f"   Features criadas: {n_features_created}")
print(f"   Features finais utilizadas: {len(feature_cols)}")

print(f"\nü§ñ Modelo:")
print(f"   Algoritmo: Random Forest")
print(f"   Processamento: Apache Spark")
print(f"   Formato de salvamento: Pickle")
print(f"\n   M√©tricas - VALIDA√á√ÉO:")
print(f"      Acur√°cia:        {accuracy_val:.4f} ({accuracy_val*100:.2f}%)")
print(f"      F1-Score (macro): {f1_macro_val:.4f}")
print(f"      F1-Score (weighted): {f1_weighted_val:.4f}")
print(f"\n   M√©tricas - TESTE:")
print(f"      Acur√°cia:        {accuracy:.4f} ({accuracy*100:.2f}%)")
print(f"      F1-Score (macro): {f1_macro:.4f}")
print(f"      F1-Score (weighted): {f1_weighted:.4f}")

print(f"\nüìà Top 5 Features Mais Importantes:")
for i, row in feature_importance.head(5).iterrows():
    print(f"   {i+1}. {row['feature']:45s}: {row['importance']:.4f}")

print(f"\nüíæ Modelo Salvo:")
print(f"   Diret√≥rio: {model_dir}/")
print(f"   Arquivos: random_forest_model.pkl, spark_indexers_mapping.pkl, feature_columns.pkl, model_info.pkl")

print(f"\n‚úÖ An√°lise conclu√≠da!")
print("="*80)


In [None]:
# Encerra a Spark Session
spark.stop()
print("‚úÖ Spark Session encerrada.")
