In [None]:
!pip install -U pandas==1.5.3

Let's extract the dataset from its archive

In [None]:
import zipfile

# Path of the zipped dataset; the same variable is read again when the data is loaded
dataset = "data/1000000 Sales Records.zip"

# Extract the archive into the "data" folder, reusing `dataset` so the path is
# written in exactly one place
with zipfile.ZipFile(dataset, 'r') as zip_ref:
    zip_ref.extractall("data")

Downloading the Neo4j Spark Connector jar, but only if it's not present

In [None]:
import wget,os

# URL of the Neo4j Connector for Apache Spark jar (release 4.1.5, the `for_spark_3` build)
url = 'https://github.com/neo4j-contrib/neo4j-spark-connector/releases/download/4.1.5/neo4j-connector-apache-spark_2.12-4.1.5_for_spark_3.jar'

# The jar's file name is the last path segment of the URL
filename = url.split("/")[-1]

# Download the jar only when it is not already a file in the working directory.
# os.path.isfile avoids listing the whole directory and ignores a same-named folder.
if not os.path.isfile(filename):
    print("Downloading Spark Connector")
    # wget.download returns the name of the file it wrote
    filename = wget.download(url)

Start the SparkSession, passing it the Neo4j Spark Connector jar as well

In [None]:
from pyspark.sql import SparkSession
import pandas as pd

# Tell the pyspark-shell to load the connector jar whose name is held in `filename`
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--jars {filename} pyspark-shell'

# Address of the Neo4j database as "host:bolt_port"; replace this placeholder before running
NEO4J_IP_ADDRESS = "ip_address_of_neo4j:bolt_port_number(usually is 7687)"

# Create the SparkSession (or reuse one that already exists) against the Spark master
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "1536m")
    .getOrCreate()
)

We can now load our dataset as a DataFrame into our Spark cluster

In [None]:
# Read the dataset with pandas, then turn the pandas frame into a Spark DataFrame
df_pd = pd.read_csv(dataset)
sparkDF = spark.createDataFrame(df_pd)

# Preview the first 5 rows
sparkDF.show(5)

Loading the nodes with label Country

In [None]:
# Distinct country names, with the column renamed to the property name used on the nodes
countryValuesDF = (
    sparkDF
    .select('Country')
    .distinct()
    .withColumnRenamed("Country", "countryName")
)

# Create the nodes using connector options only (no custom Cypher for the write):
# `labels` carries the node label, `node.keys` the identifying property, and
# `script` the Cypher statement that creates the uniqueness constraint
(
    countryValuesDF.write
    .format("org.neo4j.spark.DataSource")
    .mode("overwrite")
    .option("url", f"bolt://{NEO4J_IP_ADDRESS}")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "test")
    .option("batch.size", "5000")
    .option("database", "neo4j")
    .option("labels", "Country")
    .option("node.keys", "countryName")
    .option("script", """CREATE CONSTRAINT IF NOT EXISTS FOR (t:Country) REQUIRE t.countryName IS UNIQUE;""")
    .save()
)

Loading the nodes with label Region

In [None]:
# Distinct region names, with the column renamed to the property name used on the nodes
regionValuesDF = (
    sparkDF
    .select('Region')
    .distinct()
    .withColumnRenamed("Region", "regionName")
)

# Report how many Region nodes are about to be written
# (the message previously said "Country", copied from the cell above)
print(f"Ingesting {regionValuesDF.count()} nodes of type Region")

# Create the nodes using connector options only: `labels` carries the node label,
# `node.keys` the identifying property, and `script` the Cypher statement that
# creates the uniqueness constraint
(
    regionValuesDF.write
    .format("org.neo4j.spark.DataSource")
    .mode("overwrite")
    .option("url", f"bolt://{NEO4J_IP_ADDRESS}")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "test")
    .option("batch.size", "5000")
    .option("database", "neo4j")
    .option("labels", "Region")
    .option("node.keys", "regionName")
    .option("script", """CREATE CONSTRAINT IF NOT EXISTS FOR (t:Region) REQUIRE t.regionName IS UNIQUE;""")
    .save()
)

Loading the relationship that connects nodes of type Region with nodes of type Country (HAS_COUNTRY, directed from Region to Country)

In [None]:
# Distinct (region, country) pairs, renamed to the property names used on the nodes
regionCountryDf = (
    sparkDF
    .select(['Region', 'Country'])
    .distinct()
    .withColumnRenamed("Region", "regionName")
    .withColumnRenamed("Country", "countryName")
)

# Report how many relationships are about to be written
# (the target label is Country; the message previously said "Node")
print(f"Ingesting {regionCountryDf.count()} relationships from Region to Country called HAS_COUNTRY")

# Write HAS_COUNTRY relationships with the "keys" save strategy: the source (Region)
# and target (Country) nodes are identified through the
# "<dataframe column>:<node property>" mappings given in the *.node.keys options
(
    regionCountryDf.write
    .mode("overwrite")
    .format("org.neo4j.spark.DataSource")
    .option("url", f"bolt://{NEO4J_IP_ADDRESS}")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "test")
    .option("relationship", "HAS_COUNTRY")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", "Region")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "regionName:regionName")
    .option("relationship.target.labels", "Country")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "countryName:countryName")
    .save()
)


Ingesting all the Order nodes

In [None]:
# Distinct order ids, with the column renamed to the property name used on the nodes
orderDF = (
    sparkDF
    .select(['Order ID'])
    .distinct()
    .withColumnRenamed("Order ID", "orderId")
)

print(f"Ingesting {orderDF.count()} nodes of type Order")

# Load the nodes through the connector's own node options rather than a custom
# query: `labels` carries the node label, `node.keys` the identifying property,
# and `script` the Cypher statement that creates the uniqueness constraint
(
    orderDF.write
    .format("org.neo4j.spark.DataSource")
    .mode("overwrite")
    .option("url", f"bolt://{NEO4J_IP_ADDRESS}")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "test")
    .option("batch.size", "10000")
    .option("database", "neo4j")
    .option("labels", "Order")
    .option("node.keys", "orderId")
    .option("script", """CREATE CONSTRAINT IF NOT EXISTS FOR (t:Order) REQUIRE t.orderId IS UNIQUE;""")
    .save()
)

Loading the OrderDetail nodes and the relationship they have with their Order (right now we are not using all the data inside the dataset: an OrderDetail node currently stores only the units sold, the unit price and the unit cost; Total Profit is selected but not yet written to the node)

In [None]:
# Distinct order-detail rows, renamed to the names the Cypher query reads from `event`
# ('Total Profit' is selected but is not renamed and is not referenced by the query)
orderDetailDF = (
    sparkDF
    .select(['Order ID', 'Order Date', 'Total Profit', "Units Sold", "Unit Price", "Unit Cost"])
    .distinct()
    .withColumnRenamed("Order ID", "orderId")
    .withColumnRenamed("Order Date", "orderDate")
    .withColumnRenamed("Units Sold", "unitSold")
    .withColumnRenamed("Unit Price", "unitPrice")
    .withColumnRenamed("Unit Cost", "unitCost")
)

print(f"Ingesting {orderDetailDF.count()} relationships from Order to OrderDetail nodes called HAS_ORDER_DETAIL")

# A custom query allows more complex ingestion logic than the plain node options.
# The OrderDetail node is MERGEd on its key (orderDetailId) only and the other
# properties are applied with SET: putting them inside the MERGE pattern makes an
# existing orderDetailId with different values look like "no match", and the
# resulting create then violates the uniqueness constraint declared in `script`.
(
    orderDetailDF.write
    .format("org.neo4j.spark.DataSource")
    .mode("overwrite")
    .option("url", f"bolt://{NEO4J_IP_ADDRESS}")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "test")
    .option("batch.size", "10000")
    .option("database", "neo4j")
    .option("query", """MATCH (n:Order{orderId: event.orderId})
                   WITH event, n, toString(event.orderId) + "-" + toString(event.orderDate) as orderDetailId
                   MERGE (od:OrderDetail{orderDetailId: orderDetailId})
                   SET od.unitSold = event.unitSold,
                       od.unitPrice = event.unitPrice,
                       od.unitCost = event.unitCost
                   MERGE (n)-[:HAS_ORDER_DETAIL]->(od)""")
    .option("script", """CREATE CONSTRAINT IF NOT EXISTS FOR (t:OrderDetail) REQUIRE t.orderDetailId IS UNIQUE;""")
    .save()
)

Let's load the category of each OrderDetail, called ItemType. Remember to sort the rows by item type before writing: we have only 10 categories across almost a million orders, so the ItemType nodes are supernodes.

In [None]:
# Item type plus the two columns needed to rebuild the OrderDetail key.
# The column is spelled 'Order ID' in both the select and the rename, as in the
# other cells; the previous 'Order Id' / "Order ID" mix relied on Spark resolving
# column names case-insensitively.
# Rows are sorted by item type so rows for the same ItemType are adjacent
# (presumably to limit contention on the few ItemType nodes - TODO confirm).
itemTypeOrderDetailDF = (
    sparkDF
    .select(['Item Type', 'Order ID', 'Order Date'])
    .withColumnRenamed("Item Type", "itemType")
    .withColumnRenamed("Order ID", "orderId")
    .withColumnRenamed("Order Date", "orderDate")
    .sort('itemType')
)

print(f"Ingesting {itemTypeOrderDetailDF.count()} relationships from OrderDetail to ItemType nodes called HAS_ITEM_TYPE")

# A custom query allows more complex ingestion logic: it rebuilds the
# orderDetailId, matches the existing OrderDetail node, merges the ItemType
# node and then merges the relationship between the two
(
    itemTypeOrderDetailDF.write
    .format("org.neo4j.spark.DataSource")
    .mode("overwrite")
    .option("url", f"bolt://{NEO4J_IP_ADDRESS}")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "test")
    .option("batch.size", "10000")
    .option("database", "neo4j")
    .option("query", """WITH event,toString(event.orderId) + "-" + toString(event.orderDate) as orderDetailId
                   MATCH (od:OrderDetail{orderDetailId: orderDetailId})
                   MERGE (it:ItemType{itemType:event.itemType})
                   MERGE (od)-[:HAS_ITEM_TYPE]->(it)""")
    .option("script", """CREATE CONSTRAINT IF NOT EXISTS FOR (it:ItemType) REQUIRE it.itemType IS UNIQUE;""")
    .save()
)

Ingesting all the relationships between an Order and its Country

In [None]:
# Distinct (country, order id) pairs, renamed to the property names used on the nodes.
# The column is spelled 'Order ID' here, as in the cells that create the Order
# nodes, instead of the 'Order Id' variant that relied on Spark resolving column
# names case-insensitively.
countryOrderDf = (
    sparkDF
    .select(['Country', 'Order ID'])
    .distinct()
    .withColumnRenamed("Country", "countryName")
    .withColumnRenamed("Order ID", "orderId")
)

print(f"Ingesting {countryOrderDf.count()} relationships from Country to Order nodes called HAS_ORDER")

# Write HAS_ORDER relationships with the "keys" save strategy: the source (Country)
# and target (Order) nodes are identified through the
# "<dataframe column>:<node property>" mappings given in the *.node.keys options
(
    countryOrderDf.write
    .mode("overwrite")
    .format("org.neo4j.spark.DataSource")
    .option("url", f"bolt://{NEO4J_IP_ADDRESS}")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "test")
    .option("relationship", "HAS_ORDER")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", "Country")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "countryName:countryName")
    .option("relationship.target.labels", "Order")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "orderId:orderId")
    .save()
)

Finally, once everything has been ingested, we should stop the SparkSession

In [None]:
# Stop the SparkSession that was created earlier in the notebook;
# it is the last step, after all the writes to Neo4j have run
spark.stop()