In [0]:
# Databricks: 01_ingest_bronze.py
# Purpose: Ingest raw CSVs -> Delta "bronze_*" tables.
# - SOURCE: "gcs" (recommended) or "dbfs"
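# - For GCS: store your service-account key JSON in Databricks Secrets (scope/key); the notebook writes it to /local_disk0/gcp.json on the driver and sets the Spark/Hadoop GCS confs.
# - Works with or without Unity Catalog. If CATALOG is blank, no catalog is selected (the session's current default is used, e.g. hive_metastore on non-UC workspaces) and tables are qualified as <SCHEMA>.<TABLE>.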

# -----------------------------
# Widgets / Parameters
# -----------------------------
import re

dbutils.widgets.text("SOURCE", "gcs")                   # "gcs" or "dbfs"
dbutils.widgets.text("GCP_SECRET_SCOPE", "gcp-secrets") # only used if SOURCE="gcs"
dbutils.widgets.text("GCP_SECRET_KEY", "gcs-key-json")  # only used if SOURCE="gcs"
dbutils.widgets.text("GCP_PROJECT_ID", "")              # optional; helpful for logs
dbutils.widgets.text("BUCKET", "de-portfolio-20250909-db-471908305")    # only used if SOURCE="gcs"
dbutils.widgets.text("DBFS_RAW_DIR", "dbfs:/FileStore/tables/raw")  # only if SOURCE="dbfs"

dbutils.widgets.text("CATALOG", "")          # e.g., "main" if UC is enabled; leave blank for non-UC
dbutils.widgets.text("SCHEMA", "demo_finance")

# -----------------------------
# Read widgets
# -----------------------------
SOURCE           = dbutils.widgets.get("SOURCE").strip().lower()
SCOPE            = dbutils.widgets.get("GCP_SECRET_SCOPE").strip()
SECRET_KEY       = dbutils.widgets.get("GCP_SECRET_KEY").strip()
PROJECT_ID       = dbutils.widgets.get("GCP_PROJECT_ID").strip()
BUCKET           = dbutils.widgets.get("BUCKET").strip()
DBFS_RAW_DIR     = dbutils.widgets.get("DBFS_RAW_DIR").strip().rstrip("/")

CATALOG          = dbutils.widgets.get("CATALOG").strip()
SCHEMA           = dbutils.widgets.get("SCHEMA").strip()

if SOURCE not in {"gcs", "dbfs"}:
    raise ValueError(f"SOURCE must be 'gcs' or 'dbfs', got: {SOURCE}")

if not SCHEMA:
    raise ValueError("SCHEMA must not be empty")

# CATALOG and SCHEMA are interpolated unquoted into spark.sql statements and
# table names, so only plain identifiers (ASCII letters, digits, underscores)
# are accepted. Anything else would break the SQL or change the statement.
_IDENTIFIER_RE = re.compile(r"[A-Za-z0-9_]+")
for _label, _value in (("CATALOG", CATALOG), ("SCHEMA", SCHEMA)):
    if _value and not _IDENTIFIER_RE.fullmatch(_value):
        raise ValueError(
            f"{_label} must contain only letters, digits and underscores, got: {_value!r}"
        )

# -----------------------------
# Helper: qualify table name
# -----------------------------
def fqtn(table: str) -> str:
    """Return *table* qualified with the configured namespace.

    Yields ``<CATALOG>.<SCHEMA>.<table>`` when the CATALOG widget is non-empty,
    and ``<SCHEMA>.<table>`` otherwise.
    """
    namespace = f"{CATALOG}.{SCHEMA}" if CATALOG else SCHEMA
    return f"{namespace}.{table}"

# -----------------------------
# Catalog / schema setup
# -----------------------------
# Build the setup statements in execution order, then run them one by one:
# optionally switch to the UC catalog, then create the schema if missing and
# make it current. A failing statement aborts before later ones run.
_setup_sql = []
if CATALOG:
    # If your workspace is not on UC, this will fail; in that case leave CATALOG blank.
    _setup_sql.append(f"USE CATALOG {CATALOG}")
_setup_sql.append(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}")
_setup_sql.append(f"USE SCHEMA {SCHEMA}")

for _stmt in _setup_sql:
    spark.sql(_stmt)

# -----------------------------
# Configure source paths
# -----------------------------
# Sets subs_path / pays_path / cbs_path for the chosen SOURCE. For GCS it also
# configures service-account auth for the Hadoop GCS connector.
import json
import os

if SOURCE == "gcs":
    if not BUCKET or BUCKET.startswith("<your-bucket"):
        raise ValueError("Set the BUCKET widget to your GCS bucket name (e.g., de-portfolio-2025-xxxx).")

    # Pull the service-account key JSON from Secrets and validate it up front.
    keyfile = "/local_disk0/gcp.json"
    creds_json = dbutils.secrets.get(scope=SCOPE, key=SECRET_KEY)
    try:
        creds = json.loads(creds_json)
    except json.JSONDecodeError as err:
        # `from None`: the decoder exception object carries the secret text.
        raise ValueError(
            f"Secret {SCOPE}/{SECRET_KEY} is not valid JSON (line {err.lineno}, "
            f"column {err.colno}); expected a GCP service-account key file."
        ) from None
    if not isinstance(creds, dict):
        raise ValueError(
            f"Secret {SCOPE}/{SECRET_KEY} must be a JSON object (GCP service-account key file)."
        )

    # Write the keyfile on the driver's local disk, owner-only because it holds
    # a private key. os.open's mode applies only when the file is created, so
    # fchmod also tightens a file left by an earlier run before writing.
    fd = os.open(keyfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        os.fchmod(f.fileno(), 0o600)
        f.write(creds_json)

    # Live Hadoop conf (for immediate use)
    hconf = sc._jsc.hadoopConfiguration()

    def _set_gcs_conf(key: str, value: str) -> None:
        """Set a connector property as spark.hadoop.<key> in the session conf and as <key> in the live Hadoop conf."""
        spark.conf.set(f"spark.hadoop.{key}", value)
        hconf.set(key, value)

    # Force Hadoop GCS connector to use this keyfile.
    # Some clusters use the 'fs.gs.*' namespace as well—set both.
    _set_gcs_conf("google.cloud.auth.service.account.enable", "true")
    _set_gcs_conf("google.cloud.auth.service.account.json.keyfile", keyfile)
    _set_gcs_conf("fs.gs.auth.service.account.enable", "true")
    _set_gcs_conf("fs.gs.auth.service.account.json.keyfile", keyfile)
    if PROJECT_ID:
        _set_gcs_conf("fs.gs.project.id", PROJECT_ID)

    # The keyfile exists only on the driver's disk, but executors also read the
    # CSVs. Also pass the key material inline (the properties Databricks
    # documents for GCS access), so auth does not rely on that local file.
    # These go on the live Hadoop conf only, keeping the private key out of the
    # Spark session conf.
    sa_props = {
        "fs.gs.auth.service.account.email": creds.get("client_email"),
        "fs.gs.auth.service.account.private.key.id": creds.get("private_key_id"),
        "fs.gs.auth.service.account.private.key": creds.get("private_key"),
    }
    if all(sa_props.values()):
        for prop, value in sa_props.items():
            hconf.set(prop, str(value))
    else:
        print("Warning: secret lacks client_email/private_key_id/private_key; "
              "GCS auth relies on the driver-local keyfile only.")

    # GOOGLE_APPLICATION_CREDENTIALS is deliberately not set through
    # spark.driverEnv / spark.executorEnv. Those processes are already running,
    # so a runtime conf change cannot alter their environment.

    subs_path = f"gs://{BUCKET}/raw/subscriptions/*.csv"
    pays_path = f"gs://{BUCKET}/raw/payments/*.csv"
    cbs_path  = f"gs://{BUCKET}/raw/chargebacks/*.csv"

    # Optional sanity check listing
    try:
        print("Listing gs:// paths...")
        print(dbutils.fs.ls(f"gs://{BUCKET}/raw/"))
    except Exception as e:
        print("Warning: could not list GCS raw/ folder (will still attempt reads).", str(e))

else:  # DBFS
    subs_path = f"{DBFS_RAW_DIR}/subscriptions.csv"
    pays_path = f"{DBFS_RAW_DIR}/payments.csv"
    cbs_path  = f"{DBFS_RAW_DIR}/chargebacks.csv"

    # Optional sanity check listing (best-effort, like the GCS branch; a real
    # path problem still surfaces when the CSVs are read).
    try:
        print("Listing DBFS raw dir:", DBFS_RAW_DIR)
        print(dbutils.fs.ls(DBFS_RAW_DIR))
    except Exception as e:
        print("Warning: could not list DBFS raw dir (will still attempt reads).", str(e))

# -----------------------------
# Read CSVs
# -----------------------------
def read_csv(path: str):
    """Load the CSV file(s) at *path* (a single file or a glob) into a DataFrame.

    The first row is used as the header, and Spark infers column types
    (inferSchema).
    """
    return spark.read.options(header=True, inferSchema=True).csv(path)

try:
    subs_df = read_csv(subs_path)
    pays_df = read_csv(pays_path)
    cbs_df  = read_csv(cbs_path)
except Exception as e:
    # Re-raise with every attempted path in the message. Chain explicitly so the
    # original error (e.g. auth failure vs. missing path) is reported as the
    # direct cause.
    msg = (
        "Failed to read CSVs. Check SOURCE paths and credentials.\n"
        f"subs_path={subs_path}\npays_path={pays_path}\ncbs_path={cbs_path}\nError={e}"
    )
    raise RuntimeError(msg) from e

# -----------------------------
# Write Delta "bronze_*" tables
# -----------------------------
# Unqualified target table name -> DataFrame to persist.
tables = {
    "bronze_subscriptions": subs_df,
    "bronze_payments":      pays_df,
    "bronze_chargebacks":   cbs_df,
}

for name, df in tables.items():
    target = fqtn(name)
    # Full replace on every run. Column types come from inferSchema and can
    # change when the raw files change. Delta rejects an overwrite whose schema
    # differs from the existing table unless overwriteSchema is enabled.
    (df.write
       .mode("overwrite")
       .format("delta")
       .option("overwriteSchema", "true")
       .saveAsTable(target))
    print(f"Wrote table: {target} (mode=overwrite)")

# -----------------------------
# Row counts / verification
# -----------------------------
# Re-read each table through the catalog, not the in-memory DataFrame, so the
# printed count reflects what was persisted.
for target in map(fqtn, tables):
    row_count = spark.table(target).count()
    print(f"{target} -> {row_count} rows")

print("✅ Bronze ingest complete.")
