In [7]:

import yaml, requests, json

# Load the client / Purview settings from the YAML config file.
# NOTE(review): if this file only contains plain mappings, lists and scalars,
# yaml.safe_load may be a stricter alternative to FullLoader - confirm before switching.
with open("../config/config.yaml", "r") as config_file:
    config = yaml.load(config_file, Loader=yaml.FullLoader)

# Every setting below is read from the first entry of the loaded document.
_settings = config[0]

# Service principal credentials and tenant.
client_id = _settings['client_id']
client_secret = _settings['client_secret']
tenant_id = _settings['tenant_id']

# Purview account and the OAuth scope derived from the configured resource.
purview_account_name = _settings['purview_account_name']
scope = f"{_settings['resource']}/.default"

# URLs derived from the values above.
authority = f"https://login.microsoftonline.com/{tenant_id}"
purview_endpoint = f"https://{purview_account_name}.purview.azure.com"

# Placeholder; the token acquisition cell overwrites this with a real token.
access_token = ""

print("Loaded config file.")

Loaded config file.


In [8]:
# Cell 3 - Acquire an Azure AD Access Token (client credentials grant)
#
# Inputs (set by the configuration cell): tenant_id, client_id, client_secret, config.
# Output: `access_token` is set on success; the cell stops with SystemExit otherwise.
#
# The token value is intentionally NOT printed: cell output is stored inside the
# notebook file, so printing a bearer token leaks a usable credential to anyone
# who can read the saved or committed notebook.

# Construct the token endpoint URL (using the older /oauth2/token for resource-based tokens)
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"

# Build the payload for client credentials grant
payload = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret,
    "resource": config[0]['resource']  # typically "https://purview.azure.net"
}

# Make the POST request to get the token. The timeout keeps a stalled
# connection from hanging the kernel indefinitely.
response = requests.post(token_url, data=payload, timeout=30)

if response.status_code == 200:
    access_token = response.json()["access_token"]
    print("Access token acquired successfully.")
    # Report only non-sensitive information about the token.
    print(f"Access token length: {len(access_token)} characters (value not displayed).")
else:
    print("Failed to acquire token. Status code:", response.status_code)
    print("Response text:", response.text)
    raise SystemExit("Cannot continue without an access token.")

Access token acquired successfully.
Access Token: [REDACTED - bearer tokens must not be kept in saved notebook output]

In [11]:
import requests
import json
import datetime

# =============================================================================
# Purview Custom Lineage - build the "Process" entity payload
#
# A "Process" entity ties existing assets together as lineage. Here two input
# assets (e.g., a SQL table and an ADLS table) feed one output asset (e.g., a
# Power BI dataset).
#
# Before running:
#   * Replace the three GUIDs below with the actual GUIDs from your Purview UI.
#   * Make sure 'purview_account_name' and 'access_token' were set by the
#     earlier configuration / token acquisition cells, e.g.
#       purview_account_name = "your-purview-account-name"
#       access_token = "your-access-token"
#
# Reference:
# https://docs.azure.cn/en-us/purview/legacy/how-to-purview-custom-lineage-api-user-guide#b-link-existing-entities-or-lineage-to-another-existing-entity-or-lineage
# =============================================================================

# --- User-defined asset GUIDs ------------------------------------------------
source_asset_1_guid = "e8260355-9d77-4be6-b63d-abf6f6f60000"  # first input (e.g., a SQL table)
source_asset_2_guid = "4ad6a977-4a77-4c84-a7dc-ad67f4114424"  # second input (e.g., an ADLS table or file)
target_asset_guid = "670cfd5d-b4da-4c5b-954c-bbd8c0c3fedd"    # output (e.g., a Power BI dataset)

# --- Process identity --------------------------------------------------------
# A second-resolution local timestamp is appended so that each run gets its own
# qualified name.
timestamp = f"{datetime.datetime.now():%Y%m%d%H%M%S}"
qualified_name = "LineageProcess_" + timestamp

# --- Payload -----------------------------------------------------------------
# 'inputs' carries one {"guid": ...} reference per source asset, 'outputs' the
# reference to the target asset.
process_payload = {
    "entities": [
        {
            "status": "ACTIVE",
            "version": 1,
            "typeName": "Process",
            "attributes": {
                "inputs": [
                    {"guid": asset_guid}
                    for asset_guid in (source_asset_1_guid, source_asset_2_guid)
                ],
                "outputs": [{"guid": target_asset_guid}],
                "qualifiedName": qualified_name,
                "name": "Source to Target Lineage Process V2 (API)",
            },
        }
    ]
}

# Show the payload so its structure can be checked before it is sent.
print("Process Payload:", json.dumps(process_payload, indent=2), sep="\n")

# ---------------------------
# API Request to Create the Process Entity in Purview
# ---------------------------
# Sends `process_payload` to the Atlas bulk-entity endpoint of the Purview
# account and prints the status code plus the response body.
# Requires `purview_account_name`, `access_token` and `process_payload` to be
# defined by earlier cells.

# The configuration cell initialises access_token to "", so an empty value
# means the token acquisition cell has not run (or failed). Stop here instead
# of sending an unauthenticated request.
if not access_token:
    raise SystemExit("No access token available. Run the token acquisition cell first.")

# Construct the API endpoint URL using your Purview account name.
create_lineage_entities_url = (
    f"https://{purview_account_name}.purview.azure.com"
    "/catalog/api/atlas/v2/entity/bulk?api-version=2023-09-01"
)

# Setup the HTTP headers including the access token for authentication.
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

# Send the POST request to create the process entity linking your assets.
# The timeout keeps a stalled connection from hanging the kernel indefinitely.
response = requests.post(
    create_lineage_entities_url,
    headers=headers,
    json=process_payload,
    timeout=60
)

# Print the HTTP status code so success or failure is visible at a glance.
print("Status Code:", response.status_code)

# Try to parse and print the JSON response. Only a decoding failure is handled
# here (requests signals it with a ValueError subclass); anything else should
# surface as a real error rather than be mislabelled as a parsing problem.
try:
    response_json = response.json()
except ValueError:
    print("Error parsing JSON response. Raw Response:", response.text)
else:
    print("Response JSON:", json.dumps(response_json, indent=2))

# Make a rejected request explicit; the status code alone is easy to overlook.
if not response.ok:
    print("Request was not successful; the lineage process was not created.")


Process Payload:
{
  "entities": [
    {
      "status": "ACTIVE",
      "version": 1,
      "typeName": "Process",
      "attributes": {
        "inputs": [
          {
            "guid": "e8260355-9d77-4be6-b63d-abf6f6f60000"
          },
          {
            "guid": "4ad6a977-4a77-4c84-a7dc-ad67f4114424"
          }
        ],
        "outputs": [
          {
            "guid": "670cfd5d-b4da-4c5b-954c-bbd8c0c3fedd"
          }
        ],
        "qualifiedName": "LineageProcess_20250312171137",
        "name": "Source to Target Lineage Process V2 (API)"
      }
    }
  ]
}
Status Code: 200
Response JSON: {
  "mutatedEntities": {
    "CREATE": [
      {
        "typeName": "Process",
        "attributes": {
          "qualifiedName": "LineageProcess_20250312171137",
          "name": "Source to Target Lineage Process V2 (API)"
        },
        "lastModifiedTS": "1",
        "guid": "bbb58135-6a50-46dc-be20-6462e2b892ff",
        "status": "ACTIVE",
        "displayText": "Sou