# Graph data processing with Cypher, BigQuery, and Dataproc on Google Cloud

This notebook illustrates how to:
* Build a graph from a BigQuery table
* Carry out graph processing using Cypher

### Read BigQuery table

In [1]:
!scala -version

Scala code runner version 2.12.11 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.


Because I have Scala 2.12, I'll get the BigQuery connector for 2.12

In [1]:
from pyspark.sql import SparkSession, SQLContext, Row

BQ_JAR='gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'  # CHANGE
CYPHER='org.opencypher:morpheus-spark-cypher:0.4.2'

spark = SparkSession.builder.appName("sfbus")\
   .config('spark.jars', BQ_JAR)\
   .config('spark.jars.packages', CYPHER)\
   .getOrCreate()
sc = spark.sparkContext

In [2]:
BUCKET='ai-analytics-solutions-kfpdemo'     # CHANGE
spark.conf.set('temporaryGcsBucket', BUCKET)
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [3]:
df_stops = spark.read.format('bigquery') \
  .option('table', 'bigquery-public-data:san_francisco_transit_muni.stop_times') \
  .option('filter', 'arrives_next_day = false AND dropoff_type = "regular"') \
  .load()

df_stops.printSchema()

root
 |-- stop_id: long (nullable = true)
 |-- trip_id: long (nullable = true)
 |-- stop_sequence: long (nullable = true)
 |-- arrival_time: long (nullable = true)
 |-- arrives_next_day: boolean (nullable = true)
 |-- departure_time: long (nullable = true)
 |-- departs_next_day: boolean (nullable = true)
 |-- dropoff_type: string (nullable = true)
 |-- exact_timepoint: boolean (nullable = true)



## Build Property graph

This uses [Morpheus](https://github.com/opencypher/morpheus)

In [8]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, MapType
def create_entry(stop_id, trip_id, arrival_time):
    return ("(s:Stop {id: " + str(stop_id) + "})" + 
            "<-[:CONTAINS]-" + 
            "(t:Trip {id: " + str(trip_id) + "}),\n" +
            "(s:Stop {id: " + str(stop_id) + "})" + 
            "-[:AT]->" + 
            "(t:Time {value: " + str(arrival_time) + "})")
create_entry_udf = udf(create_entry, StringType())
df_nodes = df_stops.select(create_entry_udf("stop_id", "trip_id", "arrival_time").alias("nodes"))

In [9]:
df_nodes.show()

+--------------------+
|               nodes|
+--------------------+
|(s:Stop {id: 1575...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1575...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1575...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1575...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1575...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1575...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1576...|
|(s:Stop {id: 1575...|
|(s:Stop {id: 1576...|
+--------------------+
only showing top 20 rows



In [10]:
morpheus = sc._jvm.org.opencypher.morpheus.api.MorpheusSession.local()

Py4JError: An error occurred while calling z:org.opencypher.morpheus.api.MorpheusSession.local. Trace:
py4j.Py4JException: Method local([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
	at py4j.Gateway.invoke(Gateway.java:276)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

