In [0]:
# 03_Run - ETL runner (delimiter detection + headers + parquet/delta + incremental + audit columns)
# Paste this entire content into a Databricks Python notebook named "03_Run"

import sys, importlib, re, traceback, json, uuid
from datetime import datetime
from pyspark.sql.functions import col, trim, lit, current_timestamp, to_timestamp
from pyspark.sql import SparkSession

# Put the DBFS FileStore folder at the front of the import path, then import
# and reload the helper module so a freshly uploaded copy is picked up.
_UTILS_DIR = "/dbfs/FileStore"
if _UTILS_DIR not in sys.path:
    sys.path.insert(0, _UTILS_DIR)

try:
    import utils_etl
    importlib.reload(utils_etl)
except Exception:
    # tell the operator where the module is expected, then fail the run
    print("Failed to import utils_etl - ensure /FileStore/utils_etl.py exists")
    raise
else:
    print("Imported utils_etl OK.")

# ==============
# Widgets (ADF should pass)
# ==============
# (widget name, default value) pairs, registered in this order.
_WIDGET_DEFAULTS = (
    ("domain", "Finance"),
    ("file_name", ""),             # e.g. Sales.Currency or Sales.Currency.csv
    ("column_list", ""),           # JSON array OR CSV
    ("year_column", ""),           # e.g. ModifiedDate (optional)
    ("table_name", ""),            # folder name override (optional)
    ("direct_account_key", ""),    # raw account key (recommended)
    ("BASE_RAW_PATH", ""),         # optional override
    ("BASE_BRONZE_PATH", ""),      # optional override
    ("include_layer", "false"),    # if you want /<layer> in path
    # incremental / target options
    ("incremental", "true"),       # true|false
    ("target_format", "parquet"),  # parquet|delta
    ("merge", "false"),            # true|false (only for delta)
    ("pk_columns", ""),            # comma-separated, required for merge
)
for _widget_name, _widget_default in _WIDGET_DEFAULTS:
    dbutils.widgets.text(_widget_name, _widget_default)

# read widget values
# Tokens (compared after lower-casing) that switch a boolean widget on.
_TRUE_TOKENS = ("true", "1", "yes", "y")


def _widget_text(name):
    """Return the current value of widget `name` with surrounding whitespace removed."""
    return dbutils.widgets.get(name).strip()


def _widget_flag(name):
    """Return True when widget `name` holds one of the accepted truthy tokens."""
    return _widget_text(name).lower() in _TRUE_TOKENS


DOMAIN = _widget_text("domain")
FILE_NAME_IN = _widget_text("file_name")
COLUMN_LIST_WIDGET = _widget_text("column_list")
YEAR_COLUMN_WIDGET = _widget_text("year_column")
TABLE_NAME_WIDGET = _widget_text("table_name")
DIRECT_KEY = _widget_text("direct_account_key")
BASE_RAW_WIDGET = _widget_text("BASE_RAW_PATH")
BASE_BRONZE_WIDGET = _widget_text("BASE_BRONZE_PATH")
INCLUDE_LAYER = _widget_flag("include_layer")

INCREMENTAL = _widget_flag("incremental")
TARGET_FORMAT = _widget_text("target_format").lower()  # parquet or delta
MERGE = _widget_flag("merge")
PK_COLUMNS_WIDGET = _widget_text("pk_columns")

# storage config defaults
# Hard-coded for this notebook. STORAGE_ACCOUNT is used to build the Spark config
# key that holds the account key and the default abfss:// base paths;
# RAW_CONTAINER / BRONZE_CONTAINER name the containers used for the default raw
# and bronze base paths when no override is supplied.
STORAGE_ACCOUNT = "scrgvkrmade"
RAW_CONTAINER = "project"
BRONZE_CONTAINER = "bronze"

# ---------- helpers ----------
def detect_delimiter(file_path, sample_size=8192):
    """Guess the field separator of a text file from its leading bytes.

    Reads up to ``sample_size`` bytes with ``dbutils.fs.head`` and returns whichever
    of comma, pipe, semicolon or tab occurs most often in that sample. Comma is
    returned when the sample cannot be read or contains none of the candidates;
    on a tie the candidate listed first wins.
    """
    candidates = (",", "|", ";", "\t")
    try:
        sample = dbutils.fs.head(file_path, sample_size)
    except Exception:
        # unreadable sample: fall back to comma
        return ","
    winner, winner_hits = ",", 0
    for candidate in candidates:
        hits = sample.count(candidate)
        # strict comparison keeps the earliest candidate on ties
        if hits > winner_hits:
            winner, winner_hits = candidate, hits
    return winner

def set_storage_key(k):
    """Register the storage account key on the Spark session and check access.

    A falsy ``k`` is a no-op. One pair of matching surrounding quotes (double or
    single) is removed, the result is stripped, and it must consist of 20-300
    base64 characters; otherwise ``Exception`` is raised. After setting the Spark
    config entry for ``STORAGE_ACCOUNT`` the root of the raw container is listed;
    a failure of that listing is reported and re-raised.
    """
    if not k:
        return
    for quote in ('"', "'"):
        if k.startswith(quote) and k.endswith(quote):
            k = k[1:-1]
            break
    k = k.strip()
    if re.fullmatch(r"[A-Za-z0-9+/=]{20,300}", k) is None:
        raise Exception("direct_account_key looks invalid; pass raw account key (base64), not connection string")
    config_name = f"fs.azure.account.key.{STORAGE_ACCOUNT}.dfs.core.windows.net"
    spark.conf.set(config_name, k)
    print("Set storage key for", STORAGE_ACCOUNT)
    # quick test: list the raw container root with the key just configured
    try:
        listing = dbutils.fs.ls(f"abfss://{RAW_CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/")
        display(listing)
    except Exception:
        print("Warning: test listing raw container failed (invalid key or container).")
        raise

# resolve base paths task or widget
def task_or_widget(key, widget_name):
    """Return a job task value when one can be read, else the stripped widget value.

    ``key`` is passed to ``dbutils.jobs.taskValues.get`` as both the task key and
    the value key. If the returned object exposes a ``value`` attribute, that
    attribute is returned instead of the object. Any exception raised while
    reading the task value falls back to the widget named ``widget_name``.
    """
    try:
        task_value = dbutils.jobs.taskValues.get(taskKey=key, key=key)
        if not hasattr(task_value, "value"):
            return task_value
        return task_value.value
    except Exception:
        widget_value = dbutils.widgets.get(widget_name)
        return widget_value.strip()

# ------------------ set storage key early ------------------
if not DIRECT_KEY:
    print("No direct_account_key provided; cluster auth must provide access to ADLS.")
else:
    set_storage_key(DIRECT_KEY)

# base path resolution
BASE_RAW_TASK = task_or_widget("BASE_RAW_PATH", "BASE_RAW_PATH")
BASE_BRONZE_TASK = task_or_widget("BASE_BRONZE_PATH", "BASE_BRONZE_PATH")


def _default_container_root(container):
    """Return the abfss:// root of `container` in STORAGE_ACCOUNT."""
    return f"abfss://{container}@{STORAGE_ACCOUNT}.dfs.core.windows.net"


# Precedence for each base path: task value, then widget override, then the
# built-in container root (only offered when an account key was supplied).
BASE_RAW_PATH = (
    BASE_RAW_TASK
    or BASE_RAW_WIDGET
    or (_default_container_root(RAW_CONTAINER) if DIRECT_KEY else None)
)
if not BASE_RAW_PATH:
    raise Exception("BASE_RAW_PATH not resolved. Provide direct_account_key or BASE_RAW_PATH widget or run 01_Config.")

BASE_BRONZE_PATH = (
    BASE_BRONZE_TASK
    or BASE_BRONZE_WIDGET
    or (_default_container_root(BRONZE_CONTAINER) if DIRECT_KEY else None)
)
if not BASE_BRONZE_PATH:
    raise Exception("BASE_BRONZE_PATH not resolved. Provide direct_account_key or BASE_BRONZE_PATH widget or run 01_Config.")

print("BASE_RAW_PATH:", BASE_RAW_PATH)
print("BASE_BRONZE_PATH:", BASE_BRONZE_PATH)
print("INCREMENTAL:", INCREMENTAL, "TARGET_FORMAT:", TARGET_FORMAT, "MERGE:", MERGE)

# ---------- normalize file and metadata ----------
if not FILE_NAME_IN:
    raise Exception("file_name widget required (e.g. Sales.Currency.csv)")
file_key = FILE_NAME_IN
# drop a trailing ".csv" (any letter case) to get the bare table key
if file_key.lower().endswith(".csv"):
    file_key_no_ext = file_key[:-len(".csv")]
else:
    file_key_no_ext = file_key
# an explicit table_name widget value overrides the name derived from the file
folder_table_name = TABLE_NAME_WIDGET or file_key_no_ext

# parse column_list
def parse_column_list(text):
    """Parse a column list given as a JSON array or a comma-separated string.

    Accepted inputs:
      * a JSON array, e.g. ``["a", "b"]``;
      * a JSON array whose quotes were doubled in transit, e.g. ``[""a"", ""b""]``;
      * a bracketed or bare comma-separated list, optionally quoted, e.g.
        ``[a, b]`` or ``a, "b", 'c'``.

    Returns the column names as a list of stripped, non-empty strings. Empty or
    whitespace-only input (or ``None``) yields ``[]``. JSON ``null`` items and
    names that are empty after trimming are skipped.
    """
    raw = (text or "").strip()
    if not raw:
        return []

    # Doubled quotes are only collapsed as a second attempt: collapsing first
    # would corrupt valid JSON that contains empty strings ("").
    collapsed = raw.replace('""', '"')

    if raw.startswith("[") and raw.endswith("]"):
        for candidate in (raw, collapsed):
            try:
                parsed = json.loads(candidate)
            except ValueError:
                continue
            if isinstance(parsed, list):
                # Skip only None (not every falsy value) so a column given as
                # the number 0 is not silently lost.
                names = (str(item).strip() for item in parsed if item is not None)
                return [name for name in names if name]
        # not parseable as JSON: treat the bracket content as a plain CSV list
        collapsed = collapsed.strip("[]")

    # Filter AFTER removing quotes so tokens such as '' or "" do not become
    # empty column names; the final strip drops whitespace that sat inside quotes.
    tokens = (part.strip().strip('"').strip("'").strip() for part in collapsed.split(","))
    return [token for token in tokens if token]

if not COLUMN_LIST_WIDGET:
    raise Exception("column_list missing. Pass column_list from ADF Lookup (JSON array or CSV).")
columns = parse_column_list(COLUMN_LIST_WIDGET)

if not columns:
    raise Exception("Parsed columns list is empty. Provide valid column_list.")

# determine year/timestamp column to use for incremental:
# the widget value wins; otherwise the first column named ModifiedDate (any case)
year_hint = YEAR_COLUMN_WIDGET or None
if year_hint is None:
    year_hint = next((name for name in columns if name.lower() == "modifieddate"), None)
    if year_hint is not None:
        print("Defaulting year_column to ModifiedDate")

if INCREMENTAL and not year_hint:
    print("Warning: incremental=true but no year_column provided. Incremental load will compare _ingest_ts only (may load all rows).")

# ---------- read raw csv ----------
def _read_raw_csv(path, sep):
    """Read `path` as a headerless CSV with every column left as a string."""
    return (spark.read
                .option("header", "false")
                .option("sep", sep)
                .option("quote", '"')
                .option("escape", "\\")
                .option("multiLine", "true")
                .option("inferSchema", "false")
                .csv(path))


raw_read_name = FILE_NAME_IN
raw_path = BASE_RAW_PATH.rstrip("/") + "/" + raw_read_name.lstrip("/")
if not raw_read_name.lower().endswith(".csv"):
    # file_name may arrive without its extension (e.g. "Sales.Currency").
    # Use the name exactly as given when that path exists; otherwise assume the
    # ".csv" suffix was omitted and append it.
    try:
        dbutils.fs.ls(raw_path)
    except Exception:
        raw_read_name = raw_read_name + ".csv"
        raw_path = raw_path + ".csv"
print("Raw file path:", raw_path)

detected_sep = detect_delimiter(raw_path, sample_size=8192)
print("Detected delimiter:", repr(detected_sep))

try:
    df_raw = _read_raw_csv(raw_path, detected_sep)
    # count() forces the read, so read errors surface inside this try
    print("Raw row count:", df_raw.count())
    display(df_raw.limit(5))
except Exception:
    print("Fallback to comma separator")
    df_raw = _read_raw_csv(raw_path, ",")
    print("Fallback row count:", df_raw.count())
    display(df_raw.limit(5))

# apply headers
df_named = utils_etl.add_headers(df_raw, columns)
# Trim every column in a single projection; adding them one withColumn at a
# time grows the query plan with each column.
df_named = df_named.select([trim(col(c)).alias(c) for c in df_named.columns])

print("After applying headers sample:")
display(df_named.limit(5))

# ---------- prepare audit columns & normalized timestamp column ----------
from pyspark.sql.functions import year as spark_year

INGEST_RUNID = str(uuid.uuid4())
INGEST_TS = datetime.utcnow().isoformat()  # naive UTC time as an ISO-8601 string
df_named = (df_named
            .withColumn("_ingest_runid", lit(INGEST_RUNID))
            .withColumn("_ingest_ts", lit(INGEST_TS))
            .withColumn("_source_file", lit(raw_read_name))
            .withColumn("_source_path", lit(raw_path)))

# normalize year/timestamp column: create _src_ts (timestamp) and _year (int).
# _src_ts is parsed from the year column when one is configured and present;
# otherwise it is the ingest timestamp.
# NOTE(review): values that to_timestamp cannot parse presumably become NULL and
# would then be excluded by the incremental "_src_ts > max" filter - confirm.
if year_hint and year_hint in df_named.columns:
    _src_ts_expr = to_timestamp(col(year_hint))
else:
    _src_ts_expr = to_timestamp(lit(INGEST_TS))
df_named = df_named.withColumn("_src_ts", _src_ts_expr)

# _year is the integer calendar year of _src_ts
df_named = df_named.withColumn("_year", spark_year(col("_src_ts")).cast("int"))

print("Sample with audit and _year/_src_ts:")
display(df_named.limit(5))

# ---------- incremental filtering ----------
def get_existing_max_src_ts(bronze_base, table_name):
    """Return the largest source timestamp already stored for a table, or None.

    The target at ``<bronze_base>/<table_name>`` is read as Delta when
    ``TARGET_FORMAT`` is "delta" and that read succeeds, otherwise as parquet.
    The parquet read uses ``recursiveFileLookup`` so files stored in plain
    sub-folders such as ``<table>/<year>/`` are included; without it only files
    directly under the table folder are read and the stored data is never found.
    (Assumes the by-year writer uses that ``<table>/<year>/`` layout, as the
    output listing at the end of this notebook expects - confirm against
    utils_etl.)

    Returns ``max(_src_ts)`` when that column exists, otherwise the max of the
    configured year column parsed as a timestamp, otherwise None.

    NOTE(review): any read error (not only "target does not exist") is reported
    and treated as "no existing data", which makes the caller do a full load.
    """
    import pyspark.sql.functions as F
    try:
        table_path = f"{bronze_base.rstrip('/')}/{table_name}"
        df_exist = None
        if TARGET_FORMAT == "delta":
            try:
                df_exist = spark.read.format("delta").load(table_path)
            except Exception:
                # no Delta table at this path: fall back to a parquet read
                df_exist = None
        if df_exist is None:
            df_exist = (spark.read
                        .option("recursiveFileLookup", "true")
                        .parquet(table_path))
        if "_src_ts" in df_exist.columns:
            return df_exist.select(F.max(col("_src_ts")).alias("mx")).collect()[0]["mx"]
        # older data without _src_ts: derive the max from the year column
        if year_hint and year_hint in df_exist.columns:
            return df_exist.select(F.max(to_timestamp(col(year_hint))).alias("mx")).collect()[0]["mx"]
        return None
    except Exception as e:
        # no existing data or error reading -> treat as no existing
        print("No existing target data found or error reading existing:", e)
        return None

# Bronze root for this domain; "/Bronze" is appended when include_layer is set.
bronze_base = BASE_BRONZE_PATH.rstrip("/") + f"/{DOMAIN}" if not INCLUDE_LAYER else BASE_BRONZE_PATH.rstrip("/") + f"/{DOMAIN}/Bronze"
print("Bronze base:", bronze_base)
target_table_path = f"{bronze_base.rstrip('/')}/{folder_table_name}"

existing_max = None
if INCREMENTAL:
    existing_max = get_existing_max_src_ts(bronze_base, folder_table_name)
    print("Existing max _src_ts:", existing_max)

# filter incoming rows
# The row count is computed once and reused for the emptiness check below; each
# count() re-runs the whole read/transform pipeline.
if INCREMENTAL and existing_max is not None:
    # only new rows where _src_ts > existing_max
    df_to_write = df_named.filter(col("_src_ts") > lit(existing_max))
    rows_to_write = df_to_write.count()
    print("Rows after incremental filter:", rows_to_write)
else:
    df_to_write = df_named
    rows_to_write = df_to_write.count()
    print("No incremental filter applied (full load). Rows:", rows_to_write)

if rows_to_write == 0:
    print("No new rows to write. Exiting.")
    dbutils.notebook.exit("No new rows")
    
# ---------- write: parquet or delta (with optional merge) ----------
# Three cases:
#   delta + merge  -> upsert into the Delta table on the pk columns
#   delta, no merge -> plain Delta append/overwrite
#   anything else  -> parquet, one folder per year (utils_etl writer)
if TARGET_FORMAT == "delta" and MERGE:
    # require PK columns
    if not PK_COLUMNS_WIDGET:
        raise Exception("merge=true but pk_columns not provided. Pass comma-separated PK column names in pk_columns widget.")
    pk_cols = [c.strip() for c in PK_COLUMNS_WIDGET.split(",") if c.strip()]
    if not pk_cols:
        # e.g. pk_columns="," would otherwise build an empty merge condition
        raise Exception("merge=true but pk_columns contains no column names. Pass comma-separated PK column names in pk_columns widget.")
    from delta.tables import DeltaTable
    import pyspark.sql.functions as F
    try:
        # Look the table up once; a failure is taken to mean "no Delta table yet".
        try:
            delta_table = DeltaTable.forPath(spark, target_table_path)
        except Exception:
            delta_table = None

        if delta_table is None:
            # first load: create the Delta table from the incoming rows
            df_to_write.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(target_table_path)
            print(f"Delta table created at {target_table_path}")
        else:
            # upsert: update rows matching on every pk column, insert the rest
            cond = " AND ".join(f"t.{c} = s.{c}" for c in pk_cols)
            (delta_table.alias("t")
                .merge(df_to_write.alias("s"), cond)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())
            print("Delta MERGE executed.")
    except Exception:
        traceback.print_exc()
        raise
elif TARGET_FORMAT == "delta":
    # target_format=delta with merge=false must still produce a Delta table;
    # without this branch the request fell through to the parquet writer.
    # NOTE(review): a target folder that already holds by-year parquet output
    # from earlier runs is not converted - confirm before switching formats.
    delta_write_mode = "append" if INCREMENTAL else "overwrite"
    print("Writing delta to:", target_table_path)
    delta_writer = df_to_write.write.format("delta").mode(delta_write_mode)
    if delta_write_mode == "overwrite":
        delta_writer = delta_writer.option("overwriteSchema", "true")
    delta_writer.save(target_table_path)
else:
    if MERGE:
        print("Warning: merge=true is ignored because target_format is not delta.")
    # write parquet files (one folder per year)
    print("Writing parquet to:", target_table_path)
    # reuse utils_etl writer which writes by year into path/<table>/<year>
    utils_etl.write_parquet_by_year(df_to_write, bronze_base, folder_table_name, compression="snappy", coalesce_out=True, write_mode="append" if INCREMENTAL else "overwrite")

# ---------- confirm output ----------
# Collect the distinct _year values of the written rows and list each
# per-year output folder; listing failures are reported, not raised.
_year_rows = df_to_write.select("_year").distinct().collect()
years = [row["_year"] for row in _year_rows]
print("Outputs written for years:", years)
_table_root = f"{bronze_base.rstrip('/')}/{folder_table_name}"
for y in years:
    out_path = f"{_table_root}/{y}"
    print("Listing:", out_path)
    try:
        _entries = dbutils.fs.ls(out_path)
        for _entry in _entries:
            print(" -", _entry.path)
    except Exception as e:
        print("Could not list:", out_path, e)

print("03_Run finished. IngestRunId:", INGEST_RUNID)
