# Genie Space Observability E2E Notebook

### Introduction
This notebook shows how to combine the Genie REST APIs with Databricks System Tables to build an end-to-end Genie observability workflow, complete with a Genie Observability Dashboard and a Meta-Genie space for asking questions about your Genie spaces!

Running this notebook creates **two Delta tables**: _genie_observability_main_table_ and _genie_cost_analysis_main_table_.

- **genie_observability_main_table:** This table contains all the messages sent to your Genie spaces, along with Genie's natural language response and the SQL code generated for each message. Additional columns provide the conversation_id, feedback_rating, user_email, the statement execution ID linked to the SQL code, and more.
- **genie_cost_analysis_main_table:** This table contains the information required to attribute costs to a Genie space through its Genie space ID. Additional columns include statement_id, user_email, start_time, cost per statement ID, and more.

**Notes:**
- The entire solution is vibe coded (with human in the loop!), so please verify code before running in prod.
- The cost-per-query attribution logic is borrowed from the DBSQL SME blog and dashboard: https://github.com/CodyAustinDavis/dbsql_sme/tree/main/Observability%20Dashboards%20and%20DBA%20Resources/Observability%20Lakeview%20Dashboard%20Templates/DBSQL%20Warehouse%20Advisor%20With%20Data%20Model

**Usage Guidance:**
- You can use this notebook as part of the wider Lakehouse Adoption Dashboard and pipeline that can be deployed as part of Databricks Asset Bundles. Equally, you can download this notebook and Dashboard JSON and incorporate it into your own workflow.
- We recommend running this notebook in a test environment first and tailoring the code to your preferences.

**Pre-requisites:**
- You should be a Genie space author of at least one Genie space (CAN MANAGE permission)
- You should have sufficient permissions to create tables in a schema within a catalog of your choice
- You should have SELECT permission on the system catalog, including the billing, query, and compute schemas that the cost query reads




In [0]:
# Upgrade to the latest version of databricks_sdk package
%pip install databricks_sdk --upgrade

In [0]:
# Restart Python to ensure all libraries are reloaded
dbutils.library.restartPython()

In [0]:
# Create a text widget for catalog_name with default value 'users'
dbutils.widgets.text("catalog_name", "users")
# Create a text widget for schema_name with no default value
dbutils.widgets.text("schema_name", "")

# Retrieve the value of catalog_name from the widget
catalog_name = dbutils.widgets.get("catalog_name")
# Retrieve the value of schema_name from the widget
schema_name = dbutils.widgets.get("schema_name")
# Ensure both catalog_name and schema_name are provided
assert catalog_name and schema_name, "catalog_name and schema_name must be provided"

In [0]:
from databricks.sdk import WorkspaceClient
import pandas as pd
import json
from typing import Dict, Any, List
from datetime import datetime

# Initialize the Databricks workspace client
w = WorkspaceClient()

# Get current user information using the workspace client
current_user = w.current_user.me()
user_name = current_user.user_name

# Print the current user's username
print(f"Current user: {user_name}")

# Print the Databricks workspace host URL
print(f"Workspace: {w.config.host}")

Below we list all Genie spaces in the current workspace

In [0]:
from databricks.sdk import WorkspaceClient
import pandas as pd

# Initialize the Databricks Workspace client
w = WorkspaceClient()

# Initialize an empty list to store Genie space metadata
spaces = []
page_token = None

# Paginate through all Genie spaces using the SDK
while True:
    response = w.genie.list_spaces(page_token=page_token)
    for s in response.spaces or []:  # Guard against a missing list on an empty page
        # Append relevant space details to the list
        spaces.append({
            "space_id": getattr(s, "space_id", None),
            "name": getattr(s, "title", None),
            "description": getattr(s, "description", None),
            "warehouse_id": getattr(s, "warehouse_id", None)
        })
    # Break if there are no more pages
    if not response.next_page_token:
        break
    page_token = response.next_page_token

# Convert the list of spaces to a Pandas DataFrame
genie_spaces_pdf = pd.DataFrame(spaces)

In [0]:
# Display the DataFrame containing all Genie spaces metadata
display(genie_spaces_pdf)
print("---------------")
print(f"Total Genie spaces: {len(genie_spaces_pdf)}")
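
The token-based pagination loop above recurs for spaces, conversations, and messages. As a minimal sketch, the pattern can be factored into one generic helper, assuming each response exposes a list attribute and a `next_page_token`. The names `paginate`, `fetch_page`, and `FakeResponse` are hypothetical and exist only for this illustration; they are not part of the Databricks SDK.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterator, List, Optional


@dataclass
class FakeResponse:
    """Stand-in for an SDK list response (hypothetical, for illustration only)."""
    spaces: List[str] = field(default_factory=list)
    next_page_token: Optional[str] = None


def paginate(fetch_page: Callable[[Optional[str]], Any], items_attr: str) -> Iterator[Any]:
    """Yield items from every page until the response carries no next token."""
    page_token = None
    while True:
        response = fetch_page(page_token)
        # The list attribute may be missing or None on an empty page.
        yield from getattr(response, items_attr, None) or []
        page_token = getattr(response, "next_page_token", None)
        if not page_token:
            break


# Two fake pages, keyed by the token that requests them.
_pages = {
    None: FakeResponse(spaces=["space_a", "space_b"], next_page_token="t1"),
    "t1": FakeResponse(spaces=["space_c"], next_page_token=""),
}

all_spaces = list(paginate(lambda token: _pages[token], "spaces"))
print(all_spaces)
```

With the real client, the equivalent call would presumably be `paginate(lambda t: w.genie.list_spaces(page_token=t), "spaces")`.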

The cell below defines the most important function in this notebook: the one that generates the genie_observability_main_table table. You can edit this function to suit your requirements or adapt it to generate new insights!

#### Databricks Python SDK implementation
For an implementation that uses the Python requests library, see the Appendix.

In [0]:
from typing import Dict, Any, List, Optional
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType, IntegerType
from pyspark.sql.functions import col, from_unixtime

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.iam import User

# Cache for user email lookups to prevent redundant API calls
USER_CACHE: Dict[str, str] = {}


def get_genie_observability_table(space_id: str) -> DataFrame:
    """
    Fetches and constructs a Spark DataFrame containing observability data 
    for all messages in a Genie space using the Databricks SDK.
    
    Args:
        space_id: The ID of the Genie space to fetch data from.
        
    Returns:
        A Spark DataFrame with message-level observability data.
    """
    w = WorkspaceClient()
    
    # Get space details
    space = w.genie.get_space(space_id=space_id)
    space_name = space.title or f"Space_{space_id}"
    
    records = []
    
    # List all conversations with pagination
    conversations = _list_all_conversations(w, space_id)
    
    for conv in conversations:
        # List all messages in the conversation with pagination
        messages = _list_all_messages(w, space_id, conv.conversation_id)
        
        for msg in messages:
            record = _extract_message_data(msg, space_id, space_name, w)
            records.append(record)
    
    # Create Spark DataFrame
    spark = SparkSession.builder.getOrCreate()
    schema = _get_schema()
    
    if not records:
        return spark.createDataFrame([], schema)
    
    # Convert dicts to tuples in schema field order
    ordered_data = [
        tuple(r.get(field.name) for field in schema.fields)
        for r in records
    ]
        
    df = spark.createDataFrame(ordered_data, schema=schema)
    
    # Add human-readable datetime columns
    df = (df
        .withColumn("created_datetime", from_unixtime(col("created_timestamp") / 1000).cast(TimestampType()))
        .withColumn("last_updated_datetime", from_unixtime(col("last_updated_timestamp") / 1000).cast(TimestampType()))
        .orderBy("created_timestamp")
    )
    
    return df


def _list_all_conversations(w: WorkspaceClient, space_id: str) -> List:
    """Fetches all conversations in a space, handling pagination."""
    conversations = []
    page_token = None
    
    while True:
        response = w.genie.list_conversations(
            space_id=space_id,
            include_all=True,
            page_token=page_token
        )
        conversations.extend(response.conversations or [])
        
        page_token = response.next_page_token
        if not page_token:
            break
    
    return conversations


def _list_all_messages(w: WorkspaceClient, space_id: str, conversation_id: str) -> List:
    """Fetches all messages in a conversation, handling pagination."""
    messages = []
    page_token = None
    
    while True:
        response = w.genie.list_conversation_messages(
            space_id=space_id,
            conversation_id=conversation_id,
            page_token=page_token
        )
        messages.extend(response.messages or [])
        
        page_token = response.next_page_token
        if not page_token:
            break
    
    return messages

def _extract_message_data(message, space_id: str, space_name: str, w: WorkspaceClient) -> Dict[str, Any]:
    """Extracts relevant fields from a GenieMessage into a flat dictionary."""
    msg_dict = message.as_dict()
    
    # Extract IDs (API has both 'id' and 'message_id' for legacy compatibility)
    msg_id = msg_dict.get('message_id') or msg_dict.get('id')
    user_id = msg_dict.get('user_id')
    user_email = _resolve_user_email(str(user_id) if user_id else None, w)

    record = {
        'space_id': space_id,
        'space_name': space_name,
        'message_id': msg_id,
        'conversation_id': msg_dict.get('conversation_id'),
        'user_id': str(user_id) if user_id else None,
        'user_email': user_email,
        'status': str(msg_dict.get('status')) if msg_dict.get('status') else None,
        'created_timestamp': msg_dict.get('created_timestamp'),
        'last_updated_timestamp': msg_dict.get('last_updated_timestamp'),
        'user_question': msg_dict.get('content'),
    }
    
    # Process attachments
    ai_responses, sql_queries, statement_ids, suggested_qs = [], [], [], []
    attachments = msg_dict.get('attachments') or []
    
    for att in attachments:
        # Text attachment
        if text_obj := att.get('text'):
            ai_responses.append(text_obj.get('content', ''))
        
        # Query attachment
        if query_obj := att.get('query'):
            sql_queries.append(query_obj.get('query', ''))
            if stmt_id := query_obj.get('statement_id'):
                statement_ids.append(str(stmt_id))
        
        # Suggested questions attachment
        if sq_obj := att.get('suggested_questions'):
            suggested_qs.extend(sq_obj.get('questions', []))

    def join_non_empty(items: List, sep: str = ' | ') -> Optional[str]:
        filtered = list(filter(None, items))
        return sep.join(filtered) if filtered else None

    record.update({
        'ai_response': join_non_empty(ai_responses),
        'sql_query': join_non_empty(sql_queries),
        'statement_id': join_non_empty(statement_ids),
        'suggested_questions': join_non_empty(suggested_qs, ', '),
        'num_attachments': len(attachments),
    })
    
    # Feedback rating
    feedback = msg_dict.get('feedback') or {}
    record['feedback_rating'] = str(feedback.get('rating')) if feedback.get('rating') else 'NONE'
    
    # Error info
    error = msg_dict.get('error')
    if error and isinstance(error, dict):
        record['error_type'] = error.get('type')
        record['error_message'] = error.get('message') or error.get('error')
    elif error:
        record['error_type'] = 'Unknown'
        record['error_message'] = str(error)
    else:
        record['error_type'] = None
        record['error_message'] = None
    
    return record

def _resolve_user_email(user_id: Optional[str], w: WorkspaceClient) -> Optional[str]:
    """Resolves a user ID to an email address using the SCIM Users API."""
    if not user_id or user_id in ("None", "0"):
        return None
    
    if user_id in USER_CACHE:
        return USER_CACHE[user_id]
    
    try:
        user: User = w.users.get(user_id)
        email = user.user_name
    except Exception:
        # Fall back to a placeholder if the user lookup fails
        email = f"ID_{user_id}"

    # Cache the result (placeholder included) so each ID is looked up at most once
    USER_CACHE[user_id] = email
    return email

def _get_schema() -> StructType:
    return StructType([
        StructField("space_id", StringType(), True),
        StructField("space_name", StringType(), True),
        StructField("message_id", StringType(), True),
        StructField("conversation_id", StringType(), True),
        StructField("user_id", StringType(), True),
        StructField("user_email", StringType(), True),
        StructField("status", StringType(), True),
        StructField("created_timestamp", LongType(), True),
        StructField("last_updated_timestamp", LongType(), True),
        StructField("user_question", StringType(), True),
        StructField("ai_response", StringType(), True),
        StructField("sql_query", StringType(), True),
        StructField("statement_id", StringType(), True),
        StructField("suggested_questions", StringType(), True),
        StructField("num_attachments", IntegerType(), True),
        StructField("feedback_rating", StringType(), True),
        StructField("error_type", StringType(), True),
        StructField("error_message", StringType(), True),
    ])
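
To see how `_extract_message_data` flattens attachments without calling the API, the sketch below applies the same joining rules to a hand-written message dict. The dict mimics the shape the function reads (`attachments` with `text`, `query`, and `suggested_questions` keys), but its values are made up, and `flatten_attachments` is a hypothetical name used only here.

```python
from typing import Any, Dict, List, Optional


def join_non_empty(items: List[str], sep: str = " | ") -> Optional[str]:
    """Join the truthy items with `sep`; return None when nothing is left."""
    filtered = [item for item in items if item]
    return sep.join(filtered) if filtered else None


def flatten_attachments(msg: Dict[str, Any]) -> Dict[str, Any]:
    """Collapse a message's attachments into flat columns (illustrative only)."""
    responses, queries, statement_ids, suggestions = [], [], [], []
    attachments = msg.get("attachments") or []
    for att in attachments:
        if text := att.get("text"):
            responses.append(text.get("content", ""))
        if query := att.get("query"):
            queries.append(query.get("query", ""))
            if stmt_id := query.get("statement_id"):
                statement_ids.append(str(stmt_id))
        if suggested := att.get("suggested_questions"):
            suggestions.extend(suggested.get("questions", []))
    return {
        "ai_response": join_non_empty(responses),
        "sql_query": join_non_empty(queries),
        "statement_id": join_non_empty(statement_ids),
        "suggested_questions": join_non_empty(suggestions, ", "),
        "num_attachments": len(attachments),
    }


# Made-up message with one text attachment and one query attachment.
sample_message = {
    "content": "How many orders shipped last week?",
    "attachments": [
        {"text": {"content": "Here is the weekly order count."}},
        {"query": {"query": "SELECT COUNT(*) FROM orders", "statement_id": "stmt-001"}},
    ],
}

flat = flatten_attachments(sample_message)
print(flat)
```

Note that a message with several query attachments ends up with a single ` | `-separated `statement_id` string, so downstream joins on `statement_id` need to split it first.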


You can test the function by running it on a single Genie space.

In [0]:
# --- Usage Example ---

# Define your Space ID
space_id = "XXX" # Replace with your actual Space ID

# Run the function
df = get_genie_observability_table(space_id)

# Display results
display(df)

Below is a simple script that runs this function on all your Genie spaces (currently limited to 10).

Note: You may not have permission to view the messages of certain Genie spaces, as you may not have access to the underlying tables. In this case you will see a lot of NULLs.

In [0]:
from databricks.sdk import WorkspaceClient

# Initialize the Databricks workspace client
w = WorkspaceClient()

# Fetch all Genie spaces
print("🔍 Fetching all Genie spaces...")
spaces = []
page_token = None

while True:
    response = w.genie.list_spaces(page_token=page_token)
    for s in response.spaces or []:  # Guard against a missing list on an empty page
        spaces.append({
            "space_id": getattr(s, "space_id", None),
            "name": getattr(s, "title", None),
            "description": getattr(s, "description", None),
            "warehouse_id": getattr(s, "warehouse_id", None)
        })
    if not response.next_page_token:
        break
    page_token = response.next_page_token

print(f"✓ Found {len(spaces)} Genie spaces")

# Limit to 10 spaces
MAX_SPACES = 10
if len(spaces) > MAX_SPACES:
    print(f"⚠ Limiting processing to first {MAX_SPACES} spaces\n")
    spaces = spaces[:MAX_SPACES]
else:
    print()

# Collect observability data from all spaces
all_dfs = []

for i, space in enumerate(spaces, 1):
    space_id = space['space_id']
    space_name = space['name']
    
    print(f"\n{'='*80}")
    print(f"[{i}/{len(spaces)}] Processing Space: {space_name}")
    print(f"Space ID: {space_id}")
    print(f"{'='*80}")
    
    try:
        # Get observability data for this space
        df_space = get_genie_observability_table(space_id)
        
        # Count once and reuse the result to avoid triggering a second Spark job
        message_count = df_space.count()
        if message_count > 0:
            all_dfs.append(df_space)
            print(f"\n✓ Successfully extracted {message_count} messages from {space_name}")
        else:
            print(f"\n⚠ No messages found in {space_name}")
            
    except Exception as e:
        print(f"\n❌ Error processing space {space_name}: {str(e)}")
        continue

# Combine all DataFrames
if all_dfs:
    print(f"\n\n{'='*80}")
    print("📊 COMBINING ALL RESULTS")
    print(f"{'='*80}")
    
    # Union all DataFrames
    df = all_dfs[0]
    for df_next in all_dfs[1:]:
        df = df.union(df_next)
    
    total_messages = df.count()
    total_spaces = df.select("space_id").distinct().count()
    
    print(f"\n✅ SUCCESS!")
    print(f"   Total Spaces Processed: {total_spaces}")
    print(f"   Total Messages Extracted: {total_messages}")
    print(f"\n{'='*80}\n")
    
    display(df)
else:
    print("\n⚠ No data found across any Genie spaces")
    df = None

Save the result as a Delta table and add a detailed table description!

In [0]:
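# Stop early if the previous cell found no messages (it sets df to None in that case)
if df is None:
    raise ValueError("No observability data to write; check your access to the Genie spaces.")

# Write the observability data to a Delta table
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{catalog_name}.{schema_name}.genie_observability_main_table")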

# Add a detailed table description
spark.sql(f"""
  COMMENT ON TABLE {catalog_name}.{schema_name}.genie_observability_main_table IS 
  'Comprehensive observability table for Genie AI/BI spaces. Contains detailed message-level data including user questions, AI responses, generated SQL queries, execution metadata, user feedback ratings, and error information. Used for monitoring Genie usage, analyzing query patterns, and tracking user engagement across all accessible Genie spaces.'
""")

We successfully created the **genie_observability_main_table** table! Now let's proceed to create the **genie_cost_analysis_main_table** table.

**An Aside: The Strategy: "Hourly Time-Weighted" Attribution**

The SQL code below is inspired by https://github.com/databrickslabs/sandbox/blob/main/dbsql/cost_per_query/PrPr/DBSQL%20Cost%20Per%20Query%20MV%20(PrPr).sql

We treat the cost of a warehouse hour as a "pie" and slice it up based on how much work each query contributed during that specific hour.

**Step 1: Calculate the Hourly Bill (Per Warehouse).** First, we look at the system billing logs (system.billing.usage) to calculate the exact price tag (in DBUs and dollars) for every hour a warehouse was active. Unlike a basic approach that averages costs over days, this locks in the specific cost for each specific hour.

**Step 2: Measure "Work" (Per Query, Per Hour).** We look at the query history (system.query.history) to measure duration.

Crucially, if a long-running query spans multiple hours (e.g., 1:50 PM to 2:10 PM), we split it into two chunks: 10 minutes in the 1 PM bucket and 10 minutes in the 2 PM bucket.
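
The splitting rule can be sketched in plain Python as follows. This is an illustration of the idea rather than the SQL used later in the notebook; `split_by_hour` is a hypothetical helper and the date is arbitrary.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def split_by_hour(start: datetime, end: datetime) -> List[Tuple[datetime, float]]:
    """Return (hour_bucket, seconds_in_bucket) pairs covering [start, end)."""
    slices = []
    # Begin at the top of the hour in which the query started.
    bucket = start.replace(minute=0, second=0, microsecond=0)
    while bucket < end:
        next_bucket = bucket + timedelta(hours=1)
        # Overlap between the query interval and this hour bucket.
        overlap = (min(end, next_bucket) - max(start, bucket)).total_seconds()
        if overlap > 0:
            slices.append((bucket, overlap))
        bucket = next_bucket
    return slices


# A query running from 1:50 PM to 2:10 PM touches two hour buckets.
slices = split_by_hour(datetime(2024, 1, 1, 13, 50), datetime(2024, 1, 1, 14, 10))
for bucket, seconds in slices:
    print(bucket.strftime("%H:%M"), seconds / 60, "minutes")
```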

**Step 3: Calculate the Ratio (The Hourly Slice).** We compare the query's duration in that specific hour to the total duration of all queries running on the warehouse in that same hour.

Logic: "In the 1 PM hour, this query ran for 5 minutes. The warehouse processed queries for a total of 50 minutes. Therefore, this query pays for 10% of the 1 PM bill."

**Step 4: Assign the Cost & Absorb Idle Time.** We multiply that percentage (10%) by the total hourly bill found in Step 1.

Note: This method automatically accounts for "idle time." If a warehouse costs $10/hr and only runs your single 5-minute query, your query is 100% of the work, so it absorbs the full $10 cost (including the 55 minutes the warehouse sat idle waiting for you).
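
Steps 3 and 4 reduce to one proportional split per warehouse hour. The sketch below works through both examples from the text; `allocate_hourly_cost` is a hypothetical helper, and the $10 hourly bill in the first example is an assumed figure reused from the idle-time note.

```python
from typing import Dict


def allocate_hourly_cost(seconds_by_query: Dict[str, float], hourly_bill: float) -> Dict[str, float]:
    """Split one warehouse-hour's bill across queries in proportion to their work."""
    total_seconds = sum(seconds_by_query.values())
    if total_seconds == 0:
        # No measured work in this hour, so nothing is attributed.
        return {query: 0.0 for query in seconds_by_query}
    return {
        query: hourly_bill * seconds / total_seconds
        for query, seconds in seconds_by_query.items()
    }


# Step 3 example: 5 of the 50 busy minutes belong to "my_query" (10% of the bill).
busy_hour = allocate_hourly_cost({"my_query": 5 * 60, "other_queries": 45 * 60}, hourly_bill=10.0)
print(busy_hour)

# Idle-time example: a single 5-minute query absorbs the full hourly bill.
quiet_hour = allocate_hourly_cost({"my_query": 5 * 60}, hourly_bill=10.0)
print(quiet_hour)
```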

**Step 5: Filter for Genie.** Finally, we aggregate the hourly slices back together and filter on genie_space_id, giving you the total attributed cost for every interaction your users had with your Genie space.
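
Step 5 amounts to a grouped sum over the hourly slices, keeping only statements that carry a Genie space ID. A minimal sketch on made-up rows follows; `total_cost_per_statement` is a hypothetical helper, and the row keys mirror column names used in the SQL.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def total_cost_per_statement(slices: List[dict], genie_space_id: Optional[str] = None) -> Dict[str, float]:
    """Sum hourly cost slices per statement, keeping only Genie-sourced statements."""
    totals = defaultdict(float)
    for row in slices:
        space = row.get("genie_space_id")
        if space is None:
            continue  # Not a Genie query; it only mattered for the denominator.
        if genie_space_id is not None and space != genie_space_id:
            continue  # Optional filter for one specific space.
        totals[row["statement_id"]] += row["allocated_dollars_slice"]
    return dict(totals)


# Made-up slices: q1 spans two hours in space g1, q2 is a non-Genie query.
hourly_slices = [
    {"statement_id": "q1", "genie_space_id": "g1", "allocated_dollars_slice": 0.5},
    {"statement_id": "q1", "genie_space_id": "g1", "allocated_dollars_slice": 0.25},
    {"statement_id": "q2", "genie_space_id": None, "allocated_dollars_slice": 2.0},
]

genie_costs = total_cost_per_statement(hourly_slices)
print(genie_costs)
```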

In [0]:
# %sql
# -- Use this query to attribute compute costs to a single Genie space with HOURLY precision.
# -- This accounts for idle time and hourly fluctuations.

# WITH 
# --------------------------------------------------------------------------------
# -- STEP 1: HOURLY WAREHOUSE BILLING
# -- Instead of summing the whole month, we calculate the cost for every specific Hour.
# --------------------------------------------------------------------------------
# hourly_warehouse_bill AS (
#   SELECT
#     u.usage_metadata.warehouse_id,
#     date_trunc('HOUR', u.usage_start_time) AS hour_bucket,
#     SUM(u.usage_quantity) AS total_hourly_dbus,
#     SUM(u.usage_quantity * p.pricing.default) AS total_hourly_dollars
#   FROM system.billing.usage u
#   JOIN system.billing.list_prices p 
#     ON u.sku_name = p.sku_name
#     AND u.usage_start_time >= p.price_start_time
#     AND (u.usage_start_time < p.price_end_time OR p.price_end_time IS NULL)
#   WHERE u.usage_start_time >= :start_date::timestamp
#     AND u.usage_end_time <= :end_date::timestamp
#     AND u.usage_unit = 'DBU'
#     AND u.sku_name ILIKE '%SQL%'
#   GROUP BY 1, 2
# ),

# --------------------------------------------------------------------------------
# -- STEP 2: SPLIT QUERIES BY HOUR
# -- If a query runs from 1:55 to 2:05, we split it into two rows so we can match
# -- the 1:00 PM cost and the 2:00 PM cost separately.
# --------------------------------------------------------------------------------
# query_work_split AS (
#   SELECT
#     statement_id,
#     compute.warehouse_id,
#     query_source.genie_space_id,
#     executed_by,
#     statement_text,
#     start_time,
    
#     -- "Explode" logic creates a row for every hour the query touched
#     timestampadd(HOUR, h, date_trunc('HOUR', start_time)) AS hour_bucket,
    
#     -- Calculate precise seconds worked IN THIS SPECIFIC HOUR
#     (LEAST(UNIX_TIMESTAMP(end_time), UNIX_TIMESTAMP(timestampadd(HOUR, h + 1, date_trunc('HOUR', start_time)))) - 
#      GREATEST(UNIX_TIMESTAMP(start_time), UNIX_TIMESTAMP(timestampadd(HOUR, h, date_trunc('HOUR', start_time))))) 
#      AS seconds_worked_in_hour
     
#   FROM system.query.history
#   -- This sequence generator handles the splitting
#   JOIN lateral explode(sequence(0, floor((UNIX_TIMESTAMP(end_time) - UNIX_TIMESTAMP(date_trunc('HOUR', start_time))) / 3600))) t(h)
#   WHERE start_time >= :start_date::timestamp
#     AND start_time <= :end_date::timestamp
#     AND compute.warehouse_id IS NOT NULL
#     AND total_task_duration_ms > 0
# ),

# --------------------------------------------------------------------------------
# -- STEP 3: HOURLY DENOMINATOR
# -- Calculate the total work done by ALL queries on the warehouse for each hour.
# --------------------------------------------------------------------------------
# warehouse_hourly_activity AS (
#   SELECT 
#     warehouse_id,
#     hour_bucket,
#     SUM(seconds_worked_in_hour) AS total_seconds_worked_by_all
#   FROM query_work_split
#   GROUP BY 1, 2
# ),

# --------------------------------------------------------------------------------
# -- STEP 4: COST ATTRIBUTION (The Slice)
# -- Join the Query Slice + The Hourly Pie Size + The Hourly Bill
# --------------------------------------------------------------------------------
# allocated_metrics AS (
#   SELECT
#     q.statement_id,
#     q.executed_by,
#     q.statement_text,
#     q.genie_space_id,
#     q.start_time,
#     q.warehouse_id,
#     q.seconds_worked_in_hour,
    
#     -- Logic: (My Time / Total Time) * Total Bill
#     (q.seconds_worked_in_hour / w.total_seconds_worked_by_all) AS work_proportion,
#     b.total_hourly_dbus,
#     b.total_hourly_dollars,
    
#     -- Calculate allocated cost for this specific hour slice
#     ((q.seconds_worked_in_hour / w.total_seconds_worked_by_all) * b.total_hourly_dbus) AS allocated_dbus_slice,
#     ((q.seconds_worked_in_hour / w.total_seconds_worked_by_all) * b.total_hourly_dollars) AS allocated_dollars_slice

#   FROM query_work_split q
#   -- Join to get the total activity in this hour
#   JOIN warehouse_hourly_activity w 
#     ON q.warehouse_id = w.warehouse_id AND q.hour_bucket = w.hour_bucket
#   -- Join to get the bill for this hour
#   LEFT JOIN hourly_warehouse_bill b     
#     ON q.warehouse_id = b.warehouse_id AND q.hour_bucket = b.hour_bucket
# )

# --------------------------------------------------------------------------------
# -- STEP 5: FINAL AGGREGATION
# -- Sum the hourly slices back up to get the total cost per query.
# --------------------------------------------------------------------------------
# SELECT 
#   statement_id,
#   executed_by AS user_email,
#   start_time,
#   genie_space_id,
#   warehouse_id,
#   -- Re-sum the duration from the slices for display
#   ROUND(SUM(seconds_worked_in_hour), 2) AS accurate_duration_seconds,
#   ROUND(SUM(allocated_dbus_slice), 4) AS dbus_consumed,
#   ROUND(SUM(allocated_dollars_slice), 4) AS cost_usd,
#   statement_text AS sql_code
# FROM allocated_metrics
# WHERE genie_space_id = :genie_space_id -- Filter for specific space
# GROUP BY 1, 2, 3, 4, 5, 9
# ORDER BY start_time DESC;

The code below uses system tables, which link each query's execution source to a Genie space ID, to calculate the costs that arise from a Genie space.

In [0]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.genie_cost_analysis_main_table
COMMENT 'Granular cost attribution for Databricks Genie spaces, accounting for idle time and hourly fluctuations.'
AS
WITH 
-- 1. DEFINE BOUNDARIES: Auto-detect the time range based on available billing/query history
table_boundaries AS (
  SELECT 
    date_trunc('HOUR', LEAST(
      (SELECT MAX(event_time) FROM system.compute.warehouse_events),
      (SELECT MAX(end_time) FROM system.query.history),
      (SELECT MAX(usage_end_time) FROM system.billing.usage)
    )) AS selected_end_time,
    (date_trunc('HOUR', GREATEST(
      (SELECT MIN(event_time) FROM system.compute.warehouse_events),
      (SELECT MIN(start_time) FROM system.query.history),
      (SELECT MIN(usage_end_time) FROM system.billing.usage)
    )) + INTERVAL 1 HOUR)::timestamp AS selected_start_time
),

-- 2. GET HOURLY WAREHOUSE BILLING (The "Cost to distribute")
cpq_warehouse_usage AS (
  SELECT
    usage_metadata.warehouse_id AS warehouse_id,
    u.*
  FROM system.billing.usage AS u
  WHERE usage_metadata.warehouse_id IS NOT NULL
    AND usage_start_time >= (SELECT MIN(selected_start_time) FROM table_boundaries)
    AND usage_end_time <= (SELECT MAX(selected_end_time) FROM table_boundaries)
),

prices AS (
  SELECT coalesce(price_end_time, date_add(current_date, 1)) as coalesced_price_end_time, *
  FROM system.billing.list_prices
  WHERE currency_code = 'USD'
),

filtered_warehouse_usage AS (
    SELECT 
      u.warehouse_id,
      date_trunc('HOUR', u.usage_start_time) AS usage_start_hour,
      u.usage_quantity AS dbus,
      (CAST(p.pricing.effective_list.default AS FLOAT) * u.usage_quantity) AS usage_dollars
    FROM cpq_warehouse_usage AS u
    LEFT JOIN prices as p
      ON u.sku_name = p.sku_name
      AND u.usage_unit = p.usage_unit
      AND (u.usage_end_time BETWEEN p.price_start_time AND p.coalesced_price_end_time)
),

-- 3. GET QUERY HISTORY (We need ALL queries, not just Genie, to calculate the denominator correctly)
cpq_warehouse_query_history AS (
  SELECT
    statement_id,
    executed_by,
    statement_text,
    compute.warehouse_id AS warehouse_id,
    -- Calculate precise execution time excluding metadata overhead
    (COALESCE(CAST(total_task_duration_ms AS FLOAT) / 1000, 0) +
      COALESCE(CAST(result_fetch_duration_ms AS FLOAT) / 1000, 0) +
      COALESCE(CAST(compilation_duration_ms AS FLOAT) / 1000, 0)
    ) AS query_work_task_time,
    start_time,
    end_time,
    -- Normalize start/end times for calculation
    timestampadd(MILLISECOND , coalesce(waiting_at_capacity_duration_ms, 0) + coalesce(waiting_for_compute_duration_ms, 0) + coalesce(compilation_duration_ms, 0), start_time) AS query_work_start_time,
    timestampadd(MILLISECOND, coalesce(result_fetch_duration_ms, 0), end_time) AS query_work_end_time,
    -- Identify Genie Source
    CASE
      WHEN query_source.genie_space_id IS NOT NULL THEN 'GENIE SPACE'
      ELSE 'OTHER'
    END AS query_source_type,
    query_source.genie_space_id
  FROM system.query.history AS h
  WHERE statement_type IS NOT NULL
    AND start_time < (SELECT selected_end_time FROM table_boundaries)
    AND end_time > (SELECT selected_start_time FROM table_boundaries)
    AND total_task_duration_ms > 0
    AND compute.warehouse_id IS NOT NULL
),

-- 4. SPLIT QUERIES ACROSS HOURLY BUCKETS (Handling long-running queries)
hour_intervals AS (
  SELECT
    statement_id,
    warehouse_id,
    query_work_start_time,
    query_work_end_time,
    query_work_task_time,
    explode(
      sequence(
        0,
        floor((UNIX_TIMESTAMP(query_work_end_time) - UNIX_TIMESTAMP(date_trunc('hour', query_work_start_time))) / 3600)
      )
    ) AS hours_interval,
    timestampadd(hour, hours_interval, date_trunc('hour', query_work_start_time)) AS hour_bucket
  FROM cpq_warehouse_query_history
),

statement_proportioned_work AS (
    SELECT * , 
        GREATEST(0,
          UNIX_TIMESTAMP(LEAST(query_work_end_time, timestampadd(hour, 1, hour_bucket))) -
          UNIX_TIMESTAMP(GREATEST(query_work_start_time, hour_bucket))
        ) AS overlap_duration,
        CASE WHEN CAST(query_work_end_time AS DOUBLE) - CAST(query_work_start_time AS DOUBLE) = 0
        THEN 0
        ELSE query_work_task_time * (overlap_duration / (CAST(query_work_end_time AS DOUBLE) - CAST(query_work_start_time AS DOUBLE)))
        END AS proportional_query_work
    FROM hour_intervals
),

attributed_query_work_all AS (
    SELECT
      statement_id,
      hour_bucket,
      warehouse_id,
      SUM(proportional_query_work) AS attributed_query_work
    FROM statement_proportioned_work
    GROUP BY statement_id, warehouse_id, hour_bucket
),

-- 5. CALCULATE TOTAL WORK PER WAREHOUSE/HOUR
warehouse_time as (
  select
    warehouse_id,
    hour_bucket,
    SUM(attributed_query_work) as total_work_done_on_warehouse
  from attributed_query_work_all
  group by warehouse_id, hour_bucket
),

-- 6. ATTRIBUTE COSTS (Proportion of Work * Cost of Warehouse Hour)
history_with_pricing AS (
  SELECT
    a.statement_id,
    a.warehouse_id,
    a.hour_bucket,
    a.attributed_query_work,
    b.total_work_done_on_warehouse,
    wh.dbus AS total_warehouse_period_dbus,
    wh.usage_dollars AS total_warehouse_period_dollars,
    -- Logic: If I did 10% of the work, I pay 10% of the total bill (including the idle time inherent in the bill)
    CASE 
      WHEN b.total_work_done_on_warehouse = 0 THEN 0 
      ELSE a.attributed_query_work / b.total_work_done_on_warehouse 
    END AS query_task_time_proportion
  FROM attributed_query_work_all a
  INNER JOIN warehouse_time b ON a.warehouse_id = b.warehouse_id AND a.hour_bucket = b.hour_bucket
  LEFT JOIN filtered_warehouse_usage AS wh ON a.warehouse_id = wh.warehouse_id AND a.hour_bucket = wh.usage_start_hour
),

final_attribution AS (
  SELECT
    statement_id,
    (query_task_time_proportion * total_warehouse_period_dollars) AS query_attributed_dollars,
    (query_task_time_proportion * total_warehouse_period_dbus) AS query_attributed_dbus
  FROM history_with_pricing
)

-- 7. FINAL FILTER FOR GENIE OUTPUT
SELECT 
  q.genie_space_id,
  q.executed_by AS user_email,
  q.statement_id,
  q.start_time,
  q.warehouse_id,
  q.statement_text AS sql_code,
  SUM(fa.query_attributed_dbus) AS total_dbus_consumed,
  SUM(fa.query_attributed_dollars) AS total_cost_usd,
  MAX(q.query_work_task_time) as execution_duration_seconds
FROM cpq_warehouse_query_history q
JOIN final_attribution fa ON q.statement_id = fa.statement_id
WHERE q.query_source_type = 'GENIE SPACE' -- CRITICAL: Filter only for Genie here at the end
GROUP BY 1, 2, 3, 4, 5, 6
ORDER BY start_time DESC;
""")
print("Table created successfully.")

In [0]:
display(spark.table(f"{catalog_name}.{schema_name}.genie_cost_analysis_main_table"))
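
The two tables share `statement_id`, which is what lets cost be tied back to individual questions. Because genie_observability_main_table stores several statement IDs as one ` | `-separated string, that column has to be split before matching. A minimal sketch on plain dicts with made-up rows follows; `cost_per_message` is a hypothetical helper, not part of the notebook's pipeline.

```python
from typing import Dict, List


def cost_per_message(messages: List[dict], costs: List[dict]) -> Dict[str, float]:
    """Total the attributed cost of every statement linked to each message."""
    cost_by_statement: Dict[str, float] = {}
    for row in costs:
        sid = row["statement_id"]
        cost_by_statement[sid] = cost_by_statement.get(sid, 0.0) + row["total_cost_usd"]

    totals: Dict[str, float] = {}
    for msg in messages:
        # statement_id may be None or hold several IDs joined with " | ".
        ids = (msg.get("statement_id") or "").split(" | ")
        totals[msg["message_id"]] = sum(cost_by_statement.get(sid, 0.0) for sid in ids if sid)
    return totals


# Made-up rows shaped like the two tables.
observability_rows = [
    {"message_id": "m1", "statement_id": "s1 | s2"},
    {"message_id": "m2", "statement_id": None},
]
cost_rows = [
    {"statement_id": "s1", "total_cost_usd": 0.25},
    {"statement_id": "s2", "total_cost_usd": 0.5},
]

message_costs = cost_per_message(observability_rows, cost_rows)
print(message_costs)
```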

In [0]:
# Add a detailed table description
spark.sql(f"""
  COMMENT ON TABLE {catalog_name}.{schema_name}.genie_cost_analysis_main_table IS 
  'This table provides a granular cost attribution analysis for Databricks Genie spaces. It links individual SQL queries executed within Genie to the underlying SQL Warehouse compute costs. By calculating the "work proportion" of every query relative to the warehouse''s total activity, it estimates the specific DBU consumption and USD cost for each query statement.'
""")

This concludes the notebook. We have now created two critical tables that will help us monitor and analyse our Genie spaces across multiple dimensions:
- Cost attribution and chargeback
- Usability and accuracy
- Insights to further improve the space by adding more tables, SQL examples, trusted assets, etc.



### Take this to the next level
You can extend this notebook with more advanced techniques, such as:
- Run topic modelling on user questions to identify common patterns that can help you improve your Genie space
- Incorporate more advanced cost attribution strategies for chargebacks. See [LINK](https://github.com/databrickslabs/sandbox/tree/main/dbsql/cost_per_query/PrPr) and [LINK](https://www.databricks.com/resources/demos/tutorials/governance/system-tables?itm_data=demo_center) for inspiration.
- Incorporate this notebook into your custom Databricks Jobs process
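
As a lightweight first pass before full topic modelling, you could simply count the most frequent terms in user questions. The sketch below is a stand-in for topic modelling, not a topic model itself. It assumes the questions have already been collected into a Python list (for example from a question column in the observability table); the function name `top_terms`, the stop-word list, and the sample questions are hypothetical.

```python
import re
from collections import Counter

# Small illustrative stop-word list (an assumption; extend it for real use)
STOP_WORDS = {
    "the", "a", "an", "of", "for", "in", "by", "is", "what", "show", "me",
    "to", "and", "are", "how", "many", "were", "was", "per", "our", "on", "last",
}


def top_terms(questions, n=5):
    """Return the n most frequent non-stop-word tokens across all questions."""
    counts = Counter()
    for question in questions:
        # Lowercase and keep alphanumeric/underscore tokens only
        tokens = re.findall(r"[a-z0-9_]+", question.lower())
        counts.update(t for t in tokens if t not in STOP_WORDS)
    return counts.most_common(n)


# Made-up sample questions for illustration
sample_questions = [
    "What is the total revenue by region?",
    "Show me revenue for last quarter",
    "How many orders were placed by region?",
]

print(top_terms(sample_questions, n=2))
```

Terms that dominate this list hint at which tables, metrics, or SQL examples are worth adding to the space.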

### Appendix

REST API implementation of the get_genie_observability_table function using the Python Requests library. The code is commented out; uncomment it if you want to run it.
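
The conversation and message helpers in this appendix share one page-token loop: request a page, collect its items, and continue while the response carries a `next_page_token`. The sketch below isolates that loop so it can be exercised without network access. The names `collect_pages` and `fake_fetch` and the fake page data are hypothetical; in real use, the fetch function would wrap the HTTP call.

```python
from typing import Any, Callable, Dict, List, Optional


def collect_pages(
    fetch_page: Callable[[Optional[str]], Dict[str, Any]],
    items_key: str,
) -> List[Any]:
    """Follow next_page_token until it is absent and return all collected items."""
    items: List[Any] = []
    page_token: Optional[str] = None
    while True:
        result = fetch_page(page_token)
        # A page may omit the key entirely when it has no items
        items.extend(result.get(items_key) or [])
        page_token = result.get("next_page_token")
        if not page_token:
            break
    return items


# Stand-in for an HTTP call: two fake pages keyed by the token that requests them
_FAKE_PAGES = {
    None: {"conversations": [{"conversation_id": "c1"}], "next_page_token": "t2"},
    "t2": {"conversations": [{"conversation_id": "c2"}]},
}


def fake_fetch(page_token: Optional[str]) -> Dict[str, Any]:
    return _FAKE_PAGES[page_token]


print(collect_pages(fake_fetch, "conversations"))
```

Passing the fetch step in as a callable keeps the loop independent of the endpoint, so the same function could serve both the conversation and message listings.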

In [0]:
# import requests
# import json
# from typing import Dict, Any, List
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType, IntegerType
# from pyspark.sql.functions import col, from_unixtime

# # Cache for User Email lookups to prevent redundant API calls
# USER_CACHE = {}

# def get_genie_observability_table(
#     space_id: str, 
#     databricks_token: str, 
#     host_url: str,
#     include_all_users: bool = True
# ) -> 'pyspark.sql.DataFrame':
#     """
#     Fetches and constructs a Spark DataFrame containing observability data for all messages in a Genie space.

#     Args:
#         space_id (str): The Genie space ID.
#         databricks_token (str): Databricks personal access token for authentication.
#         host_url (str): Databricks workspace host URL.
#         include_all_users (bool, optional): Whether to include all users' conversations. Defaults to True.

#     Returns:
#         pyspark.sql.DataFrame: DataFrame with observability records for the specified Genie space.
#     """
#     host_url = host_url.rstrip('/')
#     headers = {'Authorization': f'Bearer {databricks_token}', 'Content-Type': 'application/json'}
    
#     # Step 1: Get space name
#     space_url = f"{host_url}/api/2.0/genie/spaces/{space_id}"
#     space_resp = requests.get(space_url, headers=headers)
#     space_resp.raise_for_status()
#     space_name = space_resp.json().get('title', f"Space_{space_id}")
    
#     # Step 2: Get conversations
#     conversations = _get_all_conversations(space_id, host_url, headers, include_all_users)
    
#     # Step 3: Extract and Flatten
#     records = []
#     for conv in conversations:
#         messages = _get_all_conversation_messages(space_id, conv['conversation_id'], host_url, headers)
#         for msg in messages:
#             record = _extract_message_data(msg, space_id, space_name, host_url, headers)
#             records.append(record)
    
#     # Step 4: Create Spark DataFrame
#     spark = SparkSession.builder.getOrCreate()
#     schema = _get_schema()
    
#     if not records:
#         return spark.createDataFrame([], schema)
        
#     df = spark.createDataFrame(records, schema=schema)
    
#     # Convert timestamps to Datetime
#     df = df.withColumn("created_datetime", from_unixtime(col("created_timestamp") / 1000).cast(TimestampType())) \
#            .withColumn("last_updated_datetime", from_unixtime(col("last_updated_timestamp") / 1000).cast(TimestampType())) \
#            .orderBy("created_timestamp")
    
#     return df

# def _get_all_conversations(space_id: str, host_url: str, headers: Dict[str, str], 
#                            include_all: bool) -> List[Dict[str, Any]]:
#     """
#     Retrieves all conversations for a given Genie space, handling pagination.

#     Args:
#         space_id (str): Genie space ID.
#         host_url (str): Databricks workspace host URL.
#         headers (Dict[str, str]): HTTP headers for authentication.
#         include_all (bool): Whether to include all users' conversations.

#     Returns:
#         List[Dict[str, Any]]: List of conversation metadata dictionaries.
#     """
#     all_conversations = []
#     page_token = None
    
#     while True:
#         url = f"{host_url}/api/2.0/genie/spaces/{space_id}/conversations"
#         params = {'page_size': 100}
        
#         if page_token:
#             params['page_token'] = page_token
#         if include_all:
#             params['include_all'] = 'true'
        
#         response = requests.get(url, headers=headers, params=params)
#         response.raise_for_status()
#         result = response.json()
        
#         conversations = result.get('conversations', [])
#         all_conversations.extend(conversations)
        
#         page_token = result.get('next_page_token')
#         if not page_token:
#             break
    
#     return all_conversations


# def _get_all_conversation_messages(space_id: str, conversation_id: str, 
#                                    host_url: str, headers: Dict[str, str]) -> List[Dict[str, Any]]:
#     """
#     Retrieves all messages from a specific conversation, handling pagination.

#     Args:
#         space_id (str): Genie space ID.
#         conversation_id (str): Conversation ID.
#         host_url (str): Databricks workspace host URL.
#         headers (Dict[str, str]): HTTP headers for authentication.

#     Returns:
#         List[Dict[str, Any]]: List of message dictionaries.
#     """
#     all_messages = []
#     page_token = None
    
#     while True:
#         url = f"{host_url}/api/2.0/genie/spaces/{space_id}/conversations/{conversation_id}/messages"
#         params = {'page_size': 100}
        
#         if page_token:
#             params['page_token'] = page_token
        
#         response = requests.get(url, headers=headers, params=params)
#         response.raise_for_status()
#         result = response.json()
        
#         messages = result.get('messages', [])
#         all_messages.extend(messages)
        
#         page_token = result.get('next_page_token')
#         if not page_token:
#             break
    
#     return all_messages
    
# def _extract_message_data(message: Dict[str, Any], space_id: str, space_name: str, host_url: str, headers: dict) -> Dict[str, Any]:
#     """
#     Extracts and flattens relevant fields from a Genie message object for observability.

#     Args:
#         message (Dict[str, Any]): Message dictionary from Genie API.
#         space_id (str): Genie space ID.
#         space_name (str): Genie space name.
#         host_url (str): Databricks workspace host URL.
#         headers (dict): HTTP headers for authentication.

#     Returns:
#         Dict[str, Any]: Flattened record with observability fields.
#     """
#     # Resolve User Email
#     user_id = str(message.get('user_id')) if message.get('user_id') else None
#     user_email = _resolve_email(user_id, host_url, headers)

#     record = {
#         'space_id': space_id,
#         'space_name': space_name,
#         'message_id': message.get('message_id'),
#         'conversation_id': message.get('conversation_id'),
#         'user_id': user_id,
#         'user_email': user_email,
#         'status': message.get('status'),
#         'created_timestamp': message.get('created_timestamp'),
#         'last_updated_timestamp': message.get('last_updated_timestamp'),
#         'user_question': message.get('content'),
#     }
    
#     ai_responses, sql_queries, statement_ids, suggested_qs = [], [], [], []
    
#     for att in message.get('attachments', []):
#         # Text Attachment
#         if att.get('text'):
#             ai_responses.append(att['text'].get('content', ''))
        
#         # Query Attachment (Corrected sibling path)
#         query_obj = att.get('query')
#         if query_obj:
#             sql_queries.append(query_obj.get('query', ''))
#             s_id = query_obj.get('statement_id')
#             if s_id: statement_ids.append(str(s_id))
            
#         # Suggested Questions
#         if att.get('suggested_questions'):
#             suggested_qs.extend(att['suggested_questions'].get('questions', []))

#     # Helper to return None instead of empty string
#     def join_clean(lst, sep=' | '): return sep.join(filter(None, lst)) if lst else None

#     record.update({
#         'ai_response': join_clean(ai_responses),
#         'sql_query': join_clean(sql_queries),
#         'statement_id': join_clean(statement_ids),
#         'suggested_questions': join_clean(suggested_qs, sep=', '),
#         'num_attachments': len(message.get('attachments', []))
#     })
    
#     # Feedback - Where the "Review Comments" live
#     feedback = message.get('feedback', {})
#     record['feedback_rating'] = feedback.get('rating', 'NONE')
        
#     # Errors
#     error = message.get('error', {})
#     record['error_type'] = error.get('type')
#     record['error_message'] = error.get('error')
    
#     return record

# def _resolve_email(user_id: str, host_url: str, headers: dict) -> str:
#     """
#     Resolves a Databricks user ID to an email address using the SCIM API, with caching.

#     Args:
#         user_id (str): Databricks user ID.
#         host_url (str): Databricks workspace host URL.
#         headers (dict): HTTP headers for authentication.

#     Returns:
#         str: User email if found, or a fallback string.
#     """
#     if not user_id or user_id in ["None", "0"]: return None
#     if user_id in USER_CACHE: return USER_CACHE[user_id]
    
#     try:
#         url = f"{host_url}/api/2.0/preview/scim/v2/Users/{user_id}"
#         resp = requests.get(url, headers=headers)
#         if resp.status_code == 200:
#             email = resp.json().get('userName')
#             USER_CACHE[user_id] = email
#             return email
#     except (requests.RequestException, ValueError): pass  # fall back to the ID-based placeholder below
#     return f"ID_{user_id}"

# def _get_schema() -> StructType:
#     """
#     Returns the schema for the Genie observability Spark DataFrame.

#     Returns:
#         StructType: Spark schema for observability records.
#     """
#     return StructType([
#         StructField("space_id", StringType(), True),
#         StructField("space_name", StringType(), True),
#         StructField("message_id", StringType(), True),
#         StructField("conversation_id", StringType(), True),
#         StructField("user_id", StringType(), True),
#         StructField("user_email", StringType(), True),
#         StructField("status", StringType(), True),
#         StructField("created_timestamp", LongType(), True),
#         StructField("last_updated_timestamp", LongType(), True),
#         StructField("user_question", StringType(), True),
#         StructField("ai_response", StringType(), True),
#         StructField("sql_query", StringType(), True),
#         StructField("statement_id", StringType(), True),
#         StructField("suggested_questions", StringType(), True),
#         StructField("num_attachments", IntegerType(), True),
#         StructField("feedback_rating", StringType(), True),
#         StructField("error_type", StringType(), True),
#         StructField("error_message", StringType(), True),
#     ])


In [0]:
# # Set your Genie space ID
# space_id = "XXX"  # Replace with your space ID

# # Use WorkspaceClient to get host and token
# from databricks.sdk import WorkspaceClient

# # Initialize the Databricks workspace client
# w = WorkspaceClient()

# # Retrieve the workspace host URL
# host = w.config.host

# # Retrieve the API token from the workspace client config
# token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

# # Fetch Genie observability data for the specified space
# df = get_genie_observability_table(space_id, token, host)

# # Display the resulting DataFrame
# display(df)

In [0]:
# from databricks.sdk import WorkspaceClient
# from datetime import datetime
# import pandas as pd

# # Initialize the Databricks workspace client
# w = WorkspaceClient()

# # Get authentication parameters
# token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
# host = w.config.host

# # Fetch all Genie spaces
# print("🔍 Fetching all Genie spaces...")
# spaces = []
# page_token = None

# while True:
#     response = w.genie.list_spaces(page_token=page_token)
#     for s in response.spaces:
#         spaces.append({
#             "space_id": getattr(s, "space_id", None),
#             "name": getattr(s, "title", None),
#             "description": getattr(s, "description", None),
#             "warehouse_id": getattr(s, "warehouse_id", None)
#         })
#     if not response.next_page_token or response.next_page_token == "":
#         break
#     page_token = response.next_page_token

# print(f"✓ Found {len(spaces)} Genie spaces")

# # Limit to 10 spaces
# MAX_SPACES = 10
# if len(spaces) > MAX_SPACES:
#     print(f"⚠ Limiting processing to first {MAX_SPACES} spaces\n")
#     spaces = spaces[:MAX_SPACES]
# else:
#     print()

# # Collect observability data from all spaces
# all_dfs = []

# for i, space in enumerate(spaces, 1):
#     space_id = space['space_id']
#     space_name = space['name']
    
#     print(f"\n{'='*80}")
#     print(f"[{i}/{len(spaces)}] Processing Space: {space_name}")
#     print(f"Space ID: {space_id}")
#     print(f"{'='*80}")
    
#     try:
#         # Get observability data for this space
#         df_space = get_genie_observability_table(space_id, token, host)
        
#         if df_space.count() > 0:
#             all_dfs.append(df_space)
#             print(f"\n✓ Successfully extracted {df_space.count()} messages from {space_name}")
#         else:
#             print(f"\n⚠ No messages found in {space_name}")
            
#     except Exception as e:
#         print(f"\n❌ Error processing space {space_name}: {str(e)}")
#         continue

# # Combine all DataFrames
# if all_dfs:
#     print(f"\n\n{'='*80}")
#     print("📊 COMBINING ALL RESULTS")
#     print(f"{'='*80}")
    
#     # Union all DataFrames
#     df = all_dfs[0]
#     for df_next in all_dfs[1:]:
#         df = df.union(df_next)
    
#     total_messages = df.count()
#     total_spaces = df.select("space_id").distinct().count()
    
#     print(f"\n✅ SUCCESS!")
#     print(f"   Total Spaces Processed: {total_spaces}")
#     print(f"   Total Messages Extracted: {total_messages}")
#     print(f"\n{'='*80}\n")
    
#     display(df)
# else:
#     print("\n⚠ No data found across any Genie spaces")
#     df = None