In [1]:
from typing import List, Optional, Dict

import pandas as pd
import pathlib

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (OpenMetadataConnection, AuthProvider)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import OpenMetadataJWTClientConfig
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.type.entityLineage import ColumnLineage, EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata

In [2]:
# Base URL of the target OpenMetadata server API; change this value to match your own deployment
target_om_server = "http://om-dev.casd.local/api"

In [3]:
from conf.creds import om_oidc_token

# The JWT token is loaded from conf.creds so that it is never written in the notebook itself
jwt_security_config = OpenMetadataJWTClientConfig(jwtToken=om_oidc_token)

# Connection settings of the target server, authenticated with the token above
server_config = OpenMetadataConnection(
    hostPort=target_om_server,
    authProvider=AuthProvider.openmetadata,
    securityConfig=jwt_security_config,
)

# Client object used by every following cell to talk to the OpenMetadata server
metadata = OpenMetadata(server_config)

In [4]:
# Check the connection to the server; a returned True means the connection succeeded
metadata.health_check()

True

In [5]:
# Location of the table entities in OpenMetadata: database service, database and schema names.
# They are combined into the schema fully qualified name further down.
DB_SERVICE_NAME = "Constances-Geography"
DB_NAME = "hospitals_in_france"
SCHEMA_NAME = "Geography"

# Folder holding the lineage file: the "data" directory under the parent of the current working directory
project_root = pathlib.Path.cwd().parent
metadata_path = project_root / "data"

In [6]:
# Show the resolved data folder so that it can be checked before reading the lineage file
print(metadata_path)

C:\Users\PLIU\Documents\git\Seminare_data_catalog\data


In [7]:
# config
# Join the file name with pathlib instead of an f-string so that the separator always matches the
# platform (the f-string produced mixed "\" and "/" separators on Windows). The result is converted
# back to str to keep the type this variable had before.
lineage_path = str(metadata_path / "constances_hospital_lineage.csv")
# Fully qualified name of the schema that contains all the tables used in this notebook
schema_fqn = f"{DB_SERVICE_NAME}.{DB_NAME}.{SCHEMA_NAME}"

# conf for pipeline service
pipeline_service_name = "test-service-pipeline"
pipe_fqn_col_name = "pipeline_fqn"

# name of the lineage file column holding the reference of the transformation code
code_ref_col_name = "ref_code"

## 1. Create a simple lineage

The simplest lineage is just a link between two tables (from a source table to a destination table).

In [8]:
def get_table_by_name(om_conn: OpenMetadata, table_fqn: str) -> Optional[Table]:
    """
    This function takes a table fqn and looks up the matching table entity on the OpenMetadata server.

    :param om_conn: openmetadata connection
    :type om_conn: OpenMetadata
    :param table_fqn: table fully qualified name (in this notebook: service.database.schema.table)
    :type table_fqn: str
    :return: the result of ``om_conn.get_by_name`` for this fqn. Callers in this notebook treat a
        falsy result as "table not found", hence the Optional return type.
    :rtype: Optional[Table]
    """
    return om_conn.get_by_name(entity=Table, fqn=table_fqn)

In [9]:
# Fully qualified names of the raw and the cleaned commune tables
fr_communes_raw_fqn = f"{schema_fqn}.fr_communes_raw"
fr_communes_clean_fqn = f"{schema_fqn}.fr_communes_clean"

# Fetch the two table entities from the OpenMetadata server
fr_communes_raw_om_entity = get_table_by_name(metadata, fr_communes_raw_fqn)
fr_communes_clean_om_entity = get_table_by_name(metadata, fr_communes_clean_fqn)


In [10]:
# Print the ids of both entities; these ids are what the lineage requests below use to reference the tables
print(fr_communes_raw_om_entity.id)
print(fr_communes_clean_om_entity.id)

root=UUID('46985e51-8a9f-4a85-a85b-5449b902053c')
root=UUID('2429bf42-dbaf-43a3-97e0-6f33ab5875d6')


In [11]:
# Table-level lineage: one edge going from the raw table to the cleaned table.
# Each end of the edge is referenced by the id of the entity and its type.
raw_to_clean_edge = EntitiesEdge(
    description="## clean france communes dataset",
    fromEntity=EntityReference(id=fr_communes_raw_om_entity.id, type="table"),
    toEntity=EntityReference(id=fr_communes_clean_om_entity.id, type="table"),
)
add_lineage_req1 = AddLineageRequest(edge=raw_to_clean_edge)

# Send the request; the server response is displayed as the cell output
metadata.add_lineage(data=add_lineage_req1)

{'entity': {'id': '46985e51-8a9f-4a85-a85b-5449b902053c',
  'type': 'table',
  'name': 'fr_communes_raw',
  'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_raw',
  'description': 'This table contains all geographical information of french communes',
  'displayName': 'fr_communes_raw',
  'deleted': False,
  'href': 'http://localhost:8585/v1/tables/46985e51-8a9f-4a85-a85b-5449b902053c'},
 'nodes': [{'id': '2429bf42-dbaf-43a3-97e0-6f33ab5875d6',
   'type': 'table',
   'name': 'fr_communes_clean',
   'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_clean',
   'description': 'This table is built based on fr_communes_raw which is suitable for Contances related analysis',
   'displayName': 'fr_communes_clean',
   'deleted': False,
   'href': 'http://localhost:8585/v1/tables/2429bf42-dbaf-43a3-97e0-6f33ab5875d6'}],
 'upstreamEdges': [],
 'downstreamEdges': [{'fromEntity': '46985e51-8a9f-4a85-a85b-5449b902053c',
   'toEn

## 2. Create lineage with column details

We now have the table-level lineage. If we need more information, such as the column-level lineage and the code of the transformation, we have to add more details to the lineage.

In [12]:
# One ColumnLineage per destination column: the list of source column fqns feeding it and the
# fqn of the destination column. A column fqn is the table fqn followed by the column name.
column_lineage1 = ColumnLineage(
    fromColumns=[f"{fr_communes_raw_fqn}.geometry",
                 ],
    toColumn=f"{fr_communes_clean_fqn}.geometry",
)

column_lineage2 = ColumnLineage(
    fromColumns=[f"{fr_communes_raw_fqn}.nom",
                 ],
    toColumn=f"{fr_communes_clean_fqn}.name",
)

column_lineage3 = ColumnLineage(
    fromColumns=[f"{fr_communes_raw_fqn}.insee",
                 ],
    toColumn=f"{fr_communes_clean_fqn}.insee",
)

# SQL statement documenting the transformation. The trailing comma that used to follow the last
# selected column has been removed: a comma right before FROM is a syntax error in SQL.
query_detail="""
Create TABLE fr_communes_clean AS
    SELECT fr_communes_raw.nom AS name,
           fr_communes_raw.geometry AS geometry,
           fr_communes_raw.insee AS insee
    FROM fr_communes_raw;
"""

# The lineage details attach the query and the column lineage to the table-level edge
lineage_details = LineageDetails(
    sqlQuery=query_detail,
    columnsLineage=[column_lineage1,column_lineage2,column_lineage3],
)

# Same edge as the simple lineage, now carrying the details built above
add_lineage_req2 = AddLineageRequest(
    edge=EntitiesEdge(
        description="clean france communes dataset",
        fromEntity=EntityReference(id=fr_communes_raw_om_entity.id, type="table"),
        toEntity=EntityReference(id=fr_communes_clean_om_entity.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
metadata.add_lineage(data=add_lineage_req2)


{'entity': {'id': '46985e51-8a9f-4a85-a85b-5449b902053c',
  'type': 'table',
  'name': 'fr_communes_raw',
  'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_raw',
  'description': 'This table contains all geographical information of french communes',
  'displayName': 'fr_communes_raw',
  'deleted': False,
  'href': 'http://localhost:8585/v1/tables/46985e51-8a9f-4a85-a85b-5449b902053c'},
 'nodes': [{'id': '2429bf42-dbaf-43a3-97e0-6f33ab5875d6',
   'type': 'table',
   'name': 'fr_communes_clean',
   'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_clean',
   'description': 'This table is built based on fr_communes_raw which is suitable for Contances related analysis',
   'displayName': 'fr_communes_clean',
   'deleted': False,
   'href': 'http://localhost:8585/v1/tables/2429bf42-dbaf-43a3-97e0-6f33ab5875d6'}],
 'upstreamEdges': [],
 'downstreamEdges': [{'fromEntity': '46985e51-8a9f-4a85-a85b-5449b902053c',
   'toEn

## 3. Add a workflow reference

If the data transformation is done via a `workflow automation tool` (e.g. Airflow), we can also ingest the `reference of the data pipeline` used to create the data lineage (e.g., the ETL feeding the tables) into OM.

To prepare this example, we need to start by creating the Pipeline Entity. As usual, there is an entity hierarchy: a pipeline must live inside a pipeline service, so we need to prepare the Pipeline Service first.

In [16]:
# CreatePipelineRequest and PipelineService are already imported in the first cell of the notebook
# and are not used in this cell, so they are not imported again here.
from metadata.generated.schema.api.services.createPipelineService import (
    CreatePipelineServiceRequest,
)
from metadata.generated.schema.entity.services.pipelineService import (
    PipelineConnection,
    PipelineServiceType,
    airflowConnection
)

from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
    BackendConnection,
)

# Description of the pipeline service: an Airflow service reachable at the given host and port
pipeline_service = CreatePipelineServiceRequest(
    name="constances_workflow_manager",
    serviceType=PipelineServiceType.Airflow,
    connection=PipelineConnection(
        config=airflowConnection.AirflowConnection(
            hostPort="http://airflow.casd.local:8080",
            connection=BackendConnection(),
        ),
    ),
)

# Create the service on the server, or update it when it already exists
pipeline_service_entity = metadata.create_or_update(data=pipeline_service)


In [17]:
# Print the fully qualified name of the pipeline service as a plain string.
# The wrapped value is exposed as `root` by pydantic v2 based SDK versions (the `root=UUID(...)`
# output printed earlier in this notebook is the pydantic v2 form) and as `__root__` by pydantic v1
# based versions, so both attributes are tried.
pipeline_service_fqn = pipeline_service_entity.fullyQualifiedName
print(getattr(pipeline_service_fqn, "root", None) or pipeline_service_fqn.__root__)

constances_workflow_manager


In [18]:
# Plain string form of the pipeline service fqn. The wrapped value is exposed as `root` by
# pydantic v2 based SDK versions and as `__root__` by pydantic v1 based versions, so both are tried.
pipeline_service_fqn = pipeline_service_entity.fullyQualifiedName
pipeline_service_fqn_str = getattr(pipeline_service_fqn, "root", None) or pipeline_service_fqn.__root__

# Description of the pipeline: where it runs (sourceUrl), where its code lives (pipelineLocation)
# and the service it belongs to
create_pipeline = CreatePipelineRequest(
    name="count_hospital_per_commune",
    description="This data pipeline clean raw data and count hospitals of each commune in France",
    # plain string: the former f-string prefix had no placeholder to format
    sourceUrl="http://airflow.casd.local:8080/dags/hospital_count_dag/grid",
    concurrency=5,
    pipelineLocation="https://github.com/CASD-EU/ConstanceDataPlatform/blob/main/Seminar3_workflow_automation/airflow/dags/02.hopital_count_dags.py",
    service=pipeline_service_fqn_str,
)

# Create the pipeline on the server, or update it when it already exists
pipeline_entity = metadata.create_or_update(data=create_pipeline)

In [19]:
# Now we add the data pipeline to the lineage details: same query and column lineage as before,
# plus a reference to the pipeline entity created above
pipeline_ref = EntityReference(id=pipeline_entity.id, type="pipeline")
lineage_details_with_pipeline = LineageDetails(
    sqlQuery=query_detail,
    columnsLineage=[column_lineage1,column_lineage2,column_lineage3],
    pipeline=pipeline_ref,
)

# Edge from the raw table to the cleaned table, carrying the details with the pipeline reference
raw_to_clean_edge_with_pipeline = EntitiesEdge(
    description="## clean france communes dataset",
    fromEntity=EntityReference(id=fr_communes_raw_om_entity.id, type="table"),
    toEntity=EntityReference(id=fr_communes_clean_om_entity.id, type="table"),
    lineageDetails=lineage_details_with_pipeline,
)
add_lineage_req3 = AddLineageRequest(edge=raw_to_clean_edge_with_pipeline)
metadata.add_lineage(data=add_lineage_req3)

{'entity': {'id': 'f233a2ef-9022-4860-a43f-acedcae808dc',
  'type': 'table',
  'name': 'fr_communes_raw',
  'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_raw',
  'description': 'This table contains all geographical information of french communes',
  'displayName': 'fr_communes_raw',
  'deleted': False,
  'href': 'http://datacatalog.casd.local/api/v1/tables/f233a2ef-9022-4860-a43f-acedcae808dc'},
 'nodes': [{'id': 'aa41b05c-993d-4379-8e08-cc1b6b67332a',
   'type': 'table',
   'name': 'fr_communes_clean',
   'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_clean',
   'description': 'This table is built based on fr_communes_raw which is suitable for Contances related analysis',
   'displayName': 'fr_communes_clean',
   'deleted': False,
   'href': 'http://datacatalog.casd.local/api/v1/tables/aa41b05c-993d-4379-8e08-cc1b6b67332a'}],
 'upstreamEdges': [],
 'downstreamEdges': [{'fromEntity': 'f233a2ef-9022-4860-a43f

In [None]:
## 3.1 Link a workflow reference

## 4. Automate the lineage ingestion

To avoid writing duplicate code, we can put all the lineage metadata in a configuration file which has the following schema:
|src_tab_name|src_col_name|dest_tab_name|dest_col_name|ref_code|
|---------------|---------------|-------------|-------------|--------|
|the name of the source table|The name of the source column|The name of the destination table|The name of the destination column|reference code of the data transformation|

The code below illustrates the automation of the lineage ingestion by using a configuration file called `constances_hospital_lineage.csv`.


In [13]:
# 2. read the lineage file as a dataframe and preview its first rows
lineage_df = pd.read_csv(lineage_path)
lineage_df.head()

Unnamed: 0,src_tab_name,src_col_name,dest_tab_name,dest_col_name,ref_code
0,fr_communes_raw,geometry,fr_communes_clean,geometry,Seminar3_workflow_automation/airflow/dags/02.h...
1,fr_communes_raw,nom,fr_communes_clean,name,Seminar3_workflow_automation/airflow/dags/02.h...
2,fr_communes_raw,insee,fr_communes_clean,insee,Seminar3_workflow_automation/airflow/dags/02.h...
3,osm_france_raw,id,osm_hospitals_clean,id,Seminar3_workflow_automation/airflow/dags/02.h...
4,osm_france_raw,tags,osm_hospitals_clean,tags,Seminar3_workflow_automation/airflow/dags/02.h...


In [14]:
# Names of the lineage dataframe columns holding the source/destination table names and the
# source/destination column names
SRC_TAB_COL = "src_tab_name"
SRC_COL_NAME = "src_col_name"
DEST_TAB_COL = "dest_tab_name"
DEST_COL_NAME = "dest_col_name"


In [15]:
import re


# utils functions
def build_column_lineage(lineage_df: pd.DataFrame, source_tab_fqn: str, dest_tab_fqn: str) -> List[ColumnLineage]:
    """
    This function takes a filtered lineage dataframe which only contains one pair of source and dest table, it builds
    all corresponding column lineage of the given pair. If no column lineage is provided, it returns an empty list.

    Rows are grouped by destination column: each destination column gives one ColumnLineage whose
    ``fromColumns`` lists the distinct source columns linked to it. Rows where the source or the
    destination column name is missing carry no column lineage and are ignored.

    :param lineage_df: A filtered table/column lineage dataframe
    :type lineage_df: pd.DataFrame
    :param source_tab_fqn: fully qualified name of the source table, used as prefix of the source column names
    :type source_tab_fqn: str
    :param dest_tab_fqn: fully qualified name of the dest table, used as prefix of the dest column names
    :type dest_tab_fqn: str
    :return: one ColumnLineage per destination column, ordered by destination column name
    :rtype: List[ColumnLineage]
    """
    # a missing (NaN) column name would otherwise be rendered as the bogus column fqn "<table fqn>.nan"
    col_pairs = lineage_df.dropna(subset=[DEST_COL_NAME, SRC_COL_NAME])
    if col_pairs.empty:
        return []

    col_lineage_list = []
    # group by the dest col name (groups come sorted by key), and collect all linked source col of each group
    for dest_col, src_cols in col_pairs.groupby(DEST_COL_NAME)[SRC_COL_NAME]:
        # complete the column fqn with the table fqn; unique() keeps the first-seen order of the source columns
        col_lineage_list.append(
            ColumnLineage(
                fromColumns=[f"{source_tab_fqn}.{col_name}" for col_name in src_cols.unique()],
                toColumn=f"{dest_tab_fqn}.{dest_col}",
            )
        )
    return col_lineage_list

def get_pipeline_entity_by_fqn(om_conn: OpenMetadata, pipeline_fqn: str) -> Optional[Pipeline]:
    """
    This function takes an open metadata connection and a data pipeline fqn, and looks up the matching
    data pipeline entity on the server.

    :param om_conn: openmetadata connection
    :type om_conn: OpenMetadata
    :param pipeline_fqn: data pipeline fully qualified name
    :type pipeline_fqn: str
    :return: the result of ``om_conn.get_by_name`` for this fqn; presumably None when no pipeline
        matches (TODO confirm against the installed SDK version)
    :rtype: Optional[Pipeline]
    """
    return om_conn.get_by_name(entity=Pipeline, fqn=pipeline_fqn)

def get_query_summary(code_ref_file_path: str) -> Optional[str]:
    """
    This function takes the code ref (a path of a local script file), and builds a summary of the script
    which transforms the table. The summary is made of the string literals assigned to variables named
    query1, query2, query3, etc. in the script.

    A literal may be written with single quotes, double quotes or triple quotes. It ends at the same
    kind of quote that opened it, so a query can contain quotes of the other kind.
    String prefixes (f, r, ...) are not recognised.

    :param code_ref_file_path: path of the script file; an empty or None value means "no code reference"
    :type code_ref_file_path: str
    :return: the queries in file order, separated by a semicolon, a space, a newline and a space.
        It is an empty string (never None) when no path is given or when no query is found.
    :rtype: Optional[str]
    :raises OSError: when the file can't be opened
    """
    if not code_ref_file_path:
        return ""

    # Regular expression to find variables like query1, query2, query3, etc.
    # The triple quotes are tried first: otherwise the first quote of a triple-quoted string is taken
    # as the opening quote and the match is reduced to a single quote character.
    query_pattern = re.compile(
        r'query\d+\s*=\s*(?P<quote>"{3}|\'{3}|[\'"])(?P<body>.+?)(?P=quote)',
        re.DOTALL,
    )

    # The script is read as UTF-8 (the default encoding of python source files) instead of the locale
    # encoding, which differs from one machine to another.
    # NOTE(review): assumes the referenced scripts are UTF-8 encoded, confirm if other files are used
    with open(code_ref_file_path, "r", encoding="utf-8") as file:
        file_content = file.read()

    # join puts the separator only between two queries, the summary no longer starts with a separator
    return "; \n ".join(match.group("body") for match in query_pattern.finditer(file_content))

In [16]:
# 3. get all distinct (source table, dest table) pairs as a list of dicts keyed by SRC_TAB_COL and DEST_TAB_COL
source_dest_tabs = lineage_df[[SRC_TAB_COL, DEST_TAB_COL]].drop_duplicates().to_dict(orient="records")

In [None]:
# 4. for each table lineage, we create a lineage entity
# a lineage entity contains three parts:
#    - source table entity
#    - dest table entity
#    - lineage details
for index, row in enumerate(source_dest_tabs):
    # 4.1 get source and dest table entity
    source_tab_name = row[SRC_TAB_COL]
    dest_tab_name = row[DEST_TAB_COL]
    source_tab_fqn = f"{schema_fqn}.{source_tab_name}"
    dest_tab_fqn = f"{schema_fqn}.{dest_tab_name}"
    source_tab_entity = get_table_by_name(metadata, source_tab_fqn)
    dest_tab_entity = get_table_by_name(metadata, dest_tab_fqn)
    # 4.2 test if table exist or not, if not exist, log error and continue to the next table lineage pair
    if source_tab_entity:
        print(f"find the source table {source_tab_fqn} in OM server")
    else:
        print(f"can't find table {source_tab_name} in schema {SCHEMA_NAME}")
        continue
    if dest_tab_entity:
        print(f"find the dest table {dest_tab_fqn} in OM server")
    else:
        print(f"can't find table {dest_tab_name} in schema {SCHEMA_NAME}")
        continue
    # 4.3 if two tables exist, for each pair create a new add lineage request
    print(f"build table lineage from {source_tab_fqn} to {dest_tab_fqn}. current index: {index}")

    # 4.4 Add simple lineage without column details
    lineage_details = None
    # The request is built and sent inside the loop, once per table pair. When these statements sit
    # after the loop, only the last pair is sent, even if that pair was skipped because a table is missing.
    add_pipeline_lineage_request = AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=source_tab_entity.id, type="table"),
            toEntity=EntityReference(id=dest_tab_entity.id, type="table"),
            lineageDetails=lineage_details,
        ), )

    # 4.5 send the request and report the result for this pair
    lineage_entity = metadata.add_lineage(data=add_pipeline_lineage_request)
    if lineage_entity:
        print(f"table lineage {lineage_entity} is created or updated successfully")
    else:
        print(f"can't create table lineage from {source_tab_fqn} to {dest_tab_fqn}")
