## Fabric Databricks Mirroring notebook

Purpose: automate the creation of Fabric Databricks Mirror items and give finer control over what each item contains, which improves readability and auditability.

- Use the Unity Catalog (UC) APIs to collect schema and tag metadata for a given catalog. Note: the schema names to process are supplied as input.
- The notebook assumes that schemas carry `domain` and `subdomain` tags in UC. If a schema has no `domain` tag, the name of its Databricks Mirror item falls back to "undefined_domain".
- Use the Fabric APIs to create the corresponding Databricks Mirror items.

#### Prereqs

- Review https://learn.microsoft.com/en-us/fabric/mirroring/azure-databricks-tutorial
- Grant the `EXTERNAL USE SCHEMA` privilege in UC to the relevant users, at catalog or schema level as appropriate.
- Create a connection in Fabric to UC. Use a naming convention so the connection is easy to reuse.


In [None]:
%pip install semantic-link-labs

In [None]:
import base64
import json
import requests
from collections import defaultdict
import notebookutils  # provides notebookutils.credentials, used in the cells below
import sempy.fabric as fabric


In [None]:
fabric_ws_akv = "https://fab279akv1.vault.azure.net/"
adb_ws_id_s = "databricks-ws-id"
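adb_ws_token_s = "databricks-ws-token"

# Get the Databricks workspace URL and personal access token (PAT) from Key Vault.
# The first secret is used later as the base URL of the UC REST calls, so it must hold the workspace URL.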
dbx_workspace = notebookutils.credentials.getSecret(
    fabric_ws_akv,
    adb_ws_id_s)

dbx_token = notebookutils.credentials.getSecret(
    fabric_ws_akv,
    adb_ws_token_s)

In [None]:
# Databricks workspace

# Unity Catalog
dbx_uc_catalog = "databricks279wus3"
#dbx_uc_schemas = '["bakers", "publicchange2", "nyctaxi"]'
dbx_uc_schemas = '["healthv"]'
# If using the current workspace, leave this as-is; otherwise replace it with the target workspace id
workspace_id = fabric.get_workspace_id()
print(f"Workspace id is: {workspace_id}")
# Fabric
fab_workspace_id = workspace_id
fab_adls_connection_id = ""
fab_dbx_workspace_connection_id = "469a508f-77d4-4a5f-a629-251fd0d3b4bf"

#### Databricks Utilities
(Explanation of the code inside the `Utils` class below.)
###### a. get_dbx_uc_schemas(databricks_config)
- Purpose: Fetches schemas from a specified Databricks Unity Catalog and, for each schema, retrieves its associated tags.
- How it works:
  - Reads the Databricks workspace URL, token, catalog, and list of schemas from `databricks_config`.
  - Calls the Unity Catalog API to list the schemas in the specified catalog.
  - Filters the result to the schemas listed in `dbx_uc_schemas`.
  - For each schema, fetches its tags via a second API call.
  - Returns a list of schema info dictionaries, each with a `tags` field (an empty list if the tag lookup fails), or `None` if the schema listing call fails.
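
The filtering step can be tried offline. The sketch below uses a hypothetical, trimmed-down response (the schema entries are made up for illustration) and additionally computes which requested names were not found, something the notebook itself does not report.

```python
# Hypothetical, trimmed-down body of the "list schemas" response;
# real responses carry more fields per schema.
sample_response = {
    "schemas": [
        {"name": "healthv", "catalog_name": "databricks279wus3"},
        {"name": "nyctaxi", "catalog_name": "databricks279wus3"},
        {"name": "information_schema", "catalog_name": "databricks279wus3"},
    ]
}

# Schema names supplied as input (dbx_uc_schemas after json.loads)
requested = ["healthv", "bakers"]

# Same filter as in get_dbx_uc_schemas: keep only the requested schemas
schemas = sample_response.get("schemas", [])
schemas_to_process = [s for s in schemas if s["name"] in requested]

# Extra check (not in the notebook): requested names missing from the catalog
selected_names = [s["name"] for s in schemas_to_process]
missing = sorted(set(requested) - set(selected_names))

print(f"Selected: {selected_names}")
print(f"Requested but not found: {missing}")
```

Because the filter silently skips names that do not exist in the catalog, a check like `missing` may help catch typos in the input list.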
###### b. _normalize_tag_map(tag_list)
- Purpose: Converts a list of tag dictionaries into a simple {key: value} mapping.
- How it works: Iterates over the tag list and extracts the key and value from each tag, accepting several possible field names (`key`, `tag_key` or `name` for the key; `value` or `tag_value` for the value).
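
As a small worked example, the normalization can be run on its own. The function body mirrors `Utils._normalize_tag_map`; the sample tags are hypothetical and only illustrate the different field names that are accepted.

```python
def normalize_tag_map(tag_list):
    """Flatten a list of tag dicts into {key: value}, tolerating several field names."""
    tag_map = {}
    for t in tag_list or []:
        k = t.get("key") or t.get("tag_key") or t.get("name")
        v = t.get("value") or t.get("tag_value")
        if k is not None:
            tag_map[str(k)] = v
    return tag_map


# Hypothetical tag assignments using mixed field names
sample_tags = [
    {"tag_key": "domain", "tag_value": "health"},
    {"key": "subdomain", "value": "claims"},
    {"tag_key": "pii"},  # a tag without a value maps to None
]

tag_map = normalize_tag_map(sample_tags)
print(tag_map)
```

Passing `None` or an empty list yields an empty mapping, so callers do not need to special-case schemas without tags.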
###### c. group_schemas_by_domain(schemas_with_tags)
- Purpose: Groups schemas by their `domain` and `subdomain` tags.
- How it works:
  - For each schema, normalizes its tags.
  - Extracts `domain` (defaults to "undefined_domain" if missing) and `subdomain` (defaults to an empty string if missing).
  - Groups full schema names under their domain/subdomain.
  - Returns a nested dictionary: {domain: {subdomain: [full_schema_names]}}.
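
The grouping can be sketched in isolation as follows. For brevity this simplified version assumes the tags are already normalized into a plain dict; the catalog name `cat` and the schema entries are hypothetical.

```python
from collections import defaultdict


def group_by_domain(schemas):
    """Group 'catalog.schema' names as {domain: {subdomain: [full_names]}}."""
    grouped = defaultdict(lambda: defaultdict(list))
    for s in schemas:
        tags = s.get("tags", {})  # assumed to be an already-normalized {key: value} dict
        domain = tags.get("domain", "undefined_domain")
        subdomain = tags.get("subdomain", "")
        grouped[domain][subdomain].append(f'{s["catalog_name"]}.{s["name"]}')
    # Convert the nested defaultdicts to plain dicts for predictable printing
    return {d: dict(sub) for d, sub in grouped.items()}


# Hypothetical schemas: fully tagged, domain only, and untagged
sample_schemas = [
    {"catalog_name": "cat", "name": "claims", "tags": {"domain": "health", "subdomain": "claims"}},
    {"catalog_name": "cat", "name": "members", "tags": {"domain": "health"}},
    {"catalog_name": "cat", "name": "scratch", "tags": {}},
]

grouped = group_by_domain(sample_schemas)
print(grouped)
```

Each inner key later becomes one mirrored item, so untagged schemas end up together in a single "undefined_domain" group.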
#### Fabric Utilities
###### a. _group_fullnames_by_catalog(fullnames)
- Purpose: Groups a list of catalog.schema strings by catalog.
- How it works: Splits each string at the first dot and groups the schemas under their catalog. Schema names are de-duplicated and sorted, and a string without a dot raises a `ValueError`.
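
A standalone sketch of this helper, mirroring `Utils._group_fullnames_by_catalog`, shows the de-duplication and the error case. The catalog and schema names are hypothetical.

```python
from collections import defaultdict


def group_fullnames_by_catalog(fullnames):
    """Turn ['cat.schema', ...] into {catalog: [sorted unique schema names]}."""
    by_cat = defaultdict(list)
    for full in fullnames:
        if not full or "." not in full:
            raise ValueError(f"Bad schema fullname: {full!r} (expected 'catalog.schema')")
        cat, sch = full.split(".", 1)  # split at the first dot only
        by_cat[cat].append(sch)
    return {c: sorted(set(schs)) for c, schs in by_cat.items()}


result = group_fullnames_by_catalog(
    ["cat_a.sales", "cat_b.hr", "cat_a.finance", "cat_a.sales"]
)
print(result)

# A name without a dot is rejected rather than silently skipped
try:
    group_fullnames_by_catalog(["no_dot_here"])
    raised = False
except ValueError as exc:
    raised = True
    print(f"Rejected: {exc}")
```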
###### b. _get_fabric_token(audience)
- Purpose: Obtains an access token for the Microsoft Fabric APIs.
- How it works: Uses `notebookutils.credentials.getToken` to get a token for the specified audience.
###### c. create_uc_mirroring(fabric_config, grouped_by_domain, auto_sync=True, create_if_missing=True)
- Purpose: For each group of schemas (by domain/subdomain), creates a mirrored Azure Databricks catalog in Microsoft Fabric via the API.
- How it works:
  - Reads the Fabric workspace and connection IDs from `fabric_config`.
  - Gets an access token.
  - For each domain/subdomain group, groups the schemas by catalog.
  - For each catalog, builds a payload describing the schemas to mirror.
  - Encodes the payload in base64 and wraps it in a higher-level item definition.
  - Sends a POST request to the Fabric API to create the mirrored catalog.
  - Collects and returns the results of successful creations.
- Note: `auto_sync` and `create_if_missing` are accepted but currently unused; the definition hard-codes "autoSync": "Disabled".
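
The encode-and-wrap step is easy to get wrong, so here is a minimal offline sketch of it. `build_create_request` is a hypothetical helper name, the connection id is a placeholder, and the `$schema` field of the real definition is omitted for brevity. Decoding the payload again is a cheap way to inspect what would be sent.

```python
import base64
import json


def build_create_request(display_name, catalog_name, connection_id, schema_names):
    """Build the request body for creating one mirrored catalog item (no network call)."""
    definition = {
        "catalogName": catalog_name,
        "databricksWorkspaceConnectionId": connection_id,
        "mirroringMode": "Partial",
        "autoSync": "Disabled",
        "mirrorConfiguration": {
            "schemas": [{"name": s, "mirroringMode": "Full"} for s in schema_names]
        },
    }
    # The definition travels as base64-encoded JSON inside a "part"
    encoded = base64.b64encode(json.dumps(definition).encode("utf-8")).decode("utf-8")
    return {
        "displayName": display_name,
        "description": "Auto-created",
        "definition": {
            "parts": [
                {"path": "definition.json", "payload": encoded, "payloadType": "InlineBase64"}
            ]
        },
    }


body = build_create_request(
    display_name="health",
    catalog_name="databricks279wus3",
    connection_id="00000000-0000-0000-0000-000000000000",  # placeholder, not a real connection
    schema_names=["healthv"],
)

# Round trip: decode the part to verify the definition before sending anything
decoded = json.loads(base64.b64decode(body["definition"]["parts"][0]["payload"]))
print(json.dumps(decoded, indent=2))
```

Building the body in a pure function like this keeps the payload testable without a Fabric workspace; the POST itself stays in `create_uc_mirroring`.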


### Class and Functions

In [None]:
class Utils:
    FABRIC_API_ENDPOINT = "api.fabric.microsoft.com"
    ONELAKE_API_ENDPOINT = "onelake.dfs.fabric.microsoft.com"

    # ---------------------------
    # DBX utils
    # ---------------------------

    @staticmethod
    def get_dbx_uc_schemas(databricks_config):
        all_schemas = []

        dbx_workspace = databricks_config["dbx_workspace"].rstrip("/")
        dbx_token = databricks_config["dbx_token"]
        dbx_uc_catalog = databricks_config["dbx_uc_catalog"]
        dbx_uc_schemas = databricks_config["dbx_uc_schemas"]

        url = f"{dbx_workspace}/api/2.1/unity-catalog/schemas?catalog_name={dbx_uc_catalog}"
        headers = {
            "Authorization": f"Bearer {dbx_token}",
            "Content-Type": "application/json",
        }
        response = requests.get(url, headers=headers)

        if response.status_code != 200:
            print(f"! Oops [{response.status_code}] Cannot connect to Unity Catalog. Please review configs.")
            return None

        schemas = response.json().get("schemas", [])
        schemas_to_process = [s for s in schemas if s["name"] in dbx_uc_schemas]

        for schema_info in schemas_to_process:
            schema_name = schema_info["name"]
            full_schema_name = f"{dbx_uc_catalog}.{schema_name}"

            tags_url = f"{dbx_workspace}/api/2.1/unity-catalog/entity-tag-assignments/schemas/{full_schema_name}/tags"
            tag_response = requests.get(tags_url, headers=headers)

            if tag_response.status_code == 200:
                schema_info["tags"] = tag_response.json().get("tag_assignments", [])
            else:
                schema_info["tags"] = []
                print(f"! Failed to fetch tags for {full_schema_name}: {tag_response.status_code}")

            all_schemas.append(schema_info)

        return all_schemas

    @staticmethod
    def _normalize_tag_map(tag_list):
        tag_map = {}
        for t in tag_list or []:
            k = t.get("key") or t.get("tag_key") or t.get("name")
            v = t.get("value") or t.get("tag_value")
            if k is not None:
                tag_map[str(k)] = v
        return tag_map

    @staticmethod
    def group_schemas_by_domain(schemas_with_tags):
        grouped = defaultdict(lambda: defaultdict(list))

        for s in schemas_with_tags:
            catalog = s.get("catalog_name", "")
            schema_name = s.get("name", "")
            full_name = f"{catalog}.{schema_name}" if catalog and schema_name else s.get("entity_name", schema_name)

            tag_map = Utils._normalize_tag_map(s.get("tags", []))
            domain = tag_map.get("domain", "undefined_domain")
            # An empty default keeps the item name equal to the domain when there is no subdomain tag
            subdomain = tag_map.get("subdomain", "")

            grouped[domain][subdomain].append(full_name)

        return {d: dict(sub) for d, sub in grouped.items()}

    # ---------------------------
    # Fabric utils
    # ---------------------------

    @staticmethod
    def _group_fullnames_by_catalog(fullnames):
        by_cat = defaultdict(list)
        for full in fullnames:
            if not full or "." not in full:
                raise ValueError(f"Bad schema fullname: {full!r} (expected 'catalog.schema')")
            cat, sch = full.split(".", 1)
            by_cat[cat].append(sch)
        return {c: sorted(set(schs)) for c, schs in by_cat.items()}

    @staticmethod
    def _get_fabric_token(audience="https://analysis.windows.net/powerbi/api"):
        from notebookutils import credentials
        return credentials.getToken(audience)

    @staticmethod
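    def create_uc_mirroring(fabric_config, grouped_by_domain, auto_sync=True, create_if_missing=True):
        # NOTE: auto_sync and create_if_missing are currently unused; autoSync is hard-coded to "Disabled" in the payload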
        workspace_id = fabric_config["workspace_id"]
        adls_connection_id = fabric_config["adls_connection_id"]  # kept (even if unused)
        dbx_workspace_connection_id = fabric_config["dbx_workspace_connection_id"]

        access_token = fabric_config.get("access_token") or Utils._get_fabric_token()
        base_uri = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/mirroredAzureDatabricksCatalogs"

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        results = []
        for domain, sub_map in (grouped_by_domain or {}).items():
            for subdomain, fullnames in (sub_map or {}).items():
                by_catalog = Utils._group_fullnames_by_catalog(fullnames)

                for catalog_name, schema_names in by_catalog.items():
                    #display_name = f"{domain}_{subdomain}"
                    display_name = f"{domain}{subdomain}"

                    payload = {
                        "$schema": "https://developer.microsoft.com/json-schemas/fabric/item/mirroredAzureDatabricksCatalog/definition/mirroredAzureDatabricksCatalogDefinition/1.0.0/schema.json",
                        "catalogName": catalog_name,
                        "databricksWorkspaceConnectionId": dbx_workspace_connection_id,
                        "mirroringMode": "Partial",
                        "autoSync": "Disabled",
                        "mirrorConfiguration": {
                            "schemas": [{"name": s, "mirroringMode": "Full"} for s in schema_names]
                        },
                    }

                    json_payload = json.dumps(payload)
                    encoded_content = base64.b64encode(json_payload.encode("utf-8")).decode("utf-8")

                    payload_dict = {
                        "displayName": display_name,
                        "description": "Auto-created",
                        "definition": {
                            "parts": [
                                {
                                    "path": "definition.json",
                                    "payload": encoded_content,
                                    "payloadType": "InlineBase64",
                                }
                            ]
                        },
                    }

                    json_payload = json.dumps(payload_dict)

                    resp = requests.post(base_uri, headers=headers, data=json_payload)

                    # Parse the body defensively: a 202 (accepted) response may carry no JSON body
                    try:
                        response_json = resp.json() or {}
                    except ValueError:
                        response_json = {}

                    if resp.status_code in (200, 201, 202):
                        results.append(response_json)
                        print(f"∟ Item created for {display_name}. Catalog: {catalog_name} | Schemas: {schema_names}")
                    else:
                        error = response_json.get('errorCode')
                        message = response_json.get('message')
                        print(f"∟ Something went wrong in creating the mirrored items for item {display_name}\n  Catalog: {catalog_name} | Schemas: {schema_names}, response: {resp.status_code} with errorCode: {error} and message: {message} \n  -- check prereqs -- ")

        return results

#### Function to check UC and create the Databricks mirrored items

In [None]:
def sync_dbx_uc_schemas_to_fabric(databricks_config, fabric_config):
    print("Checking Unity Catalog schemas and tags...")
    schemas = Utils.get_dbx_uc_schemas(databricks_config)
    if not schemas:
        # None means the UC call failed; an empty list means none of the requested schemas exist
        print("∟ No schemas to mirror. Nothing to do.")
        return
    schemas_by_domain_subdomain = Utils.group_schemas_by_domain(schemas)
    print(f"∟ Tags/schemas found: {schemas_by_domain_subdomain}")
    print("Creating Unity Catalog mirroring items...")
    Utils.create_uc_mirroring(fabric_config, schemas_by_domain_subdomain)

#### Run the check and create the Databricks mirrored items from the config

In [None]:
databricks_config = {
    'dbx_workspace': dbx_workspace,
    'dbx_token': dbx_token,
    'dbx_uc_catalog': dbx_uc_catalog,
    'dbx_uc_schemas': json.loads(dbx_uc_schemas)
}

fabric_config = {
    'workspace_id': fab_workspace_id,
    'adls_connection_id': fab_adls_connection_id,
    "dbx_workspace_connection_id": fab_dbx_workspace_connection_id
}

sync_dbx_uc_schemas_to_fabric(databricks_config, fabric_config)