## Environment Set Up

In [14]:
import os
from pycelonis import get_celonis
import pandas as pd
import json
import re
import shutil
import subprocess
import zipfile
import yaml
import numpy as np
from datetime import datetime
from pycelonis.ems import ColumnTransport, ColumnType
import traceback
import glob

In [15]:
url = os.environ['CELONIS_URL'] = 'https://lineage.develop.celonis.cloud'
#API Key for this specific url
api_token = os.environ['CELONIS_API_TOKEN'] = 'MDE5YTE2M2UtZmU5YS03NTFkLWFjYmYtZGQ0NWQxODJjZmYzOmc2ZTY2UUkyU3R2RFkxVTA2L3VNc0tiZUxaVmZneHR0RVRuVS9ETFJWS3or'

In [39]:
""" url = os.environ['CELONIS_URL'] = 'https://snap-tiger-team-celonis-com.eu-1.celonis.cloud'
#API Key for this specific url
api_token = os.environ['CELONIS_API_TOKEN'] = 'Nzk3MTliYWUtM2U4ZC00MmJmLWFlMGMtMzY5YmIxZWRhNDgxOjVMQ1lxaGwyeGZLbWxvbGVOeTkzYzcrZEQ1cTgzSnlDN29CUXpFNlRRMXZG' """

" url = os.environ['CELONIS_URL'] = 'https://snap-tiger-team-celonis-com.eu-1.celonis.cloud'\n#API Key for this specific url\napi_token = os.environ['CELONIS_API_TOKEN'] = 'Nzk3MTliYWUtM2U4ZC00MmJmLWFlMGMtMzY5YmIxZWRhNDgxOjVMQ1lxaGwyeGZLbWxvbGVOeTkzYzcrZEQ1cTgzSnlDN29CUXpFNlRRMXZG' "

In [None]:
""" url = os.environ['CELONIS_URL'] = 'https://ems-two.eu-1.celonis.cloud'
#API Key for this specific url
api_token = os.environ['CELONIS_API_TOKEN'] = 'MWFkMTk0YzctMmJjNC00MWMwLTg2NzAtY2M5YjdjOGViZDE4OkpSVFVkQVpIQ0lGcGd4RExQMjN6RUhPcEw2MVFHV2lDMm0ydFlDSStIaFBV' """

In [16]:
#Initializing Celonis object
c = get_celonis(base_url = url, api_token = api_token, key_type='USER_KEY') # adjust base_url and api_token accordingly

## Studio Lineage (Views + Knowledge Model)

In [17]:
def get_knowledge_model_key(view_node):
    km_key = json.loads(view_node.serialized_content)['metadata'].get('knowledgeModelKey')
    return km_key

def get_data_model_variable(knowledge_model):
    try:
        serialized = json.loads(knowledge_model.serialized_content)
        data_model_expr = serialized.get("dataModelId")
        match = None
        if isinstance(data_model_expr, str):
            match = re.match(r'\${{([a-zA-Z0-9_]+)}}', data_model_expr)
        if not match:
            print('ERROR: No valid Data Model Variable')
            return None
        return match.group(1)
    except (KeyError, json.JSONDecodeError, AttributeError) as e:
        print('ERROR: No Data Model Assigned')
        return None

data_pools = c.data_integration.get_data_pools()

def find_data_pool_id(data_model_id):
    data_pool_id = None
    for data_pool in data_pools:
        data_models = data_pool.get_data_models()
        for data_model in data_models:
            if data_model.id == data_model_id:
                data_pool_id = data_pool.id
                break
    return data_pool_id

In [18]:
# Lineage API call
def get_lineage(celonis, knowledge_model_key):
    try:
        lineage = celonis.client.request(
            method='get',
            url=f'/semantic-layer/api/usage/by-semantic-model/{knowledge_model_key}',
            parse_json =True
        )
        return lineage
    except Exception as e:
        print(e)
        return {}

In [19]:
def get_km_attribute_details(knowledge_model, data_model): 
    """
    Extracts attribute details and finds the 'table_schema' for each table.
    FIXED: Manually constructs schema string to avoid 'bound method' errors.
    """
    attribute_details = {}
    kpi_details = {}
    
    try:
        km_content = json.loads(knowledge_model.serialized_content)
        
        # --- Create a lookup map for table properties ---
        table_properties = {}
        if data_model:
            pool_id = find_data_pool_id(data_model.id)
            for table in data_model.get_tables():
                # SAFELY construct the schema string
                # This matches your Backend logic: pool_id + "_" + data_source_id
                if table.data_source_id:
                    safe_schema = f"{pool_id}_{table.data_source_id}"
                else:
                    safe_schema = pool_id

                # Store schema by lowercase table alias
                table_properties[table.alias_or_name.lower()] = {
                    'table_schema': safe_schema
                }
        
        # Process record attributes
        if 'records' in km_content:
            for record in km_content['records']:
                record_id = record.get('id') # This is the table alias
                table_info = table_properties.get(record_id.lower(), {})
                
                if 'attributes' not in record: continue
                
                for attr in record['attributes']:
                    attr_id = attr.get('id')
                    pql = attr.get('pql')
                    key = f"{record_id}.{attr_id}".lower()
                    
                    if pql and pql.strip(): source_type = 'CALCULATED'
                    else: source_type = 'AUTO_GENERATED'
                    
                    attribute_details[key] = {
                        'source_type': source_type,
                        'pql': pql if pql else '',
                        'table_schema': table_info.get('table_schema', '') # Defaults to empty string, not None
                    }
        
        # Process KPIs 
        if 'kpis' in km_content:
            for i, kpi in enumerate(km_content['kpis']):
                kpi_id = kpi.get('id')
                kpi_pql = kpi.get('pql', '')
                kpi_name = kpi.get('displayName', kpi_id)
                if kpi_id:
                    kpi_details[kpi_id] = {'name': kpi_name, 'pql': kpi_pql, 'format': kpi.get('format', '')}
            
    except Exception as e:
        print(f"Warning: Could not extract details: {e}")
        traceback.print_exc()
    
        
    return attribute_details, kpi_details

In [20]:
def update_with_view_usages(lineage, usage: list, metadata: dict, attribute_details: dict, kpi_details: dict):
    ## View usages
    
    # Record --> View
    parent_key = 'viewUsages'
    key = "recordAttributeReferences"
    for record, attributes in lineage[parent_key][key].items():
        for attribute, views in attributes.items():
            # Get attribute details
            attr_key = f"{record}.{attribute}".lower()
            attr_info = attribute_details.get(attr_key, {'source_type': 'AUTO_GENERATED', 'pql': ''})
            
            for view in views:
                usage.append({
                    'UNIQUE_SOURCE_ID': f'{metadata.get('knowledge_model_id')}.{record}.{attribute}'.lower(),
                    'UNIQUE_TARGET_ID': f'{metadata.get("root_id")}.{view.get("nodeId")}'.lower(),
                    'SOURCE_ID': f'{record}.{attribute}', 
                    'SOURCE_NAME': record,
                    'SOURCE_ATTRIBUTE': attribute,
                    'SOURCE_NODE_TYPE': 'ATTRIBUTES',
                    'SOURCE_TYPE': attr_info['source_type'],
                    'SOURCE_PQL': attr_info['pql'],
                    'SOURCE_STUDIO_ASSET_ID': metadata.get("knowledge_model_id"),
                    'SOURCE_STUDIO_ASSET_TYPE': 'KNOWLEDGE_MODEL',
                    'TARGET_ID': view.get('nodeId'),
                    'TARGET_NAME': view.get('assetName'),
                    'TARGET_ATTRIBUTE': None,
                    'TARGET_NODE_TYPE': 'VIEW',
                    'TARGET_STUDIO_ASSET_ID': view.get('nodeId'),
                    'TARGET_STUDIO_ASSET_TYPE': 'VIEW',
                    'DATA_SOURCE_ID': '',
                    'KNOWLEDGE_MODEL_KEY': metadata.get('knowledge_model_key'),
                    'KNOWLEDGE_MODEL_ID': metadata.get("knowledge_model_id"),
                    'DATA_MODEL_ID': metadata.get('data_model_id'),
                    'DATA_MODEL_NAME': metadata.get('data_model_name'),
                    'DATA_POOL_ID': metadata.get('data_pool_id'),
                    'DATA_POOL_NAME': metadata.get('data_pool_name')
                })

    # KPI --> View
    key = 'kpiReferences'
    
    for kpi, views in lineage[parent_key][key].items():
        kpi_info = kpi_details.get(kpi, {'name': kpi, 'pql': ''})
        
        for view in views:
            usage.append({
                'UNIQUE_SOURCE_ID': f'{metadata.get('knowledge_model_id')}.{kpi}'.lower(),
                'UNIQUE_TARGET_ID': f'{metadata.get("root_id")}.{view.get("nodeId")}'.lower(),
                'SOURCE_ID': f'{kpi}', 
                'SOURCE_NAME': kpi,
                'SOURCE_ATTRIBUTE': None,
                'SOURCE_NODE_TYPE': 'KPI',
                'SOURCE_TYPE': '',
                'SOURCE_PQL': kpi_info['pql'],
                'SOURCE_STUDIO_ASSET_ID': metadata.get("knowledge_model_id"),
                'SOURCE_STUDIO_ASSET_TYPE': 'KNOWLEDGE_MODEL',
                'TARGET_ID': view.get('nodeId'),
                'TARGET_NAME': view.get('assetName'),
                'TARGET_NODE_TYPE': 'VIEW',
                'TARGET_STUDIO_ASSET_ID': view.get('nodeId'),
                'TARGET_STUDIO_ASSET_TYPE': 'VIEW',
                'DATA_SOURCE_ID': '',
                'KNOWLEDGE_MODEL_KEY': metadata.get('knowledge_model_key'),
                'KNOWLEDGE_MODEL_ID': metadata.get("knowledge_model_id"),
                'DATA_MODEL_ID': metadata.get('data_model_id'),
                'DATA_MODEL_NAME': metadata.get('data_model_name'),
                'DATA_POOL_ID': metadata.get('data_pool_id'),
                'DATA_POOL_NAME': metadata.get('data_pool_name')
            })
    
    return usage

def update_with_km_usages(lineage, usage, metadata, attribute_details: dict, kpi_details: dict):
    ## Knowledge Model usages
    
    # Record --> KM property
    parent_key = 'knowledgeModelUsages'
    key = "recordAttributeUsages"
    properties = ['kpis', 'filters', 'attributes', 'flags']
    map_id = {'kpis': 'id', 'filters': 'id', 'attributes': 'recordId', 'flags': 'id'}

    for property in properties:
        for record, attributes in lineage[parent_key][key][property].items():
            for attribute, props in attributes.items():
                # Get attribute details
                attr_key = f"{record}.{attribute}".lower()
                attr_info = attribute_details.get(attr_key, {'source_type': 'AUTO_GENERATED', 'pql': ''})
                
                for prop in props:
                    prop_id = prop.get(map_id.get(property), 'id')
                    prop_id = f'{prop_id}.{prop.get("attributeId")}' if prop.get('attributeId') else prop_id
                    usage.append({
                        'UNIQUE_SOURCE_ID': f'{metadata.get('knowledge_model_id')}.{record}.{attribute}'.lower(),
                        'UNIQUE_TARGET_ID': f'{metadata.get("knowledge_model_id")}.{prop_id}'.lower(),
                        'SOURCE_ID': f'{record}.{attribute}'.lower(), 
                        'SOURCE_NAME': record,
                        'SOURCE_ATTRIBUTE': attribute,
                        'SOURCE_NODE_TYPE': 'ATTRIBUTES',
                        'SOURCE_TYPE': attr_info['source_type'],
                        'SOURCE_PQL': attr_info['pql'],
                        'SOURCE_STUDIO_ASSET_ID': metadata.get("knowledge_model_id"),
                        'SOURCE_STUDIO_ASSET_TYPE': 'KNOWLEDGE_MODEL',
                        'TARGET_ID': f'{prop_id}'.lower(),
                        'TARGET_NAME': prop.get('displayName'),
                        'TARGET_ATTRIBUTE': prop.get('attributeId'),
                        'TARGET_NODE_TYPE': property.upper(),
                        'TARGET_STUDIO_ASSET_ID': metadata.get("knowledge_model_id"),
                        'TARGET_STUDIO_ASSET_TYPE':'KNOWLEDGE_MODEL',
                        'DATA_SOURCE_ID': '',
                        'KNOWLEDGE_MODEL_KEY': metadata.get('knowledge_model_key'),
                        'KNOWLEDGE_MODEL_ID': metadata.get("knowledge_model_id"),
                        'DATA_MODEL_ID': metadata.get('data_model_id'),
                        'DATA_MODEL_NAME': metadata.get('data_model_name'),
                        'DATA_POOL_ID': metadata.get('data_pool_id'),
                        'DATA_POOL_NAME': metadata.get('data_pool_name')
                    })
    
    # KPI --> KM property
    key = 'kpiUsages'
    for property in properties:
        for kpi, props in lineage[parent_key][key][property].items():
            kpi_info = kpi_details.get(kpi, {'name': kpi, 'pql': ''})
            
            for prop in props:
                prop_id = prop.get(map_id.get(property), 'id')
                prop_id = f'{prop_id}.{prop.get("attributeId")}' if prop.get('attributeId') else prop_id
                usage.append({
                        'UNIQUE_SOURCE_ID': f'{metadata.get('knowledge_model_id')}.{kpi}'.lower(),
                        'UNIQUE_TARGET_ID': f'{metadata.get("knowledge_model_id")}.{prop_id}'.lower(),
                        'SOURCE_ID': f'{kpi}'.lower(), 
                        'SOURCE_NAME': kpi,
                        'SOURCE_ATTRIBUTE': None,
                        'SOURCE_NODE_TYPE': 'KPI',
                        'SOURCE_TYPE': '',
                        'SOURCE_PQL': kpi_info['pql'],
                        'SOURCE_STUDIO_ASSET_ID': metadata.get("knowledge_model_id"),
                        'SOURCE_STUDIO_ASSET_TYPE': 'KNOWLEDGE_MODEL',
                        'TARGET_ID': f'{prop_id}'.lower(),
                        'TARGET_NAME': prop.get('displayName'),
                        'TARGET_ATTRIBUTE': prop.get('attributeId'),
                        'TARGET_NODE_TYPE': property.upper(),
                        'TARGET_STUDIO_ASSET_ID': metadata.get("knowledge_model_id"),
                        'TARGET_STUDIO_ASSET_TYPE':'KNOWLEDGE_MODEL',
                        'DATA_SOURCE_ID': '',
                        'KNOWLEDGE_MODEL_KEY': metadata.get('knowledge_model_key'),
                        'KNOWLEDGE_MODEL_ID': metadata.get("knowledge_model_id"),
                        'DATA_MODEL_ID': metadata.get('data_model_id'),
                        'DATA_MODEL_NAME': metadata.get('data_model_name'),
                        'DATA_POOL_ID': metadata.get('data_pool_id'),
                        'DATA_POOL_NAME': metadata.get('data_pool_name')
                    })
                
    return usage

## Bridge Table - Integration between backend and frontend

In [23]:
def create_bridge_links(data_model, knowledge_model_id, pool_table_lookup, data_pool_id, data_pool_name):
    """
    Optimized version that accepts data_pool_id directly instead of searching for it.
    """
    links = []
    try:
        dm_id = data_model.id
        
        # Iterate tables in the Data Model (Logical layer)
        for table in data_model.get_tables():
            dm_table_alias = table.alias_or_name
            
            # Look up physical info from our cache (Physical layer)
            physical_info = pool_table_lookup.get(table.name.lower())
            
            if physical_info:
                dict_schema = physical_info['schema'].lower()
                dict_name = physical_info['name'].lower()
                
                source_node_raw = f"{dict_schema}_{dict_name}"
                source_node_prefixed = f"DATA_MODEL_TABLE_{dict_schema}_{dm_id}_{dm_table_alias}"
            else:
                continue

            # Create Links for Columns
            for column in table.get_columns():
                target_node = f"{knowledge_model_id}.{dm_table_alias}.{column.name}".lower()
                
                link_data = {
                    "target_node": target_node,
                    "task_target": "DATA_MODEL_COLUMN",
                    "data_pool_id": data_pool_id,
                    "data_pool_name": data_pool_name,
                    "data_model_id": dm_id, 
                    "data_model_name": data_model.name
                }
                
                links.append({"source_node": source_node_raw, **link_data})
                links.append({"source_node": source_node_prefixed, **link_data})
                
    except Exception as e:
        print(f"    Warning: Could not create bridge links for DM {data_model.name}: {e}")
    
    return links

## Execution Block - Scan the environment and generate lineage and link table

In [None]:
print("Pre-mapping Data Models to Data Pools...")
dm_id_mapping = {} 

all_pools = c.data_integration.get_data_pools()
for pool in all_pools:
    try:
        # Pre-fetch models so we don't have to search later
        for dm in pool.get_data_models():
            dm_id_mapping[dm.id] = {'pool': pool, 'model': dm}
    except:
        pass
print(f"Mapped {len(dm_id_mapping)} Data Models. Starting Scan...")


# Main Execution Block ---
pool_table_cache = {} 
usage = []
all_bridge_links = []
all_metadata = []
error_km_count = 0
processed_km_count = 0

for space in c.studio.get_spaces():
    print(f"Space: {space.name}")
    
    for package in space.get_packages():
        print(f"  Package: {package.name}")

        try:
            knowledge_models = package.get_knowledge_models()
            if not knowledge_models: continue
            
            # Get variables once per package
            try:
                package_variables = package.get_variables()
            except:
                package_variables = []

        except Exception as e:
            print(f"    Error accessing package: {e}")
            continue
        
        for knowledge_model in knowledge_models:
            try:
                # 1. Resolve Data Model ID
                dm_variable = get_data_model_variable(knowledge_model)
                if not dm_variable: continue # Skip silently
                
                data_model_id = None
                try:
                    if package_variables:
                        variable_obj = package_variables.find(dm_variable, "key")
                        if variable_obj: data_model_id = variable_obj.value
                except: pass

                if not data_model_id: continue

                # 2. FAST LOOKUP (No API calls)
                cached_data = dm_id_mapping.get(data_model_id)
                
                if cached_data:
                    data_pool = cached_data['pool']
                    data_model = cached_data['model']
                    data_pool_id = data_pool.id

                    # 3. POOL TABLE CACHE
                    if data_pool_id in pool_table_cache:
                        pool_table_lookup = pool_table_cache[data_pool_id]
                    else:
                        print(f"    Caching tables for pool: {data_pool.name}")
                        pool_table_lookup = {}
                        try:
                            # Robust table fetching
                            for pool_table in data_pool.get_tables():
                                phys_schema = getattr(pool_table, 'schema_name', f"{data_pool.id}_{pool_table.data_source_id}")
                                pool_table_lookup[pool_table.name.lower()] = {'schema': phys_schema, 'name': pool_table.name}
                            pool_table_cache[data_pool_id] = pool_table_lookup
                        except Exception as e:
                            print(f"    Error caching tables: {e}")
                    
                    # 4. Generate Links
                    bridge_links = create_bridge_links(
                        data_model, 
                        knowledge_model.id, 
                        pool_table_lookup,
                        data_pool.id,      # Pass ID directly
                        data_pool.name     # Pass Name directly
                    )
                    all_bridge_links.extend(bridge_links)

                    # 5. Metadata & Lineage
                    metadata = {
                        'data_model_id': data_model_id, 
                        'data_model_name': data_model.name,
                        'data_pool_id': data_pool.id,
                        'data_pool_name': data_pool.name,
                        'root_id': package.id, 
                        'space_id': space.id,
                        'knowledge_model_key': knowledge_model.key,
                        'knowledge_model_id': knowledge_model.id
                    }

                    attribute_details, kpi_details = get_km_attribute_details(knowledge_model, data_model)
                    lineage = get_lineage(c, knowledge_model.root_with_key)
                    
                    if lineage and 'viewUsages' in lineage:
                        update_with_view_usages(lineage, usage, metadata, attribute_details, kpi_details)
                        update_with_km_usages(lineage, usage, metadata, attribute_details, kpi_details)
                        all_metadata.append(metadata)
                        processed_km_count += 1
                        print(f"      Success: {knowledge_model.name}")
                    else:
                        print(f"      Skipped (No Lineage): {knowledge_model.name}")

                else:
                    # DM ID exists but wasn't found in our pre-scan (permission issue?)
                    error_km_count += 1

            except Exception as e:
                # DEBUG PRINT ENABLED
                print(f"    Error processing KM '{knowledge_model.name}': {e}")
                error_km_count += 1

print(f"\nProcessing complete!")
print(f"Successfully processed {processed_km_count} knowledge models")

df_studio = pd.DataFrame(usage)
df_bridge = pd.DataFrame(all_bridge_links).drop_duplicates()
df_bridge.to_csv('bridge_lineage_mapping.csv', index=False)
print(f"Saved bridge_lineage_mapping.csv with {len(df_bridge)} rows")

Step 1: Pre-mapping Data Models to Data Pools...
Mapped 27 Data Models. Starting Scan...
Space: ahilmer - lineage test
  Package: Monitoring
ERROR: No valid Data Model Variable
ERROR: No valid Data Model Variable
ERROR: No valid Data Model Variable
Space: Flo
  Package: Monitoring
ERROR: No valid Data Model Variable
ERROR: No valid Data Model Variable
ERROR: No valid Data Model Variable
Space: FS
  Package: Data Lineage
    ...Caching tables for pool: Monitoring Pool
      Success: KM - Data Lineage
Space: Paula's Space
  Package: Data Lineage - Test
    ...Caching tables for pool: test pool - data lineage app [PC]
      Success: KM
  Package: Data Lineage - Monitoring Pool version
      Success: KM - Data Lineage
  Package: Monitoring - Data Lineage
      Success: KM - Data Lineage
  Package: Monitoring - Data Lineage Sarvesh Test
    ...Caching tables for pool: Sarvesh Monitoring Pool
      Success: KM - Data Lineage
      Success: Studio Lineage
  Package: Demo
    ...Caching tables

## Data Models Loads - Extract all data

In [24]:
all_metadata

[{'data_model_id': 'da1f57a6-3485-4eb3-944d-12825f56687e',
  'data_model_name': 'Data Lineage Monitoring',
  'data_pool_id': '831db6ab-da5e-4d57-9967-c97f4320d2d6',
  'data_pool_name': 'Monitoring Pool',
  'root_id': '34ca0a04-a73c-4ea8-ba73-f1863cf8471d',
  'space_id': '24b90297-f60c-4ad2-b700-87ad401b9961',
  'knowledge_model_key': 'km-data-lineage',
  'knowledge_model_id': 'dd35ad1d-5ba1-44dc-b9a0-533587643b35'},
 {'data_model_id': '1e846159-f3a9-42a3-9d91-b3c7c0c7f904',
  'data_model_name': 'data lineage - test',
  'data_pool_id': 'ebdcfc35-b1c0-4a8c-bf5c-ae3c5f995bab',
  'data_pool_name': 'test pool - data lineage app [PC]',
  'root_id': 'c9b98018-21f1-4f3b-86b0-e1ba393125a5',
  'space_id': '4e7bf097-7bea-48e5-af3d-035576e47309',
  'knowledge_model_key': 'dm_data_lineage_test-km',
  'knowledge_model_id': '59b95b69-c4b2-4696-b4a7-b1a4cf1235b5'},
 {'data_model_id': 'da1f57a6-3485-4eb3-944d-12825f56687e',
  'data_model_name': 'Data Lineage Monitoring',
  'data_pool_id': '831db6ab-da5

In [25]:
# Create a unique set of data model IDs we need to find
unique_data_model_ids = {item['data_model_id'] for item in all_metadata}
print(f"Found {len(unique_data_model_ids)} unique Data Models to scan for.")
unique_data_model_ids

Found 11 unique Data Models to scan for.


{'088283c6-9918-4b59-9a4b-96d9cb38bb15',
 '0ad397d5-ca2a-4a57-a0ab-76dc042117a0',
 '13483c35-4b6a-4e90-b06c-9230b25e7b02',
 '1c366528-fd3e-479a-b304-63aa1c0db354',
 '1e846159-f3a9-42a3-9d91-b3c7c0c7f904',
 '24562740-9c61-4566-93ac-21f23f95a157',
 '5575dbb7-41c4-4828-9fcc-314c7a771410',
 '80fea3cc-e1ba-4bbf-a29a-da1d2e50557f',
 '904789ed-fa95-4ee4-81fa-b1cb1a8987e2',
 'bbcfb903-9134-433a-9c91-08fb75ad3d3f',
 'da1f57a6-3485-4eb3-944d-12825f56687e'}

In [26]:
# Create a unique set of data model IDs we need to find
unique_KM_ids = {item['knowledge_model_id'] for item in all_metadata}
print(f"Found {len(unique_KM_ids)} unique Knowledge Models to scan for.")
unique_KM_ids

Found 15 unique Knowledge Models to scan for.


{'11ede6d9-76bc-458e-b2e7-4f6a83f8f988',
 '25d4909d-d74a-45ca-96af-0d3044dd6e00',
 '423975bc-05cb-45fe-b0e3-58cb7cd1c4ed',
 '4d63f4e9-1e92-454e-a128-f161f43f18d6',
 '59b95b69-c4b2-4696-b4a7-b1a4cf1235b5',
 '6fb03d9f-d533-4eb9-bcd3-3560729240f5',
 '75b1bc36-70b5-4f57-91d7-bbdef6ed410c',
 '75fa6a46-61c2-455b-b04b-c4e42762ce5a',
 '7de213a8-ba42-4b92-8d16-ab35f98a8b38',
 '88efd602-7ee7-4fb6-97b3-5e1bb9f113f3',
 '97ac8f46-c6f4-4b60-ba05-e7bf59863ddf',
 'b2afef54-36f0-438c-9074-f92929c0b606',
 'c23d579b-e085-4181-83cc-6a756e3cea7f',
 'dba5e290-fa7e-463e-8d1f-267762c01a41',
 'dd35ad1d-5ba1-44dc-b9a0-533587643b35'}

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_datamodel_metadata(all_metadata, dm_id_mapping):
    """
    Optimized version:
    1. Uses pre-fetched dm_id_mapping to avoid re-fetching Data Pools/Models.
    2. Uses Threading to fetch columns for multiple tables in parallel.
    """
    print("Starting Optimized Data Model Scan...")
    
    # 1. Identify unique Data Model IDs we need to process
    unique_dm_ids = set(m['data_model_id'] for m in all_metadata if m.get('data_model_id'))
    print(f"Found {len(unique_dm_ids)} unique Data Models to scan.")

    final_data = []

    # Helper function for threading
    def process_table_columns(table, dm_obj, pool_obj):
        """Fetches columns for a single table. Runs in a thread."""
        rows = []
        try:
            # This is the slow API call we are parallelizing
            columns = table.get_columns() 
            
            for column in columns:
                unique_id = f"{dm_obj.id}.{table.name}.{column.name}".lower()
                rows.append({
                    "unique_id": unique_id,
                    "d_pool_id": pool_obj.id,
                    "d_pool_name": pool_obj.name,
                    "d_model_id": dm_obj.id,
                    "d_model_name": dm_obj.name,
                    "table_name": table.name,
                    "column_name": column.name
                })
        except Exception as e:
            # print(f"Warning: Error fetching columns for table {table.name}: {e}")
            pass
        return rows

    # 2. Main Loop
    for dm_id in unique_dm_ids:
        # Fast Lookup from previous step
        cached_entry = dm_id_mapping.get(dm_id)
        
        if not cached_entry:
            print(f"  Skipping DM {dm_id} (Not found in cache - permission issue?)")
            continue
            
        data_model = cached_entry['model']
        data_pool = cached_entry['pool']
        
        print(f"  Scanning Data Model: {data_model.name}...")

        try:
            # Get all tables (usually one API call)
            tables = data_model.get_tables()
            
            # 3. Parallel Execution for Columns
            # We fetch columns for up to 10 tables at once
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = []
                for table in tables:
                    futures.append(executor.submit(process_table_columns, table, data_model, data_pool))
                
                for future in as_completed(futures):
                    result_rows = future.result()
                    final_data.extend(result_rows)
                    
        except Exception as e:
            print(f"    Error accessing tables for DM {data_model.name}: {e}")
            continue

    # Convert to DataFrame
    print(f"\nData Model scan complete. Found {len(final_data)} columns.")
    return pd.DataFrame(final_data).drop_duplicates(subset=['unique_id'], keep='first')

In [None]:
# --- EXECUTION ---
# Pass the 'dm_id_mapping' we created in the beginning
df_data_models = get_datamodel_metadata(all_metadata, dm_id_mapping)

# Save
df_data_models.to_csv('data_models.csv', index=False)
print("Saved data_models.csv")

Starting Optimized Data Model Scan...
Found 11 unique Data Models to scan.
  Scanning Data Model: data lineage - test...
  Scanning Data Model: Data Lineage Monitoring - Latest_V2...
  Scanning Data Model: full lineage...
  Scanning Data Model: full_lineage...
  Scanning Data Model: Data Lineage Minotring...
  Scanning Data Model: tomas_datamodel...
  Scanning Data Model: Data Lineage Monitoring...
  Scanning Data Model: test:perspective_celonis_AccountsReceivable...
  Scanning Data Model: Data Lineage Monitoring - Latest...
  Scanning Data Model: dummy_peopleDM...
  Scanning Data Model: Test Data Model...

Data Model scan complete. Found 1933 columns.
Saved data_models.csv


## Studio Lineage - Action Flows

### Extract Blueprints

In [None]:
def download_blueprint_api(
    source, # This should be a pycelonis connection object
    package, # This should be a pycelonis package object
    node_data: dict,
    source_space: object, # This is your custom config object
    published: bool,
) -> None:

    """Exports blueprint and stores it in blueprint folder with User key

    Args:
        package: package where blueprint is
        node_data: dict containing serializedContent
        source_space: config object with blueprint paths
        published: whether to get published or draft version
    """

    def extract_blueprint_api(
        source,
        package,
        parsed_node_data: dict,  # Already parsed JSON
        published: bool,
    ) -> dict:
        """Function extracts the blueprint given the Action Flow data

        Args:
            source: Celonis object for source Team
            package: Celonis object for package where the Action Flow is stored
            parsed_node_data: Already parsed node data from serializedContent

        Returns:
            a json with the blueprint of the Action Flow
        """

        def get_blueprint_versions(source, package, parsed_node_data):
            base_url = source.client.base_url
            blueprint_url = f"{base_url}/ems-automation/api/root/{parsed_node_data['rootNodeId']}/asset/{parsed_node_data['key']}/proxy/api/v2/scenarios/{parsed_node_data['scenarioId']}/blueprints"
            blueprints_version = source.client.request(
                url=blueprint_url, method="get", parse_json=True
            )
            return blueprints_version

        def convert_to_timestamp(timestamp_string):
            timestamp = datetime.strptime(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ")
            return int(timestamp.timestamp())

        base_url = source.client.base_url

        # Get the blueprint's versions
        blueprints = get_blueprint_versions(source, package, parsed_node_data)
        blueprints["scenariosBlueprints"] = sorted(
            blueprints["scenariosBlueprints"],
            key=lambda x: x["created"],
            reverse=True,
        )

        blueprint_url = f"{base_url}/ems-automation/api/root/{parsed_node_data['rootNodeId']}/asset/{parsed_node_data['key']}/proxy/api/v2/scenarios/{parsed_node_data['scenarioId']}/blueprint"
        
        if published:
            # try getting the published version
            blueprint_info = [
                b for b in blueprints["scenariosBlueprints"] if not b["draft"]
            ]
            # if not possible -> get the draft version
            if not blueprint_info:
                blueprint_info = [
                    b for b in blueprints["scenariosBlueprints"] if b["draft"]
                ]
        else:
            # try getting the draft version
            blueprint_info = [
                b for b in blueprints["scenariosBlueprints"] if b["draft"]
            ]
            # if not possible -> get the published version
            if not blueprint_info:
                blueprint_info = [
                    b for b in blueprints["scenariosBlueprints"] if not b["draft"]
                ]

        if blueprint_info:
            id_timestamp = convert_to_timestamp(blueprint_info[0].get("created"))
            query_url = (
                f"blueprintId={blueprint_info[0].get('version')}&_={id_timestamp}"
            )

            blueprint = source.client.request(
                url=f"{blueprint_url}?{query_url}", method="get", parse_json=True
            )

            if "response" in blueprint:
                return blueprint["response"]["blueprint"]

    # Parse the serialized content ONCE at the beginning
    parsed_node_data = json.loads(node_data["serializedContent"])
    
    # Extract the key from parsed data
    AF_key = parsed_node_data["key"]
    
    # Get the blueprint using the parsed data
    blueprint = extract_blueprint_api(source, package, parsed_node_data, published)
    
    # All blueprints (both draft and published) go directly into "blueprints" folder
    bp_path = "blueprints"
        
    if not blueprint:
        # Create a blank blueprint
        blueprint = json.loads(
            """
            {
                "flow": [
                    {
                        "id": null,
                        "module": "placeholder:Placeholder",
                        "metadata": {
                            "designer": {
                                "x": 0,
                                "y": 0
                            }
                        }
                    }
                ],
                "metadata": {
                    "instant": false,
                    "version": 1,
                    "scenario": {
                        "roundtrips": 1,
                        "maxErrors": 3,
                        "autoCommit": true,
                        "autoCommitTriggerLast": true,
                        "sequential": false,
                        "confidential": false,
                        "dataloss": false,
                        "dlq": false
                    },
                    "designer": {
                        "orphans": []
                    },
                    "zone": "integromat.try.k8s.celonis.cloud"
                }
            }"""
        )
    
    else:
        file_name = f"{AF_key}&{package.key}.json"
        print(f"Blueprint extracted. Saving to: {file_name}")

    with open(bp_path + f"/{AF_key}&{package.key}.json", "w") as out:
        json.dump(blueprint, out)

In [None]:
def download_all_blueprints():
    """
    Download all Action Flow blueprints from all spaces and packages.
    First deletes the existing blueprints directory and then creates a fresh one.
    """
    print("-"*80)
    print("DOWNLOADING ACTION FLOW BLUEPRINTS")
    print("-"*80)
    
    # Delete existing blueprints directory if it exists
    if os.path.exists("blueprints"):
        print("Removing existing blueprints directory...")
        shutil.rmtree("blueprints")
        print("Existing blueprints directory removed.")
    
    # Create fresh blueprints directory
    print("Creating new blueprints directory...")
    os.makedirs("blueprints")
    
    total_blueprints_downloaded = 0
    
    # Iterate through all spaces and packages
    for space in c.studio.get_spaces():
        print(f"\nSpace: {space.name}")
        
        for package in space.get_packages():
            print(f"  Package: {package.name}")
            
            try:
                # Get all nodes in the package
                nodes = package.get_content_nodes()
                
                # Filter for SCENARIO nodes (Action Flows)
                scenario_nodes = [node for node in nodes if hasattr(node, 'asset_type') and node.asset_type == 'SCENARIO']
                
                if not scenario_nodes:
                    continue
                
                print(f"    Found {len(scenario_nodes)} Action Flow(s)")
                
                # Process each Action Flow
                for node in scenario_nodes:
                    try:
                        print(f"      Processing: {node.name}")
                        
                        # Prepare node_data dict
                        node_data = {
                            'serializedContent': node.serialized_content
                        }
                        
                        # Download blueprint
                        download_blueprint_api(
                            source=c,
                            package=package,
                            node_data=node_data,
                            source_space=None,
                            published=False
                        )
                        
                        total_blueprints_downloaded += 1
                        print(f"        Blueprint downloaded")
                        
                    except Exception as e:
                        print(f"        Error: {str(e)[:100]}")
                
            except Exception as e:
                print(f"    Error accessing package: {e}")
    
    print(f"\n{'-'*80}")
    print(f"SUMMARY: Downloaded {total_blueprints_downloaded} blueprints")
    print(f"{'-'*80}")

In [None]:
# Execute
download_all_blueprints()

--------------------------------------------------------------------------------
DOWNLOADING ACTION FLOW BLUEPRINTS
--------------------------------------------------------------------------------
Removing existing blueprints directory...
Existing blueprints directory removed.
Creating new blueprints directory...

Space: ahilmer - lineage test
  Package: Monitoring

Space: Flo
  Package: Monitoring

Space: FS
  Package: Data Lineage

Space: Paula's Space
  Package: Data Lineage - Test
    Found 1 Action Flow(s)
      Processing: action_flow_test
Blueprint extracted. Saving to: 08ae264c_d22b_4da9_b80f_c25fc9eb004a.action-flow-test&08ae264c_d22b_4da9_b80f_c25fc9eb004a.json
        Blueprint downloaded
  Package: Data Lineage - Monitoring Pool version
  Package: Monitoring - Data Lineage
  Package: Monitoring - Data Lineage Sarvesh Test
  Package: Demo
  Package: Monitoring - Data Lineage Copy
  Package: Data Lineage - New Version
  Package: Celonis Data Lineage Latest
  Package: full lin

### Extract celonis modules from blueprints

In [None]:
def get_celonis_module_positions(blueprint: str) -> list:
    """
    Recursively searches through entire JSON structure to find all modules 
    that start with 'celonis:', regardless of nesting depth.
    
    Args:
        blueprint: Path to the JSON file

    Returns:
        List of dictionaries containing module data for each Celonis module
    """
    def find_celonis_modules(obj, path=""):
        """Recursively search for celonis modules in nested structures"""
        modules = []
        
        if isinstance(obj, dict):
            # Check if this dict has a 'module' key starting with 'celonis:'
            if 'module' in obj and isinstance(obj['module'], str) and obj['module'].startswith('celonis:'):
                # Found a Celonis module - store the entire object
                modules.append({
                    'data': obj,
                    'path': path,
                    'module': obj['module'],
                    'id': obj.get('id')
                })
            
            # Recursively search all values in this dict
            for key, value in obj.items():
                new_path = f"{path}.{key}" if path else key
                modules.extend(find_celonis_modules(value, new_path))
        
        elif isinstance(obj, list):
            # Recursively search all items in this list
            for i, item in enumerate(obj):
                new_path = f"{path}[{i}]"
                modules.extend(find_celonis_modules(item, new_path))
        
        return modules
    
    try:
        with open(blueprint, 'r') as f:
            data = json.load(f)
        
        # Start recursive search from root
        celonis_modules = find_celonis_modules(data)
        
        return celonis_modules
    
    except Exception as e:
        print(f"Error reading file: {e}")
        import traceback
        traceback.print_exc()
        return []

In [None]:
def extract_celonis_module_details(blueprint: str) -> list:
    """
    Extracts details from Celonis modules in an Action Flow blueprint.
    Recursively searches through entire JSON structure.
    
    Args:
        blueprint: Path to the blueprint JSON file
        
    Returns:
        List of dictionaries containing extracted details for each Celonis module
    """
    try:
        # Get all Celonis modules (regardless of nesting)
        celonis_modules = get_celonis_module_positions(blueprint)
        
        module_details = []
        
        for module_info in celonis_modules:
            module_data = module_info['data']
            module_name = module_info['module']
            mapper = module_data.get('mapper', {})
            metadata = module_data.get('metadata', {})  
            
            detail = {
                'path': module_info['path'],
                'module': module_name,
                'id': module_info['id'],
                'mapper': mapper,      
                'metadata': metadata   
            }
            
            # Rule 1: celonis:getKPIs
            if module_name == 'celonis:getKPIs':
                detail['kpis'] = mapper.get('kpis', [])
                detail['knowledgeModelKey'] = mapper.get('knowledgeModelKey', '')
            
            # Rule 2: celonis:getRows
            elif module_name == 'celonis:getRows':
                km_columns = mapper.get('kmColumns', [])
                column_names = [col.get('columnName', '') for col in km_columns]
                detail['column_names'] = column_names
                detail['dataOrKnowledgeModel'] = mapper.get('dataOrKnowledgeModel', '')
                detail['knowledgeModelKey'] = mapper.get('knowledgeModelKey', '')  
            
            # Rule 3: celonis:updateAugmentedAttribute
            elif module_name in ['celonis:updateAugmentedAttribute', 'celonis:updateAugmentedAttributeV2']:
                detail['record'] = mapper.get('record', '')
                detail['augmentedAttributeId'] = mapper.get('augmentedAttributeId', '')                
                detail['knowledgeModelKey'] = mapper.get('knowledgeModelKey', '')
            
            
            # Rule 4: celonis:queryData
            elif module_name == 'celonis:queryData':
                columns = mapper.get('columns', [])
                column_details = []
                for col in columns:
                    column_name = col.get('columnName', '')
                    column_formula = col.get('formula', '')
                    column_details.append({
                        'columnName': column_name,
                        'formula': column_formula
                    })
                
                detail['columns'] = column_details
                detail['dataPool'] = mapper.get('dataPool', '')
                detail['dataModel'] = mapper.get('dataModel', '')
                
                # Extract only filterExpression from each filter
                filters = mapper.get('filter', [])
                filter_expressions = [f.get('filterExpression', '') for f in filters]
                detail['filterExpression'] = filter_expressions
            
            module_details.append(detail)
        
        return module_details
    
    except Exception as e:
        print(f"Error extracting module details: {e}")
        import traceback
        traceback.print_exc()
        return []

### Extract pql (e.g. if you have a celonis:extractQuery)

In [None]:
def extract_tables_and_columns_from_pql(pql: str) -> dict:
    """
    Extracts table and column references from a PQL query.
    
    Args:
        pql: PQL query string
        
    Returns:
        Dictionary with:
        - 'table_column_pairs': List of tuples (table, column)
        - 'tables_only': List of table names without column references
    """
    
    # Pattern for "TABLE"."COLUMN"
    table_column_pattern = r'"([^"]+)"\s*\.\s*"([^"]+)"'
    
    # Find all table.column pairs
    table_column_matches = re.findall(table_column_pattern, pql)
    
    # Pattern for standalone "TABLE" 
    table_only_pattern = r'(?<!\.\s)(?<!\.)"([A-Z_][A-Z0-9_]*)"(?!\s*\.\s*")'
    
    # Find all standalone tables
    table_only_matches = re.findall(table_only_pattern, pql)
    
    # Remove tables that are already in table.column pairs
    tables_in_pairs = {table for table, _ in table_column_matches}
    # Also remove columns that appear in table.column pairs
    columns_in_pairs = {column for _, column in table_column_matches}
    
    tables_only = [table for table in table_only_matches 
                   if table not in tables_in_pairs and table not in columns_in_pairs]
    
    return {
        'table_column_pairs': list(set(table_column_matches)),  # Remove duplicates
        'tables_only': list(set(tables_only))  # Remove duplicates
    }

### Execution Block for the Action Flows

In [None]:
# =============================================================================
# 1. PRE-FETCH: Build lookup dictionary
# =============================================================================
km_lookup = {item['knowledge_model_key']: item for item in all_metadata}
print(f"Found {len(km_lookup)} Knowledge Models in lookup.")

# List to store all lineage relationships
af_lineage = []

# Get all blueprint JSON files
blueprint_files = glob.glob('blueprints/*.json')
print(f"Processing {len(blueprint_files)} blueprint files...")

for blueprint_path in blueprint_files:
    try:
        # --- 1. FILENAME PARSING ---
        filename = os.path.basename(blueprint_path)
        clean_filename = filename.replace('.json', '')
        
        # Regex to find a UUID (Action Flow ID)
        uuid_match = re.search(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', clean_filename)
        af_id = uuid_match.group(0) if uuid_match else clean_filename

        # --- 2. GET NAME FROM JSON ---
        try:
            with open(blueprint_path, 'r') as f:
                blueprint_data = json.load(f)
                af_name = blueprint_data.get('name', 'Unknown_Action_Flow')
        except Exception:
            af_name = 'Unknown_Action_Flow'
            blueprint_data = {}

        # Call your existing function
        module_details = extract_celonis_module_details(blueprint_path)
        
        if not module_details:
            continue
        
        relationships = []
        
        for detail in module_details:
            module_name = detail['module']
            module_number = detail.get('id', 'unknown') 
            
            # --- CLEAN MODULE TYPE NAME ---
            # Converts "celonis:getRows" -> "getRows"
            clean_type = module_name.split(':')[-1]
            
            # Rule 1: celonis:getKPIs
            if module_name == 'celonis:getKPIs':
                km_key = detail.get('knowledgeModelKey', '')
                if km_key.startswith('KNOWLEDGE_MODEL-'):
                    km_key = km_key.replace('KNOWLEDGE_MODEL-', '', 1)
                    
                for kpi in detail.get('kpis', []):
                    relationships.append({
                        'source_id': f'{kpi}'.lower(),
                        'source_name': kpi,
                        'source_attr': None,
                        'source_type': 'KPI',
                        'source_node_type': '',
                        'source_asset_id': km_key,
                        'source_asset_type': 'KNOWLEDGE_MODEL',
                        'target_module_id': module_number,
                        'target_module_type': clean_type, # Pass clean type
                        'pql': '',
                        'km_key': km_key,
                        'dm_id': '',
                        'dp_id': ''
                    })
            
            # Rule 2: celonis:getRows
            elif module_name == 'celonis:getRows':
                km_key = detail.get('knowledgeModelKey', '') or detail.get('dataOrKnowledgeModel', '')
                if km_key.startswith('KNOWLEDGE_MODEL-'):
                    km_key = km_key.replace('KNOWLEDGE_MODEL-', '', 1)

                for column in detail.get('column_names', []):
                    if '.' in column:
                        parts = column.split('.', 1)
                        source_name = parts[0]
                        source_attr = parts[1] if len(parts) > 1 else None
                    else:
                        source_name = column
                        source_attr = None
                    
                    relationships.append({
                        'source_id': f'{column}'.lower(),
                        'source_name': source_name.lower(),
                        'source_attr': source_attr.lower() if source_attr else None,
                        'source_type': 'KM_COLUMN',
                        'source_node_type': 'ATTRIBUTES',
                        'source_asset_id': km_key,
                        'source_asset_type': 'KNOWLEDGE_MODEL',
                        'target_module_id': module_number,
                        'target_module_type': clean_type,
                        'pql': '',
                        'km_key': km_key,
                        'dm_id': '',
                        'dp_id': ''
                    })
            
            # Rule 3: celonis:updateAugmentedAttribute (V1 & V2)
            elif 'updateAugmentedAttribute' in module_name:
                mapper = detail.get('mapper', {})
                metadata = detail.get('metadata', {})
                
                km_key = mapper.get('knowledgeModelKey', '')
                aug_attr_id_full = mapper.get('augmentedAttributeId', '')  
                
                # --- EXTRACT LABEL FROM METADATA ---
                aug_attr_label = ''
                try:
                    restore_data = metadata.get('restore', {})
                    aug_attr_metadata = restore_data.get('expect', {}).get('augmentedAttributeId', {})
                    aug_attr_label = aug_attr_metadata.get('label', '')  # "Aug Atts Test"
                except:
                    pass
                
                # --- PARSE THE AUGMENTED ATTRIBUTE ID ---
                record = ''
                attr_name = ''
                
                if aug_attr_id_full and '.' in aug_attr_id_full:
                    parts = aug_attr_id_full.split('.')
                    if len(parts) >= 3:
                        record = parts[1].lower()  # "vbak"
                
                # --- BUILD THE ATTRIBUTE NAME ---
                if aug_attr_label:
                    # Convert "Aug Atts Test" -> "aug_atts_test"
                    attr_name_base = aug_attr_label.lower().replace(' ', '_')
                    # Add "aug_" prefix -> "aug_aug_atts_test"
                    attr_name = f"aug_{attr_name_base}"
                
                # --- BUILD THE FINAL SOURCE ID ---
                # Format: "record_attr_name.value" (with .value suffix!)
                # Example: "vbak_aug_aug_atts_test.value"
                if record and attr_name:
                    source_id = f"{record}_{attr_name}.value"  
                    full_aug_attr = f"{record}_{attr_name}"    # Keep without .value for UNIQUE_SOURCE_ID
                else:
                    # Fallback to the full ID if parsing fails
                    source_id = aug_attr_id_full.lower()
                    full_aug_attr = aug_attr_id_full.lower()

                if km_key.startswith('KNOWLEDGE_MODEL-'):
                    km_key = km_key.replace('KNOWLEDGE_MODEL-', '', 1)
                if '.' in km_key:
                    km_key = km_key.split('.')[-1]

                # 1. READ Flow: KM Attribute -> Action Flow
                relationships.append({
                    'source_id': source_id,           
                    'source_name': f"{record}_{attr_name}",           
                    'source_attr': "value",  
                    'source_type': 'AUGMENTED_ATTRIBUTE',
                    'source_node_type': 'ATTRIBUTES',
                    'source_asset_id': km_key,
                    'source_asset_type': 'KNOWLEDGE_MODEL',
                    'target_module_id': module_number,
                    'target_module_type': clean_type,
                    'pql': '',
                    'km_key': km_key,
                    'dm_id': '',
                    'dp_id': '',
                    'reverse': False
                })

                # 2. WRITE Flow: Action Flow -> KM Attribute
                # FIX: Keep 'target_module_id' as the Module Number so ID generation works
                relationships.append({
                    'source_id': source_id,                # Same Attribute ID
                    'source_name': f"{record}_{attr_name}",# Same Attribute Name
                    'source_attr': "value",                # Same Attribute Value
                    'source_type': 'AUGMENTED_ATTRIBUTE',
                    'source_node_type': 'ATTRIBUTES',
                    'source_asset_id': km_key,
                    'source_asset_type': 'KNOWLEDGE_MODEL',
                    'target_module_id': module_number,     # <--- KEEP THIS AS AF MODULE ID
                    'target_module_type': clean_type,
                    'pql': '',
                    'km_key': km_key,
                    'dm_id': '',
                    'dp_id': '',
                    'reverse': True
                })
            
            
            # Rule 4: celonis:queryData
            elif module_name == 'celonis:queryData':
                dp_id = detail.get('dataPool', '')
                dm_id = detail.get('dataModel', '')
                
                def add_dm_rel(src_id, src_name, src_attr, node_type, pql_str):
                    relationships.append({
                        'source_id': src_id.lower(),
                        'source_name': src_name.lower(),
                        'source_attr': src_attr.lower() if src_attr else None,
                        'source_type': 'DATA_MODEL',
                        'source_node_type': node_type,
                        'source_asset_id': dm_id,
                        'source_asset_type': 'DATA_MODEL',
                        'target_module_id': module_number,
                        'target_module_type': clean_type,
                        'pql': pql_str,
                        'km_key': '',
                        'dm_id': dm_id,
                        'dp_id': dp_id
                    })

                for col in detail.get('columns', []):
                    formula = col.get('formula', '')
                    if formula:
                        pql_refs = extract_tables_and_columns_from_pql(formula)
                        for table, column in pql_refs['table_column_pairs']:
                            add_dm_rel(f'{table}.{column}', table, column, 'TABLE_COLUMN', formula)
                        for table in pql_refs['tables_only']:
                            add_dm_rel(f'{table}', table, None, 'TABLE', formula)
                
                for filter_expr in detail.get('filterExpression', []):
                    if filter_expr:
                        pql_refs = extract_tables_and_columns_from_pql(filter_expr)
                        for table, column in pql_refs['table_column_pairs']:
                            add_dm_rel(f'{table}.{column}', table, column, 'TABLE_COLUMN', filter_expr)
                        for table in pql_refs['tables_only']:
                            add_dm_rel(f'{table}', table, None, 'TABLE', filter_expr)

        # -------------------------------------------------------------
        # 3. CONVERT TO FINAL FORMAT
        # -------------------------------------------------------------
        for rel in relationships:
            is_reversed = rel.get('reverse', False)
            km_key = rel.get('km_key', '')
            
            meta = km_lookup.get(km_key, {})
            km_id = meta.get('knowledge_model_id', '') if km_key else ''
            dm_id = rel['dm_id'] if rel['dm_id'] else meta.get('data_model_id', '')
            dp_id = rel['dp_id'] if rel['dp_id'] else meta.get('data_pool_id', '')
            
            module_id = rel['target_module_id']
            module_type = rel['target_module_type']

            # --- ID CONSTRUCTION (Updated with Source Info) ---
            # Format: ActionFlowID.ActionFlowName.ModuleID.SourceName
            source_part = rel['source_name'].replace(' ', '_') if rel['source_name'] else 'unknown'
            af_unique_node_id = f"{af_id}.{af_name}.{module_id}".lower()
            
            # --- TARGET NAME CONSTRUCTION ---
            # Format: "Module 1 (getRows)"
            target_node_name = f"Module {module_id} ({module_type})"

            if is_reversed:
                # AF -> KM (Augmented Attribute Update)
                # SOURCE is now the Action Flow (calculated correctly above)
                unique_source_id = af_unique_node_id
                
                # TARGET is now the KM Attribute (using the stored source info)
                unique_target_id = f'{km_id}.{rel["source_id"]}'.lower() if km_id else f'{km_key}.{rel["source_id"]}'.lower()
                
                af_lineage.append({
                    'UNIQUE_SOURCE_ID': unique_source_id,
                    'UNIQUE_TARGET_ID': unique_target_id,
                    'SOURCE_ID': af_id,
                    'SOURCE_NAME': f"{af_name}",
                    'SOURCE_ATTRIBUTE': target_node_name,
                    'SOURCE_NODE_TYPE': 'ACTION_FLOW',
                    'SOURCE_TYPE': 'ACTION_FLOW',
                    'SOURCE_PQL': rel['pql'],
                    'SOURCE_STUDIO_ASSET_ID': af_id,
                    'SOURCE_STUDIO_ASSET_TYPE': 'ACTION_FLOW',
                    
                    # FIX: Map TARGET columns to the original Source details
                    'TARGET_ID': rel['source_id'],      # Attribute ID
                    'TARGET_NAME': rel['source_name'],  # Attribute Name
                    'TARGET_ATTRIBUTE': rel['source_attr'], # "value"
                    
                    'TARGET_NODE_TYPE': 'AUGMENTED_ATTRIBUTE',
                    'TARGET_STUDIO_ASSET_ID': km_id or km_key,
                    'TARGET_STUDIO_ASSET_TYPE': 'KNOWLEDGE_MODEL',
                    'DATA_SOURCE_ID': '',
                    'KNOWLEDGE_MODEL_KEY': km_key,
                    'KNOWLEDGE_MODEL_ID': km_id,
                    'DATA_MODEL_ID': dm_id,
                    'DATA_POOL_ID': dp_id
                })
            else:
                # KM -> AF
                if km_id:
                    unique_source_prefix = km_id
                elif dm_id:
                    unique_source_prefix = dm_id
                elif km_key:
                    unique_source_prefix = km_key
                else:
                    unique_source_prefix = rel['source_asset_id']
                
                unique_source_id = f'{unique_source_prefix}.{rel["source_id"]}'.lower() if unique_source_prefix else rel['source_id'].lower()
                unique_target_id = af_unique_node_id
                
                af_lineage.append({
                    'UNIQUE_SOURCE_ID': unique_source_id,
                    'UNIQUE_TARGET_ID': unique_target_id,
                    'SOURCE_ID': rel['source_id'],
                    'SOURCE_NAME': rel['source_name'],
                    'SOURCE_ATTRIBUTE': rel['source_attr'],
                    'SOURCE_NODE_TYPE': rel['source_node_type'],
                    'SOURCE_TYPE': rel['source_type'],
                    'SOURCE_PQL': rel['pql'],
                    'SOURCE_STUDIO_ASSET_ID': km_id or rel['source_asset_id'],
                    'SOURCE_STUDIO_ASSET_TYPE': rel['source_asset_type'],
                    'TARGET_ID': str(module_id),
                    'TARGET_NAME': f'{af_name} ({module_id})', 
                    'TARGET_ATTRIBUTE': target_node_name,
                    'TARGET_NODE_TYPE': 'ACTION_FLOW',
                    'TARGET_STUDIO_ASSET_ID': af_id,
                    'TARGET_STUDIO_ASSET_TYPE': 'ACTION_FLOW',
                    'DATA_SOURCE_ID': '',
                    'KNOWLEDGE_MODEL_KEY': km_key,
                    'KNOWLEDGE_MODEL_ID': km_id,
                    'DATA_MODEL_ID': dm_id,
                    'DATA_POOL_ID': dp_id
                })
    
    except Exception as e:
        print(f"Error processing {filename}: {str(e)}")
        continue

print(f"\nExtracted {len(af_lineage)} lineage records from Action Flows")

# Create DataFrame
df_af_lineage = pd.DataFrame(af_lineage)

if not df_af_lineage.empty:
    df_af_lineage['UNIQUE_KEY'] = (
        df_af_lineage['DATA_MODEL_ID'].fillna('') + '.' + 
        df_af_lineage['SOURCE_NAME'].fillna('') + '.' + 
        df_af_lineage['SOURCE_ATTRIBUTE'].fillna('')
    ).str.lower()

df_af_lineage.to_csv('action_flow_lineage.csv', index=False)
print(f"Saved to action_flow_lineage.csv ({len(df_af_lineage)} records)")

Found 10 Knowledge Models in lookup.
Processing 3 blueprint files...

Extracted 16 lineage records from Action Flows
Saved to action_flow_lineage.csv (16 records)


## Append KM + Views + Action Flows

In [None]:
df_combined = pd.concat([df_studio, df_af_lineage], ignore_index=True)

# 3. Save the complete lineage table
df_combined.to_csv('lineage_studio.csv', index=False)

print("Saved lineage_studio.csv")

Saved lineage_studio.csv


## JOIN Data Models data with Studio Lineage Data

In [None]:
## JOIN Data Models data with Studio Lineage Data

# --- 1. DEDUPLICATE INVENTORY (Essential Primary Key Fix) ---
# Ensure the left table has a unique key (it should be 1:1)
df_data_models = df_data_models.drop_duplicates(subset=['unique_id'], keep='first')
print(f"df_data_models rows (Unique Inventory): {len(df_data_models)}")


# --- 2. PREPARE STUDIO USAGE (Group usage data before merge) ---
df_studio['JOIN_KEY'] = (
    df_studio['DATA_MODEL_ID'] + '.' + 
    df_studio['SOURCE_NAME'] + '.' + 
    df_studio['SOURCE_ATTRIBUTE']
).str.lower()

# Create a clean list of ONLY the unique keys that were used (deduplicating multiple uses)
df_studio_unique_usage = df_studio.drop_duplicates(subset=['JOIN_KEY'])
print(f"df_studio_unique rows (Unique Inventory): {len(df_studio_unique_usage)}")

# --- 3. PERFORM LEFT JOIN (Inventory size guaranteed) ---
# Join the unique inventory (left) with the unique list of used keys (right)
df_final = pd.merge(
    df_data_models,
    df_studio_unique_usage[['JOIN_KEY']], # Only need the key from the right side
    left_on='unique_id',
    right_on='JOIN_KEY',
    how='left',  # This guarantees len(df_final) == len(df_data_models)
    indicator=True,
    suffixes=('_dm', '_studio')  # Add suffixes
)

# --- 4. CREATE USED/NOT_USED FLAG ---
df_final['USED_NOT_USED'] = df_final['_merge'].map({
    'both': 'USED',              # Column exists in inventory AND was found in studio usage
    'left_only': 'NOT_USED',     # Column exists in inventory but was NOT found in studio usage
    # 'right_only' is impossible with a Left Join on unique keys
})

# Drop the merge indicator and JOIN_KEY columns (keep unique_id)
df_final = df_final.drop(columns=['_merge', 'JOIN_KEY'])


# --- FINAL COLUMN SELECTION AND CLEANUP ---

# Define the columns we need, using the '_dm' suffix where necessary 
columns_to_keep = [
    'd_pool_id', 
    'd_pool_name', 
    'd_model_id', 
    'd_model_name', 
    'table_name', 
    'column_name', 
    'USED_NOT_USED'
]

# Select and rename columns for the final report
final_output_cols = []
for col in columns_to_keep:
    if col + '_dm' in df_final.columns:
         final_output_cols.append(col + '_dm')
    else:
         final_output_cols.append(col)

# Reorder columns and select only the required ones
df_final = df_final[final_output_cols]

# Save the final result
df_final.to_csv('tables_usage_report.csv', index=False)
print(f"Table_usage_report rows: {len(df_final)}")
print("\n Final complete lineage report saved to 'tables_usage_report.csv'")

df_data_models rows (Unique Inventory): 1869
df_studio_unique rows (Unique Inventory): 480
Table_usage_report rows: 1869

 Final complete lineage report saved to 'tables_usage_report.csv'


In [None]:
df_combined['NODE_TYPE_COMBINED'] = (
    df_combined['SOURCE_NODE_TYPE'].astype(str) + '-' + 
    df_combined['SOURCE_TYPE'].astype(str)
)

# Create complete source nodes list with all columns from studio
df_source_nodes = df_combined[[
    'UNIQUE_SOURCE_ID', 
    'SOURCE_ID', 
    'NODE_TYPE_COMBINED',
    'SOURCE_STUDIO_ASSET_TYPE',
    'SOURCE_STUDIO_ASSET_ID',
    'DATA_MODEL_ID',
    'DATA_MODEL_NAME',
    'DATA_POOL_ID',
    'DATA_POOL_NAME',
    'KNOWLEDGE_MODEL_ID',
    'KNOWLEDGE_MODEL_KEY'
]].rename(columns={
    'UNIQUE_SOURCE_ID': 'node',
    'SOURCE_ID': 'node_name',
    'DATA_POOL_ID': 'data_pool_id',
    'DATA_POOL_NAME': 'data_pool_name',
    'DATA_MODEL_ID': 'data_model_id',
    'DATA_MODEL_NAME': 'data_model_name',
    'KNOWLEDGE_MODEL_ID': 'knowledge_model_id',
    'KNOWLEDGE_MODEL_KEY': 'knowledge_model_key',
    'NODE_TYPE_COMBINED': 'category',
    'SOURCE_STUDIO_ASSET_TYPE': 'asset_type',
    'SOURCE_STUDIO_ASSET_ID': 'asset_id',
})

# Create complete target nodes list
df_target_nodes = df_combined[[
    'UNIQUE_TARGET_ID', 
    'TARGET_NAME',
    'TARGET_NODE_TYPE', 
    'TARGET_STUDIO_ASSET_TYPE',
    'TARGET_STUDIO_ASSET_ID',
    'DATA_MODEL_ID',
    'DATA_MODEL_NAME',
    'DATA_POOL_ID',
    'DATA_POOL_NAME',
    'KNOWLEDGE_MODEL_ID',
    'KNOWLEDGE_MODEL_KEY'
]].rename(columns={
    'UNIQUE_TARGET_ID': 'node',
    'TARGET_NAME': 'node_name',
    'DATA_POOL_ID': 'data_pool_id',
    'DATA_POOL_NAME': 'data_pool_name',
    'DATA_MODEL_ID': 'data_model_id',
    'DATA_MODEL_NAME': 'data_model_name',
    'KNOWLEDGE_MODEL_ID': 'knowledge_model_id',
    'KNOWLEDGE_MODEL_KEY': 'knowledge_model_key',
    'TARGET_NODE_TYPE': 'category',
    'TARGET_STUDIO_ASSET_TYPE': 'asset_type',
    'TARGET_STUDIO_ASSET_ID': 'asset_id',
})

# Combine ALL nodes from lineage
df_mapping_nodes = pd.concat([df_source_nodes, df_target_nodes]).drop_duplicates(subset=['node']).reset_index(drop=True)

# Save and show stats
df_mapping_nodes.to_csv('mapping_nodes_studio.csv')

## Creating Table in Celonis

In [None]:
data_pools = c.data_integration.get_data_pools()
data_pools

[
	DataPool(id='50d46c0a-39c5-4fe5-aa16-6b248fb2ac4f', name='test-pool-2'),
	DataPool(id='c840c791-823a-4d34-9571-f39dae182078', name='Sarvesh Monitoring Pool'),
	DataPool(id='e41f9df7-8011-4408-b63f-0d61ac6b8331', name='MaxPool'),
	DataPool(id='1e5bd13f-8b92-4ec3-8519-b356378840ec', name='LineageTest.dataPool 2025-05-04 13:17:49 [Hop Rod Rye]'),
	DataPool(id='ebdcfc35-b1c0-4a8c-bf5c-ae3c5f995bab', name='test pool - data lineage app [PC]'),
	DataPool(id='f6497957-5eac-49e9-9d44-270417bedaab', name='test-pool'),
	DataPool(id='6cd287bb-c8da-419d-9faa-2b42e8a08ec4', name='OCPM Data Pool'),
	DataPool(id='f5ab5019-fd07-4626-8a32-034b08df6f15', name='test-team-copy-for-lineage'),
	DataPool(id='8e16eac2-d9db-4f55-b061-f28e1110e58b', name='LineageTest.dataPool 2025-05-04 14:04:03 [St. Bernardus Abt 12]'),
	DataPool(id='9fed379e-98b9-4855-8127-21634200f675', name='LineageTest.dataPool 2025-05-04 14:31:56 [Péché Mortel]'),
	DataPool(id='0847069a-7fdc-49a5-9580-1baf0e92fde2', name='LineageTest.da

In [None]:
dp = c.data_integration.get_data_pool('c840c791-823a-4d34-9571-f39dae182078')
dp.get_data_models()

[
	DataModel(id='4ef2ec4b-aa72-4896-b53c-3d678c9683b5', name='test_tomas', pool_id='c840c791-823a-4d34-9571-f39dae182078'),
	DataModel(id='088283c6-9918-4b59-9a4b-96d9cb38bb15', name='full lineage', pool_id='c840c791-823a-4d34-9571-f39dae182078'),
	DataModel(id='81ce4087-c580-4da2-89f0-a00c1650b792', name='test_lineage', pool_id='c840c791-823a-4d34-9571-f39dae182078'),
	DataModel(id='13483c35-4b6a-4e90-b06c-9230b25e7b02', name='dummy_peopleDM', pool_id='c840c791-823a-4d34-9571-f39dae182078'),
	DataModel(id='80fea3cc-e1ba-4bbf-a29a-da1d2e50557f', name='Test Data Model', pool_id='c840c791-823a-4d34-9571-f39dae182078'),
	DataModel(id='24562740-9c61-4566-93ac-21f23f95a157', name='Data Lineage Minotring', pool_id='c840c791-823a-4d34-9571-f39dae182078')
]

In [None]:
""" monitoring = data_pools.find("Sarvesh Monitoring Pool")


# 1. LINEAGE TABLE (STUDIO)
column_config_studio = [
    ColumnTransport(column_name=c, column_type=ColumnType.STRING, field_length=255) 
    for c in df_combined.columns
]
monitoring.create_table(
    df=df_combined, 
    table_name='lineage_frontend', 
    column_config=column_config_studio, 
    drop_if_exists=True
)
print(" Uploaded frontend lineage table")

# 2. MAPPING NODES
column_config_nodes = [
    ColumnTransport(column_name=c, column_type=ColumnType.STRING, field_length=255) 
    for c in df_mapping_nodes.columns
]
monitoring.create_table(
    df=df_mapping_nodes, 
    table_name='mapping_nodes_frontend', 
    column_config=column_config_nodes, 
    drop_if_exists=True
)
print(" Uploaded frontend mapping nodes table")

# 3. BRIDGE TABLE
column_config_bridge = [
    ColumnTransport(column_name=c, column_type=ColumnType.STRING, field_length=255) 
    for c in df_bridge.columns
]
monitoring.create_table(
    df=df_bridge, 
    table_name='bridge_lineage_mapping', 
    column_config=column_config_bridge, 
    drop_if_exists=True
)
print(" Uploaded bridge table")

# 4. USED REPORT TABLE
column_config_report = [
    ColumnTransport(column_name=c, column_type=ColumnType.STRING, field_length=255) 
    for c in df_final.columns
]
monitoring.create_table(
    df=df_final, 
    table_name='usedtables_report_frontend', 
    column_config=column_config_report, 
    drop_if_exists=True
)
print(" Uploaded used tables report") """

' monitoring = data_pools.find("Sarvesh Monitoring Pool")\n\n\n# 1. LINEAGE TABLE (STUDIO)\ncolumn_config_studio = [\n    ColumnTransport(column_name=c, column_type=ColumnType.STRING, field_length=255) \n    for c in df_combined.columns\n]\nmonitoring.create_table(\n    df=df_combined, \n    table_name=\'lineage_frontend\', \n    column_config=column_config_studio, \n    drop_if_exists=True\n)\nprint(" Uploaded frontend lineage table")\n\n# 2. MAPPING NODES\ncolumn_config_nodes = [\n    ColumnTransport(column_name=c, column_type=ColumnType.STRING, field_length=255) \n    for c in df_mapping_nodes.columns\n]\nmonitoring.create_table(\n    df=df_mapping_nodes, \n    table_name=\'mapping_nodes_frontend\', \n    column_config=column_config_nodes, \n    drop_if_exists=True\n)\nprint(" Uploaded frontend mapping nodes table")\n\n# 3. BRIDGE TABLE\ncolumn_config_bridge = [\n    ColumnTransport(column_name=c, column_type=ColumnType.STRING, field_length=255) \n    for c in df_bridge.columns\n]\n