# Overview

This notebook uses pyapacheatlas to inject lineage between input and output assets in a Purview catalog.

In [None]:
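# Requires the Atlas Python client, pyapacheatlas (https://github.com/wjohnson/pyapacheatlas).
# This cell only imports it; install the package first, e.g. `pip install pyapacheatlas`.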
import pyapacheatlas
import json

In [None]:
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient

# Service principal credentials are read from the environment so that secrets
# are not saved inside the notebook. Set AZURE_TENANT_ID, AZURE_CLIENT_ID and
# AZURE_CLIENT_SECRET before starting the kernel. When a variable is unset the
# value falls back to the empty placeholder, which must then be filled in.
import os

oauth = ServicePrincipalAuthentication(
    tenant_id=os.environ.get("AZURE_TENANT_ID", ""),
    client_id=os.environ.get("AZURE_CLIENT_ID", ""),
    client_secret=os.environ.get("AZURE_CLIENT_SECRET", "")
)


In [None]:
# Create the Purview client that the rest of the notebook talks to.
# Replace the placeholder with the name of your Purview account.
client = PurviewClient(account_name="account-name-goes-here", authentication=oauth)

In [None]:

from pyapacheatlas.core import AtlasEntity
from pyapacheatlas.core import AtlasProcess

import uuid

# Describe the input asset and the output asset of the lineage.
#
# Each entity may already exist in the catalog or be brand new. An AtlasEntity
# is given a name, typeName, qualified_name and guid; an entity that is to be
# created uses a placeholder guid that starts with "-".

# Input: the raw JSON file.
vuuid = f"-{uuid.uuid4()}"
input01 = AtlasEntity(
    guid=vuuid,
    typeName="azure_datalake_gen2_path",
    name="alpha.json",
    qualified_name="https://somedatalake.dfs.core.windows.net/raw/acme/datasets/alpha.json"
)

# Output: the resource set written under the base zone.
vuuid = f"-{uuid.uuid4()}"
output01 = AtlasEntity(
    guid=vuuid,
    typeName="azure_datalake_gen2_resource_set",
    name="alpha",
    qualified_name="https://somedatalake.dfs.core.windows.net/base/acme/alpha/{SparkPartitions}"
)

# This upload is only needed when either asset does not exist yet. As long as
# the qualified name is the same as the scanned asset, Purview will update the
# existing asset rather than create a duplicate.
results = client.upload_entities(batch=[input01, output01])

In [None]:

# The Atlas Process is the lineage component that links the input and output
# entities together. Its inputs and outputs are references to entities
# (the "header" form: guid, qualifiedName and typeName). The AtlasEntity
# objects are passed directly below.
# NOTE(review): this presumably relies on pyapacheatlas reducing them to the
# header form itself - confirm for the installed version.

process_qn = "pyapacheatlas://acmebaseprocess"
process_type_name = "Process"
# Placeholder guid: the leading "-" marks the process as an entity to be created.
vuuid = "-" + str(uuid.uuid4())

process = AtlasProcess(
    name="Synapse Spark - process raw",
    attributes={"description": "Spark job to transform raw files into standardized format"},
    typeName=process_type_name,
    qualified_name=process_qn,
    inputs=[input01],
    outputs=[output01],
    guid=vuuid
)

# To assign experts and owners to the process, inject the following line after the attributes parameter above:
#contacts={"Expert":[{"id":"AAD OID","info":"some additional info goes here"}],"Owner":[{"id":"AAD OID","info":"some additional info goes here"}]},

# Create the lineage component. The input and output entities are uploaded in
# the same batch as the process that references them.
results = client.upload_entities(
    batch=[input01, output01, process]
)

# Show the service response.
print(json.dumps(results, indent=2))

In [None]:

from pyapacheatlas.core import AtlasEntity


# Here is an example of creating lineage from multiple inputs.
#
# The guids below are example values, and every entity must carry its own guid.
# NOTE(review): presumably these should be replaced with the guids of the
# matching assets in your own catalog - confirm before running.

input01 = AtlasEntity(
    name="alpha",
    typeName="azure_datalake_gen2_resource_set",
    qualified_name="https://somedatalake.dfs.core.windows.net/base/acme/alpha/{SparkPartitions}",
    guid="9f398956-febc-4643-bc55-b32d9d5b5f21"
)
input02 = AtlasEntity(
    name="coffee",
    typeName="azure_datalake_gen2_resource_set",
    qualified_name="https://somedatalake.dfs.core.windows.net/base/contonso/coffee/{Department}/{SparkPartitions}",
    guid="9f398956-febc-4643-bc55-b32d9d5b6f21"
)
output01 = AtlasEntity(
    name="alphacoffee",
    typeName="azure_datalake_gen2_resource_set",
    qualified_name="https://somedatalake.dfs.core.windows.net/curated/alphacoffee/{Department}/{SparkPartitions}",
    # Must differ from input01's guid: the output is a separate asset, and
    # sharing a guid would make the two entities indistinguishable.
    guid="9f398956-febc-4643-bc55-b32d9d5b7f21"
)

# Only uncomment and run the next line if any of the assets do not exist yet. So long as the qualified name is the same as the scanned asset Purview will update rather than create a duplicate.
#results = client.upload_entities(batch=[input01, input02, output01])

In [None]:
from pyapacheatlas.core import AtlasProcess


# Lineage for the multi-input example: one Atlas Process ties both input
# entities to the single output entity. The process refers to its inputs and
# outputs by their "header" fields (guid, qualifiedName and typeName).

process_type_name = "Process"
process_qn = "pyapacheatlas://curationprocess"

process = AtlasProcess(
    guid=-403,
    typeName=process_type_name,
    qualified_name=process_qn,
    name="Synapse Spark process",
    inputs=[input01, input02],
    outputs=[output01]
)

# Upload the three entities together with the process to create the lineage.
results = client.upload_entities(batch=[input01, input02, output01, process])

# Show the service response.
print(json.dumps(results, indent=2))