# Notebook to copy comments (descriptions) from a lake database to Purview
## To reuse this code for other entity types, replace the entityType definitions


In [1]:
import os
import requests
import json
import jmespath
import pandas as pd

from notebookutils import mssparkutils
from pprint import pprint

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

import random, string
import datetime as dtime
import re
import time
 
# Purview account and lake database this notebook works against.
purviewname = '<purviewname>'
lakedatabase = '<lakedbname>'
# Plain forward slashes: "\/" is not a Python escape sequence, so the
# backslashes would otherwise end up literally inside the URL.
prefix = "https://{}.purview.azure.com".format(purviewname)

# Service principal used to request tokens for Purview.
tenantid="xxxxx-xxx-xxxx-xxxxx-xxxxxxxxxx"
clientid="xxxxx-xxx-xxxx-xxxxx-xxxxxxxxxx"
# NOTE(review): TokenLibrary is not imported in this file; presumably it is
# provided by the Synapse notebook runtime - confirm.
secret=TokenLibrary.getSecret('<keyvaultname>', '<secretname>')

def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    """Request an Azure AD access token with the client-credentials grant.

    Args:
        tenant_id: Azure AD tenant the token is requested from.
        client_id: Application (client) id of the service principal.
        client_secret: Secret of the service principal.
        resource_url: Resource the token is requested for.

    Returns:
        The ``access_token`` value of the token endpoint response.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    # A dict body is form-encoded by requests, so secrets containing
    # '&', '+', '=' or '%' reach the endpoint unchanged.
    payload = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'resource': resource_url,
    }
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
        }
    response = requests.request("POST", url, headers=headers, data=payload)
    # Fail with the HTTP error instead of a KeyError on 'access_token'.
    response.raise_for_status()
    return response.json()['access_token']

In [2]:
def getguid(typeName: str, assetfqn: str, azuread_access_token: str):
    """Look up a Purview entity by type and qualified name and return its guid.

    Args:
        typeName: Entity type name used in the lookup URL.
        assetfqn: Qualified name of the entity.
        azuread_access_token: Bearer token sent in the Authorization header.

    Returns:
        The ``guid`` of the ``entity`` object in the response.

    Raises:
        requests.HTTPError: If the catalog answers with an error status.
    """
    prefix = f"https://{purviewname}.catalog.purview.azure.com/api/atlas/v2/entity/uniqueAttribute/type"
    url = f"{prefix}/{typeName}"
    # Let requests encode the query string: qualified names contain
    # characters such as '#', '&' or spaces that would otherwise truncate
    # or split the parameter.
    params = {
        'attr:qualifiedName': assetfqn,
        'ignoreRelationships': 'True',
    }
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    response = requests.request("GET", url, headers=headers, params=params)
    print(response.text)
    # Fail with the HTTP error instead of a KeyError on 'entity'.
    response.raise_for_status()
    return response.json()['entity']['guid']

def set_desc(assettype: str, assetfqn: str, description: str, azuread_access_token: str):
    """Set the ``description`` attribute of a Purview entity.

    The entity is resolved to a guid through ``getguid`` and then updated
    with a PUT on ``entity/guid/{guid}?name=description``.

    Args:
        assettype: Entity type name of the asset.
        assetfqn: Qualified name of the asset.
        description: New description. It is placed verbatim between double
            quotes to form the JSON body, so it must already be JSON-escaped
            (no raw double quotes, backslashes or newlines).
        azuread_access_token: Bearer token sent in the Authorization header.

    Returns:
        The parsed JSON body of the PUT response.
    """
    guid = getguid(assettype,assetfqn,azuread_access_token)
    url = f"https://{purviewname}.catalog.purview.azure.com/api/atlas/v2/entity/guid/{guid}?name=description"
    print(url)
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    # Encode explicitly: a str body is sent as ISO-8859-1 by the HTTP layer,
    # which raises on characters outside Latin-1 and mis-encodes accented
    # ones for a JSON endpoint.
    body = f'"{description}"'.encode('utf-8')
    response = requests.request("PUT", url, headers=headers, data=body)
    return json.loads(response.text)

# Token for the Purview resource, plus a sample table to try set_desc on.
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
assetfqn = "mssql://<workspase>-ondemand.sql.azuresynapse.net/<schema>/dbo/<table>"
typename = "azure_synapse_serverless_sql_table"
# Manual check, left disabled:
#result = set_desc(typename, assetfqn,"channel description stuff", token)
#result




In [3]:
def getcolumns(typeName: str, assetfqn: str, azuread_access_token: str):
    """List the entities referred to by a Purview entity as a DataFrame.

    The entity is fetched by type and qualified name (relationships
    included) and every entry of ``referredEntities`` becomes one row.
    For a table these are presumably its columns - TODO confirm.

    Args:
        typeName: Entity type name used in the lookup URL.
        assetfqn: Qualified name of the entity.
        azuread_access_token: Bearer token sent in the Authorization header.

    Returns:
        A pandas DataFrame with the columns ``typename``, ``name`` and
        ``guid``; it has zero rows when nothing is referred to.

    Raises:
        requests.HTTPError: If the catalog answers with an error status.
    """
    prefix = f"https://{purviewname}.catalog.purview.azure.com/api/atlas/v2/entity/uniqueAttribute/type"
    url = f"{prefix}/{typeName}"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    # Let requests encode the qualified name ('#', '&', spaces, ...).
    response = requests.request(
        "GET", url, headers=headers, params={'attr:qualifiedName': assetfqn}
    )
    # Fail with the HTTP error instead of a KeyError on 'referredEntities'.
    response.raise_for_status()
    referred = response.json()['referredEntities'].values()
    rows = [
        {
            'typename': entity['typeName'],
            'name': entity['attributes']['name'],
            'guid': entity['guid'],
        }
        for entity in referred
    ]
    # Explicit columns keep the frame's schema stable when there are no rows.
    return pd.DataFrame(rows, columns=['typename', 'name', 'guid'])
def getcolguid(myname,columns):
    """Return the guid of the first row of ``columns`` whose name is ``myname``.

    ``columns`` is a DataFrame with at least ``name`` and ``guid`` columns.
    An IndexError is raised when no row matches.
    """
    is_match = columns['name'] == myname
    matching_guids = columns.loc[is_match, 'guid'].tolist()
    return matching_guids[0]
def col_desc(columns, name: str, description: str, azuread_access_token: str):
    """Set the ``description`` attribute of one entity listed in ``columns``.

    Args:
        columns: DataFrame with ``name`` and ``guid`` columns, as returned
            by ``getcolumns``.
        name: Name of the entry whose description is updated.
        description: New description. It is placed verbatim between double
            quotes to form the JSON body, so it must already be JSON-escaped
            (no raw double quotes, backslashes or newlines).
        azuread_access_token: Bearer token sent in the Authorization header.

    Returns:
        The parsed JSON body of the PUT response.
    """
    guid = getcolguid(name,columns)
    url = f"https://{purviewname}.catalog.purview.azure.com/api/atlas/v2/entity/guid/{guid}?name=description"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    # Encode explicitly: a str body is sent as ISO-8859-1 by the HTTP layer,
    # which raises on characters outside Latin-1 and mis-encodes accented
    # ones for a JSON endpoint.
    body = f'"{description}"'.encode('utf-8')
    response = requests.request("PUT", url, headers=headers, data=body)
    return json.loads(response.text)
# Fetch and print the referred entities of a sample table.
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
assetfqn = "mssql://<workspase>-ondemand.sql.azuresynapse.net/<schema>/dbo/<table>"
typename = "azure_synapse_serverless_sql_table"
columns = getcolumns(typeName=typename, assetfqn=assetfqn, azuread_access_token=token)
print(columns)
# Manual check, left disabled:
# guid = getcolguid('CommunicationId',columns)
# guid

In [4]:
# Read all tables of the lake database from the Synapse workspace endpoint
# and push their table and column descriptions to Purview.
syn_workspace = mssparkutils.env.getWorkspaceName()
baseurl="https://"+syn_workspace+".dev.azuresynapse.net"
headers = {"Authorization": "Bearer "+mssparkutils.credentials.getToken("Synapse")}
constructed_url = baseurl + "/databases/"+lakedatabase+"/tables?api-version=2021-04-01"
response = requests.get(constructed_url, headers=headers)
# Fail with the HTTP error instead of a KeyError on 'items'.
response.raise_for_status()
response = json.loads(response.text)
#print(response)

# Loop invariants: one Purview token and the table type serve every table.
typename = "azure_synapse_serverless_sql_table"
token = azuread_auth(tenantid,clientid,secret,"https://purview.azure.net")

for item in response['items']:
    tablename=item['name']
    assetfqn = f"mssql://{syn_workspace}-ondemand.sql.azuresynapse.net/{lakedatabase}/dbo/{tablename}"

    # A table without a description has nothing to copy; skip the update
    # instead of failing on the missing key.
    description = (item['properties'].get('Properties') or {}).get('Description')
    if description is not None:
        # set_desc embeds the text in a JSON string, so escape newlines.
        description = description.replace("\n","\\n")
        result = set_desc(typename, assetfqn,description,token)

    dfcolumns = getcolumns(typename, assetfqn,token)
    if dfcolumns.shape[0] > 0:
        purview_names = set(dfcolumns['name'])
        for column in item['properties']['StorageDescriptor']['Columns']:
            columname = column["Name"]
            description = column.get("Description")
            if description is None:
                # No description on the lake database side.
                continue
            if columname not in purview_names:
                # Avoid the IndexError getcolguid raises for unknown names.
                print(f"Column {columname} of table {tablename} not found in purview")
                continue
            description = description.replace("\n","\\n")
            result = col_desc(dfcolumns, columname, description, token)
        print(tablename)
    else:
        print(f"No columns/schema found in purview for table {tablename}")
    #print(result)
    #print(result)


In [40]:
def get_entity(typeName: str, assetfqn: str, azuread_access_token: str):
    """Fetch a Purview entity by type and qualified name.

    Args:
        typeName: Entity type name used in the lookup URL.
        assetfqn: Qualified name of the entity.
        azuread_access_token: Bearer token sent in the Authorization header.

    Returns:
        The ``entity`` object of the response (relationships are not
        requested).

    Raises:
        requests.HTTPError: If the catalog answers with an error status.
    """
    prefix = f"https://{purviewname}.catalog.purview.azure.com/api/atlas/v2/entity/uniqueAttribute/type"
    url = f"{prefix}/{typeName}"
    # Let requests encode the qualified name ('#', '&', spaces, ...).
    params = {
        'attr:qualifiedName': assetfqn,
        'ignoreRelationships': 'True',
    }
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    response = requests.request("GET", url, headers=headers, params=params)
    # Fail with the HTTP error instead of a KeyError on 'entity'.
    response.raise_for_status()
    entity = response.json()['entity']
    return entity
# Fetch one concrete table entity and show it as the cell output.
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
assetfqn = "mssql://datamasked-ondemand.sql.azuresynapse.net/Siniestros_MPF/dbo/Channel"
typename = "azure_synapse_serverless_sql_table"
result = get_entity(typeName=typename, assetfqn=assetfqn, azuread_access_token=token)
result

In [2]:
def get_all_assets(path: str, data_catalog_name: str, azuread_access_token: str, max_depth=1):
    """Browse the Purview catalog below ``path`` and yield the entries found.

    Leaf entries (``isLeaf`` true) of every visited level are always
    yielded. Non-leaf entries are descended into while ``max_depth`` is
    greater than 1; on the last level they are yielded themselves.

    Only one request with ``limit`` 100 is made per level, so at most 100
    entries per level are seen.

    Args:
        path: Browse path to list.
        data_catalog_name: Purview account name used to build the URL.
        azuread_access_token: Bearer token sent in the Authorization header.
        max_depth: Number of levels to browse, starting at ``path``.

    Yields:
        The entry dicts of the ``value`` list of each browse response.

    Raises:
        requests.HTTPError: If the catalog answers with an error status.
    """
    # The API chooses between type and path, prioritizing the first.
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse?api-version=2021-05-01-preview"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    # json.dumps escapes quotes and backslashes in the path, which the
    # former %-formatted template did not.
    payload = json.dumps({"limit": 100, "offset": None, "path": path})
    response = requests.request("POST", url, headers=headers, data=payload)
    response.raise_for_status()
    # A response without "value" is treated as an empty level.
    entries = jmespath.search("value", json.loads(response.text)) or []

    # Leaves of this level come first.
    for entry in entries:
        if jmespath.search("isLeaf", entry):
            yield entry

    if max_depth > 1:
        # Depth left: descend into every non-leaf entry.
        for entry in entries:
            if jmespath.search("isLeaf", entry):
                continue
            yield from get_all_assets(
                jmespath.search("path", entry),
                data_catalog_name,
                azuread_access_token,
                max_depth - 1,
            )
    else:
        # Depth exhausted: hand back the non-leaf entries themselves.
        for entry in entries:
            if jmespath.search("!isLeaf", entry):
                yield entry

#top_path = f"/azure_storage_account#{storage_account}.core.windows.net/azure_datalake_gen2_service#{storage_account}.dfs.core.windows.net/azure_datalake_gen2_filesystem#{container}"
#root = 'abfss://%s@%s.dfs.core.windows.net/' % (container, storage_account)
# For lake databases: azure_synapse_serverless_sql_column

In [3]:
def get_all_type_assets(type: str, data_catalog_name: str, azuread_access_token: str, max_depth=1):
    """Browse the Purview catalog by entity type and yield the entries found.

    Leaf entries (``isLeaf`` true) of the type listing are always yielded.
    Non-leaf entries are descended into by their browse path while
    ``max_depth`` is greater than 1; otherwise they are yielded themselves.

    Only one request with ``limit`` 100 is made for the type listing, so at
    most 100 entries are seen.

    Args:
        type: Entity type name sent as ``entityType``.
        data_catalog_name: Purview account name used to build the URL.
        azuread_access_token: Bearer token sent in the Authorization header.
        max_depth: Number of levels to browse, starting at the type listing.

    Yields:
        The entry dicts of the ``value`` list of each browse response.

    Raises:
        requests.HTTPError: If the catalog answers with an error status.
    """
    # The API chooses between type and path, prioritizing the first.
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse?api-version=2021-05-01-preview"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    # json.dumps produces valid JSON whatever characters the type contains.
    payload = json.dumps({"entityType": type, "limit": 100, "offset": None})
    response = requests.request("POST", url, headers=headers, data=payload)
    response.raise_for_status()
    li = json.loads(response.text)
    print(li)
    # A response without "value" is treated as an empty listing.
    entries = jmespath.search("value", li) or []

    # Leaves of this level come first.
    for entry in entries:
        if jmespath.search("isLeaf", entry):
            yield entry

    if max_depth > 1:
        # Children are addressed by browse path, not by entity type, so the
        # descent goes through the path-based get_all_assets.
        for entry in entries:
            if jmespath.search("isLeaf", entry):
                continue
            yield from get_all_assets(
                jmespath.search("path", entry),
                data_catalog_name,
                azuread_access_token,
                max_depth - 1,
            )
    else:
        # Depth exhausted: hand back the non-leaf entries themselves.
        for entry in entries:
            if jmespath.search("!isLeaf", entry):
                yield entry
# Browse the "azure_sql_column" entries and show them as a Spark DataFrame.
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
items2 = get_all_type_assets("azure_sql_column", purviewname, token, max_depth=1)
# Drain the generator into a list so Spark can build a DataFrame from it.
mydict2 = list(items2)
tables = spark.createDataFrame(mydict2)
display(tables)

In [None]:
# Browse below one specific schema path (up to 6 levels) and show the
# entries found as a Spark DataFrame.
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
items3 = get_all_assets(
    "https://sa4purview.dfs.core.windows.net/datalake/serverless/tpcdsext.customer_address_{N}.parquet#__tabular_schema//ca_address_id",
    purviewname,
    token,
    max_depth=6,
)
# Drain the generator into a list so Spark can build a DataFrame from it.
mydict3 = list(items3)
tables = spark.createDataFrame(mydict3)
display(tables)

In [None]:
# Counts the entities referred to by a Purview entity
# (presumably its columns - TODO confirm).
def get_schema(guid):
    """Return the number of ``referredEntities`` of the entity ``guid``.

    Uses the notebook-level ``purviewname`` and ``token`` variables.
    """
    endpoint = f"https://{purviewname}.purview.azure.com/catalog/api/atlas/v2/entity/guid/"+guid
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
        }
    entity_response = requests.request("GET", endpoint, headers=auth_headers)
    body = entity_response.json()
    referred = body["referredEntities"]
    return len(referred)

# Build a (name, columns) table with the referred-entity count per asset,
# then keep the assets whose count is zero, joined with their details.
column_nb = [
    {"name": table.name, "columns": get_schema(table.id)}
    for table in tables.select('name','id').collect()
]
columndf = spark.createDataFrame(column_nb)
without_columns = columndf.where(col('columns')==0)
emptytables = without_columns.join(tables,on='name')
display(emptytables)

In [None]:
# Print the schema entries of every .csv path asset.
# NOTE(review): result2 is not defined anywhere in the visible notebook -
# presumably it comes from a cell that is not shown; confirm.
for asset in result2.collect():
    # Only ADLS Gen2 path assets whose name ends in .csv are of interest.
    if asset['entityType']!='azure_datalake_gen2_path' or not asset['name'].endswith('.csv'):
        continue
    schema_path = asset['path']+'#__tabular_schema'
    items3 = get_all_assets(schema_path, purviewname, token, max_depth=6)
    for item3 in items3:
        print(item3)

In [None]:
# Fresh token for the Purview resource, used by the calls in this cell.
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
def get_schema(guid,token):
    """Return the ``tabular_schema`` relationship attribute of entity ``guid``.

    The entity is fetched by guid with ``token`` as bearer token; the value
    returned is ``entity.relationshipAttributes.tabular_schema`` of the
    response.
    """
    endpoint = f"https://{purviewname}.purview.azure.com/catalog/api/atlas/v2/entity/guid/"+guid
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
        }
    body = requests.request("GET", endpoint, headers=auth_headers).json()
    relationships = body["entity"]['relationshipAttributes']
    return relationships['tabular_schema']
def get_guid(guid,token):
    """Fetch the entity ``guid`` and return the ``entity`` object of the response.

    ``token`` is sent as bearer token in the Authorization header.
    """
    endpoint = f"https://{purviewname}.purview.azure.com/catalog/api/atlas/v2/entity/guid/"+guid
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
        }
    entity_response = requests.request("GET", endpoint, headers=auth_headers)
    body = entity_response.json()
    return body["entity"]
# Resolve the tabular schema of one hard-coded entity, then fetch the
# entity that schema reference points to.
myschema = get_schema(guid='6e9ac7fd-aabf-474d-b5ae-97554faa3db4', token=token)
print(myschema)
mytab = get_guid(guid=myschema['guid'], token=token)
print(mytab)
