## Importacao de Bibliotecas

Carrega todas as bibliotecas necessarias para o pipeline de imputacao. O modulo `enable_iterative_imputer` deve ser importado antes do `IterativeImputer` pois este ainda e considerado experimental no scikit-learn. O `RandomForestRegressor` sera utilizado como estimador base do imputador iterativo. As bibliotecas matplotlib e seaborn sao utilizadas para visualizacao dos resultados da avaliacao.

In [None]:
import os
import sys
from pathlib import Path
import pandas as pd
import numpy as np
import pickle
import time
import warnings
from glob import glob

warnings.filterwarnings('ignore')

from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import matplotlib.pyplot as plt
import seaborn as sns

sys.path.insert(0, str(Path('..') / 'fastapi'))
from services.mlflow_service import mlflow_service

plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')

In [None]:
EVALUATION_RATIO = 0.10
RANDOM_STATE = 42

print(f"Configuracao do Pipeline de Imputacao Automatizado")
print(f"  Ratio de avaliacao: {EVALUATION_RATIO * 100}%")
print(f"  Random state: {RANDOM_STATE}")

Dataset carregado: (526176, 27)
Total de registros: 526,176
Periodo: 2020-01-01 00:00:00 ate 2024-12-31 23:00:00
Colunas alvo: ['temperatura', 'umidade', 'velocidade_vento']
Total de features: 22


## Descoberta Automatica de Estacoes

Detecta automaticamente todas as estacoes disponiveis com base nos arquivos PKL gerados pelo notebook de tratamento. O sistema busca por arquivos no formato `dados_tratados_*.pkl` e extrai os nomes das estacoes para processamento em lote.

In [None]:
dados_files = glob('dados_tratados_*.pkl')
estacoes_disponiveis = []

for file in dados_files:
    estacao_name = file.replace('dados_tratados_', '').replace('.pkl', '')
    estacoes_disponiveis.append(estacao_name)

estacoes_disponiveis = sorted(estacoes_disponiveis)

print(f"\nEstacoes detectadas: {len(estacoes_disponiveis)}")
for i, estacao in enumerate(estacoes_disponiveis, 1):
    print(f"  {i}. {estacao.replace('_', ' ')}")

## Inicializacao do MLflow

Inicializa o servico MLflow para rastreamento de experimentos, parametros e metricas de cada estacao processada.

In [None]:
mlflow_service.initialize()

print(f"MLflow inicializado")
print(f"  Tracking URI: {mlflow_service.tracking_uri}")
print(f"  Experimento: Imputacao por Estacao")

## Funcao de Carregamento de Dados da Estacao

Define a funcao que carrega o dataset e metadados de uma estacao especifica a partir dos arquivos PKL gerados no notebook de tratamento.

In [None]:
def carregar_dados_estacao(estacao_filename):
    """
    Carrega dados e metadados de uma estacao.
    
    Args:
        estacao_filename: Nome do arquivo da estacao (ex: 'PETROLINA')
        
    Returns:
        tuple: (df, metadata)
    """
    pkl_filename = f'dados_tratados_{estacao_filename}.pkl'
    metadata_filename = f'metadata_tratamento_{estacao_filename}.pkl'
    
    df = pd.read_pickle(pkl_filename)
    with open(metadata_filename, 'rb') as f:
        metadata = pickle.load(f)
    
    return df, metadata

print("Funcao carregar_dados_estacao() definida")

## Funcao de Mascaramento para Avaliacao

Define a funcao que mascara uma fracao dos valores conhecidos para avaliar a qualidade da imputacao. Esta tecnica permite comparar valores imputados com valores reais, calculando metricas de erro objetivas.

In [None]:
def mascarar_dados(df, target_cols, evaluation_ratio, random_state):
    """
    Mascara valores para avaliacao da imputacao.
    
    Args:
        df: DataFrame com os dados
        target_cols: Lista de colunas alvo
        evaluation_ratio: Percentual de dados para mascarar
        random_state: Seed para reproducibilidade
        
    Returns:
        tuple: (df_masked, masked_data)
    """
    np.random.seed(random_state)
    df_masked = df.copy()
    masked_data = {}
    
    for col in target_cols:
        non_null_indices = df[col].dropna().index
        n_to_mask = int(len(non_null_indices) * evaluation_ratio)
        mask_indices = np.random.choice(non_null_indices, size=n_to_mask, replace=False)
        
        masked_data[col] = {
            'indices': mask_indices,
            'true_values': df_masked.loc[mask_indices, col].copy()
        }
        df_masked.loc[mask_indices, col] = np.nan
    
    return df_masked, masked_data

print("Funcao mascarar_dados() definida")

## Funcao de Configuracao do Imputador

Define a funcao que cria e configura o IterativeImputer com RandomForestRegressor como estimador base. Os parametros foram escolhidos para balancear qualidade de imputacao e tempo de processamento.

In [None]:
def criar_imputador(random_state):
    """
    Cria e configura o IterativeImputer.
    
    Args:
        random_state: Seed para reproducibilidade
        
    Returns:
        IterativeImputer configurado
    """
    rf_estimator = RandomForestRegressor(
        n_estimators=50,
        max_depth=10,
        min_samples_split=10,
        min_samples_leaf=5,
        random_state=random_state,
        n_jobs=-1
    )
    
    imputer = IterativeImputer(
        estimator=rf_estimator,
        max_iter=10,
        random_state=random_state,
        verbose=0
    )
    
    return imputer

print("Funcao criar_imputador() definida")

## Funcao de Execucao da Imputacao

Define a funcao que executa o processo de imputacao nos dados mascarados, preservando as colunas identificadoras (id, data, hora).

In [None]:
def executar_imputacao(imputer, df_masked, target_cols, feature_cols):
    """
    Executa a imputacao nos dados mascarados.
    
    Args:
        imputer: IterativeImputer configurado
        df_masked: DataFrame com valores mascarados
        target_cols: Lista de colunas alvo
        feature_cols: Lista de features
        
    Returns:
        DataFrame com valores imputados
    """
    columns_for_imputation = target_cols + feature_cols
    X_masked = df_masked[columns_for_imputation].copy()
    
    X_imputed = imputer.fit_transform(X_masked)
    
    df_imputed = pd.DataFrame(X_imputed, columns=columns_for_imputation, index=df_masked.index)
    df_imputed['id'] = df_masked['id']
    df_imputed['data'] = df_masked['data']
    df_imputed['hora'] = df_masked['hora']
    
    return df_imputed

print("Funcao executar_imputacao() definida")

## Funcao de Calculo de Metricas

Define a funcao que calcula as metricas de qualidade da imputacao comparando valores imputados com valores reais mascarados. As metricas incluem RMSE, MAE e R-quadrado.

In [None]:
def calcular_metricas(df_imputed, masked_data, target_cols):
    """
    Calcula metricas de avaliacao da imputacao.
    
    Args:
        df_imputed: DataFrame com valores imputados
        masked_data: Dicionario com valores mascarados
        target_cols: Lista de colunas alvo
        
    Returns:
        dict: Metricas de avaliacao por coluna
    """
    evaluation_results = {}
    
    for col in target_cols:
        indices = masked_data[col]['indices']
        true_values = masked_data[col]['true_values']
        imputed_values = df_imputed.loc[indices, col]
        
        rmse = np.sqrt(mean_squared_error(true_values, imputed_values))
        mae = mean_absolute_error(true_values, imputed_values)
        r2 = r2_score(true_values, imputed_values)
        
        evaluation_results[col] = {'RMSE': rmse, 'MAE': mae, 'R2': r2}
    
    return evaluation_results

print("Funcao calcular_metricas() definida")

## Funcao de Geracao de Visualizacao

Define a funcao que gera graficos de dispersao comparando valores reais versus valores imputados para cada variavel alvo. A linha diagonal representa a predicao perfeita.

In [None]:
def gerar_visualizacao(df_imputed, masked_data, target_cols, evaluation_results, estacao_nome, estacao_filename):
    """
    Gera graficos de avaliacao da imputacao.
    
    Args:
        df_imputed: DataFrame com valores imputados
        masked_data: Dicionario com valores mascarados
        target_cols: Lista de colunas alvo
        evaluation_results: Metricas de avaliacao
        estacao_nome: Nome da estacao
        estacao_filename: Nome do arquivo da estacao
        
    Returns:
        str: Caminho do arquivo de plot salvo
    """
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    fig.suptitle(f'Avaliacao da Imputacao - {estacao_nome}', fontsize=14, fontweight='bold')
    
    for idx, col in enumerate(target_cols):
        ax = axes[idx]
        indices = masked_data[col]['indices']
        true_values = masked_data[col]['true_values']
        imputed_values = df_imputed.loc[indices, col]
        
        ax.scatter(true_values, imputed_values, alpha=0.5, s=20)
        min_val = min(true_values.min(), imputed_values.min())
        max_val = max(true_values.max(), imputed_values.max())
        ax.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, alpha=0.7)
        
        ax.set_xlabel('Valor Real', fontsize=11, fontweight='bold')
        ax.set_ylabel('Valor Imputado', fontsize=11, fontweight='bold')
        ax.set_title(
            f"{col.upper()}\nRMSE: {evaluation_results[col]['RMSE']:.3f} | R2: {evaluation_results[col]['R2']:.3f}",
            fontsize=12, fontweight='bold'
        )
        ax.grid(True, alpha=0.3)
        ax.set_aspect('equal', adjustable='box')
    
    plt.tight_layout()
    output_plot = f'avaliacao_imputacao_{estacao_filename}.png'
    plt.savefig(output_plot, dpi=150, bbox_inches='tight')
    plt.close()
    
    return output_plot

print("Funcao gerar_visualizacao() definida")

## Funcao de Exportacao de Resultados

Define a funcao que exporta os resultados da imputacao para arquivos locais, incluindo o dataset completo, dados para update no banco e metricas de avaliacao.

In [None]:
def exportar_resultados(df_final, evaluation_results, estacao_filename):
    """
    Exporta resultados da imputacao para arquivos.
    
    Args:
        df_final: DataFrame com dados imputados finais
        evaluation_results: Metricas de avaliacao
        estacao_filename: Nome do arquivo da estacao
        
    Returns:
        dict: Caminhos dos arquivos exportados
    """
    df_for_update = pd.DataFrame({
        'id': df_final['id'],
        'data': df_final['data'],
        'hora': df_final['hora'],
        'temperatura': df_final['temperatura'],
        'umidade': df_final['umidade'],
        'velocidade_vento': df_final['velocidade_vento']
    })
    
    output_pkl = f'dados_imputados_{estacao_filename}.pkl'
    output_csv = f'dados_para_update_neon_{estacao_filename}.csv'
    output_metrics = f'metricas_imputacao_{estacao_filename}.pkl'
    
    df_final.to_pickle(output_pkl)
    df_for_update.to_csv(output_csv, index=False)
    with open(output_metrics, 'wb') as f:
        pickle.dump(evaluation_results, f)
    
    return {
        'pkl': output_pkl,
        'csv': output_csv,
        'metrics': output_metrics
    }

print("Funcao exportar_resultados() definida")

## Funcao Principal de Processamento

Define a funcao principal que orquestra todo o pipeline de imputacao para uma estacao: carregamento, mascaramento, imputacao, avaliacao, visualizacao, exportacao e registro no MLflow.

In [None]:
def processar_estacao(estacao_filename, evaluation_ratio, random_state):
    """
    Processa imputacao completa para uma estacao.
    
    Args:
        estacao_filename: Nome do arquivo da estacao
        evaluation_ratio: Percentual de dados para mascaramento
        random_state: Seed para reproducibilidade
        
    Returns:
        dict: Metricas de avaliacao
    """
    print("\n" + "=" * 70)
    print(f"PROCESSANDO ESTACAO: {estacao_filename.replace('_', ' ')}")
    print("=" * 70)
    
    start_time = time.time()
    
    df, metadata = carregar_dados_estacao(estacao_filename)
    target_cols = metadata['target_cols']
    feature_cols = metadata['feature_cols']
    estacao_nome = metadata['estacao']
    
    print(f"\n1. Dataset carregado: {df.shape}")
    print(f"   Registros: {len(df):,}")
    print(f"   Periodo: {metadata['date_range'][0]} ate {metadata['date_range'][1]}")
    
    df_original = df.copy()
    df_masked, masked_data = mascarar_dados(df, target_cols, evaluation_ratio, random_state)
    
    print(f"\n2. Mascaramento para avaliacao ({evaluation_ratio*100}%):")
    for col in target_cols:
        n_masked = len(masked_data[col]['indices'])
        print(f"   {col}: {n_masked:,} valores mascarados")
    
    imputer = criar_imputador(random_state)
    
    print(f"\n3. Executando imputacao...")
    df_imputed = executar_imputacao(imputer, df_masked, target_cols, feature_cols)
    
    print(f"\n4. Calculando metricas de avaliacao:")
    evaluation_results = calcular_metricas(df_imputed, masked_data, target_cols)
    for col in target_cols:
        r = evaluation_results[col]
        print(f"   {col}: RMSE={r['RMSE']:.4f}, MAE={r['MAE']:.4f}, R2={r['R2']:.4f}")
    
    print(f"\n5. Gerando visualizacao...")
    output_plot = gerar_visualizacao(df_imputed, masked_data, target_cols, evaluation_results, estacao_nome, estacao_filename)
    
    print(f"\n6. Imputacao final nos dados originais...")
    df_final = executar_imputacao(imputer, df_original, target_cols, feature_cols)
    
    print(f"\n7. Exportando resultados...")
    output_files = exportar_resultados(df_final, evaluation_results, estacao_filename)
    
    duration = time.time() - start_time
    
    print(f"\n8. Registrando no MLflow...")
    mlflow_service.log_imputation_run(
        station_name=estacao_nome,
        metrics={
            'avg_rmse': np.mean([m['RMSE'] for m in evaluation_results.values()]),
            'avg_mae': np.mean([m['MAE'] for m in evaluation_results.values()]),
            'avg_r2': np.mean([m['R2'] for m in evaluation_results.values()]),
            'temperatura_rmse': evaluation_results['temperatura']['RMSE'],
            'umidade_rmse': evaluation_results['umidade']['RMSE'],
            'velocidade_vento_rmse': evaluation_results['velocidade_vento']['RMSE'],
            'duration_seconds': duration
        },
        params={
            'n_estimators': 50,
            'max_depth': 10,
            'min_samples_split': 10,
            'min_samples_leaf': 5,
            'max_iter': 10,
            'evaluation_ratio': evaluation_ratio,
            'n_records': len(df),
            'n_features': len(feature_cols)
        },
        artifacts={
            'plot': output_plot,
            'metrics_file': output_files['metrics']
        }
    )
    
    print(f"\nEstacao {estacao_nome} processada em {duration:.2f}s")
    print("=" * 70)
    
    return evaluation_results

print("Funcao processar_estacao() definida")

## Execucao Automatica para Todas as Estacoes

Loop principal que processa automaticamente todas as estacoes detectadas, registrando metricas individuais e consolidadas no MLflow.

In [None]:
total_start = time.time()
resultados_todas_estacoes = {}

print("\n" + "=" * 70)
print(f"INICIANDO PROCESSAMENTO DE {len(estacoes_disponiveis)} ESTACOES")
print("=" * 70)

for i, estacao_filename in enumerate(estacoes_disponiveis, 1):
    print(f"\n[{i}/{len(estacoes_disponiveis)}] Processando: {estacao_filename.replace('_', ' ')}")
    
    try:
        metricas = processar_estacao(
            estacao_filename=estacao_filename,
            evaluation_ratio=EVALUATION_RATIO,
            random_state=RANDOM_STATE
        )
        resultados_todas_estacoes[estacao_filename] = metricas
        
    except Exception as e:
        print(f"ERRO ao processar {estacao_filename}: {str(e)}")
        continue

total_duration = time.time() - total_start

print("\n" + "=" * 70)
print("RESUMO GERAL DO PROCESSAMENTO")
print("=" * 70)
print(f"\nEstacoes processadas: {len(resultados_todas_estacoes)}/{len(estacoes_disponiveis)}")
print(f"Tempo total: {total_duration:.2f}s ({total_duration/60:.2f} minutos)")
print(f"\nMetricas medias por variavel:")

for var in ['temperatura', 'umidade', 'velocidade_vento']:
    rmse_values = [r[var]['RMSE'] for r in resultados_todas_estacoes.values()]
    mae_values = [r[var]['MAE'] for r in resultados_todas_estacoes.values()]
    r2_values = [r[var]['R2'] for r in resultados_todas_estacoes.values()]
    
    print(f"\n  {var.upper()}:")
    print(f"    RMSE medio: {np.mean(rmse_values):.4f} (std: {np.std(rmse_values):.4f})")
    print(f"    MAE medio: {np.mean(mae_values):.4f} (std: {np.std(mae_values):.4f})")
    print(f"    R2 medio: {np.mean(r2_values):.4f} (std: {np.std(r2_values):.4f})")

print("\n" + "=" * 70)
print("PIPELINE COMPLETO CONCLUIDO COM SUCESSO")
print("=" * 70)