### Jobs Monitoring
#### Adapted from the Databricks documentation on monitoring job costs with system tables

In [0]:

-- Most expensive jobs in last 30 days
-- One row per (workspace_id, job_id) with job usage in the window: job name (latest
-- definition), distinct run count, list-price and effective cost, a representative
-- run_as identity and custom_tags map, and the latest usage end time.
CREATE OR REPLACE MATERIALIZED VIEW most_expensive_jobs AS
-- Job usage from the last 30 days, priced with the list price whose validity range covers
-- the usage record, aggregated to one row per (workspace_id, job_id).
WITH list_cost_per_job AS (
  SELECT
    usage_data.workspace_id,
    usage_data.usage_metadata.job_id,
    COUNT(DISTINCT usage_data.usage_metadata.job_run_id) AS runs,
    SUM(usage_data.usage_quantity * pricing_data.pricing.default) AS list_cost,
    SUM(usage_data.usage_quantity * pricing_data.pricing.effective_list.default) AS effective_cost,
    -- first non-NULL value seen; an arbitrary representative if a job has several
    first(usage_data.identity_metadata.run_as, TRUE) AS run_as,
    first(usage_data.custom_tags, TRUE) AS custom_tags,
    MAX(usage_data.usage_end_time) AS last_seen_date
  FROM system.billing.usage AS usage_data
  INNER JOIN system.billing.list_prices AS pricing_data
    ON usage_data.cloud = pricing_data.cloud
    AND usage_data.sku_name = pricing_data.sku_name
    AND usage_data.usage_start_time >= pricing_data.price_start_time
    AND (usage_data.usage_end_time <= pricing_data.price_end_time OR pricing_data.price_end_time IS NULL)
  WHERE
    -- single-quoted literal: "JOBS" would be read as an identifier when ANSI
    -- double-quoted identifiers are enabled
    usage_data.billing_origin_product = 'JOBS'
    AND usage_data.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
  -- groups by the only non-aggregated columns: workspace_id, job_id
  GROUP BY ALL
),
-- Latest row per job from system.lakeflow.jobs (highest change_time), used for the job name.
most_recent_jobs AS (
  SELECT
    workspace_id,
    job_id,
    name
  FROM system.lakeflow.jobs
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
)
-- list_cost_per_job is already unique per (workspace_id, job_id) and most_recent_jobs has at
-- most one row per key, so no further aggregation is needed. Skipping it also avoids grouping
-- on the MAP-typed custom_tags column.
SELECT
  job_metadata.name,
  spending_by_job.job_id,
  spending_by_job.workspace_id,
  spending_by_job.runs,
  spending_by_job.run_as,
  spending_by_job.list_cost,
  spending_by_job.effective_cost,
  spending_by_job.custom_tags,
  spending_by_job.last_seen_date
FROM list_cost_per_job AS spending_by_job
LEFT JOIN most_recent_jobs AS job_metadata USING (workspace_id, job_id)
ORDER BY spending_by_job.list_cost DESC;

In [0]:

-- Most expensive job runs in last 30 days
-- One row per (workspace_id, job_id, run_id) with job usage in the window: job name
-- (latest definition), list-price and effective cost of the run, a representative run_as
-- identity and custom_tags map, and the latest usage end time.
CREATE OR REPLACE MATERIALIZED VIEW most_expensive_job_runs AS
-- Job usage from the last 30 days, priced with the list price whose validity range covers
-- the usage record, aggregated to one row per (workspace_id, job_id, run_id).
WITH list_cost_per_job_run AS (
  SELECT
    usage_data.workspace_id,
    usage_data.usage_metadata.job_id,
    usage_data.usage_metadata.job_run_id AS run_id,
    SUM(usage_data.usage_quantity * pricing_data.pricing.default) AS list_cost,
    SUM(usage_data.usage_quantity * pricing_data.pricing.effective_list.default) AS effective_cost,
    -- first non-NULL value seen; an arbitrary representative if a run has several
    first(usage_data.identity_metadata.run_as, TRUE) AS run_as,
    first(usage_data.custom_tags, TRUE) AS custom_tags,
    MAX(usage_data.usage_end_time) AS last_seen_date
  FROM system.billing.usage AS usage_data
  INNER JOIN system.billing.list_prices AS pricing_data
    ON usage_data.cloud = pricing_data.cloud
    AND usage_data.sku_name = pricing_data.sku_name
    AND usage_data.usage_start_time >= pricing_data.price_start_time
    AND (usage_data.usage_end_time <= pricing_data.price_end_time OR pricing_data.price_end_time IS NULL)
  WHERE
    usage_data.billing_origin_product = 'JOBS'
    AND usage_data.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
  -- groups by the only non-aggregated columns: workspace_id, job_id, run_id
  GROUP BY ALL
),
-- Latest row per job from system.lakeflow.jobs (highest change_time), used for the job name.
most_recent_jobs AS (
  SELECT
    workspace_id,
    job_id,
    name
  FROM system.lakeflow.jobs
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
)
-- list_cost_per_job_run is already unique per run and most_recent_jobs has at most one row
-- per (workspace_id, job_id), so no further aggregation is needed. Skipping it also avoids
-- grouping on the MAP-typed custom_tags column.
SELECT
  spending_by_run.workspace_id,
  job_metadata.name,
  spending_by_run.job_id,
  spending_by_run.run_id,
  spending_by_run.run_as,
  spending_by_run.list_cost,
  spending_by_run.effective_cost,
  spending_by_run.custom_tags,
  spending_by_run.last_seen_date
FROM list_cost_per_job_run AS spending_by_run
LEFT JOIN most_recent_jobs AS job_metadata USING (workspace_id, job_id)
ORDER BY spending_by_run.list_cost DESC;

In [0]:

-- Job spend trends over the past 14 days
-- Compares each job's effective cost in a recent 7-day window against the 7 days before it,
-- per (workspace_id, job_id, run_as, sku_name). Windows are defined on usage_end_time:
--   Last7DaySpend  : end time in (today - 8 days, today - 1 day]   -> usage on days -8 .. -2
--   Last14DaySpend : end time in (today - 15 days, today - 8 days] -> usage on days -15 .. -9
-- (Last14DaySpend is the *previous* 7-day window, not a 14-day total; the column name is kept
-- for compatibility with existing consumers. Yesterday's usage falls in neither window —
-- presumably to allow for billing-data latency; confirm.)
CREATE OR REPLACE MATERIALIZED VIEW job_spend_trend AS
-- Individual job usage rows priced at effective list price. The lookback is 15 usage days
-- so the earlier window (days -15 .. -9) is complete; a 14-day lookback left it one day short.
WITH job_run_timeline_with_cost AS (
  SELECT
    usage_data.workspace_id,
    usage_data.usage_metadata.job_id AS job_id,
    usage_data.identity_metadata.run_as AS run_as,
    usage_data.sku_name,
    usage_data.usage_end_time,
    usage_data.custom_tags,
    usage_data.usage_quantity * pricing_data.pricing.effective_list.default AS effective_cost
  FROM system.billing.usage AS usage_data
  INNER JOIN system.billing.list_prices AS pricing_data
    ON usage_data.cloud = pricing_data.cloud
    AND usage_data.sku_name = pricing_data.sku_name
    AND usage_data.usage_start_time >= pricing_data.price_start_time
    AND (usage_data.usage_end_time <= pricing_data.price_end_time OR pricing_data.price_end_time IS NULL)
  WHERE
    usage_data.billing_origin_product = 'JOBS'
    AND usage_data.usage_date >= CURRENT_DATE() - INTERVAL 15 DAY
),
-- Latest row per job from system.lakeflow.jobs (highest change_time), used for the job name.
most_recent_jobs AS (
  SELECT
    workspace_id,
    job_id,
    name
  FROM system.lakeflow.jobs
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
),
-- Spend per window. The ranges are half-open (start, end] so they do not overlap: with
-- BETWEEN on both, a record ending exactly at midnight of day -8 was counted in both windows.
-- A record ending exactly at midnight covers time on the previous day, hence "> start, <= end".
job_spending_aggregated AS (
  SELECT
    workspace_id,
    job_id,
    run_as,
    sku_name,
    SUM(
      CASE
        WHEN usage_end_time > date_add(current_date(), -8)
          AND usage_end_time <= date_add(current_date(), -1)
        THEN effective_cost
        ELSE 0
      END
    ) AS Last7DaySpend,
    SUM(
      CASE
        WHEN usage_end_time > date_add(current_date(), -15)
          AND usage_end_time <= date_add(current_date(), -8)
        THEN effective_cost
        ELSE 0
      END
    ) AS Last14DaySpend,
    -- representative tag map; MAP values are not usable as grouping keys
    first(custom_tags, TRUE) AS custom_tags
  FROM job_run_timeline_with_cost
  GROUP BY workspace_id, job_id, run_as, sku_name
)
SELECT
  job_metadata.name,
  spending_by_job.workspace_id,
  spending_by_job.job_id,
  spending_by_job.sku_name,
  spending_by_job.run_as,
  spending_by_job.custom_tags,
  spending_by_job.Last7DaySpend,
  spending_by_job.Last14DaySpend,
  spending_by_job.Last7DaySpend - spending_by_job.Last14DaySpend AS Last7DayGrowth,
  -- try_divide yields NULL instead of failing when the earlier window had no spend
  try_divide(
    spending_by_job.Last7DaySpend - spending_by_job.Last14DaySpend,
    spending_by_job.Last14DaySpend
  ) * 100 AS Last7DayGrowthPct
FROM job_spending_aggregated AS spending_by_job
LEFT JOIN most_recent_jobs AS job_metadata USING (workspace_id, job_id)
ORDER BY Last7DayGrowth DESC;

In [0]:

-- Analysis of failed jobs and their cost in last 30 days
-- One row per (workspace_id, job_id) with terminal attempts in the last 30 days: attempt
-- count, failure count (ERROR / FAILED / TIMED_OUT), success percentage, and the list and
-- effective cost attributed to failed attempts.
CREATE OR REPLACE MATERIALIZED VIEW failed_jobs_analysis AS
-- Job usage from the last 30 days, priced at list/effective price and matched to every
-- job_run_timeline row of the same run whose hour range contains the usage start time.
-- NOTE(review): a usage row can match more than one timeline row of a run (e.g. when
-- consecutive rows' hour ranges overlap); its cost is then counted once per match — confirm.
WITH job_run_timeline_with_cost AS (
  SELECT
    usage_data.workspace_id,
    usage_data.usage_end_time,
    usage_data.custom_tags,
    usage_data.identity_metadata.run_as AS run_as,
    timeline.job_id,
    timeline.run_id,
    timeline.result_state,
    usage_data.usage_quantity * pricing_data.pricing.default AS list_cost,
    usage_data.usage_quantity * pricing_data.pricing.effective_list.default AS effective_cost
  FROM system.billing.usage AS usage_data
  INNER JOIN system.lakeflow.job_run_timeline AS timeline
    ON usage_data.workspace_id = timeline.workspace_id
    AND usage_data.usage_metadata.job_id = timeline.job_id
    AND usage_data.usage_metadata.job_run_id = timeline.run_id
    AND usage_data.usage_start_time >= date_trunc('HOUR', timeline.period_start_time)
    AND usage_data.usage_start_time < date_trunc('HOUR', timeline.period_end_time) + INTERVAL 1 HOUR
  INNER JOIN system.billing.list_prices AS pricing_data
    ON usage_data.cloud = pricing_data.cloud
    AND usage_data.sku_name = pricing_data.sku_name
    AND usage_data.usage_start_time >= pricing_data.price_start_time
    AND (usage_data.usage_end_time <= pricing_data.price_end_time OR pricing_data.price_end_time IS NULL)
  WHERE
    usage_data.billing_origin_product = 'JOBS'
    AND usage_data.usage_date >= CURRENT_DATE() - INTERVAL 30 DAYS
),
-- Running cost total within each run, ordered by usage end time. Partitioning by run stops
-- cost of one run (or job) — e.g. a still-running run with no terminal row yet — from being
-- added to the next run's first terminal row, and avoids a single-partition window.
cumulative_run_status_cost AS (
  SELECT
    workspace_id,
    job_id,
    run_id,
    run_as,
    result_state,
    usage_end_time,
    custom_tags,
    SUM(list_cost) OVER (
      PARTITION BY workspace_id, job_id, run_id
      ORDER BY usage_end_time
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_list_cost,
    SUM(effective_cost) OVER (
      PARTITION BY workspace_id, job_id, run_id
      ORDER BY usage_end_time
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_effective_cost
  FROM job_run_timeline_with_cost
),
-- Cost attributed to each row with a non-NULL result_state: the run's running total at that
-- row minus the running total at the run's previous such row (0 for the first one).
-- NOTE(review): rows tied on usage_end_time are ordered arbitrarily, so the split between
-- tied rows is not deterministic.
cost_per_status AS (
  SELECT
    workspace_id,
    job_id,
    run_id,
    run_as,
    result_state,
    usage_end_time,
    custom_tags,
    cumulative_list_cost - COALESCE(
      LAG(cumulative_list_cost) OVER (PARTITION BY workspace_id, job_id, run_id ORDER BY usage_end_time),
      0
    ) AS result_state_list_cost,
    cumulative_effective_cost - COALESCE(
      LAG(cumulative_effective_cost) OVER (PARTITION BY workspace_id, job_id, run_id ORDER BY usage_end_time),
      0
    ) AS result_state_effective_cost
  FROM cumulative_run_status_cost
  -- WHERE is applied before the window, so LAG only sees rows with a result_state
  WHERE result_state IS NOT NULL
),
-- Cost of failed attempts, one row per (workspace_id, job_id).
cost_per_status_agg AS (
  SELECT
    workspace_id,
    job_id,
    first(run_as, TRUE) AS run_as,
    first(custom_tags, TRUE) AS custom_tags,
    SUM(result_state_list_cost) AS list_cost,
    SUM(result_state_effective_cost) AS effective_cost
  FROM cost_per_status
  WHERE result_state IN ('ERROR', 'FAILED', 'TIMED_OUT')
  GROUP BY workspace_id, job_id
),
-- Timeline rows with a result_state in the last 30 days, flagged 1 when the state is a failure.
terminal_statuses AS (
  SELECT
    workspace_id,
    job_id,
    CASE WHEN result_state IN ('ERROR', 'FAILED', 'TIMED_OUT') THEN 1 ELSE 0 END AS is_failure,
    period_end_time AS last_seen_date
  FROM system.lakeflow.job_run_timeline
  WHERE
    result_state IS NOT NULL
    AND period_end_time >= CURRENT_DATE() - INTERVAL 30 DAYS
),
-- Latest row per job from system.lakeflow.jobs (highest change_time), used for the job name.
most_recent_jobs AS (
  SELECT
    workspace_id,
    job_id,
    name
  FROM system.lakeflow.jobs
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
)
-- failure_costs and job_metadata each have at most one row per (workspace_id, job_id), so
-- first() returns that row's value; this avoids grouping on the MAP-typed custom_tags.
SELECT
  first(job_metadata.name, TRUE) AS name,
  run_status.workspace_id,
  run_status.job_id,
  -- counts terminal attempts (rows with a result_state), so repairs count separately
  COUNT(*) AS runs,
  first(failure_costs.run_as, TRUE) AS run_as,
  first(failure_costs.custom_tags, TRUE) AS custom_tags,
  SUM(run_status.is_failure) AS failures,
  (1 - COALESCE(try_divide(SUM(run_status.is_failure), COUNT(*)), 0)) * 100 AS success_ratio,
  first(failure_costs.list_cost, TRUE) AS failure_list_cost,
  first(failure_costs.effective_cost, TRUE) AS failure_effective_cost,
  MAX(run_status.last_seen_date) AS last_seen_date
FROM terminal_statuses AS run_status
LEFT JOIN most_recent_jobs AS job_metadata USING (workspace_id, job_id)
LEFT JOIN cost_per_status_agg AS failure_costs USING (workspace_id, job_id)
GROUP BY run_status.workspace_id, run_status.job_id
ORDER BY failures DESC;

In [0]:

-- Job retry patterns and cost in the past 30 days
-- One row per run that has more than one terminal attempt (rows with a result_state in
-- job_run_timeline), at least one of them in the last 30 days: number of repairs, cost of the
-- unsuccessful attempts, and seconds from the first terminal attempt to the last SUCCEEDED one.
CREATE OR REPLACE MATERIALIZED VIEW job_retry_patterns AS
-- Job usage from the last 30 days, priced at list/effective price and matched to every
-- job_run_timeline row of the same run whose hour range contains the usage start time.
-- NOTE(review): a usage row can match more than one timeline row of a run (e.g. when
-- consecutive rows' hour ranges overlap); its cost is then counted once per match — confirm.
WITH job_run_timeline_with_cost AS (
  SELECT
    usage_data.workspace_id,
    usage_data.usage_end_time,
    usage_data.custom_tags,
    timeline.job_id,
    timeline.run_id,
    usage_data.identity_metadata.run_as AS run_as,
    timeline.result_state,
    usage_data.usage_quantity * pricing_data.pricing.default AS list_cost,
    usage_data.usage_quantity * pricing_data.pricing.effective_list.default AS effective_cost
  FROM system.billing.usage AS usage_data
  INNER JOIN system.lakeflow.job_run_timeline AS timeline
    ON usage_data.workspace_id = timeline.workspace_id
    AND usage_data.usage_metadata.job_id = timeline.job_id
    AND usage_data.usage_metadata.job_run_id = timeline.run_id
    AND usage_data.usage_start_time >= date_trunc('HOUR', timeline.period_start_time)
    AND usage_data.usage_start_time < date_trunc('HOUR', timeline.period_end_time) + INTERVAL 1 HOUR
  INNER JOIN system.billing.list_prices AS pricing_data
    ON usage_data.cloud = pricing_data.cloud
    AND usage_data.sku_name = pricing_data.sku_name
    AND usage_data.usage_start_time >= pricing_data.price_start_time
    AND (usage_data.usage_end_time <= pricing_data.price_end_time OR pricing_data.price_end_time IS NULL)
  WHERE
    usage_data.billing_origin_product = 'JOBS'
    AND usage_data.usage_date >= CURRENT_DATE() - INTERVAL 30 DAYS
),
-- Running cost total within each run, ordered by usage end time. Partitioning by run stops
-- cost of one run (or job) from being added to the next run's first terminal row, and
-- avoids a single-partition window.
cumulative_run_status_cost AS (
  SELECT
    workspace_id,
    job_id,
    run_id,
    run_as,
    custom_tags,
    result_state,
    usage_end_time,
    SUM(list_cost) OVER (
      PARTITION BY workspace_id, job_id, run_id
      ORDER BY usage_end_time
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_list_cost,
    SUM(effective_cost) OVER (
      PARTITION BY workspace_id, job_id, run_id
      ORDER BY usage_end_time
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_effective_cost
  FROM job_run_timeline_with_cost
),
-- Cost attributed to each row with a non-NULL result_state: the run's running total at that
-- row minus the running total at the run's previous such row (0 for the first one).
-- NOTE(review): rows tied on usage_end_time are ordered arbitrarily.
cost_per_status AS (
  SELECT
    workspace_id,
    job_id,
    run_id,
    run_as,
    custom_tags,
    result_state,
    usage_end_time,
    cumulative_list_cost - COALESCE(
      LAG(cumulative_list_cost) OVER (PARTITION BY workspace_id, job_id, run_id ORDER BY usage_end_time),
      0
    ) AS result_state_list_cost,
    cumulative_effective_cost - COALESCE(
      LAG(cumulative_effective_cost) OVER (PARTITION BY workspace_id, job_id, run_id ORDER BY usage_end_time),
      0
    ) AS result_state_effective_cost
  FROM cumulative_run_status_cost
  -- WHERE is applied before the window, so LAG only sees rows with a result_state
  WHERE result_state IS NOT NULL
),
-- Total cost of all non-SUCCEEDED attempts, one row per run. result_state is deliberately
-- not a grouping key: grouping by it produced one row per state, and the final first()
-- then kept only one state's cost when a run had e.g. both FAILED and TIMED_OUT attempts.
cost_per_unsuccessful_status_agg AS (
  SELECT
    workspace_id,
    job_id,
    run_id,
    first(custom_tags, TRUE) AS custom_tags,
    first(run_as, TRUE) AS run_as,
    SUM(result_state_list_cost) AS list_cost,
    SUM(result_state_effective_cost) AS effective_cost
  FROM cost_per_status
  WHERE result_state != 'SUCCEEDED'
  GROUP BY workspace_id, job_id, run_id
),
-- Runs with more than one terminal attempt, limited to runs active in the last 30 days to
-- match the reporting window (cost data above only covers 30 days).
repaired_runs AS (
  SELECT
    workspace_id,
    job_id,
    run_id,
    COUNT(*) AS cnt
  FROM system.lakeflow.job_run_timeline
  WHERE result_state IS NOT NULL
  GROUP BY workspace_id, job_id, run_id
  HAVING COUNT(*) > 1
    AND MAX(period_end_time) >= CURRENT_DATE() - INTERVAL 30 DAYS
),
-- End time of the latest SUCCEEDED attempt of each repaired run (absent if none succeeded).
successful_repairs AS (
  SELECT
    timeline.workspace_id,
    timeline.job_id,
    timeline.run_id,
    MAX(timeline.period_end_time) AS period_end_time
  FROM system.lakeflow.job_run_timeline AS timeline
  INNER JOIN repaired_runs AS repair_candidates
    ON timeline.workspace_id = repair_candidates.workspace_id
    AND timeline.job_id = repair_candidates.job_id
    AND timeline.run_id = repair_candidates.run_id
  WHERE timeline.result_state = 'SUCCEEDED'
  GROUP BY timeline.workspace_id, timeline.job_id, timeline.run_id
),
combined_repairs AS (
  SELECT
    repair_candidates.workspace_id,
    repair_candidates.job_id,
    repair_candidates.run_id,
    repair_candidates.cnt,
    successful_completion.period_end_time,
    repair_candidates.cnt AS repairs
  FROM repaired_runs AS repair_candidates
  LEFT JOIN successful_repairs AS successful_completion USING (workspace_id, job_id, run_id)
),
-- Latest row per job from system.lakeflow.jobs (highest change_time), used for the job name.
most_recent_jobs AS (
  SELECT
    workspace_id,
    job_id,
    name
  FROM system.lakeflow.jobs
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
)
SELECT
  last(job_metadata.name) AS name,
  repair_summary.workspace_id,
  repair_summary.job_id,
  repair_summary.run_id,
  first(failure_costs.run_as, TRUE) AS run_as,
  first(failure_costs.custom_tags, TRUE) AS custom_tags,
  -- terminal attempts minus the original attempt
  first(repair_summary.repairs) - 1 AS repairs,
  first(failure_costs.list_cost) AS repair_list_cost,
  first(failure_costs.effective_cost) AS repair_effective_cost,
  -- whole seconds from the first terminal attempt to the last SUCCEEDED attempt;
  -- NULL when no attempt succeeded (timestampdiff propagates NULL)
  timestampdiff(
    SECOND,
    MIN(timeline_details.period_end_time),
    repair_summary.period_end_time
  ) AS repair_time_seconds
FROM combined_repairs AS repair_summary
INNER JOIN system.lakeflow.job_run_timeline AS timeline_details USING (workspace_id, job_id, run_id)
LEFT JOIN most_recent_jobs AS job_metadata USING (workspace_id, job_id)
LEFT JOIN cost_per_unsuccessful_status_agg AS failure_costs USING (workspace_id, job_id, run_id)
WHERE timeline_details.result_state IS NOT NULL
GROUP BY
  repair_summary.workspace_id,
  repair_summary.job_id,
  repair_summary.run_id,
  repair_summary.period_end_time
ORDER BY repairs DESC;