In [None]:
pip install pyapacheatlas

In [None]:
import requests
import json
import re
import time
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lit
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import AtlasProcess, PurviewClient, AtlasException
from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window


In [None]:
# Service-principal credentials and Purview settings read by the cells below.
# They are blank in the source and must be filled in before running.
# NOTE(review): prefer reading client_secret from a secret store over pasting it into the notebook.
tenant_id = ""
client_id = ""
client_secret = ""
# Host prefix of the Purview search endpoint: https://<data_catalog_name>.purview.azure.com
data_catalog_name = ""
# Value sent as the `resource` field of the Azure AD token request.
resource_url = 'https://purview.azure.net'
# GUID given to each new AtlasProcess; create_lineage decrements it after every successful upload.
guid_start = -1

In [None]:
def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    """
    Authenticate a service principal against Azure AD and return its OAuth access token.

    Sends a client-credentials grant to the tenant's ``/oauth2/token`` endpoint.

    Args:
        tenant_id: Azure AD tenant (directory) id, used in the token URL.
        client_id: Application (client) id of the service principal.
        client_secret: Secret of the service principal.
        resource_url: Resource the token is requested for.

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        RuntimeError: If the endpoint answers with a non-success status or the
            response carries no ``access_token``.
        requests.RequestException: On network failures or timeouts.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    # Passing a dict lets requests form-encode every value, so a secret that
    # contains '&', '+', '=' or '%' is transmitted intact.
    payload = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'resource': resource_url,
    }
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.post(url, headers=headers, data=payload, timeout=30)
    if not response.ok:
        # Surface the response body: it is more useful than a bare KeyError.
        raise RuntimeError(
            f"Azure AD token request failed with status {response.status_code}: {response.text}"
        )

    access_token = response.json().get('access_token')
    if not access_token:
        raise RuntimeError("Azure AD token response did not contain an access_token")
    return access_token

In [None]:
def get_purview_assets(data_catalog_name: str, azuread_access_token: str, table_name: str):
    """
    Search the Purview catalog for assets whose qualifiedName contains ``table_name``.

    Args:
        data_catalog_name: Purview account name (host prefix of the search endpoint).
        azuread_access_token: Bearer token sent in the Authorization header.
        table_name: Text that must be contained in the asset's qualifiedName.

    Returns:
        A list of search-result dicts (at most 1000 are requested). Results of
        entityType ``hive_table`` are kept only when their qualifiedName matches
        the Databricks pattern below; every other entity type is kept as is.
        An empty or ``None`` ``table_name`` yields ``[]`` without calling the API.

    Raises:
        requests.HTTPError: If the search endpoint answers with an error status.
        requests.RequestException: On network failures or timeouts.
    """
    # An empty or None term cannot identify a table (None would otherwise be
    # sent as the literal text "None"), so skip the request entirely.
    if not table_name:
        return []

    url = f"https://{data_catalog_name}.purview.azure.com/catalog/api/search/query?api-version=2022-08-01-preview"

    headers = {
        'Authorization': f'Bearer {azuread_access_token}',
        'Content-Type': 'application/json'
        }

    # Build the body as a dict and let requests serialise it, so quotes,
    # backslashes or non-ASCII characters in table_name cannot corrupt the JSON.
    payload = {
        "keywords": None,
        "limit": 1000,
        "filter": {
            "and": [
                {
                    "attributeName": "qualifiedName",
                    "operator": "contains",
                    "attributeValue": table_name
                }
            ]
        }
    }

    response = requests.post(url, headers=headers, json=payload, timeout=60)
    response.raise_for_status()
    data = response.json()

    # Matches qualified names shaped like "<name>@https://<host>.azuredatabricks.net/?o"
    pattern = re.compile(r'^.+@https://.+\.azuredatabricks\.net/\?o$')

    # hive_table results must match the pattern; other entity types pass through.
    # A response without a "value" key is treated as "no results".
    filtered_data = [
        item for item in data.get('value', [])
        if item['entityType'] != 'hive_table' or pattern.match(item['qualifiedName'])
    ]

    return filtered_data


In [None]:
# Load the source -> Databricks table mappings, skipping rows without a source
# table name and rows whose source table name contains "teste".
df = spark.sql("""

SELECT nomeAmbiente, tipoOrigem, nomeCamada, nomeOrigem, nomeTabelaOrigem, nomeArquivoOrigem, nomeDatabaseDatabricks, nomeTabelaDatabricks
FROM 0_par.processos 
WHERE nomeTabelaOrigem IS NOT NULL 
AND nomeTabelaOrigem <> ''
AND nomeTabelaOrigem NOT LIKE '%teste%'

""")

schema = "guid_origem STRING, qualified_name_origem STRING, name_origem STRING, entityType_origem STRING, guid_destino STRING, qualified_name_destino STRING, name_destino STRING, entityType_destino STRING"
all_rows = []

# The token request does not depend on the row, so one token is reused across
# rows and refreshed periodically instead of authenticating twice per row.
# NOTE(review): 5 minutes is a conservative guess at the token lifetime - confirm.
TOKEN_REFRESH_SECONDS = 5 * 60
azuread_access_token = None
token_acquired_at = 0.0


def _asset_values(assets):
    """Return (id, qualifiedName before '@', name, entityType) of the first asset, or four Nones."""
    if not assets:
        return (None, None, None, None)
    first = assets[0]
    return (first['id'], first['qualifiedName'].split("@")[0], first['name'], first['entityType'])


for row in df.collect():
    origem_table = row['nomeTabelaOrigem']
    origem_file = row['nomeArquivoOrigem']
    destino_table = f"{row['nomeDatabaseDatabricks']}.{row['nomeTabelaDatabricks']}"

    try:
        token_is_stale = time.monotonic() - token_acquired_at > TOKEN_REFRESH_SECONDS
        if azuread_access_token is None or token_is_stale:
            azuread_access_token = azuread_auth(tenant_id, client_id, client_secret, resource_url)
            token_acquired_at = time.monotonic()

        # Look the source up by table name first, then fall back to the file
        # name (only when the row actually has one).
        origem_data = get_purview_assets(data_catalog_name, azuread_access_token, origem_table)
        if not origem_data and origem_file:
            origem_data = get_purview_assets(data_catalog_name, azuread_access_token, origem_file)
        if not origem_data:
            print(f"No results found for source: {origem_file}")
        origem_values = _asset_values(origem_data)

        destino_data = get_purview_assets(data_catalog_name, azuread_access_token, destino_table)
        if not destino_data:
            print(f"No results found for destination: {destino_table}")
        destino_values = _asset_values(destino_data)

        # One output row per mapping: four source fields followed by four destination fields.
        row_values = origem_values + destino_values
        all_rows.append(Row(*row_values))

    except Exception as e:
        # Best effort: report the failing mapping and continue with the next one.
        print(f"Error while processing tables: {origem_table}, {destino_table}. Error: {e}")

final_df = spark.createDataFrame(all_rows, schema)
display(final_df)

In [None]:
# Keep only mappings whose source asset was found in Purview.
filter_condition = col('guid_origem').isNotNull()

other_columns = [
    'qualified_name_origem',
    'name_origem',
    'entityType_origem',
    'guid_destino',
    'qualified_name_destino',
    'name_destino',
    'entityType_destino'
]

# De-duplicate BEFORE assigning ids: once every row carries its own unique id,
# distinct() can no longer remove anything.
distinct_final_df = final_df.filter(filter_condition).distinct()

# Number the rows 1..N. Ordering by every data column gives a total order over
# the distinct rows, so the ids are identical each time this lazy DataFrame is
# evaluated (it is evaluated for the table write and again for the later MERGE).
distinct_final_df = distinct_final_df.withColumn(
    'id', row_number().over(Window.orderBy('guid_origem', *other_columns))
)
distinct_final_tb = distinct_final_df.select('id', 'guid_origem', *other_columns)

distinct_final_tb.write.format("delta").mode('overwrite').saveAsTable("0_par.lineage_data")

# action = 0 marks a pair whose lineage has not been created yet
# (create_lineage sets it to 1 after a successful upload).
distinct_final_df = distinct_final_df.withColumn('action', lit(0))
distinct_final_df = distinct_final_df.select(col('id'), 'guid_origem', 'guid_destino', 'action')

# Left disabled: this would overwrite 0_par.lineage_actions and reset every action flag.
#distinct_final_df.write.format("delta").mode('overwrite').saveAsTable("0_par.lineage_actions")

display(distinct_final_df)

In [None]:
distinct_final_df.createOrReplaceTempView("distinct_final_df_temp_view")

# Make sure the tracking table exists so the MERGE also works on a first run.
# Column types follow the source view: id comes from row_number() and action from lit(0).
spark.sql("""
CREATE TABLE IF NOT EXISTS 0_par.lineage_actions (
  id INT,
  guid_origem STRING,
  guid_destino STRING,
  action INT
) USING DELTA
""")

# Insert only pairs that are not tracked yet. `<=>` is the null-safe equality
# operator: with plain `=`, a row whose guid_destino is NULL never matches
# itself and would be inserted again on every run.
spark.sql("""
MERGE INTO 0_par.lineage_actions AS target
USING distinct_final_df_temp_view AS source
ON target.guid_origem <=> source.guid_origem AND target.guid_destino <=> source.guid_destino
WHEN NOT MATCHED THEN
  INSERT (id, guid_origem, guid_destino, action)
  VALUES (source.id, source.guid_origem, source.guid_destino, source.action)
""")

spark.sql('SELECT * FROM 0_par.lineage_actions').display()

In [None]:
def create_lineage(source_guid, destination_guid):
    """
    Create a Purview lineage (an Atlas ``Process`` entity) from one asset to another.

    The pair is first looked up in ``0_par.lineage_actions``; when its ``action``
    flag is 1 nothing is done. Otherwise a process with ``source_guid`` as input
    and ``destination_guid`` as output is uploaded and the flag is set to 1.

    Reads the module-level ``tenant_id``, ``client_id``, ``client_secret`` and
    ``data_catalog_name`` settings and updates the module-level ``guid_start``.

    Args:
        source_guid: GUID of the asset used as the process input.
        destination_guid: GUID of the asset used as the process output.

    Returns:
        None. Every outcome is reported with ``print``.
    """
    global guid_start

    # Without both GUIDs there is nothing to link.
    if not source_guid or not destination_guid:
        print(f"Skipping lineage: missing GUID (source_guid: {source_guid}, destination_guid: {destination_guid})\n")
        return

    result = spark.sql(f"SELECT id, action FROM 0_par.lineage_actions WHERE guid_origem = '{source_guid}' AND guid_destino = '{destination_guid}'").collect()

    # Suffix used in the process name / qualified name. When the pair is not
    # tracked in lineage_actions, fall back to the GUID pair so the name is
    # still defined (it was previously unbound in that case).
    row_start = f"{source_guid}_{destination_guid}"
    if result:
        row_start = result[0][0]
        if result[0][1] == 1:
            print(f"Lineage already exists for source_guid: {source_guid} and destination_guid: {destination_guid}, ID: {row_start}\n")
            return

    # Use the notebook-level settings instead of blank literals.
    auth = ServicePrincipalAuthentication(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret
    )

    client = PurviewClient(
        # Falls back to the previously hardcoded account when the setting is blank.
        account_name=data_catalog_name or "pview-ache-prd-westus",
        authentication=auth
    )

    source_entity = client.get_entity(source_guid)
    # The result is not used; the call is kept so its errors still surface
    # before anything is uploaded.
    client.get_entity(destination_guid)

    # NOTE(review): get_entity appears to wrap its results as {"entities": [...]};
    # both the top-level dict and each wrapped entity are inspected - confirm the shape.
    candidates = [source_entity] + list(source_entity.get('entities') or [])
    existing_lineage = any(
        relationship.get('guid') == destination_guid
        for entity in candidates
        for relationship in ((entity.get('relationshipAttributes') or {}).get('outputTo') or [])
    )

    if existing_lineage:
        print(f"Lineage already exists for source_guid: {source_guid} and destination_guid: {destination_guid}\n")
        return

    process_qn = f'Notebook: Purview - Create Lineage {row_start}'
    process_type_name = 'Process'

    new_lineage = AtlasProcess(
        name=f'Notebook Processing {row_start}',
        typeName=process_type_name,
        qualified_name=process_qn,
        inputs=[{"guid": source_guid}],
        outputs=[{"guid": destination_guid}],
        guid=guid_start
    )

    try:
        client.upload_entities(batch=[new_lineage])
    except AtlasException as e:
        print(f"Failed to create lineage for source_guid: {source_guid} and destination_guid: {destination_guid}. Error message: {e}\n")
        return

    print(f"Lineage created for source_guid: {source_guid} and destination_guid: {destination_guid}\n")
    # Mark the pair as done so later runs skip it.
    spark.sql(f"UPDATE 0_par.lineage_actions SET action = 1 WHERE guid_origem = '{source_guid}' AND guid_destino = '{destination_guid}'")
    guid_start -= 1


In [None]:
# Walk every tracked (source, destination) pair and create its lineage in Purview.
df_f = spark.sql("SELECT * FROM 0_par.lineage_actions")

for index, row in enumerate(df_f.collect()):
    print(f"Processing row {index}: {row}")
    source_guid = row['guid_origem']
    destination_guid = row['guid_destino']

    # A pair with a missing GUID cannot be linked.
    if not source_guid or not destination_guid:
        print(f"Skipping row {index}: missing source or destination GUID\n")
        continue

    try:
        # create_lineage prints its own outcome and returns None, so its
        # return value is not printed.
        create_lineage(source_guid, destination_guid)
    except Exception as e:
        # Report the failing pair and continue, matching the per-row handling
        # used when the mappings were resolved.
        print(f"Error while creating lineage for source_guid: {source_guid} and destination_guid: {destination_guid}. Error: {e}\n")

    # Pause between rows.
    time.sleep(2)


In [None]:
# Manual one-off: create a single lineage between two hand-entered asset GUIDs.
# Both values are blank by default, in which case the cell does nothing, so a
# full "Run All" never uploads a process with empty inputs and outputs.
source_entity = ''
destination_entity = ''

process_type_name = 'Process'
process_qn = 'Notebook: Purview - Create Lineage'

if source_entity and destination_entity:
    # Reuse the notebook-level settings instead of blank literals.
    auth = ServicePrincipalAuthentication(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret
    )

    client = PurviewClient(
        account_name=data_catalog_name,
        authentication=auth
    )

    new_lineage = AtlasProcess(
        name='Notebook Processing',
        typeName=process_type_name,
        qualified_name=process_qn,
        inputs=[{"guid": source_entity}],
        outputs=[{"guid": destination_entity}],
        guid=-1
    )

    results = client.upload_entities(batch=[new_lineage])
    print(results)
else:
    print("Manual lineage skipped: set source_entity and destination_entity to asset GUIDs first.")