# Notebook de Logging, Grid Search e Visualização de Resultados

Este notebook consolida: logging estruturado em JSON, grid search e visualizações de métricas / hiperparâmetros.

## 1. Exportar Dependências para requirements.txt

Opções:
1. Congelar tudo (`pip freeze`) – menos controlado.
2. Selecionar apenas libs nucleares que importamos.

Abaixo: interseção entre o conjunto de bibliotecas nucleares (`CORE`) e a saída de `pip freeze`; as linhas selecionadas são gravadas em `../requirements_minimal.txt`.

In [None]:
import subprocess
import sys
import json
import pathlib
from importlib import util

import re

# Build a minimal requirements file: intersect a curated set of core
# libraries with the pinned `name==version` lines reported by `pip freeze`.

CORE = {"numpy", "pandas", "scikit-learn", "scikit-multiflow", "matplotlib", "seaborn"}


def _canonical(name: str) -> str:
    """Return *name* normalized for comparison (PEP 503 style).

    Runs of '-', '_' and '.' collapse to a single '-' and the result is
    lower-cased, so 'scikit_learn' and 'Scikit-Learn' compare equal.
    """
    return re.sub(r'[-_.]+', '-', name.strip()).lower()


try:
    raw_freeze = subprocess.check_output(
        [sys.executable, '-m', 'pip', 'freeze'], text=True
    ).splitlines()

    # Map canonical package name -> original pinned line. Lines without '=='
    # (e.g. editable installs or direct URL references) are skipped.
    parsed = {}
    for line in raw_freeze:
        pkg, sep, _version = line.partition('==')
        if sep:
            parsed[_canonical(pkg)] = line.strip()

    # Core packages that are not installed are simply left out.
    selected = sorted(
        parsed[key] for key in map(_canonical, CORE) if key in parsed
    )
    print('Selecionados:')
    print('\n'.join(selected))

    # The parent directory normally exists already; mkdir is a harmless guard.
    parent_dir = pathlib.Path('../')
    parent_dir.mkdir(exist_ok=True)

    requirements_path = parent_dir / 'requirements_minimal.txt'
    # Explicit encoding keeps the written file identical across platforms.
    with open(requirements_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(selected) + '\n')
    print(f'Arquivo gravado em {requirements_path}')

except subprocess.CalledProcessError as e:
    print(f'Erro ao executar pip freeze: {e}')
except Exception as e:
    # Best-effort notebook cell: report the problem instead of aborting the run.
    print(f'Erro: {e}')

## 2. Configurar Logging JSON Estruturado

Criaremos um logger com formatter que serializa dicts para JSON, incluindo campos extra.

In [None]:
import logging
import logging.config
import json
import sys
import os
import time
import pathlib
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Format log records as single-line JSON objects.

    Every record yields the keys ``ts`` (UTC ISO-8601 time at which the
    record was created), ``level``, ``logger``, ``msg`` and
    ``correlation_id`` (``None`` when the record carries none), followed by
    any custom attributes attached to the record (for example via
    ``extra=``). Values that ``json.dumps`` rejects are replaced by their
    ``repr``. When the record has exception info, the formatted traceback is
    stored under ``exception``.
    """

    # Built-in LogRecord attributes that must not be reported as extras.
    # 'message' and 'asctime' are set on the record by other formatters;
    # 'taskName' exists on Python 3.12+.
    _RESERVED_ATTRS = frozenset({
        'msg', 'args', 'exc_info', 'exc_text', 'stack_info', 'levelno',
        'levelname', 'name', 'thread', 'threadName', 'processName',
        'process', 'created', 'msecs', 'relativeCreated', 'pathname',
        'filename', 'module', 'lineno', 'funcName', 'taskName',
        'message', 'asctime',
    })

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* serialized as a JSON string."""
        base = {
            # Use the record's own creation time so the timestamp reflects
            # when the event happened, not when it was formatted.
            'ts': datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'msg': record.getMessage(),
            'correlation_id': getattr(record, 'correlation_id', None)
        }

        # Merge custom attributes, never overwriting the base keys.
        for k, v in record.__dict__.items():
            if k.startswith('_') or k in base or k in self._RESERVED_ATTRS:
                continue
            try:
                json.dumps(v)  # probe: is the value serializable as-is?
                base[k] = v
            except Exception:
                base[k] = repr(v)

        if record.exc_info:
            base['exception'] = self.formatException(record.exc_info)
        return json.dumps(base, ensure_ascii=False)

# Ensure results directory exists
# (only the leaf directory is created; `parents=True` is not passed).
results_dir = pathlib.Path('../results')
results_dir.mkdir(exist_ok=True)

# dictConfig schema: one JSON formatter shared by a stdout handler and an
# append-mode file handler, both attached to the root logger at INFO level.
LOG_CFG = {
    'version': 1,
    # Keep loggers created before this call enabled.
    'disable_existing_loggers': False,
    'formatters': {
        # '()' asks dictConfig to build the formatter by calling JsonFormatter.
        'json': {'()': JsonFormatter}
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'json',
            'stream': 'ext://sys.stdout'
        },
        'file': {
            'class': 'logging.FileHandler',
            'formatter': 'json',
            # One JSON object per line, appended across notebook runs.
            'filename': str(results_dir / 'notebook_log.jsonl'),
            'mode': 'a',
            'encoding': 'utf-8'
        }
    },
    'root': {
        'handlers': ['console', 'file'],
        'level': 'INFO'
    }
}

# Apply the configuration, then emit a first record tagged with an extra field.
logging.config.dictConfig(LOG_CFG)
logger = logging.getLogger(__name__)
logger.info('Logger inicializado', extra={'phase': 'init'})

## 3. Adicionar Correlation ID e Context Manager
Usamos `contextvars` para propagar um correlation_id.

In [None]:
import contextvars
import uuid
import functools
import contextlib

# Holds the correlation ID of the current execution context (None = unset).
correlation_var = contextvars.ContextVar('correlation_id', default=None)


class CorrelationFilter(logging.Filter):
    """Logging filter that stamps records with the active correlation ID."""

    def filter(self, record):
        """Set ``record.correlation_id`` when an ID is active; always keep the record."""
        active_id = correlation_var.get()
        if not active_id:
            return True
        record.correlation_id = active_id
        return True


# Register the filter on every handler currently attached to the root logger.
for handler in logging.getLogger().handlers:
    handler.addFilter(CorrelationFilter())


@contextlib.contextmanager
def correlation_context(cid: str = None):
    """Bind a correlation ID for the duration of a ``with`` block.

    A random UUID4 string is generated when ``cid`` is ``None``. The bound
    ID is yielded, and the previous value is restored on exit even if the
    body raises.
    """
    chosen = str(uuid.uuid4()) if cid is None else cid
    token = correlation_var.set(chosen)
    try:
        yield correlation_var.get()
    finally:
        correlation_var.reset(token)

# Demo: log one record while a freshly generated correlation ID is bound;
# the same ID is also passed explicitly as an extra field.
with correlation_context() as cid:
    logger.info(
        'Exemplo de log com correlation id',
        extra={'cid_demo': True, 'correlation_id_demo': cid},
    )

## 4. Rotação e Compressão de Logs
Pós-processamos arquivos rotacionados compactando-os em `.gz`. Exemplo simplificado abaixo.

In [None]:
import gzip, shutil, glob
from logging.handlers import TimedRotatingFileHandler

class GzipTimedRotatingFileHandler(TimedRotatingFileHandler):
    """Timed rotating file handler that gzip-compresses each rotated file."""

    def rotate(self, source, dest):
        """Rotate ``source`` to ``dest``, then replace ``dest`` with ``dest + '.gz'``.

        If the parent rotation leaves no file at ``dest``, nothing else is done.
        """
        super().rotate(source, dest)
        if not os.path.exists(dest):
            return
        archive = dest + '.gz'
        with open(dest, 'rb') as plain:
            with gzip.open(archive, 'wb') as packed:
                shutil.copyfileobj(plain, packed)
        # Only the compressed copy is kept.
        os.remove(dest)

# (Demonstração, não substitui handler configurado previamente)

## 5. Função de Métricas e Fórmulas

$\mathrm{Precision}=\frac{TP}{TP+FP}$  \
$\mathrm{Recall}=\frac{TP}{TP+FN}$  \
$F_1=\frac{2PR}{P+R}$, com $P=\mathrm{Precision}$ e $R=\mathrm{Recall}$

Implementação simples:

In [None]:
from typing import Dict

def compute_metrics(tp:int, fp:int, fn:int, tn:int|None=None) -> Dict[str,float]:
    precision = tp / (tp + fp) if (tp+fp)>0 else 0.0
    recall = tp / (tp + fn) if (tp+fn)>0 else 0.0
    f1 = (2*precision*recall/(precision+recall)) if (precision+recall)>0 else 0.0
    acc = None
    if tn is not None:
        acc = (tp + tn) / (tp + tn + fp + fn) if (tp+tn+fp+fn)>0 else 0.0
    return {'precision':precision,'recall':recall,'f1':f1,'accuracy':acc}

print(compute_metrics(10,5,2, tn=50))

## 6. Definir Espaço de Busca (Grid Search)
Exemplo genérico de hiperparâmetros (ilustrativo).

In [None]:
import itertools, pandas as pd
# Search space: one list of candidate values per hyperparameter.
param_grid = {
    'lr': [1e-4, 1e-3],
    'batch_size': [16, 32],
    'hidden': [64, 128],
}

# One dict per point of the Cartesian product, in itertools.product order.
rows = [
    dict(zip(param_grid, values))
    for values in itertools.product(*param_grid.values())
]
param_df = pd.DataFrame(rows)
param_df

## 7. Função de Treinamento/Avaliação do Modelo (Exemplo)
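Usamos LogisticRegression do scikit-learn para demonstrar. Apenas `lr` influencia o modelo (via `C = 1/lr`); `batch_size` e `hidden` são apenas ilustrativos e repassados ao resultado sem alteração.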

In [None]:
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np, time

# Synthetic binary-classification data shared by every grid-search run.
X, y = make_classification(n_samples=1500, n_features=20, n_informative=8, random_state=42)
Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.3, random_state=42)


def train_eval(params: dict):
    """Fit a LogisticRegression for one hyperparameter combination and score it.

    Args:
        params: Mapping that must contain ``'lr'``; the model is built with
            ``C = 1 / params['lr']``. All other keys are not used by the
            model and are copied to the result unchanged.

    Returns:
        A dict with every entry of ``params`` plus ``'precision'``,
        ``'recall'`` and ``'f1'`` (computed on the held-out split) and
        ``'duration'`` (elapsed seconds for fit, predict and scoring).
    """
    # perf_counter is monotonic and high-resolution, so very short fits do not
    # collapse to 0.0 or go negative as they can with wall-clock time.time().
    start = time.perf_counter()
    model = LogisticRegression(max_iter=500, C=1/params['lr'])
    model.fit(Xtr, ytr)
    pred = model.predict(Xte)
    p = precision_score(yte, pred)
    r = recall_score(yte, pred)
    f1 = f1_score(yte, pred)
    return {**params, 'precision': p, 'recall': r, 'f1': f1, 'duration': time.perf_counter() - start}


print(train_eval({'lr':1e-3,'batch_size':32,'hidden':64}))

## 8. Executor de Grid Search Paralelo (joblib)

In [None]:
from joblib import Parallel, delayed
# Evaluate every parameter combination in parallel (n_jobs=-1), collecting
# one result dict per entry of `rows`.
results = Parallel(n_jobs=-1)(
    delayed(train_eval)(params) for params in rows
)
len(results), results[0]

## 9. Persistir e Carregar Resultados do Grid Search
Salvamos em JSONL e, se `pyarrow` estiver disponível, também em Parquet.

In [None]:
import json, pandas as pd
import os

# pyarrow is optional: without it only the JSONL export is produced.
try:
    import pyarrow as pa
    import pyarrow.parquet as pq
except ImportError:
    print("Warning: pyarrow not available, skipping Parquet export")
    PARQUET_AVAILABLE = False
else:
    PARQUET_AVAILABLE = True

res_df = pd.DataFrame(results)
os.makedirs('../results', exist_ok=True)

# JSONL export: one JSON object per result row.
with open('../results/grid_results.jsonl','w') as f:
    f.writelines(
        json.dumps(rec) + '\n' for rec in res_df.to_dict(orient='records')
    )

# Parquet export, only when pyarrow was imported successfully.
if not PARQUET_AVAILABLE:
    print('Persistido em JSONL apenas.')
else:
    table = pa.Table.from_pandas(res_df)
    pq.write_table(table, '../results/grid_results.parquet')
    print('Persistido em JSONL e Parquet.')

## 10. Gerar Script Externo de Grid Search (arquivo .py)
Criamos um script modular reutilizável.

In [None]:
# Source of the standalone grid-search script, kept as a string so the
# notebook can write it to disk. The generated script creates the directory
# of whatever path is passed via --out (not a hard-coded 'results/').
script_code = '''#!/usr/bin/env python3
import argparse
import json
import itertools
import time
import os
from joblib import Parallel, delayed
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score

def train_eval(params, data):
    """Train and evaluate model with given parameters"""
    Xtr, Xte, ytr, yte = data
    start = time.perf_counter()
    model = LogisticRegression(max_iter=500, C=1/params['lr'])
    model.fit(Xtr, ytr)
    pred = model.predict(Xte)

    precision = precision_score(yte, pred)
    recall = recall_score(yte, pred)
    f1 = f1_score(yte, pred)
    duration = time.perf_counter() - start

    return {
        **params,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'duration': duration
    }

def main():
    parser = argparse.ArgumentParser(description='Grid Search for ML model')
    parser.add_argument('--out', default='results/gs_external.jsonl', help='Output file path')
    parser.add_argument('--n-jobs', type=int, default=-1, help='Number of parallel jobs')
    args = parser.parse_args()

    # Create the directory that will actually hold the output file
    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Generate synthetic data
    X, y = make_classification(
        n_samples=1200,
        n_features=20,
        n_informative=8,
        random_state=42
    )
    Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.3, random_state=42)

    # Define grid
    grid = {
        'lr': [1e-4, 1e-3],
        'batch_size': [16, 32],
        'hidden': [64, 128]
    }

    # Generate parameter combinations
    combos = [
        dict(zip(grid.keys(), vals))
        for vals in itertools.product(*grid.values())
    ]

    print(f'Running grid search with {len(combos)} combinations...')

    # Run parallel grid search
    results = Parallel(n_jobs=args.n_jobs)(
        delayed(train_eval)(params, (Xtr, Xte, ytr, yte))
        for params in combos
    )

    # Save results
    with open(args.out, 'w', encoding='utf-8') as f:
        for result in results:
            f.write(json.dumps(result) + '\\n')

    print(f'Results saved to {args.out}')

if __name__ == '__main__':
    main()
'''

# Persist the generated script one directory above the notebook.
script_path = '../grid_search_external.py'
with open(script_path, 'w') as script_file:
    script_file.write(script_code)
print(f'Script escrito em {script_path}')

## 11. Notebook de Visualização: Carregar Resultados Salvos
Carrega o JSONL gerado anteriormente (o arquivo Parquet não é lido nesta célula).

In [None]:
import pandas as pd, json
from pathlib import Path
def load_jsonl(path):
    """Read a JSON Lines file into a list of decoded records.

    Blank or whitespace-only lines are skipped, so a trailing empty line
    does not raise ``JSONDecodeError``. A missing file yields an empty list.
    """
    path = Path(path)
    if not path.exists():
        return []
    # JSON text is read as UTF-8 regardless of the platform default.
    with open(path, encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


jsonl_path = Path('../results/grid_results.jsonl')
rows = load_jsonl(jsonl_path)
# An empty frame (no file or no records) is handled by the cells that use vis_df.
vis_df = pd.DataFrame(rows)
vis_df.head()

## 12. Visualizações: Heatmap de Hiperparâmetros
Pivot de f1 por lr x batch_size.

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Heatmap of mean F1 per (lr, batch_size); needs data and all three columns.
if vis_df.empty or not all(col in vis_df.columns for col in ('lr', 'batch_size', 'f1')):
    print('DataFrame vazio ou colunas ausentes para heatmap')
else:
    try:
        # Average F1 over the remaining hyperparameters for each grid cell.
        pivot = vis_df.pivot_table(
            index='lr',
            columns='batch_size',
            values='f1',
            aggfunc='mean',
        )
        plt.figure(figsize=(5, 4))
        sns.heatmap(pivot, annot=True, cmap='viridis', fmt='.3f')
        plt.title('F1 por lr x batch_size')
        plt.tight_layout()
        plt.show()
    except Exception as e:
        print(f'Erro ao criar heatmap: {e}')

## 13. Visualizações: Distribuições e Pareto

In [None]:
# Side-by-side plots: F1 histogram (left) and duration-vs-F1 scatter (right).
if vis_df.empty or 'f1' not in vis_df.columns or 'duration' not in vis_df.columns:
    print('Sem dados ou colunas ausentes para distribuições')
else:
    try:
        fig, ax = plt.subplots(1, 2, figsize=(10, 4))

        # Left panel: distribution of F1 scores.
        vis_df['f1'].hist(ax=ax[0], bins=10, alpha=0.7)
        ax[0].set(title='Distribuição F1', xlabel='F1 Score', ylabel='Frequência')

        # Right panel: trade-off between run time and F1.
        ax[1].scatter(vis_df['duration'], vis_df['f1'], alpha=0.7)
        ax[1].set(xlabel='Duração (s)', ylabel='F1 Score', title='Pareto: Tempo vs F1')

        plt.tight_layout()
        plt.show()
    except Exception as e:
        print(f'Erro ao criar visualizações: {e}')

## 14. Visualizações: Evolução da Métrica
Ordena execuções e mostra melhor cumulativo.

In [None]:
# F1 per run (in stored order) together with the running maximum.
if vis_df.empty or 'f1' not in vis_df.columns:
    print('Sem dados ou coluna F1 ausente para evolução')
else:
    try:
        vis_df_copy = vis_df.copy().reset_index(drop=True)
        best_so_far = vis_df_copy['f1'].cummax()

        plt.figure(figsize=(8, 5))
        plt.plot(
            vis_df_copy.index, vis_df_copy['f1'], 'o-',
            label='F1 por execução', alpha=0.7,
        )
        plt.plot(
            vis_df_copy.index, best_so_far, '--',
            label='Melhor até aqui', linewidth=2,
        )
        plt.legend()
        plt.xlabel('Número da Execução')
        plt.ylabel('F1 Score')
        plt.title('Evolução do F1 Score')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
    except Exception as e:
        print(f'Erro ao criar gráfico de evolução: {e}')

## 15. Dashboard Interativo (Plotly)

In [None]:
# plotly is optional; the 3D scatter is skipped when it cannot be imported.
try:
    import plotly.express as px
except ImportError:
    print("Warning: plotly not available, skipping 3D visualization")
    PLOTLY_AVAILABLE = False
else:
    PLOTLY_AVAILABLE = True

if not PLOTLY_AVAILABLE:
    print('Plotly não disponível')
elif vis_df.empty:
    print('Sem dados para plot interativo')
else:
    required_cols = ['lr', 'hidden', 'f1', 'batch_size']
    missing_cols = [col for col in required_cols if col not in vis_df.columns]
    if missing_cols:
        print(f'Colunas ausentes para plot 3D: {missing_cols}')
    else:
        try:
            # lr / hidden / F1 on the axes; batch_size as color, F1 as marker size.
            fig = px.scatter_3d(
                vis_df,
                x='lr',
                y='hidden',
                z='f1',
                color='batch_size',
                size='f1',
                title='Exploração 3D dos Hiperparâmetros',
                labels={'lr': 'Learning Rate', 'hidden': 'Hidden Units', 'f1': 'F1 Score'}
            )
            fig.show()
        except Exception as e:
            print(f'Erro ao criar plot 3D: {e}')

## 16. Integração Logging + Resultados (Enriquecer Registros)
Função para logar resumo formatado.

In [None]:
def log_result(record: dict):
    """Log one grid-search result as a structured INFO record.

    Each value is kept as-is when ``json.dumps`` accepts it and replaced by
    ``str(value)`` otherwise. Any failure is reported through the logger
    instead of being raised.
    """
    def _jsonable(value):
        # Probe serializability; fall back to the string form.
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            return str(value)
        return value

    try:
        payload = {name: _jsonable(val) for name, val in record.items()}
        logger.info('Grid search result', extra={'tag': 'result', 'payload': payload})
    except Exception as e:
        logger.error(f'Failed to log result: {e}', extra={'error_type': 'log_result_error'})

# Log the row with the highest F1, or warn when there is nothing to log.
if vis_df.empty or 'f1' not in vis_df.columns:
    logger.warning('Sem resultados para log_result', extra={'warning_type': 'no_results'})
else:
    try:
        best_result = vis_df.sort_values('f1', ascending=False).iloc[0]
        log_result(best_result.to_dict())
        print(f"Melhor resultado logado: F1 = {best_result['f1']:.4f}")
    except Exception as e:
        logger.error(f'Failed to log best result: {e}')
        print(f'Erro ao logar melhor resultado: {e}')

## 17. Exportar Relatório Consolidado
Gera markdown com top configuração e estatísticas.

In [None]:
import json
from statistics import mean, pstdev

report_path = '../results/report.md'

# Every statistic below reads the 'f1' column, so require it in addition to a
# non-empty frame (same guard as the visualization cells).
if not vis_df.empty and 'f1' in vis_df.columns:
    # Best configuration = row with the highest F1.
    top = vis_df.sort_values('f1', ascending=False).iloc[0]
    stats = {
        'f1_mean': vis_df['f1'].mean(),
        'f1_std': vis_df['f1'].std(),
        'f1_min': vis_df['f1'].min(),
        'f1_max': vis_df['f1'].max(),
        'n': len(vis_df)
    }

    # Markdown report: top configuration and summary stats as JSON code blocks.
    md_lines = [
        '# Relatório Grid Search',
        '',
        '## Top Configuração',
        '```json',
        json.dumps(top.to_dict(), indent=2),
        '```',
        '',
        '## Estatísticas',
        '```json',
        json.dumps(stats, indent=2),
        '```'
    ]

    # The headings contain non-ASCII characters; write UTF-8 explicitly so the
    # file does not depend on the platform's default encoding.
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(md_lines))

    logger.info('Relatório gerado', extra={'report_path': report_path, 'top_f1': float(top['f1'])})
    print('Gerado', report_path)
else:
    print('Sem dados ou coluna F1 ausente para relatório')