We query the **schema_registry** table to get the list of registered tables, then store the downstream lineage information for each of them in the **lineage_data** table, which is used by the dashboard.

In [0]:
# Step 1: Create a text widget named "table_name". Its default value is the
# fully qualified name of the schema registry table.
# NOTE: only the name and default value are passed; a third (label/heading)
# argument such as "Enter Table Name" is not supplied here.
dbutils.widgets.text("table_name", "ds_training_1.default.schema_registry")

In [0]:
# Step 2: Read the current value of the "table_name" widget. The value is the
# name of the table that is queried for the list of tables to fetch lineage for.
table_name_variable = dbutils.widgets.get("table_name")

In [0]:
# Required Libraries
import requests
import json
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import split

# Step 1: Fetch the distinct fully qualified table names from the registry.
# The registry table name comes from a notebook widget (user input), so it is
# handed to spark.table() as an identifier instead of being interpolated into
# a SQL string.
# Optional filter carried over from the original query (disabled):
#   .where("DATE(check_timestamp) = CURRENT_DATE()")
table_names_df = (
    spark.table(table_name_variable)
    .selectExpr(
        "CONCAT(catalog_name, '.', schema_name, '.', table_name) AS table_name"
    )
    .distinct()
)

# CONCAT yields NULL when any of its parts is NULL; such rows cannot be
# looked up through the lineage API, so they are skipped.
table_names = [
    row.table_name
    for row in table_names_df.collect()
    if row.table_name is not None
]
print(table_names)

# Step 2: Schema of one lineage record. Every column is nullable; the column
# order is the order in which the lineage Row objects list their fields.
schema = StructType([
    StructField(column, data_type, True)
    for column, data_type in (
        ("source_table", StringType()),
        ("lineage_type", StringType()),
        ("workspace_id", LongType()),
        ("entity_type", StringType()),
        ("entity_id", StringType()),
        ("related_table", StringType()),
        ("catalog_name", StringType()),
        ("schema_name", StringType()),
        ("lineage_timestamp", StringType()),
    )
])

# Set your API endpoint and headers
databricks_instance = "adb-2376768479807879.19.azuredatabricks.net"

# SECURITY: the access token must never be hard-coded in the notebook source.
# It is read from a Databricks secret scope instead. Any token that was
# previously committed in this notebook should be revoked and re-issued.
# TODO: point scope/key at the secret scope that actually holds the token.
databricks_token = dbutils.secrets.get(scope="lineage", key="databricks_token")

headers = {
    "Authorization": f"Bearer {databricks_token}",
    "Content-Type": "application/json"
}

# Step 3: Function to extract lineage details from the API response
def extract_lineage_data(lineage_data, source_table, lineage_type):
    """Flatten lineage entities from the API response into a list of Rows.

    For every entity in ``lineage_data`` the function emits, in this order:
    one "TABLE" row when the entity's ``tableInfo`` has a name, one row per
    entry in ``jobInfos``, ``notebookInfos``, ``queryInfos`` and
    ``dashboardV3Infos``, and one "FILE" row when ``fileInfo`` is non-empty.

    Args:
        lineage_data: Iterable of lineage entity dicts (e.g. the
            "downstreams" list of the response). ``None`` is treated as empty.
        source_table: Name of the table the lineage was requested for; copied
            into every row.
        lineage_type: Label copied into every row (the caller passes
            "downstream").

    Returns:
        A list of ``Row`` objects whose fields are, in order: source_table,
        lineage_type, workspace_id, entity_type, entity_id, related_table,
        catalog_name, schema_name, lineage_timestamp.
    """
    # (response key, entity_type label, id key, whether the id is cast to str)
    entity_specs = (
        ("jobInfos", "JOB", "job_id", True),
        ("notebookInfos", "NOTEBOOK", "notebook_id", True),
        ("queryInfos", "DBSQL_QUERY", "query_id", False),
        ("dashboardV3Infos", "DBSQL_DASHBOARD", "dashboard_id", False),
    )

    def make_row(table_info, entity_type, workspace_id, entity_id, lineage_timestamp):
        # Keep the keyword order identical for every row so that all rows
        # share one field layout.
        return Row(
            source_table=source_table,
            lineage_type=lineage_type,
            workspace_id=workspace_id,
            entity_type=entity_type,
            entity_id=entity_id,
            related_table=table_info.get("name", ""),
            catalog_name=table_info.get("catalog_name", ""),
            schema_name=table_info.get("schema_name", ""),
            lineage_timestamp=lineage_timestamp,
        )

    records = []
    for lineage_entity in lineage_data or []:
        # "or {}" / "or []" also covers keys that are present but null,
        # which .get(key, default) alone would pass through as None.
        table_info = lineage_entity.get("tableInfo") or {}

        # Collect the lineage record for the table itself
        if table_info.get("name"):
            records.append(make_row(table_info, "TABLE", None, None, None))

        # Collect information for jobs, notebooks, queries and dashboards
        for info_key, entity_type, id_key, id_as_text in entity_specs:
            for info in lineage_entity.get(info_key) or []:
                entity_id = info.get(id_key)
                # Only stringify real ids; str(None) would store the literal
                # text "None" as an entity id.
                if id_as_text and entity_id is not None:
                    entity_id = str(entity_id)
                records.append(make_row(
                    table_info,
                    entity_type,
                    info.get("workspace_id"),
                    entity_id,
                    info.get("lineage_timestamp", ""),
                ))

        # Collect the file record, if any
        file_info = lineage_entity.get("fileInfo") or {}
        if file_info:
            records.append(make_row(
                table_info,
                "FILE",
                None,
                file_info.get("securable_name", ""),
                file_info.get("lineage_timestamp", ""),
            ))

    return records

# Step 4: Initialize an empty list to hold all lineage records
all_lineage_records = []

# API endpoint for lineage tracking; it is the same for every table, so it is
# built once instead of on every iteration.
url = f"https://{databricks_instance}/api/2.0/lineage-tracking/table-lineage"

# Step 5: Loop through each table name to get lineage data
for table_name in table_names:
    # Data payload for the API
    payload = {
        "table_name": table_name,
        "include_entity_lineage": True
    }

    # Call the API to get the lineage data. A timeout keeps one unresponsive
    # request from hanging the notebook, and a connection-level failure for
    # one table is reported and skipped rather than aborting the whole run.
    try:
        response = requests.get(url, headers=headers, json=payload, timeout=60)
    except requests.exceptions.RequestException as exc:
        print(f"Failed to fetch lineage data for {table_name}: {exc}")
        continue

    # Ensure successful response
    if response.status_code != 200:
        print(f"Failed to fetch lineage data for {table_name}: {response.status_code} - {response.text}")
        continue

    lineage_data = response.json()
    # Extract downstream lineage records
    downstream_records = extract_lineage_data(lineage_data.get("downstreams", []), table_name, "downstream")
    all_lineage_records.extend(downstream_records)

# Step 6: Create a DataFrame with all lineage records and persist it.
# Everything that depends on the DataFrame lives inside the guard below, so a
# run that collected no records no longer fails on an undefined new_df.
lineage_table = "ds_training_1.default.lineage_data"

if all_lineage_records:
    lineage_df = spark.createDataFrame(all_lineage_records, schema=schema)
    display(lineage_df)

    # Split source_table ("catalog.schema.table") into catalog_name,
    # schema_name and table_name, keeping only the entity columns next to them.
    source_parts = split(lineage_df.source_table, '\\.')
    new_df = lineage_df.select(
        source_parts.getItem(0).alias("catalog_name"),
        source_parts.getItem(1).alias("schema_name"),
        source_parts.getItem(2).alias("table_name"),
        lineage_df.entity_type,
        lineage_df.entity_id,
    )

    # Expose the DataFrame to SQL as a (session-scoped) temporary view.
    new_df.createOrReplaceTempView("lineage_info")

    # Step 7: Create the target table only if it doesn't exist. The previous
    # CREATE OR REPLACE emptied the table on every run, which made the MERGE
    # below equivalent to a plain insert.
    # NOTE(review): rows for lineage that no longer exists are therefore kept;
    # add a delete step if a per-run snapshot is what the dashboard needs.
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {lineage_table} (
        catalog_name STRING,
        schema_name STRING,
        table_name STRING,
        entity_type STRING,
        entity_id STRING
    )
    """)

    # Step 8: Upsert with MERGE INTO. The match uses the null-safe operator
    # (<=>) because entity_id is NULL for TABLE rows and "=" never matches
    # NULL, which would re-insert those rows on every run. entity_type is part
    # of the key so that equal ids of different entity types stay separate
    # rows; with all five columns in the key a matched row is already
    # identical, so only the insert branch is needed.
    merge_result = spark.sql(f"""
    MERGE INTO {lineage_table} AS target
    USING (
        SELECT DISTINCT
            catalog_name,
            schema_name,
            table_name,
            entity_type,
            entity_id
        FROM lineage_info
    ) AS source
    ON target.catalog_name <=> source.catalog_name
       AND target.schema_name <=> source.schema_name
       AND target.table_name <=> source.table_name
       AND target.entity_type <=> source.entity_type
       AND target.entity_id <=> source.entity_id
    WHEN NOT MATCHED THEN
      INSERT (catalog_name, schema_name, table_name, entity_type, entity_id)
      VALUES (source.catalog_name, source.schema_name, source.table_name, source.entity_type, source.entity_id)
    """)
    display(merge_result)
else:
    print(f"No lineage records were collected; {lineage_table} was left unchanged.")



['ds_training_1.ds_gold.daily_business_metrics_brindavivek', 'ds_training_1.ds_gold.customer_purchase_insights_brinda', 'ds_training_1.ds_silver.customer_silver_vishal', 'ds_training_1.ds_gold.book_author', 'ds_training_1.ds_silver.book', 'ds_training_1.ds_gold.customer_purchase_insights', 'ds_training_1.ds_silver.book_parquet', 'ds_training_1.ds_bronze.book_xml_vol', 'ds_training_1.ds_silver.employee_details_valid_records_new5']


source_table,lineage_type,workspace_id,entity_type,entity_id,related_table,catalog_name,schema_name,lineage_timestamp
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,1108203924966037,,,,2024-09-18 09:52:17.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,274536131047225,,,,2024-09-18 09:50:18.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,694574183887404,,,,2024-09-18 09:38:53.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,975119935895425,,,,2024-09-18 09:37:14.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,789011513787645,,,,2024-09-18 09:35:27.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,12326224624388,,,,2024-09-18 09:33:26.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,976713508584610,,,,2024-09-18 09:28:53.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,282366352711397,,,,2024-09-18 08:34:08.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,365407680350675,,,,2024-09-17 11:25:37.0
ds_training_1.ds_gold.daily_business_metrics_brindavivek,downstream,2376768479807879.0,JOB,48135754077660,,,,2024-09-17 11:24:50.0


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]