In [1]:
###
# Copyright (2024) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###




from neo4j import GraphDatabase
import os
import pandas as pd
import time
print("Import successful")

Import successful


In [2]:
URI = os.environ["NEO4J_URI"]
USER=os.environ["NEO4J_USER_NAME"]
PASSWORD=os.environ["NEO4J_PASSWD"]
AUTH = (os.environ["NEO4J_USER_NAME"], os.environ["NEO4J_PASSWD"])

In [3]:
#Neo4J connect and Query Boilerplate

class Neo4jConnection:
    
    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)
        
    def close(self):
        if self.__driver is not None:
            self.__driver.close()
        
    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try: 
            session = self.__driver.session(database=db) if db is not None else self.__driver.session() 
            #response = (session.run(query, parameters))
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally: 
            if session is not None:
                session.close()
        
        #return pd.DataFrame([r.values() for r in response], columns=response.keys())
        return response
    
    def multi_query(self, multi_line_query, parameters=None, db=None):
        for li in multi_line_query.splitlines():
                print(li)
                result=self.query(li, parameters=None, db=None)
                print(result)

In [4]:
#Make a default connection and it should return `[<Record count(n)=0>]`
conn = Neo4jConnection(uri=URI, 
                       user=USER,              
                       pwd=PASSWORD)

#if db is empty, then seed with init values 
res=conn.query('MATCH (n) RETURN count(n)')
print(res)

[<Record count(n)=7356314>]


In [5]:
# NOTE - to be executed only once per KG. If it alreafy executed as a part of loading PWC dataset, this does not need to be executed
# Before adding relationships, need to construct a constraint
constraint = """
CREATE CONSTRAINT pipeline_id FOR (pipeline:Pipeline) REQUIRE pipeline.itemID IS UNIQUE;
CREATE CONSTRAINT stage_id FOR (stage:Stage) REQUIRE stage.itemID IS UNIQUE;
CREATE CONSTRAINT execution_id FOR (execution:Execution) REQUIRE execution.itemID IS UNIQUE;
CREATE CONSTRAINT artifact_id FOR (artifact:Artifact) REQUIRE artifact.itemID IS UNIQUE;
CREATE CONSTRAINT task_id FOR (task:Task) REQUIRE task.itemID IS UNIQUE;
CREATE CONSTRAINT dataset_id FOR (dataset:Dataset) REQUIRE dataset.itemID IS UNIQUE;
CREATE CONSTRAINT model_id FOR (model:Model) REQUIRE model.itemID IS UNIQUE;
CREATE CONSTRAINT parameter_id FOR (parameter:Parameter) REQUIRE parameter.itemID IS UNIQUE;
CREATE CONSTRAINT metric_id FOR (metric:Metric) REQUIRE metric.itemID IS UNIQUE;
CREATE CONSTRAINT framework_id FOR (framework:Framework) REQUIRE framework.itemID IS UNIQUE;
CREATE CONSTRAINT report_id FOR (report:Report) REQUIRE report.itemID IS UNIQUE;
CALL db.awaitIndexes();
"""
# TODO - parameters need to be added to constraints

# DROP INDEX ON :Dataset(datasetID)

call_db = """CALL db.indexes();"""

In [6]:
# Query to load the nodes
load_pipelines = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/pipelines.csv') yield map as row", 
"MERGE (pipeline:Pipeline {itemID: row.pipeline_id}) 
ON CREATE SET pipeline.name = row.pipeline_name, pipeline.source = row.source, pipeline.srcID = row.src_id", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_stages = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/stages.csv') yield map as row", 
"MERGE (stage:Stage {itemID: row.stage_id}) 
ON CREATE SET stage.name = row.stage_name, stage.source = row.source, stage.pipelineID = row.pipeline_id, stage.pipelineName = row.pipeline_name, stage.properties=row.properties", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_executions = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/executions.csv') yield map as row", 
"MERGE (execution:Execution {itemID: row.execution_id}) 
ON CREATE SET execution.name = row.execution_name, execution.srcID = row.src_id, execution.source = row.source, execution.pipelineID = row.pipelineID, execution.pipelineName = row.pipeline_name, execution.command = row.command, execution.properties=row.properties", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_artifacts = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/artifacts.csv') yield map as row", 
"MERGE (artifact:Artifact {itemID: row.artifact_id}) 
ON CREATE SET artifact.name = row.artifact_name, artifact.pipelineID = row.pipelineID, artifact.pipelineName = row.pipeline_name, artifact.source = row.source, artifact.executionID=row.execution_id", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_tasks = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/tasks.csv') yield map as row", 
"MERGE (task:Task {itemID: row.task_id}) 
ON CREATE SET task.name = row.task_name, task.source = row.source, task.taskDesc = row.task_description, task.srcID = row.src_id, task.taskType = row.task_type, task.modality = row.modality, task.category = row.category", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_datasets = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/datasets.csv') yield map as row", 
"MERGE (dataset:Dataset {itemID: row.dataset_id}) 
ON CREATE SET dataset.name = row.dataset_name, dataset.datasetDesc = row.description, dataset.srcID = row.src_id, dataset.url = row.url, dataset.modality=row.modality, dataset.source = row.source", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_models = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/models.csv') yield map as row", 
"MERGE (model:Model {itemID: row.model_id})
ON CREATE SET model.name = row.model_name, model.modelClass = row.model_class, model.url = row.model_url, model.modelDesc = row.description, model.source = row.source", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_metrics = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/metrics.csv') yield map as row", 
"MERGE (metric:Metric {itemID: row.metric_id}) 
ON CREATE SET metric.result=row.metrics, metric.source = row.source", 
{batchSize:1000, iterateList:true, parallel:true})"""

load_frameworks = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/frameworks.csv') yield map as row", 
"MERGE (framework:Framework {itemID: row.framework_id})
ON CREATE SET framework.name = row.framework_name, framework.url = row.url, framework.description = row.description, framework.version = row.framework_version, framework.source = row.source",
{batchSize:1000, iterateList:true, parallel:true})"""

load_reports = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/nodes/reports.csv') yield map as row", 
"MERGE (report:Report {itemID: row.report_id})
ON CREATE SET report.name = row.title, report.url = row.url, report.abstractUrl = row.abstract_url, report.source = row.source, report.srcID = row.src_id",
{batchSize:1000, iterateList:true, parallel:true})"""


In [7]:

# Queries to load relationships
rel_pipeline_task = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-pipeline-task.csv') yield map as row", 
"MATCH (pipeline:Pipeline {itemID: row.pipeline_id})
MATCH (task:Task {itemID: row.task_id})
MERGE (pipeline)-[:executes]->(task)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_pipeline_framework = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-pipeline-framework.csv') yield map as row", 
"MATCH (pipeline:Pipeline {itemID: row.pipeline_id})
MATCH (framework:Framework {itemID: row.framework_id})
MERGE (pipeline)-[:uses]->(framework)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_pipeline_report = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-pipeline-report.csv') yield map as row", 
"MATCH (pipeline:Pipeline {itemID: row.pipeline_id})
MATCH (report:Report {itemID: row.report_id})
MERGE (pipeline)-[:has]->(report)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_pipeline_stage = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-pipeline-stage.csv') yield map as row", 
"MATCH (pipeline:Pipeline {itemID: row.pipeline_id})
MATCH (stage:Stage {itemID: row.stage_id})
MERGE (pipeline)-[:contains]->(stage)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_stage_execution = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-stage-execution.csv') yield map as row", 
"MATCH (stage:Stage {itemID: row.stage_id})
MATCH (execution:Execution {itemID: row.execution_id})
MERGE (stage)-[:runs]->(execution)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_execution_metric = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-execution-metric.csv') yield map as row", 
"MATCH (execution:Execution {itemID: row.execution_id})
MATCH (metric:Metric {itemID: row.metric_id})
MERGE (execution)-[:generates]->(metric)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_execution_artifact = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-execution-artifact.csv') yield map as row", 
"MATCH (execution:Execution {itemID: row.execution_id})
MATCH (artifact:Artifact {itemID: row.artifact_id})
MERGE (execution)-[:isOutput]->(artifact)
MERGE (execution)<-[:isInput]-(artifact)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_artifact_dataset = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-artifact-dataset.csv') yield map as row", 
"MATCH (artifact:Artifact {itemID: row.artifact_id})
MATCH (dataset:Dataset {itemID: row.dataset_id})
MERGE (artifact)<-[:subCatOf]-(dataset)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_artifact_model = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-artifact-model.csv') yield map as row", 
"MATCH (artifact:Artifact {itemID: row.artifact_id})
MATCH (model:Model {itemID: row.model_id})
MERGE (artifact)<-[:subCatOf]-(model)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_artifact_metric = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-artifact-metric.csv') yield map as row", 
"MATCH (artifact:Artifact {itemID: row.artifact_id})
MATCH (metric:Metric {itemID: row.metric_id})
MERGE (artifact)<-[:subCatOf]-(metric)", 
{batchSize:10000, iterateList:true, parallel:true})"""

rel_execution_metric = """CALL apoc.periodic.iterate("CALL apoc.load.csv('/huggingface/relationships/rel-execution-metric.csv') yield map as row", 
"MATCH (execution:Execution {itemID: row.execution_id})
MATCH (metric:Metric {itemID: row.metric_id})
MERGE (execution)-[:generates]->(metric)", 
{batchSize:10000, iterateList:true, parallel:true})"""

In [8]:

"""
# NOTE - constraints is to be executed only once per KG. If it already executed as a part of loading other data sources, this does not need to be executed
# const_res = conn.query(constraint)
# call_db_res = conn.query(call_db)
"""


start_time = time.time()
# Loading nodes to KG
print("Adding Nodes...")

res_pipeline=conn.query(load_pipelines)
print("Pipeline nodes loaded. Time taken:" + str(res_pipeline[0][2]) + ' seconds. Committed Operations: ' + str(res_pipeline[0][3]) + '. Failed Operations:' + str(res_pipeline[0][4]))

res_stages=conn.query(load_stages)
print("Stage nodes loaded. Time taken:" + str(res_stages[0][2]) + ' seconds. Committed Operations: ' + str(res_stages[0][3]) + '. Failed Operations:' + str(res_stages[0][4]))

res_executions=conn.query(load_executions)
print("Execution nodes loaded. Time taken:" + str(res_executions[0][2]) + ' seconds. Committed Operations: ' + str(res_executions[0][3]) + '. Failed Operations:' + str(res_executions[0][4]))

res_artifacts=conn.query(load_artifacts)
print("Artifacts nodes loaded. Time taken:" + str(res_artifacts[0][2]) + ' seconds. Committed Operations: ' + str(res_artifacts[0][3]) + '. Failed Operations:' + str(res_artifacts[0][4]))

res_tasks=conn.query(load_tasks)
print("Task nodes loaded. Time taken:" + str(res_tasks[0][2]) + ' seconds. Committed Operations: ' + str(res_tasks[0][3]) + '. Failed Operations:' + str(res_tasks[0][4]))

res_datasets=conn.query(load_datasets)
print("Dataset nodes loaded. Time taken:" + str(res_datasets[0][2]) + ' seconds. Committed Operations: ' + str(res_datasets[0][3]) + '. Failed Operations:' + str(res_datasets[0][4]))

res_models=conn.query(load_models)
print("Model nodes loaded. Time taken:" + str(res_models[0][2]) + ' seconds. Committed Operations: ' + str(res_models[0][3]) + '. Failed Operations:' + str(res_models[0][4]))

res_metrics=conn.query(load_metrics)
print("Metrics nodes loaded. Time taken:" + str(res_metrics[0][2]) + ' seconds. Committed Operations: ' + str(res_metrics[0][3]) + '. Failed Operations:' + str(res_metrics[0][4]))

res_frameworks=conn.query(load_frameworks)
print("Framework nodes loaded. Time taken:" + str(res_frameworks[0][2]) + ' seconds. Committed Operations: ' + str(res_frameworks[0][3]) + '. Failed Operations:' + str(res_frameworks[0][4]))

res_reports=conn.query(load_reports)
print("Report nodes loaded. Time taken:" + str(res_reports[0][2]) + ' seconds. Committed Operations: ' + str(res_reports[0][3]) + '. Failed Operations:' + str(res_reports[0][4]))

print("--- %s seconds ---" % (time.time() - start_time))

Adding Nodes...
Pipeline nodes loaded. Time taken:1 seconds. Committed Operations: 267140. Failed Operations:0
Stage nodes loaded. Time taken:1 seconds. Committed Operations: 267140. Failed Operations:0
Execution nodes loaded. Time taken:1 seconds. Committed Operations: 287189. Failed Operations:0
Artifacts nodes loaded. Time taken:2 seconds. Committed Operations: 287189. Failed Operations:0
Task nodes loaded. Time taken:0 seconds. Committed Operations: 40. Failed Operations:0
Dataset nodes loaded. Time taken:0 seconds. Committed Operations: 47318. Failed Operations:0
Model nodes loaded. Time taken:1 seconds. Committed Operations: 267140. Failed Operations:0
Metrics nodes loaded. Time taken:0 seconds. Committed Operations: 64421. Failed Operations:0
Framework nodes loaded. Time taken:0 seconds. Committed Operations: 267140. Failed Operations:0
Report nodes loaded. Time taken:0 seconds. Committed Operations: 467. Failed Operations:0
--- 10.68008804321289 seconds ---


In [11]:

"""
# NOTE - IMPORTANT
# If the result of some query has failed operations, copy paste the query in neo4j browser and run it again and again till the value under 
#"Failed Operations" become 0.
"""

start_time = time.time()
# Loading relationships
print("Adding Relationships..")
res = conn.query(rel_pipeline_framework)
print("Added relationship between Pipeline and Framework. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_pipeline_task)
print("Added relationship between Pipeline and Task. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_pipeline_stage)
print("Added relationship between Pipeline and Stage. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_stage_execution)
print("Added relationship between Stage and Execution. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_execution_artifact)
print("Added relationship between Execution and Artifact. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_artifact_dataset)
print("Added relationship between Artifact and Dataset. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_artifact_model)
print("Added relationship between Artifact and Model. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_artifact_metric)
print("Added relationship between Artifact and Metric. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_execution_metric)
print("Added relationship between Execution and Metric. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

res = conn.query(rel_pipeline_report)
print("Added relationship between Pipeline and Report. Time taken:" + str(res[0][2]) + ' seconds. Committed Operations: ' + str(res[0][3]) + '. Failed Operations:' + str(res[0][4]))

print("--- %s seconds ---" % (time.time() - start_time))

Adding Relationships..
Added relationship between Pipeline and Framework. Time taken:3 seconds. Committed Operations: 207140. Failed Operations:60000
Added relationship between Pipeline and Task. Time taken:0 seconds. Committed Operations: 88907. Failed Operations:60000
Added relationship between Pipeline and Stage. Time taken:0 seconds. Committed Operations: 267140. Failed Operations:0
Added relationship between Stage and Execution. Time taken:0 seconds. Committed Operations: 287189. Failed Operations:0
Added relationship between Execution and Artifact. Time taken:0 seconds. Committed Operations: 287189. Failed Operations:0
Added relationship between Artifact and Dataset. Time taken:0 seconds. Committed Operations: 36712. Failed Operations:0
Added relationship between Artifact and Model. Time taken:0 seconds. Committed Operations: 287189. Failed Operations:0
Added relationship between Artifact and Metric. Time taken:0 seconds. Committed Operations: 64269. Failed Operations:0
Added rel