# All-Purpose Cluster Cost Analysis & Optimization

## Overview
This notebook analyzes all-purpose cluster costs and identifies optimization opportunities across users, clusters, and instance types.

## What This Notebook Does

### Data Collection & Analysis
* Extracts all-purpose cluster usage from `system.billing.usage` and prices it at a flat list price of $0.65 per DBU
* Enriches with telemetry data from `system.compute.node_timeline`
* Aggregates costs at user, cluster, and instance levels
* Calculates CPU and memory efficiency metrics

### Optimization Recommendations
* Identifies under-utilized clusters with specific instance type recommendations
* Provides actionable recommendations with estimated savings
* Includes driver and worker instance type details
* Shows autoscaling configuration (min/max workers)

## Output Tables (ex_dash_temp.billing_forecast)

**Base Tables:**
1. `all_purpose_base` - Raw usage data with cluster metadata
2. `user_daily_telemetry` - Daily user-level metrics
3. `cluster_daily_telemetry` - Daily cluster-level metrics
4. `instance_daily_telemetry` - Daily instance-level metrics

**Aggregated Tables:**
5. `user_total_cost` - One row per user with total costs
6. `cluster_total_cost` - One row per cluster with total costs
7. `instance_total_cost` - One row per instance type with total costs

**Opportunity Tables:**
8. `user_opportunities` - User-level optimization recommendations
9. `cluster_opportunities` - Cluster-level optimization recommendations
10. `instance_opportunities` - Instance-level optimization recommendations

## How to Use
1. Set the `days_back` widget to the number of days to analyze (default: 30); each cell derives `start_date` from it
2. Run all cells sequentially
3. Review the opportunity tables for cost optimization recommendations
4. Focus on CRITICAL and HIGH priority items for maximum impact

## Key Features
* Specific instance type recommendations (e.g., "Change r7g.12xlarge → r7g.8xlarge")
* Worker configuration details (autoscale vs fixed)
* Validated savings (capped at total cost)
* One row per entity (no duplicates)
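The recommendation logic behind the opportunity tables is not part of this section. As a rough, hypothetical illustration of the idea, the sketch below suggests the next smaller size in the same AWS instance family and caps the savings estimate at the entity's total cost. The partial size ladder, the `recommend_downsize` helper, and the assumption that cost scales linearly with vCPU count are all illustrative, not the notebook's actual method.

```python
# Hypothetical, partial size ladder for AWS families such as r7g or m5 (vCPUs per size).
SIZE_VCPUS = {
    "large": 2, "xlarge": 4, "2xlarge": 8, "4xlarge": 16,
    "8xlarge": 32, "12xlarge": 48, "16xlarge": 64,
}
SIZE_ORDER = list(SIZE_VCPUS)  # dicts keep insertion order: smallest size first


def recommend_downsize(instance_type: str, total_cost: float):
    """Suggest the next smaller size in the same family plus a capped savings estimate.

    Assumes cost scales linearly with vCPU count, which is only an approximation.
    Returns None when the size is unknown or already the smallest in the ladder.
    """
    family, _, size = instance_type.partition(".")
    if size not in SIZE_VCPUS:
        return None
    idx = SIZE_ORDER.index(size)
    if idx == 0:
        return None
    smaller = SIZE_ORDER[idx - 1]
    ratio = SIZE_VCPUS[smaller] / SIZE_VCPUS[size]
    savings = total_cost * (1 - ratio)
    # Never report savings larger than what the entity actually spent.
    savings = min(max(savings, 0.0), total_cost)
    return f"Change {instance_type} → {family}.{smaller}", round(savings, 2)


print(recommend_downsize("r7g.12xlarge", 900.0))
```

Real DBU rates are not strictly proportional to vCPUs, so any production version would look up per-type rates instead of using a ratio.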

In [0]:
# Setup: Days Widget and Schema Creation
from datetime import datetime, timedelta
from pyspark.sql.functions import col, sum as spark_sum, avg, count, countDistinct, round as spark_round, coalesce, lit, when

displayHTML("<h2>ALL-PURPOSE CLUSTER COST ANALYSIS - SETUP</h2>")

# Create days_back widget with default to 30 days
dbutils.widgets.text("days_back", "30", "Days to Go Back")

# Get widget value and calculate start_date
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"""
<p>üìÖ <b>Analysis Period:</b> {start_date} to current date ({days_back} days)</p>
<p>‚öôÔ∏è <b>Creating target schema:</b> ex_dash_temp.billing_forecast</p>
""")

# Create schema if not exists
spark.sql("CREATE SCHEMA IF NOT EXISTS ex_dash_temp.billing_forecast")

displayHTML(f"""
<p>✅ <b>Setup Complete!</b></p>
<ul>
<li>Days back: {days_back}</li>
<li>Calculated start date: {start_date}</li>
<li>Target schema ready: ex_dash_temp.billing_forecast</li>
</ul>
<p>üí° You can change the days_back widget to analyze different periods</p>
""")

In [0]:
# Step 1: Create All-Purpose Base Table
# Base table with all-purpose cluster usage and cost data

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>STEP 1: CREATE ALL-PURPOSE BASE TABLE</h2><p>üìÖ Date Range: {start_date} to current ({days_back} days)</p>")

# Raw billing cost for reconciliation (assumes a flat list price of $0.65 per DBU)
raw_cost_check = spark.sql(f"""
SELECT ROUND(SUM(usage_quantity * 0.65), 2) as raw_billing_cost
FROM system.billing.usage
WHERE usage_date >= '{start_date}'
  AND sku_name LIKE '%ALL_PURPOSE%'
  AND usage_unit = 'DBU'
  AND usage_metadata.cluster_id IS NOT NULL
  AND COALESCE(product_features.is_serverless, false) = false
""")
raw_cost = raw_cost_check.collect()[0]['raw_billing_cost']

# Create base table with cluster metadata
base_table_df = spark.sql(f"""
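WITH cluster_metadata AS (
  -- system.compute.clusters keeps one row per configuration change, so take the latest values
  SELECT cluster_id, 
    MAX_BY(cluster_name, change_time) as cluster_name,
    MAX_BY(owned_by, change_time) as owned_by,
    MAX(auto_termination_minutes) as auto_termination_minutes
  FROM system.compute.clusters
  GROUP BY cluster_id
),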
cluster_daily_usage AS (
  SELECT 
    u.usage_date,
    u.workspace_id,
    u.usage_metadata.cluster_id as cluster_id,
    SUM(u.usage_quantity) as dbus,
    SUM(u.usage_quantity * 0.65) as total_cost_usd,
    FIRST(u.usage_metadata.node_type) as node_type,
    MAX(COALESCE(u.product_features.is_photon, false)) as is_photon
  FROM system.billing.usage u
  WHERE u.usage_date >= '{start_date}'
    AND u.sku_name LIKE '%ALL_PURPOSE%'
    AND u.usage_unit = 'DBU'
    AND u.usage_metadata.cluster_id IS NOT NULL
    AND COALESCE(u.product_features.is_serverless, false) = false
  GROUP BY u.usage_date, u.workspace_id, u.usage_metadata.cluster_id
)
SELECT 
  c.usage_date,
  c.workspace_id,
  COALESCE(w.workspace_name, 'Unknown') as workspace_name,
  c.cluster_id,
  COALESCE(cm.cluster_name, 'Unknown') as cluster_name,
  cm.owned_by as cluster_owner,
  c.node_type,
  cm.owned_by as principal_email,
  CASE WHEN cm.owned_by LIKE '%@%' THEN 'user' ELSE 'service_principal' END as principal_type,
  c.dbus,
  c.total_cost_usd,
  0.65 as list_price_per_dbu,
  c.is_photon,
  cm.auto_termination_minutes,
  nt.core_count,
  nt.memory_mb,
  CURRENT_TIMESTAMP() as created_at
FROM cluster_daily_usage c
LEFT JOIN prod_sandbox.group_details.workspaces w ON c.workspace_id = w.workspace_id
LEFT JOIN cluster_metadata cm ON c.cluster_id = cm.cluster_id
LEFT JOIN system.compute.node_types nt ON c.node_type = nt.node_type
""")

base_table_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("ex_dash_temp.billing_forecast.all_purpose_base")

displayHTML("‚úÖ Base table created: ex_dash_temp.billing_forecast.all_purpose_base")

# Validate table against raw billing cost
validation = spark.sql(f"""
SELECT 
  COUNT(*) as records,
  COUNT(DISTINCT cluster_id) as clusters,
  COUNT(DISTINCT principal_email) as users,
  ROUND(SUM(total_cost_usd), 2) as total_cost_usd,
  ROUND(SUM(dbus), 2) as total_dbus
FROM ex_dash_temp.billing_forecast.all_purpose_base
""")

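val_row = validation.collect()[0]             # collect once instead of re-running the query per field
raw_cost = raw_cost or 0                      # SUM over zero billing rows returns NULL (None)
table_cost = val_row['total_cost_usd'] or 0
user_count = val_row['users']
variance = abs(raw_cost - table_cost)
variance_pct = (variance / raw_cost * 100) if raw_cost > 0 else 0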

displayHTML("<h3>üìä SUMMARY & VALIDATION:</h3>")
display(validation)

if variance_pct < 1:
    displayHTML(f"<p>✅ <b>Validation Passed:</b> Table cost (${table_cost:,.2f}) matches raw billing cost (${raw_cost:,.2f})<br>Variance: ${variance:,.2f} ({variance_pct:.2f}%)</p>")
else:
    displayHTML(f"<p>⚠️ <b>Validation Warning:</b> Table cost (${table_cost:,.2f}) differs from raw billing cost (${raw_cost:,.2f})<br>Variance: ${variance:,.2f} ({variance_pct:.2f}%)</p>")

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT * FROM ex_dash_temp.billing_forecast.all_purpose_base ORDER BY total_cost_usd DESC LIMIT 50")
display(sample_data)
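The validation above compares the base table total with the raw billing total and passes when they differ by less than 1%. A minimal sketch of that check in plain Python, assuming totals arrive as `decimal.Decimal` or `None` (which is what Spark returns for a `SUM` over zero rows); `cost_variance` is a hypothetical helper name.

```python
from decimal import Decimal
from typing import Tuple


def cost_variance(raw_cost, table_cost, threshold_pct: float = 1.0) -> Tuple[float, float, bool]:
    """Return (absolute variance, variance %, passed) for two cost totals.

    None totals are treated as 0. Like the notebook, a zero raw cost yields a
    0% variance and therefore counts as a pass.
    """
    raw = float(raw_cost or 0)
    table = float(table_cost or 0)
    variance = abs(raw - table)
    variance_pct = variance / raw * 100 if raw > 0 else 0.0
    return round(variance, 2), round(variance_pct, 2), variance_pct < threshold_pct


print(cost_variance(Decimal("1000.00"), Decimal("995.00")))  # (5.0, 0.5, True)
print(cost_variance(None, None))                             # (0.0, 0.0, True)
```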

In [0]:
# Step 2: Create Per User Daily Telemetry Table
# Includes actual CPU, Memory, and Network metrics from system.compute.node_timeline

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"""
<h2>STEP 2: CREATE PER USER DAILY TELEMETRY TABLE</h2>
<p>üë§ Creating user-level daily analysis with telemetry</p>
<ul>
<li>CPU utilization (user + system)</li>
<li>Memory utilization</li>
<li>Network I/O (sent + received)</li>
<li>Daily cost per user</li>
</ul>
""")

# Create per user daily telemetry table
user_daily_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.user_daily_telemetry
USING DELTA
AS
WITH telemetry_aggregated AS (
  SELECT 
    b.principal_email,
    b.usage_date,
    b.workspace_name,
    
    -- Telemetry from node_timeline
    ROUND(AVG(nt.cpu_user_percent + nt.cpu_system_percent), 2) as avg_cpu_pct,
    ROUND(MAX(nt.cpu_user_percent + nt.cpu_system_percent), 2) as max_cpu_pct,
    ROUND(AVG(nt.mem_used_percent), 2) as avg_mem_pct,
    ROUND(MAX(nt.mem_used_percent), 2) as max_mem_pct,
    ROUND(SUM(nt.network_sent_bytes + nt.network_received_bytes) / 1024 / 1024 / 1024, 2) as total_network_gb,
    ROUND(AVG((nt.network_sent_bytes + nt.network_received_bytes) / 1024 / 1024), 2) as avg_network_mb,
    COUNT(DISTINCT nt.cluster_id) as clusters_with_telemetry
    
  FROM ex_dash_temp.billing_forecast.all_purpose_base b
  INNER JOIN system.compute.node_timeline nt 
    ON b.cluster_id = nt.cluster_id 
    AND DATE(nt.start_time) = b.usage_date
  WHERE b.usage_date >= '{start_date}'
  GROUP BY b.principal_email, b.usage_date, b.workspace_name
)

SELECT 
  b.usage_date,
  b.workspace_id,
  b.workspace_name,
  b.principal_email,
  b.principal_type,
  
  -- Cost metrics
  SUM(b.dbus) as total_dbus,
  SUM(b.total_cost_usd) as total_cost_usd,
  AVG(b.list_price_per_dbu) as avg_price_per_dbu,
  COUNT(DISTINCT b.cluster_id) as clusters_used,
  COUNT(DISTINCT b.node_type) as instance_types_used,
  
  -- Configuration metrics
  AVG(CASE WHEN b.is_photon THEN 1.0 ELSE 0.0 END) as photon_usage_rate,
  AVG(b.auto_termination_minutes) as avg_autoterm_minutes,
  AVG(b.core_count) as avg_cores,
  AVG(b.memory_mb) as avg_memory_mb,
  
  -- Telemetry metrics
  t.avg_cpu_pct,
  t.max_cpu_pct,
  t.avg_mem_pct,
  t.max_mem_pct,
  t.total_network_gb,
  t.avg_network_mb,
  COALESCE(t.clusters_with_telemetry, 0) as clusters_with_telemetry,
  
  CURRENT_TIMESTAMP() as created_at
  
FROM ex_dash_temp.billing_forecast.all_purpose_base b
LEFT JOIN telemetry_aggregated t 
  ON b.principal_email = t.principal_email 
  AND b.usage_date = t.usage_date
  AND b.workspace_name = t.workspace_name
WHERE b.usage_date >= '{start_date}'
GROUP BY 
  b.usage_date, b.workspace_id, b.workspace_name, b.principal_email, b.principal_type,
  t.avg_cpu_pct, t.max_cpu_pct, t.avg_mem_pct, t.max_mem_pct, 
  t.total_network_gb, t.avg_network_mb, t.clusters_with_telemetry
ORDER BY b.usage_date DESC, total_cost_usd DESC
"""

spark.sql(user_daily_query)

displayHTML("‚úÖ User daily telemetry table created: ex_dash_temp.billing_forecast.user_daily_telemetry")

# Show summary
summary = spark.sql(f"""
SELECT 
  COUNT(*) as total_user_days,
  COUNT(DISTINCT principal_email) as unique_users,
  COUNT(DISTINCT usage_date) as days,
  ROUND(SUM(total_cost_usd), 2) as total_cost_usd,
  COUNT(CASE WHEN avg_cpu_pct IS NOT NULL THEN 1 END) as days_with_telemetry,
  ROUND(AVG(CASE WHEN avg_cpu_pct IS NOT NULL THEN avg_cpu_pct END), 2) as avg_cpu_utilization,
  ROUND(AVG(CASE WHEN avg_mem_pct IS NOT NULL THEN avg_mem_pct END), 2) as avg_memory_utilization
FROM ex_dash_temp.billing_forecast.user_daily_telemetry
WHERE usage_date >= '{start_date}'
""")

displayHTML("<h3>üìä SUMMARY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT * FROM ex_dash_temp.billing_forecast.user_daily_telemetry ORDER BY total_cost_usd DESC LIMIT 50")
display(sample_data)
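The telemetry columns in Step 2 combine user and system CPU time per sample and convert byte counts to gigabytes. The sketch below reproduces that per user-day arithmetic on a few made-up, node_timeline-style samples; the field names in `samples` and the `aggregate_user_days` helper are illustrative assumptions.

```python
from collections import defaultdict

BYTES_PER_GB = 1024 ** 3


def aggregate_user_days(rows):
    """Group samples by (user, date) and compute the Step 2 telemetry metrics."""
    groups = defaultdict(list)
    for r in rows:
        groups[(r["user"], r["date"])].append(r)
    result = {}
    for key, grp in groups.items():
        cpu = [r["cpu_user"] + r["cpu_system"] for r in grp]  # user + system CPU %
        mem = [r["mem"] for r in grp]
        net_bytes = sum(r["sent"] + r["received"] for r in grp)
        result[key] = {
            "avg_cpu_pct": round(sum(cpu) / len(cpu), 2),
            "max_cpu_pct": round(max(cpu), 2),
            "avg_mem_pct": round(sum(mem) / len(mem), 2),
            "max_mem_pct": round(max(mem), 2),
            "total_network_gb": round(net_bytes / BYTES_PER_GB, 2),
        }
    return result


# Made-up samples: two for one user-day, one for another user.
samples = [
    {"user": "a@example.com", "date": "2024-05-01", "cpu_user": 30.0, "cpu_system": 5.0,
     "mem": 40.0, "sent": 2 * BYTES_PER_GB, "received": BYTES_PER_GB},
    {"user": "a@example.com", "date": "2024-05-01", "cpu_user": 50.0, "cpu_system": 15.0,
     "mem": 60.0, "sent": BYTES_PER_GB // 2, "received": BYTES_PER_GB // 2},
    {"user": "b@example.com", "date": "2024-05-01", "cpu_user": 10.0, "cpu_system": 2.0,
     "mem": 25.0, "sent": 0, "received": 0},
]

for key, metrics in aggregate_user_days(samples).items():
    print(key, metrics)
```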

In [0]:
# Step 3: Create Per Cluster Daily Telemetry Table
# Cluster-level daily analysis with telemetry metrics

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>STEP 3: CREATE PER CLUSTER DAILY TELEMETRY TABLE</h2><p>üíª Creating cluster-level daily analysis</p>")

# Create per cluster daily telemetry
cluster_daily_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.cluster_daily_telemetry
USING DELTA
AS
WITH telemetry_by_cluster AS (
  SELECT 
    cluster_id,
    DATE(start_time) as telemetry_date,
    ROUND(AVG(cpu_user_percent + cpu_system_percent), 2) as avg_cpu_pct,
    ROUND(MAX(cpu_user_percent + cpu_system_percent), 2) as max_cpu_pct,
    ROUND(AVG(mem_used_percent), 2) as avg_mem_pct,
    ROUND(MAX(mem_used_percent), 2) as max_mem_pct,
    ROUND(SUM(network_sent_bytes + network_received_bytes) / 1024 / 1024 / 1024, 2) as total_network_gb,
    ROUND(AVG((network_sent_bytes + network_received_bytes) / 1024 / 1024), 2) as avg_network_mb
  FROM system.compute.node_timeline
  WHERE DATE(start_time) >= '{start_date}'
  GROUP BY cluster_id, DATE(start_time)
)
SELECT 
  b.usage_date,
  b.workspace_id,
  b.workspace_name,
  b.cluster_id,
  b.cluster_name,
  b.cluster_owner,
  b.node_type as instance_type,
  b.principal_type,
  b.dbus as total_dbus,
  b.total_cost_usd,
  b.list_price_per_dbu as avg_price_per_dbu,
  b.is_photon as photon_enabled,
  b.auto_termination_minutes as autoterm_minutes,
  b.core_count,
  b.memory_mb,
  t.avg_cpu_pct,
  t.max_cpu_pct,
  t.avg_mem_pct,
  t.max_mem_pct,
  t.total_network_gb,
  t.avg_network_mb,
  CURRENT_TIMESTAMP() as created_at
FROM ex_dash_temp.billing_forecast.all_purpose_base b
LEFT JOIN telemetry_by_cluster t 
  ON b.cluster_id = t.cluster_id 
  AND b.usage_date = t.telemetry_date
WHERE b.usage_date >= '{start_date}'
"""

spark.sql(cluster_daily_query)

displayHTML("‚úÖ Cluster daily telemetry table created: ex_dash_temp.billing_forecast.cluster_daily_telemetry")

# Summary
summary = spark.sql(f"""
SELECT 
  COUNT(*) as records,
  COUNT(DISTINCT cluster_id) as clusters,
  ROUND(SUM(total_cost_usd), 2) as total_cost,
  ROUND(AVG(avg_cpu_pct), 2) as avg_cpu,
  ROUND(AVG(avg_mem_pct), 2) as avg_mem
FROM ex_dash_temp.billing_forecast.cluster_daily_telemetry
""")

displayHTML("<h3>üìä SUMMARY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT * FROM ex_dash_temp.billing_forecast.cluster_daily_telemetry ORDER BY total_cost_usd DESC LIMIT 50")
display(sample_data)
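Because Step 3 uses a LEFT JOIN keyed on `(cluster_id, usage_date)`, every billed cluster-day survives even when `node_timeline` has no samples for it, so cost totals still reconcile with `all_purpose_base`. A small Python sketch of that behavior, using hypothetical rows and a hypothetical `left_join_telemetry` helper:

```python
from typing import Dict, List, Tuple


def left_join_telemetry(
    base_rows: List[dict],
    telemetry: Dict[Tuple[str, str], float],
) -> List[dict]:
    """Attach avg_cpu_pct to each base row; rows without telemetry get None (SQL NULL)."""
    joined = []
    for row in base_rows:
        key = (row["cluster_id"], row["usage_date"])
        # dict.get returns None on a miss, mirroring an unmatched LEFT JOIN row.
        joined.append({**row, "avg_cpu_pct": telemetry.get(key)})
    return joined


base_rows = [
    {"cluster_id": "c1", "usage_date": "2024-05-01", "total_cost_usd": 120.0},
    {"cluster_id": "c1", "usage_date": "2024-05-02", "total_cost_usd": 80.0},
    {"cluster_id": "c2", "usage_date": "2024-05-01", "total_cost_usd": 50.0},
]
telemetry = {("c1", "2024-05-01"): 42.5, ("c2", "2024-05-01"): 12.0}

joined = left_join_telemetry(base_rows, telemetry)
print(sum(r["total_cost_usd"] for r in joined))  # cost is preserved: 250.0
```

Keying telemetry by a dict also guarantees one telemetry row per cluster-day, which is what keeps the SQL join from duplicating cost rows.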

In [0]:
# Step 4: Create Per Instance Daily Telemetry Table
# Instance-level daily analysis with telemetry metrics

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(
    f"<h2>STEP 4: CREATE PER INSTANCE DAILY TELEMETRY TABLE</h2>"
    f"<p>üñ•Ô∏è Creating instance-level daily analysis</p>"
)

# Create per instance daily telemetry
instance_daily_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.instance_daily_telemetry
USING DELTA
AS
WITH telemetry_by_instance AS (
  SELECT 
    b.node_type,
    b.usage_date,
    ROUND(AVG(nt.cpu_user_percent + nt.cpu_system_percent), 2) as avg_cpu_pct,
    ROUND(MAX(nt.cpu_user_percent + nt.cpu_system_percent), 2) as max_cpu_pct,
    ROUND(AVG(nt.mem_used_percent), 2) as avg_mem_pct,
    ROUND(MAX(nt.mem_used_percent), 2) as max_mem_pct,
    ROUND(
      SUM(nt.network_sent_bytes + nt.network_received_bytes) / 1024 / 1024 / 1024, 
      2
    ) as total_network_gb,
    ROUND(
      AVG((nt.network_sent_bytes + nt.network_received_bytes) / 1024 / 1024), 
      2
    ) as avg_network_mb
  FROM ex_dash_temp.billing_forecast.all_purpose_base b
  INNER JOIN system.compute.node_timeline nt 
    ON b.cluster_id = nt.cluster_id 
    AND DATE(nt.start_time) = b.usage_date
  WHERE b.usage_date >= '{start_date}'
  GROUP BY b.node_type, b.usage_date
)
SELECT 
  b.usage_date,
  b.node_type as instance_type,
  SUM(b.dbus) as total_dbus,
  SUM(b.total_cost_usd) as total_cost_usd,
  AVG(b.list_price_per_dbu) as avg_price_per_dbu,
  COUNT(DISTINCT b.cluster_id) as clusters_using,
  COUNT(DISTINCT b.principal_email) as users_using,
  COUNT(DISTINCT b.workspace_name) as workspaces_using,
  AVG(CASE WHEN b.is_photon THEN 1.0 ELSE 0.0 END) as photon_usage_rate,
  AVG(b.auto_termination_minutes) as avg_autoterm_minutes,
  MAX(b.core_count) as core_count,
  MAX(b.memory_mb) as memory_mb,
  t.avg_cpu_pct,
  t.max_cpu_pct,
  t.avg_mem_pct,
  t.max_mem_pct,
  t.total_network_gb,
  t.avg_network_mb,
  CURRENT_TIMESTAMP() as created_at
FROM ex_dash_temp.billing_forecast.all_purpose_base b
LEFT JOIN telemetry_by_instance t 
  ON b.node_type = t.node_type 
  AND b.usage_date = t.usage_date
WHERE b.usage_date >= '{start_date}'
GROUP BY 
  b.usage_date, 
  b.node_type, 
  t.avg_cpu_pct, 
  t.max_cpu_pct, 
  t.avg_mem_pct, 
  t.max_mem_pct, 
  t.total_network_gb, 
  t.avg_network_mb
"""

spark.sql(instance_daily_query)

displayHTML(
    "‚úÖ Instance daily telemetry table created: "
    "ex_dash_temp.billing_forecast.instance_daily_telemetry"
)

# Summary
summary = spark.sql("""
SELECT 
  COUNT(*) as records,
  COUNT(DISTINCT instance_type) as instance_types,
  ROUND(SUM(total_cost_usd), 2) as total_cost
FROM ex_dash_temp.billing_forecast.instance_daily_telemetry
""")

displayHTML("<h3>üìä SUMMARY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql(
    "SELECT * FROM ex_dash_temp.billing_forecast.instance_daily_telemetry "
    "ORDER BY total_cost_usd DESC LIMIT 50"
)
display(sample_data)
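Step 4 rolls base rows up to one row per `(usage_date, node_type)`, summing cost and counting distinct clusters and users. A plain-Python sketch of the same grouping follows; like SQL `COUNT(DISTINCT ...)`, it ignores `None` principals. The rows and the `rollup_by_instance` helper are illustrative.

```python
from collections import defaultdict


def rollup_by_instance(base_rows):
    """Aggregate cost and distinct clusters/users per (usage_date, node_type)."""
    acc = defaultdict(lambda: {"total_cost_usd": 0.0, "clusters": set(), "users": set()})
    for r in base_rows:
        a = acc[(r["usage_date"], r["node_type"])]
        a["total_cost_usd"] += r["total_cost_usd"]
        a["clusters"].add(r["cluster_id"])
        if r["principal_email"] is not None:  # COUNT(DISTINCT ...) skips NULLs
            a["users"].add(r["principal_email"])
    return {
        key: {
            "total_cost_usd": round(v["total_cost_usd"], 2),
            "clusters_using": len(v["clusters"]),
            "users_using": len(v["users"]),
        }
        for key, v in acc.items()
    }


rows = [
    {"usage_date": "2024-05-01", "node_type": "r7g.8xlarge", "cluster_id": "c1",
     "principal_email": "a@example.com", "total_cost_usd": 100.0},
    {"usage_date": "2024-05-01", "node_type": "r7g.8xlarge", "cluster_id": "c2",
     "principal_email": None, "total_cost_usd": 50.5},
    {"usage_date": "2024-05-01", "node_type": "m5.xlarge", "cluster_id": "c3",
     "principal_email": "a@example.com", "total_cost_usd": 10.0},
]

for key, value in rollup_by_instance(rows).items():
    print(key, value)
```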

In [0]:
# Step 5: Create Per User Total Cost Table
# One row per user with aggregated costs and average telemetry

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"""
<h2>STEP 5: CREATE PER USER TOTAL COST TABLE</h2>
<p>üë§ Creating user-level total cost analysis (one row per user)</p>
""")

# Create per user total cost table
user_total_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.user_total_cost
USING DELTA
AS
WITH user_clusters AS (
  -- Distinct clusters per user come from the base table, because the daily table only stores per-day counts
  SELECT principal_email, principal_type, COUNT(DISTINCT cluster_id) as unique_clusters
  FROM ex_dash_temp.billing_forecast.all_purpose_base
  WHERE usage_date >= '{start_date}'
  GROUP BY principal_email, principal_type
)
SELECT 
  d.principal_email,
  d.principal_type,
  
  -- Primary workspace (workspace of the highest-cost day for this user)
  MAX_BY(workspace_name, total_cost_usd) as primary_workspace,
  COUNT(DISTINCT workspace_name) as workspaces_used,
  
  -- Cost metrics
  ROUND(SUM(total_cost_usd), 2) as total_cost_usd,
  ROUND(SUM(total_dbus), 2) as total_dbus,
  ROUND(AVG(avg_price_per_dbu), 2) as avg_price_per_dbu,
  
  -- Usage metrics
  COUNT(DISTINCT usage_date) as days_active,
  SUM(clusters_used) as total_cluster_days,
  MAX(uc.unique_clusters) as unique_clusters,
  SUM(instance_types_used) as total_instance_type_days,
  
  -- Configuration metrics
  ROUND(AVG(photon_usage_rate) * 100, 1) as photon_usage_pct,
  ROUND(AVG(avg_autoterm_minutes), 0) as avg_autoterm_minutes,
  ROUND(AVG(avg_cores), 1) as avg_cores,
  ROUND(AVG(avg_memory_mb) / 1024, 1) as avg_memory_gb,
  
  -- Telemetry averages
  ROUND(AVG(CASE WHEN avg_cpu_pct IS NOT NULL THEN avg_cpu_pct END), 2) as avg_cpu_pct,
  ROUND(MAX(CASE WHEN max_cpu_pct IS NOT NULL THEN max_cpu_pct END), 2) as max_cpu_pct,
  ROUND(AVG(CASE WHEN avg_mem_pct IS NOT NULL THEN avg_mem_pct END), 2) as avg_mem_pct,
  ROUND(MAX(CASE WHEN max_mem_pct IS NOT NULL THEN max_mem_pct END), 2) as max_mem_pct,
  ROUND(SUM(CASE WHEN total_network_gb IS NOT NULL THEN total_network_gb ELSE 0 END), 2) as total_network_gb,
  ROUND(AVG(CASE WHEN avg_network_mb IS NOT NULL THEN avg_network_mb END), 2) as avg_network_mb,
  
  -- Telemetry coverage
  COUNT(CASE WHEN avg_cpu_pct IS NOT NULL THEN 1 END) as days_with_telemetry,
  ROUND(COUNT(CASE WHEN avg_cpu_pct IS NOT NULL THEN 1 END) * 100.0 / COUNT(*), 1) as telemetry_coverage_pct,
  
  MIN(usage_date) as first_usage_date,
  MAX(usage_date) as last_usage_date,
  CURRENT_TIMESTAMP() as created_at
  
FROM ex_dash_temp.billing_forecast.user_daily_telemetry d
LEFT JOIN user_clusters uc
  ON d.principal_email <=> uc.principal_email
  AND d.principal_type <=> uc.principal_type
WHERE usage_date >= '{start_date}'
GROUP BY d.principal_email, d.principal_type
ORDER BY total_cost_usd DESC
"""

spark.sql(user_total_query)

displayHTML("‚úÖ User total cost table created: ex_dash_temp.billing_forecast.user_total_cost")

# Validate against base table
validation = spark.sql(f"""
WITH base_total AS (
  SELECT ROUND(SUM(total_cost_usd), 2) as base_cost
  FROM ex_dash_temp.billing_forecast.all_purpose_base
  WHERE usage_date >= '{start_date}'
),
user_total AS (
  SELECT ROUND(SUM(total_cost_usd), 2) as user_cost, COUNT(*) as user_count
  FROM ex_dash_temp.billing_forecast.user_total_cost
)
SELECT 
  b.base_cost,
  u.user_cost,
  u.user_count,
  ROUND(b.base_cost - COALESCE(u.user_cost, 0), 2) as difference,
  ROUND(ABS(b.base_cost - COALESCE(u.user_cost, 0)) / NULLIF(b.base_cost, 0) * 100, 2) as variance_pct
FROM base_total b, user_total u
""")

val_data = validation.collect()[0]
variance_pct = val_data['variance_pct'] or 0

displayHTML("<h3>üîç COST VALIDATION:</h3>")
display(validation)

if variance_pct < 1:
    displayHTML(f"<p>✅ <b>Validation Passed:</b> User aggregated cost matches base table cost (Variance: {variance_pct:.2f}%)</p>")
else:
    displayHTML(f"<p>⚠️ <b>Validation Warning:</b> User aggregated cost differs from base table (Variance: {variance_pct:.2f}%)</p>")

# Summary
summary = spark.sql(f"""
SELECT 
  COUNT(*) as total_users,
  ROUND(SUM(total_cost_usd), 2) as total_cost_usd,
  ROUND(AVG(total_cost_usd), 2) as avg_cost_per_user,
  ROUND(AVG(days_active), 1) as avg_days_active,
  ROUND(AVG(avg_cpu_pct), 2) as avg_cpu_utilization,
  ROUND(AVG(avg_mem_pct), 2) as avg_memory_utilization,
  ROUND(AVG(telemetry_coverage_pct), 1) as avg_telemetry_coverage
FROM ex_dash_temp.billing_forecast.user_total_cost
""")

displayHTML("<h3>üìä SUMMARY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT * FROM ex_dash_temp.billing_forecast.user_total_cost ORDER BY total_cost_usd DESC LIMIT 50")
display(sample_data)
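Step 5 collapses each user's daily rows into one summary row. Telemetry coverage is the share of daily rows that carry telemetry, and CPU averages skip days without it. A minimal sketch under those assumptions; `summarize_user` and the sample rows are hypothetical.

```python
def summarize_user(daily_rows):
    """Collapse one user's daily rows into a single summary row.

    daily_rows: dicts with usage_date, total_cost_usd, and avg_cpu_pct
    (None when no telemetry was recorded for that row).
    """
    rows = len(daily_rows)
    cpu_days = [r["avg_cpu_pct"] for r in daily_rows if r["avg_cpu_pct"] is not None]
    return {
        "total_cost_usd": round(sum(r["total_cost_usd"] for r in daily_rows), 2),
        "days_active": len({r["usage_date"] for r in daily_rows}),
        # Like SQL AVG, ignore rows with NULL telemetry.
        "avg_cpu_pct": round(sum(cpu_days) / len(cpu_days), 2) if cpu_days else None,
        "telemetry_coverage_pct": round(len(cpu_days) * 100.0 / rows, 1) if rows else 0.0,
    }


daily = [
    {"usage_date": "2024-05-01", "total_cost_usd": 10.0, "avg_cpu_pct": 20.0},
    {"usage_date": "2024-05-02", "total_cost_usd": 20.0, "avg_cpu_pct": 30.0},
    {"usage_date": "2024-05-03", "total_cost_usd": 30.0, "avg_cpu_pct": None},
    {"usage_date": "2024-05-04", "total_cost_usd": 40.0, "avg_cpu_pct": 40.0},
]
print(summarize_user(daily))
```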

In [0]:
# Step 6: Create Per Cluster Total Cost Table
# One row per cluster with aggregated costs and telemetry

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>STEP 6: CREATE PER CLUSTER TOTAL COST TABLE</h2><p>üíª Creating cluster-level total cost (one row per cluster)</p>")

# Create per cluster total cost
cluster_total_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.cluster_total_cost
USING DELTA
AS
WITH cluster_telemetry_avg AS (
  SELECT 
    cluster_id,
    ROUND(AVG(avg_cpu_pct), 2) as avg_cpu_pct,
    ROUND(MAX(max_cpu_pct), 2) as max_cpu_pct,
    ROUND(AVG(avg_mem_pct), 2) as avg_mem_pct,
    ROUND(MAX(max_mem_pct), 2) as max_mem_pct,
    ROUND(SUM(total_network_gb), 2) as total_network_gb,
    ROUND(AVG(avg_network_mb), 2) as avg_network_mb,
    -- Share of active days with telemetry (the aggregates above already skip NULL days)
    ROUND(COUNT(DISTINCT CASE WHEN avg_cpu_pct IS NOT NULL THEN usage_date END) * 100.0
          / COUNT(DISTINCT usage_date), 1) as telemetry_coverage_pct
  FROM ex_dash_temp.billing_forecast.cluster_daily_telemetry
  GROUP BY cluster_id
),
cluster_config AS (
  -- Latest configuration per cluster, even if it was last changed before start_date
  SELECT cluster_id, driver_instance_type, worker_instance_type, worker_count, min_workers, max_workers
  FROM (
    SELECT 
      cluster_id,
      driver_node_type as driver_instance_type,
      worker_node_type as worker_instance_type,
      worker_count,
      min_autoscale_workers as min_workers,
      max_autoscale_workers as max_workers,
      ROW_NUMBER() OVER (PARTITION BY cluster_id ORDER BY change_time DESC) as rn
    FROM system.compute.clusters
    WHERE driver_node_type IS NOT NULL
  ) AS ranked
  WHERE rn = 1
)
SELECT 
  b.cluster_id,
  FIRST(b.cluster_name) as cluster_name,
  FIRST(b.cluster_owner) as cluster_owner,
  FIRST(b.workspace_name) as workspace_name,
  FIRST(b.node_type) as primary_instance_type,
  COALESCE(cc.driver_instance_type, FIRST(b.node_type)) as driver_instance_type,
  COALESCE(cc.worker_instance_type, FIRST(b.node_type)) as worker_instance_type,
  cc.worker_count,
  cc.min_workers,
  cc.max_workers,
  ROUND(SUM(b.total_cost_usd), 2) as total_cost_usd,
  ROUND(SUM(b.dbus), 2) as total_dbus,
  COUNT(DISTINCT b.usage_date) as days_active,
  t.avg_cpu_pct,
  t.max_cpu_pct,
  t.avg_mem_pct,
  t.max_mem_pct,
  t.total_network_gb,
  t.avg_network_mb,
  -- node_timeline CPU and memory values are already node-level percentages (0-100),
  -- so avg_cpu_pct is used directly instead of being divided by the core count
  ROUND(t.avg_cpu_pct, 1) as cpu_efficiency_pct,
  ROUND(t.avg_mem_pct, 1) as memory_efficiency_pct,
  MAX(b.core_count) as core_count,
  ROUND(MAX(b.memory_mb) / 1024, 1) as memory_gb,
  MAX(b.is_photon) as photon_enabled,
  MAX(b.auto_termination_minutes) as autoterm_minutes,
  COALESCE(MAX(t.telemetry_coverage_pct), 0.0) as telemetry_coverage_pct,
  MIN(b.usage_date) as first_usage_date,
  MAX(b.usage_date) as last_usage_date,
  CURRENT_TIMESTAMP() as created_at
FROM ex_dash_temp.billing_forecast.all_purpose_base b
LEFT JOIN cluster_telemetry_avg t ON b.cluster_id = t.cluster_id
LEFT JOIN cluster_config cc ON b.cluster_id = cc.cluster_id
WHERE b.usage_date >= '{start_date}'
GROUP BY b.cluster_id, t.avg_cpu_pct, t.max_cpu_pct, t.avg_mem_pct, t.max_mem_pct, t.total_network_gb, t.avg_network_mb,
         cc.driver_instance_type, cc.worker_instance_type, cc.worker_count, cc.min_workers, cc.max_workers
ORDER BY total_cost_usd DESC
"""

spark.sql(cluster_total_query)

displayHTML("‚úÖ Cluster total cost table created: ex_dash_temp.billing_forecast.cluster_total_cost")

# Validate against base table
validation = spark.sql(f"""
WITH base_total AS (
  SELECT ROUND(SUM(total_cost_usd), 2) as base_cost
  FROM ex_dash_temp.billing_forecast.all_purpose_base
  WHERE usage_date >= '{start_date}'
),
cluster_total AS (
  SELECT ROUND(SUM(total_cost_usd), 2) as cluster_cost, COUNT(*) as cluster_count
  FROM ex_dash_temp.billing_forecast.cluster_total_cost
)
SELECT 
  b.base_cost,
  c.cluster_cost,
  c.cluster_count,
  ROUND(b.base_cost - c.cluster_cost, 2) as difference,
  ROUND(ABS(b.base_cost - c.cluster_cost) / NULLIF(b.base_cost, 0) * 100, 2) as variance_pct
FROM base_total b, cluster_total c
""")

val_data = validation.collect()[0]
variance_pct = val_data['variance_pct'] or 0

displayHTML("<h3>üîç COST VALIDATION:</h3>")
display(validation)

if variance_pct < 1:
    displayHTML(f"<p>✅ <b>Validation Passed:</b> Cluster aggregated cost matches base table cost (Variance: {variance_pct:.2f}%)</p>")
else:
    displayHTML(f"<p>⚠️ <b>Validation Warning:</b> Cluster aggregated cost differs from base table (Variance: {variance_pct:.2f}%)</p>")

# Summary
summary = spark.sql(f"""
SELECT 
  COUNT(*) as total_clusters,
  ROUND(SUM(total_cost_usd), 2) as total_cost_usd,
  ROUND(AVG(total_cost_usd), 2) as avg_cost_per_cluster,
  ROUND(AVG(days_active), 1) as avg_days_active,
  ROUND(AVG(cpu_efficiency_pct), 1) as avg_cpu_efficiency,
  ROUND(AVG(memory_efficiency_pct), 1) as avg_memory_efficiency
FROM ex_dash_temp.billing_forecast.cluster_total_cost
""")

displayHTML("<h3>üìä SUMMARY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT cluster_id, cluster_name, driver_instance_type, worker_instance_type, worker_count, min_workers, max_workers, total_cost_usd, cpu_efficiency_pct, memory_efficiency_pct FROM ex_dash_temp.billing_forecast.cluster_total_cost ORDER BY total_cost_usd DESC LIMIT 50")
display(sample_data)
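The `worker_count`, `min_workers`, and `max_workers` columns distinguish fixed-size from autoscaling clusters. The hypothetical helper below turns them into a readable label, assuming autoscaling clusters report both bounds (the same test Step 6b uses for `clusters_with_autoscale`) and that a fixed `worker_count` of 0 denotes a single-node cluster.

```python
from typing import Optional


def describe_workers(
    worker_count: Optional[int],
    min_workers: Optional[int],
    max_workers: Optional[int],
) -> str:
    """Return a readable label for a cluster's worker configuration (hypothetical helper)."""
    # Assumption: autoscaling clusters report both bounds.
    if min_workers is not None and max_workers is not None:
        return f"autoscale {min_workers}-{max_workers} workers"
    if worker_count is not None:
        # Assumption: a fixed worker_count of 0 means a driver-only, single-node cluster.
        return "single node" if worker_count == 0 else f"fixed {worker_count} workers"
    return "unknown"


print(describe_workers(None, 2, 8))    # autoscale 2-8 workers
print(describe_workers(4, None, None)) # fixed 4 workers
```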

In [0]:
# Step 6b: Enrich Cluster Table with Driver and Worker Instance Types
# Adds specific driver and worker instance information from system.compute.clusters

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML("<h2>STEP 6B: ENRICH CLUSTER TABLE WITH DRIVER/WORKER INSTANCE TYPES</h2><p>üîß Adding driver and worker instance type columns</p>")

# Step 6 already creates these columns; this ALTER only adds them if an older version of the table lacks them
try:
    spark.sql("""
    ALTER TABLE ex_dash_temp.billing_forecast.cluster_total_cost 
    ADD COLUMNS (
        driver_instance_type STRING COMMENT 'Driver node instance type',
        worker_instance_type STRING COMMENT 'Worker node instance type',
        worker_count BIGINT COMMENT 'Fixed worker count',
        min_workers BIGINT COMMENT 'Min autoscale workers',
        max_workers BIGINT COMMENT 'Max autoscale workers'
    )
    """)
    displayHTML("<p>‚úÖ Columns added successfully</p>")
except Exception as e:
    if "already exists" in str(e).lower() or "duplicate" in str(e).lower():
        displayHTML("<p>‚ÑπÔ∏è Columns already exist, proceeding to update</p>")
    else:
        displayHTML(f"<p>‚ö†Ô∏è Error adding columns: {str(e)}</p>")

# Populate the columns from system.compute.clusters (get latest configuration)
update_query = f"""
MERGE INTO ex_dash_temp.billing_forecast.cluster_total_cost AS target
USING (
    WITH ranked_configs AS (
        SELECT 
            cluster_id,
            driver_node_type,
            worker_node_type,
            worker_count,
            min_autoscale_workers,
            max_autoscale_workers,
            change_time,
            ROW_NUMBER() OVER (PARTITION BY cluster_id ORDER BY change_time DESC) as rn
        FROM system.compute.clusters
        WHERE driver_node_type IS NOT NULL
    )
    SELECT 
        cluster_id,
        driver_node_type as driver_instance_type,
        worker_node_type as worker_instance_type,
        worker_count,
        min_autoscale_workers as min_workers,
        max_autoscale_workers as max_workers
    FROM ranked_configs
    WHERE rn = 1
) AS source
ON target.cluster_id = source.cluster_id
WHEN MATCHED THEN UPDATE SET
    target.driver_instance_type = source.driver_instance_type,
    target.worker_instance_type = source.worker_instance_type,
    target.worker_count = source.worker_count,
    target.min_workers = source.min_workers,
    target.max_workers = source.max_workers
"""

spark.sql(update_query)

displayHTML("‚úÖ Driver and worker instance types populated from latest cluster configurations")

# Verify the update
verification = spark.sql("""
SELECT 
    COUNT(*) as total_clusters,
    COUNT(driver_instance_type) as clusters_with_driver_type,
    COUNT(worker_instance_type) as clusters_with_worker_type,
    COUNT(CASE WHEN driver_instance_type != worker_instance_type THEN 1 END) as clusters_with_different_types,
    COUNT(worker_count) as clusters_with_fixed_workers,
    COUNT(CASE WHEN min_workers IS NOT NULL AND max_workers IS NOT NULL THEN 1 END) as clusters_with_autoscale
FROM ex_dash_temp.billing_forecast.cluster_total_cost
""")

displayHTML("<h3>üìä SUMMARY:</h3>")
display(verification)
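Step 6b keeps only the most recent row per cluster via `ROW_NUMBER() OVER (PARTITION BY cluster_id ORDER BY change_time DESC)` and `rn = 1`, skipping rows with no driver type. The same selection in plain Python, as a sketch with made-up rows and a hypothetical `latest_configs` helper; when two rows share a `change_time`, this version keeps the first one seen, whereas `ROW_NUMBER` breaks such ties arbitrarily.

```python
from datetime import datetime


def latest_configs(rows):
    """Return {cluster_id: newest row} among rows that have a driver_node_type."""
    latest = {}
    for r in rows:
        if r["driver_node_type"] is None:  # mirrors WHERE driver_node_type IS NOT NULL
            continue
        current = latest.get(r["cluster_id"])
        if current is None or r["change_time"] > current["change_time"]:
            latest[r["cluster_id"]] = r
    return latest


rows = [
    {"cluster_id": "c1", "change_time": datetime(2024, 5, 1), "driver_node_type": "r7g.4xlarge"},
    {"cluster_id": "c1", "change_time": datetime(2024, 5, 10), "driver_node_type": "r7g.8xlarge"},
    {"cluster_id": "c2", "change_time": datetime(2024, 4, 1), "driver_node_type": "m5.xlarge"},
    {"cluster_id": "c2", "change_time": datetime(2024, 5, 2), "driver_node_type": None},
]

for cluster_id, row in latest_configs(rows).items():
    print(cluster_id, row["driver_node_type"])
```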

In [0]:
# Step 7: Create Per Instance Total Cost Table
# One row per instance type with aggregated costs and telemetry

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>STEP 7: CREATE PER INSTANCE TOTAL COST TABLE</h2><p>üñ•Ô∏è Creating instance-level total cost (one row per instance)</p>")

# Create per instance total cost
instance_total_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.instance_total_cost
USING DELTA
AS
WITH instance_telemetry_avg AS (
  SELECT 
    instance_type,
    ROUND(AVG(avg_cpu_pct), 2) as avg_cpu_pct,
    ROUND(MAX(max_cpu_pct), 2) as max_cpu_pct,
    ROUND(AVG(avg_mem_pct), 2) as avg_mem_pct,
    ROUND(MAX(max_mem_pct), 2) as max_mem_pct,
    ROUND(SUM(total_network_gb), 2) as total_network_gb,
    ROUND(AVG(avg_network_mb), 2) as avg_network_mb
  FROM ex_dash_temp.billing_forecast.instance_daily_telemetry
  WHERE avg_cpu_pct IS NOT NULL
  GROUP BY instance_type
)
SELECT 
  b.node_type as instance_type,
  ROUND(SUM(b.total_cost_usd), 2) as total_cost_usd,
  ROUND(SUM(b.dbus), 2) as total_dbus,
  COUNT(DISTINCT b.cluster_id) as unique_clusters,
  COUNT(DISTINCT b.principal_email) as unique_users,
  COUNT(DISTINCT b.workspace_name) as unique_workspaces,
  COUNT(DISTINCT b.usage_date) as days_active,
  t.avg_cpu_pct,
  t.max_cpu_pct,
  t.avg_mem_pct,
  t.max_mem_pct,
  t.total_network_gb,
  t.avg_network_mb,
  ROUND(t.avg_cpu_pct / NULLIF(MAX(b.core_count) * 100, 0) * 100, 1) as cpu_efficiency_pct,
  ROUND(t.avg_mem_pct, 1) as memory_efficiency_pct,
  MAX(b.core_count) as core_count,
  ROUND(MAX(b.memory_mb) / 1024, 1) as memory_gb,
  AVG(CASE WHEN b.is_photon THEN 100.0 ELSE 0.0 END) as photon_usage_pct,
  AVG(b.auto_termination_minutes) as avg_autoterm_minutes,
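  -- 100 when the LEFT JOIN found telemetry for this instance type, 0 otherwise
  CASE WHEN t.avg_cpu_pct IS NOT NULL THEN 100.0 ELSE 0.0 END as telemetry_coverage_pct,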
  MIN(b.usage_date) as first_usage_date,
  MAX(b.usage_date) as last_usage_date,
  CURRENT_TIMESTAMP() as created_at
FROM ex_dash_temp.billing_forecast.all_purpose_base b
LEFT JOIN instance_telemetry_avg t ON b.node_type = t.instance_type
WHERE b.usage_date >= '{start_date}'
GROUP BY b.node_type, t.avg_cpu_pct, t.max_cpu_pct, t.avg_mem_pct, t.max_mem_pct, t.total_network_gb, t.avg_network_mb
ORDER BY total_cost_usd DESC
"""

spark.sql(instance_total_query)

displayHTML("✅ Instance total cost table created: ex_dash_temp.billing_forecast.instance_total_cost")

# Validate against base table
validation = spark.sql(f"""
WITH base_total AS (
  SELECT ROUND(SUM(total_cost_usd), 2) as base_cost
  FROM ex_dash_temp.billing_forecast.all_purpose_base
  WHERE usage_date >= '{start_date}'
),
instance_total AS (
  SELECT ROUND(SUM(total_cost_usd), 2) as instance_cost, COUNT(*) as instance_count
  FROM ex_dash_temp.billing_forecast.instance_total_cost
)
SELECT 
  b.base_cost,
  i.instance_cost,
  i.instance_count,
  ROUND(b.base_cost - i.instance_cost, 2) as difference,
  ROUND(ABS(b.base_cost - i.instance_cost) / NULLIF(b.base_cost, 0) * 100, 2) as variance_pct
FROM base_total b, instance_total i
""")

val_data = validation.collect()[0]
variance_pct = val_data['variance_pct'] or 0

displayHTML("<h3>üîç COST VALIDATION:</h3>")
display(validation)

if variance_pct < 1:
    displayHTML(f"<p>✅ <b>Validation Passed:</b> Instance aggregated cost matches base table cost (Variance: {variance_pct:.2f}%)</p>")
else:
    displayHTML(f"<p>⚠️ <b>Validation Warning:</b> Instance aggregated cost differs from base table (Variance: {variance_pct:.2f}%)</p>")

# Summary
summary = spark.sql(f"""
SELECT 
  COUNT(*) as total_instance_types,
  ROUND(SUM(total_cost_usd), 2) as total_cost_usd,
  ROUND(AVG(total_cost_usd), 2) as avg_cost_per_instance,
  ROUND(AVG(cpu_efficiency_pct), 1) as avg_cpu_efficiency,
  ROUND(AVG(memory_efficiency_pct), 1) as avg_memory_efficiency
FROM ex_dash_temp.billing_forecast.instance_total_cost
""")

displayHTML("<h3>üìä SUMMARY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT * FROM ex_dash_temp.billing_forecast.instance_total_cost ORDER BY total_cost_usd DESC LIMIT 50")
display(sample_data)
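
The reconciliation check in this cell compares the summed instance-level cost with the base table and warns once the relative gap reaches 1%. A minimal sketch of that arithmetic in plain Python, assuming both totals are already available as numbers; it mirrors the `NULLIF(base_cost, 0)` guard and the `or 0` fallback, which means an empty period passes by default. Python's `round` uses round-half-to-even while Spark's `ROUND` rounds half up, so results can differ on exact ties. The function names are illustrative.

```python
from typing import Optional


def variance_pct(base_cost: float, aggregated_cost: float) -> Optional[float]:
    """Relative gap between the base total and an aggregated total, in percent."""
    if not base_cost:
        # NULLIF(base_cost, 0) makes the SQL expression NULL
        return None
    return round(abs(base_cost - aggregated_cost) / base_cost * 100, 2)


def validation_passed(base_cost: float, aggregated_cost: float,
                      tolerance_pct: float = 1.0) -> bool:
    """Same rule as the cell: a NULL variance counts as 0; pass when below tolerance."""
    return (variance_pct(base_cost, aggregated_cost) or 0) < tolerance_pct
```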

In [0]:
# Step 8: Create Per User Opportunity Recommendations
# Identifies cost optimization opportunities for each user

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML("<h2>STEP 8: CREATE PER USER OPPORTUNITY RECOMMENDATIONS</h2><p>🎯 Creating user-level cost optimization opportunities</p>")

# Create per user opportunities table
user_opportunities_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.user_opportunities
USING DELTA
AS
WITH base_cost AS (
  SELECT SUM(total_cost_usd) as total_all_purpose_cost
  FROM ex_dash_temp.billing_forecast.all_purpose_base
  WHERE usage_date >= '{start_date}'
)
SELECT 
  u.principal_email,
  u.principal_type,
  u.primary_workspace,
  u.total_cost_usd,
  u.days_active,
  u.unique_clusters,
  u.avg_cpu_pct,
  u.avg_mem_pct,
  u.avg_network_mb,
  u.total_network_gb,
  u.avg_cores,
  u.avg_memory_gb,
  u.photon_usage_pct,
  u.avg_autoterm_minutes,
  u.telemetry_coverage_pct,
  
  -- Opportunity identification
  CASE 
    WHEN u.avg_cpu_pct < 20 AND u.avg_mem_pct < 30 THEN 'CRITICAL'
    WHEN u.avg_cpu_pct < 30 OR u.avg_mem_pct < 40 THEN 'HIGH'
    WHEN u.avg_autoterm_minutes > 60 OR u.avg_autoterm_minutes IS NULL THEN 'MEDIUM'
    WHEN u.photon_usage_pct < 50 THEN 'LOW'
    ELSE 'OPTIMAL'
  END as opportunity_priority,
  
  -- Detailed recommendations
  CASE 
    WHEN u.avg_cpu_pct < 20 AND u.avg_mem_pct < 30 
      THEN CONCAT('CRITICAL: Severe under-utilization (CPU: ', ROUND(u.avg_cpu_pct, 1), '%, Memory: ', ROUND(u.avg_mem_pct, 1), '%). Switch to smaller instance types immediately.')
    WHEN u.avg_cpu_pct < 30 
      THEN CONCAT('HIGH: Low CPU utilization (', ROUND(u.avg_cpu_pct, 1), '%). Consider compute-optimized instances or reduce cluster size.')
    WHEN u.avg_mem_pct < 40 
      THEN CONCAT('HIGH: Low memory utilization (', ROUND(u.avg_mem_pct, 1), '%). Consider memory-optimized instances or reduce memory allocation.')
    WHEN u.avg_autoterm_minutes > 60 OR u.avg_autoterm_minutes IS NULL 
      THEN CONCAT('MEDIUM: Average auto-termination is ', COALESCE(CONCAT(CAST(CAST(ROUND(u.avg_autoterm_minutes) AS INT) AS STRING), ' minutes'), 'not set'), '. Reduce to 15-30 minutes to cut idle time.')
    WHEN u.photon_usage_pct < 50 
      THEN CONCAT('LOW: Photon usage at ', ROUND(u.photon_usage_pct, 1), '%. Consider enabling Photon on supported workloads for potential performance gains.')
    ELSE 'OPTIMAL: Resource utilization appears efficient. Continue monitoring.'
  END as recommendation,
  
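  -- Savings calculation (applied to cost over the analysis window; matches a monthly figure only when days_back is about 30)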
  CASE 
    WHEN u.avg_cpu_pct < 20 AND u.avg_mem_pct < 30 THEN ROUND(u.total_cost_usd * 0.40, 2)
    WHEN u.avg_cpu_pct < 30 THEN ROUND(u.total_cost_usd * 0.25, 2)
    WHEN u.avg_mem_pct < 40 THEN ROUND(u.total_cost_usd * 0.20, 2)
    WHEN u.avg_autoterm_minutes > 60 OR u.avg_autoterm_minutes IS NULL THEN ROUND(u.total_cost_usd * 0.15, 2)
    WHEN u.photon_usage_pct < 50 THEN ROUND(u.total_cost_usd * 0.10, 2)
    ELSE 0
  END as estimated_monthly_savings,
  
  -- Action items
  CASE 
    WHEN u.avg_cpu_pct < 20 AND u.avg_mem_pct < 30 
      THEN 'Downsize to instance with 50% fewer cores and memory'
    WHEN u.avg_cpu_pct < 30 
      THEN 'Switch to compute-optimized instance family'
    WHEN u.avg_mem_pct < 40 
      THEN 'Reduce memory allocation by 30-40%'
    WHEN u.avg_autoterm_minutes > 60 OR u.avg_autoterm_minutes IS NULL 
      THEN 'Set auto-termination to 20 minutes'
    WHEN u.photon_usage_pct < 50 
      THEN 'Enable Photon on all clusters'
    ELSE 'No immediate action required'
  END as action_item,
  
  -- Validated savings (capped at total cost)
  ROUND(
    LEAST(
      CASE 
        WHEN u.avg_cpu_pct < 20 AND u.avg_mem_pct < 30 THEN u.total_cost_usd * 0.40
        WHEN u.avg_cpu_pct < 30 THEN u.total_cost_usd * 0.25
        WHEN u.avg_mem_pct < 40 THEN u.total_cost_usd * 0.20
        WHEN u.avg_autoterm_minutes > 60 OR u.avg_autoterm_minutes IS NULL THEN u.total_cost_usd * 0.15
        WHEN u.photon_usage_pct < 50 THEN u.total_cost_usd * 0.10
        ELSE 0
      END,
      u.total_cost_usd,
      (SELECT total_all_purpose_cost FROM base_cost)
    ), 2
  ) as validated_savings,
  
  CURRENT_TIMESTAMP() as created_at
  
FROM ex_dash_temp.billing_forecast.user_total_cost u
ORDER BY estimated_monthly_savings DESC, u.total_cost_usd DESC
"""

spark.sql(user_opportunities_query)

displayHTML("✅ User opportunities table created: ex_dash_temp.billing_forecast.user_opportunities")

# Show summary by priority
summary = spark.sql(f"""
SELECT 
  opportunity_priority,
  COUNT(*) as users_count,
  ROUND(SUM(total_cost_usd), 2) as total_cost,
  ROUND(SUM(validated_savings), 2) as total_potential_savings,
  ROUND(AVG(avg_cpu_pct), 2) as avg_cpu_utilization,
  ROUND(AVG(avg_mem_pct), 2) as avg_memory_utilization
FROM ex_dash_temp.billing_forecast.user_opportunities
GROUP BY opportunity_priority
ORDER BY 
  CASE opportunity_priority
    WHEN 'CRITICAL' THEN 1
    WHEN 'HIGH' THEN 2
    WHEN 'MEDIUM' THEN 3
    WHEN 'LOW' THEN 4
    ELSE 5
  END
""")

displayHTML("<h3>üìä SUMMARY BY PRIORITY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT * FROM ex_dash_temp.billing_forecast.user_opportunities ORDER BY validated_savings DESC LIMIT 50")
display(sample_data)
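
The CASE expressions in Step 8 share one cascade of thresholds: the first matching rule sets the priority, the savings rate and the action item. A minimal Python mirror of the priority and capped-savings logic, which can help unit-test threshold changes before editing the SQL. The function name and signature are illustrative, `None` stands for SQL NULL (a comparison against NULL never matches, while a NULL auto-termination does match the MEDIUM rule), and the savings rates are the notebook's own heuristics, not measured results.

```python
from typing import Optional, Tuple


def _lt(value: Optional[float], threshold: float) -> bool:
    # SQL: NULL < threshold is unknown, so the WHEN branch is not taken
    return value is not None and value < threshold


def user_opportunity(avg_cpu_pct: Optional[float],
                     avg_mem_pct: Optional[float],
                     avg_autoterm_minutes: Optional[float],
                     photon_usage_pct: Optional[float],
                     total_cost_usd: float,
                     total_all_purpose_cost: Optional[float]) -> Tuple[str, float]:
    """Return (opportunity_priority, validated_savings) as computed in Step 8."""
    if _lt(avg_cpu_pct, 20) and _lt(avg_mem_pct, 30):
        priority, rate = "CRITICAL", 0.40
    elif _lt(avg_cpu_pct, 30):
        priority, rate = "HIGH", 0.25
    elif _lt(avg_mem_pct, 40):
        priority, rate = "HIGH", 0.20
    elif avg_autoterm_minutes is None or avg_autoterm_minutes > 60:
        priority, rate = "MEDIUM", 0.15
    elif _lt(photon_usage_pct, 50):
        priority, rate = "LOW", 0.10
    else:
        priority, rate = "OPTIMAL", 0.0

    # LEAST(...) skips NULL arguments, so drop None before taking the minimum
    candidates = [total_cost_usd * rate, total_cost_usd, total_all_purpose_cost]
    validated = min(c for c in candidates if c is not None)
    return priority, round(validated, 2)
```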

In [0]:
# Step 9: Create Per Cluster Opportunity Recommendations
# Identifies cost optimization opportunities for each cluster with specific instance type recommendations

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML("<h2>STEP 9: CREATE PER CLUSTER OPPORTUNITY RECOMMENDATIONS</h2><p>🎯 Creating cluster-level cost optimization opportunities</p>")

# Get total all-purpose cost for savings validation
# (fall back to 0 when the period has no usage, so the value interpolated into the SQL below is never 'None')
total_cost_val = spark.sql(f"""
SELECT ROUND(SUM(total_cost_usd), 2) as total_cost
FROM ex_dash_temp.billing_forecast.all_purpose_base
WHERE usage_date >= '{start_date}'
""").collect()[0]['total_cost'] or 0

# Create cluster opportunities with specific instance type recommendations
cluster_opp_df = spark.sql(f"""
WITH cluster_analysis AS (
  SELECT 
    cluster_id,
    cluster_name,
    cluster_owner,
    workspace_name,
    primary_instance_type,
    driver_instance_type,
    worker_instance_type,
    worker_count,
    min_workers,
    max_workers,
    total_cost_usd,
    days_active,
    avg_cpu_pct,
    avg_mem_pct,
    avg_network_mb,
    total_network_gb,
    cpu_efficiency_pct,
    memory_efficiency_pct,
    core_count,
    memory_gb,
    telemetry_coverage_pct,
    autoterm_minutes,
    
    -- Calculate raw savings
    CASE 
      WHEN cpu_efficiency_pct < 15 AND memory_efficiency_pct < 25 THEN total_cost_usd * 0.45
      WHEN cpu_efficiency_pct < 25 THEN total_cost_usd * 0.30
      WHEN memory_efficiency_pct < 40 THEN total_cost_usd * 0.20
      WHEN autoterm_minutes > 60 THEN total_cost_usd * 0.15
      ELSE total_cost_usd * 0.05
    END as raw_savings,
    
    -- Priority
    CASE 
      WHEN cpu_efficiency_pct < 15 AND memory_efficiency_pct < 25 THEN 'CRITICAL'
      WHEN cpu_efficiency_pct < 25 OR memory_efficiency_pct < 40 THEN 'HIGH'
      ELSE 'LOW'
    END as opportunity_priority,
    
    -- Suggested instances (downsize by one level). Match the full '.<size>' suffix so that
    -- larger sizes such as 24xlarge or 48xlarge are not rewritten by a substring match.
    CASE 
      WHEN driver_instance_type LIKE '%.12xlarge' THEN REPLACE(driver_instance_type, '.12xlarge', '.8xlarge')
      WHEN driver_instance_type LIKE '%.16xlarge' THEN REPLACE(driver_instance_type, '.16xlarge', '.8xlarge')
      WHEN driver_instance_type LIKE '%.8xlarge' THEN REPLACE(driver_instance_type, '.8xlarge', '.4xlarge')
      WHEN driver_instance_type LIKE '%.4xlarge' THEN REPLACE(driver_instance_type, '.4xlarge', '.2xlarge')
      WHEN driver_instance_type LIKE '%.2xlarge' THEN REPLACE(driver_instance_type, '.2xlarge', '.xlarge')
      ELSE driver_instance_type
    END as suggested_driver_instance,
    
    CASE 
      WHEN worker_instance_type LIKE '%.12xlarge' THEN REPLACE(worker_instance_type, '.12xlarge', '.8xlarge')
      WHEN worker_instance_type LIKE '%.16xlarge' THEN REPLACE(worker_instance_type, '.16xlarge', '.8xlarge')
      WHEN worker_instance_type LIKE '%.8xlarge' THEN REPLACE(worker_instance_type, '.8xlarge', '.4xlarge')
      WHEN worker_instance_type LIKE '%.4xlarge' THEN REPLACE(worker_instance_type, '.4xlarge', '.2xlarge')
      WHEN worker_instance_type LIKE '%.2xlarge' THEN REPLACE(worker_instance_type, '.2xlarge', '.xlarge')
      ELSE worker_instance_type
    END as suggested_worker_instance,
    
    -- Current worker configuration
    CASE 
      WHEN worker_count IS NOT NULL THEN CONCAT('Fixed: ', worker_count, ' workers')
      WHEN min_workers IS NOT NULL AND max_workers IS NOT NULL THEN CONCAT('Autoscale: ', min_workers, '-', max_workers, ' workers')
      ELSE 'Unknown'
    END as current_worker_config
    
  FROM ex_dash_temp.billing_forecast.cluster_total_cost
  WHERE telemetry_coverage_pct > 50
),
cluster_with_recommendations AS (
  SELECT 
    *,
    -- Detailed recommendation with current configuration
    CASE 
      WHEN cpu_efficiency_pct < 15 AND memory_efficiency_pct < 25 THEN 
        CASE 
          WHEN driver_instance_type != worker_instance_type THEN
            CONCAT('CRITICAL: Cluster "', cluster_name, '" severely under-utilized (CPU: ', ROUND(cpu_efficiency_pct, 1), '%, Memory: ', ROUND(memory_efficiency_pct, 1), 
                   '%). Current: Driver=', driver_instance_type, ', Workers=', worker_instance_type, ' (', current_worker_config, '). ',
                   'Recommended: Change driver to ', suggested_driver_instance, ' and workers to ', suggested_worker_instance, '.')
          ELSE
            CONCAT('CRITICAL: Cluster "', cluster_name, '" severely under-utilized (CPU: ', ROUND(cpu_efficiency_pct, 1), '%, Memory: ', ROUND(memory_efficiency_pct, 1), 
                   '%). Current: Driver & Workers=', driver_instance_type, ' (', current_worker_config, '). ',
                   'Recommended: Change both to ', suggested_driver_instance, '.')
        END
      WHEN cpu_efficiency_pct < 25 THEN 
        CASE 
          WHEN driver_instance_type != worker_instance_type THEN
            CONCAT('HIGH: Cluster "', cluster_name, '" has low CPU efficiency (', ROUND(cpu_efficiency_pct, 1), 
                   '%). Current: Driver=', driver_instance_type, ', Workers=', worker_instance_type, ' (', current_worker_config, '). ',
                   'Recommended: Change driver to ', suggested_driver_instance, ' and workers to ', suggested_worker_instance, '.')
          ELSE
            CONCAT('HIGH: Cluster "', cluster_name, '" has low CPU efficiency (', ROUND(cpu_efficiency_pct, 1), 
                   '%). Current: Driver & Workers=', driver_instance_type, ' (', current_worker_config, '). ',
                   'Recommended: Change both to ', suggested_driver_instance, '.')
        END
      WHEN memory_efficiency_pct < 40 THEN 
        CASE 
          WHEN driver_instance_type != worker_instance_type THEN
            CONCAT('HIGH: Cluster "', cluster_name, '" has low memory efficiency (', ROUND(memory_efficiency_pct, 1), 
                   '%). Current: Driver=', driver_instance_type, ', Workers=', worker_instance_type, ' (', current_worker_config, '). ',
                   'Recommended: Switch to compute-optimized instances.')
          ELSE
            CONCAT('HIGH: Cluster "', cluster_name, '" has low memory efficiency (', ROUND(memory_efficiency_pct, 1), 
                   '%). Current: Driver & Workers=', driver_instance_type, ' (', current_worker_config, '). ',
                   'Recommended: Switch to compute-optimized instances.')
        END
      ELSE 
        CONCAT('LOW: Cluster "', cluster_name, '" is reasonably utilized. Continue monitoring.')
    END as recommendation,
    
    -- Action item with specific instance types
    CASE 
      WHEN cpu_efficiency_pct < 15 AND memory_efficiency_pct < 25 THEN 
        CASE 
          WHEN driver_instance_type != worker_instance_type THEN
            CONCAT('Change Driver: ', driver_instance_type, ' → ', suggested_driver_instance, '; Change Workers: ', worker_instance_type, ' → ', suggested_worker_instance, ' (Keep ', current_worker_config, ')')
          ELSE
            CONCAT('Change Driver & Workers: ', driver_instance_type, ' → ', suggested_driver_instance, ' (Keep ', current_worker_config, ')')
        END
      WHEN cpu_efficiency_pct < 25 THEN 
        CASE 
          WHEN driver_instance_type != worker_instance_type THEN
            CONCAT('Change Driver: ', driver_instance_type, ' → ', suggested_driver_instance, '; Change Workers: ', worker_instance_type, ' → ', suggested_worker_instance, ' (Keep ', current_worker_config, ')')
          ELSE
            CONCAT('Change Driver & Workers: ', driver_instance_type, ' → ', suggested_driver_instance, ' (Keep ', current_worker_config, ')')
        END
      WHEN memory_efficiency_pct < 40 THEN 
        CASE 
          WHEN driver_instance_type != worker_instance_type THEN
            CONCAT('Switch Driver (', driver_instance_type, ') and Workers (', worker_instance_type, ') to compute-optimized (Keep ', current_worker_config, ')')
          ELSE
            CONCAT('Switch Driver & Workers (', driver_instance_type, ') to compute-optimized (Keep ', current_worker_config, ')')
        END
      ELSE 
        'Continue monitoring'
    END as action_item
    
  FROM cluster_analysis
),
total_savings AS (
  SELECT SUM(raw_savings) as total_raw_savings
  FROM cluster_with_recommendations
)
SELECT 
  c.cluster_id,
  c.cluster_name,
  c.cluster_owner,
  c.workspace_name,
  c.primary_instance_type,
  c.driver_instance_type,
  c.worker_instance_type,
  c.suggested_driver_instance,
  c.suggested_worker_instance,
  c.worker_count,
  c.min_workers,
  c.max_workers,
  c.current_worker_config,
  c.total_cost_usd,
  c.days_active,
  c.avg_cpu_pct,
  c.avg_mem_pct,
  c.avg_network_mb,
  c.total_network_gb,
  c.cpu_efficiency_pct,
  c.memory_efficiency_pct,
  c.core_count,
  c.memory_gb,
  c.telemetry_coverage_pct,
  c.autoterm_minutes,
  c.opportunity_priority,
  c.recommendation,
  c.action_item,
  -- Cap individual savings proportionally if total exceeds all-purpose cost
  CASE 
    WHEN (SELECT total_raw_savings FROM total_savings) > {total_cost_val} THEN 
      ROUND(c.raw_savings * {total_cost_val} / (SELECT total_raw_savings FROM total_savings), 2)
    ELSE 
      ROUND(c.raw_savings, 2)
  END as validated_savings
FROM cluster_with_recommendations c
ORDER BY validated_savings DESC, total_cost_usd DESC
""")

# Write table
cluster_opp_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("ex_dash_temp.billing_forecast.cluster_opportunities")

displayHTML("✅ Cluster opportunities table created: ex_dash_temp.billing_forecast.cluster_opportunities")

# Summary
summary = spark.sql("""
SELECT 
  opportunity_priority,
  COUNT(*) as clusters,
  ROUND(SUM(total_cost_usd), 2) as total_cost,
  ROUND(SUM(validated_savings), 2) as total_savings
FROM ex_dash_temp.billing_forecast.cluster_opportunities
GROUP BY opportunity_priority
ORDER BY 
  CASE opportunity_priority 
    WHEN 'CRITICAL' THEN 1 
    WHEN 'HIGH' THEN 2 
    ELSE 3 
  END
""")

displayHTML("<h3>üìä SUMMARY BY PRIORITY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("""
SELECT 
  cluster_name,
  driver_instance_type,
  worker_instance_type,
  current_worker_config,
  suggested_driver_instance,
  suggested_worker_instance,
  total_cost_usd,
  cpu_efficiency_pct,
  memory_efficiency_pct,
  opportunity_priority,
  recommendation,
  action_item,
  validated_savings
FROM ex_dash_temp.billing_forecast.cluster_opportunities 
ORDER BY validated_savings DESC 
LIMIT 50
""")
display(sample_data)
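
Step 9's suggested instance types come from a fixed one-size-down table: 12xlarge and 16xlarge both step to 8xlarge, then 8xlarge to 4xlarge, 4xlarge to 2xlarge and 2xlarge to xlarge; any other name, including Azure and GCP node types, is returned unchanged. A minimal Python sketch of that table, assuming AWS-style `<family>.<size>` names; the constant and function names are illustrative. Matching the whole `.<size>` suffix rather than a substring keeps sizes such as `24xlarge` or `48xlarge`, which contain `4xlarge` and `8xlarge`, from being rewritten into names that do not exist. Treat the result as a starting point for right-sizing; availability of the smaller size in a given family and region still needs checking.

```python
from typing import Optional

# Ordered (old_suffix, new_suffix) pairs taken from the Step 9 CASE expression
DOWNSIZE_STEPS = [
    (".12xlarge", ".8xlarge"),
    (".16xlarge", ".8xlarge"),
    (".8xlarge", ".4xlarge"),
    (".4xlarge", ".2xlarge"),
    (".2xlarge", ".xlarge"),
]


def suggest_smaller_instance(instance_type: Optional[str]) -> Optional[str]:
    """Return the instance type one size down, or the input unchanged if no rule applies."""
    if instance_type is None:
        return None
    for old_suffix, new_suffix in DOWNSIZE_STEPS:
        if instance_type.endswith(old_suffix):
            return instance_type[: -len(old_suffix)] + new_suffix
    return instance_type
```

For example, `suggest_smaller_instance("r7g.12xlarge")` returns `"r7g.8xlarge"`, matching the example in the notebook overview, while `"i3en.24xlarge"` comes back unchanged.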

In [0]:
# Step 10: Create Per Instance Opportunity Recommendations
# Identifies cost optimization opportunities for each instance type

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML("<h2>STEP 10: CREATE PER INSTANCE OPPORTUNITY RECOMMENDATIONS</h2><p>🎯 Creating instance-level cost optimization opportunities</p>")

# Get total all-purpose cost for savings validation
# (fall back to 0 when the period has no usage, so the value interpolated into the SQL below is never 'None')
total_cost_val = spark.sql(f"""
SELECT ROUND(SUM(total_cost_usd), 2) as total_cost
FROM ex_dash_temp.billing_forecast.all_purpose_base
WHERE usage_date >= '{start_date}'
""").collect()[0]['total_cost'] or 0

# Create instance opportunities
instance_opp_query = f"""
CREATE OR REPLACE TABLE ex_dash_temp.billing_forecast.instance_opportunities
USING DELTA
AS
WITH instance_analysis AS (
  SELECT 
    instance_type,
    total_cost_usd,
    unique_clusters,
    unique_users,
    unique_workspaces,
    days_active,
    avg_cpu_pct,
    avg_mem_pct,
    avg_network_mb,
    total_network_gb,
    cpu_efficiency_pct,
    memory_efficiency_pct,
    core_count,
    memory_gb,
    telemetry_coverage_pct,
    
    -- Calculate raw savings
    CASE 
      WHEN cpu_efficiency_pct < 15 THEN total_cost_usd * 0.50
      WHEN cpu_efficiency_pct < 25 THEN total_cost_usd * 0.35
      WHEN memory_efficiency_pct < 30 THEN total_cost_usd * 0.25
      ELSE total_cost_usd * 0.10
    END as raw_savings,
    
    -- Priority
    CASE 
      WHEN cpu_efficiency_pct < 15 THEN 'CRITICAL'
      WHEN cpu_efficiency_pct < 25 OR memory_efficiency_pct < 30 THEN 'HIGH'
      ELSE 'LOW'
    END as opportunity_priority,
    
    -- Recommendation
    CASE 
      WHEN cpu_efficiency_pct < 15 THEN 
        CONCAT('CRITICAL: Instance type "', instance_type, '" is severely under-utilized across ', unique_clusters, ' clusters (CPU: ', ROUND(cpu_efficiency_pct, 1), '%, Memory: ', ROUND(memory_efficiency_pct, 1), '%). Migrate all workloads to smaller instance family.')
      WHEN cpu_efficiency_pct < 25 THEN 
        CONCAT('HIGH: Instance type "', instance_type, '" has low CPU efficiency (', ROUND(cpu_efficiency_pct, 1), '%) across ', unique_clusters, ' clusters. Consider compute-optimized alternatives.')
      WHEN memory_efficiency_pct < 30 THEN 
        CONCAT('HIGH: Instance type "', instance_type, '" has low memory efficiency (', ROUND(memory_efficiency_pct, 1), '%) across ', unique_clusters, ' clusters. Consider compute-optimized alternatives.')
      ELSE 
        CONCAT('LOW: Instance type "', instance_type, '" is reasonably utilized across ', unique_clusters, ' clusters.')
    END as recommendation,
    
    -- Suggested action
    CASE 
      WHEN cpu_efficiency_pct < 15 THEN 
        CONCAT('Migrate to instance with ', CAST(CEIL(core_count * 0.4) AS INT), ' cores, ', ROUND(memory_gb * 0.4, 1), ' GB')
      WHEN cpu_efficiency_pct < 25 THEN 
        CONCAT('Switch to compute-optimized with ', CAST(CEIL(core_count * 0.6) AS INT), ' cores')
      WHEN memory_efficiency_pct < 30 THEN 
        'Switch to compute-optimized instance'
      ELSE 
        'Continue monitoring'
    END as suggested_action,
    
    CONCAT('Affects ', unique_clusters, ' clusters, ', unique_users, ' users across ', unique_workspaces, ' workspaces') as impact_scope
    
  FROM ex_dash_temp.billing_forecast.instance_total_cost
  WHERE telemetry_coverage_pct > 50
),
total_savings AS (
  SELECT SUM(raw_savings) as total_raw_savings
  FROM instance_analysis
)
SELECT 
  i.*,
  -- Cap individual savings proportionally if total exceeds all-purpose cost
  CASE 
    WHEN (SELECT total_raw_savings FROM total_savings) > {total_cost_val} THEN 
      ROUND(i.raw_savings * {total_cost_val} / (SELECT total_raw_savings FROM total_savings), 2)
    ELSE 
      ROUND(i.raw_savings, 2)
  END as validated_savings
FROM instance_analysis i
ORDER BY validated_savings DESC, total_cost_usd DESC
"""

spark.sql(instance_opp_query)

displayHTML("✅ Instance opportunities table created: ex_dash_temp.billing_forecast.instance_opportunities")

# Summary
summary = spark.sql("""
SELECT 
  opportunity_priority,
  COUNT(*) as instances,
  ROUND(SUM(total_cost_usd), 2) as total_cost,
  ROUND(SUM(validated_savings), 2) as total_savings
FROM ex_dash_temp.billing_forecast.instance_opportunities
GROUP BY opportunity_priority
ORDER BY 
  CASE opportunity_priority 
    WHEN 'CRITICAL' THEN 1 
    WHEN 'HIGH' THEN 2 
    ELSE 3 
  END
""")

displayHTML("<h3>üìä SUMMARY BY PRIORITY:</h3>")
display(summary)

# Display sample
displayHTML("<h3>üìã SAMPLE DATA (50 rows):</h3>")
sample_data = spark.sql("SELECT * FROM ex_dash_temp.billing_forecast.instance_opportunities ORDER BY validated_savings DESC LIMIT 50")
display(sample_data)
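
Steps 9 and 10 cap savings with the same rule: if the per-row raw estimates add up to more than the total all-purpose cost for the period, each row is scaled by `total_cost / total_raw_savings`, which preserves the ranking and makes the column sum to the total cost (up to rounding); otherwise the raw estimates are kept. A minimal sketch in plain Python, assuming the raw savings are already collected into a list; the function name is illustrative, and the multiplication order follows the SQL.

```python
def cap_proportionally(raw_savings: list, total_cost: float) -> list:
    """Scale raw savings down so that their sum does not exceed total_cost."""
    total_raw = sum(raw_savings)
    if total_raw > total_cost:
        # Same factor for every row, so relative ordering is preserved
        return [round(s * total_cost / total_raw, 2) for s in raw_savings]
    return [round(s, 2) for s in raw_savings]
```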

In [0]:
# Step 11: Comprehensive Summary
# Display all created tables and overall analysis summary with validation

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>COMPREHENSIVE ALL-PURPOSE CLUSTER COST ANALYSIS - SUMMARY</h2><p>📅 Analysis Period: {start_date} to current date</p>")

# Get total all-purpose cost
total_cost_summary = spark.sql(f"""
SELECT 
  ROUND(SUM(total_cost_usd), 2) as total_all_purpose_cost,
  COUNT(DISTINCT usage_date) as days_analyzed,
  COUNT(DISTINCT cluster_id) as unique_clusters,
  COUNT(DISTINCT principal_email) as unique_users,
  COUNT(DISTINCT workspace_name) as unique_workspaces
FROM ex_dash_temp.billing_forecast.all_purpose_base
WHERE usage_date >= '{start_date}'
""")

total_cost_data = total_cost_summary.collect()[0]
total_all_purpose_cost = total_cost_data['total_all_purpose_cost'] or 0

displayHTML(f"<p>üí∞ <b>TOTAL ALL-PURPOSE COST:</b> ${total_all_purpose_cost:,.2f}</p>")

displayHTML(f"""
<h3>üìä ANALYSIS SCOPE:</h3>
<ul>
<li>Days Analyzed: {total_cost_data['days_analyzed']}</li>
<li>Unique Clusters: {total_cost_data['unique_clusters']}</li>
<li>Unique Users: {total_cost_data['unique_users']}</li>
<li>Unique Workspaces: {total_cost_data['unique_workspaces']}</li>
</ul>
""")

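# Calculate total savings potential.
# The user, cluster and instance views slice the same all-purpose spend, so the
# headline figure takes the largest of the three (GREATEST) instead of summing them.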
savings_summary = spark.sql("""
WITH user_savings AS (
  SELECT COALESCE(SUM(validated_savings), 0) as user_savings, COUNT(*) as user_count
  FROM ex_dash_temp.billing_forecast.user_opportunities
),
cluster_savings AS (
  SELECT COALESCE(SUM(validated_savings), 0) as cluster_savings, COUNT(*) as cluster_count
  FROM ex_dash_temp.billing_forecast.cluster_opportunities
),
instance_savings AS (
  SELECT COALESCE(SUM(validated_savings), 0) as instance_savings, COUNT(*) as instance_count
  FROM ex_dash_temp.billing_forecast.instance_opportunities
)
SELECT 
  ROUND(u.user_savings, 2) as user_level_savings,
  u.user_count,
  ROUND(c.cluster_savings, 2) as cluster_level_savings,
  c.cluster_count,
  ROUND(i.instance_savings, 2) as instance_level_savings,
  i.instance_count,
  ROUND(GREATEST(u.user_savings, c.cluster_savings, i.instance_savings), 2) as max_potential_savings
FROM user_savings u, cluster_savings c, instance_savings i
""")

savings_data = savings_summary.collect()[0]
max_savings = savings_data['max_potential_savings'] or 0

displayHTML(f"""
<h3>üí∏ POTENTIAL SAVINGS ANALYSIS:</h3>
<ul>
<li>User-level Opportunities: ${savings_data['user_level_savings']:,.2f} ({savings_data['user_count']} users)</li>
<li>Cluster-level Opportunities: ${savings_data['cluster_level_savings']:,.2f} ({savings_data['cluster_count']} clusters)</li>
<li>Instance-level Opportunities: ${savings_data['instance_level_savings']:,.2f} ({savings_data['instance_count']} instance types)</li>
<li>Maximum Potential Savings: ${max_savings:,.2f} ({(max_savings/total_all_purpose_cost*100) if total_all_purpose_cost > 0 else 0:.1f}%)</li>
</ul>
""")

# Validate savings don't exceed total cost
if max_savings <= total_all_purpose_cost:
    displayHTML(f"<p>✅ <b>Validation Passed:</b> Total savings (${max_savings:,.2f}) ≤ Total cost (${total_all_purpose_cost:,.2f})</p>")
else:
    displayHTML(f"<p>⚠️ <b>Validation Warning:</b> Total savings (${max_savings:,.2f}) exceeds total cost (${total_all_purpose_cost:,.2f})</p>")

displayHTML("""
<h3>üìä ALL TABLES CREATED IN: ex_dash_temp.billing_forecast</h3>
<ol>
<li>all_purpose_base - Base table with all-purpose cluster usage</li>
<li>user_daily_telemetry - Per user daily cost with telemetry</li>
<li>cluster_daily_telemetry - Per cluster daily cost with telemetry</li>
<li>instance_daily_telemetry - Per instance daily cost with telemetry</li>
<li>user_total_cost - Per user total cost (one row per user)</li>
<li>cluster_total_cost - Per cluster total cost (one row per cluster)</li>
<li>instance_total_cost - Per instance total cost (one row per instance)</li>
<li>user_opportunities - Per user savings opportunities</li>
<li>cluster_opportunities - Per cluster savings opportunities</li>
<li>instance_opportunities - Per instance savings opportunities</li>
</ol>
<p>✅ <b>ANALYSIS COMPLETE - ALL TABLES READY FOR QUERYING</b></p>
""")

In [0]:
# Display: User Total Cost Analysis
# Complete results for all users

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>üë§ USER TOTAL COST ANALYSIS</h2><p>Period: {start_date} onwards</p>")

user_results = spark.sql(f"""
SELECT 
  principal_email,
  principal_type,
  primary_workspace,
  workspaces_used,
  total_cost_usd,
  total_dbus,
  days_active,
  unique_clusters,
  avg_cpu_pct,
  avg_mem_pct,
  avg_network_mb,
  total_network_gb,
  avg_cores,
  avg_memory_gb,
  photon_usage_pct,
  avg_autoterm_minutes,
  telemetry_coverage_pct,
  first_usage_date,
  last_usage_date
FROM ex_dash_temp.billing_forecast.user_total_cost
ORDER BY total_cost_usd DESC
""")

user_count = user_results.count()
total_user_cost = user_results.agg({'total_cost_usd': 'sum'}).collect()[0][0] or 0

displayHTML(f"<p><b>Total Users:</b> {user_count}<br><b>Total Cost:</b> ${total_user_cost:,.2f}</p>")

if user_count > 0:
    display(user_results)
else:
    displayHTML("<p>⚠️ No user data found for the selected date range</p>")

In [0]:
# Display: Cluster Total Cost Analysis
# Complete results for all clusters

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>üíª CLUSTER TOTAL COST ANALYSIS</h2><p>Period: {start_date} onwards</p>")

cluster_results = spark.sql(f"""
SELECT 
  cluster_id,
  cluster_name,
  cluster_owner,
  workspace_name,
  primary_instance_type,
  driver_instance_type,
  worker_instance_type,
  worker_count,
  min_workers,
  max_workers,
  total_cost_usd,
  total_dbus,
  days_active,
  avg_cpu_pct,
  avg_mem_pct,
  avg_network_mb,
  total_network_gb,
  cpu_efficiency_pct,
  memory_efficiency_pct,
  core_count,
  memory_gb,
  photon_enabled,
  autoterm_minutes,
  telemetry_coverage_pct,
  first_usage_date,
  last_usage_date
FROM ex_dash_temp.billing_forecast.cluster_total_cost
ORDER BY total_cost_usd DESC
""")

cluster_count = cluster_results.count()
total_cluster_cost = cluster_results.agg({'total_cost_usd': 'sum'}).collect()[0][0] or 0

displayHTML(f"<p><b>Total Clusters:</b> {cluster_count}<br><b>Total Cost:</b> ${total_cluster_cost:,.2f}</p>")

if cluster_count > 0:
    display(cluster_results)
else:
    displayHTML("<p>⚠️ No cluster data found for the selected date range</p>")

In [0]:
# Display: Instance Total Cost Analysis
# Complete results for all instance types

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>üñ•Ô∏è INSTANCE TOTAL COST ANALYSIS</h2><p>Period: {start_date} onwards</p>")

instance_results = spark.sql(f"""
SELECT 
  instance_type,
  total_cost_usd,
  total_dbus,
  unique_clusters,
  unique_users,
  unique_workspaces,
  days_active,
  avg_cpu_pct,
  avg_mem_pct,
  avg_network_mb,
  total_network_gb,
  cpu_efficiency_pct,
  memory_efficiency_pct,
  core_count,
  memory_gb,
  photon_usage_pct,
  avg_autoterm_minutes,
  telemetry_coverage_pct,
  first_usage_date,
  last_usage_date
FROM ex_dash_temp.billing_forecast.instance_total_cost
ORDER BY total_cost_usd DESC
""")

instance_count = instance_results.count()
total_instance_cost = instance_results.agg({'total_cost_usd': 'sum'}).collect()[0][0] or 0

displayHTML(f"<p><b>Total Instance Types:</b> {instance_count}<br><b>Total Cost:</b> ${total_instance_cost:,.2f}</p>")

if instance_count > 0:
    display(instance_results)
else:
    displayHTML("<p>⚠️ No instance data found for the selected date range</p>")
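
The `cpu_efficiency_pct` and `memory_gb` columns shown here come from Step 7. As written, the CPU expression `avg_cpu_pct / NULLIF(core_count * 100, 0) * 100` reduces to average CPU percent divided by core count, so an instance type with 16 cores averaging 40% CPU is reported at 2.5%; that scale matters when reading it against the 15% and 25% thresholds in Step 10. A minimal Python mirror of both expressions, with `None` standing in for SQL NULL; the function names are illustrative.

```python
from typing import Optional


def cpu_efficiency_pct(avg_cpu_pct: Optional[float],
                       core_count: Optional[int]) -> Optional[float]:
    """Mirror of ROUND(avg_cpu_pct / NULLIF(core_count * 100, 0) * 100, 1)."""
    if avg_cpu_pct is None or not core_count:
        # A NULL input, or NULLIF(..., 0) on zero cores, yields NULL in SQL
        return None
    return round(avg_cpu_pct / (core_count * 100) * 100, 1)


def memory_gb(memory_mb: Optional[float]) -> Optional[float]:
    """Mirror of ROUND(memory_mb / 1024, 1)."""
    if memory_mb is None:
        return None
    return round(memory_mb / 1024, 1)
```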

In [0]:
# Display: User Opportunities and Recommendations
# Complete results for all users with opportunities

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>🎯 USER OPPORTUNITIES AND RECOMMENDATIONS</h2><p>Period: {start_date} onwards</p>")

user_opp_results = spark.sql(f"""
SELECT 
  principal_email,
  primary_workspace,
  total_cost_usd,
  days_active,
  avg_cpu_pct,
  avg_mem_pct,
  avg_network_mb,
  total_network_gb,
  opportunity_priority,
  recommendation,
  action_item,
  validated_savings,
  telemetry_coverage_pct
FROM ex_dash_temp.billing_forecast.user_opportunities
ORDER BY validated_savings DESC, total_cost_usd DESC
""")

user_opp_count = user_opp_results.count()
total_user_savings = user_opp_results.agg({'validated_savings': 'sum'}).collect()[0][0] or 0

displayHTML(f"<p><b>Total Users with Opportunities:</b> {user_opp_count}<br><b>Total Potential Savings:</b> ${total_user_savings:,.2f}</p>")

if user_opp_count > 0:
    display(user_opp_results)
else:
    displayHTML("<p>⚠️ No user opportunities found for the selected date range</p>")

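The opportunity cells sort by `validated_savings DESC, total_cost_usd DESC`, so among rows with equal savings the more expensive entity is listed first. Spark SQL places NULLs last for descending order by default. The sketch below reproduces that ordering in plain Python for illustration; `rank_opportunities` and the sample rows are hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def rank_opportunities(rows: List[Row]) -> List[Row]:
    """Sort like ORDER BY validated_savings DESC, total_cost_usd DESC.

    A missing (None) value sorts after every real value in its column,
    matching Spark SQL's NULLS LAST default for DESC.
    """
    def sort_key(row: Row):
        savings = row.get("validated_savings")
        cost = row.get("total_cost_usd")
        # The boolean puts None last; negation turns ascending into descending
        return (
            savings is None, -float(savings or 0),
            cost is None, -float(cost or 0),
        )

    return sorted(rows, key=sort_key)


rows = [
    {"id": "a", "validated_savings": 100.0, "total_cost_usd": 500.0},
    {"id": "b", "validated_savings": None, "total_cost_usd": 900.0},
    {"id": "c", "validated_savings": 100.0, "total_cost_usd": 800.0},
    {"id": "d", "validated_savings": 250.0, "total_cost_usd": 300.0},
]
print([r["id"] for r in rank_opportunities(rows)])  # ['d', 'c', 'a', 'b']
```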
In [0]:
# Display: Cluster Opportunities and Recommendations
# Complete results for all clusters with opportunities

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>🎯 CLUSTER OPPORTUNITIES AND RECOMMENDATIONS</h2><p>Period: {start_date} onwards</p>")

cluster_opp_results = spark.sql(f"""
SELECT 
  cluster_id,
  cluster_name,
  cluster_owner,
  workspace_name,
  driver_instance_type,
  worker_instance_type,
  current_worker_config,
  suggested_driver_instance,
  suggested_worker_instance,
  total_cost_usd,
  days_active,
  avg_cpu_pct,
  avg_mem_pct,
  cpu_efficiency_pct,
  memory_efficiency_pct,
  opportunity_priority,
  recommendation,
  action_item,
  validated_savings,
  telemetry_coverage_pct
FROM ex_dash_temp.billing_forecast.cluster_opportunities
ORDER BY validated_savings DESC, total_cost_usd DESC
""")

cluster_opp_count = cluster_opp_results.count()
total_cluster_savings = cluster_opp_results.agg({'validated_savings': 'sum'}).collect()[0][0] or 0

displayHTML(f"<p><b>Total Clusters with Opportunities:</b> {cluster_opp_count}<br><b>Total Potential Savings:</b> ${total_cluster_savings:,.2f}</p>")

if cluster_opp_count > 0:
    display(cluster_opp_results)
else:
    displayHTML("<p>⚠️ No cluster opportunities found for the selected date range</p>")

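The `validated_savings` column is described in the notebook overview as savings "capped at total cost". A minimal sketch of that cap, assuming it is a simple clamp per entity; the lower bound at zero is an assumption of this sketch, and `validate_savings` is a hypothetical name rather than the notebook's implementation.

```python
def validate_savings(estimated: float, total_cost: float) -> float:
    """Clamp an estimated saving so it never exceeds the entity's total cost.

    The floor at 0 (no negative savings) is an assumption of this sketch.
    """
    if total_cost <= 0:
        return 0.0
    return max(0.0, min(float(estimated), float(total_cost)))


print(validate_savings(1200.0, 1000.0))  # 1000.0
```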
In [0]:
# Display: Instance Opportunities and Recommendations
# Complete results for all instance types with opportunities

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>🎯 INSTANCE OPPORTUNITIES AND RECOMMENDATIONS</h2><p>Period: {start_date} onwards</p>")

instance_opp_results = spark.sql(f"""
SELECT 
  instance_type,
  total_cost_usd,
  unique_clusters,
  unique_users,
  unique_workspaces,
  days_active,
  avg_cpu_pct,
  avg_mem_pct,
  avg_network_mb,
  total_network_gb,
  cpu_efficiency_pct,
  memory_efficiency_pct,
  opportunity_priority,
  recommendation,
  suggested_action,
  impact_scope,
  validated_savings,
  telemetry_coverage_pct
FROM ex_dash_temp.billing_forecast.instance_opportunities
ORDER BY validated_savings DESC, total_cost_usd DESC
""")

instance_opp_count = instance_opp_results.count()
total_instance_savings = instance_opp_results.agg({'validated_savings': 'sum'}).collect()[0][0] or 0

displayHTML(f"<p><b>Total Instance Types with Opportunities:</b> {instance_opp_count}<br><b>Total Potential Savings:</b> ${total_instance_savings:,.2f}</p>")

if instance_opp_count > 0:
    display(instance_opp_results)
else:
    displayHTML("<p>⚠️ No instance opportunities found for the selected date range</p>")

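Cluster-level and instance-level opportunities describe the same spend from two angles, so the executive summary takes the larger of the two totals (`GREATEST(...)`) instead of adding them. The sketch below shows that rule together with a zero-safe savings percentage. Treating NULL inputs as 0 is an assumption that matches `GREATEST`'s NULL-skipping behavior only for non-negative savings; `overall_savings` is a hypothetical name.

```python
from decimal import Decimal
from typing import Optional, Tuple, Union

Number = Union[int, float, Decimal]


def overall_savings(cluster_savings: Optional[Number],
                    instance_savings: Optional[Number],
                    total_cost: Optional[Number]) -> Tuple[float, float]:
    """Return (max_savings, savings_pct).

    The two savings totals overlap, so the larger one is used rather than
    the sum. The percentage is 0 when there is no cost to divide by.
    """
    c = float(cluster_savings or 0)
    i = float(instance_savings or 0)
    total = float(total_cost or 0)
    best = max(c, i)
    pct = best / total * 100 if total > 0 else 0.0
    return best, pct


best, pct = overall_savings(Decimal("4200"), 3100.0, 20000)
print(f"${best:,.2f} ({pct:.1f}% of spend)")
```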
In [0]:
# Executive Summary - Key Insights and Recommendations

from datetime import datetime, timedelta

# Calculate start_date from days_back widget
days_back = int(dbutils.widgets.get("days_back"))
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

displayHTML(f"<h2>EXECUTIVE SUMMARY - ALL-PURPOSE CLUSTER COST ANALYSIS</h2><p>📅 Analysis Period: {start_date} to current date</p>")

# Get key metrics.
# Each aggregate CTE returns exactly one row; the two "top" lookups are LEFT JOINed
# and all values are COALESCEd, so empty tables yield zeros instead of no row.
# Only all_purpose_base is filtered by start_date here; the other tables are read as built.
key_metrics = spark.sql(f"""
WITH base_metrics AS (
  SELECT 
    COALESCE(ROUND(SUM(total_cost_usd), 2), 0) as total_cost,
    COUNT(DISTINCT usage_date) as days_analyzed,
    COUNT(DISTINCT cluster_id) as total_clusters,
    COUNT(DISTINCT workspace_name) as total_workspaces
  FROM ex_dash_temp.billing_forecast.all_purpose_base
  WHERE usage_date >= '{start_date}'
),
cluster_opp AS (
  SELECT 
    COUNT(*) as clusters_with_opp,
    COALESCE(SUM(CASE WHEN opportunity_priority = 'CRITICAL' THEN 1 ELSE 0 END), 0) as critical_clusters,
    COALESCE(SUM(CASE WHEN opportunity_priority = 'HIGH' THEN 1 ELSE 0 END), 0) as high_clusters,
    COALESCE(ROUND(SUM(CASE WHEN opportunity_priority = 'CRITICAL' THEN validated_savings ELSE 0 END), 2), 0) as critical_savings,
    COALESCE(ROUND(SUM(CASE WHEN opportunity_priority = 'HIGH' THEN validated_savings ELSE 0 END), 2), 0) as high_savings,
    COALESCE(ROUND(SUM(validated_savings), 2), 0) as cluster_savings
  FROM ex_dash_temp.billing_forecast.cluster_opportunities
),
instance_opp AS (
  SELECT 
    COUNT(*) as instances_with_opp,
    COALESCE(SUM(CASE WHEN opportunity_priority = 'CRITICAL' THEN 1 ELSE 0 END), 0) as critical_instances,
    COALESCE(SUM(CASE WHEN opportunity_priority = 'HIGH' THEN 1 ELSE 0 END), 0) as high_instances,
    COALESCE(ROUND(SUM(validated_savings), 2), 0) as instance_savings
  FROM ex_dash_temp.billing_forecast.instance_opportunities
),
top_cluster AS (
  SELECT cluster_name, ROUND(total_cost_usd, 2) as cost
  FROM ex_dash_temp.billing_forecast.cluster_total_cost
  ORDER BY total_cost_usd DESC LIMIT 1
),
top_instance AS (
  SELECT instance_type, ROUND(total_cost_usd, 2) as cost, unique_clusters
  FROM ex_dash_temp.billing_forecast.instance_total_cost
  ORDER BY total_cost_usd DESC LIMIT 1
),
avg_util AS (
  SELECT 
    COALESCE(ROUND(AVG(avg_cpu_pct), 0), 0) as avg_cpu,
    COALESCE(ROUND(AVG(avg_mem_pct), 0), 0) as avg_mem,
    COALESCE(ROUND(AVG(telemetry_coverage_pct), 0), 0) as avg_telemetry
  FROM ex_dash_temp.billing_forecast.cluster_total_cost
)
SELECT 
  b.*,
  c.clusters_with_opp,
  c.critical_clusters,
  c.high_clusters,
  c.critical_savings,
  c.high_savings,
  c.cluster_savings,
  i.instances_with_opp,
  i.critical_instances,
  i.high_instances,
  i.instance_savings,
  -- Cluster and instance savings overlap, so take the larger rather than the sum
  GREATEST(c.cluster_savings, i.instance_savings) as max_savings,
  COALESCE(tc.cluster_name, 'n/a') as top_cluster_name,
  COALESCE(tc.cost, 0) as top_cluster_cost,
  COALESCE(ti.instance_type, 'n/a') as top_instance_type,
  COALESCE(ti.cost, 0) as top_instance_cost,
  COALESCE(ti.unique_clusters, 0) as top_instance_clusters,
  u.avg_cpu,
  u.avg_mem,
  u.avg_telemetry
FROM base_metrics b
CROSS JOIN cluster_opp c
CROSS JOIN instance_opp i
CROSS JOIN avg_util u
LEFT JOIN top_cluster tc ON TRUE
LEFT JOIN top_instance ti ON TRUE
""")

metrics = key_metrics.collect()[0]

# Convert Decimal values to float for arithmetic and formatting
total_cost = float(metrics['total_cost'])
max_savings = float(metrics['max_savings'])
critical_savings = float(metrics['critical_savings'])
high_savings = float(metrics['high_savings'])
savings_pct = (max_savings / total_cost * 100) if total_cost > 0 else 0.0

displayHTML(f"""
<h3>💰 FINANCIAL SUMMARY:</h3>
<ul>
<li>Total All-Purpose Cost: ${total_cost:,.2f}</li>
<li>Maximum Potential Savings: ${max_savings:,.2f}</li>
<li>Savings Percentage: {savings_pct:.1f}%</li>
</ul>

<h3>📊 SCOPE:</h3>
<ul>
<li>Days Analyzed: {metrics['days_analyzed']}</li>
<li>Total Clusters: {metrics['total_clusters']}</li>
<li>Total Workspaces: {metrics['total_workspaces']}</li>
<li>Instance Types with Opportunities: {metrics['instances_with_opp']}</li>
</ul>

<h3>🔴 CRITICAL PRIORITIES (Immediate Action):</h3>
<ul>
<li>{metrics['critical_clusters']} clusters with severe under-utilization</li>
<li>{metrics['critical_instances']} instance types with &lt;15% CPU efficiency</li>
<li>Potential Savings (cluster-level): ~${critical_savings:,.2f}</li>
</ul>

<h3>🟡 HIGH PRIORITIES (Action Within 30 Days):</h3>
<ul>
<li>{metrics['high_clusters']} clusters with low utilization</li>
<li>Potential Savings (cluster-level): ~${high_savings:,.2f}</li>
</ul>

<h3>🎯 TOP RECOMMENDATIONS:</h3>
<p><b>1. IMMEDIATE ACTIONS (Next 7 Days):</b></p>
<ul>
<li>Review and downsize the top 10 CRITICAL clusters</li>
<li>Focus on the top cost driver: {metrics['top_instance_type']} instances (${metrics['top_instance_cost']:,.0f})</li>
<li>Implement auto-termination policies (20 minutes max)</li>
</ul>

<p><b>2. SHORT-TERM ACTIONS (Next 30 Days):</b></p>
<ul>
<li>Migrate HIGH priority clusters to compute-optimized instances</li>
<li>Enable Photon on all compatible clusters</li>
<li>Standardize instance sizing across workspaces</li>
</ul>

<p><b>3. GOVERNANCE & MONITORING:</b></p>
<ul>
<li>Implement cluster policies with max instance sizes</li>
<li>Set up cost alerts for high-cost clusters</li>
<li>Monthly cost reviews with cluster owners</li>
</ul>

<h3>🔑 KEY INSIGHTS:</h3>
<ul>
<li>Average CPU utilization (unweighted mean across clusters): {metrics['avg_cpu']:.0f}%</li>
<li>Average memory utilization (unweighted mean across clusters): {metrics['avg_mem']:.0f}%</li>
<li>Average telemetry coverage: {metrics['avg_telemetry']:.0f}%</li>
<li>Most expensive cluster: ${metrics['top_cluster_cost']:,.0f} ({metrics['top_cluster_name']})</li>
<li>Most expensive instance: {metrics['top_instance_type']} (${metrics['top_instance_cost']:,.0f} across {metrics['top_instance_clusters']} clusters)</li>
</ul>

<p>✅ <b>ANALYSIS COMPLETE - TO ANALYZE A DIFFERENT PERIOD, CHANGE THE days_back WIDGET AND RE-RUN ALL CELLS</b></p>
""")
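The CRITICAL and HIGH buckets in the summary can be derived directly from `cluster_opportunities` by summing `validated_savings` per `opportunity_priority`. The plain-Python sketch below is roughly equivalent to a `GROUP BY opportunity_priority` with `SUM(validated_savings)`. Skipping rows whose savings are NULL and labeling a missing priority as `"UNKNOWN"` are choices made for this sketch; `savings_by_priority` is a hypothetical name.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, Iterable, Mapping


def savings_by_priority(rows: Iterable[Mapping[str, object]]) -> Dict[str, float]:
    """Sum validated_savings per opportunity_priority, skipping NULL savings."""
    totals: Dict[str, float] = defaultdict(float)
    for row in rows:
        savings = row.get("validated_savings")
        if savings is None:
            continue  # mirrors SUM ignoring NULLs
        priority = str(row.get("opportunity_priority") or "UNKNOWN")
        totals[priority] += float(savings)
    return dict(totals)


rows = [
    {"opportunity_priority": "CRITICAL", "validated_savings": 1000.0},
    {"opportunity_priority": "HIGH", "validated_savings": 400.0},
    {"opportunity_priority": "CRITICAL", "validated_savings": Decimal("250.25")},
    {"opportunity_priority": "LOW", "validated_savings": None},
]
print(savings_by_priority(rows))  # {'CRITICAL': 1250.25, 'HIGH': 400.0}
```

Computing the buckets from the table keeps the CRITICAL and HIGH figures consistent with the rows the opportunity cells display, rather than relying on a fixed ratio.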