In [0]:
%pip install databricks-sdk --upgrade
# Restart the Python process so later cells import the freshly upgraded databricks-sdk.
# NOTE(review): the version is unpinned, so a rerun can silently pick up an SDK whose
# query_history API differs — consider pinning (databricks-sdk==X.Y.Z) once a known-good version is chosen.
dbutils.library.restartPython()

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import workspace

# Workspace client using the SDK's default authentication (no explicit config is passed).
w = WorkspaceClient()
workspace_id = w.get_workspace_id()

# Smoke test of the Query History API: print the statement type of every returned query.
response = w.query_history.list()
# NOTE(review): older databricks-sdk releases return an iterator of query objects here, while
# newer ones appear to return a response object holding the queries in `.res` — handle both
# shapes; confirm against the installed SDK version.
queries = (response.res or []) if hasattr(response, "res") else response
for r in queries:
    # Guard against a query with no statement_type (None) so one record cannot abort the listing.
    print(r.statement_type.value if r.statement_type is not None else None)

In [0]:
# Notebook parameters supplied through Databricks widgets.
data_type, target_table = (dbutils.widgets.get(name) for name in ("data_type", "target_table"))

In [0]:

from dbx_scraper import DBXScraper

# Build the project scraper from the widget parameters, the workspace id and the Spark session.
scraper = DBXScraper(
    data_type,
    target_table,
    workspace_id,
    spark,
)
scraper.execute()  # what this writes is defined inside DBXScraper (project code, not visible here)

In [0]:
%sql
-- NOTE(review): disabled manual reset. Uncommenting this drops gary_burgett.dlt.query_history
-- (a hard-coded, user-specific table that may be the same one passed via the `target_table`
-- widget) — confirm before running.
-- DROP TABLE gary_burgett.dlt.query_history

In [0]:
def _to_query_history_row(record):
    """Map one Query History API record (a dict) to a dict keyed by system.query.history columns.

    Keys are inserted in system.query.history column order, so iterating the result yields
    the values in that order. Fields missing from ``record`` map to None.
    """
    # `metrics` may be absent or explicitly None; `or {}` covers both so the lookups below
    # never call .get on None (the previous `r.get('metrics', {})` only covered "absent").
    metrics = record.get('metrics') or {}
    return {
        'account_id': record.get('account_id'),
        'workspace_id': record.get('workspace_id'),
        'statement_id': record.get('statement_id'),
        'executed_by': record.get('executed_by'),
        'session_id': record.get('session_id'),
        'execution_status': record.get('status'),  # API field is `status`
        'compute': record.get('compute'),
        'executed_by_user_id': record.get('executed_by_user_id'),
        'statement_text': record.get('statement_text'),
        'statement_type': record.get('statement_type'),
        'error_message': record.get('error_message'),
        'client_application': record.get('client_application'),
        'client_driver': record.get('client_driver'),
        'total_duration_ms': metrics.get('total_duration_ms'),
        'waiting_for_compute_duration_ms': metrics.get('waiting_for_compute_duration_ms'),
        'waiting_at_capacity_duration_ms': metrics.get('waiting_at_capacity_duration_ms'),
        'execution_duration_ms': metrics.get('execution_duration_ms'),
        'compilation_duration_ms': metrics.get('compilation_duration_ms'),
        'total_task_duration_ms': metrics.get('total_task_duration_ms'),
        'result_fetch_duration_ms': metrics.get('result_fetch_duration_ms'),
        'start_time': record.get('start_time'),
        'end_time': record.get('end_time'),
        'update_time': record.get('update_time'),
        'read_partitions': metrics.get('read_partitions'),
        'pruned_files': metrics.get('pruned_files'),
        'read_files': metrics.get('read_files'),
        'read_rows': metrics.get('read_rows'),
        'produced_rows': metrics.get('produced_rows'),
        'read_bytes': metrics.get('read_bytes'),
        'read_io_cache_percent': metrics.get('read_io_cache_percent'),
        'from_result_cache': metrics.get('from_result_cache'),
        'spilled_local_bytes': metrics.get('spilled_local_bytes'),
        'written_bytes': metrics.get('written_bytes'),
        'shuffle_read_bytes': metrics.get('shuffle_read_bytes'),
        'query_source': record.get('query_source'),
        'executed_as_user_id': record.get('executed_as_user_id'),
        'executed_as': record.get('executed_as'),
    }


def fetch_query_history_to_df(databricks_client, workspace_id, start_time, end_time,
                              spark_session=None, schema=None):
    """Fetch query history for a time window and return it as a Spark DataFrame.

    Columns follow system.query.history naming and order (see ``_to_query_history_row``).

    Args:
        databricks_client: client exposing ``query_history.list(...)`` that returns a dict
            with a ``'statements'`` list of dict records.
        workspace_id: forwarded to ``query_history.list`` as ``workspace_id``.
        start_time: forwarded as ``filter_by_start_time``.
        end_time: forwarded as ``filter_by_end_time``.
        spark_session: SparkSession to build the DataFrame with; defaults to the notebook's
            global ``spark``.
        schema: optional Spark schema (StructType or DDL string) whose fields are in the same
            order as the mapped columns. When given, it is used instead of type inference,
            which also allows an empty window to produce an empty, typed DataFrame.

    Returns:
        A Spark DataFrame with one row per returned record.

    Raises:
        ValueError: if no records are returned and ``schema`` is None, since Spark cannot
            infer column types from zero rows.
    """
    # NOTE(review): this assumes a dict-shaped response with dict records. The databricks-sdk
    # listing earlier in this notebook reads records by attribute (`r.statement_type.value`), and
    # the SDK's query_history.list may not accept these keyword arguments (it takes a
    # `filter_by` filter object in recent versions) — confirm which client is passed here.
    response = databricks_client.query_history.list(
        workspace_id=workspace_id,
        filter_by_start_time=start_time,
        filter_by_end_time=end_time
    )
    # Treat a missing or null 'statements' value as "no queries in this window".
    records = response.get('statements') or []

    mapped = [_to_query_history_row(r) for r in records]
    # Tuples keep the fixed column order; PySpark's dict-based inference sorts keys alphabetically.
    data = [tuple(row.values()) for row in mapped]

    session = spark_session if spark_session is not None else spark

    if schema is not None:
        return session.createDataFrame(data, schema=schema)
    if not mapped:
        raise ValueError(
            "No query history records returned for the requested window; "
            "pass `schema` to get an empty, correctly typed DataFrame."
        )
    # Column names only: types are still inferred. NOTE(review): a column that is None in every
    # row may fail inference — pass `schema` to avoid that.
    columns = list(mapped[0])
    return session.createDataFrame(data, schema=columns)

In [0]:
%sql
-- Ad-hoc sample of up to 1000 rows from system.query.history.
-- There is no ORDER BY, so which rows come back is arbitrary.
SELECT *
FROM system.query.history
LIMIT 1000