# Purview Load Entity Notebook

This notebook loads custom entities and types into Microsoft Purview from JSON files stored in Azure Data Lake Storage.

## Prerequisites
- Microsoft Fabric workspace with permissions to run notebooks
- Azure Key Vault with secrets (automatically configured during deployment):
  - `client-id`: Service principal application ID
  - `client-secret`: Service principal password
  - `tenant-id`: Azure Active Directory tenant ID
- Service principal with:
  - Storage Blob Data Contributor role on the storage account
  - Purview Data Curator role on the Purview root collection

## Configuration
All sensitive credentials are retrieved from Azure Key Vault: `pccsakv6nvsfni5vtcj6`

Update the configuration cell below (the second code cell, after the package install) only if you need to change:
- Storage account name
- Purview account name
- Container names
- File paths

In [None]:
# Install required Python package for Purview integration.
# pyapacheatlas supplies ServicePrincipalAuthentication, PurviewClient, AtlasEntity
# and GuidTracker, which are imported in a later cell.
# `%pip` is a notebook magic command, not plain Python.
# NOTE(review): the package version is unpinned; consider pinning it for reproducible runs.
%pip install pyapacheatlas

In [None]:
# Configuration - Retrieved from Azure Key Vault for security
# Defines the module-level settings read by the later cells.
from notebookutils import mssparkutils

# Key Vault URI (deployed with your infrastructure)
key_vault_uri = "https://pccsakv6nvsfni5vtcj6.vault.azure.net/"

# Storage configuration
# The container/account/folder names below are combined into abfss:// paths
# (incoming = files to load, processed = destination after a successful load).
blob_container_name = "pccsa"  # Container created during deployment
blob_account_name = "pccsast6nvsfni5vtcj6"  # Your deployed storage account
blob_relative_path = "incoming"  # Input files location
blob_processed = "processed"  # Processed files location

# Application configuration
# app_name is used as the Spark application name and as the log4j logger name.
app_name = "purviewspn"
# Default Purview account; the PURVIEW_NAME environment variable overrides it
# when the client is created.
purview_name = "edinmedi-purview-labs"

# Retrieve secrets from Key Vault (more secure than hardcoding)
# Each call reads one named secret from the vault above; the values are used to
# authenticate the service principal against Purview.
TENANT_ID = mssparkutils.credentials.getSecret(key_vault_uri, "tenant-id")
CLIENT_ID = mssparkutils.credentials.getSecret(key_vault_uri, "client-id")
CLIENT_SECRET = mssparkutils.credentials.getSecret(key_vault_uri, "client-secret")

In [None]:
import json
import os
import sys
from pyspark.sql.types import *
from pyspark.sql.functions import *


# PyApacheAtlas packages
# Connect to Atlas via a Service Principal
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasClassification, AtlasEntity, AtlasProcess, RelationshipTypeDef  # Communicate with your Atlas server
from pyapacheatlas.readers import ExcelConfiguration, ExcelReader
from pyapacheatlas.core.util import GuidTracker,AtlasException
from pyapacheatlas.core import AtlasAttributeDef, AtlasEntity, PurviewClient
from pyapacheatlas.core.typedef import EntityTypeDef
from notebookutils import mssparkutils

In [None]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Spark installation directory; the log4j jar path is derived from it.
my_jars = os.environ.get("SPARK_HOME")
myconf = SparkConf()
myconf.setMaster("local")
myconf.setAppName(app_name)
myconf.set("spark.jars", f"{my_jars}/jars/log4j-1.2.17.jar")

# NOTE(review): per the PySpark docs, getOrCreate() returns an already-running
# session if one exists, in which case some of the settings above may not apply.
spark = (
    SparkSession.builder
    .appName(app_name)
    .config(conf=myconf)
    .getOrCreate()
)

# log4j logger obtained through the JVM gateway, named after the application.
Logger = spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(app_name)

# Source (incoming) and destination (processed) folders inside the ADLS container.
adls_home = f"abfss://{blob_container_name}@{blob_account_name}.dfs.core.windows.net/{blob_relative_path}"
adls_processed = f"abfss://{blob_container_name}@{blob_account_name}.dfs.core.windows.net/{blob_processed}"

In [None]:
def log_msgs(msg_type,msg):
    """Echo ``msg`` to stdout and to the notebook's log4j logger.

    ``msg_type`` is compared case-insensitively: "ERROR" goes to
    ``mylogger.error``; every other value is treated as INFO. In both outputs
    the message is rendered with ``repr``.
    """
    text = repr(msg)
    if msg_type.upper() != "ERROR":
        print('INFO: %s' % text)
        mylogger.info(text)
        return
    print('ERROR: %s' % text)
    mylogger.error(text)
           

In [None]:
    # Authenticate against your Atlas server
oauth = ServicePrincipalAuthentication(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    tenant_id=TENANT_ID,
)

# Purview client; a PURVIEW_NAME environment variable takes precedence over
# the configured purview_name.
client = PurviewClient(
    authentication=oauth,
    account_name=os.environ.get("PURVIEW_NAME", purview_name),
)

# Generates the GUIDs assigned to entities built in this notebook.
gt = GuidTracker()

In [None]:
def _has_nested_key(json_obj, *keys):
    """Return True if ``json_obj[k1][k2]...`` can be looked up without error.

    Only lookup failures (missing key, wrong container type) count as "absent";
    the value itself may be anything, including None or an empty list.
    """
    node = json_obj
    try:
        for key in keys:
            node = node[key]
    except (KeyError, IndexError, TypeError):
        return False
    return True

def Get_Rel_Inputs(json_obj):
    """Return True if ``json_obj`` has ``relationshipAttributes.inputs``."""
    return _has_nested_key(json_obj, 'relationshipAttributes', 'inputs')

def Get_Rel_Outputs(json_obj):
    """Return True if ``json_obj`` has ``relationshipAttributes.outputs``."""
    return _has_nested_key(json_obj, 'relationshipAttributes', 'outputs')

def Get_Outputs(json_obj):
    """Return True if ``json_obj`` has ``attributes.outputs``."""
    return _has_nested_key(json_obj, 'attributes', 'outputs')

def Get_Inputs(json_obj):
    """Return True if ``json_obj`` has ``attributes.inputs``."""
    return _has_nested_key(json_obj, 'attributes', 'inputs')

def Get_Rel_Parents(json_obj):
    """Return True if ``json_obj`` has ``relationshipAttributes.parent``."""
    return _has_nested_key(json_obj, 'relationshipAttributes', 'parent')

In [None]:
def search_entities_byQuilifiedName(name):
    """Return the GUID of the Purview entity whose qualifiedName is exactly ``name``.

    Runs a Purview search on ``qualifiedName`` and scans the hits for an exact
    match. A match whose type is not the placeholder type
    ``purview_custom_connector_generic_entity`` ends the scan immediately. If
    only placeholder matches exist, the last one seen is returned.

    Returns None when no hit matches exactly.
    """
    # Backslash-escape characters that are special in the search query.
    # Raw strings are used because "\:" etc. are invalid escape sequences in
    # normal literals (SyntaxWarning on Python 3.12+).
    escaped = name.translate(str.maketrans({':': r'\:', '/': r'\/', '{': r'\{', '}': r'\}'}))
    results = client.search_entities(r'qualifiedName\:%s' % escaped)
    guid = None
    for result in results:
        if result['qualifiedName'] != name:
            continue
        guid = result['id']
        if result['entityType'] != 'purview_custom_connector_generic_entity':
            # A real (non-placeholder) entity wins; stop scanning.
            break
    return guid

In [None]:
def removeDummyEntity(name):
    """Delete the placeholder entity ``name`` when no process references it.

    ``name`` is a qualifiedName (the caller passes ``dummy://...`` names).
    Each search hit whose qualifiedName equals ``name`` is fetched by GUID. It
    is deleted only when both its ``inputToProcesses`` and
    ``outputFromProcesses`` relationship lists are empty.

    Errors, including missing relationship keys, are logged and swallowed, so
    cleanup of one placeholder never aborts the notebook and nothing is deleted
    on doubt.
    """
    try:
        # Escape the search-query special characters. This uses the same set as
        # search_entities_byQuilifiedName so both lookups build the same query;
        # raw strings avoid invalid "\:" escape sequences.
        escaped = name.translate(str.maketrans({':': r'\:', '/': r'\/', '{': r'\{', '}': r'\}'}))
        results = client.search_entities(r'qualifiedName\:%s' % escaped)
        for result in results:
            if result['qualifiedName'] != name:
                continue
            entity = client.get_entity(guid=result['id'])
            if len(entity['entities']) == 0:
                continue
            candidate = entity['entities'][0]
            rel_attrs = candidate['relationshipAttributes']
            if len(rel_attrs['inputToProcesses']) == 0 and len(rel_attrs['outputFromProcesses']) == 0:
                log_msgs("INFO",'removeDummyEntity: Deleted Dummy entity%s' % (candidate))
                client.delete_entity(candidate['guid'])
    except Exception as e:
        # The original used a bare `except:` and then referenced an undefined `e`.
        log_msgs("ERROR",'removeDummyEntity: %s' % (str(e)))

In [None]:
def Create_Generic_Entity(dummy):
    """Return the GUID of a generic placeholder entity for the reference ``dummy``.

    ``dummy`` is an input/output reference whose ``uniqueAttributes`` holds
    ``qualifiedName`` and optionally ``source`` and ``Name``/``name``.

    The placeholder is a ``purview_custom_connector_generic_entity`` with
    qualifiedName ``dummy://<qualifiedName>``. An existing placeholder is
    reused; otherwise one is uploaded.

    Returns the placeholder GUID, or None if it could not be obtained. The
    reason is logged.
    """
    try:
        log_msgs("INFO",'Create_Generic_Entity - Dummy entity: %s' % dummy)
        unique_attrs = dummy['uniqueAttributes']
        source_qn = unique_attrs['qualifiedName']
        _qualifiedname = 'dummy://%s' % source_qn
        output_guid = search_entities_byQuilifiedName(_qualifiedname)
        if output_guid is not None:
            return output_guid

        attributes = {"purview_qualifiedName": source_qn}
        if "source" in unique_attrs:
            attributes["original_source"] = unique_attrs['source']
        # The display name defaults to the last '/'-separated segment of the
        # qualifiedName. An explicit "name" (or, failing that, "Name") overrides it.
        Name = source_qn.split('/')[-1]
        if "name" in unique_attrs:
            Name = unique_attrs['name']
        elif "Name" in unique_attrs:
            Name = unique_attrs['Name']

        generic_entity = AtlasEntity(
            name=Name,
            qualified_name=_qualifiedname,
            typeName="purview_custom_connector_generic_entity",
            attributes=attributes,
            guid=gt.get_guid(),
        )
        upload_results = client.upload_entities(batch=[generic_entity])
        if 'mutatedEntities' not in upload_results:
            log_msgs("ERROR",'Create_Generic_Entity: Fail to retrieve mutatedEntities')
            # json.dumps (not json.dump, which needs a file object and raised TypeError).
            log_msgs("ERROR", json.dumps(upload_results))
            return None
        mutated = upload_results['mutatedEntities']
        # A newly created placeholder is reported under CREATE. UPDATE is also
        # accepted: it means the placeholder already existed but the search above
        # did not find it (presumably search-index lag). The batch has a single
        # entity, so the first GUID is the placeholder's.
        for action in ('CREATE', 'UPDATE'):
            if mutated.get(action):
                log_msgs("INFO",'Create_Generic_Entity: Entities Created/Updated')
                log_msgs("INFO",'Create_Generic_Entity: %s' % upload_results)
                return mutated[action][0]['guid']
        log_msgs("ERROR",'Create_Generic_Entity: Fail to retrieve CREATE')
        log_msgs("ERROR",json.dumps(upload_results))
        return None
    except Exception as e:
        log_msgs("ERROR",'Create_Generic_Entity: %s' % (str(e)))
    return None


In [None]:
def Load_Entity_Json(json_file):
    """Parse each JSON string in ``json_file`` and upload the results as one batch.

    Returns True when parsing and the upload call complete. Returns False after
    logging the error if anything raises.
    """
    try:
        purview_load_entities = [json.loads(line) for line in json_file]
        client.upload_entities(batch=purview_load_entities)
        log_msgs("INFO",'Entities Created/Updated')
        return True
    except Exception as e:
        log_msgs("ERROR",'Load_Entity_Json: %s' % (str(e)))
    return False

def Load_Entity_Json_fromJson(_json):
    """Upload the already-parsed entity dicts in ``_json`` to Purview as one batch.

    The batch is logged before uploading. Returns True once the upload call
    completes. Returns False, after logging the error, if anything raises.
    """
    try:
        batch = list(_json)
        log_msgs("INFO", batch)
        client.upload_entities(batch=batch)
        log_msgs("INFO", 'Entities Created/Updated')
    except Exception as e:
        log_msgs("ERROR", 'Load_Entity_Json_fromJson: %s' % (str(e)))
        return False
    return True


In [None]:
def _resolve_attribute_refs(json_obj, attr_name, label, rel_obj):
    """Resolve every ``json_obj['attributes'][attr_name]`` reference to a GUID.

    Each reference's ``dummy://`` placeholder name is recorded in the global
    ``dummyEntities`` list for later cleanup. The real entity is looked up by
    qualifiedName, and a generic placeholder is created if none exists. The
    GUID is written into the reference's ``uniqueAttributes`` and into
    ``rel_obj`` (keyed by qualifiedName).

    Returns True on success, or False (after logging) if a placeholder could
    not be created.
    """
    for ref in json_obj['attributes'][attr_name]:
        ref_qn = ref['uniqueAttributes']['qualifiedName']
        dummyEntities.append('dummy://%s' % ref_qn)
        ref_guid = search_entities_byQuilifiedName(ref_qn)
        if ref_guid is None:
            ref_guid = Create_Generic_Entity(ref)
            if ref_guid is None:
                log_msgs("ERROR", "Build_Entity_Json - %s - results: Can't Create Dummy %s" % (label, ref))
                return False
        ref['uniqueAttributes']['guid'] = ref_guid
        rel_obj[ref_qn] = ref_guid
    return True

def Build_Entity_Json(_json):
    """Resolve references in one file's entity rows and upload them to Purview.

    Each element of ``_json`` is a JSON string for one Atlas entity. It must
    have ``typeName`` and ``attributes.qualifiedName``. For every entity:

    * reuse the GUID of an existing entity with the same qualifiedName and
      type, or assign a GuidTracker GUID when the payload has none;
    * resolve ``attributes.inputs``/``outputs`` references to GUIDs, creating
      generic placeholder entities for missing ones;
    * copy those GUIDs onto ``relationshipAttributes.inputs``/``outputs``
      (matched by qualifiedName);
    * point ``relationshipAttributes.parent`` at the GUID of the most recently
      processed entity of the parent's ``typeName`` in this batch.

    Returns True only if every row was resolved AND the batch upload succeeded.
    Otherwise the problem is logged and False is returned.
    """
    my_parents = {}  # typeName -> GUID of the latest entity of that type in this batch
    final_json = []
    rel_obj = {}  # referenced qualifiedName -> resolved GUID
    try:
        for raw in _json:
            json_obj = json.loads(raw)
            _typename = json_obj.get('typeName')
            if _typename is None:
                log_msgs('ERROR','JSON dont have typeName of the entity')
                return False
            try:
                _qualifiedname = json_obj['attributes']['qualifiedName']
            except (KeyError, TypeError):
                _qualifiedname = None
            if _qualifiedname is None:
                log_msgs('ERROR','JSON dont have attributes/qualifiedName of the entity')
                return False

            existing = client.get_entity(qualifiedName=_qualifiedname, typeName=_typename)
            # Only reuse a GUID that is actually present. The response may carry
            # no (or an empty) "entities" list when nothing matches.
            existing_entities = existing.get('entities') if existing else None
            if existing_entities:
                json_obj['guid'] = existing_entities[0]['guid']
                log_msgs('INFO', 'Existing entity guid: %s' % json_obj['guid'])
            elif 'guid' not in json_obj:
                json_obj['guid'] = gt.get_guid()
            my_parents[_typename] = json_obj['guid']

            if Get_Outputs(json_obj) and not _resolve_attribute_refs(json_obj, 'outputs', 'output', rel_obj):
                return False
            if Get_Inputs(json_obj) and not _resolve_attribute_refs(json_obj, 'inputs', 'Input', rel_obj):
                return False

            # Relationship references must match a reference resolved above;
            # an unknown qualifiedName raises KeyError and fails the file.
            if Get_Rel_Inputs(json_obj):
                for ref in json_obj['relationshipAttributes']['inputs']:
                    ref['guid'] = rel_obj[ref['qualifiedName']]
            if Get_Rel_Outputs(json_obj):
                for ref in json_obj['relationshipAttributes']['outputs']:
                    ref['guid'] = rel_obj[ref['qualifiedName']]
            if Get_Rel_Parents(json_obj):
                parent = json_obj['relationshipAttributes']['parent']
                parent['guid'] = my_parents[parent['typeName']]

            final_json.append(json_obj)
    except Exception as e:
        log_msgs("ERROR",'Build_Entity_Json: %s' % (str(e)))
        return False
    try:
        # Load_Entity_Json_fromJson reports failure through its return value
        # rather than raising. Propagate it; otherwise a failed upload would be
        # treated as success and the source file moved to "processed".
        return bool(Load_Entity_Json_fromJson(final_json))
    except Exception as e:
        log_msgs("ERROR",'Build_Entity_Json: %s' % (str(e)))
        return False

In [None]:
# Driver cell: load every non-empty file in the incoming folder into Purview and
# move each successfully loaded file to the processed folder.
# Failed files are retried on the next pass as long as the previous pass loaded
# at least one file (presumably because an entity can depend on one defined in
# another file). A pass that loads nothing but has failures ends the loop.
havefiles = True
# dummy:// placeholder names recorded by Build_Entity_Json; removed at the end
# if nothing references them.
dummyEntities = []
while havefiles:
    havefiles = False
    processedfiles = 0
    failfiles = 0
    for file in mssparkutils.fs.ls(adls_home):
        if file.size <= 0:
            # Zero-size entries (presumably folders or empty files) are skipped.
            continue
        havefiles = True
        processed_path = '%s/%s' % (adls_processed, file.name)
        try:
            readComplexJSONDF = spark.read.option("multiLine","true").json(file.path)
        except Exception as e:
            # The original format call lacked a tuple and raised TypeError here.
            # Unreadable files were also never counted as failures, so the
            # while-loop could spin forever on them.
            log_msgs('ERROR','Invalid Json: %s - %s' % (file.path, e))
            failfiles += 1
            continue

        rows = readComplexJSONDF.toJSON().collect()
        log_msgs('INFO','Loading File: %s' % file.path)
        if not Build_Entity_Json(rows):
            failfiles += 1
            continue

        # Clear any earlier copy in the processed folder, then move the file there.
        try:
            mssparkutils.fs.rm(processed_path)
        except Exception:
            log_msgs('INFO','No file to delete')
        mssparkutils.fs.mv(src=file.path,dest=processed_path)
        processedfiles += 1

    if failfiles > 0 and processedfiles == 0:
        log_msgs('ERROR','Stopping: %d remaining file(s) could not be loaded' % failfiles)
        break

# Remove placeholders that no process ended up referencing. dict.fromkeys
# removes duplicate names while keeping first-seen order.
for dummy_name in dict.fromkeys(dummyEntities):
    removeDummyEntity(dummy_name)