In [0]:
import argparse
import json
import re
import runpy
import sys
from datetime import datetime, timezone
from pathlib import Path

from pyspark.sql import SparkSession, functions as F

# Catalog and schema that hold this runner's log tables; the runner also
# selects them as the session's current catalog/schema (USE CATALOG/USE SCHEMA).
CATALOG: str = "nyc_taxi"
SCHEMA: str = "data_area"

def spark():
    """Return the active SparkSession, creating one via the builder if needed."""
    builder = SparkSession.builder
    return builder.getOrCreate()

def read_cfg(p: str|Path) -> dict:
    p = Path(p)
    if not p.exists():
        raise FileNotFoundError(f"config.json não encontrado: {p}")
    return json.loads(p.read_text())

def list_parquets(prefix: str):
    """Return the ``dbutils.fs.ls(prefix)`` entries whose path ends in ``.parquet``.

    Only the entries returned by a single ``dbutils.fs.ls`` call are considered.
    Requires the Databricks-provided ``dbutils`` global.
    """
    listing = dbutils.fs.ls(prefix)  # type: ignore
    return list(filter(lambda info: info.path.endswith(".parquet"), listing))

def ensure_exec_logs():
    """Select the project catalog/schema and create the runner's log tables if missing.

    Creates ``pipeline_exec_log`` (appended to by ``log_exec``) and
    ``ingestion_log`` (appended to by ``mark_ingested``) as Delta tables;
    ``CREATE TABLE IF NOT EXISTS`` leaves existing tables untouched.
    """
    session = spark()
    # Also changes the session's current catalog/schema; presumably the stage
    # scripts run later in this process rely on that — confirm before removing.
    session.sql(f"USE CATALOG {CATALOG}")
    session.sql(f"USE SCHEMA {SCHEMA}")

    exec_log_ddl = f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.pipeline_exec_log
    (
      stage        STRING,
      started_at   TIMESTAMP,
      finished_at  TIMESTAMP,
      status       STRING,
      message      STRING
    )
    USING delta
    """
    ingestion_log_ddl = f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.ingestion_log
    (
      file_name     STRING,
      file_path     STRING,
      file_mod_time TIMESTAMP,
      file_size     BIGINT,
      status        STRING,
      processed_at  TIMESTAMP
    )
    USING delta
    """
    for ddl in (exec_log_ddl, ingestion_log_ddl):
        session.sql(ddl)

def log_exec(stage: str, status: str, message: str = ""):
    """Append one event row for *stage* to ``pipeline_exec_log``.

    ``started_at`` and ``finished_at`` are both set to the current instant, so
    each call records a point-in-time event; callers write separate START and
    SUCCESS/ERROR/SKIPPED rows.

    Args:
        stage: stage name, e.g. ``"bronze"``.
        status: status string written verbatim.
        message: optional free-text detail.
    """
    # Timezone-aware UTC: PySpark interprets a naive datetime as driver-local
    # time, so the old naive utcnow() value was shifted on non-UTC drivers
    # (and datetime.utcnow() is deprecated since Python 3.12).
    now = datetime.now(timezone.utc)
    row = (stage, now, now, status, message)
    spark().createDataFrame(
        [row],
        "stage string, started_at timestamp, finished_at timestamp, status string, message string",
    ).write.mode("append").saveAsTable(f"{CATALOG}.{SCHEMA}.pipeline_exec_log")

def detect_new_raw(cfg: dict) -> list[tuple]:
    """Return RAW parquet files that have not yet been ingested successfully.

    Args:
        cfg: parsed config; ``cfg["base_path"]`` and ``cfg["paths"]["raw"]`` are
            concatenated as plain strings to form the RAW prefix.

    Returns:
        One tuple per pending file:
        ``(file_name, file_path, modification_time, size, year, month)``.
        ``modification_time``/``size`` come straight from ``dbutils.fs.ls``
        (``mark_ingested`` treats the former as epoch milliseconds);
        ``year``/``month`` are parsed from a trailing ``YYYY-MM.parquet`` in the
        file name and are ``None`` when the name does not match.

    A file is skipped only if ``ingestion_log`` has a ``SUCCESS`` row for its
    name, so files whose earlier attempts were logged as ``ERROR`` are
    returned again and retried.
    """
    raw_prefix = f'{cfg["base_path"]}{cfg["paths"]["raw"]}'
    files = list_parquets(raw_prefix)
    if not files:
        return []

    s = spark()
    s.sql(f"USE CATALOG {CATALOG}")
    s.sql(f"USE SCHEMA {SCHEMA}")
    # ERROR rows must not count as "already ingested": otherwise a file whose
    # bronze run failed would never be picked up again. A single collect() is
    # enough; an empty table simply yields an empty set.
    done = {
        row["file_name"]
        for row in (
            s.table(f"{CATALOG}.{SCHEMA}.ingestion_log")
            .where(F.col("status") == "SUCCESS")
            .select("file_name")
            .distinct()
            .collect()
        )
    }

    # e.g. yellow_tripdata_2023-04.parquet -> (2023, 4)
    period_re = re.compile(r"(\d{4})-(\d{2})\.parquet$")
    pending = []
    for f in files:
        name = Path(f.path).name
        if name in done:
            continue
        m = period_re.search(name)
        year, month = (int(m.group(1)), int(m.group(2))) if m else (None, None)
        pending.append((name, f.path, f.modificationTime, f.size, year, month))
    return pending

def mark_ingested(rows: list[tuple], status: str, msg: str = ""):
    """Append one ``ingestion_log`` row per file in *rows* with the given *status*.

    Args:
        rows: tuples shaped like ``detect_new_raw``'s output; only the first
            four fields (name, path, modification time, size) are used.
        status: value written to the ``status`` column (``main`` uses
            ``"SUCCESS"`` / ``"ERROR"``).
        msg: accepted for compatibility but NOT persisted: ``ingestion_log``
            has no message column. In this file, ``main`` records the error text
            in ``pipeline_exec_log`` via ``log_exec``.
    """
    if not rows:
        return
    # Aware UTC datetimes map to the same instant on any driver timezone;
    # naive local values are driver-TZ dependent and ambiguous at DST fall-back.
    now = datetime.now(timezone.utc)
    records = [
        (
            name,
            path,
            # modification time is treated as epoch milliseconds
            datetime.fromtimestamp(mtime_ms / 1000.0, tz=timezone.utc),
            size,
            status,
            now,
        )
        for (name, path, mtime_ms, size, *_rest) in rows
    ]
    df = spark().createDataFrame(
        records,
        "file_name string, file_path string, file_mod_time timestamp, file_size long, status string, processed_at timestamp",
    )
    df.write.mode("append").saveAsTable(f"{CATALOG}.{SCHEMA}.ingestion_log")

def run_stage_py(py_path: str, injected_globals: dict|None = None):
    """Executa um .py arbitrário no mesmo processo, injetando variáveis globais se necessário."""
    injected_globals = injected_globals or {}
    # run_path executa o arquivo como se fosse __main__ mas aqui controlamos globals
    runpy.run_path(py_path, init_globals=injected_globals)

# Stage names accepted by --stages; selected stages always run in this order.
_STAGE_ORDER = ("bronze", "silver", "gold")
# Max characters of an exception message written to the log tables.
_MAX_LOG_MSG = 500


def _parse_args() -> tuple[argparse.Namespace, list[str]]:
    """Parse the CLI and return ``(args, stages)``; exits via argparse on unknown stage names."""
    parser = argparse.ArgumentParser(description="Automation runner (bronze -> silver -> gold)")
    parser.add_argument("--config", default="/Workspace/Users/joycelnog@gmail.com/pratica_ed/config.json")
    parser.add_argument("--stages", default="bronze,silver,gold",
                        help="Lista de estágios separados por vírgula (bronze,silver,gold)")
    parser.add_argument("--only-if-new", action="store_true",
                        help="Só roda Bronze se houver novos arquivos em RAW; se não, pula Bronze.")
    parser.add_argument("--overwrite", default="true", choices=["true", "false"],
                        help="Sinaliza aos scripts para sobrescrever (se suportado).")
    parser.add_argument("--scripts-root", default="/Workspace/Users/joycelnog@gmail.com/pratica_ed/scripts",
                        help="Diretório onde estão bronze.py, silver.py, gold.py")
    args = parser.parse_args()

    stages = [s.strip().lower() for s in args.stages.split(",") if s.strip()]
    # Without this check a typo such as "bronze,silvr" would silently skip the
    # stage while the job still exits successfully.
    unknown = [s for s in stages if s not in _STAGE_ORDER]
    if unknown:
        parser.error(
            f"estágio(s) desconhecido(s): {', '.join(unknown)} "
            f"(válidos: {', '.join(_STAGE_ORDER)})"
        )
    return args, stages


def _run_bronze(cfg: dict, bronze_py: str, injected: dict, only_if_new: bool) -> None:
    """Run the bronze stage, optionally gated on new RAW files, updating both log tables.

    With *only_if_new*, pending RAW files are detected first. If there are
    none, the stage is logged as SKIPPED. Otherwise their paths are added to
    *injected* (in place) as ``NEW_FILES`` and their outcome is appended to
    ``ingestion_log``.
    """
    try:
        new_files = detect_new_raw(cfg) if only_if_new else None
    except Exception as e:
        log_exec("bronze", "ERROR", f"detecção de novos arquivos falhou: {e}"[:_MAX_LOG_MSG])
        raise

    if new_files is not None and not new_files:
        log_exec("bronze", "SKIPPED", "sem novos arquivos em RAW")
        return

    if new_files is not None:
        injected["NEW_FILES"] = [path for (_, path, *_rest) in new_files]

    log_exec("bronze", "START")
    try:
        run_stage_py(bronze_py, injected)
    except Exception as e:
        msg = str(e)[:_MAX_LOG_MSG]
        # Record the failure for every file this run attempted.
        if new_files:
            mark_ingested(new_files, "ERROR", msg=msg)
        log_exec("bronze", "ERROR", msg)
        raise
    # Success bookkeeping stays outside the try: a failure while writing the
    # SUCCESS rows must not also write ERROR rows for files that were ingested.
    if new_files:
        mark_ingested(new_files, "SUCCESS")
    log_exec("bronze", "SUCCESS")


def _run_logged_stage(stage: str, py_path: str, injected: dict) -> None:
    """Run one stage script, bracketing it with START and SUCCESS/ERROR rows in pipeline_exec_log."""
    log_exec(stage, "START")
    try:
        run_stage_py(py_path, injected)
    except Exception as e:
        log_exec(stage, "ERROR", str(e)[:_MAX_LOG_MSG])
        raise
    log_exec(stage, "SUCCESS")


def main():
    """CLI entry point: run the selected bronze/silver/gold scripts in order.

    Reads the JSON config and makes sure the log tables exist. It then runs each
    selected stage's ``<scripts-root>/<stage>.py`` in-process, with
    ``CONFIG_FILE`` and ``OVERWRITE`` injected as globals. The first failing
    stage re-raises its exception, so later stages do not run.
    """
    args, stages = _parse_args()

    cfg = read_cfg(args.config)
    ensure_exec_logs()

    scripts_root = Path(args.scripts_root)
    # Globals the stage scripts may read; the same dict is passed to every stage.
    injected = {
        "CONFIG_FILE": args.config,
        "OVERWRITE": args.overwrite == "true",
    }

    for stage in _STAGE_ORDER:
        if stage not in stages:
            continue
        script = str(scripts_root / f"{stage}.py")
        if stage == "bronze":
            _run_bronze(cfg, script, injected, args.only_if_new)
        else:
            _run_logged_stage(stage, script, injected)


if __name__ == "__main__":
    main()
