# REST API example for "Bring your own data lineage" for Databricks AI/BI Dashboards

* Official documentation can be found here: https://learn.microsoft.com/en-us/azure/databricks/data-governance/unity-catalog/external-lineage
* REST API documentation: https://docs.databricks.com/api/azure/workspace/externalmetadata/getexternalmetadata

* Yes, lineage should be in the system tables, but you won’t find it in the visualized lineage yet ;D. That said, you can apply the same logic used here to other external systems.
* Keep in mind that this quickly created notebook contains one-time insert logic only: it creates external metadata and lineage entries, but it does not update or delete existing ones.

## Things to consider:
If you have multiple AI/BI dashboards, especially many that share the same tables, the data lineage can become messy when importing everything at once.

Instead of applying mass automation, consider using this process selectively for only the most relevant dashboards. Currently, the tool scans all dashboards by default.

### Import libraries

In [0]:
import requests
import json
import re
from dbruntime.databricks_repl_context import get_context

### Get auth token & server hostname

In [0]:
# Base URL of the workspace, built from the browser host name exposed by the notebook context
server_hostname = f"https://{get_context().browserHostName}"
# API token taken from the notebook context
token = get_context().apiToken
# User reported by the notebook context
current_user = get_context().user 

### Functions

In [0]:
def run_rest_api(api_version: str, api_command: str, action_type: str,  payload: dict = None) -> "dict | Exception":
    '''
    Dynamic REST API helper for the current workspace.

    Since this is running in a notebook, it uses the notebook-level server_hostname & token variables.
    Nothing is raised from here: if the call fails for any reason, the exception object is
    returned instead of the parsed response, so callers should check the type of the result.

    Args:
        api_version: API version segment of the URL, e.g. '2.0'.
        api_command: Path after the version segment, e.g. 'lakeview/dashboards'.
        action_type: HTTP method; only 'POST' and 'GET' are supported.
        payload: Request body, sent as JSON. Defaults to an empty dict.

    Returns:
        The decoded JSON response when the server answers with HTTP 200,
        otherwise the exception that describes the failure.
    '''
    try:
        # Explicit raise instead of `assert`, so the check is not stripped under `python -O`.
        # AssertionError is kept so the type of the returned error does not change for callers.
        if action_type not in ('POST', 'GET'):
            raise AssertionError(f'Only POST and GET are supported but you used {action_type}')

        # A fresh dict per call: a mutable default argument would be shared between calls.
        body = json.dumps({} if payload is None else payload)
        url = f"{server_hostname}/api/{api_version}/{api_command}"
        headers = {'Authorization': f'Bearer {token}'}

        # The context manager closes the session (and its pooled connections) on every path.
        with requests.Session() as session:
            resp = session.request(action_type, url, data=body, verify=True, headers=headers)
            if resp.status_code != 200:
                # Error bodies are not guaranteed to be JSON; fall back to the raw text
                # so the real failure is reported instead of a JSON decoding error.
                try:
                    detail = resp.json()
                except ValueError:
                    detail = resp.text
                raise AssertionError(f"Running REST API has failed with an error message: {detail}")
            return resp.json()
    except Exception as e:
        return e

def get_all_dashboards() -> list:
    """
    Fetches all dashboards and returns a list of dictionaries,
    where each dictionary maps dashboard_id to a cleaned display_name.

    The listing is paginated: every page is requested by following
    `next_page_token` until the server stops returning one.

    Cleaning:
    - Lowercase
    - Spaces to dashes
    - Special characters removed
    If nothing is left after cleaning (e.g. the display name has no a-z / 0-9
    characters), the dashboard_id is used as the name instead.

    Raises:
        RuntimeError: if the REST call failed (run_rest_api returned an exception).
    """
    from urllib.parse import quote

    def text_cleaner(text: str) -> str:
        text = text.lower()
        text = re.sub(r'\s+', '-', text)
        text = re.sub(r'[^a-z0-9\-]', '', text)
        text = re.sub(r'-{2,}', '-', text)
        return text.strip('-')

    api_version = '2.0'
    api_command = 'lakeview/dashboards'
    action_type = 'GET'

    # Build list of dashboard_id: cleaned_display_name pairs, one page at a time
    dashboard_list = []
    page_token = None
    seen_tokens = set()
    while True:
        # The page token travels as a query parameter appended to the command path
        command = api_command
        if page_token is not None:
            command = f"{api_command}?page_token={quote(page_token, safe='')}"

        result = run_rest_api(api_version=api_version, api_command=command, action_type=action_type)
        if isinstance(result, Exception):
            # run_rest_api returns the error instead of raising it; fail with a clear message
            raise RuntimeError(f"Listing AI/BI dashboards failed: {result}") from result

        for d in result.get('dashboards', []):
            dashboard_id = d['dashboard_id']
            cleaned_name = text_cleaner(d.get('display_name') or '')
            dashboard_list.append({dashboard_id: cleaned_name or dashboard_id})

        page_token = result.get('next_page_token')
        # Stop on the last page; the seen-set guards against a server repeating a token forever
        if not page_token or page_token in seen_tokens:
            break
        seen_tokens.add(page_token)

    print("*" * 25)
    print("ALL AI/BI DASHBOARDS:")
    print(dashboard_list)
    print("*" * 25)

    return dashboard_list


def get_all_tables(dashboard_id: str) -> list:
    '''
    Get all tables from AI/BI Dashboard

    The serialized dashboard definition is scanned for three-part names
    (catalogName.schemaName.tableName). Each distinct candidate is checked with
    DESCRIBE TABLE EXTENDED and only the ones that resolve are kept.

    Args:
        dashboard_id: ID of the dashboard to inspect.

    Returns:
        Distinct, validated table names in order of first appearance.

    Raises:
        RuntimeError: if the REST call failed (run_rest_api returned an exception).
    '''
    api_version = '2.0'
    api_command = f'lakeview/dashboards/{dashboard_id}'
    action_type = 'GET'
    response = run_rest_api(api_version=api_version, api_command=api_command, action_type=action_type)
    if isinstance(response, Exception):
        # run_rest_api returns the error instead of raising it; fail with a clear message
        raise RuntimeError(f"Fetching dashboard {dashboard_id} failed: {response}") from response
    # A dashboard without a serialized definition simply has no tables
    result = response.get('serialized_dashboard') or ''

    # Use Regex to find all tables (catalogName.schemaName.tableName)
    pattern = r'\b([\w\-]+)\.([\w\-]+)\.([\w\-]+)\b'
    matches = re.findall(pattern, result)

    # The same table is usually referenced many times; dict.fromkeys drops
    # the duplicates while keeping the order of first appearance
    candidates = dict.fromkeys(matches)

    # Validate and keep only existing tables
    validated_tables = []

    for catalog, schema, table in candidates:
        # Backtick-quote each part so names containing '-' (allowed by the pattern) can resolve.
        # The pattern cannot match a backtick, so the quoting cannot be broken out of.
        quoted_name = f"`{catalog}`.`{schema}`.`{table}`"
        try:
            spark.sql(f"DESCRIBE TABLE EXTENDED {quoted_name}")
        except Exception:
            # Best-effort validation: anything that does not resolve is not a table
            continue
        validated_tables.append(f"{catalog}.{schema}.{table}")

    print("*"*25)
    print(f"Found the next tables for Dashboard ID {dashboard_id}:")
    print(validated_tables)
    print("*"*25)
    return validated_tables

def get_all_columns_for_table(table_name: str) -> list:
    '''
    Read the given table through Spark, print its column names and return them.
    '''
    divider = "-"*25
    columns = spark.read.table(table_name).columns
    for line in (divider, f"Found the next columns for table {table_name}:", columns, divider):
        print(line)
    return columns

def create_external_metadata(dashboard_name: str, dashboard_id: str, column_list: list = None) -> None:
    '''
    Create an external metadata object that represents one AI/BI Dashboard.

    The outcome is printed: a success message when the REST call returned a response,
    or a failure message with the error when run_rest_api returned an exception.

    Args:
        dashboard_name: Name used for the external metadata object.
        dashboard_id: Dashboard ID, used to build the link back to the dashboard.
        column_list: Currently unused; the "columns" field of the payload is deactivated.
    '''
    api_version = '2.0'
    api_command = 'lineage-tracking/external-metadata'
    action_type = 'POST'
    payload = {
  #      "columns": column_list,                         # <-- Activate this if you want to add columns as well
        "description": "Databricks AI/BI Dashboard",
        "entity_type": "AI/BI Dashboard",
        "name": f"{dashboard_name}",
        "properties": {
                        "custom_properties": "Ikidata"
                        },
    #    "owner": current_user,  # Error message -> 'message': 'Must not supply an owner.' deactivated for purpose?
        "system_type": "OTHER",  # DATABRICKS is mentioned in REST API documentation but isn't working - blocked? :(
        "url": f"{server_hostname}/dashboardsv3/{dashboard_id}"
    }
    results = run_rest_api(api_version=api_version, api_command=api_command, action_type=action_type, payload=payload)
    print("*"*25)
    # run_rest_api returns the exception instead of raising it, so success must be checked here
    if isinstance(results, Exception):
        print(f"Creating external metadata failed for dashboard '{dashboard_name}' ({dashboard_id}): {results}")
    else:
        print(f"External metadata created successfully for dashboard '{dashboard_name}' ({dashboard_id})")
    print("*"*25)

def create_external_lineage(table_name: str, dashboard_name: str) -> None:
    '''
    Create an external lineage relationship from a table to a dashboard's external metadata object.

    The outcome is printed: a success message when the REST call returned a response,
    or a failure message with the error when run_rest_api returned an exception.

    Args:
        table_name: Full name of the source table.
        dashboard_name: Name of the external metadata object that is the lineage target.
    '''
    api_version = '2.0'
    api_command = 'lineage-tracking/external-lineage'
    action_type = 'POST'
    payload = {
        "source": {
            "table": {
                "name": table_name
            }
        },
        "target": {
            "external_metadata": {
                "name": dashboard_name
            }
        }
    }
    result = run_rest_api(api_version=api_version, api_command=api_command, action_type=action_type, payload=payload)
    print("*"*25)
    # run_rest_api returns the exception instead of raising it, so success must be checked here
    if isinstance(result, Exception):
        print(f"Adding lineage failed for table {table_name} in dashboard '{dashboard_name}': {result}")
    else:
        print(f"Lineage added successfully for table {table_name} in dashboard '{dashboard_name}'")
    print("*"*25)

### Get all AI/BI Dashboards
Consider limiting the scope here, e.g. by slicing `dashboard_list` down to only the most relevant dashboards.

In [0]:
# Result is a list of single-pair dicts: {dashboard_id: cleaned_display_name}
dashboard_list = get_all_dashboards()                                      # Get all AI/BI Dashboards
# Uncomment the next line to limit the run to the first dashboard only
#dashboard_list = dashboard_list[0:1]

### Loop all selected dashboards and let the automation handle the rest

In [0]:
# Register every selected dashboard as external metadata and link each of its tables to it
for dashboard in dashboard_list:
    # Each entry is a single-pair dict: the key is the Dashboard ID, the value is the Dashboard name
    dashboard_id = [*dashboard.keys()][0]
    dashboard_name = [*dashboard.values()][0]

    # Get all tables Dashboard is using
    table_list = get_all_tables(dashboard_id)

    # Create external metadata, columns deactivated
    create_external_metadata(dashboard_name, dashboard_id)

    # Create external lineage for each table
    for table in table_list:
        #column_list = get_all_columns_for_table(table)      # Get all columns for the table <- here you could loop all columns
        create_external_lineage(table, dashboard_name)