In [0]:
# Notebook parameters: (widget name, default value, UI label), created in this order.
_WIDGET_SPECS = (
    ("hours_back", "24", "Hours Back"),
    ("TARGET_TAG_KEY", "etl", "Target Tag Key"),
    ("TARGET_TAG_VALUES", "ADS_RDS", "Target Tag Values (comma-separated)"),
    ("CATALOG", "gap_catalog", "Target Catalog"),
    ("SCHEMA", "ads_owner", "Target Schema"),
)
for _widget_name, _widget_default, _widget_label in _WIDGET_SPECS:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Read widget values
hours_back        = int(dbutils.widgets.get("hours_back"))
TARGET_TAG_KEY    = dbutils.widgets.get("TARGET_TAG_KEY")
TARGET_TAG_VALUES = [v.strip() for v in dbutils.widgets.get("TARGET_TAG_VALUES").split(",") if v.strip()]
# Stripped like the stats cell does, so stray whitespace cannot leak into SQL identifiers.
CATALOG           = dbutils.widgets.get("CATALOG").strip()
SCHEMA            = dbutils.widgets.get("SCHEMA").strip()

# Fail fast on inputs that would otherwise silently select nothing
# (empty tag key/values, negative window) or build a malformed table name.
if hours_back < 0:
    raise ValueError(f"hours_back must be >= 0, got {hours_back}")
if not TARGET_TAG_KEY:
    raise ValueError("TARGET_TAG_KEY must not be empty")
if not TARGET_TAG_VALUES:
    raise ValueError("TARGET_TAG_VALUES must contain at least one non-blank value")
if not CATALOG or not SCHEMA:
    raise ValueError("CATALOG and SCHEMA must both be non-empty")

# Output tables (both live in CATALOG.SCHEMA)
RUN_TABLE  = f"{CATALOG}.{SCHEMA}.job_run_costs"
TASK_TABLE = f"{CATALOG}.{SCHEMA}.job_run_task_costs"
TARGET_CURRENCY = "USD"

# =========================================
# Helpers
# =========================================
def table_exists(catalog: str, schema: str, name: str) -> bool:
    """Return True if `name` is listed by ``SHOW TABLES IN catalog.schema``.

    Args:
        catalog: Catalog part of the namespace.
        schema: Schema part of the namespace.
        name: Table name compared against the ``tableName`` column of the listing.
    """
    listing = spark.sql(f"SHOW TABLES IN {catalog}.{schema}")
    first_match = listing.where(F.col("tableName") == name).take(1)
    return len(first_match) > 0

def current_prices(currency: str):
    """Return the currently effective unit price per SKU for `currency`.

    Reads ``system.billing.list_prices`` and keeps rows for `currency` whose
    validity window contains the current timestamp. A NULL ``price_end_time``
    is treated as open-ended. The unit price is the first non-null of
    promotional, effective-list and default pricing, cast to double. When
    several rows qualify for the same SKU, the one with the latest
    ``price_start_time`` is kept.

    Returns:
        DataFrame with columns ``price_sku_name`` and ``dbu_rate``.
    """
    now = F.current_timestamp()
    is_in_effect = (F.col("price_start_time") <= now) & (
        F.col("price_end_time").isNull() | (F.col("price_end_time") >= now)
    )
    resolved_price = F.coalesce(
        F.col("pricing.promotional.default"),
        F.col("pricing.effective_list.default"),
        F.col("pricing.default"),
    ).cast("double")

    candidates = (
        spark.read.table("system.billing.list_prices")
        .where(F.col("currency_code") == currency)
        .where(is_in_effect)
        .select("sku_name", "price_start_time", resolved_price.alias("unit_price"))
    )

    newest_first = Window.partitionBy("sku_name").orderBy(
        F.col("price_start_time").desc_nulls_last()
    )
    ranked = candidates.withColumn("rn", F.row_number().over(newest_first))
    return ranked.where(F.col("rn") == 1).select(
        F.col("sku_name").alias("price_sku_name"),
        F.col("unit_price").alias("dbu_rate"),
    )

# =========================================
# Cutoff & tag prep
# =========================================
cutoff_ts = F.expr(f"current_timestamp() - INTERVAL {hours_back} HOURS")
now_ts    = F.current_timestamp()
vals_lc   = [v.lower() for v in TARGET_TAG_VALUES]

# =========================================
# Tagged jobs
# =========================================
# system.lakeflow.jobs is a change log: a job can have several rows (one per
# definition change, ordered by change_time). The previous dropDuplicates()
# kept an arbitrary one of those rows, so the job `name` carried downstream
# was nondeterministic.
# Inclusion rule is unchanged: a job qualifies if ANY of its versions carries
# a matching tag (case-insensitive value match). Among the qualifying versions,
# the most recent one supplies the name.
_w_latest_version = Window.partitionBy("workspace_id", "job_id").orderBy(
    F.col("change_time").desc_nulls_last()
)
df_jobs_tagged = (
    spark.read.table("system.lakeflow.jobs")
    .select("workspace_id", "job_id", "name", "tags", "change_time")
    .withColumn("tag_val_lc", F.lower(F.element_at("tags", F.lit(TARGET_TAG_KEY))))
    .filter(F.col("tag_val_lc").isin(vals_lc))
    .withColumn("_rn", F.row_number().over(_w_latest_version))
    .filter(F.col("_rn") == 1)  # exactly one row per (workspace_id, job_id)
    .select("workspace_id", "job_id", "name")
)

# =========================================
# RUNS (workflow-level) — aggregate slices -> 1 row/run
# Use overlap filter so we don't miss runs that started earlier
# =========================================
# A run can be recorded as several timeline slices (hence the slice
# aggregation below). Previously only the slices overlapping the window were
# aggregated. A run that started before cutoff_ts therefore got a truncated
# period_start_time/duration_minutes (and load_date), while its billed cost
# (df_billing) covers the whole run.
# Fix: use the overlap filter only to decide WHICH runs to surface, then
# aggregate ALL slices of those runs.
_run_timeline = spark.read.table("system.lakeflow.job_run_timeline")

# Step 1: runs of tagged jobs with at least one slice overlapping [cutoff_ts, now_ts].
_runs_in_window = (
    _run_timeline
    .filter(
        (F.col("period_start_time") <= now_ts) &
        (F.col("period_end_time").isNull() | (F.col("period_end_time") >= cutoff_ts))
    )
    .join(df_jobs_tagged.select("workspace_id", "job_id"), ["workspace_id", "job_id"], "left_semi")
    .select("workspace_id", "job_id", "run_id")
    .distinct()
)

# Step 2: every slice of those runs, with the job name attached.
df_run_slices = (
    _run_timeline
    .join(_runs_in_window, ["workspace_id", "job_id", "run_id"], "left_semi")
    .select("workspace_id", "job_id", "run_id",
            "period_start_time", "period_end_time", "result_state")
    .withColumnRenamed("result_state", "status")
    .join(df_jobs_tagged.select("workspace_id","job_id","name"), ["workspace_id","job_id"], "inner")
)

# One row per run: overall start/end, status = max non-null result_state across slices;
# still-open runs (NULL end) are measured up to now.
df_runs = (
    df_run_slices
    .groupBy("workspace_id","job_id","run_id","name")
    .agg(
        F.min("period_start_time").alias("period_start_time"),
        F.max("period_end_time").alias("period_end_time"),
        F.max("status").alias("status")
    )
    .withColumn("period_end_time", F.coalesce("period_end_time", now_ts))
    .withColumn(
        "duration_minutes",
        (F.col("period_end_time").cast("double") - F.col("period_start_time").cast("double")) / 60.0
    )
    .withColumn("job_name", F.coalesce(F.col("name"), F.concat(F.lit("job_id="), F.col("job_id"))))
    .select("workspace_id","job_id","run_id","period_start_time","period_end_time","status","duration_minutes","job_name")
)

# =========================================
# Usage & costs (run-level)
# =========================================
df_prices = current_prices(TARGET_CURRENCY)

# Billing records attributable to a job run (job_run_id present), as doubles.
df_usage = (
    spark.read.table("system.billing.usage")
    .select(
        "workspace_id",
        F.col("usage_metadata.job_run_id").cast("string").alias("run_id"),
        "sku_name",
        F.col("usage_quantity").cast("double").alias("usage_quantity"),
    )
    .where(F.col("run_id").isNotNull())
)

# Per-run totals over all matching billing records. SKUs with no current
# price (left join miss) contribute usage but 0 cost.
_priced_usage = df_usage.join(
    df_prices, df_usage.sku_name == df_prices.price_sku_name, "left"
)
_line_cost = F.col("usage_quantity") * F.coalesce(F.col("dbu_rate"), F.lit(0.0))
df_billing = _priced_usage.groupBy("run_id").agg(
    F.round(F.sum(_line_cost), 6).alias("total_cost_usd"),
    F.round(F.sum("usage_quantity"), 6).alias("total_dbus_used"),
)

# =========================================
# TASK RUNS (subtasks) — aggregate slices -> 1 row per (run_id, task_key)
# Use overlap filter like runs
# =========================================
# Task slices are taken for every run surfaced in df_runs, with no extra time
# filter. Previously slices were also required to overlap the window, so tasks
# of a surfaced run that finished before cutoff_ts (or the early slices of a
# long task) were dropped. Their share of the run's full cost was then
# reallocated to the remaining tasks.
_surfaced_runs = (
    df_runs
    .select("workspace_id", "job_id", F.col("run_id").alias("job_run_id"))
    .distinct()
)

df_task_slices = (
    spark.read.table("system.lakeflow.job_task_run_timeline")
    .join(_surfaced_runs, ["workspace_id", "job_id", "job_run_id"], "left_semi")
    .select("workspace_id", "job_id", "run_id", "job_run_id", "task_key",
            "period_start_time", "period_end_time", "result_state")
    .withColumnRenamed("result_state", "status")
    # Still-running slices are measured up to now.
    .withColumn("period_end_time", F.coalesce("period_end_time", now_ts))
    .withColumn(
        "slice_minutes",
        (F.col("period_end_time").cast("double") - F.col("period_start_time").cast("double")) / 60.0
    )
)

# One row per (job run, task_key): summed slice minutes, max non-null status.
df_task_durations = (
    df_task_slices
    .groupBy("workspace_id","job_id","job_run_id","task_key")
    .agg(
        F.sum("slice_minutes").alias("task_duration_minutes"),
        F.max("status").alias("status")
    )
    .withColumnRenamed("job_run_id", "run_id")
    .dropDuplicates(["run_id","task_key"])
)

# =========================================
# Allocate run costs to tasks by duration share
# =========================================
# Denominator: total task minutes recorded for each run.
task_minutes_per_run = df_task_durations.groupBy("run_id").agg(
    F.sum("task_duration_minutes").alias("run_task_minutes")
)

# A task's share of its run; runs without positive task time give every task 0.
_duration_share = (
    F.when(
        F.col("run_task_minutes") > 0,
        F.col("task_duration_minutes") / F.col("run_task_minutes"),
    )
    .otherwise(F.lit(0.0))
)

alloc_with_totals = (
    df_task_durations
    .join(task_minutes_per_run, "run_id", "left")
    .withColumn("duration_share", _duration_share)
    .join(df_billing, "run_id", "left")
)

# Runs with no billing rows count as zero DBUs / zero cost instead of NULL.
for _total_col in ("total_dbus_used", "total_cost_usd"):
    alloc_with_totals = alloc_with_totals.withColumn(
        _total_col, F.coalesce(F.col(_total_col), F.lit(0.0))
    )

# Split the run totals across its tasks by duration share (6 decimal places).
for _task_col, _total_col in (
    ("task_dbus_used", "total_dbus_used"),
    ("task_cost_usd", "total_cost_usd"),
):
    alloc_with_totals = alloc_with_totals.withColumn(
        _task_col, F.round(F.col("duration_share") * F.col(_total_col), 6)
    )

# =========================================
# Final results (RUN / TASK)
# =========================================
# Run level: one row per surfaced run plus its billed totals (0 when unbilled).
_runs_with_billing = df_runs.join(df_billing, "run_id", "left")
for _metric in ("total_dbus_used", "total_cost_usd"):
    _runs_with_billing = _runs_with_billing.withColumn(
        _metric, F.coalesce(F.col(_metric), F.lit(0.0))
    )

df_run_result = _runs_with_billing.select(
    "period_start_time",
    "workspace_id",
    "job_id",
    "run_id",
    "job_name",
    "status",
    F.round("duration_minutes", 2).alias("duration_minutes"),
    "total_dbus_used",
    "total_cost_usd",
)

# Task level: allocated share of each run's DBUs / cost.
df_task_result = alloc_with_totals.select(
    "workspace_id",
    "job_id",
    "run_id",
    "task_key",
    "status",
    F.round("task_duration_minutes", 2).alias("task_duration_minutes"),
    *(F.round(c, 6).alias(c) for c in ("duration_share", "task_dbus_used", "task_cost_usd")),
)

# =========================================
# Persist to Delta (serverless-safe) with MERGE
#   - De-dup sources BEFORE MERGE to avoid conflicts
# =========================================
spark.sql(f"CREATE SCHEMA  IF NOT EXISTS {CATALOG}.{SCHEMA}")


def _latest_per_key(df, keys, order_col):
    """Keep one row per `keys`: the one with the greatest `order_col`.

    Rows tied on `order_col` are not broken deterministically.
    """
    ranking = Window.partitionBy(*keys).orderBy(F.col(order_col).desc())
    return (
        df.withColumn("rn", F.row_number().over(ranking))
          .filter(F.col("rn") == 1)
          .drop("rn")
    )


def _add_load_metadata(df):
    """Append load_ts (now), load_date (date of period_start_time) and window_hours."""
    return (
        df.withColumn("load_ts", F.current_timestamp())
          .withColumn("load_date", F.to_date("period_start_time"))
          .withColumn("window_hours", F.lit(hours_back))
    )


# Runs: exactly one row per run_id before MERGE (duplicate source keys make MERGE fail).
df_run_result_dedup = _add_load_metadata(
    _latest_per_key(df_run_result, ["run_id"], "period_start_time")
)

# Tasks: exactly one row per (run_id, task_key).
df_task_result_dedup = _latest_per_key(
    df_task_result, ["run_id", "task_key"], "task_duration_minutes"
)

# Task rows borrow their run's start time so they land in the run's load_date partition.
df_task_result_with_date = _add_load_metadata(
    df_task_result_dedup.join(
        df_run_result_dedup.select("run_id", "period_start_time"),
        "run_id",
        "left",
    )
).drop("period_start_time")

# Save runs
def _persist(df, table_fqn, table_name, merge_condition):
    """Create `table_fqn` from `df` on first run, otherwise upsert `df` into it.

    First run (table not listed in CATALOG.SCHEMA): overwrite-write a Delta
    table partitioned by load_date. Afterwards: MERGE on `merge_condition`
    (target alias "t", source alias "s"), updating all columns of matched rows
    and inserting unmatched rows.
    """
    if table_exists(CATALOG, SCHEMA, table_name):
        (
            DeltaTable.forName(spark, table_fqn).alias("t")
            .merge(df.alias("s"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        return
    (
        df.write.format("delta")
        .mode("overwrite")
        .partitionBy("load_date")
        .saveAsTable(table_fqn)
    )


# Runs are keyed by run_id; tasks by (run_id, task_key).
_persist(df_run_result_dedup, RUN_TABLE, "job_run_costs", "t.run_id = s.run_id")
_persist(
    df_task_result_with_date,
    TASK_TABLE,
    "job_run_task_costs",
    "t.run_id = s.run_id AND t.task_key = s.task_key",
)

print("Saved results.")
# Optional quick peek:
display(spark.table(RUN_TABLE).orderBy(F.col("period_start_time").desc()).limit(20))
display(spark.table(TASK_TABLE).orderBy(F.col("run_id").desc(), F.col("task_key")).limit(50))


In [0]:
from pyspark.sql import functions as F

# ---------- Parameters ----------
dbutils.widgets.text("CATALOG", "")
dbutils.widgets.text("SCHEMA", "")
dbutils.widgets.text("START_DATE", "")
dbutils.widgets.text("END_DATE", "")
dbutils.widgets.dropdown("WRITE_STATS", "false", ["false","true"])

CATALOG     = dbutils.widgets.get("CATALOG").strip()
SCHEMA      = dbutils.widgets.get("SCHEMA").strip()
START_DATE  = dbutils.widgets.get("START_DATE").strip()  # optional, inclusive lower bound on load_date
END_DATE    = dbutils.widgets.get("END_DATE").strip()    # optional, inclusive upper bound on load_date
WRITE_STATS = dbutils.widgets.get("WRITE_STATS") == "true"

# The widget defaults are empty; without this check every table name below
# would be "..<table>" and Spark would fail with an opaque parse error.
if not CATALOG or not SCHEMA:
    raise ValueError("CATALOG and SCHEMA widgets must both be set")

# Inputs written by the cost-collection cell
RUN_TABLE  = f"{CATALOG}.{SCHEMA}.job_run_costs"
TASK_TABLE = f"{CATALOG}.{SCHEMA}.job_run_task_costs"

# Optional outputs (written only when WRITE_STATS is true)
OUT_TASK   = f"{CATALOG}.{SCHEMA}.task_cost_stats"
OUT_RUN    = f"{CATALOG}.{SCHEMA}.job_run_stats"

# ---------- Helpers ----------
def nz_div(num_col, den_col):
    """Divide `num_col` by `den_col`, yielding 0.0 when the denominator is NULL or 0."""
    den_unusable = den_col.isNull() | (den_col == 0)
    return F.when(~den_unusable, num_col / den_col).otherwise(F.lit(0.0))

def q(col, probs=(0.5, 0.9), accuracy=10000):
    """Build a ``percentile_approx`` Column returning an array of percentiles.

    Args:
        col: Column name, interpolated into the SQL expression.
        probs: Percentiles to compute, in output-array order.
        accuracy: ``percentile_approx`` accuracy argument.
    """
    prob_list = ",".join(str(p) for p in probs)
    return F.expr(f"percentile_approx({col}, array({prob_list}), {accuracy})")

# ---------- Load ----------
df_tasks = spark.read.table(TASK_TABLE)
df_runs  = spark.read.table(RUN_TABLE)

# Optional inclusive bounds on load_date; a blank widget leaves that side open.
_date_bounds = []
if START_DATE:
    _date_bounds.append(F.col("load_date") >= F.to_date(F.lit(START_DATE)))
if END_DATE:
    _date_bounds.append(F.col("load_date") <= F.to_date(F.lit(END_DATE)))
for _bound in _date_bounds:
    df_tasks = df_tasks.filter(_bound)
    df_runs = df_runs.filter(_bound)

# ---------- task rollup: one row per (workspace_id, job_id, task_key) ----------
# task_key is only unique within a single job. Grouping on task_key alone
# pooled unrelated tasks from different jobs (e.g. two jobs that each have a
# task named "main") into one row of mixed durations and costs. Group by the
# owning job as well, matching the run-level rollup, which is keyed by
# workspace_id/job_id.
TASK_GROUP_COLS = ["workspace_id", "job_id", "task_key"]

task_stats = (
    df_tasks.groupBy(*TASK_GROUP_COLS)
    .agg(
        F.count(F.lit(1)).alias("n_runs"),
        F.sum(F.when(F.col("status")=="SUCCEEDED",1).otherwise(0)).alias("n_success"),
        F.sum("task_duration_minutes").alias("total_minutes"),
        F.avg("task_duration_minutes").alias("avg_minutes"),
        F.min("task_duration_minutes").alias("min_minutes"),
        F.max("task_duration_minutes").alias("max_minutes"),
        F.stddev_pop("task_duration_minutes").alias("std_minutes"),
        q("task_duration_minutes").alias("q_minutes"),
        F.sum("task_cost_usd").alias("total_cost_usd"),
        F.avg("task_cost_usd").alias("avg_cost_usd"),
        F.min("task_cost_usd").alias("min_cost_usd"),
        F.max("task_cost_usd").alias("max_cost_usd"),
        F.stddev_pop("task_cost_usd").alias("std_cost_usd"),
        q("task_cost_usd").alias("q_cost_usd"),
        F.sum("task_dbus_used").alias("total_dbus"),
        F.avg("task_dbus_used").alias("avg_dbus"),
        F.stddev_pop("task_dbus_used").alias("std_dbus"),
        q("task_dbus_used").alias("q_dbus"),
        F.avg("duration_share").alias("avg_duration_share"),
        F.max("load_ts").alias("last_load_ts")
    )
    .withColumn("success_rate", nz_div(F.col("n_success").cast("double"), F.col("n_runs").cast("double")))
    # q() returns [p50, p90]; unpack into named columns.
    .withColumn("median_minutes", F.col("q_minutes")[0]).withColumn("p90_minutes", F.col("q_minutes")[1])
    .withColumn("median_cost_usd", F.col("q_cost_usd")[0]).withColumn("p90_cost_usd", F.col("q_cost_usd")[1])
    .withColumn("median_dbus", F.col("q_dbus")[0]).withColumn("p90_dbus", F.col("q_dbus")[1])
    .drop("q_minutes","q_cost_usd","q_dbus")
    .withColumn("avg_cost_per_min", nz_div(F.col("total_cost_usd"), F.col("total_minutes")))
    .withColumn("avg_dbus_per_min", nz_div(F.col("total_dbus"), F.col("total_minutes")))
    # Coefficient of variation (std / mean); 0 when the mean is 0/NULL.
    .withColumn("cv_minutes", nz_div(F.col("std_minutes"), F.col("avg_minutes")))
    .withColumn("cv_cost_usd", nz_div(F.col("std_cost_usd"), F.col("avg_cost_usd")))
)


# Optional rounding for readability (keep raw if you prefer):
# 6 dp for totals/averages/percentiles, 12 dp for per-minute rates and CVs.
def _round_as(name, digits):
    """Round column `name` to `digits` decimals, keeping its name."""
    return F.round(name, digits).alias(name)


task_stats = task_stats.select(
    *TASK_GROUP_COLS, "n_runs", "n_success",
    *[_round_as(c, 6) for c in (
        "total_minutes", "avg_minutes", "min_minutes", "max_minutes", "std_minutes",
        "median_minutes", "p90_minutes",
        "total_cost_usd", "avg_cost_usd", "min_cost_usd", "max_cost_usd", "std_cost_usd",
        "median_cost_usd", "p90_cost_usd",
        "total_dbus", "avg_dbus", "std_dbus", "median_dbus", "p90_dbus",
        "avg_duration_share",
    )],
    "last_load_ts",
    _round_as("success_rate", 6),
    *[_round_as(c, 12) for c in (
        "avg_cost_per_min", "avg_dbus_per_min", "cv_minutes", "cv_cost_usd",
    )],
)

# ---------- run-level job stats ----------
# One row per (workspace_id, job_id, job_name): run counts, success rate,
# duration/cost/DBU totals, averages and p50/p90 percentiles.
# Percentiles use the shared q() helper, like the task rollup, instead of
# hand-written percentile_approx expressions.
run_aggs = (
    df_runs.groupBy("workspace_id","job_id","job_name")
    .agg(
        F.count(F.lit(1)).alias("n_runs"),
        F.sum(F.when(F.col("status")=="SUCCEEDED",1).otherwise(0)).alias("n_success"),
        F.sum("duration_minutes").alias("total_run_minutes"),
        F.avg("duration_minutes").alias("avg_run_minutes"),
        q("duration_minutes").alias("q_run_minutes"),
        F.sum("total_cost_usd").alias("total_run_cost_usd"),
        F.avg("total_cost_usd").alias("avg_run_cost_usd"),
        q("total_cost_usd").alias("q_run_cost_usd"),
        F.sum("total_dbus_used").alias("total_run_dbus"),
        F.avg("total_dbus_used").alias("avg_run_dbus"),
        F.max("load_ts").alias("last_load_ts")
    )
    .withColumn("success_rate", nz_div(F.col("n_success").cast("double"), F.col("n_runs").cast("double")))
    # q() returns [p50, p90]; unpack into named columns.
    .withColumn("median_run_minutes", F.col("q_run_minutes")[0]).withColumn("p90_run_minutes", F.col("q_run_minutes")[1])
    .withColumn("median_run_cost_usd", F.col("q_run_cost_usd")[0]).withColumn("p90_run_cost_usd", F.col("q_run_cost_usd")[1])
    .drop("q_run_minutes","q_run_cost_usd")
    .withColumn("avg_run_cost_per_min", nz_div(F.col("total_run_cost_usd"), F.col("total_run_minutes")))
    .withColumn("avg_run_dbus_per_min", nz_div(F.col("total_run_dbus"), F.col("total_run_minutes")))
)

# Optional rounding: 6 dp for totals/averages/percentiles, 12 dp for per-minute rates.
_RUN_COLS_6DP = (
    "total_run_minutes", "avg_run_minutes", "median_run_minutes", "p90_run_minutes",
    "total_run_cost_usd", "avg_run_cost_usd", "median_run_cost_usd", "p90_run_cost_usd",
    "total_run_dbus", "avg_run_dbus",
)
_RUN_COLS_12DP = ("avg_run_cost_per_min", "avg_run_dbus_per_min")

run_aggs = run_aggs.select(
    "workspace_id", "job_id", "job_name", "n_runs", "n_success",
    *(F.round(c, 6).alias(c) for c in _RUN_COLS_6DP),
    "last_load_ts",
    F.round("success_rate", 6).alias("success_rate"),
    *(F.round(c, 12).alias(c) for c in _RUN_COLS_12DP),
)

# ---------- (optional) Persist ----------
# The WRITE_STATS widget previously had no effect because this code was
# commented out. Both outputs are full snapshots recomputed from the cost
# tables, so each write replaces the previous contents. overwriteSchema lets
# the snapshot's columns change (e.g. added grouping columns) without a manual DROP.
if WRITE_STATS:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
    for _stats_df, _stats_table in ((task_stats, OUT_TASK), (run_aggs, OUT_RUN)):
        (_stats_df.write.mode("overwrite").format("delta")
            .option("overwriteSchema", "true")
            .saveAsTable(_stats_table))
    print(f"Wrote {OUT_TASK} and {OUT_RUN}.")

# ---------- Displays ----------
display(task_stats.orderBy(F.col("total_cost_usd").desc()).limit(50))
display(run_aggs.orderBy(F.col("total_run_cost_usd").desc()).limit(50))
