In [0]:
%pip install neo4j

In [0]:
from neo4j import *
import json

In [0]:
# Neo4j connection details come from a JSON file with "uri", "username" and
# "password" keys. Set the NEO4J_CREDENTIALS_PATH environment variable to use a
# different file instead of editing this cell.
import os

CREDENTIALS_PATH = os.environ.get(
    "NEO4J_CREDENTIALS_PATH",
    "/Workspace/Users/mugunthan2783@gmail.com/Neo4j/Neo4j_credentials.json",
)
with open(CREDENTIALS_PATH) as f:
    credentials = json.load(f)
uri = credentials['uri']
user = credentials['username']
password = credentials['password']

In [0]:
# Open the driver and check that the server is reachable. The shared session
# used by every later cell is opened either way; a failed check is only
# reported, not raised.
driver = GraphDatabase.driver(uri, auth=(user, password))

try:
    driver.verify_connectivity()
except Exception as exc:
    print(f"Connection failed: {exc}")
else:
    print("Connection successful")

session = driver.session()

In [0]:
from pyspark.sql import *

# Reuse the notebook's already-active Spark session when there is one;
# otherwise build (or fetch) one through the builder.
_active_spark = SparkSession.getActiveSession()
spark = _active_spark if _active_spark is not None else SparkSession.builder.getOrCreate()

In [0]:
# Uniqueness constraints on the business keys of products, customers and sales.
# IF NOT EXISTS makes the cell idempotent: without it, re-running the notebook
# fails because the constraint already exists.
product_schema_query = "CREATE CONSTRAINT unique_product IF NOT EXISTS FOR (n:product) REQUIRE n.product_id IS UNIQUE"
customers_schema_query = "CREATE CONSTRAINT unique_customer IF NOT EXISTS FOR (n:customer) REQUIRE n.customer_id IS UNIQUE"
sales_schema_query = "CREATE CONSTRAINT unique_sale IF NOT EXISTS FOR ()-[r:brought]->() REQUIRE (r.sales_id) IS UNIQUE"

for schema_query in (product_schema_query, customers_schema_query, sales_schema_query):
    # consume() waits for the statement to finish, so an error is reported by
    # this cell instead of surfacing on some later session call.
    session.run(schema_query).consume()

In [0]:

def load_data(spark, t_name):
    """Read the Spark table named *t_name* and return all of its rows, collected to the driver."""
    table_df = spark.table(t_name)
    return table_df.collect()

def _summarize_rows(label, rows):
    """Print the container type, record count and first record of a collected table.

    An empty table prints a placeholder instead of raising IndexError on rows[0].
    """
    sample = rows[0] if rows else "<no rows>"
    print(f"{label} data :-\n\tType : {type(rows)}\n\tTotal records : {len(rows)}\n\tSample data : {sample}")


# Collect each source table to the driver; the loader cells below iterate
# over these lists row by row.
customer_data = load_data(spark, 'pro_customers')
_summarize_rows("Customer", customer_data)

product_data = load_data(spark, 'pro_products')
print()
_summarize_rows("Product", product_data)

sales_data = load_data(spark, 'pro_sales')
print()
_summarize_rows("Sales", sales_data)


In [0]:
# Parameterised CREATE statements. The bulk-load cells below use them, and so
# does create_node_relationship() through create_queries (order: customer,
# product, relationship). The uniqueness constraints created earlier make
# CREATE fail for an id that already exists.
create_customer_query = '''CREATE (n:customer
            {customer_id:$customer_id,
            customer_name:$customer_name,
            customer_age:$customer_age,
            customer_address:$customer_address,
            customer_city:$customer_city,
            customer_state:$customer_state,
            customer_zip:$customer_zip
            })
            RETURN n'''

create_product_query = '''CREATE (n:product
            {product_id:$product_id,
            product_type:$product_type,
            product_name:$product_name,
            colour:$colour,
            price:$price,
            quantity:$quantity,
            description:$description
            })
            RETURN n'''

# Each endpoint is anchored with an inline property map. The previous form,
# a comma-separated MATCH with a WHERE clause, triggers Neo4j's
# cartesian-product notification. If either id does not exist, the query
# creates nothing and returns no rows.
create_relationship_query = '''MATCH (c:customer {customer_id:$customer_id})
            MATCH (p:product {product_id:$product_id})
            CREATE (c)-[r:brought
            {sales_id:$sales_id,
            customer_id:$customer_id,
            product_id:$product_id,
            quantity:$quantity,
            price_per_unit:$price_per_unit,
            total_price:$total_price
            }]->(p)
            RETURN properties(r)'''

create_queries = [create_customer_query, create_product_query, create_relationship_query]

In [0]:
# Source rows look like:
# Row(customer_id=378, customer_name='Andy McPhillimey', gender='Bigender', age=48, home_address='40 Rose CircuitSuite 567', zip_code=7541, city='Abigailchester', state='New South Wales')
# Each Cypher parameter maps to the Spark column that feeds it (gender is not loaded).
customer_param_columns = {
    'customer_id': 'customer_id',
    'customer_name': 'customer_name',
    'customer_age': 'age',
    'customer_address': 'home_address',
    'customer_city': 'city',
    'customer_state': 'state',
    'customer_zip': 'zip_code',
}
for row in customer_data:
    parameter = {param: row[column] for param, column in customer_param_columns.items()}
    session.run(create_customer_query, parameter)

In [0]:
# Source rows look like:
# Row(product_id=583, product_type='Jacket', product_name='Bomber', colour='blue', price=90, quantity=64, description='A blue coloured, L sized, Bomber Jacket')
# Product columns map one-to-one onto the Cypher parameter names.
product_columns = ('product_id', 'product_type', 'product_name', 'colour',
                   'price', 'quantity', 'description')
for row in product_data:
    parameter = {column: row[column] for column in product_columns}
    session.run(create_product_query, parameter)

In [0]:
# Source rows look like:
# Row(sales_id=0, product_id=218, customer_id=64, price_per_unit=106, quantity=2, total_price=212)
# Sales columns map one-to-one onto the relationship query's parameters.
sales_columns = ('sales_id', 'customer_id', 'product_id',
                 'quantity', 'price_per_unit', 'total_price')
for row in sales_data:
    parameter = {column: row[column] for column in sales_columns}
    session.run(create_relationship_query, parameter)

In [0]:
# Example read queries over sales (`brought` relationships) with a unit price
# of 90, ordered by customer age (youngest first) and limited to two matches.
# They differ only in what they return. Run any of them with
# result(spark, session, queryN).
# ORDER BY is written as part of RETURN; older Neo4j versions reject an
# ORDER BY placed before RETURN.
query1 = '''MATCH (n:customer)-[r:brought {price_per_unit:90}]->(m:product) RETURN n ORDER BY n.customer_age LIMIT 2'''

query2 = '''MATCH (n:customer)-[r:brought {price_per_unit:90}]->(m:product) RETURN n.customer_name, n.customer_age, n.customer_city ORDER BY n.customer_age LIMIT 2'''

query3 = '''MATCH (n:customer)-[r:brought {price_per_unit:90}]->(m:product) RETURN m ORDER BY n.customer_age LIMIT 2'''

query4 = '''MATCH (n:customer)-[r:brought {price_per_unit:90}]->(m:product) RETURN n, m ORDER BY n.customer_age LIMIT 2'''

query5 = '''MATCH (n:customer)-[r:brought {price_per_unit:90}]->(m:product) RETURN properties(n) AS customer, properties(r) AS sale, properties(m) AS product ORDER BY n.customer_age LIMIT 2'''

In [0]:
def to_dataframe(spark, temp):
    """Convert neo4j ``Result.data()`` records into a Spark DataFrame.

    Each record is a dict keyed by the RETURN column names. A record whose
    only column holds a dict (e.g. ``RETURN n`` or ``RETURN properties(r)``)
    is unwrapped, so the node/relationship properties become the columns.
    Any other record is used as-is, and empty records are skipped.

    Args:
        spark: the SparkSession used to build the DataFrame.
        temp: list of record dicts, as returned by ``Result.data()``.

    Returns:
        A DataFrame of the rows. When there are no rows, returns an empty,
        column-less DataFrame, because ``createDataFrame`` cannot infer a
        schema from an empty list.
    """
    rows = []
    for record in temp:
        if len(record) == 1:
            (value,) = record.values()
            rows.append(value if isinstance(value, dict) else record)
        elif record:
            rows.append(record)
    if not rows:
        return spark.range(0).drop("id")
    return spark.createDataFrame(rows)

def result(spark, session, query):
    """Run *query* on *session* and return its records as a Spark DataFrame (via to_dataframe)."""
    return to_dataframe(spark, session.run(query).data())


# Show the matches of the first example query.
res = result(spark, session, query1)
res.show()

Create nodes and relationships in the DB

In [0]:
def _next_id(session, cypher):
    """Run *cypher*, which must return exactly one row with an ``id`` column, and return that value."""
    return session.run(cypher).data()[0]['id']


# Next free id = current maximum + 1. coalesce() makes an empty graph start at 1
# instead of producing null (max() over no rows is null).
# NOTE(review): max()+1 is not atomic. Concurrent writers could pick the same
# id, and the uniqueness constraints would then reject the second CREATE.
def get_customer_id(session):
    """Return the next unused customer_id."""
    return _next_id(session, "MATCH (n:customer) RETURN coalesce(max(n.customer_id), 0) + 1 AS id")


def get_product_id(session):
    """Return the next unused product_id."""
    return _next_id(session, "MATCH (n:product) RETURN coalesce(max(n.product_id), 0) + 1 AS id")


def get_sales_id(session):
    """Return the next unused sales_id (only `brought` relationships are scanned)."""
    return _next_id(session, "MATCH ()-[r:brought]->() RETURN coalesce(max(r.sales_id), 0) + 1 AS id")

In [0]:
def _collect_inputs(fields):
    """Prompt for each ``(key, prompt, cast)`` triple in order and return ``{key: cast(answer)}``."""
    return {key: cast(input(prompt)) for key, prompt, cast in fields}


def get_customer_params(session):
    """Interactively gather the parameters for the customer CREATE query.

    customer_id is not asked for; it comes from get_customer_id(session).
    Age and zip go through int(), so non-numeric input raises ValueError.
    """
    print("Enter customer details")
    params = {'customer_id': get_customer_id(session)}
    params.update(_collect_inputs([
        ('customer_name', "Enter customer name : ", str),
        ('customer_age', "Enter customer age : ", int),
        ('customer_address', "Enter customer address : ", str),
        ('customer_city', "Enter customer city : ", str),
        ('customer_state', "Enter customer state : ", str),
        ('customer_zip', "Enter customer zip : ", int),
    ]))
    return params


def get_product_params(session):
    """Interactively gather the parameters for the product CREATE query.

    product_id is not asked for; it comes from get_product_id(session).
    Price and quantity go through int(), so non-numeric input raises ValueError.
    """
    print("Enter product details")
    params = {'product_id': get_product_id(session)}
    params.update(_collect_inputs([
        ('product_type', "Enter product type : ", str),
        ('product_name', "Enter product name : ", str),
        ('colour', "Enter product colour : ", str),
        ('price', "Enter product price : ", int),
        ('quantity', "Enter product quantity : ", int),
        ('description', "Enter product description : ", str),
    ]))
    return params


def get_sale_params(session):
    """Interactively gather the parameters for the sale (relationship) CREATE query.

    sales_id comes from get_sales_id(session); total_price is derived as
    quantity * price_per_unit rather than asked for.
    """
    print("Enter sale details")
    params = {'sales_id': get_sales_id(session)}
    params.update(_collect_inputs([
        ('customer_id', "Enter customer id : ", int),
        ('product_id', "Enter product id : ", int),
        ('quantity', "Enter quantity : ", int),
        ('price_per_unit', "Enter price per unit : ", int),
    ]))
    params['total_price'] = params['quantity'] * params['price_per_unit']
    return params


In [0]:
def create_node_relationship(spark, session, queries):
    """Interactively create a customer, product or sale and show what was written.

    Args:
        spark: SparkSession used to render the created entity.
        session: open neo4j session.
        queries: [customer_query, product_query, relationship_query],
            e.g. create_queries.

    Prints "Invalid Choice." for anything other than 1/2/3. Prints a warning
    instead of "created successfully" when the query returned no rows, which
    happens for a sale whose customer or product id does not exist.
    """
    builders = {
        1: (queries[0], get_customer_params),
        2: (queries[1], get_product_params),
        3: (queries[2], get_sale_params),
    }
    print("1.Customer\n2.Product\n3.Relationship")
    try:
        choice = int(input("Create what? : "))
    except ValueError:
        choice = None
    if choice not in builders:
        print("Invalid Choice.")
        return

    query, build_params = builders[choice]
    data = session.run(query, build_params(session)).data()
    if not data:
        print("Nothing was created - check that the customer and product ids exist.")
        return
    print("Node/Relationship created successfully.")
    to_dataframe(spark, data).show()


create_node_relationship(spark, session, create_queries)

Preview (print) operations

In [0]:
def _first_value(records, key):
    """Return ``records[0][key]``, or None when the query returned no rows."""
    return records[0][key] if records else None


def customer_preview(session, cust_id):
    """Return the customer with *cust_id* as a property dict (per Result.data()), or None if absent."""
    records = session.run(
        "MATCH (n:customer {customer_id: $customer_id}) RETURN n",
        customer_id=cust_id,
    ).data()
    return _first_value(records, 'n')


def product_preview(session, pro_id):
    """Return the product with *pro_id* as a property dict (per Result.data()), or None if absent."""
    records = session.run(
        "MATCH (p:product {product_id: $product_id}) RETURN p",
        product_id=pro_id,
    ).data()
    return _first_value(records, 'p')


def sales_preview(session, sales_id):
    """Return the properties of the `brought` relationship with *sales_id*, or None if absent."""
    records = session.run(
        "MATCH (n:customer)-[r:brought {sales_id:$sal_id}]->(m:product) RETURN properties(r) AS p",
        sal_id=sales_id,
    ).data()
    return _first_value(records, 'p')


# Spot-check a few example ids; None means the id does not exist.
print(customer_preview(session, 1001))
print(product_preview(session, 1259))
print(sales_preview(session, 4993))

Delete Operation

In [0]:
# DETACH DELETE removes the node together with its `brought` relationships.
# A plain DELETE fails for any customer or product that still has sales.
# NOTE(review): deleting a customer or product therefore also deletes its sales.
delete_customer_query = '''MATCH (n:customer {customer_id:$customer_id}) DETACH DELETE n'''
delete_product_query = '''MATCH (n:product {product_id:$product_id}) DETACH DELETE n'''
delete_sale_query = '''MATCH (:customer)-[r:brought {sales_id:$sales_id}]->(:product) DELETE r'''
delete_queries = [delete_customer_query, delete_product_query, delete_sale_query]

In [0]:
def confirmation():
    """Ask "Are u sure?(y/n)" and return the answer with surrounding whitespace removed, lower-cased.

    Callers compare the result with 'y', so answers such as " Y " also count as yes.
    """
    return input("Are u sure?(y/n)").strip().lower()

In [0]:
def _safe_preview(preview, session, key):
    """Call one of the *_preview helpers, returning None instead of raising when nothing matches."""
    try:
        return preview(session, key)
    except IndexError:
        # A preview that indexes [0] into an empty result raises IndexError.
        return None


def delete_Node_Relation(spark, session, query):
    """Interactively delete a customer, product or sale after previewing it and asking for confirmation.

    Args:
        spark: unused; kept so the signature matches the create/update helpers.
        session: open neo4j session.
        query: [customer_query, product_query, sale_query], e.g. delete_queries.
    """
    targets = {
        1: ("Enter customer id : ", customer_preview, 0, 'customer_id'),
        2: ("Enter product id : ", product_preview, 1, 'product_id'),
        3: ("Enter sales id : ", sales_preview, 2, 'sales_id'),
    }
    print("1.Customer\n2.Product\n3.Relationship")
    try:
        choice = int(input("Delete what? : "))
    except ValueError:
        choice = None
    if choice not in targets:
        print("Invalid Choice.")
        return

    prompt, preview, query_index, param_name = targets[choice]
    key = int(input(prompt))
    existing = _safe_preview(preview, session, key)
    if existing is None:
        print("No matching record found. Not deleted.")
        return
    print(existing)
    if str(confirmation()).strip().lower() != 'y':
        print("Not deleted.")
        return

    # consume() waits for the delete to complete. A failure is therefore
    # raised here, before any success message, and the summary counters show
    # whether anything was actually removed.
    summary = session.run(query[query_index], {param_name: key}).consume()
    if summary.counters.contains_updates:
        print("Node/Relationship deleted successfully.")
    else:
        print("Not deleted.")


delete_Node_Relation(spark, session, delete_queries)

Update Operation

In [0]:
# Parameterised update statements (order: customer, product), passed as the
# `query` list to update_Node_Relation. Only the properties listed in each SET
# are overwritten. An id that matches no node leaves the graph unchanged and
# the query returns no rows.
update_customer_query='''match (n:customer {customer_id:$customer_id}) set n.customer_name=$customer_name,n.customer_age=$customer_age return n'''
update_product_query='''match (n:product {product_id:$product_id}) set n.product_type=$product_type,n.product_name=$product_name,n.colour=$colour return n'''
update_queries=[update_customer_query,update_product_query]

In [0]:
def get_customer_update_params():
    """Prompt for a customer id plus its new name and age (parameters for the customer update query).

    Named differently from the create-time get_customer_params(session) so
    that defining it does not shadow that function and break
    create_node_relationship.
    """
    print("Enter customer details")
    customer_id = int(input("Enter customer id : "))
    customer_name = input("Enter customer name : ")
    customer_age = int(input("Enter customer age : "))
    return {'customer_id': customer_id,
            'customer_name': customer_name,
            'customer_age': customer_age}


def get_product_update_params():
    """Prompt for a product id plus its new type, name and colour (parameters for the product update query).

    Named differently from the create-time get_product_params(session) so
    that defining it does not shadow that function.
    """
    print("Enter product details")
    product_id = int(input("Enter product id : "))
    product_type = input("Enter product type : ")
    product_name = input("Enter product name : ")
    colour = input("Enter product colour : ")
    return {'product_id': product_id,
            'product_type': product_type,
            'product_name': product_name,
            'colour': colour}


def update_Node_Relation(spark, session, query):
    """Interactively update a customer or product and show the updated node.

    Args:
        spark: SparkSession used to render the result.
        session: open neo4j session.
        query: [customer_update_query, product_update_query], e.g. update_queries.

    Prints a warning instead of a success message when no node has the given id.
    """
    param_builders = {
        1: (query[0], get_customer_update_params),
        2: (query[1], get_product_update_params),
    }
    print("1.Customer\n2.Product")
    try:
        choice = int(input("Update what? : "))
    except ValueError:
        choice = None
    if choice not in param_builders:
        print("Invalid Choice.")
        return

    cypher, build_params = param_builders[choice]
    data = session.run(cypher, build_params()).data()
    if not data:
        print("Nothing was updated - no node has that id.")
        return
    print("Node/Relationship updated successfully.")
    to_dataframe(spark, data).show()


update_Node_Relation(spark, session, update_queries)

In [0]:
# Release the session first, then the driver it was opened from.
for neo4j_handle in (session, driver):
    neo4j_handle.close()