# In [None]:
# import necessary modules
import os
import requests
import json
import jmespath
import pandas as pd

# import utility functions and pretty print function from notebookutils module
from notebookutils import mssparkutils
from pprint import pprint

# import necessary classes and functions from pyspark.sql module
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# import functions for generating random strings, working with dates, and regular expressions
import random, string
import datetime as dtime
import re

# import function for sleeping or delaying execution
import time

 
# Purview account name and the Synapse lake database processed by this notebook.
purviewname = 'amcpurview'
lakedatabase = 'careemtest'
# Plain forward slashes: "\/" is not a valid Python escape sequence, so the
# previous literal kept the backslashes inside the resulting URL.
prefix = "https://{}.purview.azure.com".format(purviewname)

# Azure AD tenant and application (client) ids of the service principal
# (placeholder values).
tenantid="xxxxx-xxx-xxxx-xxxxx-xxxxxxxxxx"
clientid="xxxxx-xxx-xxxx-xxxxx-xxxxxxxxxx"
# The client secret is fetched through TokenLibrary; both arguments are left
# empty here and have to be filled in before running.
secret=TokenLibrary.getSecret('', '')

def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    """Request an Azure AD access token using the client-credentials grant.

    Args:
        tenant_id: Azure AD tenant the token is requested from.
        client_id: Application (client) id sent in the request.
        client_secret: Secret of that application.
        resource_url: Resource the token is requested for.

    Returns:
        The ``access_token`` value of the token response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        KeyError: If the response body carries no ``access_token``.
    """
    # construct the URL for the Azure AD authentication endpoint
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"

    # Passing a dict lets requests form-encode every value. Building the body
    # by string interpolation corrupts it as soon as the secret contains
    # characters such as '&', '+' or '='.
    payload = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'resource': resource_url,
    }

    # create the headers for the request
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
        }

    # send the request and fail early on an HTTP error
    response = requests.post(url, headers=headers, data=payload)
    response.raise_for_status()

    # extract and return the access token
    return response.json()['access_token']


def getguid(typeName: str, assetfqn: str, azuread_access_token: str):
    """Look up the guid of an entity by type name and qualified name.

    Args:
        typeName: Entity type name placed in the request path.
        assetfqn: Qualified name sent as ``attr:qualifiedName``.
        azuread_access_token: Bearer token used for the request.

    Returns:
        The ``guid`` of the ``entity`` object in the response.

    Raises:
        requests.HTTPError: If the request returns an error status.
        KeyError: If the response has no ``entity``/``guid``.
    """
    # The account name comes from the module-level ``purviewname``; the name
    # ``amcpurview`` used before is not defined anywhere in this file.
    url = (
        f"https://{purviewname}.catalog.purview.azure.com"
        f"/api/atlas/v2/entity/uniqueAttribute/type/{typeName}"
    )

    # Let requests encode the query string so characters such as '#', '&' or
    # spaces in the qualified name cannot truncate or split the parameter.
    params = {
        'attr:qualifiedName': assetfqn,
        'ignoreRelationships': 'True',
    }

    # create the headers for the request
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }

    # send the request and get the response
    response = requests.get(url, headers=headers, params=params)

    # print the response for debugging purposes
    print(response.text)

    response.raise_for_status()

    # extract and return the guid
    return response.json()['entity']['guid']


def set_desc(assettype: str, assetfqn: str, description: str, azuread_access_token: str):
    """Set the ``description`` attribute of the entity identified by type and qualified name.

    Args:
        assettype: Entity type name used to resolve the guid.
        assetfqn: Qualified name used to resolve the guid.
        description: New description text.
        azuread_access_token: Bearer token used for the requests.

    Returns:
        The parsed JSON body of the PUT response.
    """
    # get the guid for the asset
    guid = getguid(assettype, assetfqn, azuread_access_token)

    # The account name comes from the module-level ``purviewname``; the name
    # ``amcpurview`` used before is not defined anywhere in this file.
    url = f"https://{purviewname}.catalog.purview.azure.com/api/atlas/v2/entity/guid/{guid}?name=description"

    # print the URL for debugging purposes
    print(url)

    # create the headers for the request
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }

    # json.dumps produces a valid JSON string literal even when the text
    # contains quotes, backslashes, newlines or non-ASCII characters; wrapping
    # the raw text in double quotes by hand does not.
    body = json.dumps(description)

    # send the request and get the response
    response = requests.request("PUT", url, headers=headers, data=body)

    # parse the response into a dictionary and return it
    return json.loads(response.text)


# sample asset type and qualified name (workspace, database and table parts are left blank)
typename = "azure_synapse_serverless_sql_table"
assetfqn = "mssql://-ondemand.sql.azuresynapse.net//dbo/"

# request an Azure AD access token for the Purview resource
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)


def getcolumns(typeName: str, assetfqn: str, azuread_access_token: str):
    """Fetch the entities referred to by an asset and return them as a DataFrame.

    Args:
        typeName: Entity type name placed in the request path.
        assetfqn: Qualified name sent as ``attr:qualifiedName``.
        azuread_access_token: Bearer token used for the request.

    Returns:
        A pandas DataFrame with the columns ``typename``, ``name`` and
        ``guid``, one row per entry of ``referredEntities`` in the response.
        The frame is empty (but keeps those columns) when there are none.

    Raises:
        requests.HTTPError: If the request returns an error status.
    """
    # The account name comes from the module-level ``purviewname``; the name
    # ``amcpurview`` used before is not defined anywhere in this file.
    url = (
        f"https://{purviewname}.catalog.purview.azure.com"
        f"/api/atlas/v2/entity/uniqueAttribute/type/{typeName}"
    )

    # create the headers for the request
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }

    # send the request; requests encodes the qualified name in the query string
    response = requests.get(url, headers=headers, params={'attr:qualifiedName': assetfqn})
    response.raise_for_status()

    # referred entities keyed by guid; treated as empty when the key is absent
    referred = response.json().get('referredEntities') or {}

    # keep only the fields needed to address each referred entity later on
    mycolumns = [
        {
            'typename': column['typeName'],
            'name': column['attributes']['name'],
            'guid': column['guid'],
        }
        for column in referred.values()
    ]

    # The previous version stopped at ``mycolumns = pd`` and returned nothing.
    # Build the DataFrame with explicit columns so an empty result can still
    # be filtered by name, and return it.
    return pd.DataFrame(mycolumns, columns=['typename', 'name', 'guid'])


def getcolguid(myname,columns):
    """Return the guid of the first row in ``columns`` whose ``name`` equals ``myname``.

    Raises IndexError when no row matches.
    """
    name_matches = columns['name'] == myname
    matching_guids = columns.loc[name_matches, 'guid'].tolist()
    return matching_guids[0]
def col_desc(columns, name: str, description: str, azuread_access_token: str):
    """Set the ``description`` attribute of the column entity called ``name``.

    Args:
        columns: Table of column entities passed to ``getcolguid`` to resolve
            the guid for ``name``.
        name: Column name to look up.
        description: New description text.
        azuread_access_token: Bearer token used for the request.

    Returns:
        The parsed JSON body of the PUT response.
    """
    guid = getcolguid(name,columns)
    # The account name comes from the module-level ``purviewname``; the name
    # ``amcpurview`` used before is not defined anywhere in this file.
    url = f"https://{purviewname}.catalog.purview.azure.com/api/atlas/v2/entity/guid/{guid}?name=description"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    # json.dumps produces a valid JSON string literal even when the text
    # contains quotes, backslashes, newlines or non-ASCII characters.
    body = json.dumps(description)
    response = requests.request("PUT", url, headers=headers, data=body)
    return json.loads(response.text)
# sample lookup: fetch and print the referred entities of one asset
typename = "azure_synapse_serverless_sql_table"
assetfqn = "mssql://-ondemand.sql.azuresynapse.net//dbo/"
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
columns = getcolumns(typeName=typename, assetfqn=assetfqn, azuread_access_token=token)
print(columns)

     
# get the name of the Synapse workspace
syn_workspace = mssparkutils.env.getWorkspaceName()

# construct the base URL for the Synapse API
baseurl = "https://" + syn_workspace + ".dev.azuresynapse.net"

# create the headers for the request
headers = {"Authorization": "Bearer " + mssparkutils.credentials.getToken("Synapse")}

# construct the full URL for the request to get the list of tables
constructed_url = baseurl + "/databases/" + lakedatabase + "/tables?api-version=2021-04-01"

# send the request; stop on an HTTP error instead of failing later on a missing key
response = requests.get(constructed_url, headers=headers)
response.raise_for_status()

# parse the response into a dictionary
response = json.loads(response.text)

# iterate over the tables in the response
for item in response['items']:
    # get the name and (optional) description of the current table
    tablename = item['name']
    description = (item['properties'].get('Properties') or {}).get('Description')

    # set the asset type and fully qualified name for the table
    typename = "azure_synapse_serverless_sql_table"
    assetfqn = f"mssql://{syn_workspace}-ondemand.sql.azuresynapse.net/{lakedatabase}/dbo/{tablename}"

    # get an Azure AD access token
    token = azuread_auth(tenantid, clientid, secret, "https://purview.azure.net")

    # set the description of the table in Purview; a table without a
    # description is left untouched instead of aborting the whole run
    if description is not None:
        result = set_desc(typename, assetfqn, description, token)

    # get the columns of the table in Purview
    dfcolumns = getcolumns(typename, assetfqn, token)

    # check if there are any columns in the table
    # (the guarding ``if`` was missing, which left the loop below mis-indented
    # and the ``else`` without a matching statement)
    if dfcolumns is not None and not dfcolumns.empty:
        for column in item['properties']['StorageDescriptor']['Columns']:
            columname = column["Name"]
            description = column.get("Description")
            # nothing to write for a column that has no description
            if description is None:
                continue
            typename = "azure_synapse_serverless_sql_column"
            result = col_desc(dfcolumns, columname, description, token)
        print(tablename)
    else:
        print(f"No columns/schema found in purview for table {tablename}")
    #print(result)
    #print(result)



    def get_entity(typeName: str, assetfqn: str, azuread_access_token: str):
    prefix = f"https://{amcpurview}.catalog.purview.azure.com/api/atlas/v2/entity/uniqueAttribute/type"
    url = f"{prefix}/{typeName}?attr:qualifiedName={assetfqn}&ignoreRelationships=True"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    response = requests.request("GET", url, headers=headers)
    guid = json.loads(response.text)['entity']
    return guid
# fetch one table entity and leave it as the last expression of the cell
typename = "azure_synapse_serverless_sql_table"
assetfqn = "mssql://datamasked-ondemand.sql.azuresynapse.net/Siniestros_MPF/dbo/Channel"
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
result = get_entity(typeName=typename, assetfqn=assetfqn, azuread_access_token=token)
result
     

     def get_all_assets(path: str, data_catalog_name: str, azuread_access_token: str, max_depth=1):
    """
    Retrieves all scanned assets for the specified ADLS Storage Account Container.
    Note: this function intentionally recursively traverses until only assets remain (i.e. no folders are returned, only files).
    """
    # List all files in path 
    # The API chooses between type and path, being prioritized on the fitsr 
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse?api-version=2021-05-01-preview"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    payload="""{"limit": 100,
                "offset": null,
                "path": "%s"
                }""" % (path)
    response = requests.request("POST", url, headers=headers, data=payload)
    li = json.loads(response.text)
    #print(li)
    # Return all files
    for x in jmespath.search("value", li):
        if jmespath.search("isLeaf", x):
            yield x
    # If the max_depth has not been reached, start
    # listing files and folders in subdirectories
    if max_depth > 1:
        for x in jmespath.search("value", li):
            if jmespath.search("isLeaf", x):
                continue
            for y in get_all_assets(jmespath.search("path", x), data_catalog_name, azuread_access_token, max_depth - 1):
                yield y
    # If max_depth has been reached,
    # return the folders
    else:
        for x in jmespath.search("value", li):
            if jmespath.search("!isLeaf", x):
                yield x



def get_all_type_assets(type: str, data_catalog_name: str, azuread_access_token: str, max_depth=1):
    """
    Yield the entries the browse API lists for the entity type ``type``.

    Leaf entries are yielded first. Non-leaf entries are then either descended
    into by path (while ``max_depth`` is greater than 1) or yielded as they
    are once the depth limit is reached.

    Args:
        type: Entity type sent as ``entityType`` in the browse request. The
            name shadows the builtin but is kept for keyword callers.
        data_catalog_name: Account name used to build the host name.
        azuread_access_token: Bearer token used for the request.
        max_depth: Number of levels to browse, counting the current one.

    Raises:
        requests.HTTPError: If the request returns an error status.

    Note: the request asks for ``limit`` 100 and never advances ``offset``,
    so at most the first 100 entries are seen.
    """
    # The API chooses between type and path, the first one taking priority.
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse?api-version=2021-05-01-preview"
    headers = {
            'Authorization': f'Bearer {azuread_access_token}',
            'Content-Type': 'application/json'
            }
    # json.dumps keeps the payload valid JSON whatever the type string contains
    payload = json.dumps({"entityType": type, "limit": 100, "offset": None})
    response = requests.request("POST", url, headers=headers, data=payload)
    response.raise_for_status()
    li = json.loads(response.text)
    print(li)
    entries = jmespath.search("value", li) or []

    # Return all leaf entries first
    for entry in entries:
        if jmespath.search("isLeaf", entry):
            yield entry

    # Then handle the non-leaf entries in their original order
    for entry in entries:
        if jmespath.search("isLeaf", entry):
            continue
        if max_depth > 1:
            # Children are addressed by path, so the next level is browsed
            # with the path-based listing. The previous recursion sent the
            # child path as ``entityType``.
            yield from get_all_assets(
                jmespath.search("path", entry),
                data_catalog_name,
                azuread_access_token,
                max_depth - 1,
            )
        else:
            # depth limit reached: return the folder itself
            yield entry
# list the assets of type azure_sql_column and show them as a Spark DataFrame
token = azuread_auth(tenantid,clientid,secret,"https://purview.azure.net")
# ``purviewname`` holds the account name; ``amcpurview`` is not defined in this file
items2=get_all_type_assets("azure_sql_column", purviewname,token,max_depth=1)
# drain the generator into a list for createDataFrame
mydict2 = list(items2)
tables=spark.createDataFrame(mydict2)
display(tables)





# browse below one schema path (up to 6 levels) and show the result as a Spark DataFrame
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
items3 = get_all_assets(
    "https://sa4purview.dfs.core.windows.net/datalake/serverless/tpcdsext.customer_address_{N}.parquet#__tabular_schema//ca_address_id",
    purviewname,
    token,
    max_depth=6,
)
mydict3 = list(items3)
tables = spark.createDataFrame(mydict3)
display(tables)




# This function returns the number of referred entities of an entity
def get_schema(guid):
    """Return how many ``referredEntities`` the entity with ``guid`` has.

    The request is authorised with the module-level ``token``.

    Args:
        guid: Guid of the entity to fetch.

    Returns:
        The number of entries in ``referredEntities``; 0 when the key is absent.

    Raises:
        requests.HTTPError: If the request returns an error status.
    """
    # ``purviewname`` holds the account name; ``amcpurview`` is not defined in this file
    url = f"https://{purviewname}.purview.azure.com/catalog/api/atlas/v2/entity/guid/"+guid
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
        }
    response = requests.request("GET", url, headers=headers)
    response.raise_for_status()
    #print(response)
    return len(response.json().get("referredEntities") or {})

# Build one row per table with its referred-entity count, then keep the tables whose count is 0
column_nb = [
    {"name": table.name, "columns": get_schema(table.id)}
    for table in tables.select('name','id').collect()
]
columndf = spark.createDataFrame(column_nb)
without_columns = columndf.where(col('columns')==0)
emptytables = without_columns.join(tables,on='name')
display(emptytables)




# For every ADLS Gen2 path asset ending in .csv, print what the browse API
# lists below its tabular schema.
# NOTE(review): ``result2`` is not defined anywhere in the visible file;
# it has to be created before this loop can run.
for asset in result2.collect():
    if asset['entityType']=='azure_datalake_gen2_path' and asset['name'].endswith('.csv') :
        # ``purviewname`` holds the account name; ``amcpurview`` is not defined in this file
        items3=get_all_assets(asset['path']+'#__tabular_schema', purviewname,token,max_depth=6)
        for item3 in items3:
            print(item3)
     









# In [ ]:

     
# request a fresh Azure AD access token for the Purview resource
token = azuread_auth(
    tenant_id=tenantid,
    client_id=clientid,
    client_secret=secret,
    resource_url="https://purview.azure.net",
)
def get_schema(guid,token):
    """Return the ``tabular_schema`` relationship of the entity with ``guid``.

    Args:
        guid: Guid of the entity to fetch.
        token: Bearer token used for the request.

    Returns:
        ``response["entity"]["relationshipAttributes"]["tabular_schema"]``.

    Raises:
        requests.HTTPError: If the request returns an error status.
        KeyError: If the response lacks one of those keys.
    """
    # ``purviewname`` holds the account name; ``amcpurview`` is not defined in this file
    url = f"https://{purviewname}.purview.azure.com/catalog/api/atlas/v2/entity/guid/"+guid
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
        }
    response = requests.request("GET", url, headers=headers)
    response.raise_for_status()
    #print(response)
    return response.json()["entity"]['relationshipAttributes']['tabular_schema']
def get_guid(guid,token):
    """Fetch the entity with ``guid`` and return its ``entity`` object.

    Args:
        guid: Guid of the entity to fetch.
        token: Bearer token used for the request.

    Returns:
        The ``entity`` object of the response.

    Raises:
        requests.HTTPError: If the request returns an error status.
        KeyError: If the response has no ``entity``.
    """
    # ``purviewname`` holds the account name; ``amcpurview`` is not defined in this file
    url = f"https://{purviewname}.purview.azure.com/catalog/api/atlas/v2/entity/guid/"+guid
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
        }
    response = requests.request("GET", url, headers=headers)
    response.raise_for_status()
    return response.json()["entity"]
# resolve the tabular schema of one entity, then fetch and print the schema entity itself
myschema = get_schema(guid='6e9ac7fd-aabf-474d-b5ae-97554faa3db4', token=token)
print(myschema)
schema_guid = myschema['guid']
mytab = get_guid(guid=schema_guid, token=token)
print(mytab)



