In [0]:
# Notebook parameters: (widget name, default value, label shown in the UI).
# The SQL cells below read these through the :date_filter_min,
# :date_filter_max and :workspace_id parameter markers.
for _widget_name, _default_value, _widget_label in (
    ("date_filter_min", "2025-09-27", "Start Date"),
    ("date_filter_max", "2025-10-27", "End Date"),
    ("workspace_id", "1444828305810485", "workspace id"),
):
    dbutils.widgets.text(_widget_name, _default_value, _widget_label)

In [0]:
# Target location for the tables this notebook creates.
# TODO: parameterize catalog and schema name
catalog_name = "renjiharold_demo"
schema_name = "waf_db"

# Build every statement from the two variables above so the location that is
# created is always the one that is subsequently selected with USE.
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {schema_name}")

In [0]:
%sql
CREATE OR REPLACE TABLE waf_reliability AS
WITH R_01_01 AS (
  -- R-01-01 detail rows: one row per data_source_format with the number of
  -- tables reported for it by system.information_schema.tables.
  -- Not currently broken out: table_catalog, table_schema, table_type.
  SELECT
    data_source_format        AS tables_format,
    COUNT(data_source_format) AS no_of_tables,
    'R_01_01'                 AS waf_id,
    'Detailed'                AS dataset_type
  FROM system.information_schema.tables
  GROUP BY data_source_format
),
R_01_01_perc AS (
  -- R-01-01 score: percentage of tables whose format is DELTA or ICEBERG.
  -- NULLIF turns a zero denominator into NULL so the division yields NULL
  -- instead of raising DIVIDE_BY_ZERO under ANSI mode; waf_status coalesces
  -- a NULL percentage to 0.
  SELECT
    ROUND((cnt_acid_tables / NULLIF(all_tables, 0)) * 100, 2) AS actual_perc,
    'R-01-01' AS waf_id,
    'Percentage' AS dataset_type
  FROM (
    SELECT
      SUM(CASE WHEN data_source_format IN ('DELTA', 'ICEBERG') THEN 1 ELSE 0 END) AS cnt_acid_tables,
      COUNT(*) AS all_tables
    FROM system.information_schema.tables
  )
),
R_01_04 AS (
  -- R-01-04 detail rows: the most recent record (by change_time) of every
  -- cluster in the selected workspace whose cluster_source is UI or API,
  -- keeping only clusters whose latest record has no delete_time.
  -- String literals use single quotes: double-quoted tokens are parsed as
  -- identifiers when ANSI double-quoted identifiers are enabled.
  SELECT
    workspace_id,
    cluster_id,
    cluster_name,
    owned_by,
    auto_termination_minutes,
    'R_01_04' AS waf_id,
    'Detailed' AS dataset_type,
    ROW_NUMBER() OVER (PARTITION BY workspace_id, cluster_id ORDER BY change_time DESC) AS rn
  FROM system.compute.clusters
  WHERE cluster_source IN ('UI', 'API')
    -- AND array_contains(:workspace_id, workspace_id)  -- multi-workspace variant, disabled
    AND workspace_id = :workspace_id
  QUALIFY rn = 1 AND delete_time IS NULL
),
R_01_04_perc AS (
  -- R-01-04 score: percentage of R_01_04 clusters that have
  -- auto_termination_minutes set.
  -- R_01_04 can be empty (no matching clusters in the workspace); NULLIF
  -- makes the division yield NULL in that case instead of raising
  -- DIVIDE_BY_ZERO under ANSI mode. waf_status coalesces NULL to 0.
  SELECT
    ROUND((cnt_termination_minutes / NULLIF(all_clusters, 0)) * 100, 2) AS actual_perc,
    'R-01-04' AS waf_id,
    'Percentage' AS dataset_type
  FROM (
    SELECT
      SUM(CASE WHEN auto_termination_minutes IS NOT NULL THEN 1 ELSE 0 END) AS cnt_termination_minutes,
      COUNT(*) AS all_clusters
    FROM R_01_04
  )
),
R_01_05 AS (
  -- R-01-05 detail rows: total usage_quantity per workspace, endpoint and
  -- billing origin product for SKUs matching SERVERLESS_REAL_TIME_INFERENCE,
  -- restricted to the selected workspace and usage-date window.
  SELECT
    workspace_id,
    usage_metadata.endpoint_name AS endpoint_name,
    billing_origin_product,
    SUM(usage_quantity) AS usage_dbus,
    'R_01_05' AS waf_id,
    'Detailed' AS dataset_type
  FROM system.billing.usage
  WHERE sku_name LIKE '%SERVERLESS_REAL_TIME_INFERENCE%'
    AND usage_date BETWEEN :date_filter_min AND :date_filter_max
    -- AND array_contains(:workspace_id, workspace_id)  -- multi-workspace variant, disabled
    AND :workspace_id = workspace_id
    AND usage_metadata.endpoint_name IS NOT NULL
  GROUP BY
    workspace_id,
    usage_metadata.endpoint_name,
    billing_origin_product
),
--TBD - a DBU threshold?


R_03_01 AS (
  -- R-03-01 detail rows: job clusters (cluster_source = 'JOB') of the
  -- selected workspace created inside the selected date window.
  -- create_time is compared with a half-open range so that the whole end
  -- date is included; BETWEEN with a date-only upper bound would stop at
  -- midnight at the start of that day.
  -- NOTE(review): unlike R_01_04 this CTE does not keep only the latest
  -- change_time record per cluster - confirm whether clusters can appear
  -- more than once here.
  SELECT
    workspace_id,
    cluster_id,
    cluster_name,
    owned_by,
    worker_count,
    create_time,
    'R_03_01' AS waf_id,
    'Detailed' AS dataset_type,
    max_autoscale_workers
  FROM system.compute.clusters
  WHERE cluster_source = 'JOB'
    AND create_time >= CAST(:date_filter_min AS DATE)
    AND create_time < DATE_ADD(CAST(:date_filter_max AS DATE), 1)
    -- AND array_contains(:workspace_id, workspace_id)  -- multi-workspace variant, disabled
    AND :workspace_id = workspace_id
),

R_03_01_perc AS (
  -- R-03-01 score: percentage of R_03_01 job clusters that have
  -- max_autoscale_workers set.
  -- R_03_01 is empty when no job cluster was created in the date window;
  -- NULLIF makes the division yield NULL in that case instead of raising
  -- DIVIDE_BY_ZERO under ANSI mode. waf_status coalesces NULL to 0.
  SELECT
    ROUND((cnt_autoscale_minutes / NULLIF(all_clusters, 0)) * 100, 2) AS actual_perc,
    'R-03-01' AS waf_id,
    'Percentage' AS dataset_type
  FROM (
    SELECT
      SUM(CASE WHEN max_autoscale_workers IS NOT NULL THEN 1 ELSE 0 END) AS cnt_autoscale_minutes,
      COUNT(*) AS all_clusters
    FROM R_03_01
  )
),
R_03_02 AS (
  -- R-03-02 detail rows: the most recent record (by change_time) of each
  -- warehouse in the selected workspace, keeping only warehouses whose
  -- latest record has no delete_time.
  SELECT
    workspace_id,
    warehouse_id,
    warehouse_name,
    warehouse_type,
    warehouse_size,
    min_clusters,
    max_clusters,
    auto_stop_minutes,
    'R_03_02' AS waf_id,
    'Detailed' AS dataset_type
  FROM system.compute.warehouses
  -- WHERE array_contains(:workspace_id, workspace_id)  -- multi-workspace variant, disabled
  WHERE :workspace_id = workspace_id
  QUALIFY ROW_NUMBER() OVER (PARTITION BY warehouse_id ORDER BY change_time DESC) = 1
    AND delete_time IS NULL
),
R_03_02_perc AS (
  -- R-03-02 score: percentage of R_03_02 warehouses whose min_clusters and
  -- max_clusters differ (NULL is treated as 0 on both sides).
  -- R_03_02 is empty when the workspace has no live warehouse; NULLIF makes
  -- the division yield NULL in that case instead of raising DIVIDE_BY_ZERO
  -- under ANSI mode. waf_status coalesces NULL to 0.
  SELECT
    ROUND((cnt_autoscale_minutes / NULLIF(all_clusters, 0)) * 100, 2) AS actual_perc,
    'R-03-02' AS waf_id,
    'Percentage' AS dataset_type
  FROM (
    SELECT
      SUM(CASE WHEN COALESCE(min_clusters, 0) <> COALESCE(max_clusters, 0) THEN 1 ELSE 0 END) AS cnt_autoscale_minutes,
      COUNT(*) AS all_clusters
    FROM R_03_02
  )
),
waf_status AS (
  -- One row per scored control: the control catalogue (inline VALUES list)
  -- joined to whichever *_perc CTE carries the same waf_id.
  -- 'implemented' is 'Yes' for R-01-05 when R_01_05 has at least one row,
  -- and for every other control when its percentage reaches
  -- required_percentage; otherwise 'No'.
  SELECT
    waf.waf_id,
    waf.principle,
    waf.best_practice,
    COALESCE(r11p.actual_perc, r14p.actual_perc, r31p.actual_perc, r32p.actual_perc, 0) AS actual_perc,
    waf.required_percentage,
    CASE
      WHEN waf.waf_id = 'R-01-05'
           AND EXISTS (SELECT 1 FROM R_01_05 LIMIT 1)
        THEN 'Yes'
      -- Disabled rule for R-01-06:
      --   WHEN waf_id = 'R-01-06' AND EXISTS (
      --     SELECT 1 FROM system.billing.usage WHERE sku_name LIKE '%SERVERLESS%' OR sku_name LIKE '%DLT%' LIMIT 1
      --   ) THEN 'Yes'
      WHEN waf.waf_id <> 'R-01-05'
           AND COALESCE(r11p.actual_perc, r14p.actual_perc, r31p.actual_perc, r32p.actual_perc, 0) >= waf.required_percentage
        THEN 'Yes'
      ELSE 'No'
    END AS implemented
  FROM (
    SELECT * FROM VALUES
      ('R-01-01', 'Design for failure', 'Use a data format that supports ACID transactions', 30),
      --('R-01-02', 'Design for failure', 'Use a resilient distributed data engine for all workloads'),
      --('R-01-03', 'Design for failure', 'Automatically rescue invalid or nonconforming data '),
      ('R-01-04', 'Design for failure', 'Configure jobs for automatic retries and termination', 80),
      ('R-01-05', 'Design for failure', 'Use a scalable and production-grade model serving infrastructure', 0),
      --('R-01-06', 'Design for failure', 'Use managed services for your workloads'),
      --('R-02-03', 'Manage data quality', 'Actively manage schemas'),
      --('R-02-04', 'Manage data quality', 'Use constraints and data expectations'),
      ('R-03-01', 'Design for autoscaling', 'Enable autoscaling for ETL workloads', 30),
      ('R-03-02', 'Design for autoscaling', 'Use autoscaling for SQL Warehouses', 30)
      --('R-02-09', 'Design workloads for performance', 'Use compaction'),
  ) waf(waf_id, principle, best_practice, required_percentage)
  LEFT JOIN R_01_01_perc r11p
    ON waf.waf_id = r11p.waf_id
  LEFT JOIN R_01_04_perc r14p
    ON waf.waf_id = r14p.waf_id
  LEFT JOIN R_03_01_perc r31p
    ON waf.waf_id = r31p.waf_id
  LEFT JOIN R_03_02_perc r32p
    ON waf.waf_id = r32p.waf_id
)
-- Final result set stored in waf_reliability: six UNION ALL branches that
-- share one 30-column layout. Columns are matched by POSITION, so each branch
-- fills the columns it does not own with '' / 0 / NULL placeholders; keep the
-- column order identical in every branch when editing.

-- Branch 1 - 'Summary' rows: one row per control from waf_status, with
-- per-principle counts/completion and the overall percentage computed by
-- window functions.
SELECT
  waf_id,
  principle,
  best_practice,
  implemented,
  actual_perc,
  required_percentage,
  COUNT(*) OVER (PARTITION BY principle) AS total_controls,
  ROW_NUMBER() OVER (PARTITION BY principle ORDER BY waf_id) AS row_num,
  SUM(CASE WHEN implemented = 'Yes' THEN 1 ELSE 0 END) OVER (PARTITION BY principle) AS implemented_controls,
  ROUND(100 * SUM(CASE WHEN implemented = 'Yes' THEN 1 ELSE 0 END) OVER (PARTITION BY principle)/ COUNT(*) OVER (PARTITION BY principle) , 0) AS completion_percent,
  ROUND(100 * SUM(CASE WHEN implemented = 'Yes' THEN 1 ELSE 0 END) OVER () / COUNT(*) OVER (), 0) AS total_percentage,
  'Summary' AS dataset_type,
  '' AS tables_format,
   0 AS no_of_tables,
  '' AS workspace_id,
  '' AS cluster_id,
  '' AS cluster_name,
  '' AS owned_by,
  '' AS endpoint_name,
  '' AS billing_origin_product,
   0 AS usage_dbus,
   0 AS worker_count, 
   NULL AS create_time,
   '' AS warehouse_id,
   '' AS warehouse_name,
   '' AS warehouse_type,
   '' AS warehouse_size,
   0 AS min_clusters,
   0 AS max_clusters,
   0 AS auto_stop_minutes
FROM waf_status

UNION ALL
-- Branch 2 - R_01_01 detail: table counts per format
-- (fills tables_format and no_of_tables).
SELECT
  waf_id,
  '' AS principle,
  '' AS best_practice,
  '' AS implemented,
  0 AS actual_perc,
  0 AS required_percentage,
  0 AS total_controls,
  0 AS row_num,
  0 AS implemented_controls,
  0 AS completion_percent,
  0 AS total_percentage,
  dataset_type,
  tables_format,
  no_of_tables,
  '' AS workspace_id,
  '' AS cluster_id,
  '' AS cluster_name,
  '' AS owned_by,
  '' AS endpoint_name,
  '' AS billing_origin_product,
   0 AS usage_dbus,
   0 AS worker_count, 
   NULL AS create_time,
   '' AS warehouse_id,
   '' AS warehouse_name,
   '' AS warehouse_type,
   '' AS warehouse_size,
   0 AS min_clusters,
   0 AS max_clusters,
   0 AS auto_stop_minutes
FROM R_01_01

UNION ALL
-- Branch 3 - R_01_04 detail: only clusters with NO auto_termination_minutes,
-- i.e. the rows that count against the R-01-04 percentage
-- (fills workspace_id, cluster_id, cluster_name, owned_by).
SELECT
  waf_id,
  '' AS principle,
  '' AS best_practice,
  '' AS implemented,
  0 AS actual_perc,
  0 AS required_percentage,
  0 AS total_controls,
  0 AS row_num,
  0 AS implemented_controls,
  0 AS completion_percent,
  0 AS total_percentage,
  dataset_type,
  '' AS tables_format,
  0 AS no_of_tables,
  workspace_id,
  cluster_id,
  cluster_name,
  owned_by,
  '' AS endpoint_name,
  '' AS billing_origin_product,
   0 AS usage_dbus,
   0 AS worker_count, 
   NULL AS create_time,
   '' AS warehouse_id,
   '' AS warehouse_name,
   '' AS warehouse_type,
   '' AS warehouse_size,
   0 AS min_clusters,
   0 AS max_clusters,
   0 AS auto_stop_minutes
FROM R_01_04
WHERE auto_termination_minutes IS NULL
UNION ALL
-- Branch 4 - R_01_05 detail: usage per serving endpoint
-- (fills workspace_id, endpoint_name, billing_origin_product, usage_dbus).
SELECT
  waf_id,
  '' AS principle,
  '' AS best_practice,
  '' AS implemented,
  0 AS actual_perc,
  0 AS required_percentage,
  0 AS total_controls,
  0 AS row_num,
  0 AS implemented_controls,
  0 AS completion_percent,
  0 AS total_percentage,
  dataset_type,
  '' AS tables_format,
  0 AS no_of_tables,
  workspace_id,
  '' AS cluster_id,
  '' AS cluster_name,
  '' AS owned_by,
   endpoint_name,
   billing_origin_product,
   usage_dbus,
   0 AS worker_count, 
   NULL AS create_time,
   '' AS warehouse_id,
   '' AS warehouse_name,
   '' AS warehouse_type,
   '' AS warehouse_size,
   0 AS min_clusters,
   0 AS max_clusters,
   0 AS auto_stop_minutes
FROM R_01_05
UNION ALL
-- Branch 5 - R_03_01 detail: only job clusters with NO max_autoscale_workers,
-- i.e. the rows that count against the R-03-01 percentage
-- (fills the cluster columns plus worker_count and create_time).
SELECT
  waf_id,
  '' AS principle,
  '' AS best_practice,
  '' AS implemented,
  0 AS actual_perc,
  0 AS required_percentage,
  0 AS total_controls,
  0 AS row_num,
  0 AS implemented_controls,
  0 AS completion_percent,
  0 AS total_percentage,
  dataset_type,
  '' AS tables_format,
  0 AS no_of_tables,
  workspace_id,
  cluster_id,
  cluster_name,
  owned_by,
  '' AS endpoint_name,
  '' AS billing_origin_product,
   0 AS usage_dbus,
   worker_count, 
   create_time,
   '' AS warehouse_id,
   '' AS warehouse_name,
   '' AS warehouse_type,
   '' AS warehouse_size,
   0 AS min_clusters,
   0 AS max_clusters,
   0 AS auto_stop_minutes
FROM R_03_01
WHERE max_autoscale_workers IS NULL

UNION ALL
-- Branch 6 - R_03_02 detail: only warehouses whose min_clusters equals
-- max_clusters (NULL treated as 0), i.e. the rows that count against the
-- R-03-02 percentage (fills workspace_id and the warehouse columns).
SELECT
  waf_id,
  '' AS principle,
  '' AS best_practice,
  '' AS implemented,
  0 AS actual_perc,
  0 AS required_percentage,
  0 AS total_controls,
  0 AS row_num,
  0 AS implemented_controls,
  0 AS completion_percent,
  0 AS total_percentage,
  dataset_type,
  '' AS tables_format,
  0 AS no_of_tables,
  workspace_id,
  '' AS cluster_id,
  '' AS cluster_name,
  '' AS owned_by,
  '' AS endpoint_name,
  '' AS billing_origin_product,
   0 AS usage_dbus,
   0 AS worker_count, 
   NULL AS create_time,
   warehouse_id,
   warehouse_name,
   warehouse_type,
   warehouse_size,
   min_clusters,
   max_clusters,
   auto_stop_minutes
FROM R_03_02
WHERE IFNULL(min_clusters,0) = IFNULL(max_clusters,0)
-- This ORDER BY is written after the last branch with no parentheses around it.
ORDER BY waf_id

In [0]:
%sql
-- Sanity check: number of rows written to waf_reliability by the cell above.
SELECT COUNT(*)
FROM waf_reliability;

In [0]:
%sql
-- Preview of the values the INSERT in the next cell derives: the current
-- overall reliability percentage and its relative change against the most
-- recent 'reliability' entry in waf_log (highest run_id).
-- The previous score is read from current_percent, the column the INSERT
-- writes; percent_change is 0 when there is no previous score or it is 0.
SELECT
  current_timestamp(),
  total_percentage,
  'reliability',
  CASE
    WHEN prev_total IS NULL OR prev_total = 0 THEN 0
    ELSE ROUND((total_percentage - prev_total) / prev_total * 100, 2)
  END AS percent_change
FROM (
  SELECT
    total_percentage,
    (SELECT 
      current_percent
     FROM waf_log
     WHERE pillar = 'reliability'
     ORDER BY run_id DESC
     LIMIT 1) AS prev_total
  FROM waf_reliability
  WHERE dataset_type = 'Summary'
  LIMIT 1)

In [0]:
%sql
-- Append one run record for the 'reliability' pillar to waf_log.
-- current_percent is the overall percentage taken from a 'Summary' row of
-- waf_reliability; prev_percent is current_percent of the latest
-- 'reliability' entry already in waf_log (highest run_id), or NULL when
-- there is none. percent_change is 0 when prev_percent is NULL or 0.
INSERT INTO waf_log (run_date, current_percent, prev_percent, percent_change, pillar)
SELECT
  current_timestamp(),
  score.total_percentage,
  score.prev_percent,
  CASE
    WHEN COALESCE(score.prev_percent, 0) = 0 THEN 0
    ELSE ROUND((score.total_percentage - score.prev_percent) / score.prev_percent * 100, 2)
  END AS percent_change,
  'reliability'
FROM (
  SELECT
    total_percentage,
    (
      SELECT current_percent
      FROM waf_log
      WHERE pillar = 'reliability'
      ORDER BY run_id DESC
      LIMIT 1
    ) AS prev_percent
  FROM waf_reliability
  WHERE dataset_type = 'Summary'
  LIMIT 1
) AS score;
