In [None]:
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

import pandas as pd
import requests
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient

# Never truncate cell text in displayed pandas frames (long messages / query text).
pd.options.display.max_colwidth = None

In [None]:
# Destination for the tables written below: f"{catalog}.{schema}.<table>"
catalog, schema = "main", "incremental_dlt"

In [None]:
# Spark session used by the createDataFrame / temp-view / SQL cells further down.
spark = (
    DatabricksSession
    .builder
    .getOrCreate()
)

In [None]:
# Initialize client and auth
w = WorkspaceClient()
# NOTE(review): presumably a lazy, paginated iterator from the SDK; if so it is
# exhausted by the collection cell, so re-run this cell before re-collecting.
pipelines = w.pipelines.list_pipelines()
# A new personal access token is minted on every run of this cell for the raw REST
# calls below. Give it a bounded lifetime so these tokens do not accumulate as
# never-expiring credentials; raise the value if event collection runs longer.
token_lifetime_seconds = 60 * 60
token_value = w.tokens.create(
    comment=f"sdk-{time.time_ns()}",
    lifetime_seconds=token_lifetime_seconds,
).token_value
host = w.config.host

In [None]:

# Bearer-token auth header sent with every request made by fetch_events
headers = dict(Authorization=f"Bearer {token_value}")

# Worker function to fetch events for a pipeline
def fetch_events(pipeline, timeout=30):
    """Fetch every page of a pipeline's event log from the Pipelines REST API.

    Best-effort: any failure is printed and an empty list is returned, so one bad
    pipeline does not abort the whole concurrent collection run.

    Args:
        pipeline: An object with `pipeline_id` and `name` attributes.
        timeout: Per-request timeout in seconds passed to `requests.get`, so a
            stalled connection cannot block a worker thread indefinitely.

    Returns:
        A list of event dicts, each tagged with `pipeline_id` and `pipeline_name`.
        Empty when the pipeline has no events or the fetch failed.
    """
    pipeline_id = pipeline.pipeline_id
    url = f"{host}/api/2.0/pipelines/{pipeline_id}/events"
    query_params = {}
    pipeline_events = []

    try:
        while True:
            response = requests.get(
                url, headers=headers, params=query_params, timeout=timeout
            )
            response.raise_for_status()
            resp = response.json()
            events = resp.get("events", [])
            if not events:
                break  # Empty page: nothing (more) to collect

            # Tag events with pipeline info so they stay attributable after merging
            for event in events:
                event["pipeline_id"] = pipeline_id
                event["pipeline_name"] = pipeline.name
            pipeline_events.extend(events)

            # Follow pagination. A missing, null or empty token means this was the
            # last page; sending an empty token back would presumably restart from
            # the first page and loop.
            next_page_token = resp.get("next_page_token")
            if not next_page_token:
                break
            query_params["page_token"] = next_page_token

        print(f"Fetched {len(pipeline_events)} events for {pipeline.name}")
        return pipeline_events
    except Exception as e:
        print(f"Failed for {pipeline.name}: {e}")
        return []

In [None]:
max_threads = 5
eventful_pipeline_limit = 100  # stop once this many pipelines have returned events
eventful_pipeline_count = 0
all_events = []

start = time.time()

with ThreadPoolExecutor(max_workers=max_threads) as executor:
    # Submit lazily so no more pipelines are fetched than needed to reach the limit
    pipeline_iter = iter(pipelines)
    running_futures = set()

    while eventful_pipeline_count < eventful_pipeline_limit:
        # Top up the pool until max_threads fetches are in flight or pipelines run out
        while len(running_futures) < max_threads:
            try:
                p = next(pipeline_iter)
            except StopIteration:
                break  # No more pipelines to submit
            running_futures.add(executor.submit(fetch_events, p))

        # Every pipeline has been fetched and fewer than the limit had events.
        # wait() on an empty set returns immediately with nothing done, so without
        # this check the loop would spin forever.
        if not running_futures:
            break

        # Block until at least one fetch finishes; the rest carry over to the next round
        done, running_futures = wait(running_futures, return_when=FIRST_COMPLETED)
        for future in done:
            result = future.result()
            if result:
                all_events.extend(result)
                eventful_pipeline_count += 1
                if eventful_pipeline_count >= eventful_pipeline_limit:
                    break

# Leaving the `with` block waits for fetches still in flight; their results are not used.
print(
    f"\nCollected events from {eventful_pipeline_count} pipelines in "
    f"{round(time.time() - start, 2)} seconds"
)

In [None]:
# Convert to DataFrame: one row per event, one column per top-level event field
all_pipelines_df = pd.DataFrame(all_events)
# Number of distinct pipelines that returned events. With no events collected the
# frame has no columns at all, so guard the lookup instead of raising KeyError.
(
    all_pipelines_df["pipeline_id"].nunique(dropna=False)
    if "pipeline_id" in all_pipelines_df.columns
    else 0
)

In [None]:
# Hand the pandas events to Spark and expose them under the view name the SQL cells query.
spark_df = spark.createDataFrame(all_pipelines_df)
spark_df.createOrReplaceTempView(
    "demo_event_log",
)

In [None]:
# Persist a flat projection of the raw events, replacing the table's previous contents.
query = (
    "SELECT id, sequence, origin, timestamp, message, level, event_type, "
    "maturity_level, pipeline_id, pipeline_name, error "
    "FROM demo_event_log"
)

result_df = spark.sql(query)
(
    result_df.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog}.{schema}.raw_event_log")
)

In [None]:
# Per planning_information event: the chosen refresh technique, its cost and (for
# complete recomputes) the reason, joined to flow definitions of the same pipeline.
query = """
WITH planning AS (
  SELECT
    origin.org_id,
    origin.pipeline_id,
    timestamp,
    origin.pipeline_name,
    message,
    details.planning_information.technique_information AS techniques
  FROM demo_event_log
  WHERE event_type = 'planning_information'
),

chosen AS (
  SELECT
    *,
    -- First technique flagged as chosen, at any position in the array
    -- (NULL when none is chosen or the array is missing).
    get(filter(techniques, t -> t.is_chosen = true), 0) AS chosen_technique
  FROM planning
),

refresh_method AS (
  SELECT
    org_id,
    pipeline_id,
    timestamp,
    pipeline_name,
    REGEXP_EXTRACT(message, 'executed as ([A-Z_]+)', 1) AS refresh_type,

    -- maintenance type and cost of the chosen technique
    chosen_technique.maintenance_type AS chosen_maintenance_type,
    chosen_technique.cost AS cost,

    -- recompute reason: for a complete recompute, prefer EXPECTATIONS_NOT_SUPPORTED
    -- when techniques 1 or 2 list it as their first issue, otherwise the first issue
    -- listed by techniques 0-3, otherwise UNKNOWN_ISSUE.
    -- NOTE(review): rows with no chosen technique also fall into the ELSE branch.
    CASE
      WHEN chosen_technique.maintenance_type = 'MAINTENANCE_TYPE_COMPLETE_RECOMPUTE'
      THEN COALESCE(
        CASE
          WHEN get(get(techniques, 1).incrementalization_issues, 0).issue_type
               = 'EXPECTATIONS_NOT_SUPPORTED'
            THEN 'EXPECTATIONS_NOT_SUPPORTED'
        END,
        CASE
          WHEN get(get(techniques, 2).incrementalization_issues, 0).issue_type
               = 'EXPECTATIONS_NOT_SUPPORTED'
            THEN 'EXPECTATIONS_NOT_SUPPORTED'
        END,
        get(get(techniques, 0).incrementalization_issues, 0).issue_type,
        get(get(techniques, 1).incrementalization_issues, 0).issue_type,
        get(get(techniques, 2).incrementalization_issues, 0).issue_type,
        get(get(techniques, 3).incrementalization_issues, 0).issue_type,
        'UNKNOWN_ISSUE'
      )
      ELSE 'incremental recompute'
    END AS recompute_reason
  FROM chosen
),

flow_info AS (
  -- DISTINCT collapses identical definitions that are logged more than once
  -- (presumably one flow_definition event per pipeline update).
  SELECT DISTINCT
    origin.org_id,
    origin.pipeline_id,
    details.flow_definition.output_dataset AS output_dataset,
    details.flow_definition.explain_text AS query,
    details.flow_definition.flow_type AS table_type
  FROM demo_event_log
  WHERE event_type = 'flow_definition'
)

SELECT
  r.timestamp,
  r.pipeline_id,
  r.pipeline_name,
  r.refresh_type,
  r.chosen_maintenance_type,
  r.recompute_reason,
  r.cost,
  f.output_dataset,
  f.query,
  f.table_type
FROM refresh_method r
-- Joining on org_id alone paired every planning event with every flow of every
-- pipeline in the org; restrict matches to the same pipeline.
-- TODO: also match the specific flow/dataset (e.g. an origin flow or dataset
-- name) once that field is confirmed present in the collected event schema.
LEFT JOIN flow_info f
  ON r.org_id = f.org_id
 AND r.pipeline_id = f.pipeline_id
"""

In [None]:
# Run the combined planning/flow query and save it, overwriting any previous contents.
result_df = spark.sql(query)
(
    result_df.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog}.{schema}.combined_event_log")
)