In [None]:
# import libraries
import asyncio
import json
import os
import queue
import sys
import threading

from neo4j.v1 import GraphDatabase
from neo4j.v1.api import CypherError

In [None]:
# Locations of the Yelp dataset files, relative to the notebook's working
# directory. import_all opens whichever file an enabled entry of its parts list
# references and parses it with json.loads one line at a time, so each file is
# expected to hold one JSON document per line.
# NOTE(review): the "_bk" and "_final" suffixes look like locally prepared
# variants of the published files -- confirm which preprocessing produced them.
YELP_REVIEW_FILE = "../../yelp_dataset/yelp_academic_dataset_review_bk.json"
YELP_TIP_FILE = "../../yelp_dataset/yelp_academic_dataset_tip.json"
YELP_USER_FILE = "../../yelp_dataset/yelp_academic_dataset_user.json"
YELP_CHECKIN_FILE = "../../yelp_dataset/yelp_academic_dataset_checkin.json"
YELP_BUSINESS_FILE = "../../yelp_dataset/yelp_academic_dataset_business_final.json"

In [None]:
# Connection settings may be overridden through environment variables so that
# credentials never have to be edited into the notebook. When the variables are
# unset, the defaults reproduce the original hard-coded local setup exactly.
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://0.0.0.0:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "")

# Single shared driver; every worker thread opens its own session from it.
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

In [None]:
# Create the Category uniqueness constraint up front (the same statement is also
# kept as CYPHER_CATEGORY_CONSTRAINT in the queries cell further down).
with driver.session() as session:
    # Consume the result, as every other session.run(...) call in this notebook
    # does, so the statement is fully processed inside this cell.
    session.run("CREATE CONSTRAINT ON (c:Category) ASSERT c.name IS UNIQUE;").consume()

In [None]:
# cypher queries
# Uniqueness constraints, one per node label (Category.name, User.user_id,
# Business.business_id). In import_all's parts list these are the entries with
# 'datafile': None (currently commented out); such entries are executed
# directly in a session instead of being batched through the work queue.
CYPHER_CATEGORY_CONSTRAINT = '''
CREATE CONSTRAINT ON (c:Category) ASSERT c.name IS UNIQUE;
'''

CYPHER_USER_CONSTRAINT = '''
CREATE CONSTRAINT ON (u:User) ASSERT u.user_id IS UNIQUE;
'''

CYPHER_BUSINESS_CONSTRAINT = '''
CREATE CONSTRAINT ON (b:Business) ASSERT b.business_id IS UNIQUE;
'''

# Review import, split into three statements. Each one receives a batch of
# parsed review records through the `items` query parameter (import_all passes
# {'items': [...]}) and UNWINDs it into one row per review.
# NOTE(review): `{items}` is the older Cypher parameter syntax; newer Neo4j
# releases expect `$items` -- confirm against the target server version.

# Step 1: MERGE a :Business node for every business_id referenced by a review.
CYPHER_REVIEW_IMPORT_1 = '''
WITH {items} AS reviews
UNWIND reviews AS review
MERGE (b:Business {business_id: review.business_id})
'''

# Step 2: MERGE a :User node for every user_id referenced by a review.
CYPHER_REVIEW_IMPORT_2 = '''
WITH {items} AS reviews
UNWIND reviews AS review
MERGE (u:User {user_id: review.user_id})
'''

# Step 3: look up the author and the business, CREATE the :Review node with its
# scalar properties and link (User)-[:WROTE]->(Review)-[:REVIEWS]->(Business).
# Both endpoints are MATCHed, so a review whose user or business node does not
# exist yields no row and is not imported. The review text is deliberately left
# out (its SET line is commented out inside the query).
CYPHER_REVIEW_IMPORT_3 = '''
WITH {items} AS reviews
UNWIND reviews AS review
MATCH (u:User {user_id: review.user_id})
MATCH (b:Business {business_id: review.business_id})
CREATE (r:Review {review_id: review.review_id})
SET 
    //r.text   = review.text,
    r.type   = review.type,
    r.date   = review.date, // FIXE: date format?
    r.cool   = review.cool,
    r.funny  = review.funny,
    r.stars  = review.stars,
    r.useful = review.useful
CREATE (u)-[:WROTE]->(r)
CREATE (r)-[:REVIEWS]->(b)
'''

# Business import: CREATE one :Business node per record in the `items` batch,
# copy its scalar fields, derive boolean flags, then MERGE one :Category node
# per entry of business.categories and link it with IN_CATEGORY.
# The parking flags test for literal strings such as "garage: True" inside
# business.attributes, i.e. the query treats attributes as a list of strings.
# NOTE(review): is_open is derived from `business.open = 1`; confirm the input
# records really carry an `open` field (some Yelp dataset releases name it
# `is_open`), otherwise the flag is False for every business.
# NOTE(review): businesses with an empty or missing categories list produce no
# rows in the trailing UNWIND; the node itself has already been created by then.
CYPHER_BUSINESS_IMPORT = '''
WITH {items} AS businesses
UNWIND businesses AS business
CREATE (b:Business {business_id: business.business_id})
SET b.address = business.address,
              b.lat     = business.latitude,
              b.lon     = business.longitude,
              b.name    = business.name,
              b.city    = business.city,
              b.postal_code = business.postal_code,
              b.state = business.state,
              b.review_count = business.review_count,
              b.stars = business.stars,
              // FIXME: inconsistent attributes data type
              b.bike_parking = CASE WHEN "BikeParking: True" IN business.attributes THEN True ELSE False END,
              //b.accepts_bitcoin = business.attributes.BusinessAcceptsBitcoin,
              //b.accepts_credit_cards = business.attributes.BusinessAcceptsCreditCards,
              b.garage_parking = CASE WHEN "garage: True" IN business.attributes THEN True ELSE False END,
              b.street_parking = CASE WHEN "street: True" IN business.attributes THEN True ELSE False END,
              b.validated_parking = CASE WHEN "validated: True" IN business.attributes THEN True ELSE False END,
              b.lot_parking = CASE WHEN "lot: True" IN business.attributes THEN True ELSE False END,
              b.valet_parking = CASE WHEN "valet: True" IN business.attributes THEN True ELSE False END,
              b.is_open = CASE WHEN business.open = 1 THEN True ELSE False END,
              b.neighborhood = business.neighborhood
WITH *
UNWIND business.categories AS cat
MERGE (c:Category {name: cat})
MERGE (b)-[:IN_CATEGORY]->(c)
'''

# User import: CREATE one :User node per record in the `items` batch and copy
# its scalar profile fields and compliment counters onto the node. Friend
# links are not handled here; see CYPHER_USER_FRIEND_IMPORT.
# Because this uses CREATE rather than MERGE, running it twice over the same
# data creates duplicate nodes unless the user_id uniqueness constraint is in
# place (in which case the second run fails instead).
CYPHER_USER_IMPORT = '''
WITH {items} AS users
UNWIND users AS user
CREATE (u:User {user_id: user.user_id})
SET u.name               = user.name,
    u.type               = user.type,
    u.useful             = user.useful,
    u.yelping_since      = user.yelping_since, //FIXME: consistent date format
    u.funny              = user.funny,
    u.review_count       = user.review_count,
    u.average_stars      = user.average_stars,
    u.fans               = user.fans,
    u.compliment_cool    = user.compliment_cool,
    u.compliment_cute    = user.compliment_cute,
    u.compliment_funny   = user.compliment_funny,
    u.compliment_hot     = user.compliment_hot,
    u.compliment_list    = user.compliment_list,
    u.compliment_more    = user.compliment_more,
    u.compliment_note    = user.compliment_note,
    u.compliment_photos  = user.compliment_photos,
    u.compliment_plain   = user.compliment_plain,
    u.compliment_profile = user.compliment_profile,
    u.compliment_writer  = user.compliment_writer,
    u.cool               = user.cool
'''

# Friend import: for each user record, MATCH the existing :User node, UNWIND
# its `friends` list (treated as a list of user_id values) and CREATE a
# directed FRIENDS relationship to every friend whose :User node exists.
# Friends without a node are skipped because they are MATCHed, not MERGEd.
# NOTE(review): CREATE adds a new relationship on every run, and if both users
# list each other the pair ends up with one relationship in each direction --
# confirm that is intended before re-running this step.
CYPHER_USER_FRIEND_IMPORT = '''
WITH {items} AS users
UNWIND users AS user
MATCH (u:User {user_id: user.user_id})
UNWIND user.friends AS friend
    MATCH (f:User {user_id: friend})
    CREATE (u)-[:FRIENDS]->(f)
'''

# Tip import: tips are modelled as a (User)-[:TIP]->(Business) relationship
# carrying date, text, likes and type, not as separate nodes. User and Business
# are MERGEd, so a tip that references an unknown id creates a bare node holding
# only that id.
CYPHER_TIP_IMPORT = '''
WITH {items} AS tips
UNWIND tips AS tip
MERGE (u:User {user_id: tip.user_id})
MERGE (b:Business {business_id: tip.business_id})
CREATE (u)-[t:TIP]->(b)
SET t.date  = tip.date, // FIXME: consistent date format
    t.text  = tip.text,
    t.likes = tip.likes,
    t.type  = tip.type
'''

# Placeholder for the check-in import; no query has been written yet.
# NOTE(review): the placeholder text starts with '#', which is not Cypher
# comment syntax (the other queries use //), so this string must be replaced
# with a real statement before the check-in entry in import_all is enabled.
CYPHER_CHECKIN_IMPORT = '''
#TODO: WRITE ME
'''

In [None]:
def run_query():
    """Worker loop: execute queued Cypher batches from the global queue `q`.

    Each job is a tuple ``(cypher, params, attempts)``. The statement is run in
    a fresh session. If it raises CypherError the error is printed and the job
    is re-queued on `q` with an incremented attempt counter; once it has
    already been attempted more than twice it is handed to
    `single_threaded_q` instead, to be retried without concurrency.

    The loop never returns; it is meant to run in a daemon thread.

    `q.task_done()` is called exactly once per fetched job in a ``finally``
    clause, so an unexpected (non-Cypher) exception can no longer leave the job
    permanently "unfinished" and make `q.join()` hang. Such an exception still
    propagates and ends this worker thread.
    """
    while True:
        cypher, params, attempts = q.get()
        try:
            with driver.session() as session:
                try:
                    session.run(cypher, parameters=params).consume()
                except CypherError as e:
                    print("------------------------------------------------------")
                    print(e.message)
                    print(cypher)

                    if attempts < 2:
                        # Re-queue before task_done() runs so q.join() cannot
                        # return while this batch is still waiting for a retry.
                        # NOTE(review): q is bounded, so this put() blocks while
                        # the queue is full.
                        q.put((cypher, params, attempts + 1))
                    else:
                        print("&&&&&& ADDING TO SINGLE THREADED QUEUE &&&&&&&")
                        single_threaded_q.put((cypher, params, attempts + 1))
                    print("------------------------------------------------------")
        finally:
            q.task_done()
                
def run_single_threaded():
    """Fallback loop: run the batches that kept failing in the parallel workers.

    Jobs are ``(cypher, params, attempts)`` tuples taken from the global
    `single_threaded_q`. Each statement is run once in a fresh session; a
    CypherError is printed and the batch is given up on.

    The loop never returns; it is meant to run in a daemon thread.

    `task_done()` is called for every fetched job, including failed ones.
    Previously a failing batch was never marked done, so
    `single_threaded_q.join()` could never return after the first error.
    """
    while True:
        cypher, params, _attempts = single_threaded_q.get()
        try:
            with driver.session() as session:
                try:
                    session.run(cypher, parameters=params).consume()
                except CypherError as e:
                    print("***********************************")
                    print(e.message)
                    print(cypher)
                    print("***********************************")
        finally:
            single_threaded_q.task_done()

In [None]:
def _iter_json_batches(lines, batch_size):
    """Yield lists of at most `batch_size` parsed JSON documents, one per input line.

    Blank lines are skipped, and the final partial batch is only yielded when it
    actually contains records.
    """
    batch = []
    for line in lines:
        if not line.strip():
            continue
        batch.append(json.loads(line))
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def import_all():
    """Run every enabled import step, in order.

    Each entry of `parts` is a dict with:
      * 'datafile': path of a JSON-lines file, or None for a statement that
        takes no data (the constraint statements);
      * 'cypher':   the statement to execute;
      * 'size':     number of records per batch (optional; a default is used
        when the key is missing, so entries without it no longer raise KeyError).

    For data steps the file is read line by line, grouped into batches of
    exactly `size` records, and each batch is put on the global queue `q` as
    ``(cypher, {'items': batch}, 0)`` for the worker threads. The function
    waits for `q` to drain before moving on to the next step, so steps never
    overlap. Statements without a data file are run directly and synchronously.
    """
    default_batch_size = 20000  # used when an entry does not specify 'size'

    parts = [
        #{'datafile': YELP_USER_FILE, 'cypher': CYPHER_USER_IMPORT, 'size': 40000},
        #{'datafile': None, 'cypher': CYPHER_USER_CONSTRAINT, 'size': 1},
        #{'datafile': YELP_USER_FILE, 'cypher': CYPHER_USER_FRIEND_IMPORT, 'size': 10000},
        #{'datafile': None, 'cypher': CYPHER_CATEGORY_CONSTRAINT, 'size': 1},
        #{'datafile': YELP_BUSINESS_FILE, 'cypher': CYPHER_BUSINESS_IMPORT, 'size': 20000},
        #{'datafile': None, 'cypher': CYPHER_BUSINESS_CONSTRAINT, 'size': 1},
        #{'datafile': YELP_REVIEW_FILE, 'cypher': CYPHER_REVIEW_IMPORT_1},
        #{'datafile': YELP_REVIEW_FILE, 'cypher': CYPHER_REVIEW_IMPORT_2},
        {'datafile': YELP_REVIEW_FILE, 'cypher': CYPHER_REVIEW_IMPORT_3, 'size': 20000},
        #{'datafile': YELP_TIP_FILE, 'cypher': CYPHER_TIP_IMPORT},
        #{'datafile': YELP_CHECKIN_FILE, 'cypher': CYPHER_CHECKIN_IMPORT}
    ]

    for part in parts:
        if part['datafile']:
            batch_size = part.get('size', default_batch_size)
            with open(part['datafile'], "r") as file:
                for items in _iter_json_batches(file, batch_size):
                    # write to graph (the third element is the attempt counter)
                    q.put((part['cypher'], {'items': items}, 0))
            # Wait until the workers have finished this step before starting the next.
            q.join()
        else:
            with driver.session() as session:
                session.run(part['cypher']).consume()
            

In [None]:
%%timeit -n1
concurrent = 6

# init the work queue
global q 
q = queue.LifoQueue(500)

global single_threaded_q
single_threaded_q = queue.LifoQueue()




for i in range(concurrent):
    t = threading.Thread(target=run_query)
    t.daemon = True
    t.start()
try:
    import_all()
    #q.join()
except:
    pass


t = threading.Thread(target=run_single_threaded)
t.daemon = True
single_threaded_q.join()