# **Bi-directional integration between Databricks Unity Catalog and Microsoft Purview**
### Discoverability and interoperability samples notebook

![image](https://cdn-images-1.medium.com/max/1000/1*3Kw_fP46Wu64rwOkCWVMcA.png)

__Changes:__

* [November 05, 2024] Initial release

__Requirements:__

* DBR 13.3+
* Python 3.7+

__Resources:__

* [PyApacheAtlas](https://github.com/wjohnson/pyapacheatlas) | [Documentation](https://wjohnson.github.io/pyapacheatlas-docs/latest/)
* [Azure Purview Data Map SDK for Python](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-purview-catalog/1.0.0b4/index.html)

---

_Author:_ Dave Geyer | <dave.geyer@databricks.com>     
_Last Modified:_ November 05, 2024

***

This notebook accompanies the following blog post on Medium: [Bi-directional integration between Databricks Unity Catalog and Microsoft Purview](https://medium.com/p/79452911f2f5)

### 0. Introduction
Azure Databricks Unity Catalog and Microsoft Purview together offer a comprehensive framework for data cataloging and governance. Through programmatic interaction with these platforms, we can go beyond basic discoverability to perform tasks like bulk loading and creating custom lineage.

This notebook provides a guide on how to use Microsoft Purview with Azure Databricks Unity Catalog from within an Azure Databricks Notebook. This guide covers various aspects such as:
* Initial setup and authentication for Azure Databricks and Microsoft Purview.
* Utilizing PyApacheAtlas and the Microsoft Purview Data Map SDK for data asset exploration and searching.
* Synchronizing data classifications between Microsoft Purview and Azure Databricks Unity Catalog.
* Enhancing Microsoft Purview with AI-generated table and column descriptions from Azure Databricks Unity Catalog.


From the [Microsoft Purview documentation](https://medium.com/r/?url=https%3A%2F%2Flearn.microsoft.com%2Fen-us%2Fpurview%2Ftutorial-custom-types), an asset is a metadata element that describes a digital or physical resource. Microsoft Purview's flexible type system, based on [Apache Atlas](https://medium.com/r/?url=https%3A%2F%2Fatlas.apache.org%2F2.0.0%2FTypeSystem.html), allows for an expansive definition of assets, including databases, files, business policies, and more. By recognizing the properties and inheritance of these types, users can customize and extend these definitions to suit emerging business needs.

All metadata objects (assets) managed by Microsoft Purview are modeled using type definitions. Understanding the Type System is fundamental to creating new custom types in Microsoft Purview.

Essentially, a Type can be seen as a Class from Object Oriented Programming (OOP):
* It defines the properties that represent that type.
* Each type is uniquely identified by its name.
* A type can inherit from a superType. This is equivalent to inheritance in OOP: a type that extends a superType inherits the superType's attributes.

Apache Atlas has a few pre-defined system types that are commonly used as superTypes. For example:
* **Referenceable**: This type represents all entities that can be searched for using a unique attribute called qualifiedName.
* **Asset**: This type extends from Referenceable and has other attributes such as name, description, and owner.
* **DataSet**: This type extends Referenceable and Asset. Conceptually, it can be used to represent a type that stores data. Types that extend DataSet can be expected to have a Schema. For example, a SQL table.
* **Lineage**: Lineage information helps one understand the origin of data and the transformations it may have gone through before arriving in a file or table. Lineage is calculated through DataSet and Process: DataSets (input of process) impact some other DataSets (output of process) through Process.
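To make the inheritance idea concrete, here is a minimal pure-Python sketch that resolves the attributes a type ends up with once its superTypes are taken into account. The `TYPEDEFS` dictionary, the `my_sql_table` type, and the `resolve_attributes` helper are hypothetical and heavily simplified; they are not the Purview or Apache Atlas API.

```python
# Hypothetical, simplified type definitions for illustration only.
# Real Purview type definitions carry many more fields.
TYPEDEFS = {
    "Referenceable": {"superTypes": [], "attributes": ["qualifiedName"]},
    "Asset": {"superTypes": ["Referenceable"], "attributes": ["name", "description", "owner"]},
    "DataSet": {"superTypes": ["Asset"], "attributes": []},
    "my_sql_table": {"superTypes": ["DataSet"], "attributes": ["columns"]},
}


def resolve_attributes(type_name, typedefs):
    """Return inherited attributes first, then the type's own attributes."""
    type_def = typedefs[type_name]
    resolved = []
    # Walk up the superType chain so parent attributes come first
    for parent in type_def["superTypes"]:
        for attr in resolve_attributes(parent, typedefs):
            if attr not in resolved:
                resolved.append(attr)
    for attr in type_def["attributes"]:
        if attr not in resolved:
            resolved.append(attr)
    return resolved


table_attributes = resolve_attributes("my_sql_table", TYPEDEFS)
print(table_attributes)
# ['qualifiedName', 'name', 'description', 'owner', 'columns']
```

The custom type only declares `columns`, yet it also carries `qualifiedName`, `name`, `description`, and `owner` through its superTypes.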

![image](https://cdn-images-1.medium.com/max/1000/0*UHC5GX4ZRUQ5o5jT.png)

In this notebook, we will focus on the DataSet superType and will not cover asset relationships or how to define custom entity types, such as ML models or Databricks Volumes. I plan to cover that in a future post.

### 1. Getting started
To start, we must initialize the necessary libraries and authenticate with Azure to ensure that our environment is ready for the following steps. All code is run within an Azure Databricks Notebook.

Please note, you must use DBR 13.3+, and Python 3.7+.

#### 1a. Initialize libraries
We will need the following libraries:
1. [_azure-identity_](https://pypi.org/project/azure-identity/) - The Azure Identity library provides Microsoft Entra ID (formerly Azure Active Directory) token authentication support across the Azure SDK.
2. [_pyapacheatlas_](https://pypi.org/project/pyapacheatlas/) - PyApacheAtlas lets you work with the Azure Purview and Apache Atlas APIs in a Pythonic way. It supports bulk loading, custom lineage, custom type definitions, and more, from an SDK and Excel templates / integration.
3. [_azure-purview-account_](https://pypi.org/project/azure-purview-account/) - Azure Purview Account client library for Python
4. [_azure-purview-datamap_](https://pypi.org/project/azure-purview-datamap/) - Microsoft Purview Data Map provides the foundation for data discovery and data governance. Microsoft Purview Data Map is a cloud native PaaS service that captures metadata about enterprise data present in analytics and operation systems on-premises and cloud. 

In [0]:
%pip install azure-identity pyapacheatlas azure-purview-account azure-purview-datamap

#### 1b. Authentication

Next, we will define the credentials used to authenticate into Azure. PyApacheAtlas and the Purview Data Map SDK have different authentication functions, resulting in the creation of two separate credentials.

In [0]:
from pyapacheatlas.auth import ServicePrincipalAuthentication
from azure.identity import ClientSecretCredential

def authenticate(tenant_id, client_id, client_secret):
    atlas_credential = ServicePrincipalAuthentication(tenant_id, client_id, client_secret)
    datamap_credential = ClientSecretCredential(tenant_id, client_id, client_secret)
    return atlas_credential, datamap_credential

credentials = authenticate(
    dbutils.secrets.get("scope_name", "tenant_id"),
    dbutils.secrets.get("scope_name", "client_id"),
    dbutils.secrets.get("scope_name", "client_secret")
)

Learn more about secret management in the Azure Databricks documentation: [Secret management](https://medium.com/r/?url=https%3A%2F%2Flearn.microsoft.com%2Fen-us%2Fazure%2Fdatabricks%2Fsecurity%2Fsecrets%2F).

#### 1c. Create clients

Next, we will create three separate clients to build our requests and send them to Purview using the `send_request` method:
1. [Purview Client](https://wjohnson.github.io/pyapacheatlas-docs/latest/core/client/purviewclient.html) - PyApacheAtlas - Explore type definitions
2. [Purview Discovery Client](https://wjohnson.github.io/pyapacheatlas-docs/latest/core/api/pyapacheatlas.core.discovery.purview.PurviewDiscoveryClient.html#pyapacheatlas.core.discovery.purview.PurviewDiscoveryClient) - PyApacheAtlas - Search and browse assets
3. [Purview Datamap Client](https://learn.microsoft.com/en-us/rest/api/purview/datamapdataplane/operation-groups?view=rest-purview-datamapdataplane-2023-09-01) - Azure Purview Datamap SDK - Search and browse assets

In [0]:
from pyapacheatlas.core import PurviewClient
from pyapacheatlas.core.discovery import PurviewDiscoveryClient
from azure.purview.datamap import DataMapClient

account_name = "purviewUC"

atlas_client = PurviewClient(account_name, authentication=credentials[0])
discovery_client = PurviewDiscoveryClient(f"https://{account_name}.purview.azure.com/catalog/api", authentication=credentials[0])
datamap_client = DataMapClient(f"https://{account_name}.purview.azure.com", credential=credentials[1])

### 2. Exploring and searching data assets

#### 2a. PyApacheAtlas type definition retrieval
Let's start by reviewing the definitions for the different entity types in Microsoft Purview. This understanding is important for efficiently using the APIs available to us.

First, we will get all type definitions from Purview by calling the PyApacheAtlas client's `get_all_typedefs()` method and flatten the results into a single-level dictionary.

In [0]:
def get_type_definitions(client):
    # Use the client that was passed in rather than the global atlas_client
    all_types = client.get_all_typedefs()
    # Flatten each entity type definition into a single-level dictionary
    return [flatten_dict(type_def) for type_def in all_types.get("entityDefs", [])]

def flatten_dict(d, parent_key='', sep='.'):
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

get_type_definitions(atlas_client)
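
To see what the flattening step does without calling Purview, here is a small self-contained sketch that runs the same `flatten_dict` logic on a hand-written dictionary. The `sample_type_def` values are made up for illustration and are not a real Purview response.

```python
def flatten_dict(d, parent_key="", sep="."):
    """Collapse nested dictionaries into one level, joining keys with `sep`."""
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


# Hypothetical, trimmed-down type definition
sample_type_def = {
    "name": "databricks_table",
    "category": "ENTITY",
    "superTypes": ["DataSet"],
    "options": {"schemaElementsAttribute": "columns"},
}

flat = flatten_dict(sample_type_def)
print(flat)
# {'name': 'databricks_table', 'category': 'ENTITY', 'superTypes': ['DataSet'],
#  'options.schemaElementsAttribute': 'columns'}
```

Nested dictionaries such as `options` become dotted keys, while lists such as `superTypes` are left untouched by this version of the function.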

The schema for entity types is shown below:

In [None]:
root
 |-- attributeDefs: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- businessAttributeDefs.PurviewDataQuality: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- category: string (nullable = true)
 |-- createTime: long (nullable = true)
 |-- createdBy: string (nullable = true)
 |-- description: string (nullable = true)
 |-- guid: string (nullable = true)
 |-- lastModifiedTS: string (nullable = true)
 |-- name: string (nullable = true)
 |-- options.purviewEntityExtDef: string (nullable = true)
 |-- options.schemaAttributes: string (nullable = true)
 |-- relationshipAttributeDefs: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- serviceType: string (nullable = true)
 |-- subTypes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- superTypes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- typeVersion: string (nullable = true)
 |-- updateTime: long (nullable = true)
 |-- updatedBy: string (nullable = true)
 |-- version: long (nullable = true)
 |-- options.defaultRenderedLineage: string (nullable = true)
 |-- options.schemaElementsAttribute: string (nullable = true)
 |-- options.dataTypeAttribute: string (nullable = true)
 |-- options.derivedLineageSources: string (nullable = true)
 |-- options.searchNameAttribute: string (nullable = true)
 |-- options.schemaElementsAttributesList: string (nullable = true)
 |-- options.displayTextAttribute: string (nullable = true)
 |-- businessAttributeDefs.test: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

Using the type definition schema above, let's look at some of the entity properties:

* **category** describes which category the type belongs to. The categories supported by Apache Atlas are listed in the Apache Atlas documentation.
* **serviceType** is useful when browsing assets by source type in Microsoft Purview. The service type is the entry point for finding all assets that belong to the same service type, as defined in their type definition.
* **superTypes** lists the "parent" types that the type inherits from.
* **options.schemaElementsAttribute** influences what appears in the Schema tab of your asset in Microsoft Purview.
* **relationshipAttributeDefs** are calculated through the relationship type definitions. In our JSON, we can see that `schemaElementsAttribute` points to the relationship attribute called `columns`, which is one of the elements of the `relationshipAttributeDefs` array, as shown below:

In [None]:
# spark_df holds the flattened entity type definitions (it is built in section 2b)
display(spark_df.select("category","serviceType","superTypes","`options.schemaElementsAttribute`","relationshipAttributeDefs"))

![image.png](https://cdn-images-1.medium.com/max/1000/0*Gu43zWP5WfcNc9QT.png)
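
The link between `schemaElementsAttribute` and `relationshipAttributeDefs` can be sketched in plain Python. The type definition below is a hypothetical, trimmed-down example (the `typeName` values in particular are assumptions), and `schema_relationship` is an illustrative helper rather than part of either SDK.

```python
# Hypothetical, unflattened type definition with only the fields we need
sample_type_def = {
    "name": "databricks_table",
    "options": {"schemaElementsAttribute": "columns"},
    "relationshipAttributeDefs": [
        {"name": "schema", "typeName": "databricks_schema"},
        {"name": "columns", "typeName": "array<databricks_table_column>"},
    ],
}


def schema_relationship(type_def):
    """Find the relationship attribute that options.schemaElementsAttribute names."""
    target = type_def.get("options", {}).get("schemaElementsAttribute")
    for relationship in type_def.get("relationshipAttributeDefs", []):
        if relationship.get("name") == target:
            return relationship
    # No schema attribute declared, or no matching relationship
    return None


schema_rel = schema_relationship(sample_type_def)
print(schema_rel)
# {'name': 'columns', 'typeName': 'array<databricks_table_column>'}
```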

#### 2b. Retrieve Type Definitions with Purview Data Map SDK
Here's how to replicate the above process using the Microsoft Purview Data Map SDK:

In [0]:
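import pandas as pd

all_type_defs = datamap_client.type_definition.get().as_dict()
# Entity type definitions live under "entityDefs" ("enumDefs" holds enum types)
entity_defs = all_type_defs.get("entityDefs", [])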
flattened_entity_defs = []

for entity_def in entity_defs:
    flattened_def = flatten_dict(entity_def)
    flattened_entity_defs.append(flattened_def)

table_df = pd.DataFrame(flattened_entity_defs)
spark_df = spark.createDataFrame(flattened_entity_defs)

Although you can retrieve entity type definitions using both PyApacheAtlas and the Purview Data Map SDK, the Purview Data Map SDK is simpler to use, and better suited for building custom integrations with Purview.

### 3. PyApacheAtlas Discovery
Now that we have a better understanding of Purview's type system, we will use the PyApacheAtlas Search API to search existing assets.

#### 3a. Search for data assets with PyApacheAtlas
Let's define a search term, in this case "stats", to find any entity that contains the term. We'll flatten the response and format the output into a single, user-friendly DataFrame.

In [0]:
def process_search_results(search_results):
    entity_defs = list(search_results)
    
    flattened_search_results = []

    for entity_def in entity_defs:
        flattened_def = flatten_dict(entity_def)
        flattened_search_results.append(flattened_def)
    
    spark_df = spark.createDataFrame(flattened_search_results)
    
    return spark_df

In [0]:
search_term = "stats"
search_results = discovery_client.search_entities(query=search_term)
processed_results = process_search_results(search_results)
display(processed_results)

We get the following output, in which we see the entity's description, name, and entity type, among other things:

![image.png](https://cdn-images-1.medium.com/max/1000/0*sBnWAGvAkxJXUFPy.png)

#### 3b. Browse Data Assets with PyApacheAtlas
Similar to the search functionality, we can also browse entities across different facets within Microsoft Purview. In this case, let's look at all entities whose entity type is `databricks_view`.

In [0]:
import pandas as pd

def process_browse_results(browse_results):
    # Flatten each browsed entity and collect the results in a pandas DataFrame
    flattened_data = [flatten_dict(entity) for entity in browse_results]
    return pd.DataFrame(flattened_data)

In [None]:
entityType = 'databricks_view'
browse_results = discovery_client.browse(entityType=entityType, api_version='2022-03-01-preview')
browse_results_filtered = browse_results["value"]
processed_results = process_browse_results(browse_results_filtered)
display(processed_results)

We get the following output, showing all of the different registered views:

![image.png](https://cdn-images-1.medium.com/max/1000/0*IGMNl5wl_UJVnQel.png)

### 4. Discovery with Purview Data Map Client SDK
Using the Purview Data Map Client SDK, we can use a single "query" method to search for/browse our assets (instead of the two separate queries that we used with PyApacheAtlas). This time, let's look for entities with the keyword "orders," although you can define a search body that combines multiple search parameters into a single query.

In [0]:
def flatten_dict(d, parent_key='', sep='.'):
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        elif isinstance(v, (list, tuple)):
            for i, item in enumerate(v):
                new_item_key = f"{new_key}{sep}{i}" if new_key else str(i)
                items.extend(flatten_dict({i: item}, new_item_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
  

In [0]:
body = {
  "keywords": "orders",
  "limit": 1000
}

In [0]:
search_results = datamap_client.discovery.query(body)
search_results_values = list(search_results.value)

flattened_search_results = [flatten_dict(entity) for entity in search_results_values]
flattened_df = spark.createDataFrame(flattened_search_results)
display(flattened_df)

We get the following output:

![image](https://cdn-images-1.medium.com/max/1000/1*XchqUceMtDt64UMiTLO1RA.png)

We can see from the results that we have multiple tables with the same name - from a discoverability standpoint, how can we differentiate whether this is a function of poor data quality or due to different teams needing similar table names? We have a couple of approaches.

First, the results have unique identifiers that we can use to distinguish between entities. Additionally, not shown above, the entities belong to different collections and are represented by a unique identifier.
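
As a rough illustration of that first approach, the sketch below groups search hits by name and reports the names that occur more than once, together with each hit's identifier and collection. The `hits` list is made-up sample data, and the `id`, `name`, and `collectionId` keys are assumed to mirror the fields of a search result; adjust them to whatever your response actually contains.

```python
from collections import defaultdict

# Made-up sample hits; real results contain many more fields
hits = [
    {"id": "guid-1", "name": "orders", "collectionId": "sales01"},
    {"id": "guid-2", "name": "orders", "collectionId": "mktg02"},
    {"id": "guid-3", "name": "order_items", "collectionId": "sales01"},
]


def duplicate_names(hits):
    """Map each repeated name to the (id, collectionId) pairs that share it."""
    by_name = defaultdict(list)
    for hit in hits:
        by_name[hit["name"]].append((hit["id"], hit.get("collectionId")))
    return {name: entries for name, entries in by_name.items() if len(entries) > 1}


duplicates = duplicate_names(hits)
print(duplicates)
# {'orders': [('guid-1', 'sales01'), ('guid-2', 'mktg02')]}
```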

We can modify our query to search for entities that match "orders" and also belong to a specific `collectionId`. The `collectionId` can be extracted from the URL by navigating to the collection in your browser. The URL has the format: `https://purview.microsoft.com/datamap/governance/main/datasource/domains?tid=<domainID>&collection=<collectionId>`

In [0]:
body = {
  "keywords": "orders",
  "limit": 1000,
  "filter": {
    "collectionId": "nqddbh"
  }
}

Much better! Let's take a look at the output:

![image](https://cdn-images-1.medium.com/max/1000/1*ZkFl9CDrZJY2qJnLohpHaw.png)
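
If you build several of these search bodies, a small helper keeps them consistent. The sketch below is one possible shape: `build_search_body` is a hypothetical helper, and the `entityType` filter key and the `and` combinator are assumptions about the search filter syntax that you should verify against the Purview Discovery query reference before relying on them.

```python
def build_search_body(keywords, collection_id=None, entity_type=None, limit=1000):
    """Assemble a search body, adding a filter only when conditions are given."""
    conditions = []
    if collection_id:
        conditions.append({"collectionId": collection_id})
    if entity_type:
        # Assumed filter key; check the Discovery query documentation
        conditions.append({"entityType": entity_type})

    body = {"keywords": keywords, "limit": limit}
    if len(conditions) == 1:
        body["filter"] = conditions[0]
    elif conditions:
        # Assumed combinator for requiring every condition to match
        body["filter"] = {"and": conditions}
    return body


body = build_search_body("orders", collection_id="nqddbh")
print(body)
# {'keywords': 'orders', 'limit': 1000, 'filter': {'collectionId': 'nqddbh'}}
```

With only a collection ID, the helper reproduces the body used above; the resulting dictionary can then be passed to `datamap_client.discovery.query(body)`.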

### 5. Synchronizing Data Classifications and Tags

#### 5a. Pull Purview Data Classifications
Improving discoverability in Purview by adding additional business attributes, such as classifications for different entities, can significantly enhance metadata quality in Unity Catalog. This enhancement not only makes data easier to find but also improves the semantic engine within Databricks. These advancements result in increased productivity for your team and strengthen your governance practices.

Let's begin by examining the table of interest in its current state within the Unity Catalog Explorer UI. We can see that the table currently has no associated tags.

![image](https://cdn-images-1.medium.com/max/1000/0*f1GrbujaCdxsVA-T.png)

Let's start by extracting all entities that have a classification.

In [0]:
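# Keep only the search results that carry at least one classification
update_classification = flattened_df.where("`classification.0.0` IS NOT NULL")
# Fetch a single entity to inspect its attributes; replace 'guid' with a real entity GUID
result = datamap_client.entity.get(guid='guid').entity
flattened_result = flatten_dict(result)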

Next, let's define a User-Defined Function (UDF) that will allow us to extract the entity attributes from Purview, and apply this to many entities at once.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, MapType
from pyspark.sql.functions import udf

@udf(returnType=MapType(StringType(), StringType()))
def fetch_and_flatten(guid):
    result = datamap_client.entity.get(guid=guid).entity
    flattened_result = flatten_dict(result)
    return flattened_result

flattened_df = update_classification.withColumn("flattened_data", fetch_and_flatten(update_classification.id))

merged_keys = flattened_df.select("flattened_data").first()[0].keys()

schema = StructType([
StructField(key, StringType(), True) for key in merged_keys
])

final_df = flattened_df.rdd.map(lambda x: tuple(x.flattened_data.values())).toDF(schema)

combined_df = update_classification.join(final_df, update_classification.id == final_df.guid)

display(combined_df)
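
Note that the cell above derives the schema from the first row's keys, which assumes every entity flattens to the same keys in the same order. If your entities differ, one defensive option is to align the dictionaries to a shared column list first. The following is a minimal pure-Python sketch of that idea; `align_records` and the sample records are hypothetical.

```python
def align_records(records):
    """Return (columns, rows) where every row follows the same column order."""
    columns = []
    for record in records:
        for key in record:
            if key not in columns:
                columns.append(key)
    # Missing keys become None so each row has the same width
    rows = [tuple(record.get(column) for column in columns) for record in records]
    return columns, rows


# Made-up flattened entities with different keys and key order
records = [
    {"guid": "guid-1", "attributes.name": "orders", "attributes.owner": "sales"},
    {"attributes.name": "customers", "guid": "guid-2"},
]

columns, rows = align_records(records)
print(columns)
# ['guid', 'attributes.name', 'attributes.owner']
print(rows)
# [('guid-1', 'orders', 'sales'), ('guid-2', 'customers', None)]
```

The aligned rows and column names could then be handed to `spark.createDataFrame` with an explicit schema.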

We also add a column that concatenates the table catalog, schema, and table name in order to define the fully qualified name, which will help us apply multiple tags at once.

In [None]:
from pyspark.sql.functions import col, concat, lit

filtered_df = combined_df.select("`attributes.catalogName`", "`attributes.schemaName`", "name", "`classification.0.0`", "`attributes.owner`", "`attributes.tableType`")

# Concatenate attributes.catalogName, attributes.schemaName, and name with "." character
concatenated_col = concat(col("`attributes.catalogName`"), lit("."), col("`attributes.schemaName`"), lit("."), col("name"))
filtered_df_with_concat = filtered_df.withColumn("fq_name", concatenated_col)

Iterate through and apply tags to tables or views within UC using the classification of the entity from Purview. This can also be extended to column-level comments.

In [0]:
for row in filtered_df_with_concat.collect():
    fq_name = row["fq_name"]
    classification_value = row["classification.0.0"].split(".")[-1]
    obj_type = row["attributes.tableType"]

    if obj_type == "MANAGED":
        spark.sql(f"ALTER TABLE {fq_name} SET TAGS ('Purview' = '{classification_value}')")
    else:
        print(f"Skipping {fq_name} because its table type is {obj_type}, not MANAGED")

After the command finishes, go back and review the table in the Catalog Explorer UI. We can see that the tags were automatically added to the table.

![image](https://cdn-images-1.medium.com/max/1000/0*y9zJLh1tUy60AunH.png)
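
Because the tagging statement is assembled from strings that come from Purview, it can help to build it in one place and check the inputs first. The sketch below is one way to do that, not the notebook's original method: `quote_identifier` and `build_set_tags_sql` are hypothetical helpers, and the sample classification name is only an example. It backtick-quotes each identifier part and refuses tag values that contain a single quote.

```python
def quote_identifier(name):
    """Backtick-quote an identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_set_tags_sql(catalog, schema, table, classification, tag_key="Purview"):
    """Build an ALTER TABLE ... SET TAGS statement from a Purview classification."""
    fq_name = ".".join(quote_identifier(part) for part in (catalog, schema, table))
    # Keep only the last segment of the dotted classification name
    tag_value = classification.split(".")[-1]
    if "'" in tag_value or "'" in tag_key:
        raise ValueError("Tag keys and values must not contain single quotes")
    return f"ALTER TABLE {fq_name} SET TAGS ('{tag_key}' = '{tag_value}')"


statement = build_set_tags_sql("main", "sales", "orders", "MICROSOFT.PERSONAL.EMAIL")
print(statement)
# ALTER TABLE `main`.`sales`.`orders` SET TAGS ('Purview' = 'EMAIL')
```

The returned string can be passed to `spark.sql(...)` in place of the inline f-string used in the loop above.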

---

### 6. Update Microsoft Purview with AI-Generated Comments from Unity Catalog

Adding comments and documentation to your enterprise data is a thankless task, but when your organization's tables are sparsely documented, both humans and AI struggle to find the right data to answer your data questions accurately. [AI-generated comments](https://medium.com/r/?url=https%3A%2F%2Fdocs.databricks.com%2Fen%2Fcatalog-explorer%2Fai-comments.html) address this by using generative AI to automate the manual process of adding descriptions to tables and columns.

We recently made significant improvements to the underlying algorithms supporting [AI-generated comments](https://medium.com/r/?url=https%3A%2F%2Fdocs.databricks.com%2Fen%2Fcatalog-explorer%2Fai-comments.html) in Unity Catalog. Learn more about this update: [How we improved DatabricksIQ LLM quality for AI-generated table comments](https://medium.com/r/?url=https%3A%2F%2Fwww.databricks.com%2Fblog%2Fhow-we-improved-databricksiq-llm-quality-ai-generated-table-comments).

Currently, the integration between Unity Catalog and Purview is a "pull" mechanism, in which Purview must scan Unity Catalog for any changes. However, Microsoft Purview does not currently support [incremental data scans](https://medium.com/r/?url=https%3A%2F%2Flearn.microsoft.com%2Fen-us%2Fpurview%2Fregister-scan-azure-databricks-unity-catalog) from Unity Catalog - only full scans.

We can take advantage of the Microsoft Purview APIs to "push" ad-hoc changes back to Purview without doing a full data scan.

#### 6a. Update table comments
The table in Purview currently has no description. Let's change this! We will use Unity Catalog's AI-generated table comment to update the Purview asset. Accept the recommendation in the Unity Catalog UI, and then update the entity in Purview to reflect the change. In the future, this should be something that can be done via API.

Unity Catalog             |  Purview
:-------------------------:|:-------------------------:
![](https://imgur.com/ekFI081.png)  |  ![](https://imgur.com/ePv4IxZ.png)

You should start by fetching the table comment from Unity Catalog and passing it to the Purview API.

In [0]:
from pyspark.sql.functions import col
table_def=spark.sql("DESCRIBE EXTENDED davegeyer.iot_platform.turbine_training_dataset_ml")
table_comment = table_def.filter(col("col_name") == "Comment").select("data_type").first()[0]
table_comment = str(table_comment)

In [0]:
from azure.core.exceptions import HttpResponseError

fqn = "databricks://<entity-guid>/catalogs/<catalog-name>/schemas/<schema-name>/tables/<table-name>"

entity_input = {
  "referredEntities": {},
  "entity": {
    "typeName": "databricks_table",
    "attributes": {
        'qualifiedName': f"{fqn}",
        'name': '<table-name>',
        "userDescription": table_comment,
    }
  }
}

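try:
    response = datamap_client.entity.create_or_update(body=entity_input)
except HttpResponseError as e:
    # Surface API errors (for example, a wrong qualifiedName or missing permissions)
    print(f"Failed to update the entity in Purview: {e}")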

Let's check Purview again. The asset description now matches the description in Unity Catalog!

![Table comment updated](https://imgur.com/V7AqGZ4.png)
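
The payload used above can be wrapped in small helpers so that the qualified name and the entity body are always built the same way. This is a sketch under assumptions: `table_qualified_name` and `build_description_payload` are hypothetical helpers, `source_id` stands for whatever identifier appears as the first path segment of your assets' qualified names, and the sample values are placeholders.

```python
def table_qualified_name(source_id, catalog, schema, table):
    """Build a qualified name in the databricks://.../tables/... format used above."""
    return f"databricks://{source_id}/catalogs/{catalog}/schemas/{schema}/tables/{table}"


def build_description_payload(type_name, qualified_name, name, description):
    """Build the request body for updating an entity's userDescription."""
    return {
        "referredEntities": {},
        "entity": {
            "typeName": type_name,
            "attributes": {
                "qualifiedName": qualified_name,
                "name": name,
                "userDescription": description,
            },
        },
    }


# Placeholder values for illustration
qn = table_qualified_name("my-source-id", "main", "sales", "orders")
payload = build_description_payload("databricks_table", qn, "orders", "Order header records.")
print(payload["entity"]["attributes"]["qualifiedName"])
# databricks://my-source-id/catalogs/main/schemas/sales/tables/orders
```

The resulting `payload` has the same shape as `entity_input` and can be passed as the `body` argument of `create_or_update`.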

---

#### 6b. Update column comments
We can take the same approach to update the column descriptions of a table within Purview. Let's use the AI-generated column comments in Unity Catalog, and push these updates to Purview.

Unity Catalog             |  Purview
:-------------------------:|:-------------------------:
![](https://imgur.com/9WA5njE.png)  |  ![](https://imgur.com/FOWSS8J.png)

This time, let's extract all the column comments using `system.information_schema`.

In [0]:
table_catalog = 'davegeyer'
table_schema = 'iot_platform'
table_name = 'turbine_training_dataset_ml'

columnComments = spark.sql(f"SELECT * FROM system.information_schema.columns WHERE table_catalog = '{table_catalog}' AND table_schema = '{table_schema}' AND table_name = '{table_name}'")

In [0]:
from pyspark.sql.functions import col, concat, lit

fqtable_name = "databricks://<entity-id>/catalogs/davegeyer/schemas/iot_platform/tables/turbine_training_dataset_ml"
columnComments = columnComments.withColumn("column_qn", concat(lit(fqtable_name + "/columns/"), col("column_name")))
columnCommentsDict = columnComments.select("column_qn", "comment", "column_name").toPandas().set_index("column_qn").to_dict(orient="index")

In [0]:
for column_qn, values in columnCommentsDict.items():
    body = {
        "referredEntities": {},
        "entity": {
            "typeName": "databricks_table_column",
            "attributes": {
                "qualifiedName": column_qn,
                "name": values["column_name"], 
                "userDescription": values["comment"],
            }
        }
    }
    response = datamap_client.entity.create_or_update(body=body)

Refresh Purview, and take another look at the column descriptions. Woohoo! It's updated.

![Column comments updated](https://imgur.com/dN6KT9a.png)
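
One detail worth handling is that some columns may not have a comment yet, in which case pushing an empty description would add nothing useful. The sketch below builds the column payloads in plain Python and skips columns without a comment. `build_column_payloads` is a hypothetical helper, and the table qualified name and column data are placeholders.

```python
def build_column_payloads(table_qn, columns):
    """Build one update payload per column that actually has a comment."""
    payloads = []
    for column_name, comment in columns:
        if not comment:
            # Skip columns whose comment is None or empty
            continue
        payloads.append({
            "referredEntities": {},
            "entity": {
                "typeName": "databricks_table_column",
                "attributes": {
                    "qualifiedName": f"{table_qn}/columns/{column_name}",
                    "name": column_name,
                    "userDescription": comment,
                },
            },
        })
    return payloads


# Placeholder table qualified name and (column_name, comment) pairs
table_qn = "databricks://my-source-id/catalogs/main/schemas/sales/tables/orders"
columns = [("order_id", "Unique order identifier."), ("notes", None), ("status", "")]

payloads = build_column_payloads(table_qn, columns)
print(len(payloads))
# 1
print(payloads[0]["entity"]["attributes"]["qualifiedName"])
# databricks://my-source-id/catalogs/main/schemas/sales/tables/orders/columns/order_id
```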

---

### 7. Conclusion
In this notebook, we saw how to search for data assets stored within Microsoft Purview, synchronize classifications with Azure Databricks Unity Catalog, and push incremental changes back to Purview to keep the metadata for your data assets up to date.

The integration of Purview and Unity Catalog is getting better (Lineage was added in October 2024), but there are still times when using these APIs is the way to go. For example:
* **Existing Purview Users:** If your organization already uses Purview and you want to add Databricks as a data source, you may want to use the APIs to incorporate it into your data stewardship and metadata curation process.
* **Syncing Tags to Unity Catalog:** For those with existing tags and/or business glossaries in Purview, using APIs to sync tags to Unity Catalog is essential. The benefits of tag integration include data discoverability, cost management and attribution, and governance.

Have any comments or suggestions? Please reach out and let me know.

Note: The views/opinions expressed in the blog are my own and do not necessarily represent the views/opinions of Databricks.

---