In [0]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

In [0]:
# Create the Hive-enabled session once, with every static setting it needs.
# Later cells call getOrCreate(), which returns this same session. Static SQL configs
# such as spark.sql.warehouse.dir cannot be changed on an already-running session,
# so the warehouse location must be set here, on first creation.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", os.path.abspath("spark-warehouse"))
    .enableHiveSupport()
    .getOrCreate()
)

In [0]:
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [0]:
# Absolute path of the Hive warehouse directory, resolved against the current working directory.
warehouse_location = abspath("spark-warehouse")

In [0]:
# Build (or fetch) the Hive-enabled session whose warehouse lives at `warehouse_location`.
# NOTE: getOrCreate() hands back an already-running session if an earlier cell created one;
# static settings such as spark.sql.warehouse.dir only take effect on first creation.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)


In [0]:
# Create the Hive-backed `employee` table (no-op if it already exists).
# A triple-quoted string is required for multi-line SQL: the previous single-quoted
# literal spanned a newline, which is a Python SyntaxError. The DDL result is not
# assigned because it is an empty DataFrame with nothing to inspect.
spark.sql(
    """
    CREATE TABLE IF NOT EXISTS employee (
        user_id INT,
        name STRING,
        gender STRING,
        address STRING
    ) USING hive
    """
)

In [0]:
# Peek at the current contents of the `employee` table.
df = spark.sql("select * from employee")
df.show()

+-------+------+------+-----------------+
|user_id|  name|gender|          address|
+-------+------+------+-----------------+
|      2|Ujjwal|     M|65,akashneem marg|
|      1|Anurag|     M|65,akashneem marg|
+-------+------+------+-----------------+



In [0]:
# Seed the `employee` table with every row later cells rely on.
# INSERT OVERWRITE makes the cell idempotent: re-running it no longer appends duplicate rows.
# Row 1 (Anurag) appears in every captured output, but no visible cell inserted it.
# Listing it here lets Restart & Run All reproduce those outputs.
# Values are in table column order: user_id, name, gender, address.
spark.sql('''
    insert overwrite table employee values
        (1, "Anurag", "M", "65,akashneem marg"),
        (2, "Ujjwal", "M", "65,akashneem marg")
''')

Out[18]: DataFrame[]

In [0]:
# Verify the seed step by displaying the whole `employee` table.
employee_snapshot = spark.sql("select * from employee")
employee_snapshot.show()

+-------+------+------+-----------------+
|user_id|  name|gender|          address|
+-------+------+------+-----------------+
|      2|Ujjwal|     M|65,akashneem marg|
|      1|Anurag|     M|65,akashneem marg|
+-------+------+------+-----------------+



In [0]:
# Create the Hive-backed `route` table (no-op if it already exists).
route_ddl = "create table if not exists route(route_id INT,route_name STRING) using hive"
spark.sql(route_ddl)

Out[20]: DataFrame[]

In [0]:
# Seed the `route` table with every row later cells rely on.
# INSERT OVERWRITE makes the cell idempotent: re-running it no longer appends duplicate rows.
# Route 1 ("R1-via delhi") appears in the captured outputs, but no visible cell inserted it.
# Listing it here lets Restart & Run All reproduce those outputs.
# Values are in table column order: route_id, route_name.
spark.sql('''
    insert overwrite table route values
        (1, "R1-via delhi"),
        (2, "R2-via gurgaon")
''')

Out[23]: DataFrame[]

In [0]:
# Verify the seed step by displaying the whole `route` table.
route_snapshot = spark.sql("select * from route")
route_snapshot.show()

+--------+--------------+
|route_id|    route_name|
+--------+--------------+
|       2|R2-via gurgaon|
|       1|  R1-via delhi|
+--------+--------------+



In [0]:
# Load the employee table as a DataFrame; it is the source of the Neo4j `Employee` nodes.
employee_df = spark.sql("select * from employee")
employee_df.show()

+-------+------+------+-----------------+
|user_id|  name|gender|          address|
+-------+------+------+-----------------+
|      2|Ujjwal|     M|65,akashneem marg|
|      1|Anurag|     M|65,akashneem marg|
+-------+------+------+-----------------+



In [0]:
# Load the route table as a DataFrame; it is the source of the Neo4j `Route` nodes.
route_df = spark.sql("select * from route")
route_df.show()

+--------+--------------+
|route_id|    route_name|
+--------+--------------+
|       2|R2-via gurgaon|
|       1|  R1-via delhi|
+--------+--------------+



In [0]:
# Keep only the key columns; they are all that is needed to pair employees with routes.
user_id_df = employee_df.select("user_id")
route_id_df = route_df.select("route_id")

In [0]:
# Display both key projections (routes first, then users).
# NOTE: the captured output below includes a `unique_id` column, which is only added by a
# later cell. On a fresh run, each frame shows just its id column.
for id_frame in (route_id_df, user_id_df):
    id_frame.show()

+--------+---------+
|route_id|unique_id|
+--------+---------+
|       2|        4|
|       1|        2|
+--------+---------+

+-------+---------+
|user_id|unique_id|
+-------+---------+
|      2|        4|
|      1|        2|
+-------+---------+



In [0]:
# Derive a synthetic join key (id * 2) on both sides. An employee and a route whose ids
# are equal end up with the same key, so the join pairs them.
user_id_df = user_id_df.withColumn("unique_id", col("user_id") * 2)
route_id_df = route_id_df.withColumn("unique_id", col("route_id") * 2)

In [0]:
# Pair each employee with the route that shares its synthetic `unique_id` key.
# Joining on the column *name* (rather than comparing two Column objects) makes Spark emit
# a single `unique_id` column. Otherwise there would be two identically named columns,
# and any later reference to `unique_id` would be ambiguous.
relation_df = user_id_df.join(route_id_df, on="unique_id", how="inner")

In [0]:
# Discard the synthetic join key, leaving only the (user_id, route_id) pairs that are
# written to Neo4j as HAS_ROUTE relationships.
# drop() by name removes every column with that name, so a single argument suffices.
relation_df = relation_df.drop("unique_id")

In [0]:
# Display the employee -> route pairs (explicit arguments are DataFrame.show's defaults).
relation_df.show(n=20, truncate=True)

+-------+--------+
|user_id|route_id|
+-------+--------+
|      1|       1|
|      2|       2|
+-------+--------+



In [0]:
# Write one Neo4j `Employee` node per row of employee_df, keyed on user_id.
# Connection settings come from the environment instead of being hardcoded in the notebook.
# NEO4J_PASSWORD is required (a KeyError is raised if it is unset). The password previously
# committed here should be treated as leaked and rotated.
(
    employee_df.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("authentication.type", "basic")
    .option("url", os.environ.get("NEO4J_URL", "bolt://52.201.226.156:7687"))
    .option("authentication.basic.username", os.environ.get("NEO4J_USER", "neo4j"))
    .option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])
    .option("labels", "Employee")
    .option("node.keys", "user_id")
    .save()
)

In [0]:
# Write one Neo4j `Route` node per row of route_df, keyed on route_id.
# Connection settings come from the environment instead of being hardcoded in the notebook.
# NEO4J_PASSWORD is required (a KeyError is raised if it is unset). The password previously
# committed here should be treated as leaked and rotated.
(
    route_df.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("authentication.type", "basic")
    .option("url", os.environ.get("NEO4J_URL", "bolt://52.201.226.156:7687"))
    .option("authentication.basic.username", os.environ.get("NEO4J_USER", "neo4j"))
    .option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])
    .option("labels", "Route")
    .option("node.keys", "route_id")
    .save()
)

In [0]:
# Write a HAS_ROUTE relationship for each (user_id, route_id) row of relation_df.
# The "keys" save strategy matches source Employee nodes on user_id and target Route
# nodes on route_id.
# Connection settings come from the environment instead of being hardcoded in the notebook.
# NEO4J_PASSWORD is required (a KeyError is raised if it is unset). The password previously
# committed here should be treated as leaked and rotated.
(
    relation_df.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("authentication.type", "basic")
    .option("url", os.environ.get("NEO4J_URL", "bolt://52.201.226.156:7687"))
    .option("authentication.basic.username", os.environ.get("NEO4J_USER", "neo4j"))
    .option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])
    .option("relationship", "HAS_ROUTE")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", "Employee")
    .option("relationship.source.node.keys", "user_id")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.target.labels", "Route")
    .option("relationship.target.node.keys", "route_id")
    .option("relationship.target.save.mode", "overwrite")
    .save()
)

In [0]:
# Read back each employee together with its route via a Cypher query, to confirm that the
# nodes and HAS_ROUTE relationships were written.
# Connection settings come from the environment instead of being hardcoded in the notebook.
# NEO4J_PASSWORD is required (a KeyError is raised if it is unset).
(
    spark.read.format("org.neo4j.spark.DataSource")
    .option("authentication.type", "basic")
    .option("url", os.environ.get("NEO4J_URL", "bolt://52.201.226.156:7687"))
    .option("authentication.basic.username", os.environ.get("NEO4J_USER", "neo4j"))
    .option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])
    .option(
        "query",
        "match(n:Employee)-[:HAS_ROUTE]->(m:Route) "
        "return n.name as name,n.gender as gender,n.address as address,m.route_name as route_name",
    )
    .load()
    .show()
)

+------+------+-----------------+--------------+
|  name|gender|          address|    route_name|
+------+------+-----------------+--------------+
|Anurag|     M|65,akashneem marg|  R1-via delhi|
|Ujjwal|     M|65,akashneem marg|R2-via gurgaon|
+------+------+-----------------+--------------+

