## Imports e configuração de paths do projeto.

In [8]:
import os
import json
import time
import math
import random
import inspect
import argparse
from dataclasses import dataclass
from typing import Dict, List, Tuple, Callable, Any
from pathlib import Path
from datetime import datetime

import numpy as np

# ------------------------------------------------------------------------------
# Ajuste de PATH para importar o pacote do zip hifdm_optimization
# ------------------------------------------------------------------------------
HERE = Path(os.getcwd())  # D:\TCCII\Dados\tcc_rafael
ROOT = HERE.parent  # D:\TCCII\Dados
if str(ROOT) not in os.sys.path:
    os.sys.path.insert(0, str(ROOT))

# Pasta Utilities (para o módulo st.py)
UTILS_DIR = ROOT / "hifdm_optimization" / "Utilities"
if str(UTILS_DIR) not in os.sys.path:
    os.sys.path.insert(0, str(UTILS_DIR))

# Importações do seu projeto
from hifdm_optimization.MetodoNunes2022.hifdm import hifdm as hifdm_2022_impl
from hifdm_optimization.MetodoNunes2024.hifdm import hifdm as hifdm_2024_impl
from hifdm_optimization.OpenPL4.openPL4 import readPL4, convertType
from hifdm_optimization.Utilities import st as _st  # para DIAG e ranking

# ------------------------------------------------------------------------------
# Configurações gerais
# ------------------------------------------------------------------------------
DATA_DIR = Path(r"D:\TCCII\Dados\hifdm_optimization\data\sinais_para_otimizar_v2")
RESULTS_DIR = HERE / "results"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

## Implementação de Algoritmo Genético

### Método de 2022

In [9]:
'''
Estrutura de Algoritmo Genético (GA)
BEGIN 
    INITIALIZE population
    EVALUATE each candidate in population
    WHILE termination criteria not met DO
        SELECT parents from population
        RECOMBINE parents to produce offspring
        MUTATE offspring
        EVALUATE each candidate in offspring
        SELECT individuals for next generation from population and offspring
    END WHILE
END
'''

'\nEstrutura de Algoritmo Genético (GA)\nBEGIN \n    INITIALIZE population\n    EVALUATE each candidate in population\n    WHILE termination criteria not met DO\n        SELECT parents from population\n        RECOMBINE parents to produce offspring\n        MUTATE offspring\n        EVALUATE each candidate in offspring\n        SELECT individuals for next generation from population and offspring\n    END WHILE\nEND\n'

### Geração da população inicial


In [10]:
# Definição da população inicial (geração 0)

# Limites dos parâmetros do método 2022
# Ordem: [gamma, alpha, beta, beta_diff]
PARAM_BOUNDS_2022: dict[str, Tuple[float, float] | Tuple[int, int]] = {
    "gamma":     (0.0005, 0.0100),  # sensibilidade do limiar diferencial
    "alpha":     (1.01,   1.20  ),  # fator dos limiares adaptativos
    "beta":      (10,     60    ),  # peso de energias (inteiro)
    "beta_diff": (1.05,   2.50  ),  # peso do limiar diferencial
}

# Duas configurações "sementes" úteis 
SEEDS_2022: List[List[float]] = [
    [0.0040, 1.05, 40, 1.50],  # semente do artigo
    [0.0035, 1.04, 30, 1.30],  # semente mais sensível
]

def _sample_individual_2022(rng: random.Random) -> List[float]:
    g_lo, g_hi = PARAM_BOUNDS_2022["gamma"]
    a_lo, a_hi = PARAM_BOUNDS_2022["alpha"]
    b_lo, b_hi = PARAM_BOUNDS_2022["beta"]
    d_lo, d_hi = PARAM_BOUNDS_2022["beta_diff"]

    gamma = round(rng.uniform(g_lo, g_hi), 6)     # 6 casas para granularidade fina
    alpha = round(rng.uniform(a_lo, a_hi), 3)     # 3 casas é suficiente
    beta  = int(rng.randint(int(b_lo), int(b_hi)))  # inteiro
    bdiff = round(rng.uniform(d_lo, d_hi), 3)

    return [gamma, alpha, beta, bdiff]


def init_population_2022(
    pop_size: int = 10,
    seed: int | None = 42,
    include_seeds: bool = True,
) -> List[List[float]]:
    """
    Gera população inicial para GA do método 2022.
    - pop_size: tamanho da população
    - seed: para reprodutibilidade
    - include_seeds: inclui as duas sementes fixas antes do sorteio aleatório

    Retorna: lista de indivíduos, cada um sendo [gamma, alpha, beta, beta_diff]
    """
    assert pop_size >= 2, "população muito pequena (>=2)"
    rng = random.Random(seed)

    pop: List[List[float]] = []
    seen: set[Tuple[float, float, int, float]] = set()

    if include_seeds:
        for s in SEEDS_2022:
            ind = [round(float(s[0]), 6), round(float(s[1]), 3), int(s[2]), round(float(s[3]), 3)]
            key = (ind[0], ind[1], ind[2], ind[3])
            if key not in seen:
                pop.append(ind)
                seen.add(key)

    while len(pop) < pop_size:
        ind = _sample_individual_2022(rng)
        key = (ind[0], ind[1], ind[2], ind[3])
        if key not in seen:
            pop.append(ind)
            seen.add(key)

    return pop

In [11]:
# Testa a geração da população inicial
pop = init_population_2022(pop_size=10, seed=123, include_seeds=True)
for i, ind in enumerate(pop, 1):
    print(f"ind {i:02d}: {ind}")

ind 01: [0.004, 1.05, 40, 1.5]
ind 02: [0.0035, 1.04, 30, 1.3]
ind 03: [0.000997, 1.027, 36, 1.437]
ind 04: [0.008466, 1.176, 34, 1.827]
ind 05: [0.003656, 1.172, 20, 1.246]
ind 06: [0.005829, 1.143, 20, 1.052]
ind 07: [0.004645, 1.027, 48, 1.598]
ind 08: [0.000563, 1.149, 16, 2.363]
ind 09: [0.001381, 1.037, 60, 2.365]
ind 10: [0.003272, 1.092, 40, 1.435]


### Avaliação de cada candidato

#### Funções auxiliares

In [None]:
# ------------------------------------------------------------------------------
# Adaptadores de chamada aos detectores
# ------------------------------------------------------------------------------
@dataclass
class DetectResult:
    detected: bool
    det_cycles: float | None  # ciclos até detecção (se disponível)
    meta: Dict[str, Any]

# ===== Novo utilitário: amostras por ciclo a partir do vetor de tempo =====
def samples_per_cycle_from_time(sig: Dict[str, Any], default_f0: float = 60.0) -> int:
    """
    Calcula de forma robusta o número de amostras por ciclo elétrico, preferindo o vetor 'time'.
    Fallback: se 'time' faltar ou vier ruim, usa fs/f0 dos metadados.
    """
    import numpy as _np

    f0 = float(sig.get("f0", default_f0))

    # 1) Tente pelo time[]
    t = sig.get("time", None)
    if t is not None:
        # mediana do passo elimina outliers ocasionais no início/fim
        dt = float(_np.median(_np.diff(_np.asarray(t, dtype=float))))
        if dt > 0:
            fs_t = 1.0 / dt
            one_cycle_t = int(round(fs_t / f0))
            if one_cycle_t > 0:
                return one_cycle_t  # << normalmente 128 nos seus arquivos

    # 2) Fallback para metadados
    fs_meta = float(sig.get("fs", 0.0))
    one_cycle_meta = int(round(fs_meta / f0)) if (fs_meta > 0 and f0 > 0) else 0
    return one_cycle_meta if one_cycle_meta > 0 else 128  # último fallback


# ------------------------------------------------------------------------------
# Leitura PL4 (helper)
# ------------------------------------------------------------------------------
def openpl4(path: str) -> Dict[str, Any]:
    dfHEAD, data, miscData = readPL4(path)
    dfHEAD = convertType(dfHEAD)
    out = {}
    out["__dfHEAD__"] = dfHEAD
    out["__data__"] = data
    out["__meta__"] = {**miscData, "filename": str(path)}
    for idx, row in dfHEAD.iterrows():
        key = f"{row['TYPE']}:{row['FROM']}-{row['TO']}"
        out[key] = data[:, idx + 1]
    out["time"] = data[:, 0]
    out["fs"] = 1.0 / miscData["deltat"]
    out["f0"] = 60.0
    return out

# ------------------------------------------------------------------------------
# Dataset helpers
# ------------------------------------------------------------------------------
def iter_dataset_pl4(data_dir: Path) -> List[Tuple[Path, int]]:
    """
    Retorna lista [(caminho_pl4, label)], label=1 para HIF (FAI*) e 0 para não-HIF (NFAI).
    """
    data_dir = Path(data_dir)
    positives = []
    for sub in ["FAI", "FAI_com_forno", "FAI_com_gd", "FAI_retificador"]:
        d = data_dir / sub
        if d.exists():
            positives += list(d.rglob("*.pl4"))
    negatives = list((data_dir / "NFAI").glob("*.pl4"))
    items = [(p, 1) for p in positives] + [(p, 0) for p in negatives]
    return items

# ------------------------------------------------------------------------------
# Descoberta de triplets do tipo "BUSx-MEDy" a partir das chaves I-bran:FROM-TO
# ------------------------------------------------------------------------------
def extract_triplet_bases(sig: Dict[str, Any], max_items: int | None = 5) -> List[str]:
    """
    Varre as chaves 'I-bran:FROM-TO' e retorna bases únicas 'BUSx-MEDy' (sem a fase A/B/C).
    Ex.: 'I-bran:BUS10A-MED4A' -> base 'BUS10-MED4'
    """
    bases = []
    for k in list(sig.keys()):
        if not isinstance(k, str):
            continue
        if not k.startswith("I-bran:"):
            continue
        try:
            after = k.split("I-bran:")[1]
            FROM, TO = after.split("-")
            base_from = FROM[:-1] if FROM and FROM[-1].isalpha() else FROM
            base_to = TO[:-1] if TO and TO[-1].isalpha() else TO
            base = f"{base_from}-{base_to}"
            if base not in bases:
                bases.append(base)
        except Exception:
            continue
    if max_items is not None:
        bases = bases[:max_items]
    return bases

def inject_triplet_phases(sig: Dict[str, Any], base: str) -> Dict[str, Any]:
    """
    Dado 'BUSx-MEDy', injeta IA/IB/IC no dicionário a partir de:
      I-bran:BUSxA-MEDyA, I-bran:BUSxB-MEDyB, I-bran:BUSxC-MEDyC
    """
    from_id, to_id = base.split("-")
    keyA = f"I-bran:{from_id}A-{to_id}A"
    keyB = f"I-bran:{from_id}B-{to_id}B"
    keyC = f"I-bran:{from_id}C-{to_id}C"
    IA = sig.get(keyA)
    IB = sig.get(keyB)
    IC = sig.get(keyC)
    if IA is not None:
        sig["IA"] = IA
    if IB is not None:
        sig["IB"] = IB
    if IC is not None:
        sig["IC"] = IC
    if "IN" not in sig and IA is not None and IB is not None and IC is not None:
        sig["IN"] = IA + IB + IC
    return sig

def _build_hifdm_kwargs(
    impl: Callable, signal_dict: Dict[str, Any], parametros: List[float]
) -> tuple[tuple, dict]:
    """
    Retorna (args, kwargs) corretos p/ assinatura:
      - 2022: hifdm(sinal_1d, janela, parametros[, show])
      - 2024: hifdm(Ia, Ib, Ic, amostras, parametros)
    """
    import numpy as _np

    sig = inspect.signature(impl)
    params = list(sig.parameters.values())
    names = [p.name for p in params]

    fs = float(signal_dict.get("fs", 0.0))
    f0 = float(signal_dict.get("f0", 60.0))
    one_cycle = int(round(fs / f0)) if (fs > 0 and f0 > 0) else 0
    if one_cycle <= 0:
        raise ValueError(
            f"Não foi possível calcular amostras por ciclo (fs={fs}, f0={f0})."
        )

    IA = signal_dict.get("IA")
    IB = signal_dict.get("IB")
    IC = signal_dict.get("IC")

    # ====== 2022: hifdm(sinal, janela, parametros) — precisa de vetor 1D ======
    if names and names[0] == "sinal":
        candidates = [(IA, "IA"), (IB, "IB"), (IC, "IC")]
        candidates = [(v, n) for v, n in candidates if v is not None]
        if not candidates:
            raise ValueError("Faltam IA/IB/IC para chamar o método 2022.")
        rms_vals = [
            (float(_np.sqrt(_np.mean(_np.square(c)))), n, c) for c, n in candidates
        ]
        rms_vals.sort(reverse=True)
        sinal_1d = rms_vals[0][2]
        args = [sinal_1d]
        kwargs = {"janela": one_cycle, "parametros": parametros}
        if "show" in names:
            kwargs["show"] = False
        return tuple(args), kwargs

    # ====== 2024 original: hifdm(Ia, Ib, Ic, amostras, parametros) ======
    if names and names[0] == "Ia":
        if IA is None or IB is None or IC is None:
            raise ValueError("IA/IB/IC ausentes para chamar hifdm 2024.")
        args = [IA, IB, IC, one_cycle, parametros]
        kwargs = {}
        return tuple(args), kwargs

    # Fallback genérico (se houver variações)
    kwargs = {}
    if "parametros" in names:
        kwargs["parametros"] = parametros
    if "janela" in names:
        kwargs["janela"] = one_cycle
    if "amostras" in names:
        kwargs["amostras"] = one_cycle
    if "Ia" in names and IA is not None:
        kwargs["Ia"] = IA
    if "Ib" in names and IB is not None:
        kwargs["Ib"] = IB
    if "Ic" in names and IC is not None:
        kwargs["Ic"] = IC
    if "show" in names:
        kwargs["show"] = False
    return tuple(), kwargs

def _call_hifdm_impl(
    impl: Callable, signal_dict: Dict[str, Any], parametros: List[float]
) -> DetectResult:
    try:
        args, kwargs = _build_hifdm_kwargs(impl, signal_dict, parametros)
        print(f"Chamando {impl.__name__} com args={args} kwargs={kwargs}")
        out = impl(*args, **kwargs)
        print(f"{impl.__name__} retornou: {out}")
        if isinstance(out, tuple) and len(out) >= 1:
            detected = bool(out[0])
            det_cycles = float(out[1]) if len(out) > 1 and out[1] is not None else None
            return DetectResult(detected, det_cycles, {"raw": out})

        if isinstance(out, dict):
            detected = bool(
                out.get("detected")
                or out.get("is_hif")
                or out.get("resultado")
                or out.get("trip", False)
            )
            det_cycles = (
                out.get("cycles") or out.get("n_ciclos") or out.get("tempo_ciclos")
            )
            det_cycles = float(det_cycles) if det_cycles is not None else None
            return DetectResult(detected, det_cycles, out)

        return DetectResult(bool(out), None, {"raw": out})

    except Exception as e:
        meta = signal_dict.get("__meta__", {})
        fname = meta.get("filename") or signal_dict.get("source") or "?"
        print(f"[ERR] {impl.__name__} on {fname}: {e}")
        return DetectResult(False, None, {"error": str(e)})
# ------------------------------------------------------------------------------
# WRAPPER 2024 com 5 parâmetros [alpha, zeta, eta, C, N]
# ------------------------------------------------------------------------------
def hifdm_2024_config(
    Ia, Ib, Ic, amostras: int, parametros: List[float]
) -> Tuple[int, int]:
    """
    Implementa o método 2024 permitindo otimizar:
      parametros = [alfa, zeta, eta, C, N]
        - alfa: fator do limiar de energia da fundamental
        - zeta: fator do limiar de rugosidade (3º harmônico)
        - eta: janela (em ciclos) para cálculo da rugosidade
        - C: ciclos de espera após ruptura antes de iniciar confirmação de HIF
        - N: confirmações necessárias para trip
    Retorna (trip, ciclos_processados)
    """
    import numpy as _np
    from statistics import median, stdev

    alfa = float(parametros[0])  # ~[1.01..2.5]
    zeta = float(parametros[1])  # ~[1.01..3.0]
    eta = int(parametros[2])  # janela rugosidade (ciclos) ~[6..18]
    C = int(parametros[3])  # espera pós-ruptura ~[3..20]
    N = int(parametros[4])  # confirmações ~[3..12]

    # buffers de tamanho N (confirmações) como no original
    E1 = [0.0 for _ in range(N)]
    E3 = [0.0 for _ in range(N)]
    GAMMA = [0.0 for _ in range(N)]

    gamma_ene = 0.01
    gamma_r = 0.01

    Ia = _np.array(Ia)
    Ib = _np.array(Ib)
    Ic = _np.array(Ic)
    In = Ia + Ib + Ic

    def energia(espectro):
        return float(np_sum_abs2(espectro))

    # rugosidade a partir das últimas 'eta' energias do 3º harmônico
    def rugosidade(energias: List[float]) -> float:
        n = len(energias)
        if n < 2:
            return 1e-2
        acc = 0.0
        for i in range(1, n):
            d = energias[i] - energias[i - 1]
            acc += d * d
        return max(acc / n, 1e-2)

    tau_1 = 0
    tau_2 = 0
    cont_rupt = 0
    detect_rupt = False
    trip = 0
    ciclo = 0

    # histórico deslizante para rugosidade (eta amostras de E3)
    hist_E3: List[float] = []

    # varre janelas de 1 ciclo
    for ciclo in range(0, len(In) - amostras, amostras):
        # ST do neutro nesta janela
        espectro = _st(In[ciclo : ciclo + amostras], 2)
        fund = espectro[1]
        h3 = espectro[3]

        e1 = energia(fund)
        e3 = energia(h3)

        # atualiza históricos
        E1 = [e1] + E1[:-1]
        E3 = [e3] + E3[:-1]
        hist_E3.append(e3)
        if len(hist_E3) > eta:
            hist_E3.pop(0)

        # rugosidade do 3º harmônico nos últimos 'eta' ciclos
        R = rugosidade(hist_E3)

        # --- Atualização de limiares (igual à lógica do paper/código, mas com α, ζ variáveis)
        if e1 <= max(E1):
            med = median(E1)
            sd = stdev(E1) if len(set(E1)) > 1 else 0.0
            gamma_ene = alfa * (med + sd)

        if len(GAMMA) > 0:
            if R <= max(GAMMA) + (stdev(GAMMA) if len(set(GAMMA)) > 1 else 0.0):
                GAMMA = [R] + GAMMA[:-1]
                sdg = stdev(GAMMA) if len(set(GAMMA)) > 1 else 0.0
                gamma_r = zeta * (max(GAMMA) + sdg)

        # --- Detecção de ruptura (aumento de energia fundamental sustentado)
        if e1 >= gamma_ene:
            tau_1 += 1
        else:
            tau_1 = 0

        if tau_1 >= N:
            detect_rupt = True

        if detect_rupt:
            cont_rupt += 1

        # --- Após aguardar C ciclos, confirma HIF usando rugosidade do 3º
        if cont_rupt >= C:
            if R >= gamma_r:
                tau_2 += 1
            else:
                tau_2 = max(tau_2 - 1, 0)

            if tau_2 >= N:
                trip = 1
                break

    return trip, ciclo // amostras

# ------------------------------------------------------------------------------
# Avaliação em lote (dataset)
# ------------------------------------------------------------------------------
def evaluate_params_on_dataset(
    method: str,
    parametros: List[float],
    data_dir: Path,
    max_pos: int | None = None,
    max_neg: int | None = None,
    seed: int = 42,
) -> Dict[str, Any]:
    """
    Executa o detector em um subconjunto (ou no total) do dataset e produz métricas.
    """
    rng = random.Random(seed)
    all_items = iter_dataset_pl4(data_dir)

    pos = [(p, y) for p, y in all_items if y == 1]
    neg = [(p, y) for p, y in all_items if y == 0]
    rng.shuffle(pos)
    rng.shuffle(neg)
    if max_pos is not None:
        pos = pos[:max_pos]
    if max_neg is not None:
        neg = neg[:max_neg]

    batch = pos + neg
    rng.shuffle(batch)

    TP = FP = TN = FN = 0
    det_times = []
    errors = 0
    used = 0
    t0 = time.time()

    for path, label in batch:
        sig = openpl4(str(path))

        # Normalização mínima: se não houver neutro explícito, compute IN = IA+IB+IC quando possível
        IA = sig.get("IA")
        IB = sig.get("IB")
        IC = sig.get("IC")
        if IA is None or IB is None or IC is None:
            bases = extract_triplet_bases(sig, max_items=1)
            if bases:
                inject_triplet_phases(sig, bases[0])
                IA, IB, IC = sig.get("IA"), sig.get("IB"), sig.get("IC")
        if (
            sig.get("IN") is None
            and IA is not None
            and IB is not None
            and IC is not None
        ):
            sig["IN"] = IA + IB + IC

        fs = float(sig.get("fs", 0.0))
        f0 = float(sig.get("f0", 60.0))
        one_cycle = int(round(fs / f0)) if (fs > 0 and f0 > 0) else 0
        if one_cycle <= 0:
            errors += 1
            continue

        if method == "2024":
            # parametros = [alpha, zeta, eta, C, N]
            if len(parametros) >= 5:
                C = int(parametros[3])
                N = int(parametros[4])
                n_cycles_total = int(
                    len(sig.get("IN", IA if IA is not None else [])) // one_cycle
                )
                if n_cycles_total < (C + N + 5):
                    continue

        if method == "2022":
            print("passou aqui 1", sig)
            print("passou aqui 2", parametros)
            res = _call_hifdm_impl(hifdm_2022_impl, sig, parametros)
        elif method == "2024":
            if IA is None or IB is None or IC is None:
                errors += 1
                continue
            trip, cyc = hifdm_2024_config(IA, IB, IC, one_cycle, parametros)
            res = DetectResult(bool(trip), float(cyc), {"raw": (trip, cyc)})
        else:
            raise ValueError("method deve ser '2022' ou '2024'")

        used += 1
        detected = res.detected
        if "error" in res.meta:
            errors += 1

        if label == 1 and detected:
            TP += 1
            if res.det_cycles is not None:
                det_times.append(res.det_cycles)
        elif label == 1 and not detected:
            FN += 1
        elif label == 0 and detected:
            FP += 1
        else:
            TN += 1

    elapsed = time.time() - t0
    tot = max(1, TP + TN + FP + FN)
    acc = (TP + TN) / tot
    tmean = (sum(det_times) / len(det_times)) if det_times else None

    return {
        "TP": TP,
        "TN": TN,
        "FP": FP,
        "FN": FN,
        "accuracy": acc,
        "tmean_cycles": tmean,
        "n_errors": errors,
        "n_eval": tot,
        "n_used": used,
        "elapsed_sec": elapsed,
    }

# ------------------------------------------------------------------------------
# Função-objetivo
# ------------------------------------------------------------------------------
def objective_from_metrics(
    m: Dict[str, Any], w_fn: float = 2.0, w_fp: float = 1.0, w_time: float = 0.05
) -> float:
    """
    Penaliza mais forte os falsos negativos (perder HIF), depois falsos positivos,
    e levemente o tempo médio de detecção (em ciclos).
    """
    TP, TN, FP, FN = m["TP"], m["TN"], m["FP"], m["FN"]
    tot_pos = max(1, TP + FN)
    tot_neg = max(1, TN + FP)

    fn_rate = FN / tot_pos
    fp_rate = FP / tot_neg
    tmean = m["tmean_cycles"] if m["tmean_cycles"] is not None else 9.0

    return (w_fn * fn_rate) + (w_fp * fp_rate) + (w_time * (tmean / 9.0))

#### Função de avaliação

In [13]:
def evaluate_population_2022(
    population: List[List[float]],
    data_dir: Path,
    subset_pos: int = 12,     # HIF por avaliação (ajuste conforme tempo)
    subset_neg: int = 12,     # NFAI por avaliação
    seed: int = 0,
    weights: Tuple[float, float, float] = (2.0, 1.0, 0.05),
) -> List[Dict[str, Any]]:
    """
    Avalia cada indivíduo (parâmetros [gamma, alpha, beta, beta_diff]) da população
    no método 2022. Retorna uma lista de dicionários com:
      - 'ind': o vetor de parâmetros
      - 'cost': custo calculado por objective_from_metrics
      - 'metrics': métricas retornadas (TP, FP, FN, accuracy, tmean_cycles, etc.)
      - 'elapsed_sec': tempo gasto na avaliação desse indivíduo
    """
    results: List[Dict[str, Any]] = []
    cache: dict[Tuple[float, float, int, float], Dict[str, Any]] = {}
    
    for ind in population:
        # normaliza tipos e chave para cache
        gamma     = round(float(ind[0]), 6)
        alpha     = round(float(ind[1]), 3)
        beta      = int(ind[2])
        beta_diff = round(float(ind[3]), 3)
        key = (gamma, alpha, beta, beta_diff)

        if key in cache:
            results.append(cache[key])
            continue

        t0 = time.time()
        try:
            # roda o detector 2022 no dataset (subconjunto) para obter métricas
            m = evaluate_params_on_dataset(
                method="2022",
                parametros=[gamma, alpha, beta, beta_diff],
                data_dir=data_dir,
                max_pos=subset_pos,
                max_neg=subset_neg,
                seed=seed,
            )
            # converte métricas em custo (menor = melhor)
            c = objective_from_metrics(m, *weights)
            if not math.isfinite(c):
                c = 1e9
        except Exception as e:
            # Em caso de erro, aponta custo alto e registra erro
            m = {
                "TP": 0, "TN": 0, "FP": 0, "FN": 0,
                "accuracy": 0.0, "tmean_cycles": None,
                "n_errors": 1, "n_eval": 0, "elapsed_sec": 0.0,
                "error": str(e),
            }
            c = 1e9

        elapsed = time.time() - t0
        rec = {
            "ind": [gamma, alpha, beta, beta_diff],
            "cost": c,
            "metrics": m,
            "elapsed_sec": elapsed,
        }
        cache[key] = rec
        results.append(rec)

    return results


#### Teste avaliação da população

In [14]:
def _print_table_eval(res_list: List[Dict[str, Any]], max_rows: int | None = None):
    # ordena por custo crescente
    res_sorted = sorted(res_list, key=lambda r: r["cost"])
    if max_rows is not None:
        res_sorted = res_sorted[:max_rows]

    header = (
        "rank | gamma    alpha   beta  b_diff |  cost    |  ACC   TP  FP  FN |  tmean  errs  time(s)"
    )
    print(header)
    print("-" * len(header))
    for i, r in enumerate(res_sorted, 1):
        g, a, b, d = r["ind"]
        m = r["metrics"]
        acc = m.get("accuracy", 0.0)
        TP  = m.get("TP", 0)
        FP  = m.get("FP", 0)
        FN  = m.get("FN", 0)
        tmn = m.get("tmean_cycles", None)
        errs = m.get("n_errors", 0)
        tsec = r.get("elapsed_sec", 0.0)
        tmn_str = f"{tmn:.2f}" if isinstance(tmn, (float, int)) and tmn is not None else " - "
        print(
            f"{i:>4} | {g:>7.4f}  {a:>6.3f}  {b:>4d}  {d:>6.3f} | "
            f"{r['cost']:>7.4f} | {acc:>6.3f}  {TP:>2d}  {FP:>2d}  {FN:>2d} | "
            f"{tmn_str:>6}  {errs:>4d}  {tsec:>7.2f}"
        )


# ====== TESTE LOCAL ======
# 1) gera população inicial de 10 indivíduos
pop = init_population_2022(pop_size=10, seed=123, include_seeds=True)
print("População inicial (10 indivíduos):")

for i, ind in enumerate(pop, 1):
    print(f"  ind {i:02d}: {ind}")

# 2) avalia a população (método 2022) em um subset curto para rodar rápido
print("\nAvaliando população (subset_pos=10, subset_neg=10)...")
eval_res = evaluate_population_2022(
    population=pop,
    data_dir=DATA_DIR,
    subset_pos=1,
    subset_neg=1,
    seed=0,
    weights=(2.0, 1.0, 0.05),
)

# 3) imprime uma tabela ordenada por custo
print("\nResultado da avaliação (ordenado por custo):")
_print_table_eval(eval_res, max_rows=None)


População inicial (10 indivíduos):
  ind 01: [0.004, 1.05, 40, 1.5]
  ind 02: [0.0035, 1.04, 30, 1.3]
  ind 03: [0.000997, 1.027, 36, 1.437]
  ind 04: [0.008466, 1.176, 34, 1.827]
  ind 05: [0.003656, 1.172, 20, 1.246]
  ind 06: [0.005829, 1.143, 20, 1.052]
  ind 07: [0.004645, 1.027, 48, 1.598]
  ind 08: [0.000563, 1.149, 16, 2.363]
  ind 09: [0.001381, 1.037, 60, 2.365]
  ind 10: [0.003272, 1.092, 40, 1.435]

Avaliando população (subset_pos=10, subset_neg=10)...
passou aqui 1 {'__dfHEAD__':       TYPE    FROM      TO
0   V-node   BUS3A        
1   V-node   BUS3B        
2   V-node   BUS3C        
3   V-node  BUS10A        
4   V-node  BUS10B        
..     ...     ...     ...
64  I-bran    GERA    SUBA
65  I-bran    GERB    SUBB
66  I-bran    GERC    SUBC
67       2    TACS  XX0021
68       2    TACS     FAI

[69 rows x 3 columns], '__data__': memmap([[ 0.0000000e+00,  4.4723682e+02,  8.0061528e+03, ...,
         -5.2452063e+02,  0.0000000e+00,  0.0000000e+00],
        [ 1.3020834e-0