In [2]:
from typing import List, Optional, Dict
import re
import pandas as pd
import pathlib

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (OpenMetadataConnection, AuthProvider)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import OpenMetadataJWTClientConfig
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.type.entityLineage import ColumnLineage, EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata

In [3]:
from conf.creds.creds import om_oidc_token
# JWT based security settings; the token comes from the local credentials module
jwt_client_config = OpenMetadataJWTClientConfig(jwtToken=om_oidc_token)

# Connection description of the OpenMetadata server
server_config = OpenMetadataConnection(
    hostPort="http://datacatalog.casd.local/api",
    authProvider=AuthProvider.openmetadata,
    securityConfig=jwt_client_config,
)

# Client object used by the cells below to talk to the server
metadata = OpenMetadata(server_config)

In [4]:
# Sanity check of the connection: a return value of True means that the
# connection to the server succeeded.
metadata.health_check()

True

In [5]:
# --- conf for the table entity ---
# service name, database name and schema name of the target tables
DB_SERVICE_NAME, DB_NAME, SCHEMA_NAME = (
    "Constances-Geography",
    "hospitals_in_france",
    "Geography",
)

# --- conf for lineage file ---
# the project root is two directory levels above the current working directory
project_root = pathlib.Path.cwd().parent.parent
metadata_path = project_root.joinpath("data", "om")

In [6]:
# config
# path of the lineage description file (kept as a plain string)
lineage_path = "{}/constances_hospital_lineage.csv".format(metadata_path)
# fully qualified name of the schema: <service>.<database>.<schema>
schema_fqn = ".".join((DB_SERVICE_NAME, DB_NAME, SCHEMA_NAME))

# conf for pipeline service
pipeline_service_name = "test-service-pipeline"
pipe_fqn_col_name = "pipeline_fqn"

# name of the lineage file column which holds the code reference
code_ref_col_name = "ref_code"

In [7]:
# Load the lineage description file into a dataframe
lineage_df = pd.read_csv(lineage_path)

In [8]:
# Preview the first rows of the lineage dataframe
lineage_df.head()

Unnamed: 0,origin_tab_name,origin_col_name,destinataire_tab_name,destinataire_col_name,ref_code
0,fr_communes_raw,geometry,fr_communes_clean,geometry,Seminar3_workflow_automation/airflow/dags/02.h...
1,fr_communes_raw,nom,fr_communes_clean,name,Seminar3_workflow_automation/airflow/dags/02.h...
2,fr_communes_raw,insee,fr_communes_clean,insee,Seminar3_workflow_automation/airflow/dags/02.h...
3,osm_france_raw,id,osm_hospitals_clean,id,Seminar3_workflow_automation/airflow/dags/02.h...
4,osm_france_raw,tags,osm_hospitals_clean,tags,Seminar3_workflow_automation/airflow/dags/02.h...


## 1. Create a simple lineage

The simplest lineage is just a link between two tables (from a source table to a destination table).

In [9]:
def get_table_by_name(om_conn: OpenMetadata, table_fqn: str) -> Table:
    """
    Look up a table entity from its fully qualified name.
    :param om_conn: openmetadata connection used to run the lookup
    :type om_conn: OpenMetadata
    :param table_fqn: table fully qualified name
    :type table_fqn: str
    :return: the result of ``om_conn.get_by_name`` for the given fqn
    :rtype: Table
    """
    table_entity = om_conn.get_by_name(entity=Table, fqn=table_fqn)
    return table_entity

In [13]:
# Build the fully qualified name of both tables first ...
fr_communes_raw_fqn = f"{schema_fqn}.fr_communes_raw"
fr_communes_clean_fqn = f"{schema_fqn}.fr_communes_clean"

# ... then resolve each fqn into its table entity
fr_communes_raw_om_entity = get_table_by_name(om_conn=metadata, table_fqn=fr_communes_raw_fqn)
fr_communes_clean_om_entity = get_table_by_name(om_conn=metadata, table_fqn=fr_communes_clean_fqn)


In [14]:
# Show the id of the source table entity, then the id of the destination one
print(fr_communes_raw_om_entity.id, fr_communes_clean_om_entity.id, sep="\n")

__root__=UUID('f233a2ef-9022-4860-a43f-acedcae808dc')
__root__=UUID('aa41b05c-993d-4379-8e08-cc1b6b67332a')


In [15]:
# Table level edge which goes from fr_communes_raw to fr_communes_clean
raw_to_clean_edge = EntitiesEdge(
    description="## clean france communes dataset",
    fromEntity=EntityReference(id=fr_communes_raw_om_entity.id, type="table"),
    toEntity=EntityReference(id=fr_communes_clean_om_entity.id, type="table"),
)
add_lineage_req1 = AddLineageRequest(edge=raw_to_clean_edge)

# Send the request; the cell displays the value returned by add_lineage
metadata.add_lineage(data=add_lineage_req1)

{'entity': {'id': 'f233a2ef-9022-4860-a43f-acedcae808dc',
  'type': 'table',
  'name': 'fr_communes_raw',
  'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_raw',
  'description': 'This table contains all geographical information of french communes',
  'displayName': 'fr_communes_raw',
  'deleted': False,
  'href': 'http://datacatalog.casd.local/api/v1/tables/f233a2ef-9022-4860-a43f-acedcae808dc'},
 'nodes': [{'id': 'aa41b05c-993d-4379-8e08-cc1b6b67332a',
   'type': 'table',
   'name': 'fr_communes_clean',
   'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_clean',
   'description': 'This table is built based on fr_communes_raw which is suitable for Contances related analysis',
   'displayName': 'fr_communes_clean',
   'deleted': False,
   'href': 'http://datacatalog.casd.local/api/v1/tables/aa41b05c-993d-4379-8e08-cc1b6b67332a'}],
 'upstreamEdges': [],
 'downstreamEdges': [{'fromEntity': 'f233a2ef-9022-4860-a43f

## 2. Create lineage with column details

We now have the table-level lineage. If we need more information, such as the column-level lineage and the code that produced the data, we need to add more details to the lineage.

In [16]:
# One ColumnLineage per (source column, destination column) pair.
# The column fqn is the table fqn followed by the column name.
column_lineage1, column_lineage2, column_lineage3 = (
    ColumnLineage(
        fromColumns=[f"{fr_communes_raw_fqn}.{src_col}"],
        toColumn=f"{fr_communes_clean_fqn}.{dest_col}",
    )
    for src_col, dest_col in (
        ("geometry", "geometry"),
        ("nom", "name"),
        ("insee", "insee"),
    )
)

# Query attached to the lineage edge. No comma after the last selected
# column: a trailing comma before FROM makes the statement invalid SQL.
query_detail="""
Create TABLE fr_communes_clean AS
    SELECT fr_communes_raw.nom AS name,
           fr_communes_raw.geometry AS geometry,
           fr_communes_raw.insee AS insee
    FROM fr_communes_raw;
"""

# Details of the edge: the query and the column level lineage
lineage_details = LineageDetails(
    sqlQuery=query_detail,
    columnsLineage=[column_lineage1, column_lineage2, column_lineage3],
)

# Same table level edge as before, now carrying the lineage details
add_lineage_req2 = AddLineageRequest(
    edge=EntitiesEdge(
        description="## clean france communes dataset",
        fromEntity=EntityReference(id=fr_communes_raw_om_entity.id, type="table"),
        toEntity=EntityReference(id=fr_communes_clean_om_entity.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
metadata.add_lineage(data=add_lineage_req2)


{'entity': {'id': 'f233a2ef-9022-4860-a43f-acedcae808dc',
  'type': 'table',
  'name': 'fr_communes_raw',
  'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_raw',
  'description': 'This table contains all geographical information of french communes',
  'displayName': 'fr_communes_raw',
  'deleted': False,
  'href': 'http://datacatalog.casd.local/api/v1/tables/f233a2ef-9022-4860-a43f-acedcae808dc'},
 'nodes': [{'id': 'aa41b05c-993d-4379-8e08-cc1b6b67332a',
   'type': 'table',
   'name': 'fr_communes_clean',
   'fullyQualifiedName': 'Constances-Geography.hospitals_in_france.Geography.fr_communes_clean',
   'description': 'This table is built based on fr_communes_raw which is suitable for Contances related analysis',
   'displayName': 'fr_communes_clean',
   'deleted': False,
   'href': 'http://datacatalog.casd.local/api/v1/tables/aa41b05c-993d-4379-8e08-cc1b6b67332a'}],
 'upstreamEdges': [],
 'downstreamEdges': [{'fromEntity': 'f233a2ef-9022-4860-a43f

## 3. Add a workflow reference

If the data transformation is done by a `workflow automation tool` (e.g. Airflow), we can also ingest the `reference of the data pipeline` used to create the data lineage (e.g., the ETL feeding the tables) into OM.

To prepare this example, we need to start by creating the Pipeline entity. As usual, entities follow a hierarchy: a pipeline must live inside a pipeline service, so we need to prepare the Pipeline Service first.

In [None]:
import ast

def get_dag_info(file_path: str) -> str:
    """
    This function reads an airflow dag file and returns the dag id declared in it.

    The file is only parsed, never executed. A DAG call is recognised whether
    it is written ``DAG(...)`` or as an attribute access such as
    ``models.DAG(...)``. The id can be given with the ``dag_id`` keyword or as
    the first positional argument, and it must be a string literal. If several
    calls qualify, the last one visited wins.
    :param file_path: the input dag file
    :type file_path: str
    :return: the dag id
    :rtype: str
    :raises ValueError: if no DAG call with a literal string id is found
    :raises FileNotFoundError: if the file does not exist (raised by ``open``)
    :raises SyntaxError: if the file is not valid python (raised by ``ast.parse``)
    """
    # Read raw bytes so that the parser applies the encoding declared by the
    # source file itself instead of the locale default encoding
    with open(file_path, 'rb') as file:
        tree = ast.parse(file.read(), filename=str(file_path))

    dag_id = None
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        # ast.Name carries the callee name in `id`, ast.Attribute in `attr`
        callee = node.func
        callee_name = getattr(callee, 'id', None) or getattr(callee, 'attr', None)
        if callee_name != 'DAG':
            continue
        # the keyword form has priority, then the first positional argument
        id_node = next((kw.value for kw in node.keywords if kw.arg == 'dag_id'), None)
        if id_node is None and node.args:
            id_node = node.args[0]
        # only a string literal can be resolved without running the file
        if isinstance(id_node, ast.Constant) and isinstance(id_node.value, str):
            dag_id = id_node.value
    if dag_id is None:
        raise ValueError('DAG id is not found')
    return dag_id

In [None]:
def generate_pipeline_params(code_ref: str) -> Dict:
    """
    This function builds the parameters of the pipeline linked to a code reference.
    The values are fixed for now, the code reference only decides whether the
    parameters are produced at all.
    :param code_ref: the code reference
    :type code_ref: str
    :return: a dict with the keys ``name``, ``description``, ``air_url`` and
        ``pipeline_loc``, or None if the code reference is empty or None
    :rtype: Dict
    """
    # no code reference, no pipeline parameters
    if not code_ref:
        return None
    pipeline_name = "constance-hospital-count"
    return {
        "name": pipeline_name,
        "description": "this pipeline transform the snds raw data into constance simplyfy table",
        "air_url": f"http://airflow.casd.local:8080/dags/{pipeline_name}/grid",
        "pipeline_loc": "/opt/airflow/dags/airflow_metadata_extraction.py",
    }


def build_query_pipeline(om_conn: OpenMetadata, code_ref: str, pipeline_service_fqn: str) -> Pipeline:
    """
    This function takes the code ref, build a pipeline with the content of the ref
    :param om_conn: Open metadata server connexion
    :type om_conn: OpenMetadata
    :param code_ref: A fqn of the code reference(e.g. file path, url)
    :type code_ref: str
    :param pipeline_service_fqn: The target pipeline service
    :type pipeline_service_fqn: str
    :return: the generated pipeline, as returned by ``om_conn.create_or_update``
    :rtype: Pipeline
    :raises ValueError: if no pipeline parameters can be generated from the code
        reference, or if the pipeline service lookup returns None
    """
    # Validate the code reference first: generate_pipeline_params returns None
    # for an empty reference, and subscripting None would only give an opaque
    # TypeError after the server has already been queried.
    pipeline_params = generate_pipeline_params(code_ref)
    if pipeline_params is None:
        raise ValueError(f"Can not build a pipeline from the code reference {code_ref!r}")

    pipeline_service = get_pipeline_service_by_name(om_conn, pipeline_service_fqn)
    # NOTE(review): get_pipeline_service_by_name is defined outside this view;
    # this guard covers the case where it returns None for an unknown service.
    if pipeline_service is None:
        raise ValueError(f"Can not find the pipeline service {pipeline_service_fqn} in the OM server")

    create_pipeline = CreatePipelineRequest(
        # pipeline name generated from the code reference
        name=pipeline_params["name"],
        # pipeline description generated from the code reference
        description=pipeline_params["description"],
        sourceUrl=pipeline_params["air_url"],
        concurrency=5,
        pipelineLocation=pipeline_params["pipeline_loc"],
        service=pipeline_service.fullyQualifiedName,
    )
    return om_conn.create_or_update(data=create_pipeline)


def build_column_lineage(lineage_df: pd.DataFrame, source_tab_fqn: str, dest_tab_fqn: str) -> List[ColumnLineage]:
    """
    Build the column lineage of one pair of source and destination table.

    The given dataframe must already be filtered so that it only contains the
    rows of that pair. One column lineage is built per destination column, it
    links the destination column to all its distinct source columns. If the
    dataframe holds no row, an empty list is returned.
    :param lineage_df: A filtered table/column lineage dataframe
    :type lineage_df: pd.Dataframe
    :param source_tab_fqn: fqn of the source table, used as prefix of the source columns
    :type source_tab_fqn: str
    :param dest_tab_fqn: fqn of the destination table, used as prefix of the destination column
    :type dest_tab_fqn: str
    :return: the list of the generated column lineage
    :rtype: List[ColumnLineage]
    """
    # one row per destination column, with the list of its distinct source columns
    sources_by_dest = (
        lineage_df.groupby(DEST_COL_NAME)[SRC_COL_NAME]
        .agg(lambda x: list(x.unique()))
        .reset_index()
    )
    # each record becomes a column lineage; the column names are completed
    # with the fqn of their table
    return [
        ColumnLineage(
            fromColumns=[f"{source_tab_fqn}.{col_name}" for col_name in record[SRC_COL_NAME]],
            toColumn=f"{dest_tab_fqn}.{record[DEST_COL_NAME]}",
        )
        for record in sources_by_dest.to_dict(orient="records")
    ]


def find_pipeline_fqn_by_dag(dag_file: str, om_conn: OpenMetadata):
    """
    Resolve a dag file into the fqn of its pipeline entity in the OM server.

    The dag id is extracted from the dag file, then the pipeline
    ``<PIPELINE_SERVICE_NAME>.<dag id>`` is looked up in the OM server.
    :param dag_file: path of the dag file
    :type dag_file: str
    :param om_conn: Open metadata server connexion
    :type om_conn: OpenMetadata
    :return: the pipeline fqn, or None if the dag file is None or an empty string
    :rtype: Optional[str]
    :raises FileNotFoundError: if the dag file does not exist
    :raises ValueError: if the dag id can not be extracted, or if the pipeline
        does not exist in the OM server
    """
    # without a dag file there is nothing to resolve
    if dag_file is None or dag_file == '':
        return None

    # report the problem, then let the original exception go up to the caller
    try:
        dag_id = get_dag_info(dag_file)
    except FileNotFoundError:
        print(f'DAG file {dag_file} not found')
        raise
    except ValueError:
        print(f'DAG file {dag_file} has invalid format')
        raise

    pipeline_fqn = f"{PIPELINE_SERVICE_NAME}.{dag_id}"
    # The fqn is returned instead of the entity id, because
    # type(pipeline_entity.id) is metadata.generated.schema.type.basic.Uuid,
    # which can't be stored in a dataframe.
    pipeline_entity = get_pipeline_entity_by_fqn(om_conn, pipeline_fqn)
    if not pipeline_entity:
        raise ValueError(
            f'Can not find the pipeline {pipeline_fqn} in the OM server. Load the dag file to airflow first please')
    print(f"find the pipeline {pipeline_entity.id}")
    return pipeline_fqn

In [None]:
from metadata.generated.schema.type.entityLineage import ColumnLineage

# names of the lineage dataframe columns which hold the source column name
# and the destination column name
source_col_name = "origin_col_name"
dest_col_name = "destinataire_col_name"

def build_column_lineage(lineage_df: pd.DataFrame, source_tab_fqn: str, dest_tab_fqn: str) -> List["ColumnLineage"]:
    """
    This function builds the column lineage of one pair of source and dest table.

    One column lineage is built per destination column, it links the
    destination column to all its distinct source columns. The column names
    are read from the ``dest_col_name`` and ``source_col_name`` columns of the
    dataframe. If the dataframe holds no row, an empty list is returned.
    :param lineage_df: lineage dataframe which only contains the rows of the given pair
    :type lineage_df: pd.DataFrame
    :param source_tab_fqn: fqn of the source table, used as prefix of the source columns
    :type source_tab_fqn: str
    :param dest_tab_fqn: fqn of the dest table, used as prefix of the dest column
    :type dest_tab_fqn: str
    :return: the list of the generated column lineage
    :rtype: List[ColumnLineage]
    """
    # NOTE(review): this definition shadows the earlier build_column_lineage of
    # this notebook, which reads DEST_COL_NAME / SRC_COL_NAME instead of
    # dest_col_name / source_col_name. Only one of the two should be kept.

    # group by the dest col name, and collect all linked source col in a list
    dest_source_map = (
        lineage_df.groupby(dest_col_name)[source_col_name]
        .agg(lambda x: list(x.unique()))
        .reset_index()
    )
    # build one column lineage per row; the column names are completed with
    # the fqn of their table
    return [
        ColumnLineage(
            fromColumns=[f"{source_tab_fqn}.{col_name}" for col_name in row[source_col_name]],
            toColumn=f"{dest_tab_fqn}.{row[dest_col_name]}",
        )
        for row in dest_source_map.to_dict(orient="records")
    ]