## Dataset Loading
We load the CSV files into the Neo4j graph database as nodes and relationships.

In [1]:
#%pip install py2neo pandas matplotlib scikit-learn ipython

In [2]:
import getpass
import os

from py2neo import Graph
import pandas as pd
from IPython import display

# Connection settings are read from the environment so credentials are not stored in the notebook.
# NEO4J_URI / NEO4J_USER fall back to the local defaults this notebook was written against.
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
# Only prompt interactively when NEO4J_PASSWORD is not set in the environment.
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD") or getpass.getpass("Neo4j password: ")

graph = Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

#### Make sure these 4 files are copied into the Neo4j folder that `file:///` URLs resolve to (the `import` directory by default)
(for the steps, please refer to the SOP)
- graph_input_sp.csv
- graph_input_sp2.csv
- graph_input_table.csv
- graph_input_vw.csv

## Create Constraints

In [3]:
#WARNING: this drops the existing `neo4j` database and recreates it empty -- use with care.
# Administration commands such as CREATE OR REPLACE DATABASE are only accepted by the `system`
# database, so the command is sent there. WAIT returns only once the new database is online,
# so the constraint statements below do not race against database start-up.
graph.service.system_graph.run("CREATE OR REPLACE DATABASE neo4j WAIT").stats()

# Uniqueness constraints on the ID properties that the load cells MERGE/MATCH on
# (each uniqueness constraint is backed by an index). IF NOT EXISTS keeps the cell re-runnable.
graph.run("CREATE CONSTRAINT IF NOT EXISTS ON (s:SP) ASSERT s.SP_ID IS UNIQUE").stats()
graph.run("CREATE CONSTRAINT IF NOT EXISTS ON (t:TABLE) ASSERT t.TABLE_ID IS UNIQUE").stats()
graph.run("CREATE CONSTRAINT IF NOT EXISTS ON (v:VIEW) ASSERT v.VW_ID IS UNIQUE").stats()

{'constraints_added': 1}

## Loading the data


In [4]:
query = """
//Loading the SP Node
CALL apoc.periodic.iterate(
    "CALL apoc.load.csv('file:///graph_input_sp.csv') YIELD map AS row RETURN row",
    "WITH 
        row.SP_NAME AS SP_NAME,
        row.SP_SCHEMA AS SP_SCHEMA,
        row.SP_TYPE AS SP_TYPE,
        row.SP_INDEX AS SP_INDEX,
        COALESCE(row.SP_SCHEMA ,'') + '.' + COALESCE(row.SP_NAME ,'') AS SP_ID,
        row.EXCLUSION AS EXCLUSION,
    //    row.SYNTAX as SYNTAX,
        toInteger(row.SYNTAX_WORDCOUNT) as SYNTAX_WORDCOUNT

    MERGE (s:SP {SP_ID : SP_ID})
    ON CREATE SET 
        s.SP_NAME = SP_NAME,
        s.SP_SCHEMA = SP_SCHEMA,
        s.SP_TYPE = SP_TYPE,
        s.SP_INDEX = SP_INDEX,
        s.EXCLUSION = EXCLUSION,
    //    s.SYNTAX = SYNTAX,
        s.SYNTAX_WORDCOUNT = SYNTAX_WORDCOUNT
        ",
    {batchSize: 500}
);


"""
graph.run(query).to_data_frame()


Unnamed: 0,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,wasTerminated,failedParams,updateStatistics
0,26,12528,0,12528,0,0,0,{},"{'total': 26, 'committed': 26, 'failed': 0, 'e...","{'total': 12528, 'committed': 12528, 'failed':...",False,{},"{'nodesDeleted': 0, 'labelsAdded': 1230, 'rela..."


In [5]:
query = """
//Loading the Table Node (Using Table Master File)
CALL apoc.periodic.iterate(
    "CALL apoc.load.csv('file:///graph_input_table.csv') YIELD map AS row RETURN row",
    "WITH 
        toUpper(row.TABLE_SCHEMA) AS TABLE_SCHEMA,
        toUpper(row.TABLE_NAME) AS TABLE_NAME,
        toUpper(COALESCE(row.TABLE_SCHEMA ,'') + '.' + COALESCE(row.TABLE_NAME ,'')) AS TABLE_ID

    MERGE (t:TABLE {TABLE_ID : TABLE_ID})
    ON CREATE SET 
        t.TABLE_SCHEMA  = TABLE_SCHEMA,
        t.TABLE_NAME = TABLE_NAME
        ",
    {batchSize: 500}
);

"""
graph.run(query).to_data_frame()


Unnamed: 0,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,wasTerminated,failedParams,updateStatistics
0,7,3320,0,3320,0,0,0,{},"{'total': 7, 'committed': 7, 'failed': 0, 'err...","{'total': 3320, 'committed': 3320, 'failed': 0...",False,{},"{'nodesDeleted': 0, 'labelsAdded': 3320, 'rela..."


In [6]:
query = """
//Loading the View Node
CALL apoc.periodic.iterate(
    "CALL apoc.load.csv('file:///graph_input_vw.csv') YIELD map AS row RETURN row",
    "WITH 
        COLLECT({
        VW_SCHEMA : row.VW_SCHEMA,
        VW_NAME : row.VW_NAME,
        VW_ID : COALESCE(row.VW_SCHEMA ,'') + '.' + COALESCE(row.VW_NAME,'') ,
      //  SYNTAX : row.SYNTAX,
        SYNTAX_WORDCOUNT : row.SYNTAX_WORDCOUNT 
        }) as COLLECTION

UNWIND COLLECTION as COLLECT
    MERGE (v:VIEW {VW_ID : COLLECT.VW_ID})
    ON CREATE SET 
        v.NAME  = COLLECT.VW_NAME,
        v.TABLE_SCHEMA = COLLECT.VW_SCHEMA,
        v.TYPE = 'VIEW',
    //    v.SYNTAX = COLLECT.SYNTAX,
        v.SYNTAX_WORDCOUNT = COLLECT.SYNTAX_WORDCOUNT
        ",
    {batchSize: 500}
);

"""
graph.run(query).to_data_frame()


query = """
//Loading the VIEW-TABLE relationship

CALL apoc.periodic.iterate(
    "CALL apoc.load.csv('file:///graph_input_vw.csv') YIELD map AS row RETURN row",
    "WITH 
        COLLECT({
            VW_ID : COALESCE(row.VW_SCHEMA ,'') + '.' + COALESCE(row.VW_NAME,'') ,
            TABLE_ID : COALESCE(row.TABLE_SCHEMA ,'') + '.' + COALESCE(row.TABLE_NAME,'')
        }) as COLLECTION

UNWIND COLLECTION as COLLECT
        MATCH (v:VIEW {VW_ID : COLLECT.VW_ID})
        MATCH (t:TABLE {TABLE_ID: COLLECT.TABLE_ID})
        MERGE (v)-[k:RELY_ON]->(t)
            SET k.REGEX_TYPE = 'VIEW'
            SET k.TYPE = 'DEPENDS_ON'        
        ",
    {batchSize: 10000}
);
"""

graph.run(query).to_data_frame()




Unnamed: 0,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,wasTerminated,failedParams,updateStatistics
0,1,1018,0,1018,0,0,0,{},"{'total': 1, 'committed': 1, 'failed': 0, 'err...","{'total': 1018, 'committed': 1018, 'failed': 0...",False,{},"{'nodesDeleted': 0, 'labelsAdded': 0, 'relatio..."


In [7]:
query = """
//Loading the SP-VIEW relationship


CALL apoc.periodic.iterate(
    "CALL apoc.load.csv('file:///graph_input_sp.csv') YIELD map AS row RETURN row",
    "WITH 
        COLLECT({
            SP_ID : COALESCE(row.SP_SCHEMA ,'') + '.' + COALESCE(row.SP_NAME ,'') ,
            VW_ID : row.TABLE_FULL_NAME ,
            REGEX_TYPE:  row.REGEX_TYPE
        }) as COLLECTION

UNWIND COLLECTION as COLLECT
        MATCH (p:SP {SP_ID : COLLECT.SP_ID})
        MATCH (v:VIEW {VW_ID: COLLECT.VW_ID})
        MERGE (p)-[k:RELY_ON]->(v)
            SET k.REGEX_TYPE = COLLECT.REGEX_TYPE
            SET k.TYPE = 'DEPENDS_ON'        
        ",
    {batchSize: 10000}
);
"""

graph.run(query).to_data_frame()


Unnamed: 0,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,wasTerminated,failedParams,updateStatistics
0,2,12528,0,12528,0,0,0,{},"{'total': 2, 'committed': 2, 'failed': 0, 'err...","{'total': 12528, 'committed': 12528, 'failed':...",False,{},"{'nodesDeleted': 0, 'labelsAdded': 0, 'relatio..."


In [8]:

#Now link back the SP-View-Table relationship: an SP that relies on a VIEW also relies on every
#TABLE behind that view, so add (SP)-[:RELY_ON]->(TABLE) for each such pair.
#The MERGE is directed: an undirected MERGE lets Neo4j pick the direction of a newly created
#relationship, while every other RELY_ON edge in this graph points from the dependant to its source.
#DISTINCT collapses the many (s, t) rows produced when several views lead to the same table.
query = """
MATCH (s:SP)-[:RELY_ON]->(:VIEW)-[:RELY_ON]->(t:TABLE)
WITH DISTINCT s, t
//point1 CREATE->MERGE
MERGE (s)-[r:RELY_ON]->(t)
    ON CREATE
         SET r.REGEX_TYPE = 'VIEW',
             r.TYPE = 'DEPENDS_ON'
"""

graph.run(query).to_data_frame()

In [9]:
query = """
//Loading the Table/View with SP relationship (DEPENDS_ON)
CALL apoc.periodic.iterate(
    "CALL apoc.load.csv('file:///graph_input_sp.csv') YIELD map AS row RETURN row",
    "WITH 
        COLLECT({
            SP_ID : COALESCE(row.SP_SCHEMA ,'') + '.' + COALESCE(row.SP_NAME ,'') ,
            TABLE_ID : row.TABLE_FULL_NAME,
            REGEX_TYPE:  row.REGEX_TYPE
        }) as COLLECTION

UNWIND COLLECTION as COLLECT
CALL apoc.do.when(COLLECT.REGEX_TYPE = 'SELECT_FROM',
        'MATCH (p:SP {SP_ID : COLLECT.SP_ID})
        MATCH (t:TABLE {TABLE_ID: COLLECT.TABLE_ID})
        MERGE (p)-[k:SP_SELECT_FROM]->(t)',
            '', {COLLECT:COLLECT}) YIELD value AS case1

WITH COLLECT
CALL apoc.do.when(COLLECT.REGEX_TYPE = 'INSERT_UPDATE',
        'MATCH (p:SP {SP_ID : COLLECT.SP_ID})
        MATCH (t:TABLE {TABLE_ID: COLLECT.TABLE_ID})
        MERGE (p)-[k:SP_INSERT_UPDATE]->(t)',
            '', {COLLECT:COLLECT}) YIELD value AS case2

    Return *
        ",
    {batchSize: 10000}
);

"""

graph.run(query).to_data_frame()


Unnamed: 0,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,wasTerminated,failedParams,updateStatistics
0,2,12528,4,12528,0,0,0,{},"{'total': 2, 'committed': 2, 'failed': 0, 'err...","{'total': 12528, 'committed': 12528, 'failed':...",False,{},"{'nodesDeleted': 0, 'labelsAdded': 0, 'relatio..."


In [10]:
query = """
//Loading the SP-SP relationship
CALL apoc.periodic.iterate(
    "CALL apoc.load.csv('file:///graph_input_sp.csv') YIELD map AS row RETURN row",
    "WITH 
        COLLECT({
            SP_ID: COALESCE(row.SP_SCHEMA ,'') + '.' + COALESCE(row.SP_NAME,''),
            TABLE_ID: row.TABLE_FULL_NAME,
            REGEX_TYPE:  row.REGEX_TYPE
        }) as COLLECTION

UNWIND COLLECTION as COLLECT

        MATCH (s1:SP {SP_ID : COLLECT.SP_ID})
        MATCH (s2:SP {SP_ID: COLLECT.TABLE_ID})

        with s1, s2, COLLECT
        CALL apoc.do.when(COLLECT.REGEX_TYPE = 'EXEC',
            'MERGE (s1)-[k:CALL_SP]->(s2)',
            '', {s1:s1, s2:s2}) YIELD value AS case1
            
            RETURN count(*)
        ",
    {batchSize: 10000}
);

"""

graph.run(query).to_data_frame()

Unnamed: 0,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,wasTerminated,failedParams,updateStatistics
0,2,12528,0,12528,0,0,0,{},"{'total': 2, 'committed': 2, 'failed': 0, 'err...","{'total': 12528, 'committed': 12528, 'failed':...",False,{},"{'nodesDeleted': 0, 'labelsAdded': 0, 'relatio..."


### Data Manipulation
- Utility Tables are excluded 
- Summary Tables are excluded

In [11]:
#Exclusion flags used by later calculations:
#  - TABLE: PREPARE.AUDIT_INGESTION_SUMMARY and any table whose TABLE_ID contains 'UTIL.' get 1,
#    every other table gets 0.
#  - SP: the CSV strings 'True' / 'False' are normalised to 1 / 0 (other values are left untouched).
EXCLUSION_QUERIES = (
    "\nMATCH (t:TABLE)\n"
    "where t.TABLE_ID = 'PREPARE.AUDIT_INGESTION_SUMMARY' or t.TABLE_ID contains 'UTIL.'\n"
    "SET t.EXCLUSION = 1\n",
    "\nMATCH (t:TABLE)\n"
    "where t.EXCLUSION is null\n"
    "SET t.EXCLUSION = 0\n",
    "\nMATCH (s:SP)\n"
    "where s.EXCLUSION = 'True'\n"
    "SET s.EXCLUSION = 1\n",
    "\nMATCH (s:SP)\n"
    "where s.EXCLUSION = 'False'\n"
    "SET s.EXCLUSION = 0\n",
)

# Order matters: the table default (null -> 0) must run after the explicit table exclusions.
for exploratory_query in EXCLUSION_QUERIES:
    exclusion_df = graph.run(exploratory_query).to_data_frame()
exclusion_df

#### Add a generic "RELY_ON" relationship for every directly or indirectly linked SP–Table pair

In [12]:
#Every SP that reads (SP_SELECT_FROM) or writes (SP_INSERT_UPDATE) a TABLE also gets a generic
#(SP)-[:RELY_ON]->(TABLE) edge. DISTINCT avoids one MERGE per typed edge when an SP both reads and
#writes the same table. TYPE is set on creation only, matching the other RELY_ON edges; existing
#RELY_ON edges (e.g. from the SP->VIEW->TABLE step) are left unchanged.
query = """
    MATCH (s:SP)-[:SP_INSERT_UPDATE|SP_SELECT_FROM]->(t:TABLE)
        WITH DISTINCT
            s, t
        
        MERGE(s)-[k:RELY_ON]->(t)
            ON CREATE SET k.TYPE = 'DEPENDS_ON'
"""

graph.run(query).to_data_frame()

In [13]:
#If a SP calls another SP, the caller also relies on every TABLE the called SP relies on (one CALL_SP hop).
#Properties are set ON CREATE only: a plain SET would overwrite REGEX_TYPE on RELY_ON edges that
#already exist because the caller uses the table directly, mislabelling them as 'SP_CALL_SP'.
query = """
    MATCH (s:SP)-[:CALL_SP]->(:SP)-[:RELY_ON]->(t:TABLE)
        WITH DISTINCT
            s, t
        
        MERGE(s)-[k:RELY_ON]->(t)
            ON CREATE SET
                k.REGEX_TYPE = 'SP_CALL_SP',
                k.TYPE = 'DEPENDS_ON'
"""

graph.run(query).to_data_frame()

In [14]:
graph.run("CREATE INDEX ON :TABLE(SKIP_FLAG)").stats()
graph.run("CREATE INDEX ON :SP(wcc_group)").stats()

{'indexes_added': 1}

## Check the schema of the database

The nodes we have:

In [15]:
print("Nodes:")

result = {"label": [], "count": []}
for label in graph.run("CALL db.labels()").to_series():
    query = f"MATCH (:`{label}`) RETURN count(*) as count"
    count = graph.run(query).to_data_frame().iloc[0]['count']
    result["label"].append(label)
    result["count"].append(count)
nodes_df = pd.DataFrame(data=result)
nodes_df.sort_values("count")

Nodes:


Unnamed: 0,label,count
2,VIEW,557
0,SP,1230
1,TABLE,3320


The relationships we have:

In [16]:
print("Relationships:")

result = {"relType": [], "count": []}
for relationship_type in graph.run("CALL db.relationshipTypes()").to_series():
    query = f"MATCH ()-[:`{relationship_type}`]->() RETURN count(*) as count"
    count = graph.run(query).to_data_frame().iloc[0]['count']
    result["relType"].append(relationship_type)
    result["count"].append(count)
rels_df = pd.DataFrame(data=result)
rels_df.sort_values("count")

Relationships:


Unnamed: 0,relType,count
3,CALL_SP,121
1,SP_INSERT_UPDATE,1820
2,SP_SELECT_FROM,4380
0,RELY_ON,7182


### Export Lineage data into Excel
- Moving between layers within Synapse
- Prepare -> Analyse -> Publish


In [17]:
#This cell walks the PREPARE -> ANALYSE -> PUBLISH layers through the SPs that share RELY_ON
#tables between them and exports the table-level lineage to Excel. The walk is started from each
#of the three layers so partial chains (e.g. a PREPARE table with no ANALYSE successor) still
#appear as rows, with nulls in the missing columns.

# Shared tail: attach the optional DBC node linked to each layer's table and project the output columns.
# NOTE(review): no DBC nodes or JOB_*/DBC_WRITE_TO_SYNAPSE relationships are created in this notebook,
# so the *_DBC columns stay null unless another process loads them -- confirm.
LINEAGE_TAIL = """
with t1, s1, t2, s2, t3
OPTIONAL MATCH (d1:DBC)-[k1:JOB_ADF_RELY_ON|JOB_DBC_RELY_ON|DBC_WRITE_TO_SYNAPSE]-(t1)
OPTIONAL MATCH (d2:DBC)-[k2:JOB_ADF_RELY_ON|JOB_DBC_RELY_ON|DBC_WRITE_TO_SYNAPSE]-(t2)
OPTIONAL MATCH (d3:DBC)-[k3:JOB_ADF_RELY_ON|JOB_DBC_RELY_ON|DBC_WRITE_TO_SYNAPSE]-(t3)
return  d1.DBC_ID AS PREPARE_DBC, type(k1) as PREPARE_DBC_TYPE,  d2.DBC_ID AS ANALYSE_DBC, type(k2) as ANALYSE_DBC_TYPE, d3.DBC_ID AS PUBLISH_DBC, type(k3) as PUBLISH_DBC_TYPE, t1.TABLE_ID as PREPARE_TABLE, s1.SP_ID, t2.TABLE_ID as ANALYSE_TABLE, s2.SP_ID, t3.TABLE_ID AS PUBLISH_TABLE
"""


def run_lineage_query(start_clause: str) -> pd.DataFrame:
    """Run ``start_clause`` followed by the shared lineage tail and return the rows as a DataFrame.

    ``start_clause`` must bind the variables t1, s1, t2, s2 and t3 (PREPARE table, SP,
    ANALYSE table, SP, PUBLISH table), because the tail begins with ``with t1, s1, t2, s2, t3``.
    """
    return graph.run(start_clause + LINEAGE_TAIL).to_data_frame()


#Start from Prepare
df1 = run_lineage_query("""
match (t1:TABLE)
where t1.TABLE_SCHEMA ='PREPARE'
OPTIONAL MATCH (t1)-[:RELY_ON]-(s1:SP)-[:RELY_ON]-(t2:TABLE)
where t2.TABLE_SCHEMA ='ANALYSE'
OPTIONAL MATCH (t1)-[:RELY_ON]-(s1:SP)-[:RELY_ON]-(t2)-[:RELY_ON]-(s2:SP)-[:RELY_ON]-(t3:TABLE)
where t3.TABLE_SCHEMA ='PUBLISH'
""")

#Start from Publish
df2 = run_lineage_query("""
match (t3:TABLE)
where t3.TABLE_SCHEMA ='PUBLISH'
OPTIONAL MATCH (t3)-[:RELY_ON]-(s2:SP)-[:RELY_ON]-(t2:TABLE)
where t2.TABLE_SCHEMA ='ANALYSE'
OPTIONAL MATCH (t1:TABLE)-[:RELY_ON]-(s1:SP)-[:RELY_ON]-(t2)-[:RELY_ON]-(s2)-[:RELY_ON]-(t3)
where t1.TABLE_SCHEMA ='PREPARE'
""")

#Start from Analyse
df3 = run_lineage_query("""
match (t2:TABLE)
where t2.TABLE_SCHEMA ='ANALYSE'
OPTIONAL MATCH (t3:TABLE)-[:RELY_ON]-(s2:SP)-[:RELY_ON]-(t2)
where t3.TABLE_SCHEMA ='PUBLISH'
OPTIONAL MATCH (t1:TABLE)-[:RELY_ON]-(s1:SP)-[:RELY_ON]-(t2)
where t1.TABLE_SCHEMA ='PREPARE'
""")

# A chain reachable from several starting layers is returned by several queries; drop the repeats.
# pd.concat keeps each frame's own 0..n index, so renumber to give the exported sheet a unique index.
df = pd.concat([df1, df2, df3]).drop_duplicates().reset_index(drop=True)
df.to_excel("lineage_excel.xlsx")

In [18]:
#Find SPs that read one PUBLISH table (SP_SELECT_FROM) and write another PUBLISH table (SP_INSERT_UPDATE).
#Both typed edges are created pointing from the SP to the TABLE, and the read edge type is
#SP_SELECT_FROM (no SP_INSERT_FROM type exists in this graph), so the pattern is SP-centred.
# NOTE(review): both tables are filtered on PUBLISH while the column aliases say ANALYSE --
# confirm which layer is intended; the aliases are kept unchanged so the output columns stay the same.
query = """
MATCH (t2:TABLE)<-[:SP_SELECT_FROM]-(s2:SP)-[:SP_INSERT_UPDATE]->(t3:TABLE)
where t3.TABLE_SCHEMA ='PUBLISH' and t2.TABLE_SCHEMA ='PUBLISH' 

return  t3.TABLE_ID as ANALYSE_TABLE, s2.SP_ID, t2.TABLE_ID as ANALYSE_TABLE_2
"""

df = graph.run(query).to_data_frame()

In [19]:
# Display `df`: the SP / table pairs returned by the query in the cell above.
df