**Install or update Purview CLI and required packages (run if needed)**

In [2]:
# ONLY RUN ONCE IF NEEDED

#!pip install --upgrade purviewcli
#!pip install --upgrade pandas
#!pip install --upgrade matplotlib
#!pip install python-dotenv

In [3]:
import json
import pandas as pd
import os
import shutil
import numpy as np

#installed from package above
from dotenv import load_dotenv

**<mark>Setup Authentication</mark>**

In [4]:
# Authentication settings are read from a .env file in the current working directory.
# Create that file and add the following lines to it (replace the placeholder values):
#
#   PURVIEW_NAME="Purview Account Name"
#   AZURE_CLIENT_ID="Your client ID"
#   AZURE_TENANT_ID="Your tenant ID"
#   AZURE_CLIENT_SECRET="Your client secret"
#
# The file contains a client secret, so keep it out of source control.

# Load environment variables from the .env file. override=True lets values from the
# file replace variables that are already set in the process environment.
load_dotenv(dotenv_path='./.env', override=True)

True

**<mark>User Parameters</mark>**

In [5]:
# Root working directory for this notebook.
# THIS FOLDER HAS TO EXIST. IT WON'T BE CREATED AUTOMATICALLY.
# Use a raw string (r'...') for Windows paths: in a normal string, backslash pairs
# such as '\R' or '\P' are invalid escape sequences and trigger a SyntaxWarning on
# recent Python versions (and '\t', '\n', ... would silently corrupt the path).
# root_working_folder = r'C:\Projects\Purview\PurviewBulk'
root_working_folder = r'C:\Repos\PurviewBulk'

# Search filter used for regular asset searches (written to filter.json and passed
# to `pv search query`). It keeps an asset only when ALL of the following hold:
#   * neither its "size" nor its "fileSize" attribute equals 0,
#   * it is not classified as MICROSOFT.SYSTEM.TEMP_FILE,
#   * its entity type is neither AtlasGlossaryTerm nor AtlasGlossary.
filter_json_str = '''
{
  "and": [
    {
      "not": {
        "or": [
          {
            "attributeName": "size",
            "operator": "eq",
            "attributeValue": 0
          },
          {
            "attributeName": "fileSize",
            "operator": "eq",
            "attributeValue": 0
          }
        ]
      }
    },
    {
      "not": {
        "classification": "MICROSOFT.SYSTEM.TEMP_FILE"
      }
    },
    {
      "not": {
        "or": [
          {
            "entityType": "AtlasGlossaryTerm"
          },
          {
            "entityType": "AtlasGlossary"
          }
        ]
      }
    }
  ]
}
'''

# Search filter used for glossary searches (written to filter_bg_only.json).
# Same size and temp-file exclusions as filter_json_str, but the last clause is
# inverted: only AtlasGlossaryTerm / AtlasGlossary entities are kept.
filter_bg_only_json_str = '''
{
  "and": [
    {
      "not": {
        "or": [
          {
            "attributeName": "size",
            "operator": "eq",
            "attributeValue": 0
          },
          {
            "attributeName": "fileSize",
            "operator": "eq",
            "attributeValue": 0
          }
        ]
      }
    },
    {
      "not": {
        "classification": "MICROSOFT.SYSTEM.TEMP_FILE"
      }
    },
    {
        "or": [
            {
                "entityType": "AtlasGlossaryTerm"
            },
            {
                "entityType": "AtlasGlossary"
            }
        ]
    }
  ]
}
'''


# Facets requested with every search (written to facet.json and passed to
# `pv search query` via --facets-file). Each entry names a facet, a "count" value
# and a descending sort on count.
# NOTE(review): the exact meaning of "count": 0 versus "count": 10 is defined by the
# Purview search API, not by this notebook -- confirm against the API documentation.
facet_json_str = '''
[{
    "facet": "assetType",
    "count": 0,
    "sort": {
        "count": "desc"
    }
}, {
    "facet": "classification",
    "count": 10,
    "sort": {
        "count": "desc"
    }
}, {
    "facet": "contactId",
    "count": 10,
    "sort": {
        "count": "desc"
    }
}, {
    "facet": "label",
    "count": 10,
    "sort": {
        "count": "desc"
    }
}, {
    "facet": "term",
    "count": 10,
    "sort": {
        "count": "desc"
    }
}, {
    "facet": "classificationCategory",
    "count": 0,
    "sort": {
        "count": "desc"
    }
}, {
    "facet": "fileExtension",
    "count": 0,
    "sort": {
        "count": "desc"
    }
}]
'''

# EVERYTHING BELOW CAN BE LEFT AT ITS DEFAULT VALUE
# Sub-folder of root_working_folder used for temporary files.
# It is deleted and re-created on every run of the setup cell.
ephemeral_folder = 'ephemeral'

# Default file names. The '{0}' placeholder in the templates is filled in by the
# code that uses them (not shown in this part of the notebook).
asset_export_file_name_template = '{0}_purview_assets.csv'
asset_detail_export_file_name_template = '{0}_details_purview_assets.csv'
filter_file_name = 'filter.json'
filter_bg_only_file_name = 'filter_bg_only.json'
facet_file_name = 'facet.json'

# Folder name template for the generated JSON update payloads
update_paylod_folder_name_template = '{0}_updates'

# Separator used when several values (glossary terms, classifications) are joined
# into a single string
separator_char ='|'

# Entity types whose nested tables are fetched and added as extra rows
# (see unpackNestedTables)
nested_table_asset_entity_types = ['powerbi_dataset']

**Setup working folders, filters and facets**

In [6]:
# Start every run from an empty scratch folder: remove the previous one (if any)
# and create it again. root_working_folder itself must already exist, because
# os.mkdir only creates the last path component.
ephemeral_full_path = os.path.join(root_working_folder, ephemeral_folder)
if os.path.exists(ephemeral_full_path):
    shutil.rmtree(ephemeral_full_path, ignore_errors=False)

os.mkdir(ephemeral_full_path)

# Full paths of the JSON payload files handed to the Purview CLI later on.
filter_json_full_path = os.path.join(ephemeral_full_path, filter_file_name)
facet_json_full_path = os.path.join(ephemeral_full_path, facet_file_name)
filter_bg_only_json_full_path = os.path.join(ephemeral_full_path, filter_bg_only_file_name)

# Write each payload to its file. The `with` block closes the handle even when the
# write fails, and write() emits the string in one call (writelines() on a str
# iterates it character by character).
for payload_path, payload_text in (
    (filter_json_full_path, filter_json_str),
    (facet_json_full_path, facet_json_str),
    (filter_bg_only_json_full_path, filter_bg_only_json_str),
):
    with open(payload_path, "w") as payload_file:
        payload_file.write(payload_text)

In [7]:
def coalesce(iterable):
    """Return the first item of `iterable` that is not None.

    Falsy values such as 0 or '' count as valid results; None is returned only
    when the iterable is empty or contains nothing but None.
    """
    return next((candidate for candidate in iterable if candidate is not None), None)

In [8]:
# Glossary lookup table read by guidToFormalName and formalNameToGuid.
# It is created empty here so those functions can be defined and called safely;
# it is replaced later with the result of searchGlossaryTerms.
purviewTermsDF = pd.DataFrame(columns = ['name', 'qualifiedName', 'entityType', 'id'])

In [9]:
def guidToFormalName(guidList):
    """Translate glossary GUIDs into qualified names.

    Reads the module-level ``purviewTermsDF`` table and returns a list with the
    ``qualifiedName`` of every row whose ``id`` appears in ``guidList``. The
    result follows the row order of the table; GUIDs without a matching row are
    simply left out.
    """
    id_matches = purviewTermsDF['id'].isin(guidList)
    return list(purviewTermsDF.loc[id_matches, 'qualifiedName'])

In [10]:
def formalNameToGuid(formalName):
    """Look up the GUID of the glossary entry with the given qualified name.

    Reads the module-level ``purviewTermsDF`` table. The ``id`` is returned only
    when exactly one row has ``qualifiedName == formalName``; when there is no
    match, or the name is ambiguous, an empty string is returned.
    """
    name_matches = purviewTermsDF[purviewTermsDF['qualifiedName'] == formalName]
    if len(name_matches) != 1:
        return ''
    return name_matches['id'].iloc[0]

In [11]:
def listToDataframe(listOfAssets):
    """Convert a list of search-result dicts into a DataFrame.

    Parameters
    ----------
    listOfAssets : iterable of dict
        Each dict must contain 'name', 'qualifiedName' and 'id'. 'entityType' is
        optional and becomes NaN when absent. The input dicts are not modified.

    Returns
    -------
    pandas.DataFrame
        Columns 'name', 'qualifiedName', 'entityType', 'id' (in that order), one
        row per asset, with a fresh 0..n-1 index. An empty input yields an empty
        frame that still has the four columns.

    Raises
    ------
    KeyError
        If an asset lacks 'name', 'qualifiedName' or 'id'.
    """
    column_names = ['name', 'qualifiedName', 'entityType', 'id']

    # Collect plain records and build the frame once: DataFrame.append was removed
    # in pandas 2.0, and growing a frame row by row is quadratic anyway.
    records = [
        {
            'name': ent['name'],
            'qualifiedName': ent['qualifiedName'],
            'entityType': ent.get('entityType', np.nan),
            'id': ent['id'],
        }
        for ent in listOfAssets
    ]

    return pd.DataFrame(records, columns=column_names)

In [12]:
def searchPurview(keyword, filter_file, batch_size = 100, recursive_read = False):
    offset = 0
    all_items = []
    this_read_count = -1

    while ((this_read_count == -1) or (this_read_count>0) and recursive_read == True):
        search_output = !pv search query --keywords "{keyword}" --limit {batch_size} --offset {offset} --filterFile  {filter_file} --facets-file {facet_json_full_path}
        search_json = json.loads(''.join(search_output))
        assets_this_read = search_json['value']
        this_read_count = len(assets_this_read)
        all_items.extend(assets_this_read)
        offset += this_read_count
    
    return all_items

In [13]:
def unpackNestedTables(dfEntities):
    columns_list = dfEntities.columns
    dfAssetsUnpacked = pd.DataFrame(columns = columns_list)

    for index, row in dfEntities.iterrows():
        dfAssetsUnpacked = dfAssetsUnpacked.append(row, ignore_index=True) #add current row
        
        #check if this assets belongs to a list entity types that have nested tables. For example Power BI Dataset
        thisAssetEntityType = dfEntities.loc[index]['entityType']
        if thisAssetEntityType not in nested_table_asset_entity_types:
            continue;

        #check to see if asset has nested tables
        thisAssetId = dfEntities.loc[index]['id']

        bulk_output = !pv entity readBulk --guid {thisAssetId}
        bulk_json = json.loads(''.join(bulk_output))
        if len(bulk_json['entities'])!=1:
            continue;

        thisAtlasObject =  bulk_json['entities'][0]
        if 'relationshipAttributes' in thisAtlasObject:
            thisRelAtt = thisAtlasObject['relationshipAttributes']
            if 'tables' in thisRelAtt:
                thisEntityTables = thisRelAtt['tables']
                #read all tables and a row in dataframe
               
                for aTbl in thisEntityTables: 
                    tblGuid = aTbl['guid']
                    tbl_output = !pv entity readBulk --guid {tblGuid}
                    tbl_json = json.loads(''.join(tbl_output))
                    if len(tbl_json['entities'])!=1:
                        continue;
                    
                    thisNestedTbl = tbl_json['entities'][0]
                    if 'attributes' in thisNestedTbl:
                        thisNestedAtt = thisNestedTbl['attributes']

                        thisTblId = thisNestedTbl['guid']
                        thisEntityType = thisNestedTbl['typeName']
                        thisTblName = thisNestedAtt['name']
                        thisTblQualifiedName = thisNestedAtt['qualifiedName']
                        
                        #add nested tables where applicable
                        asset_row = {
                        'name':thisTblName
                        ,'qualifiedName':thisTblQualifiedName
                        ,'entityType':thisEntityType
                        ,'id':thisTblId}

                        dfAssetsUnpacked = dfAssetsUnpacked.append(asset_row, ignore_index=True)


    return dfAssetsUnpacked

In [14]:
def searchPurviewAssets(keyword, batch_size = 50, recursive_read = False):
    """Search Purview assets and return ``(row_count, dataframe)``.

    The search runs through searchPurview with the asset filter file
    (`filter_json_full_path`), the hits are converted with listToDataframe and
    then expanded by unpackNestedTables. The count is the number of rows in the
    expanded frame.
    """
    raw_hits = searchPurview(keyword, filter_json_full_path, batch_size, recursive_read)
    assets_df = unpackNestedTables(listToDataframe(raw_hits))
    return len(assets_df), assets_df

In [15]:
def searchGlossaryTerms(keyword, batch_size = 50, recursive_read = False):
    """Search glossary entries and return ``(row_count, dataframe)``.

    Same as an asset search, except that the glossary-only filter file
    (`filter_bg_only_json_full_path`) is used and nested tables are not expanded.
    """
    terms_df = listToDataframe(
        searchPurview(keyword, filter_bg_only_json_full_path, batch_size, recursive_read)
    )
    return len(terms_df), terms_df

In [16]:
def getColumnStructure(atlasEntity):
    cols = None
    refEnt = None

    thisAsset = atlasEntity['entities'][0]
    if 'relationshipAttributes' not in thisAsset:
        return cols, refEnt

    if 'referredEntities' in atlasEntity:
        refEnt = atlasEntity['referredEntities']

    if 'columns' in thisAsset['relationshipAttributes']:
        cols = thisAsset['relationshipAttributes']['columns']   
    elif 'table_columns' in thisAsset['relationshipAttributes']: #snowflake
        cols = thisAsset['relationshipAttributes']['table_columns']
    elif 'view_columns' in thisAsset['relationshipAttributes']: #oracle
        cols = thisAsset['relationshipAttributes']['view_columns']  
    elif 'fields' in thisAsset['relationshipAttributes']: #salesforce
        cols = thisAsset['relationshipAttributes']['fields']   
    else:
        schGuid = None

        if 'attachedSchema' in thisAsset['relationshipAttributes']:
            attSch = thisAsset['relationshipAttributes']['attachedSchema']
            if len(attSch)>0:
                schGuid = attSch[0]['guid']

        if 'tabular_schema' in thisAsset['relationshipAttributes']:
            tabSch = thisAsset['relationshipAttributes']['tabular_schema']
            if len(tabSch)>0:
                schGuid = tabSch['guid']
        
        if schGuid!=None:
            schema_output = !pv entity readBulk --guid {schGuid}
            schema_json = json.loads(''.join(schema_output))
            return getColumnStructure(schema_json) 
    
    return cols, refEnt
    

In [17]:
def extractAssetRowFromEntity(entJson):
    """Flatten the first entity of a readBulk payload into one export row.

    Parameters
    ----------
    entJson : dict
        Parsed `pv entity readBulk` output. The first item of 'entities' must
        have 'guid', 'typeName' and an 'attributes' dict with 'name' and
        'qualifiedName'.

    Returns
    -------
    dict with the keys
        name, qualifiedName -- copied from the entity attributes
        classification      -- classification typeNames joined with
                               separator_char, or None when the entity has no
                               'classifications' entry
        term                -- qualified names of the glossary terms listed under
                               'meanings', joined with separator_char ('' when
                               there are none)
        description         -- 'userDescription' if set, else 'description',
                               else None
        entityType          -- the entity's typeName
        id                  -- the entity's guid
    """
    ent = entJson['entities'][0]
    att = ent['attributes']

    # Not every entity type carries both description attributes, so use .get():
    # indexing directly raised KeyError for entities missing either one.
    assetDescription = coalesce([att.get('userDescription'), att.get('description')])

    # 'meanings' holds the glossary terms attached to the asset; it may be absent
    # or null, in which case there are no terms to resolve.
    meanings = ent.get('meanings') or []
    if meanings:
        termFormalNameList = guidToFormalName([itm['guid'] for itm in meanings])
    else:
        termFormalNameList = []
    assetTerms = separator_char.join(termFormalNameList)

    classifications = ent.get('classifications')
    if classifications is not None:
        assetClassification = separator_char.join(cItem['typeName'] for cItem in classifications)
    else:
        assetClassification = None

    return {
        'name': att['name'],
        'qualifiedName': att['qualifiedName'],
        'classification': assetClassification,
        'term': assetTerms,
        'description': assetDescription,
        'entityType': ent['typeName'],
        'id': ent['guid'],
    }




In [18]:
# Hydrate the glossary lookup table used by guidToFormalName / formalNameToGuid.
# '*' is passed as the search keyword (presumably a match-all wildcard -- confirm),
# and recursive_read=True keeps paging until an empty batch is returned.
purviewTermsCount, purviewTermsDF = searchGlossaryTerms('*', recursive_read=True)

In [20]:
# Visible marker that every setup cell above ran without raising.
print('Setup is complete!')

Setup is complete!
