## Intro
An [HTTP connection](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-ddl-create-connection) must be created before using the functions in this notebook.

The default connection name is `databricks_api`.

In [0]:
use catalog identifier(:catalog);
use schema identifier(:schema);

## SQL Warehouses

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/getpermissions
create or replace function get_warehouse_permissions(
  warehouse_id string
)
comment 'Gets the permissions of a SQL warehouse. SQL warehouses can inherit permissions from their root object. https://docs.databricks.com/api/workspace/warehouses/getpermissions'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'GET',
    path => concat('2.0/permissions/warehouses/', warehouse_id)
  ).text as resp
|> select
     from_json(
       resp,
       'STRUCT<access_control_list: ARRAY<STRUCT<all_permissions: ARRAY<STRUCT<inherited: BOOLEAN, inherited_from_object: ARRAY<STRING>, permission_level: STRING>>, display_name: STRING, group_name: STRING, service_principal_name: STRING, user_name: STRING>>, object_id: STRING, object_type: STRING>'
     ) as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/setpermissions
create or replace function set_warehouse_permissions(
  warehouse_id string,
  access_control_list string
)
comment 'Sets permissions on an object, replacing existing permissions if they exist. Deletes all direct permissions if none are specified. Objects can inherit permissions from their root object. https://docs.databricks.com/api/workspace/warehouses/setpermissions'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'PUT',
    path => concat('2.0/permissions/warehouses/', warehouse_id),
    json => access_control_list
  ).text as resp
|> select
     from_json(
       resp,
       'STRUCT<access_control_list: ARRAY<STRUCT<all_permissions: ARRAY<STRUCT<inherited: BOOLEAN, inherited_from_object: ARRAY<STRING>, permission_level: STRING>>, display_name: STRING, group_name: STRING, service_principal_name: STRING, user_name: STRING>>, object_id: STRING, object_type: STRING>'
     ) as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/updatepermissions
create or replace function update_warehouse_permissions(
  warehouse_id string,
  access_control_list string
)
comment 'Updates the permissions on a SQL warehouse. SQL warehouses can inherit permissions from their root object. https://docs.databricks.com/api/workspace/warehouses/updatepermissions'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'PATCH',
    path => concat('2.0/permissions/warehouses/', warehouse_id),
    json => access_control_list
  ).text as resp
|> select
     from_json(
       resp,
       'STRUCT<access_control_list: ARRAY<STRUCT<all_permissions: ARRAY<STRUCT<inherited: BOOLEAN, inherited_from_object: ARRAY<STRING>, permission_level: STRING>>, display_name: STRING, group_name: STRING, service_principal_name: STRING, user_name: STRING>>, object_id: STRING, object_type: STRING>'
     ) as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/getpermissionlevels
create or replace function get_warehouse_permission_levels(
  warehouse_id string
)
comment 'Gets the permission levels that a user can have on an object. https://docs.databricks.com/api/workspace/warehouses/getpermissionlevels'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'GET',
    path => concat('2.0/permissions/warehouses/', warehouse_id, '/permissionLevels')
  ).text as resp
|> select
     from_json(
       resp,
       'STRUCT<permission_levels: ARRAY<STRUCT<description: STRING, permission_level: STRING>>>'
     ) as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/list
create or replace function list_warehouses(
  run_as_user_id int default null
)
comment 'Lists all SQL warehouses that a user has manager permissions on. https://docs.databricks.com/api/workspace/warehouses/list'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'GET',
    path => '2.0/sql/warehouses',
    json => 
      to_json(
        named_struct(
          'run_as_user_id', run_as_user_id
        )
      )
  ).text as resp
|> select
     from_json(
       resp,
       'STRUCT<warehouses: ARRAY<STRUCT<auto_stop_mins: STRING, channel: STRUCT<dbsql_version: STRING, name: STRING>, cluster_size: STRING, creator_name: STRING, enable_photon: BOOLEAN, enable_serverless_compute: BOOLEAN, health: STRUCT<details: STRING, failure_reason: STRUCT<code: STRING, parameters: STRUCT<property1: STRING, property2: STRING>, type: STRING>, message: STRING, status: STRING, summary: STRING>, id: STRING, instance_profile_arn: STRING, jdbc_url: STRING, max_num_clusters: BIGINT, min_num_clusters: STRING, name: STRING, num_active_sessions: BIGINT, num_clusters: BIGINT, odbc_params: STRUCT<hostname: STRING, path: STRING, port: BIGINT, protocol: STRING>, spot_instance_policy: STRING, state: STRING, tags: STRUCT<custom_tags: ARRAY<STRUCT<key: STRING, value: STRING>>>, warehouse_type: STRING>>>'
     ) as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/create
create or replace function create_warehouse(
  auto_stop_mins int default null,
  channel_name string default 'CHANNEL_NAME_CURRENT',
  cluster_size string default null,
  creator_name string default null,
  enable_serverless_compute string default null,
  instance_profile_arn string default null,
  max_num_clusters int default null,
  min_num_clusters int default null,
  name string default null,
  spot_instance_policy string default null,
  custom_tags array<struct<key:string,value:string>> default null,
  warehouse_type string default null
)
comment 'Creates a new SQL warehouse. https://docs.databricks.com/api/workspace/warehouses/create'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'POST',
    path => '2.0/sql/warehouses',
    json => 
      to_json(
        named_struct(
          'auto_stop_mins', auto_stop_mins,
          'channel', named_struct(
            'dbsql_version', null,
            'name', channel_name
          ),
          'cluster_size', cluster_size,
          'creator_name', creator_name,
          'enable_photon', true,
          'enable_serverless_compute', enable_serverless_compute,
          'instance_profile_arn', instance_profile_arn,
          'max_num_clusters', max_num_clusters,
          'min_num_clusters', min_num_clusters,
          'name', name,
          'spot_instance_policy', spot_instance_policy,
          'tags', named_struct(
            'custom_tags', custom_tags
          ),
          'warehouse_type', warehouse_type
        )
      )
  ).text as resp
|> select
     from_json(
       resp,
       'STRUCT<id: STRING>'
     ) as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/get
create or replace function get_warehouse_info(
  id string
)
comment 'Gets the information for a single SQL warehouse. https://docs.databricks.com/api/workspace/warehouses/get'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'GET',
    path => concat('2.0/sql/warehouses/', id)
  ).text as resp
|> select
     from_json(
       resp,
       'STRUCT<auto_stop_mins: STRING, channel: STRUCT<dbsql_version: STRING, name: STRING>, cluster_size: STRING, creator_name: STRING, enable_photon: BOOLEAN, enable_serverless_compute: BOOLEAN, health: STRUCT<details: STRING, failure_reason: STRUCT<code: STRING, parameters: STRUCT<property1: STRING, property2: STRING>, type: STRING>, message: STRING, status: STRING, summary: STRING>, id: STRING, instance_profile_arn: STRING, jdbc_url: STRING, max_num_clusters: BIGINT, min_num_clusters: STRING, name: STRING, num_active_sessions: BIGINT, num_clusters: BIGINT, odbc_params: STRUCT<hostname: STRING, path: STRING, port: BIGINT, protocol: STRING>, spot_instance_policy: STRING, state: STRING, tags: STRUCT<custom_tags: ARRAY<STRUCT<key: STRING, value: STRING>>>, warehouse_type: STRING>'
     ) as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/delete
create or replace function delete_warehouse(
  id string
)
comment 'Deletes a SQL warehouse. https://docs.databricks.com/api/workspace/warehouses/delete'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'DELETE',
    path => concat('2.0/sql/warehouses/', id)
  ).text as resp;

In [0]:
-- https://docs.databricks.com/api/workspace/warehouses/edit
create or replace function update_warehouse(
  id string,
  auto_stop_mins int default null,
  channel_name string default 'CHANNEL_NAME_CURRENT',
  cluster_size string default null,
  creator_name string default null,
  enable_serverless_compute string default null,
  instance_profile_arn string default null,
  max_num_clusters int default null,
  min_num_clusters int default null,
  name string default null,
  spot_instance_policy string default null,
  custom_tags array<struct<key:string,value:string>> default null,
  warehouse_type string default null
)
comment 'Updates the configuration for a SQL warehouse. https://docs.databricks.com/api/workspace/warehouses/edit'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'POST',
    path => concat('2.0/sql/warehouses/', id, '/edit'),
    json => 
      to_json(
        named_struct(
          'auto_stop_mins', auto_stop_mins,
          'channel', named_struct(
            'dbsql_version', null,
            'name', channel_name
          ),
          'cluster_size', cluster_size,
          'creator_name', creator_name,
          'enable_photon', true,
          'enable_serverless_compute', enable_serverless_compute,
          'instance_profile_arn', instance_profile_arn,
          'max_num_clusters', max_num_clusters,
          'min_num_clusters', min_num_clusters,
          'name', name,
          'spot_instance_policy', spot_instance_policy,
          'tags', named_struct(
            'custom_tags', custom_tags
          ),
          'warehouse_type', warehouse_type
        )
      )
  ).text as resp;

## Query History

In [0]:
-- A python function is used only so that this solution can be reconciled using
-- the Databricks SDK. Python and SQL functions don't return identical Unix timestamps.
create or replace function unix_timestamp_ms(dt string)
  returns bigint
  deterministic
  comment 'Returns the number of milliseconds since the Unix Epoch'
  language python
  as $$
    from datetime import datetime
    dt_object = datetime.strptime(dt, '%Y-%m-%d %H:%M:%S')
    return int(dt_object.timestamp() * 1000)
  $$;

-- https://docs.databricks.com/api/workspace/queryhistory/list
-- include_metrics type is string because of a bug related to boolean arguments
create or replace function list_queries(
  start_time_ms bigint,
  end_time_ms bigint,
  warehouse_ids array<string>,
  page_token string,
  max_results int default 100,
  include_metrics string default 'true'
)
comment 'List the history of queries through SQL warehouses, and serverless compute. You can filter by user ID, warehouse ID, status, and time range. Most recently started queries are returned first (up to max_results in request). The pagination token returned in response can be used to list subsequent query statuses. https://docs.databricks.com/api/workspace/queryhistory/list'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'GET',
    path => '2.0/sql/history/queries',
    json => 
      to_json(
        named_struct('filter_by',
          named_struct('query_start_time_range',
            named_struct('start_time_ms', start_time_ms, 'end_time_ms', end_time_ms),
            'warehouse_ids', warehouse_ids
          ),
          'max_results', max_results,
          'page_token', page_token,
          'include_metrics', include_metrics
        )
      )
  ).text as resp
|> select
     from_json(
       resp,
       'struct<has_next_page: boolean, next_page_token: string, res: array<struct<channel_used: struct<dbsql_version: string, name: string>, client_application: string, duration: bigint, endpoint_id: string, error_message: string, executed_as_user_id: bigint, executed_as_user_name: string, execution_end_time_ms: bigint, is_final: boolean, lookup_key: string, metrics: struct<compilation_time_ms: bigint, execution_time_ms: bigint, network_sent_bytes: bigint, overloading_queue_start_timestamp: bigint, photon_total_time_ms: bigint, provisioning_queue_start_timestamp: bigint, pruned_bytes: bigint, pruned_files_count: bigint, query_compilation_start_timestamp: bigint, read_bytes: bigint, read_cache_bytes: bigint, read_files_count: bigint, read_partitions_count: bigint, read_remote_bytes: bigint, result_fetch_time_ms: bigint, result_from_cache: boolean, rows_produced_count: bigint, rows_read_count: bigint, spill_to_disk_bytes: bigint, task_total_time_ms: bigint, total_time_ms: bigint, write_remote_bytes: bigint>, plans_state: string, query_end_time_ms: bigint, query_id: string, query_source: struct<alert_id: string, dashboard_id: string, genie_space_id: string, job_info: struct<job_id: string, job_run_id: string, job_task_run_id: string>, legacy_dashboard_id: string, notebook_id: string, sql_query_id: string>, query_start_time_ms: bigint, query_text: string, rows_produced: bigint, spark_ui_url: string, statement_type: string, status: string, user_id: bigint, user_name: string, warehouse_id: string>>>'
     );

## Clustering Columns

In [None]:
-- Function to get cluistering columns metadata with optional filtering
-- Parameters:
--   metadata_catalog_name (mandatory): The catalog name to filter by
--   metadata_schema_name (optional): The schema name to filter by
--   metadata_table_name (optional): The table name to filter by

CREATE OR REPLACE FUNCTION get_clustering_metadata(
    metadata_catalog_name STRING,
    metadata_schema_name STRING DEFAULT NULL,
    metadata_table_name STRING DEFAULT NULL
)
RETURNS TABLE (
    table_catalog STRING,
    table_schema STRING,
    table_name STRING,
    table_type STRING,
    data_source_format STRING,
    clustering_columns ARRAY<ARRAY<STRING>>
)
COMMENT 'Gets table metadata from information_schema with optional filtering by catalog, schema, and table name. Makes HTTP requests to get additional metadata from Databricks API.'
RETURN
SELECT
    table_catalog,
    table_schema,
    table_name,
    table_type,
    get_json_object(resp, '$.data_source_format') AS data_source_format,
    CASE
        WHEN get_json_object(resp, '$.properties.clusteringColumns') IS NOT NULL THEN
            from_json(get_json_object(resp, '$.properties.clusteringColumns'), 'array<array<string>>')
        ELSE NULL
    END AS clustering_columns
FROM (
    SELECT
        table_catalog,
        table_schema,
        table_name,
        table_type,
        http_request(
            conn => 'databricks_api',
            method => 'GET',
            path => concat('2.1/unity-catalog/tables/', table_catalog, '.', table_schema, '.', table_name)
        ).text AS resp
    FROM aa_catalog.information_schema.tables
    WHERE table_catalog = metadata_catalog_name
      AND (metadata_schema_name IS NULL OR table_schema = metadata_schema_name)
      AND (metadata_table_name IS NULL OR table_name = metadata_table_name)
) t;

-- Example Usage
-- select * from aa_catalog.dw_ops.get_clustering_metadata('aa_catalog') as t;
-- select * from aa_catalog.dw_ops.get_clustering_metadata('aa_catalog', 'dw_ops') as t;
-- select * from aa_catalog.dw_ops.get_clustering_metadata('aa_catalog', 'dw_ops', 'enrollments') as t;


## Alerts

In [None]:
-- =============================================================================
-- CREATE ALERT FUNCTION
-- =============================================================================
-- 
-- Purpose: Creates Databricks alerts using SQL queries with flexible configuration
-- API Reference: https://docs.databricks.com/api/workspace/alerts/create
-- 
-- This function allows DBAs to create monitoring alerts for:
-- - Data quality checks (null values, data freshness)
-- - Performance monitoring (slow queries, high resource usage)
-- - Error rate monitoring (application errors, system failures)
-- - Business metrics (record counts, aggregation thresholds)
-- 
-- =============================================================================
-- USAGE EXAMPLES
-- =============================================================================
-- 
-- Basic Alert (Count Monitoring):
-- %sql
-- select aa_catalog.dw_ops.create_alert(
--   display_name => 'high_error_rate_alert',
--   query_text => 'select 25 as error_count',
--   warehouse_id => '4b9b953939869799',
--   comparison_operator => 'GREATER_THAN',
--   threshold_value => 10,
--   user_email => 'akshay.amin@databricks.com',
--   cron_schedule => '0 */5 * * * ?',
--   source_display => 'error_count',
--   source_name => 'error_count',
--   parent_path => '/Workspace/Users/akshay.amin@databricks.com/offerings/dw_ops/dbsql_http/jobs'
-- );
-- 
-- =============================================================================
-- PARAMETER REFERENCE
-- =============================================================================
-- 
-- Required Parameters:
-- - display_name: Unique name for the alert
-- - query_text: SQL query that returns the value to monitor
-- - warehouse_id: SQL warehouse ID to run the query on
-- 
-- Optional Parameters (with defaults):
-- - comparison_operator: GREATER_THAN, LESS_THAN, EQUAL, etc. (default: GREATER_THAN)
-- - threshold_value: Numeric threshold to compare against (default: 0.0)
-- - user_email: Email for notifications (default: null)
-- - cron_schedule: Cron expression for evaluation schedule (default: '0 */15 * * * ?')
-- - source_display/name: Column alias from query_text (default: 'value')
-- - parent_path: Workspace path for alert location (default: '/Workspace/Users/')
-- 
-- =============================================================================

create or replace function create_alert(
  -- Required parameters
  display_name string,
  query_text string, -- Example: 'select count(*) as ct from my_table'
  warehouse_id string,
  
  -- Alert configuration
  comparison_operator string default 'GREATER_THAN', -- Options: GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, EQUAL, NOT_EQUAL
  threshold_value double default 0.0,
  empty_result_state string default 'UNKNOWN',
  
  -- Notification settings
  user_email string default null,
  notify_on_ok boolean default true,
  retrigger_seconds int default 0,
  
  -- Schedule settings
  cron_schedule string default '0 */15 * * * ?',
  timezone_id string default 'UTC',
  pause_status string default 'UNPAUSED',
  
  -- Optional settings
  parent_path string default '/Workspace/Users/',
  source_aggregation string default 'FIRST',
  source_display string default null, -- Example: 'ct' (should match column alias in query_text)
  source_name string default null -- Example: 'ct' (should match column alias in query_text)
)
comment 'Creates a Databricks alert with parameterized configuration. Allows DBAs to create alerts using SQL queries with flexible notification and scheduling options.'
return
select
  http_request(
    conn => 'databricks_api',
    method => 'POST',
    path => '2.0/alerts',
    json => concat(
      '{"display_name":"', display_name, '",',
      '"query_text":"', query_text, '",',
      '"parent_path":"', parent_path, '",',
      '"warehouse_id":"', warehouse_id, '",',
      '"evaluation":{"comparison_operator":"', comparison_operator, '",',
      '"empty_result_state":"', empty_result_state, '",',
      '"notification":{"notify_on_ok":', case when notify_on_ok then 'true' else 'false' end, ',"retrigger_seconds":', cast(retrigger_seconds as string), ',',
      '"subscriptions":[',
      case when user_email is not null then concat('{"user_email":"', user_email, '"}') else '' end,
      ']}',
      ',"source":{"aggregation":"', source_aggregation, '","display":"', coalesce(source_display, 'value'), '","name":"', coalesce(source_name, 'value'), '"}',
      ',"threshold":{"value":{"double_value":', cast(threshold_value as string), '}}}',
      ',"schedule":{"pause_status":"', pause_status, '","quartz_cron_schedule":"', cron_schedule, '","timezone_id":"', timezone_id, '"}}'
    )
  ).text as resp;