
## Análise de Drift Univariada - Método Conformal
### Modelo Numerários
 


Este notebook implementa a detecção de drift usando o método conformal baseado na diferença entre valores preditos e reais.
**Referência**: https://arxiv.org/abs/2102.10439

### 1. Importação de Bibliotecas


In [19]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from datetime import datetime, timedelta

### 2. Configurações de visualização

In [20]:


plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 10



### 3. Funções do Método Conformal

 Implementações baseadas no código wine.ipynb

#### 3.1 Cálculo dos p-valores conformais baseados na diferença entre valores reais e preditos

    Args:
        y_true: Valores reais
        y_pred: Valores preditos
        rtol, atol: Tolerâncias para comparação de valores
        
    Returns:
        p_det: P-valores determinísticos
        p_rnd: P-valores randomizados

In [21]:

def calcular_pvalores_pred(y_true, y_pred, rtol=1e-03, atol=1e-03):

    # Calcula os scores de não-conformidade (erro)
    alpha = y_true - y_pred
    N = len(alpha)
    p_det = np.zeros(N)
    p_rnd = np.zeros(N)
    
    for n in tqdm(range(N), desc="Calculando p-valores"):
        alpha_n = alpha[n]
        # Considera todos os scores até o momento atual (incluindo o atual)
        anteriores = alpha[:n+1]
        
        if n == 0:
            p_det[0] = 1
            p_rnd[0] = 1
            continue
        
        # P-valor determinístico: proporção de scores maiores ou iguais
        p_det[n] = np.mean(anteriores >= alpha_n)
        
        # P-valor randomizado: trata empates de forma aleatória
        countG = np.sum(anteriores > alpha_n)
        countE = np.sum(np.isclose(anteriores, alpha_n, rtol=rtol, atol=atol))
        u = np.random.uniform() if countE > 0 else 0
        p_rnd[n] = (countG + u * countE) / (n + 1)
    
    return p_det, p_rnd


#### 3.2 Cálculo do Power Martingale a partir de p-valores

    Args:
        p_values: Array de p-valores
        epsilon: Parâmetro do power martingale (0 < epsilon < 1)
        
    Returns:
        M: Array com valores do martingale

In [22]:

def power_martingale(p_values, epsilon=0.92):

    # Evita p-valores zero (proteção contra log(0))
    p_values = np.maximum(p_values, 1e-10)
    
    # Power martingale: M_n = ∏(epsilon * p_i^(epsilon-1))
    betting = epsilon * (p_values ** (epsilon - 1))
    M = np.cumprod(betting)
    
    return M


#### 3.3 Simple Jumper Martingale

    Args:
        p_values: Array de p-valores (0 < p <= 1)
        J: Probabilidade de mudar de estratégia
        
    Returns:
        capital: Array com valores do martingale

In [23]:

def simple_jumper_martingale(p_values, J=0.01):


    n = len(p_values)
    capital = np.zeros(n + 1)
    capital[0] = 1.0  # S0 = 1
    
    # Capital inicial para cada estratégia (3 estratégias: conservadora, neutra, agressiva)
    C = {epsilon: 1/3 for epsilon in [-1, 0, 1]}  # C_{-1}, C_0, C_1
    
    for i in range(n):
        # Etapa 1: transição (Markov chain) - possibilidade de mudar de estratégia
        C_new = {}
        total = sum(C.values())
        for epsilon in [-1, 0, 1]:
            # (1-J) mantém estratégia atual, J/3 muda para qualquer estratégia
            C_new[epsilon] = (1 - J) * C[epsilon] + J * total / 3
        
        # Etapa 2: update capital com função de aposta
        p = p_values[i]
        for epsilon in [-1, 0, 1]:
            # Função de aposta: f(p) = 1 + ε(p - 0.5)
            f_eps = 1 + epsilon * (p - 0.5)
            C_new[epsilon] *= f_eps
        
        # Soma dos capitais de todas as estratégias
        capital[i + 1] = sum(C_new.values())
        C = C_new
    
    return capital[1:]  # Retorna S_1 até S_n


##### Calculando o score por outros métodos
tirar aspas para rodar

In [24]:
"""
ef calcular_score_nao_conformidade(y_true, y_pred, metodo='absoluto', janela_historico=30, epsilon=1e-8):
    
    # Calcula diferentes tipos de scores de não-conformidade.
    
    #Args:
     #   y_true: Valores reais
     #   y_pred: Valores preditos
     #   metodo: Tipo de score ('absoluto', 'relativo', 'z_score', 'mad', 'dist_min', 'dist_media')
     #   janela_historico: Tamanho da janela para métodos que usam histórico
     #   epsilon: Termo para evitar divisão por zero
        
  #  Returns:
   #     scores: Array com scores de não-conformidade
        
    # Erro bruto (base para vários métodos)
    erros = y_true - y_pred
    n = len(erros)
    scores = np.zeros(n)
    
    if metodo == 'absoluto':
        # A. Erro Absoluto Simples
        scores = np.abs(erros)
        
    elif metodo == 'relativo':
        # B. Erro Relativo
        scores = np.abs(erros) / (np.abs(y_true) + epsilon)
        
    elif metodo == 'z_score':
        # C. Erro Padronizado (Z-score)
        for i in range(n):
            if i < janela_historico:
                # Para início da série, usar dados disponíveis
                if i > 0:
                    std_hist = np.std(erros[:i])
                    if std_hist > 0:
                        scores[i] = np.abs(erros[i]) / std_hist
                    else:
                        scores[i] = np.abs(erros[i])
                else:
                    scores[i] = 0
            else:
                # Usar janela deslizante
                janela_erros = erros[i-janela_historico:i]
                std_hist = np.std(janela_erros)
                if std_hist > 0:
                    scores[i] = np.abs(erros[i]) / std_hist
                else:
                    scores[i] = np.abs(erros[i])
                    
    elif metodo == 'mad':
        # D. Erro Normalizado por MAD
        for i in range(n):
            if i < janela_historico:
                if i > 0:
                    janela_erros = erros[:i]
                    mad = np.median(np.abs(janela_erros - np.median(janela_erros)))
                    # Fator 1.4826 converte MAD para estimativa de desvio padrão
                    scores[i] = np.abs(erros[i]) / (1.4826 * mad + epsilon)
                else:
                    scores[i] = 0
            else:
                janela_erros = erros[i-janela_historico:i]
                mad = np.median(np.abs(janela_erros - np.median(janela_erros)))
                scores[i] = np.abs(erros[i]) / (1.4826 * mad + epsilon)
                
    elif metodo == 'dist_min':
        # E. Distância Mínima (como no conformal_whine)
        for i in range(n):
            if i > 0:
                # Distância mínima aos erros anteriores
                historico = erros[:i]
                distancias = np.abs(erros[i] - historico)
                scores[i] = np.min(distancias)
            else:
                scores[i] = 0
                
    elif metodo == 'dist_media':
        # F. Distância Média
        for i in range(n):
            if i > 0:
                # Distância média aos erros anteriores
                historico = erros[:i]
                distancias = np.abs(erros[i] - historico)
                scores[i] = np.mean(distancias)
            else:
                scores[i] = 0
    
    else:
        raise ValueError(f"Método '{metodo}' não reconhecido. Use: 'absoluto', 'relativo', 'z_score', 'mad', 'dist_min', 'dist_media'")
    
    return scores


def calcular_pvalores_pred_com_score(y_true, y_pred, metodo_score='absoluto', 
                                    janela_historico=30, rtol=1e-03, atol=1e-03):

#    Calcula p-valores conformais usando diferentes scores de não-conformidade.
    
 #   Esta é uma versão da função # Calcular p-valores conformais p_det, p_rnd = calcular_pvalores_pred(y_real, y_pred) 
 # que permite escolher o método de score
  
    
    # Calcular scores de não-conformidade usando o método escolhido
    scores = calcular_score_nao_conformidade(y_true, y_pred, metodo=metodo_score, 
                                            janela_historico=janela_historico)
    
    N = len(scores)
    p_det = np.zeros(N)
    p_rnd = np.zeros(N)
    
    for n in tqdm(range(N), desc=f"Calculando p-valores ({metodo_score})"):
        score_n = scores[n]
        # Considera todos os scores até o momento atual (incluindo o atual)
        anteriores = scores[:n+1]
        
        if n == 0:
            p_det[0] = 1
            p_rnd[0] = 1
            continue
        
        # P-valor determinístico: proporção de scores maiores ou iguais
        p_det[n] = np.mean(anteriores >= score_n)
        
        # P-valor randomizado: trata empates de forma aleatória
        countG = np.sum(anteriores > score_n)
        countE = np.sum(np.isclose(anteriores, score_n, rtol=rtol, atol=atol))
        u = np.random.uniform() if countE > 0 else 0
        p_rnd[n] = (countG + u * countE) / (n + 1)
    
    return p_det, p_rnd

"""


'\nef calcular_score_nao_conformidade(y_true, y_pred, metodo=\'absoluto\', janela_historico=30, epsilon=1e-8):\n\n    # Calcula diferentes tipos de scores de não-conformidade.\n\n    #Args:\n     #   y_true: Valores reais\n     #   y_pred: Valores preditos\n     #   metodo: Tipo de score (\'absoluto\', \'relativo\', \'z_score\', \'mad\', \'dist_min\', \'dist_media\')\n     #   janela_historico: Tamanho da janela para métodos que usam histórico\n     #   epsilon: Termo para evitar divisão por zero\n\n  #  Returns:\n   #     scores: Array com scores de não-conformidade\n\n    # Erro bruto (base para vários métodos)\n    erros = y_true - y_pred\n    n = len(erros)\n    scores = np.zeros(n)\n\n    if metodo == \'absoluto\':\n        # A. Erro Absoluto Simples\n        scores = np.abs(erros)\n\n    elif metodo == \'relativo\':\n        # B. Erro Relativo\n        scores = np.abs(erros) / (np.abs(y_true) + epsilon)\n\n    elif metodo == \'z_score\':\n        # C. Erro Padronizado (Z-score)\n

### 4.  Input e preparação das tabelas

In [25]:

def carrega_prepara_dados():
    
    # Carregar features (valores reais)
    features_df = pd.read_parquet('features_numerario.parquet')
    print(f"features: {features_df.shape}")
    
    # Carregar previsões
    previsoes_df = pd.read_csv('previsoes_numerario_pre_pos_pandemia.csv')
    print(f"Previsões: {previsoes_df.shape}")
    
    # Padronizar nomes de colunas das previsões para combinar com features
    previsoes_df = previsoes_df.rename(columns={
        'DEP_CEI': 'DEPCEI',
        'DEPOSITO': 'DEP',
        'SAQUE': 'SAQ',
        'SAQUE_CEI': 'SAQCEI'
    })
    
    # Converter colunas de data
    features_df['DATA'] = pd.to_datetime(features_df['DATA'])
    previsoes_df['DATA'] = pd.to_datetime(previsoes_df['DATA'])
    
    # Mostrar agências disponíveis
    agencias_features = features_df['AGENCIA'].unique()
    agencias_previsoes = previsoes_df['AGENCIA'].unique()
    agencias_comuns = np.intersect1d(agencias_features, agencias_previsoes)
    
    print(f"\n Resumo - agências:")
    print(f"   - Agências em features: {len(agencias_features)}")
    print(f"   - Agências em previsões: {len(agencias_previsoes)}")
    print(f"   - Agências em comum: {len(agencias_comuns)}")
    
    return features_df, previsoes_df, sorted(agencias_comuns)


In [26]:

# Carregar dados
features_df, previsoes_df, agencias_disponiveis = carrega_prepara_dados()


FileNotFoundError: [Errno 2] No such file or directory: 'features_numerario.parquet'


### 5. Seleção de Agência para Análise


In [None]:
print(f"Agências disponíveis: {agencias_disponiveis[:20]}...")  
print(f"Total de agências: {len(agencias_disponiveis)}")

In [None]:
# Escolher agência 
AGENCIA_ESCOLHIDA = agencias_disponiveis[0]  

In [None]:
AGENCIA_ESCOLHIDA = 20  # Substitua pelo ID agência 

### 6. Merge dos Dados para a Agência Selecionada

merge entre valores reais e previstos

In [None]:
def preparar_dados_agencia(features_df, previsoes_df, agencia):
    
    # Filtrar dados da agência
    features_agencia = features_df[features_df['AGENCIA'] == agencia].copy()
    previsoes_agencia = previsoes_df[previsoes_df['AGENCIA'] == agencia].copy()
    
    print(f"   - Registros em features: {len(features_agencia)}")
    print(f"   - Registros em previsões: {len(previsoes_agencia)}")
    
    # Variáveis alvo
    variaveis_alvo = ['SAQ', 'DEP', 'SAQCEI', 'DEPCEI']
    
    # Criar DataFrames separados para cada variável
    dfs_merged = {}
    
    for var in variaveis_alvo:
        # Selecionar colunas necessárias
        df_real = features_agencia[['DATA', var]].copy()
        df_real.columns = ['DATA', f'{var}_REAL'] #Adiciona label _REAL na frente
        
        df_pred = previsoes_agencia[['DATA', var]].copy()
        df_pred.columns = ['DATA', f'{var}_PRED'] #Adiciona label _PRED na frente
        
        # Merge por DATA
        df_merged = pd.merge(df_real, df_pred, on='DATA', how='inner')
        
        # Ordenar por data
        df_merged = df_merged.sort_values('DATA').reset_index(drop=True)
        
        # Calcular erro (diferença real - predito)
        df_merged['ERRO'] = df_merged[f'{var}_REAL'] - df_merged[f'{var}_PRED']
        
        dfs_merged[var] = df_merged
        
        print(f"- {var}: {len(df_merged)} registros após merge")
    
    # Verificar período de dados
    primeira_data = min(df['DATA'].min() for df in dfs_merged.values())
    ultima_data = max(df['DATA'].max() for df in dfs_merged.values())
    print(f"\n Período de análise: {primeira_data.date()} a {ultima_data.date()}")
    
    return dfs_merged


In [None]:
# Preparar dados da agência escolhida
dfs_agencia = preparar_dados_agencia(features_df, previsoes_df, AGENCIA_ESCOLHIDA)

### 7. Análise Conformal por Target

Função para realizar a análise de drift em cada uma das targets separadamente

In [None]:

def analisar_drift_variavel(df_variavel, nome_variavel, epsilon=0.92):

    print(f" TARGET: {nome_variavel}")
    
    # Extrair valores reais e preditos
    y_real = df_variavel[f'{nome_variavel}_REAL'].values
    y_pred = df_variavel[f'{nome_variavel}_PRED'].values
    
    # Estatísticas básicas
    erro_medio = np.mean(y_real - y_pred)
    mae = np.mean(np.abs(y_real - y_pred))
    rmse = np.sqrt(np.mean((y_real - y_pred)**2))
    
    print(f"\n Estatísticas do modelo:")
    print(f"   - Erro médio: {erro_medio:.2f}")
    print(f"   - MAE: {mae:.2f}")
    print(f"   - RMSE: {rmse:.2f}")
    
    # Calcular p-valores
    p_det, p_rnd = calcular_pvalores_pred(y_real, y_pred)
    
    # Calcular martingales
    mart_power_det = power_martingale(p_det, epsilon)
    mart_power_rnd = power_martingale(p_rnd, epsilon)
    mart_jumper_det = simple_jumper_martingale(p_det)
    mart_jumper_rnd = simple_jumper_martingale(p_rnd)
    
    # Detectar pontos de mudança (threshold = 20 corresponde a α = 0.05)
    threshold = 20
    mudancas_power = np.where(mart_power_rnd > threshold)[0]
    mudancas_jumper = np.where(mart_jumper_rnd > threshold)[0]
    
    print(f"   - Power Martingale: {len(mudancas_power)} pontos detectados")
    if len(mudancas_power) > 0:
        print(f"     Primeira detecção: índice {mudancas_power[0]} ({df_variavel.iloc[mudancas_power[0]]['DATA'].date()})")
    
    print(f"   - Simple Jumper: {len(mudancas_jumper)} pontos detectados")
    if len(mudancas_jumper) > 0:
        print(f"     Primeira detecção: índice {mudancas_jumper[0]} ({df_variavel.iloc[mudancas_jumper[0]]['DATA'].date()})")
    
    # Retornar resultados
    return {
        'df': df_variavel,
        'p_valores': {'det': p_det, 'rnd': p_rnd},
        'martingales': {
            'power_det': mart_power_det,
            'power_rnd': mart_power_rnd,
            'jumper_det': mart_jumper_det,
            'jumper_rnd': mart_jumper_rnd
        },
        'deteccoes': {
            'power': mudancas_power,
            'jumper': mudancas_jumper
        },
        'metricas': {
            'erro_medio': erro_medio,
            'mae': mae,
            'rmse': rmse
        }
    }

##### aplica os diferentes métodos de score e compara resultados

In [None]:
"""
def analisar_drift_variavel(df_variavel, nome_variavel, features_agencia, epsilon=0.92):

    #Analisa drift para uma variável usando múltiplos scores de não-conformidade.
    
    #Args:
      #  df_variavel: DataFrame com dados da variável (real, pred, erro)
      #  nome_variavel: Nome da variável sendo analisada
      #  features_agencia: DataFrame com features da agência (já filtrado)
      #  epsilon: Parâmetro do power martingale
    
    # Definir métodos a comparar
    metodos_scores = ['absoluto', 'relativo', 'z_score', 'mad', 
                      'dist_min', 'dist_media', 'contexto', 'multi_contexto']
    
    resultados_por_metodo = {}
    
    # Comparar cada método
    for metodo in metodos_scores:
        print(f"\n Testando método: {metodo.upper()}")
        
        # Para métodos contextuais, precisamos das features
        if metodo in ['contexto', 'multi_contexto']:
            # Fazer merge com features de calendário
            df_contexto = pd.merge(df_variavel, df_features_completo, 
                                 on=['AGENCIA', 'DATA'], how='left')
            
            # Selecionar features relevantes
            features_contexto = ['DIA_SEMANA', 'SEMANA_QUINTO_DU', 'DIA_FERIADO', 
                               'DIA_UTIL', 'EMENDA', 'DIA_ADJACENTE_FERIADO']
            contexto_df = df_contexto[features_contexto]
            tipo_dia = df_contexto['DIA_SEMANA'].values
        else:
            contexto_df = None
            tipo_dia = None
        
        # Calcular p-valores com o método específico
        p_det, p_rnd = calcular_pvalores_pred_com_score(
            y_real, y_pred, 
            metodo_score=metodo,
            janela_historico=30,
            contexto_features=contexto_df,
            tipo_dia=tipo_dia
        )
        
        # Calcular martingales
        mart_power = power_martingale(p_rnd, epsilon)
        mart_jumper = simple_jumper_martingale(p_rnd)
        
        # Detectar mudanças
        threshold = 20
        mudancas_power = np.where(mart_power > threshold)[0]
        mudancas_jumper = np.where(mart_jumper > threshold)[0]
        
        # Armazenar resultados
        resultados_por_metodo[metodo] = {
            'p_valores': {'det': p_det, 'rnd': p_rnd},
            'martingales': {
                'power': mart_power,
                'jumper': mart_jumper
            },
            'deteccoes': {
                'power': mudancas_power,
                'jumper': mudancas_jumper
            },
            'primeira_deteccao': mudancas_power[0] if len(mudancas_power) > 0 else None,
            'max_martingale': np.max(mart_power)
        }
        
        # Imprimir resumo
        print(f"   - Detecções (Power): {len(mudancas_power)}")
        if len(mudancas_power) > 0:
            print(f"   - Primeira detecção: índice {mudancas_power[0]} " +
                  f"({df_variavel.iloc[mudancas_power[0]]['DATA'].date()})")
        print(f"   - Max martingale: {np.max(mart_power):.2f}")
    
    
    return {
        'metodos': resultados_por_metodo,
        'df': df_variavel,
        'estatisticas': {
            'erro_medio': np.mean(y_real - y_pred),
            'mae': np.mean(np.abs(y_real - y_pred)),
            'rmse': np.sqrt(np.mean((y_real - y_pred)**2))
        }
    }
"""

### 8. Executar loop para rodar todas as targets

In [None]:
resultados = {}
variaveis = ['SAQ', 'DEP', 'SAQCEI', 'DEPCEI']

for var in variaveis:
    if var in dfs_agencia and len(dfs_agencia[var]) > 0:
        resultados[var] = analisar_drift_variavel(dfs_agencia[var], var, features_df)

"""para executar múltiplos métodos"""

elif metodo == 'contexto':
    # G. Score Contextual por Tipo de Dia
    # Precisa receber contexto_features e tipo_dia como parâmetros
    for i in range(n):
        if i < janela_historico:
            scores[i] = np.abs(erros[i])  # Fallback simples no início
        else:
            # Encontrar dias similares no histórico
            tipo_atual = tipo_dia[i]
            mask_similar = tipo_dia[i-janela_historico:i] == tipo_atual
            
            if np.any(mask_similar):
                erros_similares = erros[i-janela_historico:i][mask_similar]
                if len(erros_similares) > 0:
                    mad = np.median(np.abs(erros_similares - np.median(erros_similares)))
                    scores[i] = np.abs(erros[i]) / (1.4826 * mad + epsilon)
                else:
                    scores[i] = np.abs(erros[i])
            else:
                scores[i] = np.abs(erros[i])

elif metodo == 'multi_contexto':
    # H. Score Multi-contextual (necessita scipy)
    from scipy.spatial.distance import mahalanobis
    from scipy.linalg import pinv
    # ... implementação com distância de Mahalanobis


### 9. Visualizações


Função gerar gráfico

In [None]:
def plotar_analise_completa(resultados, variavel):

    if variavel not in resultados:
        print(f" Variável {variavel} não encontrada nos resultados!")
        return
    
    res = resultados[variavel]
    df = res['df']
    
    # Criar figura com subplots
    fig, axes = plt.subplots(4, 1, figsize=(14, 16))
    
    # 1. Valores Reais vs Preditos
    ax1 = axes[0]
    ax1.plot(df['DATA'], df[f'{variavel}_REAL'], label='Real', alpha=0.7, linewidth=1.5)
    ax1.plot(df['DATA'], df[f'{variavel}_PRED'], label='Predito', alpha=0.7, linewidth=1.5)
    ax1.set_title(f'{variavel} - Valores Reais vs Preditos', fontsize=14, fontweight='bold')
    ax1.set_xlabel('Data')
    ax1.set_ylabel('Valor')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Marcar detecções
    for idx in res['deteccoes']['power'][:5]:  # Primeiras 5 detecções
        ax1.axvline(df.iloc[idx]['DATA'], color='red', linestyle='--', alpha=0.5)
    
    # 2. Erro ao longo do tempo
    ax2 = axes[1]
    ax2.plot(df['DATA'], df['ERRO'], color='orange', alpha=0.7)
    ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    ax2.fill_between(df['DATA'], df['ERRO'], 0, alpha=0.3, color='orange')
    ax2.set_title('Erro de Predição (Real - Predito)', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Data')
    ax2.set_ylabel('Erro')
    ax2.grid(True, alpha=0.3)
    
    # 3. P-valores ao longo do tempo
    ax3 = axes[2]
    ax3.plot(df['DATA'], res['p_valores']['det'], label='P-valor Det.', alpha=0.7)
    ax3.plot(df['DATA'], res['p_valores']['rnd'], label='P-valor Rnd.', alpha=0.7)
    ax3.axhline(y=0.05, color='red', linestyle='--', alpha=0.5, label='α=0.05')
    ax3.set_title('P-valores Conformais', fontsize=14, fontweight='bold')
    ax3.set_xlabel('Data')
    ax3.set_ylabel('P-valor')
    ax3.set_ylim([0, 1])
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    
    # 4. Martingales (escala log)
    ax4 = axes[3]
    ax4.plot(df['DATA'], res['martingales']['power_rnd'], 
             label='Power Martingale', color='blue', linewidth=2)
    ax4.plot(df['DATA'], res['martingales']['jumper_rnd'], 
             label='Simple Jumper', color='green', linewidth=2)
    ax4.axhline(y=20, color='red', linestyle='--', alpha=0.5, label='Threshold (α=0.05)')
    ax4.set_yscale('log')
    ax4.set_title('Martingales para Detecção de Drift', fontsize=14, fontweight='bold')
    ax4.set_xlabel('Data')
    ax4.set_ylabel('Martingale (log scale)')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    # Marcar pontos de mudança detectados
    for idx in res['deteccoes']['power'][:5]:
        ax4.axvline(df.iloc[idx]['DATA'], color='red', linestyle=':', alpha=0.7)
        ax4.text(df.iloc[idx]['DATA'], ax4.get_ylim()[1]*0.8, 'Drift!', 
                rotation=90, verticalalignment='bottom', color='red')
    
    plt.suptitle(f'Análise de Drift - Agência {AGENCIA_ESCOLHIDA}', 
                 fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.show()


gerar gráfico para cada target

In [None]:
for var in variaveis:
    if var in resultados:
        plotar_analise_completa(resultados, var)

para múltiplos métodos: 

def plotar_comparacao_metodos(resultados_variavel):
    """
    Compara os diferentes métodos de score para uma variável.
    """
    metodos = resultados_variavel['metodos']
    
    fig, ax = plt.subplots(figsize=(12, 8))
    
    for metodo, res in metodos.items():
        ax.plot(res['martingales']['power'], label=f'{metodo}', alpha=0.7)
    
    ax.set_yscale('log')
    ax.axhline(y=20, color='red', linestyle='--', alpha=0.5)
    ax.legend()
    ax.set_title('Comparação de Métodos de Score')
    plt.show()


### 10. Análise Comparativa entre Variáveis

visualização comparando os targets 

1. **P-valores baixos** indicam observações não-conformes (possível drift)
2. **Martingales crescentes** sinalizam mudança sistemática no comportamento
3. **Threshold = 20** corresponde a nível de significância α = 0.05

In [None]:
def plotar_comparacao_variaveis(resultados):

    fig, axes = plt.subplots(2, 2, figsize=(16, 10))
    axes = axes.flatten()
    
    for idx, (var, res) in enumerate(resultados.items()):
        if idx >= 4:
            break
            
        ax = axes[idx]
        df = res['df']
        
        # Plotar martingale Power (mais sensível)
        ax.plot(df['DATA'], res['martingales']['power_rnd'], 
                linewidth=2, label='Power Martingale')
        
        # Linha de threshold
        ax.axhline(y=20, color='red', linestyle='--', alpha=0.5)
        
        # Marcar detecções
        for det_idx in res['deteccoes']['power'][:3]:
            ax.axvline(df.iloc[det_idx]['DATA'], color='red', linestyle=':', alpha=0.7)
        
        ax.set_yscale('log')
        ax.set_title(f'{var} - Power Martingale', fontweight='bold')
        ax.set_xlabel('Data')
        ax.set_ylabel('Martingale (log)')
        ax.grid(True, alpha=0.3)
        
        # Adicionar texto com estatísticas
        texto_stats = f"MAE: {res['metricas']['mae']:.2f}\n"
        texto_stats += f"Detecções: {len(res['deteccoes']['power'])}"
        ax.text(0.02, 0.98, texto_stats, transform=ax.transAxes, 
                verticalalignment='top', bbox=dict(boxstyle='round', 
                facecolor='wheat', alpha=0.5))
    
    plt.suptitle(f'Comparação de Drift entre Variáveis - Agência {AGENCIA_ESCOLHIDA}', 
                 fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.show()


In [None]:

plotar_comparacao_variaveis(resultados)

para drif de dias consecutivos:

def detectar_drift_com_filtro(martingale, threshold=20, min_consecutivos=3):
    """
    Detecta drift exigindo múltiplas detecções consecutivas.
    """
    deteccoes_brutas = martingale > threshold
    deteccoes_filtradas = []
    
    contador = 0
    for i, detectado in enumerate(deteccoes_brutas):
        if detectado:
            contador += 1
            if contador >= min_consecutivos:
                # Adiciona o primeiro índice da sequência
                deteccoes_filtradas.append(i - min_consecutivos + 1)
                contador = 0  # Reset para evitar múltiplas detecções
        else:
            contador = 0
    
    return np.array(deteccoes_filtradas)


### 11. Resumo da Análise


In [None]:
def gerar_resumo_analise(resultados):

    print("\n" + "="*70)
    print(f" RESUMO DA ANÁLISE DE DRIFT - AGÊNCIA {AGENCIA_ESCOLHIDA}")
    print("="*70)
    
    # Criar DataFrame com resumo
    resumo_data = []
    
    for var, res in resultados.items():
        df = res['df']
        primeira_data = df['DATA'].min()
        ultima_data = df['DATA'].max()
        
        # Detecções Power Martingale
        det_power = res['deteccoes']['power']
        primeira_det_power = df.iloc[det_power[0]]['DATA'] if len(det_power) > 0 else None
        
        # Detecções Simple Jumper
        det_jumper = res['deteccoes']['jumper']
        primeira_det_jumper = df.iloc[det_jumper[0]]['DATA'] if len(det_jumper) > 0 else None
        
        resumo_data.append({
            'Variável': var,
            'Período': f"{primeira_data.date()} a {ultima_data.date()}",
            'Registros': len(df),
            'MAE': f"{res['metricas']['mae']:.2f}",
            'RMSE': f"{res['metricas']['rmse']:.2f}",
            'Detecções Power': len(det_power),
            '1ª Det. Power': primeira_det_power.date() if primeira_det_power else 'Nenhuma',
            'Detecções Jumper': len(det_jumper),
            '1ª Det. Jumper': primeira_det_jumper.date() if primeira_det_jumper else 'Nenhuma'
        })
    
    df_resumo = pd.DataFrame(resumo_data)
    
    # Exibir tabela
    print("\n Tabela Resumo:")
    print(df_resumo.to_string(index=False))
    
    # Análise de período pandemia (se aplicável)
    print("\n Análise de Período Especial (Pandemia):")
    data_inicio_pandemia = pd.to_datetime('2020-03-01')
    
    for var, res in resultados.items():
        df = res['df']
        
        # Verificar se há dados no período da pandemia
        mask_pandemia = df['DATA'] >= data_inicio_pandemia
        if mask_pandemia.any():
            idx_pandemia = df[mask_pandemia].index[0]
            
            # Verificar detecções próximas ao início da pandemia
            det_power = res['deteccoes']['power']
            det_proximas = det_power[(det_power >= idx_pandemia - 30) & 
                                   (det_power <= idx_pandemia + 90)]
            
            if len(det_proximas) > 0:
                primeira_det = df.iloc[det_proximas[0]]['DATA']
                dias_apos_pandemia = (primeira_det - data_inicio_pandemia).days
                print(f"\n   {var}: Drift detectado {dias_apos_pandemia} dias após início da pandemia")
                print(f"         Data da detecção: {primeira_det.date()}")
            else:
                print(f"\n   {var}: Nenhum drift detectado no período da pandemia")
    
    print("\n" + "="*70)
    
    return df_resumo


In [None]:

resumo_final = gerar_resumo_analise(resultados)


### 12. Exportar Resultados


In [None]:
def exportar_resultados(resultados, agencia):

    import os
    
    # Criar diretório para resultados
    dir_resultados = f'resultados_drift_agencia_{agencia}'
    if not os.path.exists(dir_resultados):
        os.makedirs(dir_resultados)
    
    for var, res in resultados.items():
        # Exportar DataFrame com análise
        df_export = res['df'].copy()
        
        # Adicionar colunas de p-valores e martingales
        df_export['p_valor_det'] = res['p_valores']['det']
        df_export['p_valor_rnd'] = res['p_valores']['rnd']
        df_export['martingale_power'] = res['martingales']['power_rnd']
        df_export['martingale_jumper'] = res['martingales']['jumper_rnd']
        
        # Marcar detecções
        df_export['drift_detectado_power'] = False
        df_export.loc[res['deteccoes']['power'], 'drift_detectado_power'] = True
        
        df_export['drift_detectado_jumper'] = False
        df_export.loc[res['deteccoes']['jumper'], 'drift_detectado_jumper'] = True
        
        # Salvar arquivo
        filename = f'{dir_resultados}/analise_drift_{var}_agencia_{agencia}.csv'
        df_export.to_csv(filename, index=False)
        print(f"Resultados de {var} salvos em: {filename}")