In [2]:
import sys, os, multiprocessing as mp
from typing import List, Literal, Dict, Tuple, Optional
import numpy as np
import pandas as pd
from pvlib.iotools import read_epw
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

# === Immediate log output and no thread over-subscription ===
# Best effort: any failure to switch stdout to line buffering is ignored.
try:
    sys.stdout.reconfigure(line_buffering=True)
except Exception:
    pass

# Default each native thread-pool variable to "1"; setdefault() keeps any
# value that is already present in the environment.
# NOTE(review): numpy/pandas are imported above this point - confirm the
# underlying libraries still honour these variables when set this late.
for _thread_cap_var in (
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "NUMEXPR_MAX_THREADS",
):
    os.environ.setdefault(_thread_cap_var, "1")

# -----------------------------
# Seleção de potência nominal
# -----------------------------
def select_nominal_power(
    required_kw: float,
    available_kw: List[float],
    method: Literal["ceil", "nearest"] = "ceil"
) -> float:
    """Pick one catalogue rating for a required power.

    Args:
        required_kw: Power demand to cover.
        available_kw: Catalogue ratings in any order; must not be empty.
        method: ``"ceil"`` returns the smallest rating that is at least
            ``required_kw``, or the largest rating when none is big enough.
            Any other value returns the rating closest to ``required_kw``;
            when two ratings are equally close the lower one is returned.

    Returns:
        One element of ``available_kw``.

    Raises:
        IndexError: ``available_kw`` is empty and ``method`` is ``"ceil"``.
        ValueError: ``available_kw`` is empty and ``method`` is not ``"ceil"``.
    """
    ratings = sorted(available_kw)
    if method != "ceil":
        # min() keeps the first of several equally distant candidates, and the
        # candidates are scanned in ascending order.
        return min(ratings, key=lambda rating: abs(rating - required_kw))
    # The fallback (largest rating) is used when nothing covers the demand.
    return next(
        (rating for rating in ratings if rating >= required_kw),
        ratings[-1],
    )

def _select_nominal_power_vectorized(
    req: np.ndarray,
    available: np.ndarray,
    method: str
) -> np.ndarray:
    available = np.sort(available)
    if method == "ceil":
        idx = np.searchsorted(available, req, side="left")
        idx = np.clip(idx, 0, len(available) - 1)
        return available[idx]
    else:  # nearest
        diffs = np.abs(req[:, None] - available[None, :])
        idx = diffs.argmin(axis=1)
        return available[idx]

# -----------------------------
# Carga da tabela do IPT e prep
# -----------------------------
def load_shower_reference(
    ipt_csv_path: str,
    ref_minutes_per_day: float = 8.0,
    min_flow_lpm_filter: float = 4.0
) -> Dict[str, object]:
    """Load the IPT shower table and build the per-power reference curve.

    The CSV is read as latin-1, ``;``-separated, with ``,`` as the decimal
    mark. Rows whose flow column is below ``min_flow_lpm_filter`` are dropped,
    the remaining rows are averaged per power rating, and the two monthly
    consumption columns are divided by ``30.42 * ref_minutes_per_day``.

    Args:
        ipt_csv_path: Path to the IPT CSV file.
        ref_minutes_per_day: Daily usage (minutes) used to scale the monthly
            consumption columns; must be positive.
        min_flow_lpm_filter: Minimum value of the flow column for a row to be
            kept.

    Returns:
        A dict with two entries:
        ``"lista_potencias_kW"`` - ascending list of the distinct power
        ratings divided by 1000;
        ``"curva_ref"`` - DataFrame indexed by that rating
        (index name ``"Potencia (kW)"``) with columns
        ``consumo_min_kWh_min``, ``consumo_max_kWh_min`` and ``tmax_ref_C``.

    Raises:
        ValueError: ``ref_minutes_per_day`` is not positive, an expected
            column is missing, or no row passes the flow filter.
    """
    if ref_minutes_per_day <= 0:
        # A zero or negative divisor would silently yield inf/negative rates.
        raise ValueError("ref_minutes_per_day deve ser maior que zero.")

    col_flow = "VAZÃO PARA CONSUMO MENSAL MÍNIMO (l/min)"
    col_power = "POTÊNCIA\n(W)"
    col_cons_min = "CONSUMO MENSAL MÍNIMO\n(kWh/mês)"
    col_cons_max = "CONSUMO MENSAL MÁXIMO\n(kWh/mês)"
    col_tmax = "ELEVAÇÃO DE\nTEMPERATURA PARA CONSUMO MENSAL MÁXIMO\n(ºC)"
    required_cols = [col_flow, col_power, col_cons_min, col_cons_max, col_tmax]

    chuveiros = pd.read_csv(ipt_csv_path, encoding="latin-1", sep=";", decimal=",")
    chuveiros.columns = [c.strip() for c in chuveiros.columns]

    missing = [c for c in required_cols if c not in chuveiros.columns]
    if missing:
        # Name the offending columns so the CSV can be fixed without guessing.
        raise ValueError(
            "As colunas do CSV de chuveiros não estão corretamente nomeadas. "
            f"Colunas ausentes: {missing}"
        )

    chuveiros = chuveiros[chuveiros[col_flow] >= min_flow_lpm_filter]
    if chuveiros.empty:
        # An empty catalogue cannot be used for any power lookup, so fail
        # here with a clear message instead of returning it.
        raise ValueError(
            f"Nenhum chuveiro com vazão >= {min_flow_lpm_filter} l/min "
            "no CSV de referência."
        )

    lista_potencias_kW = [w / 1000 for w in sorted(chuveiros[col_power].unique())]

    grupo = chuveiros[
        [col_power, col_cons_min, col_cons_max, col_tmax]
    ].groupby(col_power).mean()

    grupo.index = grupo.index / 1000  # index in kW
    grupo.index.name = "Potencia (kW)"

    # NOTE(review): 30.42 is presumably the average number of days per month,
    # turning kWh/month into kWh per minute of use - confirm.
    minutes_per_month = 30.42 * ref_minutes_per_day
    grupo[col_cons_min] /= minutes_per_month
    grupo[col_cons_max] /= minutes_per_month

    grupo.columns = [
        "consumo_min_kWh_min",
        "consumo_max_kWh_min",
        "tmax_ref_C",
    ]
    return {
        "lista_potencias_kW": lista_potencias_kW,
        "curva_ref": grupo,
    }

# -----------------------------
# Núcleo vetorizado por EPW
# -----------------------------
def _process_one_epw_all_combos(
    file_path: str,
    hours: List[int],
    thresholds: List[int],
    bath_minutes: float,
    flow_lpm: float,
    efficiency: float,
    temp_usage_mode: str,
    temp_fixed_c: float,
    delta_min_c: float,
    curva_ref: pd.DataFrame,
    lista_potencias_kW: List[float],
    power_selection: str,
    north_northeast_states: set,
) -> Tuple[List[Dict[str, object]], Optional[str]]:
    try:
        # Parsing de nome do arquivo
        file = os.path.basename(file_path)
        parts = file.replace(".epw", "").split("_")
        state = parts[1] if len(parts) > 1 else "UF?"
        year_info = parts[-1]

        if "TMYx" in year_info:
            year = "2021"
            scenario = "Baseline"
        else:
            if len(parts) >= 3:
                scenario = parts[-2]
                year = parts[-1]
            else:
                scenario = "Desconhecido"
                year = "Desconhecido"

        # Temperatura de uso (alvo)
        if temp_usage_mode.upper() == "INI-R":
            temp_uso = 38.0 if state in north_northeast_states else 40.0
        elif temp_usage_mode.upper() == "FIXED":
            temp_uso = float(temp_fixed_c)
        else:
            raise ValueError("temp_usage_mode deve ser 'INI-R' ou 'FIXED'.")

        epw_data, meta = read_epw(file_path)
        city = meta.get("city", "Unknown")
        latitude = meta.get("latitude", None)
        longitude = meta.get("longitude", None)

        # Pré-processo
        idx = epw_data.index
        hours_col = idx.hour
        dates_col = idx.normalize()
        temps = epw_data["temp_air"].to_numpy()
        unique_days = pd.Index(np.unique(dates_col))

        available = np.array(sorted(lista_potencias_kW), dtype=float)

        rows_out: List[Dict[str, object]] = []

        # Para cada hora, pré-computar vetores por dia
        for h in hours:
            mask_h = (hours_col == h)
            if not np.any(mask_h):
                for tlim in thresholds:
                    rows_out.append({
                        "city": city, "state": state, "latitude": latitude, "longitude": longitude,
                        "year": year, "scenario": scenario, "bath_hours": str(h),
                        "bath_minutes": bath_minutes, "flow_lpm": flow_lpm, "efficiency": efficiency,
                        "temp_limit_c": tlim, "temp_usage_mode": temp_usage_mode,
                        "temp_usage_target_c": temp_uso, "power_selection": power_selection,
                        "consumo_anual_kWh": 0.0, "consumo_dia_min_kWh": 0.0, "consumo_dia_max_kWh": 0.0,
                        "required_kw_p95": 0.0, "nominal_kw_p95": 0.0, "required_kw_max": 0.0, "nominal_kw_max": 0.0,
                    })
                continue

            df_h = pd.DataFrame({"date": dates_col[mask_h], "temp": temps[mask_h]})
            per_day = df_h.groupby("date", sort=True)["temp"].mean()
            per_day = per_day.reindex(unique_days)
            temp_vec = per_day.to_numpy()

            # Potências requeridas e nominais (vetorizado)
            delta_T = (temp_uso - temp_vec)
            req_kw = (flow_lpm * 60.0 * delta_T) / (860.0 * efficiency)
            req_kw = np.nan_to_num(req_kw, nan=0.0)
            req_kw = np.maximum(req_kw, 0.0)

            nom_kw = _select_nominal_power_vectorized(req_kw, available, power_selection)

            ref_rows = curva_ref.reindex(pd.Series(nom_kw).values)
            cons_min = ref_rows["consumo_min_kWh_min"].to_numpy()
            cons_max = ref_rows["consumo_max_kWh_min"].to_numpy()
            t_max  = ref_rows["tmax_ref_C"].to_numpy()

            delta_for_cons = np.clip(delta_T, delta_min_c, None)
            delta_for_cons = np.minimum(delta_for_cons, t_max)
            use_linear = (t_max > delta_min_c)
            cons_per_min = np.where(
                use_linear,
                ((delta_for_cons - delta_min_c) * (cons_max - cons_min) / (t_max - delta_min_c)) + cons_min,
                cons_min
            )
            cons_per_min = np.nan_to_num(cons_per_min, nan=0.0)

            for tlim in thresholds:
                mask_banho = (temp_vec < tlim)
                daily_cons = np.where(mask_banho, bath_minutes * cons_per_min, 0.0)
                req_vec    = np.where(mask_banho, req_kw, 0.0)
                nom_vec    = np.where(mask_banho, nom_kw, 0.0)

                rows_out.append({
                    "city": city, "state": state, "latitude": latitude, "longitude": longitude,
                    "year": year, "scenario": scenario, "bath_hours": str(h),
                    "bath_minutes": bath_minutes, "flow_lpm": flow_lpm, "efficiency": efficiency,
                    "temp_limit_c": tlim, "temp_usage_mode": temp_usage_mode,
                    "temp_usage_target_c": temp_uso, "power_selection": power_selection,
                    "consumo_anual_kWh": round(float(daily_cons.sum()), 3),
                    "consumo_dia_min_kWh": round(float(daily_cons.min()) if daily_cons.size else 0.0, 4),
                    "consumo_dia_max_kWh": round(float(daily_cons.max()) if daily_cons.size else 0.0, 4),
                    "required_kw_p95": round(float(np.quantile(req_vec, 0.95)) if req_vec.size else 0.0, 3),
                    "nominal_kw_p95": round(float(np.quantile(nom_vec, 0.95)) if nom_vec.size else 0.0, 3),
                    "required_kw_max": round(float(req_vec.max()) if req_vec.size else 0.0, 3),
                    "nominal_kw_max": round(float(nom_vec.max()) if nom_vec.size else 0.0, 3),
                })

        return rows_out, None
    except Exception as e:
        return [], f"Erro ao processar {os.path.basename(file_path)}: {e}"

# -----------------------------
# Versão original (mantida)
# -----------------------------
def process_epw_files(
    epw_folder: str,
    output_csv_name: str = "consolidated",
    output_folder: str = "Output",
    bath_hours: Optional[List[int]] = None,
    bath_minutes: float = 8.0,
    flow_lpm: float = 4.0,
    efficiency: float = 0.95,
    temp_limit_c: float = 25.0,
    temp_usage_mode: Literal["INI-R", "FIXED"] = "INI-R",
    temp_fixed_c: float = 40.0,
    delta_min_c: float = 10.0,
    ipt_csv_path: str = "IPT_Chuveiros_Eletricos.csv",
    ref_minutes_per_day: float = 8.0,
    min_flow_lpm_filter: float = 4.0,
    power_selection: Literal["ceil", "nearest"] = "ceil",
) -> pd.DataFrame:
    """Process every EPW file of a folder sequentially into one CSV.

    Each file is evaluated for every hour in ``bath_hours`` against the
    single threshold ``temp_limit_c``; all rows are written to
    ``<output_folder>/<output_csv_name>.csv`` and returned.

    Args:
        epw_folder: Folder scanned (non-recursively) for ``.epw`` files.
        output_csv_name: Output file name without extension.
        output_folder: Output folder, created when missing.
        bath_hours: Hours of the day to evaluate; ``None`` means ``[18]``.
        bath_minutes: Minutes of use per counted day.
        flow_lpm: Water flow used in the required-power formula.
        efficiency: Divisor applied in the required-power formula.
        temp_limit_c: Temperature limit below which a day counts. Whole
            values are stored as ``int`` in the output; fractional values
            are kept as given.
        temp_usage_mode: ``"INI-R"`` or ``"FIXED"``.
        temp_fixed_c: Target temperature for ``"FIXED"`` mode.
        delta_min_c: Minimum temperature rise of the consumption curve.
        ipt_csv_path: Path of the IPT shower CSV.
        ref_minutes_per_day: Forwarded to ``load_shower_reference``.
        min_flow_lpm_filter: Forwarded to ``load_shower_reference``.
        power_selection: ``"ceil"`` or ``"nearest"`` rating selection.

    Returns:
        DataFrame with one row per (file, hour); files that failed are
        reported on stdout and skipped.
    """
    # None stands in for the former mutable default [18].
    if bath_hours is None:
        bath_hours = [18]

    # Keep the historical integer label for whole thresholds (25.0 -> 25)
    # without truncating fractional ones (25.5 used to become 25).
    threshold = float(temp_limit_c)
    if threshold.is_integer():
        threshold = int(threshold)

    north_northeast_states = {
        "AC","AP","AM","PA","RO","RR","TO",
        "AL","BA","CE","MA","PB","PE","PI","RN","SE"
    }

    ref = load_shower_reference(
        ipt_csv_path=ipt_csv_path,
        ref_minutes_per_day=ref_minutes_per_day,
        min_flow_lpm_filter=min_flow_lpm_filter,
    )
    lista_potencias_kW = ref["lista_potencias_kW"]
    curva_ref = ref["curva_ref"]

    # Sorted so the row order of the output does not depend on the OS.
    epw_files = sorted(f for f in os.listdir(epw_folder) if f.lower().endswith(".epw"))
    total_files = len(epw_files)

    os.makedirs(output_folder, exist_ok=True)

    output_rows = []
    start_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    print(f"Início: {start_ts} | EPWs: {total_files}")

    for idx, file in enumerate(epw_files, start=1):
        # The worker reports failures through its second return value
        # instead of raising, so no extra try/except is needed here.
        rows, err = _process_one_epw_all_combos(
            file_path=os.path.join(epw_folder, file),
            hours=bath_hours,
            thresholds=[threshold],
            bath_minutes=bath_minutes,
            flow_lpm=flow_lpm,
            efficiency=efficiency,
            temp_usage_mode=temp_usage_mode,
            temp_fixed_c=temp_fixed_c,
            delta_min_c=delta_min_c,
            curva_ref=curva_ref,
            lista_potencias_kW=lista_potencias_kW,
            power_selection=power_selection,
            north_northeast_states=north_northeast_states,
        )
        if err:
            print(err, flush=True)
            continue
        output_rows.extend(rows)
        if idx % 25 == 0 or idx == total_files:
            print(f"[{idx}/{total_files}] {file} ok", flush=True)

    out_df = pd.DataFrame(output_rows)
    out_path = os.path.join(output_folder, f"{output_csv_name}.csv")
    out_df.to_csv(out_path, index=False)
    end_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    print(f"Fim: {end_ts}")
    print(f"Dados salvos em {out_path}")
    return out_df

# -----------------------------
# Versão TURBO (paralela + grade de horas/thresholds)
# -----------------------------
def process_epw_files_grid(
    epw_folder: str,
    hours: List[int],
    thresholds: List[int],
    output_folder: str = "Output",
    base_output_name: str = "shower",
    bath_minutes: float = 1.0,
    flow_lpm: float = 4.0,
    efficiency: float = 0.95,
    temp_usage_mode: Literal["INI-R", "FIXED"] = "INI-R",
    temp_fixed_c: float = 40.0,
    delta_min_c: float = 10.0,
    ipt_csv_path: str = "IPT_Chuveiros_Eletricos.csv",
    ref_minutes_per_day: float = 8.0,
    min_flow_lpm_filter: float = 4.0,
    power_selection: Literal["ceil", "nearest"] = "ceil",
    n_workers: Optional[int] = None,
    verbose: bool = True,
    backend: Literal["auto","process","thread"] = "auto",
) -> pd.DataFrame:
    """Evaluate every (hour x threshold) combination for a folder of EPWs.

    Each EPW file is read once and processed by a pool worker (one task per
    file). The combined rows are then written to one CSV per combination,
    named ``<base_output_name>_hHH_tTT_1min.csv``.

    Args:
        epw_folder: Folder scanned (non-recursively) for ``.epw`` files.
        hours: Hours of the day to evaluate.
        thresholds: Temperature limits to evaluate.
        output_folder: Output folder, created when missing.
        base_output_name: Prefix of the per-combination CSV names.
        bath_minutes: Minutes of use per counted day.
        flow_lpm: Water flow used in the required-power formula.
        efficiency: Divisor applied in the required-power formula.
        temp_usage_mode: ``"INI-R"`` or ``"FIXED"``.
        temp_fixed_c: Target temperature for ``"FIXED"`` mode.
        delta_min_c: Minimum temperature rise of the consumption curve.
        ipt_csv_path: Path of the IPT shower CSV.
        ref_minutes_per_day: Forwarded to ``load_shower_reference``.
        min_flow_lpm_filter: Forwarded to ``load_shower_reference``.
        power_selection: ``"ceil"`` or ``"nearest"`` rating selection.
        n_workers: Pool size; ``None`` means half of ``os.cpu_count()``
            (at least 1).
        verbose: Print progress and error summaries.
        backend: ``"thread"``, ``"process"`` or ``"auto"``. ``"auto"`` (and
            any unrecognised value) resolves to threads on Windows or
            Python >= 3.13 and to processes otherwise. On Windows a failed
            process run is retried with threads.

    Returns:
        DataFrame with the rows of every file that was processed
        successfully; empty when no file produced rows.

    Raises:
        FileNotFoundError: ``epw_folder`` contains no ``.epw`` file.
        RuntimeError: The executor itself failed (including the fallback).
    """
    north_northeast_states = {
        "AC","AP","AM","PA","RO","RR","TO",
        "AL","BA","CE","MA","PB","PE","PI","RN","SE"
    }

    ref = load_shower_reference(
        ipt_csv_path=ipt_csv_path,
        ref_minutes_per_day=ref_minutes_per_day,
        min_flow_lpm_filter=min_flow_lpm_filter,
    )
    lista_potencias_kW = ref["lista_potencias_kW"]
    curva_ref = ref["curva_ref"]

    # Sorted so submission order does not depend on the OS directory order.
    epw_files = sorted(
        os.path.join(epw_folder, f)
        for f in os.listdir(epw_folder)
        if f.lower().endswith(".epw")
    )
    total_files = len(epw_files)
    if total_files == 0:
        raise FileNotFoundError("Nenhum arquivo .epw encontrado.")

    os.makedirs(output_folder, exist_ok=True)

    # Default pool size: half of the cores.
    if n_workers is None:
        cpu = os.cpu_count() or 2
        n_workers = max(1, cpu // 2)

    # Resolve the backend once; anything other than an explicit
    # "thread"/"process" is treated as "auto".
    if backend not in ("thread", "process"):
        prefer_threads = os.name == "nt" or sys.version_info >= (3, 13)
        backend = "thread" if prefer_threads else "process"

    start_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    if verbose:
        preview = ", ".join([os.path.basename(p) for p in epw_files[:3]])
        if len(epw_files) > 3:
            preview += ", ..."
        print(f"Início: {start_ts} | EPWs: {total_files} | workers: {n_workers} | backend: {backend}", flush=True)
        print(f"Exemplos de arquivos: {preview}", flush=True)

    all_rows: List[Dict[str, object]] = []
    errors: List[str] = []

    def run_with_executor(use_threads: bool) -> bool:
        """Run all files on a thread pool (True) or process pool (False).

        Returns True when the pool ran to completion, False when the
        executor itself raised.
        """
        # Start from a clean slate so a retry after a failed attempt cannot
        # duplicate rows or errors collected by that attempt.
        all_rows.clear()
        errors.clear()
        try:
            if use_threads:
                Executor = ThreadPoolExecutor
                ex_kwargs = {"max_workers": n_workers}
            else:
                ctx = mp.get_context("spawn")
                Executor = ProcessPoolExecutor
                ex_kwargs = {"max_workers": n_workers, "mp_context": ctx}

            with Executor(**ex_kwargs) as ex:
                future_to_path = {}
                for j, file_path in enumerate(epw_files, start=1):
                    fut = ex.submit(
                        _process_one_epw_all_combos,
                        file_path,
                        hours,
                        thresholds,
                        bath_minutes,
                        flow_lpm,
                        efficiency,
                        temp_usage_mode,
                        temp_fixed_c,
                        delta_min_c,
                        curva_ref,
                        lista_potencias_kW,
                        power_selection,
                        north_northeast_states
                    )
                    future_to_path[fut] = file_path
                    if verbose and (j % 10 == 0 or j == total_files):
                        print(f"Submetidos {j}/{total_files} arquivos...", flush=True)

                total = len(future_to_path)
                for i, fut in enumerate(as_completed(future_to_path), start=1):
                    file_path = future_to_path[fut]
                    try:
                        rows, err = fut.result()
                    except Exception as e:
                        if verbose:
                            print(f"[{i}/{total}] ERRO ao processar {os.path.basename(file_path)}: {e}", flush=True)
                        errors.append(f"ERRO {os.path.basename(file_path)}: {e}")
                        continue

                    if err:
                        errors.append(err)
                        if verbose:
                            print(err, flush=True)
                    else:
                        all_rows.extend(rows)
                        if verbose:
                            if rows:
                                city = rows[0].get("city", "Unknown")
                                state = rows[0].get("state", "??")
                            else:
                                city = os.path.basename(file_path)
                                state = "??"
                            print(f"[{i}/{total}] {city} ({state}) ok", flush=True)
            return True
        except Exception as e:
            if verbose:
                print(f"[executor] Falhou com {'threads' if use_threads else 'processos'}: {e}", flush=True)
            return False

    # Run on the resolved backend; on Windows fall back from processes to
    # threads.
    if backend == "thread":
        ok = run_with_executor(use_threads=True)
    else:
        ok = run_with_executor(use_threads=False)
        if not ok and os.name == "nt":
            if verbose:
                print("Trocando para threads por estabilidade no Windows...", flush=True)
            ok = run_with_executor(use_threads=True)

    if not ok:
        raise RuntimeError("Falhou ao executar tanto em processos quanto em threads.")

    if errors and verbose:
        print("\nOcorreram erros em alguns arquivos:", flush=True)
        for e in errors:
            print(" -", e, flush=True)

    big_df = pd.DataFrame(all_rows)

    saved = 0
    if big_df.empty:
        # No rows at all (e.g. every file failed): the frame has no columns,
        # so there is nothing to split and nothing is written.
        if verbose:
            print("Nenhum resultado gerado; nenhum arquivo de combinação foi salvo.", flush=True)
    else:
        # One CSV per (hour, threshold). The row positions of every
        # combination are computed in a single pass instead of scanning the
        # whole frame once per combination.
        total_combos = len(hours) * len(thresholds)
        combo_positions = big_df.groupby(
            ["bath_hours", "temp_limit_c"], sort=False
        ).indices
        for h in hours:
            for tlim in thresholds:
                df_ht = big_df.iloc[combo_positions.get((str(h), tlim), [])]
                # NOTE(review): the "_1min" suffix is fixed regardless of
                # bath_minutes, and int(tlim) maps fractional thresholds to
                # the same file name as their integer part - confirm intent.
                out_name = f"{base_output_name}_h{int(h):02d}_t{int(tlim)}_1min.csv"
                out_path = os.path.join(output_folder, out_name)
                df_ht.to_csv(out_path, index=False)
                saved += 1
                if verbose and (saved % 40 == 0 or saved == total_combos):
                    print(f"Salvos {saved}/{total_combos} arquivos de combinação.", flush=True)

    end_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    if verbose:
        print(f"Fim: {end_ts}", flush=True)
        if saved:
            print(f"Arquivos salvos em: {output_folder}", flush=True)

    return big_df


# -----------------------------
# Exemplo de uso RÁPIDO
# -----------------------------
if __name__ == "__main__":
    # Script entry point: runs the full hour x threshold grid over the
    # "EPW_files" folder and writes the per-combination CSVs to "Output".

    # Windows/notebook support. Both calls are best-effort: a RuntimeError
    # from set_start_method and any other exception are ignored.
    try:
        mp.freeze_support()
        try:
            mp.set_start_method("spawn", force=False)
        except RuntimeError:
            pass
    except Exception:
        pass

    epw_folder = "EPW_files"
    output_folder = "Output"
    os.makedirs(output_folder, exist_ok=True)

    # Every full hour (0..23) and every threshold (15..40)
    todas_as_horas = list(range(24))
    todos_thresholds = list(range(15, 41))

    banho_minutos = 1.0

    # Half of the cores by default
    half_cores = max(1, (os.cpu_count() or 2) // 2)

    process_epw_files_grid(
        epw_folder=epw_folder,
        hours=todas_as_horas,
        thresholds=todos_thresholds,
        output_folder=output_folder,
        base_output_name="shower",
        bath_minutes=banho_minutos,
        flow_lpm=4.0,
        efficiency=0.95,
        temp_usage_mode="INI-R",
        temp_fixed_c=40.0,
        delta_min_c=10.0,
        ipt_csv_path="IPT_Chuveiros_Eletricos.csv",
        ref_minutes_per_day=8.0,
        min_flow_lpm_filter=4.0,
        power_selection="ceil",
        n_workers=half_cores,
        verbose=True,
        backend="auto",   # on Windows/py3.13 this becomes 'thread' automatically
    )


Início: 20250809_112114 | EPWs: 5202 | workers: 10 | backend: thread
Exemplos de arquivos: BRA_AC_Cruzeiro.do.Sul.Intl.AP.827040_TMYx.2007-2021.epw, BRA_AC_Cruzeiro.do.Sul.Intl.AP_Ensemble_ssp126_2050.epw, BRA_AC_Cruzeiro.do.Sul.Intl.AP_Ensemble_ssp126_2080.epw, ...
Submetidos 10/5202 arquivos...
Submetidos 20/5202 arquivos...
Submetidos 30/5202 arquivos...
Submetidos 40/5202 arquivos...
Submetidos 50/5202 arquivos...
Submetidos 60/5202 arquivos...
Submetidos 70/5202 arquivos...
Submetidos 80/5202 arquivos...
Submetidos 90/5202 arquivos...
Submetidos 100/5202 arquivos...
Submetidos 110/5202 arquivos...
Submetidos 120/5202 arquivos...
Submetidos 130/5202 arquivos...
Submetidos 140/5202 arquivos...
Submetidos 150/5202 arquivos...
Submetidos 160/5202 arquivos...
Submetidos 170/5202 arquivos...
Submetidos 180/5202 arquivos...
Submetidos 190/5202 arquivos...
Submetidos 200/5202 arquivos...
Submetidos 210/5202 arquivos...
Submetidos 220/5202 arquivos...
Submetidos 230/5202 arquivos...
Submet