## Preparing

In [1]:
# Necessary libraries.
from py2neo import Graph
import reverse_geocoder as rg
import json, csv, uuid, os, regex as re, subprocess, datetime, numpy as np, pandas as pd
from typing import List, Tuple, Set
from pathlib import Path
from time import sleep
from neo4j.exceptions import ServiceUnavailable, TransientError
import shutil

In [2]:
# Const.
# Node labels, written to the ":LABEL" column of the node CSV files.
BUSINESS_NODE = "Business"
USER_NODE = "User"
REVIEW_NODE = "Review"
CATEGORY_NODE = "Category"
CITY_NODE = "City"
AREA_NODE = "Area"
COUNTRY_NODE = "Country"
# Relationship types, written to the ":TYPE" column of the relationships CSV file.
IN_CATEGORY = "IN_CATEGORY"
IN_CITY = "IN_CITY"
IN_AREA = "IN_AREA"
IN_COUNTRY = "IN_COUNTRY"
FRIENDS = "FRIENDS"
REVIEWS = "REVIEWS"
WROTE = "WROTE"

In [3]:
# Important locations.
data_folder = r"./data"
# Raw Yelp files (one JSON object per line).
business_json_file = f"{data_folder}/yelp_academic_dataset_business.json"
review_json_file = f"{data_folder}/yelp_academic_dataset_review.json"
user_json_file = f"{data_folder}/yelp_academic_dataset_user.json"
# Cleaned copies produced by the preprocessing step.
fixed_business_json_file = f"{data_folder}/fixed_yelp_academic_dataset_business.json"
fixed_review_json_file = f"{data_folder}/fixed_yelp_academic_dataset_review.json"
fixed_user_json_file = f"{data_folder}/fixed_yelp_academic_dataset_user.json"
list_raw_files = [
    business_json_file,
    review_json_file,
    user_json_file,
]
list_fixed_data = [
    fixed_business_json_file,
    fixed_review_json_file,
    fixed_user_json_file,
]

In [4]:
# Csv files for Import Tool.
# One CSV per node label, plus a single CSV holding every relationship.
business_nodes_csv_file = "business_nodes.csv"
category_nodes_csv_file = "category_nodes.csv"
city_nodes_csv_file = "city_nodes.csv"
area_nodes_csv_file = "area_nodes.csv"
country_nodes_csv_file = "country_nodes.csv"
user_nodes_csv_file = "user_nodes.csv"
review_nodes_csv_file = "review_nodes.csv"
relationship_csv_file = "relationships.csv"
nodes_files = [
    business_nodes_csv_file,
    category_nodes_csv_file,
    city_nodes_csv_file,
    area_nodes_csv_file,
    country_nodes_csv_file,
    user_nodes_csv_file,
    review_nodes_csv_file,
]

In [5]:
# Connection to Neo4j Db.
# The address and credentials can be overridden through the environment variables
# NEO4J_BOLT_URI, NEO4J_USER and NEO4J_PASSWORD, so that real credentials do not
# have to be stored in the notebook. The defaults below are the previous
# hard-coded values.
graph_name = "neo4j" # Default for 4.0: neo4j; default for 3.5: graph.db
SERVER_ADDRESS = os.environ.get("NEO4J_BOLT_URI", "bolt://localhost:7687")
SERVER_AUTH = (
    os.environ.get("NEO4J_USER", "neo4j"),
    os.environ.get("NEO4J_PASSWORD", "12345"),
)
graph = Graph(SERVER_ADDRESS, auth=SERVER_AUTH)

In [6]:
# Read the Neo4j installation directory from the connected server's configuration.
# It is used further down to build the paths of the `bin` scripts and the data folders.
neo4j_home = graph.service.config["dbms.directories.neo4j_home"]

In [7]:
def delete_files(files: List[str]):
    """Delete every path in `files` that is an existing regular file.

    Paths that do not exist, or that are not regular files (e.g. directories),
    are skipped without raising.
    """
    for candidate in map(Path, files):
        if not candidate.is_file():
            continue
        candidate.unlink()

In [8]:
# Stop early when any of the raw Yelp files is missing.
if not all(Path(raw_file).is_file() for raw_file in list_raw_files):
    raise Exception(f'Not all Yelp raw files are available. Need: {list_raw_files}')

## Preprocessing Data

**Data Problems:**  
1. Ids are unique only within their own domain. E.g. the `business_id` `oiAlXZPIFm2nBCt0DHLu_Q` in Business is identical to a `user_id` in User.
When using Neo4j's Import Tool, the relationship csv defines "from-node" and "to-node", identified 
by unique ids. Thus, the entities' ids, regardless of Business or User or Review must be unique.
2. Users have friends, but not all the friends exist. E.g. Unknown friend: `oeMvJh94PiGQnx_6GlndPQ` for User `ntlvfPzc8eglqvk92iDIAw`. Those unknown friends must be removed.
3. Some Business contain duplicate categories, e.g. Business with id `HEGy1__jKyMhkhXRW3O1ZQ` has duplicated `Gas Stations`. These must be deduplicated.
4. In Users, `friends` is supposed to be an array with string elements. But in reality, it is a string in which friend_ids are concatenated with commas. Same problem with `categories` field in Business.

In [9]:
# Fix data problems.
def remove_unknown_friends(raw_path: str, output_path: str) -> None:
    """Copy the user file to `output_path`, dropping friend ids that are not known users.

    The input is read twice (one JSON object per line, blank lines ignored):
    the first pass collects every `user_id`, the second pass rewrites each
    user's comma-separated `friends` string so that it only contains ids seen
    in the first pass.

    Every user is written to the output, including users whose `friends`
    field is empty or null (that field is then left untouched).

    Args:
        raw_path: Path of the raw user JSON-lines file.
        output_path: Path of the file to (over)write with the cleaned users.
    """
    # Pass 1: collect the ids of all existing users.
    user_ids = set()
    with open(raw_path, "r", encoding="utf-8") as rf:
        for line in rf:
            if line.strip():
                user_ids.add(json.loads(line)["user_id"])

    # Pass 2: rewrite each user with the unknown friends filtered out.
    with open(raw_path, "r", encoding="utf-8") as rf, open(output_path, mode="w", encoding="utf-8") as of:
        for line in rf:
            if not line.strip():
                continue
            json_node = json.loads(line)
            friends_str = json_node["friends"]
            if friends_str and friends_str.strip():
                friends_arr = re.split(r"\s*,\s*", friends_str.strip())
                json_node["friends"] = ', '.join(friend for friend in friends_arr if friend in user_ids)
            # Written outside the `if`: a user without friends is still a user and
            # must not disappear from the output.
            of.write(f'{json.dumps(json_node)}\n')

                    
def make_ids_unique(input_path: str, output_path: str, func_to_modify) -> None:
    """Apply `func_to_modify` to every JSON line of `input_path` and save the result.

    The transformed lines are first written to a temporary file, which then
    replaces `output_path`. This makes it safe to pass the same path as input
    and output.

    Args:
        input_path: JSON-lines file to read (blank lines are ignored).
        output_path: Destination file; replaced only after all lines were written.
        func_to_modify: Callable taking the parsed JSON object and returning the
            object to serialize.

    Raises:
        Whatever `func_to_modify`, JSON parsing or file I/O raises. In that case
        the temporary file is removed and `output_path` is left untouched.
    """
    # Keep the temp file next to the destination so that os.replace stays on
    # one filesystem.
    output_dir = os.path.dirname(os.path.abspath(output_path))
    temp_output_file = os.path.join(output_dir, str(uuid.uuid4()))
    try:
        with open(input_path, mode="r", encoding="utf-8") as in_f, open(temp_output_file, mode="w", encoding="utf-8") as ou_f:
            for line in in_f:
                if line.strip():
                    json_node = func_to_modify(json.loads(line))
                    ou_f.write(f'{json.dumps(json_node)}\n')
        # Rename the file.
        os.replace(temp_output_file, output_path)
    except BaseException:
        # Do not leave a half-written temp file behind.
        if os.path.exists(temp_output_file):
            os.remove(temp_output_file)
        raise

In [10]:
# Check if the fixed files are already available.
if not all(Path(x).is_file() for x in list_fixed_data):
    # Remove everything first.
    delete_files(list_fixed_data)
    # Re-create the fixed files.
    def fix_user(json_node):
        """Prefix the user's id and every id in its `friends` string with "u-"."""
        json_node["user_id"] = "u-" + json_node["user_id"]
        friends_str = json_node["friends"]
        if friends_str and len(friends_str.strip()) > 0:
            friends_arr = re.split(r"\s*,\s*", friends_str.strip())
            json_node["friends"] = ', '.join("u-" + friend_id for friend_id in friends_arr)
        return json_node
    def fix_business(json_node):
        """Prefix the business id with "b-" and drop repeated categories."""
        json_node["business_id"] = "b-" + json_node["business_id"]
        if json_node["categories"] and len(json_node["categories"].strip())>0:
            categories_arr = re.split(r"\s*,\s*", json_node["categories"].strip())
            # dict.fromkeys removes duplicates while keeping the first-seen order,
            # so the fixed file is identical from run to run (a set is not ordered).
            unique_categories = list(dict.fromkeys(categories_arr))
            if len(unique_categories) < len(categories_arr):
                json_node["categories"] = ', '.join(unique_categories)
        return json_node
    def fix_review(json_node):
        """Prefix the review, user and business ids with "r-", "u-" and "b-"."""
        json_node["review_id"] = "r-" + json_node["review_id"]
        json_node["user_id"] = "u-" + json_node["user_id"]
        json_node["business_id"] = "b-" + json_node["business_id"]
        return json_node
    
    # Start fixing data problems.
    # Users: first drop unknown friends, then prefix the ids in place.
    remove_unknown_friends(user_json_file, fixed_user_json_file)
    make_ids_unique(fixed_user_json_file, fixed_user_json_file, fix_user)
    make_ids_unique(business_json_file, fixed_business_json_file, fix_business)
    make_ids_unique(review_json_file, fixed_review_json_file, fix_review)

## Importing Data

### Generating inputs for Import Tool

In [11]:
if not (all(Path(x).is_file() for x in nodes_files) and Path(relationship_csv_file).is_file()):
    # At least one CSV is missing: delete whatever exists and regenerate all of them.
    delete_files(nodes_files)
    delete_files([relationship_csv_file])
    # Sets are used so that every collected node / relationship is kept only once.
    business_lat_lon = {}
    area_nodes: Set[Tuple[str, str]] = set()
    city_nodes: Set[Tuple[str, str]]= set()
    country_nodes: Set[str] = set()
    categories_nodes: Set[str] = set()
    in_area_relationships: Set[Tuple[str, str, str]] = set()
    in_country_relationships: Set[Tuple[str, str, str]] = set()
    # Relationship writer: shared by the functions below and closed at the end of this cell.
    relationship_fieldnames = [":START_ID", ":END_ID", ":TYPE"]
    relationship_csv = open(relationship_csv_file, mode="w", encoding="utf-8", newline="\n")
    relationship_writer = csv.DictWriter(
        relationship_csv,
        fieldnames=relationship_fieldnames,
        delimiter=",",
        quotechar='"',
        quoting=csv.QUOTE_MINIMAL,
    )
    relationship_writer.writeheader()
    
    # Func to write Business Nodes to file.
    def write_business_nodes_to_file():
        """Write the Business nodes CSV and the IN_CATEGORY relationships.

        Reads the fixed business file line by line. As side effects it records
        each business' (latitude, longitude) in `business_lat_lon`, adds the
        category names to `categories_nodes`, and writes one IN_CATEGORY row per
        (business, category) through `relationship_writer`.
        """
        with open(fixed_business_json_file, "r", encoding="utf-8") as bjf, open(business_nodes_csv_file, mode="w", encoding="utf-8", newline="\n") as business_nodes_csv:
            fieldnames = ["business_id:ID", "name", "address", ":LABEL"]
            writer = csv.DictWriter(business_nodes_csv, fieldnames=fieldnames, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            writer.writeheader()
            # File too large, read line by line.
            for line in bjf:
                line = line.strip()
                if not line:
                    continue
                json_node = json.loads(line)
                business_id = json_node["business_id"]
                writer.writerow(dict(zip(fieldnames, [business_id, json_node["name"], json_node["address"], BUSINESS_NODE])))
                business_lat_lon[business_id] = (json_node["latitude"], json_node["longitude"])
                categories_str = json_node["categories"]
                if categories_str and categories_str.strip(): # can be None (e.g. for Business Id: 2W1tLg8ybRUEKMPoAPHTsQ)
                    # Raw string: "\s" in a normal string literal is an invalid escape sequence.
                    split_categories = (part.strip() for part in re.split(r"\s*,\s*", categories_str.strip()))
                    cur_categories = [category for category in split_categories if category]
                    categories_nodes.update(cur_categories)
                    for category in cur_categories:
                        relationship_writer.writerow(dict(zip(relationship_fieldnames, [business_id, category, IN_CATEGORY])))
    
    # Func to write Category Nodes to file.
    def write_category_nodes_to_file():
        """Write one row per name in `categories_nodes` to the Category nodes CSV."""
        fieldnames = ["category_id:ID", ":LABEL"]
        with open(category_nodes_csv_file, mode="w", encoding="utf-8", newline="\n") as category_nodes_csv:
            writer = csv.DictWriter(category_nodes_csv, fieldnames=fieldnames, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            writer.writeheader()
            # The category name itself serves as the node id.
            writer.writerows(
                dict(zip(fieldnames, [category_id, CATEGORY_NODE]))
                for category_id in categories_nodes
            )
                
    # Func to make City, Area, Country Nodes.
    def make_city_area_country_nodes():
        """Reverse-geocode every business and collect City / Area / Country data.

        Looks up all coordinates stored in `business_lat_lon` with `rg.search`
        and, per business, fills `city_nodes`, `area_nodes`, `country_nodes`,
        `in_area_relationships` and `in_country_relationships`. The IN_CITY
        relationship of each business is written to the relationships CSV
        directly.
        """
        business_ids = list(business_lat_lon.keys())
        coordinates = list(business_lat_lon.values())
        places = rg.search(coordinates)
        # Process City, Area, Country Nodes; save IN_CITY relationship to file.
        for business_id, place in zip(business_ids, places):
            city = place["name"]
            state = place["admin1"]
            country = place["cc"]
            # Ids embed the parent names so that equal city / state names stay distinct.
            unique_state = f'{state}-{country}'
            unique_city = f'{city}-{state}-{country}'
            city_nodes.add((unique_city, city))
            area_nodes.add((unique_state, state))
            country_nodes.add(country)
            # Create corresponding relationships.
            relationship_writer.writerow(dict(zip(relationship_fieldnames, [business_id, unique_city, IN_CITY])))
            in_area_relationships.add((unique_city, unique_state, IN_AREA))
            in_country_relationships.add((unique_state, country, IN_COUNTRY))
    
    # Func to write City, Area, Country Nodes to file.
    def write_city_area_country_nodes_to_file():
        """Write the City, Area and Country node CSVs plus their relationships.

        The node files are written in the order city, area, country; afterwards
        the collected IN_AREA and then IN_COUNTRY relationships are appended to
        the relationships CSV.
        """
        def _dump_nodes(csv_path, header, rows):
            # Write a header line followed by one CSV line per tuple in `rows`.
            with open(csv_path, mode="w", encoding="utf-8", newline="\n") as nodes_csv:
                writer = csv.DictWriter(nodes_csv, fieldnames=header, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
                writer.writeheader()
                for row in rows:
                    writer.writerow(dict(zip(header, row)))

        _dump_nodes(
            city_nodes_csv_file,
            ["city_id:ID", "name", ":LABEL"],
            ((city_id, city_name, CITY_NODE) for (city_id, city_name) in city_nodes),
        )
        _dump_nodes(
            area_nodes_csv_file,
            ["area_id:ID", "name", ":LABEL"],
            ((area_id, area_name, AREA_NODE) for (area_id, area_name) in area_nodes),
        )
        _dump_nodes(
            country_nodes_csv_file,
            ["country_id:ID", ":LABEL"],
            ((country_id, COUNTRY_NODE) for country_id in country_nodes),
        )

        # City -> Area first, then Area -> Country.
        for collected in (in_area_relationships, in_country_relationships):
            for (u1, u2, rel_type) in collected:
                relationship_writer.writerow(dict(zip(relationship_fieldnames, [u1, u2, rel_type])))
    
    # Func to write User nodes to file.
    def write_user_nodes_to_file():
        """Write the User nodes CSV and the FRIENDS relationships.

        Reads the fixed user file line by line and writes one node row per user.
        Friendships are collected in a set as (smaller id, larger id, FRIENDS),
        so that a friendship listed by both users is written only once, and are
        appended to the relationships CSV after all users were read.
        """
        friend_relationships: Set[Tuple[str, str, str]] = set()
        with open(fixed_user_json_file, "r", encoding="utf-8") as ujf, open(user_nodes_csv_file, mode="w", encoding="utf-8", newline="\n") as user_nodes_csv:
            fieldnames = ["user_id:ID", "name", "yelping_since", ":LABEL"]
            writer = csv.DictWriter(user_nodes_csv, fieldnames=fieldnames, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            writer.writeheader()
            # File too large, read line by line.
            for line in ujf:
                line = line.strip()
                if not line:
                    continue
                json_node = json.loads(line)
                user_id = json_node["user_id"]
                writer.writerow(dict(zip(fieldnames, [user_id, json_node["name"], json_node["yelping_since"], USER_NODE])))
                friends_str = json_node["friends"]
                if friends_str and friends_str.strip():
                    # Raw string: "\s" in a normal string literal is an invalid escape sequence.
                    for friend_id in re.split(r"\s*,\s*", friends_str.strip()):
                        if not friend_id:
                            # An empty id (e.g. from "a,,b") has no node to point to.
                            continue
                        # Prevent duplicate friend relationship later!
                        f1 = min(user_id, friend_id)
                        f2 = max(user_id, friend_id)
                        friend_relationships.add((f1, f2, FRIENDS))
        
        for (f1, f2, rel_type) in friend_relationships:
            relationship_writer.writerow(dict(zip(relationship_fieldnames, [f1, f2, rel_type])))
    
    # Func to write Review nodes to file.
    def write_review_nodes_to_file():
        """Write the Review nodes CSV and the WROTE / REVIEWS relationships.

        For every review in the fixed review file one node row is written, and
        two relationship rows: user -WROTE-> review and review -REVIEWS-> business.
        """
        fieldnames = ["review_id:ID", "stars", "date", "text", ":LABEL"]
        with open(fixed_review_json_file, "r", encoding="utf-8") as rjf, open(review_nodes_csv_file, mode="w", encoding="utf-8", newline="\n") as review_nodes_csv:
            writer = csv.DictWriter(review_nodes_csv, fieldnames=fieldnames, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            writer.writeheader()
            # File too large, read line by line.
            for line in rjf:
                line = line.strip()
                if not line:
                    continue
                json_node = json.loads(line)
                node_values = [json_node["review_id"], json_node["stars"], json_node["date"], json_node["text"], REVIEW_NODE]
                writer.writerow(dict(zip(fieldnames, node_values)))
                wrote_values = [json_node["user_id"], json_node["review_id"], WROTE]
                relationship_writer.writerow(dict(zip(relationship_fieldnames, wrote_values)))
                reviews_values = [json_node["review_id"], json_node["business_id"], REVIEWS]
                relationship_writer.writerow(dict(zip(relationship_fieldnames, reviews_values)))
    
    # Start re-creating CSV files for Import Tool.
    # The intermediate collections are deleted as soon as they were written, to
    # keep the memory footprint low.
    try:
        write_business_nodes_to_file()
        write_category_nodes_to_file()
        del categories_nodes
        make_city_area_country_nodes()
        write_city_area_country_nodes_to_file()
        del business_lat_lon, city_nodes, area_nodes, country_nodes, in_area_relationships, in_country_relationships
        write_user_nodes_to_file()
        write_review_nodes_to_file()
    except BaseException:
        # A failed run leaves incomplete CSV files behind. They would satisfy the
        # "all files exist" guard of this cell on the next run, so remove them.
        # The relationships file must be closed before it can be deleted.
        relationship_csv.close()
        delete_files(nodes_files + [relationship_csv_file])
        raise
    else:
        relationship_csv.close()

Loading formatted geocoded file...


### Checking Data Integrity

In [12]:
num_of_nodes = 0

# Func to check that node ids are unique and that relationships are unique.
def check_nodes_relationships_csv_files_integrity():
    """Validate the generated CSV files and count the nodes.

    For every file in `nodes_files` the first column (the node id) must not
    contain duplicates. In the relationships file every (start id, end id)
    pair must occur only once. The total number of node rows is stored in the
    global `num_of_nodes`; calling the function again recomputes the value
    instead of adding to it.

    Raises:
        Exception: If a node file contains a repeated id, or the relationships
            file contains a repeated (start id, end id) pair.
    """
    global num_of_nodes
    # Ids are read as plain strings: without these options pandas would turn ids
    # such as "NA" or "None" into missing values.
    read_options = dict(header="infer", sep=",", encoding="utf-8", dtype=str, keep_default_na=False)
    # Nodes files. Only the id column is loaded; the other columns (e.g. the
    # review text) are not needed for this check.
    node_count = 0
    for one_node_file in nodes_files:
        node_ids = pd.read_csv(one_node_file, usecols=[0], **read_options).iloc[:, 0]
        if not node_ids.is_unique:
            raise Exception(f'Nodes in [{one_node_file}]: not unique')
        node_count += len(node_ids)
    num_of_nodes = node_count
    # Relationship file. The two id columns are compared as a pair; joining them
    # into one string would make ("ab", "c") and ("a", "bc") look identical.
    endpoints = pd.read_csv(relationship_csv_file, usecols=[0, 1], **read_options)
    if endpoints.duplicated().any():
        raise Exception(f'Relationships in [{relationship_csv_file}]: not unique')

In [13]:
# Raises if a check fails; on success `num_of_nodes` holds the number of node rows.
check_nodes_relationships_csv_files_integrity()

### Importing to Neo4j

#### Resetting the database and importing data

The quickest/cleanest/safest way is:  
1. Stop the Neo4j's Database Service.  
2. Remove the neo4j's Database's Data folder.  
3. Execute the Import Tool.  
4. Start the Neo4j's Database Service.  

**Important Note**: Users must have privileges to start/stop Windows services. If elevation is needed, *a Windows popup will
appear, asking for permission*.

In [14]:
# Func to control Neo4j's Database Service.
def command_neo4j_database_service(cmd: str):
    """Run `bin\\neo4j.bat stop` or `bin\\neo4j.bat start` and verify the outcome.

    Args:
        cmd: Either "stop" or "start".

    Raises:
        Exception: If `cmd` is anything else, or if the script's standard
            output does not contain "stopped" / "started" respectively.
    """
    neo4j_cmd = neo4j_home + r"\bin\neo4j.bat"
    if cmd not in ["stop", "start"]:
        raise Exception(f'Unknown command for Neo4j\'s Database Service: [{cmd}]')

    cmd_res = subprocess.run([neo4j_cmd, cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    cmd_stdout = str(cmd_res.stdout, "utf-8")
    # Success is detected from the text printed by the script.
    if cmd == "stop":
        if "stopped" not in cmd_stdout:
            raise Exception(f"Can't stop Neo4j's Database Service [{cmd_res.stderr}]")
    elif "started" not in cmd_stdout:
        raise Exception(f"Can't start Neo4j's Database Service [{cmd_res.stderr}]")

In [15]:
# Func to remove the Database's Data folder.
def reset_neo4j_database():
    """Delete the on-disk folders of the database named `graph_name`.

    Removes `<neo4j_home>/data/databases/<graph_name>` and, when it exists,
    `<neo4j_home>/data/transactions/<graph_name>`.

    Raises:
        Exception: If the database's data folder does not exist.
    """
    data_root = Path(neo4j_home) / "data"
    neo4j_graph_folder = data_root / "databases" / graph_name
    neo4j_trans_folder = data_root / "transactions" / graph_name
    if not neo4j_graph_folder.is_dir():
        raise Exception("Can't find Neo4j's Database's Data folder")
    shutil.rmtree(neo4j_graph_folder)
    # The transactions folder may be absent (presumably on store layouts that do
    # not keep it separately - TODO confirm); there is nothing to remove then.
    if neo4j_trans_folder.is_dir():
        shutil.rmtree(neo4j_trans_folder)

In [16]:
# Func to import data into Neo4j's Database.
def import_data():
    """Run `neo4j-admin import` on the generated node and relationship CSV files.

    The tool is started from the current working directory, so the relative
    CSV file names are resolved against it.

    Raises:
        Exception: If the tool's standard output does not contain "IMPORT DONE".
    """
    import_tool_cmd = neo4j_home + r"\bin\neo4j-admin.bat"
    node_arguments = [f'--nodes={node_file}' for node_file in nodes_files]
    command_line = [
        import_tool_cmd,
        "import",
        "--multiline-fields=true",
        *node_arguments,
        f"--relationships={relationship_csv_file}",
    ]
    # Execute the Import Tool (This tool will re-create a fresh "neo4j" folder if needed).
    cmd_res = subprocess.run(command_line, cwd=os.getcwd(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    tool_output = str(cmd_res.stdout, "utf-8")
    if "IMPORT DONE" not in tool_output:
        raise Exception(f"Can't execute Import Tool successfully [{cmd_res.stderr}]")

In [17]:
# Re-import unless the running database already holds exactly the expected
# (non-zero) number of nodes.
if len(graph.nodes) != num_of_nodes or num_of_nodes <= 0:
    # The service must be stopped while its data folder is replaced.
    command_neo4j_database_service("stop")
    reset_neo4j_database()
    import_data()
    command_neo4j_database_service("start")

#### Check if the importing is successful

In [18]:
yelp_graph_ready = False

# Func to check if the importing process was done successfully.
def check_if_importing_is_successful():
    """Reconnect to Neo4j and verify that it holds the expected number of nodes.

    Tries up to 30 times, sleeping one second after each failed connection
    attempt. On success the global `graph` refers to the new connection, the
    global `yelp_graph_ready` is set to True and the function returns
    immediately. If every attempt fails, `yelp_graph_ready` stays False.

    Raises:
        Exception: If the database is reachable but empty, or its node count
            differs from `num_of_nodes`.
    """
    global graph
    global yelp_graph_ready
    yelp_graph_ready = False
    num_tries = 30
    for _ in range(num_tries):
        try:
            graph = Graph(SERVER_ADDRESS, auth=SERVER_AUTH)
            cur_num_nodes = len(graph.nodes)
        except (ConnectionRefusedError, ServiceUnavailable, TransientError, ConnectionAbortedError) as e:
            # The service is probably still starting up; wait and retry.
            print(f"{e}. Try again...")
            sleep(1)
            continue
        if cur_num_nodes == 0:
            raise Exception(f"There is no node in the Database")
        if cur_num_nodes != num_of_nodes:
            raise Exception(f'Expected: [{num_of_nodes}] nodes, but found: [{cur_num_nodes}]')
        yelp_graph_ready = True
        # Stop here: without this the loop would reconnect for all remaining tries.
        return

In [19]:
# Poll the database; the function sets `yelp_graph_ready` to True once the
# expected number of nodes is found.
check_if_importing_is_successful()
# Still False here means that none of the connection attempts succeeded.
if not yelp_graph_ready:
    raise Exception("Tried waiting for Neo4j Database Service, but still not available")

[WinError 10053] An established connection was aborted by the software in your host machine. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made because the target machine actively refused it. Try again...
[WinError 10061] No connection could be made b

## Conclusion

In Neo4j's Browser, one can execute cypher `CALL db.schema.visualization()` to get the Schema of the Graph. It should look like this:

![Expected Schema](expected_schema.jpg "Expected Schema")

Given the raw data from [Yelp Dataset](https://www.yelp.com/dataset), we intentionally ignore the "Tip" and "Checkin" files. We take into account the "Business", "User" and "Review" files. However, those files are **not** free from errors (see Data Problems above). Thus, we fix them and save the results into `fixed_....json` files.  

Because the data files are large, e.g. "Review" (6GB), this notebook prioritizes keeping the memory footprint as small as possible, at the expense of running speed.  

The quickest way to import a large amount of nodes and relationships into Neo4j is through Import Tool. It's implied that the *user should stay on the machine where Neo4j's Database Service is installed, and has sufficient privileges to start/stop Neo4j's Database Service*. Note: With Neo4j Desktop, to install a Database as a Service, we first create a Database, find out its location on disk, stop it (if it is running), then execute `bin\neo4j.bat install-service` to install the Database as a Service, and `bin\neo4j.bat start` to start it.  

The `fixed_....json` files above are transformed into a list of CSV files with the formats expected by the [Import Tool](https://neo4j.com/docs/operations-manual/current/tutorial/import-tool/). Before continuing, we do some Data Integrity checking.  

Next, we start importing the CSV files into Neo4j's Database Service *appropriately*. Finally, we check that the importing process is done successfully by counting the number of nodes.  

**Tested environment**:
+ Windows 8.1 x64  
+ Anaconda3-2020.02 x64 (Python v3.7.6)  
+ Neo4j **v4.0.4** (JDK v11.0.7 LTS x64), with `dbms.memory.heap.max_size` 4GB  
+ Yelp Dataset (03.05.2020). MD5: `7610af013edf610706021697190dab15`  
+ Neo4j Driver: Py2neo **v5.0b1**