In [None]:
%pip install neo4j

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [None]:
%ls /dbfs/mnt/bclearer/temp/mansoors_folder/evolve/bukom/delta/spi_v12/evolve/E3205

[0m[34;42mbclearer_edges[0m/  [34;42mbclearer_nodes[0m/


In [None]:
%ls /dbfs/mnt/bclearer/temp/mansoors_folder/evolve/bukom/delta/spi_v11/evolve/E5003

[0m[34;42mbclearer_neo4j_edges[0m/  [34;42mbclearer_neo4j_nodes[0m/


In [None]:
%ls /dbfs/mnt/bclearer/temp/mansoors_folder_neo4j_output/

[0m[34;42mall_edges[0m/  [34;42mall_nodes[0m/



# Global variables

In [None]:
import os

# Connection settings for the target Neo4j server.
IP_ADDRESS = "10.1.0.5"  # 20.76.138.131 RCC, 10.1.0.5 local
DATABASE_USERNAME = "neo4j"
# Do not paste the real password into the notebook: it is taken from the
# NEO4J_PASSWORD environment variable when set, and otherwise falls back to
# the empty string this notebook used before.
DATABASE_PASSWORD = os.environ.get("NEO4J_PASSWORD", "")
DATABASE_CONNECTION_URL = f"bolt://{IP_ADDRESS}:7687"

# Source dataset. The commented pair below is the alternative iteration
# (spi_v12 / E3205); swap the two pairs to switch between them.
# DATA_CORE_PATH = "/mnt/bclearer/temp/mansoors_folder/evolve/bukom/delta/spi_v12/evolve/E3205"  #Iteration
# FILE_BASENAME = "bclearer"
DATA_CORE_PATH = "/mnt/bclearer/temp/mansoors_folder/evolve/bukom/delta/spi_v11/evolve/E5003"  # Iteration
FILE_BASENAME = "bclearer_neo4j"

# Name of the Neo4j database that is dropped, recreated and filled below.
DATABASE = "pbsv11E5003"
NEO4J_WEBAPP = f"http://{IP_ADDRESS}:7474/"


# Imports

In [None]:
# from pyspark.sql.types import *
# from pyspark.sql.functions import *

In [None]:
from functools import reduce

from neo4j import GraphDatabase
from pyspark.sql import DataFrame, SparkSession


# Delete data

In [None]:
# (username, password) pair passed as `auth=` to every GraphDatabase.driver call below.
AUTH = (DATABASE_USERNAME, DATABASE_PASSWORD)


def delete_database(db_name: str) -> None:
    """Drop the Neo4j database called ``db_name`` if it exists.

    A driver is opened for the duration of the call and closed when the
    ``with`` block exits.

    Args:
        db_name: Database name, bound to the ``$db`` query parameter.
    """
    with GraphDatabase.driver(DATABASE_CONNECTION_URL, auth=AUTH) as driver:
        driver.execute_query(
            "DROP DATABASE $db IF EXISTS",
            db=db_name,
            # DROP DATABASE is an administrative command; target the system
            # database explicitly instead of relying on the driver's default.
            database_="system",
        )


def create_database(db_name: str) -> None:
    """Create the Neo4j database called ``db_name`` unless it already exists.

    A driver is opened for the duration of the call and closed when the
    ``with`` block exits.

    Args:
        db_name: Database name, bound to the ``$db`` query parameter.
    """
    with GraphDatabase.driver(DATABASE_CONNECTION_URL, auth=AUTH) as driver:
        driver.execute_query(
            "CREATE DATABASE $db IF NOT EXISTS",
            db=db_name,
            # CREATE DATABASE is an administrative command; target the system
            # database explicitly instead of relying on the driver's default.
            database_="system",
        )
        # NOTE(review): the command carries no WAIT clause, so the database may
        # not be online yet when this returns -- confirm if later cells race.


# Deletes all information within a database
def delete_nodes_edges(db_name: str):
    """Remove every node, and the relationships attached to it, from ``db_name``.

    The database itself is kept; only its contents are deleted.
    """
    # Same Cypher text as before: match all nodes and detach-delete them.
    wipe_everything = """
            MATCH (n)
            DETACH DELETE n
            """
    neo4j_driver = GraphDatabase.driver(DATABASE_CONNECTION_URL, auth=AUTH)
    with neo4j_driver:
        neo4j_driver.execute_query(wipe_everything, database_=db_name)


# Load dataset

In [None]:
def load_dataframe(
    path: str,
) -> "DataFrame":
    """Read the Delta table stored at ``path`` into a Spark DataFrame.

    The Spark session is resolved inside the function rather than through a
    notebook-level ``spark`` variable, so the function does not depend on
    which cells have already been run. ``getOrCreate()`` hands back the
    session that is already active when there is one.

    Args:
        path: Location of the Delta table.

    Returns:
        The DataFrame produced by the Delta reader for ``path``.
    """
    session = SparkSession.builder.getOrCreate()
    return session.read.format("delta").load(path)


# Load nodes and edges

In [None]:
# Both tables live under DATA_CORE_PATH and share the FILE_BASENAME prefix.
nodes_df, edges_df = map(
    load_dataframe,
    (
        f"{DATA_CORE_PATH}/{FILE_BASENAME}_nodes",
        f"{DATA_CORE_PATH}/{FILE_BASENAME}_edges",
    ),
)


# Save data to Neo4j

In [None]:
# Order matters: drop any existing copy of the target database first, then
# create it again, so the writes in the following cells go into a new database.
delete_database(DATABASE)
create_database(DATABASE)

In [None]:
# Building our SparkSession
# getOrCreate() returns the already-active session when one exists.
spark = SparkSession.builder.appName("bukom-Shell-POC").getOrCreate()

# Group the data by kind: nodes are split on their "type" column and edges on
# their "relation_type" column, so each kind can be written to Neo4j with its
# own labels / relationship type.
column_name_node = "type"
column_name_edge = "relation_type"


def split_by_column(df: "DataFrame", column_name: str) -> dict:
    """Split ``df`` into one filtered DataFrame per distinct ``column_name`` value.

    Args:
        df: DataFrame to split.
        column_name: Column whose distinct values define the groups.

    Returns:
        A dict mapping each distinct value to a one-element list holding the
        rows of ``df`` with that value. The list wrapper is kept because the
        cells below index the result and fold it with ``reduce``.
    """
    # collect() pulls the distinct values onto the driver.
    distinct_rows = df.select(column_name).distinct().collect()
    return {
        row[column_name]: [df.filter(df[column_name] == row[column_name])]
        for row in distinct_rows
    }


split_node_df = split_by_column(nodes_df, column_name_node)
split_edge_df = split_by_column(edges_df, column_name_edge)

In [None]:
# Inspect what is stored for the "tag" node type (a list of DataFrames).
split_node_df["tag"]

[DataFrame[source_primary_key_hash: string, name: string, description: string, type: string, source_system: string, stringified_aliases: array<string>, universe: string]]

In [None]:
def union_group(split_df: dict, key: str) -> "DataFrame":
    """Return one DataFrame for ``key`` by unioning the list stored under it.

    Args:
        split_df: Mapping from a type / relation value to a list of DataFrames.
        key: The value to look up.

    Returns:
        The union (``DataFrame.unionAll``) of every DataFrame in the list.

    Raises:
        KeyError: If ``key`` is not present in ``split_df``. The message lists
            the values that are available, instead of the opaque ``TypeError``
            that ``reduce`` raises when handed ``None``.
    """
    if key not in split_df:
        raise KeyError(
            f"{key!r} not found in the split data; available values: {list(split_df)}"
        )
    return reduce(DataFrame.unionAll, split_df[key])


# Node DataFrames, one per node type. The "type" column is dropped from the
# ones below.
df_tag = union_group(split_node_df, "tag").drop("type")
df_plant = union_group(split_node_df, "plant").drop("type")
df_site = union_group(split_node_df, "site").drop("type")
df_pu = union_group(split_node_df, "process_unit").drop("type")
df_issues = union_group(split_node_df, "issues").drop("type")

# NOTE(review): unlike the node types above, these two keep their "type"
# column, as in the original notebook -- confirm whether that is intended.
df_issue_type = union_group(split_node_df, "issue_type")
df_refinery = union_group(split_node_df, "refinery")

# Edge DataFrames, one per relation type.
df_same_as = union_group(split_edge_df, "SAME_AS")
df_whole_part = union_group(split_edge_df, "WHOLE_PART")
df_has_issue = union_group(split_edge_df, "HAS_ISSUE")
df_has_issue_type = union_group(split_edge_df, "HAS_ISSUE_TYPE")

In [None]:
df_tag.write.format("org.neo4j.spark.DataSource").mode("overwrite").option(
    "url",
    DATABASE_CONNECTION_URL,
).option("authentication.basic.username", DATABASE_USERNAME).option(
    "authentication.basic.password",
    DATABASE_PASSWORD,
).option("database", DATABASE).option("labels", "Entity:Tag").option(
    "node.keys",
    "source_primary_key_hash",
).option(
    "script",
    """CREATE CONSTRAINT IF NOT EXISTS FOR (tag:Tag) REQUIRE tag.source_primary_key_hash IS UNIQUE;
                        CREATE INDEX IF NOT EXISTS FOR (entity:Entity) ON entity.source_primary_key_hash;""",
).save()

In [None]:
df_plant.write.format("org.neo4j.spark.DataSource").mode("overwrite").option(
    "url",
    DATABASE_CONNECTION_URL,
).option("authentication.basic.username", DATABASE_USERNAME).option(
    "authentication.basic.password",
    DATABASE_PASSWORD,
).option("database", DATABASE).option("labels", "Entity:Plant").option(
    "node.keys",
    "source_primary_key_hash",
).option(
    "script",
    """CREATE CONSTRAINT IF NOT EXISTS FOR (plant:Plant) REQUIRE plant.source_primary_key_hash IS UNIQUE;""",
).save()

In [None]:
df_site.write.format("org.neo4j.spark.DataSource").mode("overwrite").option(
    "url",
    DATABASE_CONNECTION_URL,
).option("authentication.basic.username", DATABASE_USERNAME).option(
    "authentication.basic.password",
    DATABASE_PASSWORD,
).option("database", DATABASE).option("labels", "Entity:Site").option(
    "node.keys",
    "source_primary_key_hash",
).option(
    "script",
    """CREATE CONSTRAINT IF NOT EXISTS FOR (s:Site) REQUIRE s.source_primary_key_hash IS UNIQUE;""",
).save()

In [None]:
df_pu.write.format("org.neo4j.spark.DataSource").mode("overwrite").option(
    "url",
    DATABASE_CONNECTION_URL,
).option("authentication.basic.username", DATABASE_USERNAME).option(
    "authentication.basic.password",
    DATABASE_PASSWORD,
).option("database", DATABASE).option("labels", "Entity:ProcessUnit").option(
    "node.keys",
    "source_primary_key_hash",
).option(
    "script",
    """CREATE CONSTRAINT IF NOT EXISTS FOR (pu:ProcessUnit) REQUIRE pu.source_primary_key_hash IS UNIQUE;""",
).save()

In [None]:
df_issues.write.format("org.neo4j.spark.DataSource").mode("overwrite").option(
    "url",
    DATABASE_CONNECTION_URL,
).option("authentication.basic.username", DATABASE_USERNAME).option(
    "authentication.basic.password",
    DATABASE_PASSWORD,
).option("database", DATABASE).option("labels", "Entity:Issue").option(
    "node.keys",
    "source_primary_key_hash",
).option(
    "script",
    """CREATE CONSTRAINT IF NOT EXISTS FOR (issue:Issue) REQUIRE issue.source_primary_key_hash IS UNIQUE;""",
).save()

In [None]:
df_issue_type.write.format("org.neo4j.spark.DataSource").mode("overwrite").option(
    "url",
    DATABASE_CONNECTION_URL,
).option("authentication.basic.username", DATABASE_USERNAME).option(
    "authentication.basic.password",
    DATABASE_PASSWORD,
).option("database", DATABASE).option("labels", "Entity:IssueType").option(
    "node.keys",
    "source_primary_key_hash",
).option(
    "script",
    """CREATE CONSTRAINT IF NOT EXISTS FOR (issueType:IssueType) REQUIRE issueType.source_primary_key_hash IS UNIQUE;""",
).save()

In [None]:
df_refinery.write.format("org.neo4j.spark.DataSource").mode("overwrite").option(
    "url",
    DATABASE_CONNECTION_URL,
).option("authentication.basic.username", DATABASE_USERNAME).option(
    "authentication.basic.password",
    DATABASE_PASSWORD,
).option("database", DATABASE).option("labels", "Entity:Refinery").option(
    "node.keys",
    "source_primary_key_hash",
).option(
    "script",
    """CREATE CONSTRAINT IF NOT EXISTS FOR (refinery:Refinery) REQUIRE refinery.source_primary_key_hash IS UNIQUE;""",
).save()

In [None]:
df_same_as.repartition(1).write.format("org.neo4j.spark.DataSource").mode(
    "append",
).option("url", DATABASE_CONNECTION_URL).option(
    "authentication.basic.username",
    DATABASE_USERNAME,
).option("authentication.basic.password", DATABASE_PASSWORD).option(
    "database",
    DATABASE,
).option(
    "query",
    """ MATCH (source:Entity {source_primary_key_hash: event.source}), (destination:Entity {source_primary_key_hash: event.destination}) MERGE (source)-[:SAME_AS]->(destination)""",
).save()

In [None]:
df_whole_part.repartition(1).write.format("org.neo4j.spark.DataSource").mode(
    "append",
).option("url", DATABASE_CONNECTION_URL).option(
    "authentication.basic.username",
    DATABASE_USERNAME,
).option("authentication.basic.password", DATABASE_PASSWORD).option(
    "database",
    DATABASE,
).option(
    "query",
    """ MATCH (source:Entity {source_primary_key_hash: event.source}), (destination:Entity {source_primary_key_hash: event.destination}) MERGE (source)-[:WHOLE_PART]->(destination)""",
).save()

In [None]:
df_has_issue.repartition(1).write.format("org.neo4j.spark.DataSource").mode(
    "append",
).option("url", DATABASE_CONNECTION_URL).option(
    "authentication.basic.username",
    DATABASE_USERNAME,
).option("authentication.basic.password", DATABASE_PASSWORD).option(
    "database",
    DATABASE,
).option(
    "query",
    """ MATCH (source:Entity {source_primary_key_hash: event.source}), (destination:Entity {source_primary_key_hash: event.destination}) MERGE (source)-[:HAS_ISSUE]->(destination)""",
).save()

In [None]:
df_has_issue_type.repartition(1).write.format("org.neo4j.spark.DataSource").mode(
    "append",
).option("url", DATABASE_CONNECTION_URL).option(
    "authentication.basic.username",
    DATABASE_USERNAME,
).option("authentication.basic.password", DATABASE_PASSWORD).option(
    "database",
    DATABASE,
).option(
    "query",
    """ MATCH (source:Entity {source_primary_key_hash: event.source}), (destination:Entity {source_primary_key_hash: event.destination}) MERGE (source)<-[:HAS_ISSUE_TYPE]-(destination)""",
).save()