** PRE PROCESSING** 
Bases from EPA test data 2020-2025 pasted together (Some rows previously deleted, but it will run either way)
Available at : https://www.epa.gov/compliance-and-fuel-economy-data/data-cars-used-testing-fuel-economy?utm_source=chatgpt.com
data cleaning & Adapting to DB Inserting

In [None]:
from pathlib import Path
import re, pandas as pd
import numpy as np

# --- CONFIG ---
ABS_DIR = Path(__file__).resolve().parent   
xlsx_path = ABS_DIR.parent / 'data' / 'vehicles' / 'testcar-2025-2020-EPA.xlsx' 
XLSX_PATH = Path(xlsx_path) # your path
DRY_RUN = True
DEBUG_PREVIEWS = 3

# DB helpers
from src.vde_core.db import insert_vde, insert_fuelcons

# --- conversões ---
LB_TO_KG = 0.45359237
LBF_TO_N = 4.4482216153
MPH_TO_KPH = 1.609344

def to_float(x):
    try: return float(x)
    except Exception: return None

def lbs_to_kg(x): return to_float(x) * LB_TO_KG if pd.notna(x) else None
def lbf_to_N(x): return to_float(x) * LBF_TO_N if pd.notna(x) else None
def lbf_mph_to_N_kph(x): return to_float(x) * (LBF_TO_N / MPH_TO_KPH) if pd.notna(x) else None
def lbf_mph2_to_N_kph2(x): return to_float(x) * (LBF_TO_N / (MPH_TO_KPH**2)) if pd.notna(x) else None
def gmi_to_gkm(x): return to_float(x) / MPH_TO_KPH if pd.notna(x) else None
def mpg_to_l100km(x):
    x = to_float(x)
    return None if (x is None or x == 0) else 235.214583 / x
def whmi_to_whkm(x): return to_float(x) / MPH_TO_KPH if pd.notna(x) else None
def hp_to_kw(x): return to_float(x) * 0.745699872 if pd.notna(x) else None

# --- inércia (classe -> massa máx) ---
def max_mass_from_inertia_class(inertia_class: float) -> float | None:
    steps = [
        (None, 346, 454), (346, 402, 510), (402, 459, 567), (459, 516, 624),
        (516, 573, 680), (573, 629, 737), (629, 686, 794), (686, 743, 850),
        (743, 799, 907), (799, 856, 964), (856, 913, 1021), (913, 969, 1077),
        (969, 1026, 1134), (1026, 1083, 1191), (1083, 1140, 1247),
        (1140, 1196, 1304), (1196, 1253, 1361), (1253, 1310, 1417),
        (1310, 1366, 1474), (1366, 1423, 1531), (1423, 1480, 1588),
        (1480, 1536, 1644), (1536, 1593, 1701), (1593, 1650, 1758),
        (1650, 1735, 1814), (1735, 1848, 1928), (1848, 1962, 2041),
        (1962, 2075, 2155), (2075, 2189, 2268), (2189, 2302, 2381),
        (2302, 2416, 2495), (2416, 2643, 2722), (2643, 2869, 2948),
        (2869, 3096, 3175), (3096, 3323, 3402), (3323, 3777, 3856),
        (3777, 4003, 4082),
    ] #2500 kg for class 2500+
    for lo, hi, cls in steps:
        if float(cls) == float(inertia_class):
            return hi
    return None

assert XLSX_PATH.exists(), f"Arquivo não encontrado: {XLSX_PATH}"
df = pd.read_excel(XLSX_PATH, engine="openpyxl")

# auxiliares
df["inertia_class"] = df["Equivalent Test Weight (lbs.)"].apply(lbs_to_kg).round()
df["mass_kg"] = df["inertia_class"].apply(max_mass_from_inertia_class)

disp = pd.to_numeric(df["Test Veh Displacement (L)"], errors="coerce").fillna(0)

# BEV: deslocamento ~0 OU sentinela muito alto (ex.: 99 L)
df["is_bev"]  = (disp <= 0.1) | (disp >= 90)   # se quiser, troque por (disp == 99.0)

# PHEV: tem ciclo CD e não é BEV
df["is_phev"] = (~df["is_bev"]) & df["Test Category"].astype(str).str.contains(r"\bCD\b", case=False, na=False)

# HEV: (sua regra) tem Bag 4 preenchido e não é BEV nem PHEV
bag4 = df["FE Bag 4"]
df["is_hev"]  = (~df["is_bev"]) & (~df["is_phev"]) & bag4.notna()

# veh_key: marca do "Represented Test Veh Make"
df["_veh_key"] = (
    df["Represented Test Veh Make"].astype(str) + "|" +
    df["Represented Test Veh Model"].astype(str) + "|" +
    df["Model Year"].astype(str)
)

# saneamento mínimo
df["RND_ADJ_FE"]        = pd.to_numeric(df["RND_ADJ_FE"], errors="coerce")
df["Rated Horsepower"]  = pd.to_numeric(df["Rated Horsepower"], errors="coerce")
df["is_bev"]            = df["is_bev"].astype(bool)

# regra: BEV, <600 hp e RND_ADJ_FE < 40  ⇒ valor está em kWh/km (corrigir p/ MPGe)
mask_fix = df["is_bev"]  & (df["RND_ADJ_FE"] < 42.1) & (df["Rated Horsepower"] < 1000)

# kWh/km → MPGe   (Wh/mi = kWh/km * 1000 * 1.60934 ;  MPGe = 33705 / Wh/mi)
def kwh100mi_to_mpge(x):
    return 33705.0 / (x *10)

df["RND_ADJ_FE_fixed"] = df["RND_ADJ_FE"]
df.loc[mask_fix, "RND_ADJ_FE_fixed"] = kwh100mi_to_mpge(df.loc[mask_fix, "RND_ADJ_FE"])

# flags úteis
df["RND_ADJ_FE_flag"] = np.where(mask_fix, "FIXED_FROM_kWh_per_km", "AS_IS")
df["RND_ADJ_FE_kWh_per_km_assumed"] = np.where(mask_fix, df["RND_ADJ_FE"], np.nan)

# (opcional) se quiser passar a usar sempre o corrigido:
df["RND_ADJ_FE"] = df["RND_ADJ_FE_fixed"]


# --- VDE payload (1 por veículo) ---
def build_vde_payload(g: pd.DataFrame) -> dict:
    return {
        "legislation": "EPA",
        "cycle_name": "FTP-75",
        "cycle_source": "standard:EPA",
        "year": g["Model Year"].iloc[0],
        "make": g["Represented Test Veh Make"].iloc[0],
        "model": g["Represented Test Veh Model"].iloc[0],
        "drive_type": g["Drive System Description"].mode().iloc[0] if not g["Drive System Description"].isna().all() else "Midsize",
        "engine_size_l": g["Test Veh Displacement (L)"].mean(),
        "transmission_type": g["Tested Transmission Type"].mode().iloc[0] if not g["Tested Transmission Type"].isna().all() else None,
        "notes": "From EPA test car dataset",
        "inertia_class": g["inertia_class"].iloc[0],
        "mass_kg": g["mass_kg"].iloc[0],
        "coast_A_N": lbf_to_N(g["Target Coef A (lbf)"].mean()),
        "coast_B_N_per_kph": lbf_mph_to_N_kph(g["Target Coef B (lbf/mph)"].mean()),
        "coast_C_N_per_kph2": lbf_mph2_to_N_kph2(g["Target Coef C (lbf/mph**2)"].mean()),
    }

# --- Combustão/HEV/PHEV-CS: usa RND_ADJ_FE; fallback por BAGs; exclui Cold CO e CD no combinado ---
def build_fc_payload_combustion(g: pd.DataFrame, electrif: str) -> dict:
    # filtra Cold CO
    proc_desc_full = g.get("Test Procedure Description", pd.Series(dtype=str)).astype(str).str.lower()
    g_eff = g.loc[~proc_desc_full.str.contains("cold co", na=False)].copy()

    proc  = g_eff.get("Test Category", pd.Series(dtype=str)).astype(str).str.lower()
    units = g_eff.get("FE_UNIT", pd.Series(dtype=str)).astype(str).str.lower()
    fe    = g_eff.get("RND_ADJ_FE", pd.Series(dtype=float))
    co2mi = g_eff.get("CO2 (g/mi)", pd.Series(dtype=float))

    m_ftp  = proc.str.contains("ftp", na=False)
    m_hw   = proc.str.contains("hwy", na=False) | proc.str.contains("hwfet", na=False)
    m_us06 = proc.str.contains("us06", na=False)
    m_sc03 = proc.str.contains("sc03", na=False)
    is_cd  = proc.str.contains("cd|charge depleting", case=False, na=False)

    def _sel(mask: pd.Series, *, exclude_cd: bool) -> pd.Series:
        m = mask.reindex(g_eff.index, fill_value=False).astype(bool)
        if exclude_cd:
            m = m & (~is_cd.reindex(g_eff.index, fill_value=False).astype(bool))
        return m

    # City/Hwy via RND_ADJ_FE (mpg), com fallback por BAGs (para FTP)
    mpg_mask = units.str.startswith("mpg")
    city_rnd = mpg_to_l100km(fe[mpg_mask & _sel(m_ftp, exclude_cd=True)].mean()) if len(g_eff) else None
    hwy_l100 = mpg_to_l100km(fe[mpg_mask & _sel(m_hw, exclude_cd=True)].mean()) if len(g_eff) else None

    city_bag = None
    if city_rnd is None:
        rows_city = g_eff[_sel(m_ftp, exclude_cd=True)]
        vals = []
        for _, r in rows_city.iterrows():
            b1 = mpg_to_l100km(r.get("FE Bag 1"))
            b2 = mpg_to_l100km(r.get("FE Bag 2"))
            b3 = mpg_to_l100km(r.get("FE Bag 3"))
            b4 = mpg_to_l100km(r.get("FE Bag 4"))
            hot = None
            hot_list = [x for x in (b3, b4) if x is not None]
            if len(hot_list) == 1: hot = hot_list[0]
            elif len(hot_list) >= 2: hot = sum(hot_list)/len(hot_list)
            if (b1 is not None) and (b2 is not None) and (hot is not None):
                vals.append((0.43*b1 + 1.00*b2 + 0.57*hot)/2.0)
        city_bag = sum(vals)/len(vals) if vals else None

    city_l100 = city_rnd if city_rnd is not None else city_bag

    # energia (Wh/km) se houver linhas Wh/mi
    def mean_energy_whkm(mask, exclude_cd=True):
        base = g_eff[_sel(mask, exclude_cd=exclude_cd)]
        if len(base) == 0:
            return None
        rows_wh = base[base["FE_UNIT"].astype(str).str.lower().str.startswith("wh")]
        return whmi_to_whkm(rows_wh["RND_ADJ_FE"].mean()) if len(rows_wh) > 0 else None

    energy_ftp  = mean_energy_whkm(m_ftp,  exclude_cd=True)
    energy_hw   = mean_energy_whkm(m_hw,   exclude_cd=True)
    energy_us06 = mean_energy_whkm(m_us06, exclude_cd=False)
    energy_sc03 = mean_energy_whkm(m_sc03, exclude_cd=False)

    # CO2 por ciclo
    def mean_co2_gkm(mask, exclude_cd=True):
        base = g_eff[_sel(mask, exclude_cd=exclude_cd)]
        if len(base) == 0 or base["CO2 (g/mi)"].isna().all():
            return None
        return gmi_to_gkm(base["CO2 (g/mi)"].mean())

    gco2_ftp  = mean_co2_gkm(m_ftp,  exclude_cd=True)
    gco2_hw   = mean_co2_gkm(m_hw,   exclude_cd=True)
    gco2_us06 = mean_co2_gkm(m_us06, exclude_cd=False)
    gco2_sc03 = mean_co2_gkm(m_sc03, exclude_cd=False)

    # combinado 2-cycle
    W_CITY, W_HWY = 0.55, 0.45
    def wavg(a, b, wa=W_CITY, wb=W_HWY):
        if a is None and b is None: return None
        if a is None: return b
        if b is None: return a
        return wa*a + wb*b

    fuel_l100_comb   = wavg(city_l100, hwy_l100)
    energy_whkm_comb = wavg(energy_ftp, energy_hw)
    gco2_comb        = wavg(gco2_ftp,  gco2_hw)
    fuel_kmpl_comb   = (100.0 / fuel_l100_comb) if (fuel_l100_comb not in (None, 0)) else None

    # performance/transmissão
    hp = g.get("Rated Horsepower", pd.Series(dtype=float)).mean() if "Rated Horsepower" in g else None
    engine_kw = hp_to_kw(hp) if hp is not None else None
    gear_count = g.get("# of Gears", pd.Series()).mode().iloc[0] if "# of Gears" in g and not g["# of Gears"].isna().all() else None
    final_drive = g.get("Axle Ratio", pd.Series(dtype=float)).mean() if "Axle Ratio" in g else None

    # BAGs como referência (mapear em campos WLTP-like)
    bag1 = to_float(g["FE Bag 1"].mean())
    bag2 = to_float(g["FE Bag 2"].mean())
    bag3 = to_float(g["FE Bag 3"].mean())
    bag4 = to_float(g["FE Bag 4"].mean())
    fuel_low  = mpg_to_l100km(bag1) if bag1 is not None else None
    fuel_mid  = mpg_to_l100km(bag2) if bag2 is not None else None
    fuel_high = mpg_to_l100km(bag3) if bag3 is not None else None
    fuel_xhigh= mpg_to_l100km(bag4) if bag4 is not None else None

    cat = g["Drive System Description"].mode().iloc[0] if not g["Drive System Description"].isna().all() else "Midsize"
    year = g["Model Year"].iloc[0]
        # até aqui tudo igual ao que você já tinha...
    # city_l100, hwy_l100, energy_ftp, energy_hw etc já calculados

    # --- conversão fuel -> energia (Wh/km) ---
    FUEL_LHV = {
        "gasoline": 32.0,    # MJ/L (ajuste se souber E10, E27 etc)
        "ethanol": 21.1,
        "e85": 25.5,
        "diesel": 36.0,
    }
    MJ_TO_Wh = 277.78  # 1 MJ = 277.78 Wh

    def l100km_to_whkm(l100, fuel_desc):
        if l100 is None: return None
        fdesc = str(fuel_desc).lower()
        if "diesel" in fdesc:
            lhv = FUEL_LHV["diesel"]
        elif "85" in fdesc:
            lhv = FUEL_LHV["e85"]
        elif "ethanol" in fdesc:
            lhv = FUEL_LHV["ethanol"]
        else:
            lhv = FUEL_LHV["gasoline"]
        # L/100km × MJ/L → MJ/100km → Wh/km
        return (l100 * lhv * MJ_TO_Wh) / 100.0

    fuel_desc = g.get("Test Fuel Type Description", pd.Series(dtype=str)).mode().iloc[0]

    energy_ftp = l100km_to_whkm(city_l100, fuel_desc)
    energy_hw  = l100km_to_whkm(hwy_l100, fuel_desc)
    energy_whkm_comb = l100km_to_whkm(fuel_l100_comb, fuel_desc)


    return {
        "electrification": electrif,
        "fuel_type": g.get("Test Fuel Type Description", pd.Series(dtype=str)).mode().iloc[0],

        "engine_max_power_kw": engine_kw,
        "engine_rpm_max_power": None,
        "engine_max_torque_nm": None,
        "engine_rpm_max_torque": None,
        "gear_count": gear_count,
        "final_drive_ratio": final_drive,

        "ambient_temp_c": 23.0,
        "ac_on": 0,
        "tire_front_psi": None,
        "tire_rear_psi": None,
        "scenario_payload_kg": None,

        "method_note": ("Combustão: City/Hwy via RND_ADJ_FE (MPG). Fallback City via Bags (0.43/1.00/0.57 com (Bag3,Bag4) hot). "
                        "Combined=0.55*City + 0.45*HWFET. CD excluído no combinado. WLTP phases = BAGs (referência)."),

        # agregados 2-cycle
        "energy_Wh_per_km": energy_whkm_comb,
        "fuel_km_per_l": fuel_kmpl_comb,
        "fuel_l_per_100km": fuel_l100_comb,
        "gco2_per_km": gco2_comb,

        # por ciclo EPA
        "energy_ftp75_Wh_per_km": energy_ftp,
        "energy_hwfet_Wh_per_km": energy_hw,
        "energy_us06_Wh_per_km":  energy_us06,
        "energy_sc03_Wh_per_km":  energy_sc03,
        "energy_coldftp_Wh_per_km": None,

        "fuel_ftp75_l_per_100km": city_l100,
        "fuel_hwfet_l_per_100km": hwy_l100,
        "fuel_us06_l_per_100km":  None,
        "fuel_sc03_l_per_100km":  None,
        "fuel_coldftp_l_per_100km": None,

        "gco2_ftp75_per_km": gco2_ftp,
        "gco2_hwfet_per_km": gco2_hw,
        "gco2_us06_per_km":  gco2_us06,
        "gco2_sc03_per_km":  gco2_sc03,
        "gco2_coldftp_per_km":  None,

        # phases (mapeadas dos bags — apenas referência)
        "energy_low_Wh_per_km":  None,
        "energy_mid_Wh_per_km":  None,
        "energy_high_Wh_per_km": None,
        "energy_xhigh_Wh_per_km":None,

        "fuel_low_l_per_100km":  fuel_low,
        "fuel_mid_l_per_100km":  fuel_mid,
        "fuel_high_l_per_100km": fuel_high,
        "fuel_xhigh_l_per_100km":fuel_xhigh,

        "label_program": "EPA",
        "label_version_year": year,
        "label_vehicle_category": cat,
        "label_cycle_set": "2-cycle",
        "label_fuel_l_per_100km": fuel_l100_comb,
        "label_gco2_per_km": gco2_comb,
    }

# --- BEV: energia via RND_ADJ_FE (Wh/mi) -> Wh/km; combinado 0.55/0.45 ---
def build_fc_payload_bev(g: pd.DataFrame) -> dict:
    proc_desc = g.get("Test Procedure Description", pd.Series(dtype=str)).astype(str).str.lower()
    fe_vals   = pd.to_numeric(g.get("RND_ADJ_FE", pd.Series(dtype=float)), errors="coerce")
    units     = g.get("FE_UNIT", pd.Series(dtype=str)).astype(str).str.lower()

    # Máscaras corretas
    m_udd = proc_desc.str.contains("udds", na=False)               # urbano
    m_hwy = proc_desc.str.contains("hwfet|highway", na=False)      # rodovia (pega HWFET)

    # performance/transmissão
    hp = g.get("Rated Horsepower", pd.Series(dtype=float)).mean() if "Rated Horsepower" in g else None
    engine_kw = hp_to_kw(hp) if hp is not None else None
    gear_count = g.get("# of Gears", pd.Series()).mode().iloc[0] if "# of Gears" in g and not g["# of Gears"].isna().all() else None
    final_drive = g.get("Axle Ratio", pd.Series(dtype=float)).mean() if "Axle Ratio" in g else None

    MPH = MPH_TO_KPH  # só pra abreviar

    def to_whkm(val, unit):
        if pd.isna(val):
            return None
        u = unit.lower()
        # Wh/mi
        if "wh/mi" in u:
            return val / MPH
        # Wh/km
        if "wh/km" in u:
            return val
        # kWh/100 mi
        if ("kwh/100" in u) and ("mi" in u):
            # kWh/100mi -> Wh/km: (val*1000/100) Wh/mi = 10*val Wh/mi -> /1.609
            return (10.0 * val) / MPH
        # kWh/100 km
        if ("kwh/100" in u) and ("km" in u):
            # kWh/100km -> Wh/km: val*1000/100 = 10*val
            return 10.0 * val
        # MPGe (e alguns BEVs vêm como "mpg" mas é MPGe implícito)
        if "mpge" in u or (("mpg" in u) and "electric" in g.get("Test Fuel Type Description", pd.Series(dtype=str)).astype(str).str.lower().str.cat(sep=" ")):
            # Wh/mi = 33705 / MPGe  -> Wh/km = (33705/MPGe)/1.609
            return (33705.0 / val) / MPH if val else None
        return None

    # valores por ciclo
    udds_vals = fe_vals[m_udd]
    hwy_vals  = fe_vals[m_hwy]
    udds_unit = units[m_udd].iloc[0] if m_udd.any() else ""
    hwy_unit  = units[m_hwy].iloc[0] if m_hwy.any() else ""

    udds_whkm = to_whkm(udds_vals.mean(), udds_unit) if m_udd.any() else None
    hwy_whkm  = to_whkm(hwy_vals.mean(),  hwy_unit)  if m_hwy.any()  else None

    # combinado 2-cycle (usa o que tiver)
    W_CITY, W_HWY = 0.55, 0.45
    def wavg(a, b):
        if (a is None) and (b is None): return None
        if a is None: return b
        if b is None: return a
        return W_CITY*a + W_HWY*b

    comb_whkm = wavg(udds_whkm, hwy_whkm)

    cat  = g["Drive System Description"].mode().iloc[0] if not g["Drive System Description"].isna().all() else "Midsize"
    year = g["Model Year"].iloc[0]

    return {
        "electrification": "BEV",
        "fuel_type": "Electric",

        "engine_max_power_kw": engine_kw if engine_kw is not None else None,
        "engine_rpm_max_power": None,
        "engine_max_torque_nm": None,
        "engine_rpm_max_torque": None,
        "gear_count": gear_count,
        "final_drive_ratio": final_drive,

        "ambient_temp_c": 23.0,
        "ac_on": 0,
        "method_note": "BEV: UDDS/HWFET via RND_ADJ_FE (converte Wh/mi, Wh/km, kWh/100mi, kWh/100km, MPGe). Combined=0.55*UDDS+0.45*HWFET (usa o que houver).",

        # agregados 2-cycle
        "energy_Wh_per_km": comb_whkm,
        "fuel_km_per_l": None,
        "fuel_l_per_100km": None,
        "gco2_per_km": 0.0 if comb_whkm is not None else None,

        # por ciclo
        "energy_ftp75_Wh_per_km": udds_whkm,  # UDDS ≈ City/FTP
        "energy_hwfet_Wh_per_km": hwy_whkm,
        "energy_us06_Wh_per_km": None,
        "energy_sc03_Wh_per_km": None,
        "energy_coldftp_Wh_per_km": None,

        "fuel_ftp75_l_per_100km": None,
        "fuel_hwfet_l_per_100km": None,
        "fuel_us06_l_per_100km": None,
        "fuel_sc03_l_per_100km": None,
        "fuel_coldftp_l_per_100km": None,

        "gco2_ftp75_per_km": 0.0 if udds_whkm is not None else None,
        "gco2_hwfet_per_km": 0.0 if hwy_whkm  is not None else None,

        "label_program": "EPA",
        "label_version_year": year,
        "label_vehicle_category": cat,
        "label_cycle_set": "2-cycle",
        "label_gco2_per_km": 0.0 if comb_whkm is not None else None,
    }

all_vde, all_fc, skipped = [], [], []
inserted = 0
preview = 0

for veh_key, g in df.groupby("_veh_key", sort=False):
    # higiene: descartar sem nenhuma info mínima
    if g[["CO2 (g/mi)", "RND_ADJ_FE", "FE Bag 1", "FE Bag 2", "FE Bag 3", "FE Bag 4"]].notna().sum().sum() == 0:
        skipped.append({"veh_key": veh_key, "reason": "no_consumption_data"})
        continue

    # VDE sempre montado
    vde_payload = build_vde_payload(g)
    
    # tipo de eletrificação
    electrif = "BEV" if g["is_bev"].any() else ("PHEV" if g["is_phev"].any() else ("HEV" if g["is_hev"].any() else "ICE"))
    fuel_desc = g.get("Test Fuel Type Description", pd.Series(dtype=str)).mode().iloc[0]
    fuel_desc_l = str(fuel_desc).lower()

    if electrif == "BEV" or fuel_desc_l == "electricity":
        fc_payload = build_fc_payload_bev(g)
    elif fuel_desc_l == "hydrogen 5":
        # ainda não tratamos FCEV
        skipped.append({"veh_key": veh_key, "reason": "pending_fcev"})
        continue
    else:
        fc_payload = build_fc_payload_combustion(g, electrif)

    all_vde.append(vde_payload)
    all_fc.append(fc_payload)

    if preview < DEBUG_PREVIEWS:
        print(f"\nVEÍCULO [{veh_key}]")
        print("  VDE:", {k: vde_payload[k] for k in ("make","model","year","mass_kg")})
        print("  FC :", {k: fc_payload.get(k) for k in ("electrification","fuel_type","fuel_ftp75_l_per_100km","fuel_hwfet_l_per_100km","fuel_l_per_100km","energy_Wh_per_km","method_note")})
        preview += 1



# export
df_vde = pd.DataFrame(all_vde)
df_fc  = pd.DataFrame(all_fc)
df_skip = pd.DataFrame(skipped)


: 

In [None]:

# === MAPAS EXATOS (tudo minúsculo, sem regex) ================================
TX_MAP_EXACT = {
    "automatic": "AT",
    "manual": "MT",
    "continuously variable": "CVT",
    "selectable continuously variable": "CVT",
    "dct": "DCT",
    "automated manual": "AMT",
    "automated manual - selectable": "AMT",
    "semi-automatic": "AMT",
    "single speed": "SS",
    "direct drive": "SS",
    "other": "OT",
}

DRIVE_MAP_EXACT = {
    "2-wheel drive, front": "FWD",
    "2-wheel drive, rear": "RWD",
    "all wheel drive": "AWD",
    "4-wheel drive": "4WD",
    "part-time 4-wheel drive": "4WD",
}

# === NORMALIZADORES (apenas strip + lower) ===================================
def _norm_exact(s):
    if not isinstance(s, str):
        return ""
    return s.strip().lower()

def normalize_transmission_db(val: str) -> str:
    return TX_MAP_EXACT.get(_norm_exact(val), "OT")

def normalize_drive_db(val: str) -> str:
    # escolha seu default para valores fora do mapa; aqui usei "AWD"
    return DRIVE_MAP_EXACT.get(_norm_exact(val), "AWD")


# === APLICANDO NO SEU DF =====================================================
df_vde["transmission_type_original"] = df_vde["transmission_type"]
df_vde["drive_type_original"]        = df_vde["drive_type"]

df_vde["transmission_type"] = df_vde["transmission_type"].apply(normalize_transmission_db)
df_vde["drive_type"]= df_vde["drive_type"].apply(normalize_drive_db)



# To be deleted before production use
df_vde['electrification'] = df_fc['electrification']

for i in range(len(df_vde)):
    if df_vde.loc[i, "electrification"] == "BEV":
        df_vde.loc[i, "transmission_type"] = "SS"
        
        
# (opcional) conferir:
print(df_vde["transmission_type"].value_counts(dropna=False))
print(df_vde["drive_type"].value_counts(dropna=False))


: 

In [None]:
from src.vde_core.services import (
    default_cycle_for_legislation, load_cycle_csv,
    compute_vde_net_mj_per_km, compute_vde_net,
    epa_city_hwy_from_phase, wltp_phases_from_phase
)

leg = "EPA"
fname = default_cycle_for_legislation(leg)
df_cycle = load_cycle_csv(fname)

vde_urb_mj, vde_urb_mj_per_km, vde_hw_mj, vde_hw_mj_per_km, vde_net_mj_per_km = [], [], [], [], []
for idx, r in df_vde.iterrows():
    A = r['coast_A_N']
    B = r['coast_B_N_per_kph']
    C = r['coast_C_N_per_kph2']
    #etw_kg = r['inertia_class']
    etw_kg = r['mass_kg'] # Mudei a função etw_kg

    res = epa_city_hwy_from_phase(df_cycle, A, B, C , etw_kg) 

    vde_urb_mj.append(res["urb_MJ"])
    vde_urb_mj_per_km.append(res["urb_MJ_km"])
    vde_hw_mj.append(res["hw_MJ"])
    vde_hw_mj_per_km.append(res["hw_MJ_km"])
    vde_net_mj_per_km.append(res["net_comb_MJ_km"])

df_vde['vde_net_mj_per_km'] = vde_net_mj_per_km
df_vde['vde_urb_mj'] = vde_urb_mj
df_vde['vde_urb_mj_per_km'] = vde_urb_mj_per_km
df_vde['vde_hw_mj'] = vde_hw_mj
df_vde['vde_hw_mj_per_km'] = vde_hw_mj_per_km


: 

In [None]:
# Ajuste Classes EPA
# 1) ajuste o caminho do XLSX da sua pasta (igual ao print)
ABS_DIR = Path(__file__).resolve().parent   
epa_path = ABS_DIR.parent / 'data' / 'vehicles' / 'vehicles_EPA_Classes.xlsx' 
EPA_XLSX = Path(xlsx_path) # your path


# === EPA category → df_vde['category'] (merge simples, sem mudar ordem) ======
import re, string

def _norm(s: str) -> str:
    s = "" if s is None else str(s)
    s = re.sub(r"\((?:[^()]*?)\)", " ", s)      # remove ( ... )
    s = re.sub(r'["“”\']', "", s)
    s = re.sub(r"[-_/]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s.lower()

def _map14(raw: str):
    if not isinstance(raw, str): return None
    s = raw.lower().replace("sport utility","suv").replace("vehicle","").replace("mid-size","midsize")
    s = re.sub(r"\b(2wd|4wd|awd)\b","",s); s = re.sub(r"\s+"," ",s).strip()
    if "two" in s and "seater" in s: return "Two Seaters"
    if "minicompact" in s: return "Minicompact Cars"
    if "subcompact" in s: return "Subcompact Cars"
    if "compact" in s and "car" in s: return "Compact Cars"
    if "midsize" in s and "car" in s: return "Midsize Cars"
    if "large" in s and "car" in s: return "Large Cars"
    if "small" in s and "wagon" in s: return "Small Station Wagons"
    if "midsize" in s and "wagon" in s: return "Midsize Station Wagons"
    if "small" in s and "suv" in s: return "Small SUVs"
    if "standard" in s and "suv" in s: return "Standard SUVs"
    if "minivan" in s: return "Minivans"
    if s == "van" or s.endswith(" van"): return "Vans"
    if "small" in s and "pickup" in s: return "Small Pickup Trucks"
    if "standard" in s and "pickup" in s: return "Standard Pickup Trucks"
    return None

# 1) ler a planilha oficial e preparar mapeamento normalizado
xl = pd.ExcelFile(EPA_XLSX)
for sh in xl.sheet_names:
    cat = xl.parse(sh)
    if cat.shape[0] and cat.shape[1]:
        break

cols = {c.lower(): c for c in cat.columns}
year_c  = cols.get("year") or cols.get("modelyear") or list(cat.columns)[0]
make_c  = cols.get("make") or list(cat.columns)[1]
model_c = cols.get("model") or list(cat.columns)[2]
class_c = next((c for c in cat.columns if re.sub(r"[^a-z]","",c.lower()) in {"vclass","epaclass","class","categoria"}), None)
assert class_c, "Não encontrei coluna de classe no vehicles_EPA_Classes.xlsx"

cat = cat[[year_c, make_c, model_c, class_c]].rename(
    columns={year_c:"year", make_c:"make", model_c:"model", class_c:"official_class"}
).copy()
cat["year"] = pd.to_numeric(cat["year"], errors="coerce").astype("Int64")
cat = cat.dropna(subset=["year"]).copy()
cat["year"] = cat["year"].astype("Int64")   # Int64 para alinhar com tmp["year"]
cat["make_n"]  = cat["make"].map(_norm)
cat["model_n"] = cat["model"].map(_norm)
cat["category"] = cat["official_class"].map(_map14)  # mapeia para as 14 categorias

# 2) normalizar chaves no df_vde
tmp = df_vde[["year","make","model"]].copy()
tmp["year"] = pd.to_numeric(tmp["year"], errors="coerce").astype("Int64")
tmp["make_n"]  = tmp["make"].map(_norm)
tmp["model_n"] = tmp["model"].map(_norm)

# 2a) garantir unicidade no catálogo por (year, make_n, model_n)
cat_key = (
    cat[["year","make_n","model_n","category"]]
      .dropna(subset=["category"])
      .sort_values(["year","make_n","model_n"])
      .groupby(["year","make_n","model_n"], as_index=False)
      .agg({"category":"first"})
)

# 2b) LEFT MERGE, validando que é m:1 (df_vde : cat_key) → escreve a categoria AGORA
match = tmp.merge(
    cat_key,
    how="left",
    on=["year","make_n","model_n"],
    validate="m:1"
)
df_vde["category"] = match["category"].values
df_vde["category_source"] = df_vde["category"].apply(lambda x: "official" if pd.notna(x) else "unmatched")

# >>> congele quem estava vazio ANTES dos fallbacks (para não sobrescrever)
orig_empty = df_vde["category"].isna().copy()

# --------------------------- Fallback 1: herdar (>=2020) ----------------------
rows_by_key = {}
for i in range(len(df_vde)):
    if not orig_empty.iat[i] and pd.isna(df_vde.at[i, "category"]):
        # continua vazio mas não era vazio "original" → não toque
        pass
    y  = tmp.loc[i, "year"]
    mk = tmp.loc[i, "make_n"]
    md = tmp.loc[i, "model_n"]
    if pd.isna(y): 
        continue
    key = (mk, md)
    rows_by_key.setdefault(key, []).append((int(y), i, df_vde.loc[i, "category"]))

for key, rows in rows_by_key.items():
    rows.sort(key=lambda t: t[0])  # (year, idx, cat)
    last_cat = None
    for year, idx, catv in rows:
        if pd.notna(catv):
            last_cat = catv
        else:
            if orig_empty.iat[idx] and year >= 2020 and last_cat is not None:
                df_vde.at[idx, "category"] = last_cat
                df_vde.at[idx, "category_source"] = "inferred_prev_2020plus"

# -------------------- Fallback 2: raiz de modelo (DB11 vs DB11 V8) --------------------
_DRIVETRAIN = r"(xdrive|4matic|quattro|awd|fwd|rwd|4wd|2wd)"
_ENGINE     = r"(v\d{1,2}\b|\d\.\d\s?l\b|\d\.\dL\b|\b\d{2,4}\s?hp\b|turbo|biturbo|twin[-\s]?turbo|supercharged|sc|tsfi|fsi|tsi|gdi|tgi|mhev|phev|hev|plug[-\s]?in)"
_TRIM       = r"(lx|le|se|sel|xle|xse|platinum|limited|sport|touring|premium|luxury|signature|rs|lt|ls|z71|srt|amg|m\s?\d{1,3}\b|type[-\s]?r)"
_BODY_HINT  = r"(sedan|saloon|hatch|hatchback|coupe|coupé|convertible|cabrio|roadster|gran\s?coupe|fastback|liftback)"
_WHEELSIZE  = r"(\b\d{2}\s?(?:\"|”|in|inch|inches)\b|\b\d{3}/\d{2}\s?R?\d{2}\b)"

def _norm_basic(s: str) -> str:
    s = "" if s is None else str(s).lower()
    s = re.sub(r"\((?:[^()]*?)\)", " ", s)
    s = s.replace("“","").replace("”","").replace('"',"").replace("'","")
    s = re.sub(r"[-_/]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def _model_core(s: str) -> str:
    s = _norm_basic(s)
    s = re.sub(_DRIVETRAIN, " ", s)
    s = re.sub(_ENGINE, " ", s)
    s = re.sub(_TRIM, " ", s)
    s = re.sub(_BODY_HINT, " ", s)
    s = re.sub(_WHEELSIZE, " ", s, flags=re.I)
    s = re.sub(r"[^\w\s]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def _loose_compact(s: str) -> str:
    s = _norm_basic(s)
    s = "".join(ch for ch in s if ch not in string.punctuation).replace(" ", "")
    return s

def _tokens(s: str) -> set:
    return set(_norm_basic(s).split())

# índice de doadores por marca
donors_by_make = {}
for _, r in cat_key.iterrows():
    if pd.isna(r["category"]): 
        continue
    mk = r["make_n"]; mdl = r["model_n"]
    donors_by_make.setdefault(mk, []).append({
        "core": _model_core(mdl),
        "compact": _loose_compact(mdl),
        "tokens": _tokens(mdl),
        "cat": r["category"]
    })

filled_core = 0
for i in range(len(df_vde)):
    if not orig_empty.iat[i]:
        continue  # só mexe em quem estava vazio originalmente
    if pd.notna(df_vde.at[i, "category"]):
        continue  # já preenchido por outro fallback

    mk  = tmp.at[i, "make_n"]
    mdl = tmp.at[i, "model_n"]
    if not mk or not mdl: 
        continue
    donors = donors_by_make.get(mk)
    if not donors:
        continue

    t_core    = _model_core(mdl)
    t_compact = _loose_compact(mdl)
    t_tokens  = _tokens(mdl)

    chosen = None
    # (1) core exato
    for d in donors:
        if t_core and t_core == d["core"]:
            chosen = d["cat"]; break
    # (2) compacto exato
    if not chosen:
        for d in donors:
            if t_compact and t_compact == d["compact"]:
                chosen = d["cat"]; break
    # (3) prefix/substring na raiz
    if not chosen and t_core:
        for d in donors:
            a, b = t_core, d["core"]
            if a.startswith(b) or b.startswith(a) or (a in b) or (b in a):
                chosen = d["cat"]; break
    # (4) Jaccard tokens (limiar 0.5)
    if not chosen and t_tokens:
        best_j, best_cat = 0.0, None
        for d in donors:
            if not d["tokens"]:
                continue
            inter = len(t_tokens & d["tokens"])
            union = len(t_tokens | d["tokens"])
            j = inter / union if union else 0.0
            if j > best_j:
                best_j, best_cat = j, d["cat"]
        if best_j >= 0.5:
            chosen = best_cat

    if chosen:
        df_vde.at[i, "category"] = chosen
        df_vde.at[i, "category_source"] = "inferred_model_core"
        filled_core += 1

print(f"Inferidos por raiz de modelo: {filled_core}")

# -------------------- Fallback 3: texto do MODEL (SUV / Sedan / etc.) --------------------
def _body_class_from_model(model: str) -> str | None:
    if not isinstance(model, str) or not model.strip():
        return None
    s = model.lower()
    if re.search(r"\b(mini\s?van|minivan)\b", s): return "Minivans"
    if re.search(r"\b(van|sprinter|promaster|transit|express|savana|nv200|metris)\b", s): return "Vans"
    if re.search(r"\b(wagon|avant|allroad|touring sports|sportwagen|alltrack|outback|estate)\b", s): return "Midsize Station Wagons"
    small_pickups = r"(tacoma|colorado|canyon|ranger|frontier|maverick|santa\s?cruz|gladiator)"
    std_pickups   = r"(f[\s-]?150|silverado\s?1500|sierra\s?1500|ram\s?1500|tundra|titan|super\s?duty|2500|3500)"
    if re.search(r"\b(pick[\s-]?up|pickup|truck)\b", s) or re.search(small_pickups + "|" + std_pickups, s):
        if re.search(std_pickups, s): return "Standard Pickup Trucks"
        if re.search(small_pickups, s): return "Small Pickup Trucks"
        return "Standard Pickup Trucks"
    if "suv" in s or "sport utility" in s or "crossover" in s:
        if re.search(r"\b(grand|xl|max|suburban|tahoe|sequoia|wagoneer|explorer|highlander|pilot|telluride|palisade|atlas|yukon|x5|x6|x7|q7|q8|rx|tx|gl[es]|gle|gls)\b", s): return "Standard SUVs"
        if re.search(r"\b(xc40|x3|x1|q3|q5|gla|glb|glc|nx|ux|forester|crosstrek|rav4|cr[-\s]?v|cx[-\s]?3|cx[-\s]?30|cx[-\s]?5|rogue|taos|tiguan|compass|renegade|bronco\s?sport|escape|kona|seltos|sportage|kicks|countryman|corolla\s?cross|equinox|trax|trailblazer)\b", s): return "Small SUVs"
        return "Standard SUVs"
    if re.search(r"\b(roadster|speedster|spyder|spider)\b", s): return "Two Seaters"
    if re.search(r"\b(sedan|saloon)\b", s) or re.search(r"\b(hatch|hatchback)\b", s) or \
       re.search(r"\b(coupe|coupé|convertible|cabrio)\b", s) or "gran coupe" in s or "fastback" in s or "liftback" in s:
        return "Midsize Cars"
    return None

filled_body = 0
for i in range(len(df_vde)):
    if not orig_empty.iat[i]:
        continue
    if pd.notna(df_vde.at[i, "category"]):
        continue
    bc = _body_class_from_model(df_vde.at[i, "model"])
    if bc:
        df_vde.at[i, "category"] = bc
        df_vde.at[i, "category_source"] = "inferred_bodytext"
        filled_body += 1

print(f"Inferidos por texto do model(): {filled_body}")

# 4) limpar helpers
del tmp, match, cat_key, donors_by_make, rows_by_key, orig_empty

# se a coluna existir:
df_vde["category"] = df_vde["category"].fillna("Unknown")


Inferidos por raiz de modelo: 2074
Inferidos por texto do model(): 121


In [None]:
# === Decomposição NET a partir do coastdown final já existente no df_vde ===
# Pré-requisitos no kernel: df_vde com as colunas abaixo.
# Necessário no disco: /mnt/data/vde_defaults_by_category_trans_elec_v2_1.csv

import numpy as np
# ---- Parâmetros/constantes ----

default_path = ABS_DIR.parent / 'data' / 'standards' / 'vde_defaults_by_category_trans_elec.csv' 
DEFAULTS_PATH = Path(default_path)


G = 9.80665     # m/s²
RHO = 1.2       # kg/m³
TOL_A = 5    # N
TOL_B = 0.1   # N/kph
TOL_C = 1e-1    # N/kph²



# ---- Verificações básicas ----
if 'df_vde' not in globals():
    raise NameError("df_vde não está no kernel. Carregue seu df_vde antes de rodar esta célula.")

req_cols = [
    "category","electrification","transmission_type","mass_kg",
    "coast_A_N","coast_B_N_per_kph","coast_C_N_per_kph2"
]
missing = [c for c in req_cols if c not in df_vde.columns]
if missing:
    raise KeyError(f"Faltam colunas no df_vde: {missing}")

# Coerção numérica (se vierem strings)
for c in ["mass_kg","cda_m2","coast_A_N","coast_B_N_per_kph","coast_C_N_per_kph2"]:
    if c in df_vde.columns:
        df_vde[c] = pd.to_numeric(df_vde[c], errors="coerce")

# ---- Carrega defaults ----
df_def = pd.read_csv(DEFAULTS_PATH)

def_keys = [
    "category","electrification","transmission_type",
    "cdA_default_m2","rrc_N_per_kN","crr1_frac_at_120kph",
    "brake_A_N","brake_B_Npkph","trans_A_N","trans_B_Npkph"
]
miss_def = [c for c in def_keys if c not in df_def.columns]
if miss_def:
    raise KeyError(f"Faltam colunas no defaults CSV: {miss_def}")

# ---- Seleciona apenas linhas com coastdown completo ----
mask = df_vde["coast_A_N"].notna() & df_vde["coast_B_N_per_kph"].notna() & df_vde["coast_C_N_per_kph2"].notna()
df_in = df_vde.loc[mask].copy()
if df_in.empty:
    raise ValueError("Nenhuma linha com coastdown completo encontrada em df_vde para decompor.")

# ---- Merge com defaults por (category, engine_type, transmission_type) ----
keys = ["category","transmission_type","electrification",]
dfm = df_in.merge(df_def, on=keys, how="left", suffixes=("","_def"))

# ---- Função de decomposição NET (sem transmissão no NET) ----
def decompose_row_NET_v0(r):
    # Escolhe CdA: usa do veículo se houver, senão default do catálogo
    cdA = r["cda_m2"] if pd.notna(r.get("cda_m2", np.nan)) else r["cdA_default_m2"]

    # Pneus (rrc em N/kN * carga total em kN)
    if r['mass_kg'] <= 0 or pd.isna(r['mass_kg']):
        A_rr = float(r["rrc_N_per_kN"]) * (float(r["inertia_class"]) * G / 1000.0)  # fallback para inertia_class
    else:
        A_rr = float(r["rrc_N_per_kN"]) * (float(r["mass_kg"]) * G / 1000.0)
    B_rr = A_rr * (float(r["crr1_frac_at_120kph"]) / 120.0)
    C_rr = 0.0

    # Aero (C em N/kph²)
    C_aero = 0.5 * RHO * float(cdA) * (1/3.6)**2

    # Parasitic NET = resto atribuído a brake/bearings (transmissão NÃO entra no NET)
    A_par = max(0.0, float(r["coast_A_N"]) - A_rr)
    B_par = max(0.0, float(r["coast_B_N_per_kph"]) - B_rr)
    C_par = max(0.0, float(r["coast_C_N_per_kph2"]) - C_aero)  # esperado ~0

    # Recomposição (QA)
    A_hat = A_rr + A_par
    B_hat = B_rr + B_par
    C_hat = C_aero + C_rr + C_par

    dA = A_hat - float(r["coast_A_N"])
    dB = B_hat - float(r["coast_B_N_per_kph"])
    dC = C_hat - float(r["coast_C_N_per_kph2"])
    check_ok = (abs(dA) <= TOL_A) and (abs(dB) <= TOL_B) and (abs(dC) <= TOL_C)

    # Transmissão como referência (para TOTAL no futuro)
    trans_A_ref = float(r.get("trans_A_N", 0.0))
    trans_B_ref = float(r.get("trans_B_Npkph", 0.0))

    return pd.Series({
        # Blocos NET que compõem o coastdown
        "rr_alpha_N": A_rr,
        "rr_beta_Npkph": B_rr,
        "rr_a_Npkph2": C_rr,
        "aero_C_coef_Npkph2": C_aero,
        "parasitic_A_coef_N": A_par,
        "parasitic_B_coef_Npkph": B_par,
        "parasitic_C_coef_Npkph2": C_par,
        # Referências (não entram no NET)
        "trans_A_ref_N": trans_A_ref,
        "trans_B_ref_Npkph": trans_B_ref,
        # QA
        "dA": dA, "dB": dB, "dC": dC,
        "check_ok": check_ok,
        "rl_source": "measured_decomposed_NET"
    })

def decompose_row_NET(r):
    """
    Decomposição NET (coastdown medido) com ajuste mínimo:
    - C: usa C_measured para backcalcular CdA*  => dC = 0
    - A: usa priors de parasitic (trans+brake) para backcalcular RRC* dentro de limites
    - B: idem para backcalcular frac120* dentro de limites
    - Transmissão entra no NET (é neutro girando), split informativo via priors
    """
    # ---- parâmetros / limites ----
    G = 9.80665
    RHO = 1.2
    TOL_A = 8
    TOL_B = 0.15
    TOL_C = 5e-2
    # limites plausíveis por tipo
    cat = str(r.get("category", "") or "")
    elec = str(r.get("electrification", "") or "")
    is_truckish = ("Pickup" in cat) or (cat in ("Vans","Small Pickup Trucks","Standard Pickup Trucks"))
    RRC_MIN, RRC_MAX = (9.0, 12.0) if is_truckish else (7.5, 10.5)      # N/kN
    FRAC_MIN, FRAC_MAX = 0.08, 0.14                                      # @120 km/h (típico)
    # priors de parasitic (se vierem NaN, viram 0)
    trans_A_prior = float(r.get("trans_A_N", 0) or 0)
    trans_B_prior = float(r.get("trans_B_Npkph", 0) or 0)
    brake_A_prior = float(r.get("brake_A_N", 0) or 0)
    brake_B_prior = float(r.get("brake_B_Npkph", 0) or 0)

    # ---- entradas medidas ----
    A_meas = float(r["coast_A_N"])
    B_meas = float(r["coast_B_N_per_kph"])
    C_meas = float(r["coast_C_N_per_kph2"])
    mass_kg = float(r.get("mass_kg", float("nan")))
    inertia = float(r.get("inertia_class", float("nan")))
    mass_used = mass_kg if (mass_kg == mass_kg and mass_kg > 0) else inertia

    # ===== 1) C: feche no medido (back-calc CdA*) =====
    C_aero = max(0.0, C_meas)  # parasitic_C e C_rr ~ 0, então iguala
    cdA_star = (2.0 * C_aero * (3.6**2)) / RHO  # só para log; não precisa persistir se já tiver cda_m2

    # ===== 2) A: back-calc RRC* a partir do A medido e dos priors de parasitic =====
    A_par_prior = max(0.0, trans_A_prior + brake_A_prior)
    A_rr_star = max(0.0, A_meas - A_par_prior)
    if mass_used and mass_used > 0:
        rrc_star = (A_rr_star * 1000.0) / (mass_used * G)
    else:
        rrc_star = float(r.get("rrc_N_per_kN", 9.5) or 9.5)  # fallback

    # limite plausível
    rrc_star = min(max(rrc_star, RRC_MIN), RRC_MAX)
    # recompute A_rr com rrc* limitado
    A_rr = rrc_star * (mass_used * G / 1000.0)

    # parasitic total (NET) é o resto
    A_par = max(0.0, A_meas - A_rr)

    # ===== 3) B: back-calc frac120* a partir do B medido e priors =====
    B_par_prior = max(0.0, trans_B_prior + brake_B_prior)
    B_rr_star = max(0.0, B_meas - B_par_prior)
    if A_rr > 1e-9:
        frac120_star = (B_rr_star / A_rr) * 120.0
    else:
        frac120_star = float(r.get("crr1_frac_at_120kph", 0.12) or 0.12)

    # limite plausível
    frac120_star = min(max(frac120_star, FRAC_MIN), FRAC_MAX)
    # recompute B_rr com frac* limitado
    B_rr = A_rr * (frac120_star / 120.0)

    # parasitic total (NET) para B é o resto
    B_par = max(0.0, B_meas - B_rr)

    # ===== 4) split informativo (trans vs brake) por proporção dos priors =====
    a_sum = max(1e-9, trans_A_prior + brake_A_prior)
    b_sum = max(1e-9, trans_B_prior + brake_B_prior)
    trans_A_est = A_par * (trans_A_prior / a_sum)
    brake_A_est = A_par * (brake_A_prior / a_sum)
    trans_B_est = B_par * (trans_B_prior / b_sum)
    brake_B_est = B_par * (brake_B_prior / b_sum)

    # ===== 5) QA (recomposição deve colar no medido) =====
    A_hat = A_rr + A_par
    B_hat = B_rr + B_par
    C_hat = C_aero  # = C_meas
    dA = A_hat - A_meas
    dB = B_hat - B_meas
    dC = C_hat - C_meas
    check_ok = (abs(dA) <= TOL_A) and (abs(dB) <= TOL_B) and (abs(dC) <= TOL_C)

    return pd.Series({
        # blocos NET
        "rr_alpha_N": A_rr,
        "rr_beta_Npkph": B_rr,
        "rr_a_Npkph2": 0.0,
        "aero_C_coef_Npkph2": C_aero,
        "parasitic_A_coef_N": A_par,
        "parasitic_B_coef_Npkph": B_par,
        "parasitic_C_coef_Npkph2": 0.0,
        # estimativas informativas (não usadas na recomposição)
        "trans_A_est_N": trans_A_est,
        "trans_B_est_Npkph": trans_B_est,
        "brake_A_est_N": brake_A_est,
        "brake_B_est_Npkph": brake_B_est,
        "cdA_backcalc_m2": cdA_star,
        "rrc_backcalc_N_per_kN": rrc_star,
        "frac120_backcalc": frac120_star,
        # QA
        "dA": dA, "dB": dB, "dC": dC, "check_ok": check_ok,
        "rl_source": "measured_decomposed_NET_fit"
    })



# ---- Aplica a decomposição linha a linha ----
df_blocks = dfm.apply(decompose_row_NET, axis=1)

# ---- Monta saída final (pronta para persistência) ----
cols_front = [
    "legislation","cycle_name","cycle_source","year","make","model",
    "drive_type","engine_size_l","engine_type","transmission_type","notes",
    "inertia_class","mass_kg",
    "coast_A_N","coast_B_N_per_kph","coast_C_N_per_kph2",
    "category","category_source"
]
cols_front = [c for c in cols_front if c in dfm.columns]

df_decomp = pd.concat([dfm[cols_front].reset_index(drop=True),
                       df_blocks.reset_index(drop=True)], axis=1)

# ---- Resultado: df_decomp e um CSV de apoio ----
print("Linhas processadas:", len(df_decomp), "| OK:", int(df_decomp['check_ok'].sum()))
display(df_decomp.head(20))

tire_path = ABS_DIR.parent / 'data' / 'vehicles' / 'df_vde_decomposed_NET.csv' 
OUT_CSV =Path(tire_path)



df_decomp.to_csv(OUT_CSV, index=False)
print("Arquivo salvo em:", OUT_CSV)



Linhas processadas: 4999 | OK: 2958


Unnamed: 0,legislation,cycle_name,cycle_source,year,make,model,drive_type,engine_size_l,transmission_type,notes,...,brake_A_est_N,brake_B_est_Npkph,cdA_backcalc_m2,rrc_backcalc_N_per_kN,frac120_backcalc,dA,dB,dC,check_ok,rl_source
0,EPA,FTP-75,standard:EPA,2025,Aston Martin,DB12 V8,RWD,4.0,AT,From EPA test car dataset,...,6.524348,0.034556,0.819851,10.5,0.14,0.0,0.0,0.0,True,measured_decomposed_NET_fit
1,EPA,FTP-75,standard:EPA,2025,Aston Martin,DBX 707,4WD,4.0,AT,From EPA test car dataset,...,11.0,0.0,1.38373,9.830726,0.08,0.0,1.063528,0.0,False,measured_decomposed_NET_fit
2,EPA,FTP-75,standard:EPA,2025,Aston Martin,Vanquish,RWD,5.2,AT,From EPA test car dataset,...,5.0,0.068714,0.838399,9.756379,0.14,0.0,0.0,0.0,True,measured_decomposed_NET_fit
3,EPA,FTP-75,standard:EPA,2025,Aston Martin,Vantage,RWD,4.0,AT,From EPA test car dataset,...,6.666236,0.045727,0.83098,10.5,0.14,0.0,0.0,0.0,True,measured_decomposed_NET_fit
4,EPA,FTP-75,standard:EPA,2025,BMW,230i Coupe,RWD,2.0,AMT,From EPA test car dataset,...,9.22914,0.0,0.734527,10.5,0.08,0.0,0.432742,0.0,False,measured_decomposed_NET_fit
5,EPA,FTP-75,standard:EPA,2025,BMW,230i xDrive Coupe,AWD,2.0,AMT,From EPA test car dataset,...,23.731133,0.0,0.884771,10.5,0.08,0.0,1.481445,0.0,False,measured_decomposed_NET_fit
6,EPA,FTP-75,standard:EPA,2025,BMW,330i Sedan,RWD,2.0,AMT,From EPA test car dataset,...,10.467605,0.0,0.750479,10.5,0.08,0.0,0.354041,0.0,False,measured_decomposed_NET_fit
7,EPA,FTP-75,standard:EPA,2025,BMW,330i xDrive Sedan,AWD,2.0,AMT,From EPA test car dataset,...,20.513562,0.0,0.818441,10.5,0.08,0.0,0.768088,0.0,False,measured_decomposed_NET_fit
8,EPA,FTP-75,standard:EPA,2025,BMW,430i Convertible,RWD,2.0,AMT,From EPA test car dataset,...,7.667997,0.0,0.741575,10.5,0.08,0.0,0.347979,0.0,False,measured_decomposed_NET_fit
9,EPA,FTP-75,standard:EPA,2025,BMW,430i Coupe,RWD,2.0,AMT,From EPA test car dataset,...,9.989649,0.0,0.712639,10.5,0.08,0.0,0.348513,0.0,False,measured_decomposed_NET_fit


Arquivo salvo em: C:\Users\CaioHenriqueFerreira\Downloads\From Git\projects\EcoDrive-Analyst\data\df_vde_decomposed_NET.csv


In [None]:

# 0) garantia de 1:1 por posição
assert len(df_decomp) == len(df_vde), "df_decomp e df_vde precisam ter o mesmo tamanho."
mask_ok = df_decomp["check_ok"].fillna(False)

# 1) mapeie nomes (decomp → nomes do schema vde_db)
name_map = {
    # coast_* (SE quiser só quando ok, descomente as 3 linhas abaixo)
    # "coast_A_N": "coast_A_N",
    # "coast_B_N_per_kph": "coast_B_N_per_kph",
    # "coast_C_N_per_kph2": "coast_C_N_per_kph2",

    # aero + pneus (modelo RR)
    "aero_C_coef_Npkph2": "aero_C_coef_Npkph2",
    "rr_alpha_N": "rr_alpha_N",
    "rr_beta_Npkph": "rr_beta_Npkph",
    "rr_a_Npkph2": "rr_a_Npkph2",

    # parasitic (NET)
    "parasitic_A_coef_N": "parasitic_A_coef_N",
    "parasitic_B_coef_Npkph": "parasitic_B_coef_Npkph",
    "parasitic_C_coef_Npkph2": "parasitic_C_coef_Npkph2",

    # split informativo → campos específicos do DB
    "trans_A_est_N": "trans_A_coef_N",
    "trans_B_est_Npkph": "trans_B_coef_Npkph",
    # C em NET esperado ~0; vamos preencher 0.0 quando ok
    # "trans_C_*" não vem do decomp; criaremos já-já
    "brake_A_est_N": "brake_A_coef_N",
    "brake_B_est_Npkph": "brake_B_coef_Npkph",
    "rrc_backcalc_N_per_kN": "rrc_N_per_kN"
    # idem para C de freio (0.0)
}

# 2) garanta colunas de destino no df_vde (criadas se não existirem)
for dst in list(name_map.values()) + [
    "trans_C_coef_Npkph2", "brake_C_coef_Npkph2",  # vamos setar 0.0 quando ok
    "aero_C_coef_Npkph2", "rr_alpha_N","rr_beta_Npkph","rr_a_Npkph2",
    "parasitic_A_coef_N","parasitic_B_coef_Npkph","parasitic_C_coef_Npkph2",
    "check_ok","rl_source"
]:
    if dst not in df_vde.columns:
        df_vde[dst] = np.nan

# 3) aplique SOMENTE nas linhas com check_ok=True
for src, dst in name_map.items():
    if src in df_decomp.columns:
        df_vde.loc[mask_ok, dst] = df_decomp.loc[mask_ok, src].values

# 3.1) completar campos C de trans/freio como 0.0 quando ok (NET)
df_vde.loc[mask_ok, "trans_C_coef_Npkph2"] = 0.0
df_vde.loc[mask_ok, "brake_C_coef_Npkph2"] = 0.0

# 3.2) copy de flags/metadata úteis (sem sobrescrever quando ausente)
for aux in ["check_ok","rl_source"]:
    if aux in df_decomp.columns:
        df_vde.loc[mask_ok, aux] = df_decomp.loc[mask_ok, aux].values

# 4) opcional: NÃO carregar colunas auxiliares “temporárias” do decomp
#    Se elas tiverem vindo pra df_vde via outros merges, remova:
drop_like = ("_est", "backcalc", "dA", "dB", "dC")
cols_drop = [c for c in df_vde.columns if any(tok in c for tok in drop_like)]
df_vde_final = df_vde.drop(columns=cols_drop, errors="ignore").copy()

# 5) ajuste mínimo para nomes “canônicos” do DB

# transmission_type: DB aceita texto; manter "AMT","OT","SS" não quebra o schema.

# 6) (opcional) selecione só colunas do schema vde_db para o insert
db_cols = [
    "legislation","category","make","model","year","notes",
    "engine_type","engine_model","engine_size_l","engine_aspiration",
    "transmission_type","transmission_model",
    "mass_kg","inertia_class","cda_m2","weight_dist_fr_pct","payload_kg",
    "tire_size","tire_rr_note","smerf","front_pressure_psi","rear_pressure_psi",
    "coast_A_N","coast_B_N_per_kph","coast_C_N_per_kph2",
    "trans_A_coef_N","trans_B_coef_Npkph","trans_C_coef_Npkph2",
    "brake_A_coef_N","brake_B_coef_Npkph","brake_C_coef_Npkph2",
    "aero_C_coef_Npkph2",
    "rr_alpha_N","rr_beta_Npkph","rr_a_Npkph2","rr_b_N","rr_c_Npkph","rr_load_kpa",
    "cycle_name","cycle_source", "rrc_N_per_kN",
    "vde_urb_mj","vde_hw_mj","vde_net_mj_per_km","vde_total_mj_per_km",
    "vde_low_mj_per_km","vde_mid_mj_per_km","vde_high_mj_per_km","vde_extra_high_mj_per_km",
    # campos que você disse ter adicionado depois:
    "drive_type","mro_kg","options_kg","wltp_category","vde_urb_mj_per_km","vde_hw_mj_per_km",
    "parasitic_A_coef_N","parasitic_B_coef_Npkph","parasitic_C_coef_Npkph2",
    
]

df_vde_to_insert = df_vde_final[[c for c in db_cols if c in df_vde_final.columns]].copy()


  df_vde.loc[mask_ok, aux] = df_decomp.loc[mask_ok, aux].values
 'measured_decomposed_NET_fit' ... 'measured_decomposed_NET_fit'
 'measured_decomposed_NET_fit' 'measured_decomposed_NET_fit']' has dtype incompatible with float64, please explicitly cast to a compatible dtype first.
  df_vde.loc[mask_ok, aux] = df_decomp.loc[mask_ok, aux].values


In [38]:
# %% Gráfico de dispersão: vde_net_mj_per_km vs energy_Wh_per_km (BEV)
import plotly.express as px

import pandas as pd
assert len(df_vde_to_insert) == len(df_fc)

cols = df_fc.columns
df_plot = pd.concat(
    [df_vde_to_insert.reset_index(drop=True),
     df_fc[cols].reset_index(drop=True)],
    axis=1
)


# Filtro: só BEV
df_bev = df_plot[df_plot["electrification"] == 'BEV'].copy()

fig = px.scatter(
    data_frame=df_bev,
    x="vde_net_mj_per_km",
    y="energy_Wh_per_km",
    labels={"vde_net_mj_per_km":"vde_net_mj_per_km", "energy_Wh_per_km":"energy_Wh_per_km"},
    title="BEV — vde_net_mj_per_km vs energy_Wh_per_km"
)

fig.show()

In [37]:
import numpy as np
import pandas as pd

# --- cópias locais (alinhadas 1:1 com o VDE) ---
df_fc = df_fc.copy()
vde   = df_vde_to_insert.copy()
assert len(df_fc) == len(vde), "df_fc e df_vde_to_insert precisam estar alinhados"

# --- saneamento mínimo ---
for c in ["energy_Wh_per_km","fuel_l_per_100km","fuel_km_per_l","utility_factor_pct","fuel_type","electrification"]:
    if c in df_fc.columns:
        if c in ("fuel_type","electrification"):
            df_fc[c] = df_fc[c].astype(str)
        else:
            df_fc[c] = pd.to_numeric(df_fc[c], errors="coerce")

# -----------------------------
# 1) BEV: corrigir unidades e calcular eficiência de tração
# -----------------------------
mask_bev = df_fc["electrification"].str.upper().eq("BEV")

# energia nas rodas (NET) → Wh/km
wheel_Whkm = vde["vde_net_mj_per_km"] * 277.7778

# razão observada (adimensional)
R = df_fc["energy_Wh_per_km"] / wheel_Whkm

# 1a) Correção de unidade: suspeita de Wh/mi colocado como Wh/km
mask_whmi = mask_bev & (R > 1.60) & df_fc["energy_Wh_per_km"].notna()
df_fc.loc[mask_whmi, "energy_Wh_per_km"] = df_fc.loc[mask_whmi, "energy_Wh_per_km"] / 1.60934

# recomputa R após correção
R = df_fc["energy_Wh_per_km"] / wheel_Whkm

# 1b) Classificação DC vs AC pela razão física
charger_eta = 0.92
eta_dc = 1.0 / R
eta_ac = charger_eta / R

# escolha: se DC < 0.70 e AC >= 0.70, usa AC; caso contrário, DC
eta_choice = np.where(eta_dc < 0.70, eta_ac, eta_dc)

# aplica somente em BEV; clamp físico (0.70–0.98)
bev_eff = np.where(mask_bev, eta_choice, np.nan)
bev_eff = np.clip(bev_eff, 0.70, 0.98, out=np.full_like(bev_eff, np.nan), where=mask_bev)

# flags
df_fc["bev_eff_drive"] = bev_eff
df_fc["eff_flag"] = "OK"
df_fc.loc[mask_whmi, "eff_flag"] = "FIX_WHMI"
mask_low = mask_bev & (eta_choice < 0.70)
df_fc.loc[mask_low, "bev_eff_drive"] = np.nan
df_fc.loc[mask_low, "eff_flag"] = "LOW_EFF_OUTLIER"

print("BEV corrigidos (Wh/mi→Wh/km):", int(mask_whmi.sum()))
print("BEV com eficiência <0.7 (após correções):", int(mask_low.sum()))

# -----------------------------
# 2) ICE/HEV/PHEV: eficiência média do trem (rodas ÷ tanque/bateria CS)
# -----------------------------
mask_non_bev = ~mask_bev

# usa TOTAL se existir; senão, NET com leve ajuste (~+6%)
wheel_MJpkm = (vde["vde_total_mj_per_km"]
               if "vde_total_mj_per_km" in vde.columns and vde["vde_total_mj_per_km"].notna().any()
               else vde["vde_net_mj_per_km"] * 1.06)

LHV_MJ_per_L = {
    "GASOLINE": 32.0, "E10": 31.0, "ETHANOL": 21.1, "FLEX": 30.0,
    "DIESEL": 35.8, "CNG": 21.0, "ELECTRIC": np.nan
}
lhv_vec = df_fc.get("fuel_type", "").str.upper().map(LHV_MJ_per_L).fillna(32.0)

if "fuel_l_per_100km" in df_fc.columns and df_fc["fuel_l_per_100km"].notna().any():
    L_per_100km = df_fc["fuel_l_per_100km"]
elif "fuel_km_per_l" in df_fc.columns and df_fc["fuel_km_per_l"].notna().any():
    L_per_100km = 100.0 / df_fc["fuel_km_per_l"]
else:
    L_per_100km = pd.Series(np.nan, index=df_fc.index)

fuel_MJpkm = (L_per_100km * lhv_vec) / 100.0
eta_pt = np.where((fuel_MJpkm > 0) & mask_non_bev, (wheel_MJpkm / fuel_MJpkm), np.nan)
df_fc["eta_pt_est"] = np.clip(eta_pt, 0.20, 0.55)

# -----------------------------
# 3) colunas auxiliares garantidas
# -----------------------------
if "utility_factor_pct" not in df_fc.columns:
    df_fc["utility_factor_pct"] = np.nan

# (opcional) estatísticas rápidas
if mask_bev.any():
    med_eff = np.nanmedian(df_fc.loc[mask_bev, "bev_eff_drive"])
    p10, p90 = np.nanpercentile(df_fc.loc[mask_bev, "bev_eff_drive"].dropna(), [10,90]) if df_fc.loc[mask_bev, "bev_eff_drive"].notna().any() else (np.nan, np.nan)
    print(f"BEV eficiência (mediana/p10/p90): {med_eff:.3f} / {p10:.3f} / {p90:.3f}")



BEV corrigidos (Wh/mi→Wh/km): 1
BEV com eficiência <0.7 (após correções): 3
BEV eficiência (mediana/p10/p90): 0.980 / 0.862 / 0.980


In [39]:



import plotly.express as px

import pandas as pd
assert len(df_vde_to_insert) == len(df_fc)

cols = df_fc.columns
df_plot = pd.concat(
    [df_vde_to_insert.reset_index(drop=True),
     df_fc[cols].reset_index(drop=True)],
    axis=1
)


# Filtro: só BEV
df_bev = df_plot[df_plot["electrification"] == 'BEV'].copy()

fig = px.scatter(
    data_frame=df_bev,
    x="vde_net_mj_per_km",
    y="bev_eff_drive",
    labels={"vde_net_mj_per_km":"vde_net_mj_per_km", "bev_eff_drive":"bev_eff_drive"},
    title="BEV — vde_net_mj_per_km vs eta_pt_est"
)

fig.show()

In [11]:

EXPORT_XLSX = Path("epa_grouped_output_to_DB.xlsx")
with pd.ExcelWriter(EXPORT_XLSX, engine="openpyxl") as writer:
    df_vde_to_insert.to_excel(writer, sheet_name="VDE", index=False)
    df_fc.to_excel(writer, sheet_name="FuelCons", index=False)
    if not df_skip.empty:
        df_skip.to_excel(writer, sheet_name="Skipped", index=False)

print(f"\nExportado para: {EXPORT_XLSX.resolve()}")
print(f"Resumo: VDE={len(df_vde)} | FuelCons={len(df_fc)} | Skipped={len(df_skip)}")

if not DRY_RUN:
    print(f"OK: {inserted} veículos inseridos (ordem VDE→FuelCons preservada).")
else:
    print("DRY_RUN=True → nada inserido; apenas exportado.")


Exportado para: C:\Users\CaioHenriqueFerreira\Downloads\From Git\projects\EcoDrive-Analyst\epa_grouped_output_to_DB.xlsx
Resumo: VDE=4999 | FuelCons=4999 | Skipped=1
DRY_RUN=True → nada inserido; apenas exportado.


In [40]:
# Commit único no final
# delete uma coluna (troque pelo nome que quiser)
df_fc.drop(columns=["eff_flag"], errors="ignore", inplace=True)

# copiar os valores de category para label_program (cria/atualiza a coluna)
df_fc["label_vehicle_category"] = df_vde_to_insert["category"]

DRY_RUN = False
assert len(df_vde_to_insert) == len(df_fc), \
    f"Tamanhos diferentes: VDE={len(df_vde_to_insert)} vs FC={len(df_fc)}"

if not DRY_RUN:
    for vde_row, fc_row in zip(df_vde_to_insert.to_dict("records"), df_fc.to_dict("records")):
        vde_id = insert_vde(vde_row)
        fc_row["vde_id"] = vde_id
        insert_fuelcons(fc_row)
else:
    print("DRY_RUN=True → nada inserido.")

UTILS FOR MANAGING DB

In [None]:
import sqlite3, pandas as pd
from pathlib import Path
#1) Abrir conexão (usa o mesmo caminho do seu db.py)
DB_PATH = Path("data/db/eco_drive.db") 
con = sqlite3.connect(DB_PATH)
con.execute("PRAGMA foreign_keys=ON;")

<sqlite3.Cursor at 0x25b9a518f40>

In [None]:
#2) Ver tabelas existentes
pd.read_sql("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", con)

Unnamed: 0,name
0,fuelcons_db
1,sqlite_sequence
2,vde_db


In [44]:
#3) Ver colunas de uma tabela (schema)
pd.read_sql("PRAGMA table_info(vde_db);", con)         # schema do vde_db
pd.read_sql("PRAGMA table_info(fuelcons_db);", con)    # schema do fuelcons_db

Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,id,INTEGER,0,,1
1,1,created_at,TEXT,0,CURRENT_TIMESTAMP,0
2,2,vde_id,INTEGER,1,,0
3,3,electrification,TEXT,1,,0
4,4,fuel_type,TEXT,0,,0
...,...,...,...,...,...,...
62,62,label_offcycle_energy_factor,REAL,0,,0
63,63,label_offcycle_fuel_factor,REAL,0,,0
64,64,label_fuel_l_per_100km,REAL,0,,0
65,65,label_gco2_per_km,REAL,0,,0


In [11]:
#4) Ver algumas linhas (preview)
#pd.read_sql("SELECT * FROM vde_db LIMIT 50;", con)
pd.read_sql("SELECT * FROM fuelcons_db ORDER BY id DESC LIMIT 50 ;", con)

Unnamed: 0,id,created_at,vde_id,electrification,fuel_type,eta_pt_est,bev_eff_drive,utility_factor_pct,engine_max_power_kw,engine_rpm_max_power,...,label_version_year,label_vehicle_category,label_cycle_set,label_class,label_offcycle_method,label_offcycle_energy_factor,label_offcycle_fuel_factor,label_fuel_l_per_100km,label_gco2_per_km,label_range_km
0,5015,2025-10-19 20:40:05,5033,ICE,,,,,,,...,,,,,,,,,,
1,5014,2025-10-19 20:34:55,5033,ICE,,,,,,,...,,,,,,,,,,
2,5013,2025-10-19 20:24:39,5031,HEV,,,,,,,...,,,,,,,,,,
3,5012,2025-10-19 20:16:54,5031,HEV,,,,,,,...,,,,,,,,,,
4,5011,2025-10-19 20:07:09,5031,HEV,,,,,,,...,,,,,,,,,,
5,5010,2025-10-19 08:10:09,5033,ICE,,,,,,,...,,,,,,,,,,
6,5009,2025-10-19 07:49:08,5033,ICE,,,,,,,...,,,,,,,,,,
7,5008,2025-10-19 07:33:20,5033,ICE,,,,,,,...,,,,,,,,,,
8,5007,2025-10-19 07:30:47,5033,ICE,,,,,,,...,,,,,,,,,,
9,5006,2025-10-19 07:26:35,5033,ICE,,,,,,,...,,,,,,,,,,


In [47]:
#5) quick counts
pd.read_sql("""
SELECT 'vde_db' AS table_name, COUNT(*) AS n FROM vde_db
UNION ALL
SELECT 'fuelcons_db', COUNT(*) FROM fuelcons_db;
""", con)

Unnamed: 0,table_name,n
0,vde_db,4999
1,fuelcons_db,4999


In [27]:
#6)useful counts
# últimos inseridos
pd.read_sql("SELECT * FROM vde_db ORDER BY id DESC LIMIT 20;", con)

# filtrar por marca/ano
pd.read_sql("""
SELECT id, make, model, year, legislation, vde_net_mj_per_km
FROM vde_db
WHERE make='Hyundai' AND year>=2020
ORDER BY year, model;
""", con)

# join simples VDE ↔ FC
pd.read_sql("""
SELECT v.id, v.make, v.model, v.year,
       v.vde_net_mj_per_km, f.energy_Wh_per_km, f.electrification
FROM vde_db v
JOIN fuelcons_db f ON f.vde_id = v.id
ORDER BY v.id DESC
LIMIT 50;
""", con)
# exportar tudo para CSV
df_all = pd.read_sql("SELECT * FROM vde_db;", con)
df_all.to_csv("vde_db_export.csv", index=False)

In [8]:

# 7) Close connection
con.close()

In [6]:
from src.vde_core.db import insert_vde, insert_fuelcons, delete_row
##CLEAN INSERTED COLUMNS
for id in range(5000, 5031):
    try:
        delete_row("fuelcons_db", id)
        delete_row("vde_db", id)
        print(f"Deleted id={id}")
    except Exception as e:
        print(f"Error deleting id={id}: {e}")
    

Deleted id=5000
Deleted id=5001
Deleted id=5002
Deleted id=5003
Deleted id=5004
Deleted id=5005
Deleted id=5006
Deleted id=5007
Deleted id=5008
Deleted id=5009
Deleted id=5010
Deleted id=5011
Deleted id=5012
Deleted id=5013
Deleted id=5014
Deleted id=5015
Deleted id=5016
Deleted id=5017
Deleted id=5018
Deleted id=5019
Deleted id=5020
Deleted id=5021
Deleted id=5022
Deleted id=5023
Deleted id=5024
Deleted id=5025
Deleted id=5026
Deleted id=5027
Deleted id=5028
Deleted id=5029
Deleted id=5030


In [50]:
# to Upper columns
# vde_db
con.execute("""
UPDATE vde_db
SET
  legislation = UPPER(legislation),
  category    = UPPER(category),
  make        = UPPER(make),
  model       = UPPER(model)
""")

# fuelcons_db (se existir; ajuste os nomes conforme seu schema)
#con.execute("""UPDATE fuelcons_db SET scenario_name = UPPER(scenario_name)""")

con.commit()
con.close()
print("Pronto.")

Pronto.


In [None]:

## Utilitário: calcular rrc_N_per_kN a partir de rr_alpha_N e mass_kg
# Pode ser usado para outras colunas colucadas posteriormente
df = pd.read_sql_query("""
    SELECT id, mass_kg, inertia_class, rr_alpha_N, rrc_N_per_kN
    FROM vde_db
    ORDER BY id
""", con)

# Massa: use mass_kg (apenas isso). Se quiser cair para inertia_class, você decide depois.
m = pd.to_numeric(df["mass_kg"], errors="coerce")
A_rr = pd.to_numeric(df["rr_alpha_N"], errors="coerce")

# rrc [N/kN] = A_rr / (m*g/1000)
rrc = A_rr / (m * G / 1000.0)

# Só aceita valores válidos
df["rrc_from_rr_alpha"] = rrc.where((~rrc.isna()) & (rrc > 0))

# Prévia
df_preview = df.loc[df["rrc_from_rr_alpha"].notna(), ["id","rr_alpha_N","mass_kg","rrc_from_rr_alpha"]].head(10)
display(df_preview)
print("Válidos para update:", df["rrc_from_rr_alpha"].notna().sum())

# Persistir apenas onde rrc_N_per_kN está nulo e cálculo existe
upd = df[(df["rrc_N_per_kN"].isna()) & (df["rrc_from_rr_alpha"].notna())][["id","rrc_from_rr_alpha"]]

with sqlite3.connect(DB_PATH) as con:
    con.executemany(
        "UPDATE vde_db SET rrc_N_per_kN=? WHERE id=?",
        [(float(v), int(i)) for i, v in zip(upd["id"], upd["rrc_from_rr_alpha"])]
    )
    con.commit()

print(f"Atualizadas {len(upd)} linhas.")

Unnamed: 0,id,rr_alpha_N,mass_kg,rrc_from_rr_alpha
0,1,202.026797,1962.0,10.5
2,3,187.719057,1962.0,9.756379
3,4,190.288237,1848.0,10.5
14,15,141.025622,1848.0,7.781716
15,16,160.004701,1962.0,8.315973
16,17,166.749471,2075.0,8.194561
18,19,174.054518,2189.0,8.108096
19,20,195.628393,2302.0,8.665745
26,27,161.000676,2189.0,7.5
27,28,161.000676,2189.0,7.5


Válidos para update: 2958
Atualizadas 2958 linhas.


In [9]:
import plotly.express as px
import pandas as pd

# 1) Carrega e remove colunas duplicadas do JOIN
df_plot = pd.read_sql(
    "SELECT * FROM vde_db JOIN fuelcons_db ON vde_db.id = fuelcons_db.vde_id;",
    con
)
df_plot = df_plot.loc[:, ~df_plot.columns.duplicated(keep="first")]

# 2) Só ICE (tolerante) + drop de NaN necessários
df_ice = df_plot[~df_plot["electrification"].astype(str).str.upper().isin(["BEV","PHEV"])]
df_ice = df_ice.dropna(subset=["vde_net_mj_per_km", "fuel_l_per_100km"])

# 3) Scatter simples
fig = px.scatter(
    df_ice,
    x="vde_net_mj_per_km",
    y="fuel_l_per_100km",
    labels={"vde_net_mj_per_km":"vde_net_mj_per_km", "fuel_l_per_100km":"fuel_l_per_100km"},
    title="ICE — vde_net_mj_per_km vs fuel_l_per_100km"
)
fig.show()


**BACKUP**

In [None]:
# DELETAR DATABASE (CUIDADO: apaga tudo!) // DO NOT RUN UNLESS INTENDED

from src.vde_core.db  import truncate_db, delete_db_file, ensure_db

DB_PATH = "data/db/eco_drive.db"  # confirme o MESMO caminho usado no db.py
#truncate_db(DB_PATH) #apaga todas as linhas de todas as tabelas
ensure_db()


#delete_db_file(DB_PATH) # Deleta o arquivo do banco de dados
#ensure_db()  # cria o arquivo e as tabelas novamente




✔️ DB truncado: data\db\eco_drive.db


In [None]:
from src.vde_core.utils import load_tire_catalog
tire_path = ABS_DIR.parent / 'data' / 'standards' / 'tiresize_fromcode_table.csv' 
TIRE_CSV =Path(tire_path)
tires_df = load_tire_catalog(TIRE_CSV)

display(tires_df.head(10))

KeyError: ['tire_size']

In [None]:
from src.vde_core.services import wltp_phases_from_phase
tire_path = ABS_DIR.parent / 'data' / 'standards' / 'WLTP_Class3ab.csv' 
CYCLES_CSV =Path(tire_path)
df_cycles = pd.read_csv(CYCLES_CSV)

px.scatter(df_cycles, x="t", y="v", color="phase_name", title="WLTP Class 3a/b Phases)

: 