# Leveraging the Neo4j Streams module - Part 1 - Build a Just-In-Time Data Warehouse

# A Legacy Structure

## Enterprise Data Warehouses
Traditional DWHs require data teams to constantly build multiple costly and time-consuming **E**xtract **T**ransform **L**oad (ETL) pipelines to ultimately derive business insights.
One of the biggest pain points is that Enterprise Data Warehouses are **inherently rigid**: their architecture is difficult to change. That's because:

* they are **based on** the **Schema-On-Write** architecture: first, you define your schema, then you write your data, then you read your data and it comes back in the schema you defined up-front;
* they are **based on** (expensive) **batched/scheduled jobs**.

**This results in having to build costly and time-consuming ETL pipelines** to access and manipulate the data. And as **new data types** and sources are introduced, the need to augment your ETL pipelines **exacerbates the problem**.
By combining stream data processing with the Neo4j Streams CDC module and the Schema-On-Read approach provided by Apache Spark, we can overcome this rigidity and build a new kind of (flexible) DWH.

# A paradigm shift: Just-In-Time Data Warehouse

A JIT-DWH solution is designed to easily handle a wider variety of data from different sources. It starts from a different approach to dealing with and managing data: **Schema-On-Read**.

## Schema-On-Read

Schema-On-Read follows a different sequence: it loads the data as-is and applies your own lens to the data when you read it back out. With this approach you can present the data in the schema that best fits the queries being issued and is most relevant to the task at hand. You're not stuck with a one-size-fits-all schema.
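To make the idea concrete, here is a minimal, library-free sketch of Schema-On-Read in plain Python. The raw store, the sample records and the `read_with_lens` helper are hypothetical. They only illustrate the principle: the records are stored untouched, and each reader picks its own shape at read time.

```python
import json
from typing import Dict, List, Optional

# Hypothetical raw store: records are kept exactly as they arrived,
# no schema is enforced when they are written.
RAW_STORE: List[str] = [
    '{"name": "Andrea", "age": 35}',
    '{"name": "Michael", "city": "Dresden"}',
    '{"name": "Alice", "age": 28, "city": "Rome", "extra": true}',
]


def read_with_lens(raw: List[str], fields: List[str]) -> List[Dict[str, Optional[object]]]:
    """Apply a 'lens' (a list of fields) at read time.

    Missing fields come back as None and unknown fields are ignored,
    so every query can choose the shape that suits it best.
    """
    rows = []
    for line in raw:
        record = json.loads(line)
        rows.append({field: record.get(field) for field in fields})
    return rows


# Two different lenses over the same, untouched data.
people_by_age = read_with_lens(RAW_STORE, ["name", "age"])
people_by_city = read_with_lens(RAW_STORE, ["name", "city"])
```

Later in this notebook, Spark's `from_json` plays a comparable role: the CDC events land in Kafka as plain JSON strings, and a schema is applied only when they are read.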

## How to?

Applying this approach is pretty simple: we leverage the **Neo4j Streams** module together with **Apache Spark**'s Structured Streaming APIs and **Apache Kafka**.

# Leveraging the Neo4j Streams Change Data Capture (CDC)

The Neo4j Streams project is composed of three main pillars:

* The Change Data Capture (CDC), which streams database changes over Kafka topics
* The Sink, which consumes data streams from Kafka topics
* A set of procedures that allow producing/consuming data to/from Kafka topics

This notebook is focused on the Change Data Capture.

## What is a Change Data Capture?

It's a system that automatically captures changes from a source system (a database, for instance) and automatically notifies a target system of these changes.
CDC typically forms part of an ETL pipeline, and it is an important component for keeping Data Warehouses (DWH) up to date with any record changes.
Traditionally, CDC applications work off transaction logs, which lets them replicate databases with little or no impact on the performance of the source.
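The core CDC idea fits in a toy, in-memory sketch: every write produces a change event carrying the state *before* and *after* the change, and subscribers are notified without having to poll the source. The `CapturingStore` class and its methods are hypothetical. Unlike a log-based CDC, this toy emits events directly from the write path. The operation names mirror the `operation` values used by the Neo4j Streams events shown further down.

```python
from typing import Callable, Dict, List, Optional

ChangeEvent = Dict[str, object]


class CapturingStore:
    """Hypothetical key/value store that publishes a change event for every write."""

    def __init__(self) -> None:
        self._data: Dict[str, dict] = {}
        self._subscribers: List[Callable[[ChangeEvent], None]] = []

    def subscribe(self, callback: Callable[[ChangeEvent], None]) -> None:
        self._subscribers.append(callback)

    def _publish(self, operation: str, key: str,
                 before: Optional[dict], after: Optional[dict]) -> None:
        event = {"operation": operation, "id": key, "before": before, "after": after}
        for callback in self._subscribers:
            callback(event)

    def put(self, key: str, value: dict) -> None:
        before = self._data.get(key)
        self._data[key] = dict(value)
        operation = "created" if before is None else "updated"
        self._publish(operation, key, before, dict(value))

    def delete(self, key: str) -> None:
        before = self._data.pop(key, None)
        if before is not None:
            self._publish("deleted", key, before, None)


# The "target system" here is just a list that receives every change.
received: List[ChangeEvent] = []
store = CapturingStore()
store.subscribe(received.append)
store.put("1004", {"name": "Andrea"})
store.put("1004", {"name": "Andrea", "age": 35})
store.delete("1004")
```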

## How does the Neo4j Streams CDC module deal with database changes?

Every transaction inside Neo4j gets intercepted and unpacked so that each atomic element of the transaction is streamed.
Let's suppose we have a simple creation of two nodes and one relationship between them:

```
CREATE (andrea:Person{name:"Andrea"})-[knows:KNOWS{since:2014}]->(michael:Person{name:"Michael"})
```

The CDC module unpacks this transaction into 3 events (2 node creations and 1 relationship creation).

The event structure was inspired by the [Debezium](https://debezium.io/) format and has the following general structure:

```
{
  "meta": { /* transaction meta-data */ },
  "payload": { /* the data related to the transaction */
    "before": { /* the data before the transaction */},
    "after": { /* the data after the transaction */}
  }
}
```
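Every event shares this envelope, so a consumer can extract the essentials without first checking whether it holds a node or a relationship. The following is a minimal sketch in plain Python. `summarize` is a hypothetical helper. It assumes that created/updated events fill `payload.after` and that deletions carry only `payload.before`, and it falls back from one to the other.

```python
import json
from typing import Any, Dict, Optional


def summarize(raw: str) -> Dict[str, Any]:
    """Flatten one CDC event (as a JSON string) into a few handy fields."""
    event = json.loads(raw)
    meta, payload = event["meta"], event["payload"]
    # Assumption: created/updated events fill "after", deleted events only "before".
    state: Optional[Dict[str, Any]] = payload.get("after") or payload.get("before")
    return {
        "operation": meta["operation"],
        "type": payload["type"],
        "id": payload["id"],
        "properties": (state or {}).get("properties", {}),
    }


example = """{"meta": {"operation": "created"},
              "payload": {"id": "1004", "type": "node",
                          "after": {"labels": ["Person"],
                                    "properties": {"name": "Andrea"}}}}"""
summary = summarize(example)
```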

Node source `(andrea)`:
```
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 1,
    "tx_event_id": 0,
    "tx_events_count": 3,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "name": "Andrea"
      }
    }
  }
}
```


Node target `(michael)`:
```
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 1,
    "tx_event_id": 1,
    "tx_events_count": 3,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1006",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "name": "Michael"
      }
    }
  }
}
```


Relationship `knows`:
```
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 1,
    "tx_event_id": 2,
    "tx_events_count": 3,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1007",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "1004"
    },
    "end": {
      "labels": ["Person"],
      "id": "1006"
    },
    "after": {
      "properties": {
        "since": 2014
      }
    }
  }
}
```
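Each event carries `tx_id`, `tx_event_id` and `tx_events_count`, so a consumer can reassemble the original transaction. The sketch below shows one way to do that. It assumes the events are already parsed into Python dicts and that each event is delivered exactly once; redelivered events would first need de-duplicating on `(tx_id, tx_event_id)`. `complete_transactions` and `fake_event` are hypothetical helpers.

```python
from collections import defaultdict
from typing import Any, Dict, List


def complete_transactions(events: List[Dict[str, Any]]) -> Dict[int, List[Dict[str, Any]]]:
    """Group CDC events by tx_id and keep only fully received transactions.

    A transaction is complete when we hold as many events as its
    tx_events_count; its events are returned ordered by tx_event_id.
    """
    by_tx: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
    for event in events:
        by_tx[event["meta"]["tx_id"]].append(event)

    complete = {}
    for tx_id, tx_events in by_tx.items():
        expected = tx_events[0]["meta"]["tx_events_count"]
        if len(tx_events) == expected:
            complete[tx_id] = sorted(tx_events, key=lambda e: e["meta"]["tx_event_id"])
    return complete


def fake_event(tx_id: int, event_id: int, count: int) -> Dict[str, Any]:
    """Hypothetical event carrying only the transaction metadata."""
    return {"meta": {"tx_id": tx_id, "tx_event_id": event_id, "tx_events_count": count}}


# The three events of tx 1 arrive out of order; tx 2 is still missing an event.
arrived = [fake_event(1, 2, 3), fake_event(1, 0, 3), fake_event(2, 0, 2), fake_event(1, 1, 3)]
done = complete_transactions(arrived)
```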

For a more in-depth description of the Neo4j Streams project and how/why we at [LARUS](http://www.larus-ba.it/) and [Neo4j](https://neo4j.com/) built it, check out [this article](https://medium.com/neo4j/a-new-neo4j-integration-with-apache-kafka-6099c14851d2).

## Initialise Spark & Neo4j Session

In [None]:
# Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2"
).getOrCreate()

# Neo4j
import sys
!{sys.executable} -m pip install py2neo

from py2neo import Graph

graph = Graph("bolt://neo4j:7687", auth=("neo4j", "streams"))


# Create random Social Network Data

We'll create a fake social network by using the APOC `apoc.periodic.repeat` procedure, which executes the following query every 15 seconds:

```
WITH ["M", "F", "X"] AS gender
UNWIND range(1, 10) AS id
CREATE (p:Person {id: apoc.create.uuid(), name: "Name-" +  apoc.text.random(10), age: round(rand() * 100), index: id, gender: gender[toInteger(size(gender) * rand())]})
WITH collect(p) AS people
UNWIND people AS p1
UNWIND range(1, 3) AS friend
WITH p1, people[(p1.index + friend) % size(people)] AS p2
CREATE (p1)-[:KNOWS{years: round(rand() * 10), engaged: (rand() > 0.5)}]->(p2)
```
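The query's friendship wiring is easier to read when its index arithmetic is mirrored in plain Python. In Cypher, the `people` list is 0-based, while `index` runs from 1 to 10, so person `i` sits at list position `i - 1` and is linked to the person at position `(i + friend) % 10`. The sketch below assumes that `collect(p)` keeps the people in creation order. `knows_edges` is a hypothetical helper that works on `index` values only.

```python
from collections import Counter
from typing import List, Tuple


def knows_edges(people: int = 10, friends: int = 3) -> List[Tuple[int, int]]:
    """Reproduce the KNOWS wiring of one batch, expressed as Person.index pairs."""
    edges = []
    for index in range(1, people + 1):          # UNWIND range(1, 10) AS id
        for friend in range(1, friends + 1):    # UNWIND range(1, 3) AS friend
            position = (index + friend) % people  # 0-based list position of p2
            edges.append((index, position + 1))   # back to the 1-based index
    return edges


edges = knows_edges()
out_degree = Counter(src for src, _ in edges)
in_degree = Counter(dst for _, dst in edges)
```

Each batch therefore produces 30 `KNOWS` relationships. Every `Person` has exactly three outgoing and three incoming relationships, and there are no self-loops. Because of the 0-based/1-based offset, person 1 is linked to persons 3, 4 and 5 rather than 2, 3 and 4.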

For more details about the APOC project, please follow this [link](https://neo4j-contrib.github.io/neo4j-apoc-procedures/).

## Execute this query if you want to clean up the DB

In [None]:
graph.run("MATCH (n) DETACH DELETE n")

## Create an index on Person

In [None]:
graph.run("CREATE INDEX ON :Person(id)")

## Launch background job for populating data

In [None]:
graph.run("""
CALL apoc.periodic.repeat(
  'create-fake-social-data', 
  '
     WITH ["M", "F", "X"] AS gender 
     UNWIND range(1, 10) AS id 
     CREATE (p:Person {
       id: apoc.create.uuid(), 
       name: "Name-" +  apoc.text.random(10), 
       age: round(rand() * 100), 
       index: id, 
       gender: gender[toInteger(size(gender) * rand())]})
     WITH collect(p) AS people
     UNWIND people AS p1
     UNWIND range(1, 3) AS friend
     WITH p1, people[(p1.index + friend) % size(people)] AS p2
     CREATE (p1)-[:KNOWS{years: round(rand() * 10), engaged: (rand() > 0.5)}]->(p2)
  ', 15) YIELD name
RETURN name AS created
""")

## Run this query when you want to stop the background data population

In [None]:
graph.run("CALL apoc.periodic.cancel('create-fake-social-data') YIELD name RETURN name AS cancelled")

## Let's see how our data grows

In [None]:
graph.run("MATCH p = (p1:Person)-[k:KNOWS]->(p2:Person) RETURN count(p)")

## Create a Structured Streaming Dataset that consumes the data from a "neo4j" topic (the default topic of the CDC)

In [None]:
kafkaStreamingDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9093") \
    .option("startingOffsets", "earliest") \
    .option("subscribe", "neo4j") \
    .load()

## As you can see, the structure of the data is basically a Kafka record representation (key, value, topic, partition, offset, timestamp, timestampType)

In [None]:
kafkaStreamingDF.printSchema()
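The Kafka source exposes `key` and `value` as raw bytes. That is why the next steps cast `value` to a string before parsing it as JSON. The plain-Python sketch below shows the equivalent of that decoding step. The row is a hypothetical dict shaped like a few of the columns printed above, not an actual Spark `Row`, and the sketch assumes the CDC payload is UTF-8 JSON.

```python
import json
from typing import Any, Dict

# Hypothetical row mimicking some Kafka source columns:
# "value" holds raw bytes, the rest is Kafka metadata.
row: Dict[str, Any] = {
    "key": None,
    "value": b'{"meta": {"operation": "created"}, "payload": {"id": "1004", "type": "node"}}',
    "topic": "neo4j",
    "partition": 0,
    "offset": 42,
}


def decode_value(kafka_row: Dict[str, Any]) -> Dict[str, Any]:
    """Plain-Python counterpart of CAST(value AS STRING) followed by JSON parsing."""
    text = kafka_row["value"].decode("utf-8")
    return json.loads(text)


event = decode_value(row)
```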

## Let's create the structure of the data streamed by the CDC

In [None]:
from pyspark.sql.types import StructType, StructField, MapType, ArrayType, StringType, LongType

# Transaction meta-data, including the fields that identify each event inside its transaction
cdcMetaSchema = StructType([
    StructField("timestamp", LongType()),
    StructField("username", StringType()),
    StructField("tx_id", LongType()),
    StructField("tx_event_id", LongType()),
    StructField("tx_events_count", LongType()),
    StructField("operation", StringType()),
    StructField("source", MapType(StringType(), StringType(), True))
])

# Shape of the "before"/"after" states of a node or relationship
cdcPayloadSchemaBeforeAfter = StructType([
    StructField("labels", ArrayType(StringType(), False)),
    StructField("properties", MapType(StringType(), StringType(), True))
])

# "label", "start" and "end" are only present for relationships
cdcPayloadSchema = StructType([
    StructField("id", StringType()),
    StructField("type", StringType()),
    StructField("label", StringType()),
    StructField("start", MapType(StringType(), StringType(), True)),
    StructField("end", MapType(StringType(), StringType(), True)),
    StructField("before", cdcPayloadSchemaBeforeAfter),
    StructField("after", cdcPayloadSchemaBeforeAfter)
])

cdcSchema = StructType([
    StructField("meta", cdcMetaSchema),
    StructField("payload", cdcPayloadSchema)
])
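A single `cdcPayloadSchema` can describe both node and relationship events because `from_json` returns null for every declared field that is absent from the JSON document. A node event simply has no `label`, `start` or `end`, and a creation has no `before`. The sketch below imitates that behaviour in plain Python. `PAYLOAD_SCHEMA` and `project` are hypothetical simplifications of a `StructType`. Unlike Spark, they perform no type casting.

```python
import json
from typing import Any, Dict

# Hypothetical stand-in for a StructType: nested dicts describe structs,
# any other value (here a Python type) marks a leaf field.
PAYLOAD_SCHEMA: Dict[str, Any] = {
    "id": str, "type": str, "label": str,
    "start": dict, "end": dict,
    "before": {"labels": list, "properties": dict},
    "after": {"labels": list, "properties": dict},
}


def project(record: Any, schema: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the declared fields; absent fields become None, as with from_json."""
    out = {}
    for name, spec in schema.items():
        value = record.get(name) if isinstance(record, dict) else None
        if isinstance(spec, dict):          # nested struct
            out[name] = project(value, spec) if value is not None else None
        else:                               # leaf field
            out[name] = value
    return out


node_payload = json.loads(
    '{"id": "1004", "type": "node",'
    ' "after": {"labels": ["Person"], "properties": {"name": "Andrea"}}}'
)
projected = project(node_payload, PAYLOAD_SCHEMA)
```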

## Let's extract only the CDC DataFrame

In [None]:
from pyspark.sql.functions import from_json

cdcDataFrame = kafkaStreamingDF \
    .selectExpr("CAST(value AS STRING) AS VALUE") \
    .select(from_json('VALUE', cdcSchema).alias('json'))

cdcDataFrame.printSchema()

## Let's perform a simple ETL query in order to extract fields of interest

In [None]:
dataWarehouseDataFrame = cdcDataFrame \
    .where("json.payload.type = 'node' and (array_contains(nvl(json.payload.after.labels, json.payload.before.labels), 'Person'))") \
    .selectExpr("json.payload.id AS neo_id", "CAST(json.meta.timestamp / 1000 AS Timestamp) AS timestamp", \
        "json.meta.source.hostname AS host", \
        "json.meta.operation AS operation", \
        "nvl(json.payload.after.labels, json.payload.before.labels) AS labels", \
        "explode(json.payload.after.properties)")
dataWarehouseDataFrame.printSchema()
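Spelling out the Spark query above in plain Python makes its semantics easier to check on a single event. The sketch applies the `WHERE` filter with `nvl` on the labels, converts the epoch-millisecond timestamp, and emits one row per property, as `explode` on a map does. `to_dwh_rows` is a hypothetical helper. The conversion uses UTC, whereas Spark renders timestamps in the session time zone.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterator


def to_dwh_rows(event: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Plain-Python mirror of the Spark ETL query, applied to one parsed CDC event."""
    meta, payload = event["meta"], event["payload"]
    after = payload.get("after") or {}
    before = payload.get("before") or {}
    labels = after.get("labels")
    if labels is None:                       # nvl(after.labels, before.labels)
        labels = before.get("labels")
    if payload.get("type") != "node" or "Person" not in (labels or []):
        return                               # filtered out by the WHERE clause
    common = {
        "neo_id": payload["id"],
        # CAST(timestamp / 1000 AS TIMESTAMP): epoch millis -> datetime (UTC here)
        "timestamp": datetime.fromtimestamp(meta["timestamp"] / 1000, tz=timezone.utc),
        "host": (meta.get("source") or {}).get("hostname"),
        "operation": meta["operation"],
        "labels": labels,
    }
    # explode(after.properties): one row per (key, value); a null map yields no rows
    for key, value in (after.get("properties") or {}).items():
        yield {**common, "key": key, "value": value}


created = {
    "meta": {"timestamp": 1532597182604, "operation": "created",
             "source": {"hostname": "neo4j.mycompany.com"}},
    "payload": {"id": "1004", "type": "node",
                "after": {"labels": ["Person"],
                          "properties": {"name": "Andrea", "age": "35.0"}}},
}
deleted = {
    "meta": {"timestamp": 1532597182604, "operation": "deleted",
             "source": {"hostname": "neo4j.mycompany.com"}},
    "payload": {"id": "1004", "type": "node",
                "before": {"labels": ["Person"], "properties": {"name": "Andrea"}}},
}
rows = list(to_dwh_rows(created))
```

This exposes a subtle property of the query. A deleted `Person` passes the filter, because its labels come from `before`, but exploding a null `after.properties` produces no rows, so deletions never reach the warehouse as written. If deletions matter, Spark's `explode_outer`, which keeps a row with null key/value for a null map, may be worth evaluating.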

## If you want to debug your streaming query, please execute this

In [None]:
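from IPython.display import clear_output
import time

# Write the stream into an in-memory table named "debug"
query = dataWarehouseDataFrame \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("debug") \
    .start()

# Refresh the content of the "debug" table every 2 seconds, for `timeout` seconds
timeout = 10
timeout_start = time.time()

while time.time() < timeout_start + timeout:
    clear_output(wait=True)
    # show() prints the table itself and returns None, so it is not wrapped in display()
    spark.sql('SELECT * FROM debug').show()
    time.sleep(2)

query.stop()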

## Save the data streamed from the Neo4j CDC to the filesystem as JSON

In [None]:
writeOnDisk = dataWarehouseDataFrame \
    .writeStream \
    .format("json") \
    .option("checkpointLocation", "/home/streams/jit-dwh/checkpoint") \
    .option("path", "/home/streams/jit-dwh") \
    .queryName("nodes") \
    .start()
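Once the events are on disk, any lens can be applied when reading them back. That is the Just-In-Time part of the warehouse. Inside the notebook, `spark.read.json("/home/streams/jit-dwh")` would be the natural way to do this. The plain-Python sketch below rebuilds the latest known properties of each `Person` from the sink output. It assumes JSON-lines part files named `part-*.json` whose lines carry the `neo_id`, `timestamp`, `key` and `value` columns selected above, and that all timestamps share one time-zone offset so they sort correctly as strings. `latest_person_view` and the demo data are hypothetical.

```python
import glob
import json
import os
import tempfile
from typing import Any, Dict, List


def latest_person_view(path: str) -> Dict[str, Dict[str, Any]]:
    """Pivot exploded (key, value) rows back into one property map per neo_id."""
    rows: List[Dict[str, Any]] = []
    for file_name in sorted(glob.glob(os.path.join(path, "part-*.json"))):
        with open(file_name, encoding="utf-8") as handle:
            rows.extend(json.loads(line) for line in handle if line.strip())

    view: Dict[str, Dict[str, Any]] = {}
    # Later rows overwrite earlier ones, so each property keeps its latest value.
    for row in sorted(rows, key=lambda r: r["timestamp"]):
        view.setdefault(row["neo_id"], {})[row["key"]] = row["value"]
    return view


# Hypothetical demo output, shaped like two rows of the "nodes" query.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "part-00000.json"), "w", encoding="utf-8") as out:
    out.write('{"neo_id": "1004", "timestamp": "2018-07-26T09:26:22.604Z", '
              '"operation": "created", "key": "name", "value": "Andrea"}\n')
    out.write('{"neo_id": "1004", "timestamp": "2018-07-26T09:26:22.604Z", '
              '"operation": "created", "key": "age", "value": "35.0"}\n')
view = latest_person_view(demo_dir)
```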

## Run this paragraph if you want to stop the streaming jobs

In [None]:
for x in spark.streams.active:
    x.stop()