## Set up

In [1]:
# load libraries
import json
import csv
import gzip
import time
import re
import ast
import shutil
import os
import dropbox
from py2neo import Graph
from itertools import islice

In [2]:
# input files
prodInFile = "data/metadata.json.gz"
revInFile = "data/kcore_5.json.gz"

# output csv files
prodCsvName = "data/products.csv"
revCsvName = "data/reviews.csv"
pplCsvName = "data/people.csv"

# dropbox output folder
dropLocs = "/home/danny/Dropbox/Apps/bigdataclass/data/"

In [3]:
with open("dboxkey.txt") as f:
        key = f.read()
dbx = dropbox.Dropbox(key)
# dbx.users_get_current_account()

## Start the graph connection

In [4]:
# a couple of graph setup options
doLocally = False
startFresh = False

if doLocally:
    server = "http://neo4j:bigdata@localhost:7474/db/data/"
else:
    with open("graphpass.txt") as f:
        server = f.read()
print(server)
graph = Graph(server)
if startFresh:
    graph.delete_all()

http://neo4j:AgentSmith@34.195.7.211:7474/db/data/


In [5]:
# statements asserting uniqueness
q1 = """
create constraint on (pe:Person) assert pe.id is unique;
"""
q2 = """
create constraint on (pr:Product) assert pr.id is unique;
"""
graph.run(q1)
graph.run(q2)

<py2neo.database.Cursor at 0x7f8e0f5d2d50>

In [6]:
# load reviews
qRev = """
using periodic commit 1000
load csv with headers from "%s" as row
merge (person:Person {id:row.reviewerID})
merge (product:Product {id:row.asin})
create (person)-[:Reviewed {ts:row.ts, reviewText:row.reviewText, score:row.score, summary:row.summary, helpful0:row.helpful0, helpful1:row.helpful1}]->(product);
"""

# load people
qPpl = """
using periodic commit 1000
load csv with headers from "%s" as row
match(person:Person {id:row.reviewerID})
set person.name = row.name;
"""

# load products
qProd = """
using periodic commit 1000
load csv with headers from "%s" as row
match(product:Product {id:row.asin})
set product.name = row.name, product.price = row.price, product.imUrl = row.imUrl,
    product.brand = row.brand, product.rankCat = row.rankCat, product.rank = row.rank,
    product.categories = row.categories;
"""

## Create and upload review data

In [7]:
# # get the number of reviews
# sttime = time.time() # time the process
# numLines = sum(1 for line in gzip.open(revInFile))
# print(numLines, time.time()-sttime)

In [7]:
numLines = 41135700

In [8]:
# open gzip json and write
def makeRevFile(revInFile, revCsvName, startLn, endLn):
    sttime = time.time() # time the process
    with gzip.open(revInFile, "r") as f, open(revCsvName, 'w') as csvRev:
        # write first row of csv
        rev = csv.writer(csvRev)
        rev.writerow(["reviewerID", "score", "reviewText", "summary", "helpful0",
                      "helpful1", "ts", "asin"])
        
        count = 0
        for line in islice(f, startLn, endLn):
            ln = line.decode("ascii")
            d = json.loads(ln)

            # add review
            tr = d.get("reviewText")
            tsu = d.get("summary")
            if tr != None:
                tr = re.sub("\n", " ", tr)
                tr = tr.replace("\\", "")
                tr = tr.replace(",", "")
            if tsu != None:
                tsu = re.sub("\n", " ", tsu)
                tsu = tsu.replace("\\", "")
                tsu = tsu.replace(",", "")
            rev.writerow([d.get("reviewerID"), d.get("overall"), tr, tsu,
                          d.get("helpful")[0], d.get("helpful")[1],
                          d.get("unixReviewTime"), d.get("asin")])
            count += 1
#             if count % 100000 == 0:
#                 print(count)
#             if (count + startLn) >= endLn:
#                 break # stop if you reach the endLn
    print(count, time.time()-sttime)

In [9]:
# choose the chunks to upload
itt = list(range(0, numLines, 1000000))
itt.append(numLines)
itt = itt[32:]
print(itt)

[32000000, 33000000, 34000000, 35000000, 36000000, 37000000, 38000000, 39000000, 40000000, 41000000, 41135700]


In [10]:
for i in range(len(itt)-1):
    print("--------------------------------------------------")
    print("start: ", itt[i])
    # create the csv for upload
    makeRevFile(revInFile, revCsvName, itt[i], itt[i+1])
    # copy the csv to the dropbox folder
    shutil.copy(revCsvName, dropLocs)
    # wait until the file uploads
    resp = dbx.files_list_folder("/data")
    while len(resp.entries) == 0:
        print ".",
        time.sleep(5)
        resp = dbx.files_list_folder("/data")
    print("On Dropbox")
    # get the dropbox url for the file
    for fil in resp.entries:
        fl = fil.path_lower
    link = dbx.sharing_create_shared_link(fl)
    url = link.url
    url = url.replace("?dl=0", "")
    url = url.replace("https://www.", "https://dl.")
    # upload to the database
    sttime = time.time() # time the process
    graph.run(qRev % url)
    print("upload", time.time()-sttime)
    # delete the file to start again
    dbx.files_delete("/" + revCsvName)

--------------------------------------------------
('start: ', 32000000)
(1000000, 543.624843120575)
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . On Dropbox
('upload', 209.81379413604736)
--------------------------------------------------
('start: ', 33000000)
(1000000, 571.2465131282806)
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . On Dropbox
('upload', 149.6985960006714)
--------------------------------------------------
('start: ', 34000000)
(1000000, 642.7367570400238)
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . On Dropbox
('upload', 149.23469805717468)
--------------------------------------------------
('start: ', 35000000)
(1000000, 537.7515671253204)
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . On Dropbox
('upload', 143.61693477630615)
--------------------------------------------------
('s

## Create and upload people data

In [11]:
# open gzip json and write
sttime = time.time() # time the process
count = 0
with gzip.open(revInFile, "r") as f, open(pplCsvName, 'w') as csvPpl:
    # create set for ensuring only unique items
    pplSet = set()
    
    # create csv writers
    ppl = csv.writer(csvPpl)
    ppl.writerow(["reviewerID", "name"])

    for line in f:
        ln = line.decode("ascii")
        d = json.loads(ln)
        
        # add person
        if d.get("reviewerID") not in pplSet:
            pplSet.add(d.get("reviewerID"))
            tempNm = d.get("reviewerName")
            if tempNm != None:
                tempNm = re.sub("\n", " ", tempNm)
                tempNm = tempNm.replace("\\", "")
                tempNm = tempNm.replace(",", "")
            ppl.writerow([d.get("reviewerID"), tempNm])
        count += 1
        if count % 5000000 == 0:
            print(count)
#         if count > 100000: break
print(count, time.time()-sttime)

# copy the csv to the dropbox folder
shutil.copy(pplCsvName, dropLocs)
# wait until the file uploads
resp = dbx.files_list_folder("/data")
while len(resp.entries) == 0:
    print ".",
    time.sleep(5)
    resp = dbx.files_list_folder("/data")
print("On Dropbox")
# get the dropbox url for the file
for fil in resp.entries:
    fl = fil.path_lower
link = dbx.sharing_create_shared_link(fl)
url = link.url
url = url.replace("?dl=0", "")
url = url.replace("https://www.", "https://dl.")
# upload to the database
sttime = time.time() # time the process
graph.run(qPpl % url)
print("upload", time.time()-sttime)
# delete the file to start again
dbx.files_delete("/" + pplCsvName)

5000000
10000000
15000000
20000000
25000000
30000000
35000000
40000000
(41135700, 1874.876326084137)
. On Dropbox
('upload', 241.80340790748596)


FileMetadata(name=u'people.csv', id=u'id:oplCPYks8QAAAAAAAAAARQ', client_modified=datetime.datetime(2017, 10, 23, 19, 48, 10), server_modified=datetime.datetime(2017, 10, 23, 19, 48, 12), rev=u'835dcafb3e', size=90621266, path_lower=u'/data/people.csv', path_display=u'/data/people.csv', parent_shared_folder_id=None, media_info=None, sharing_info=None, property_groups=None, has_explicit_shared_members=None, content_hash=u'30851e3f5673dad6c1d33aaefc1482d7d9f7ff753eb2c785ed4d5cb5316342f7')

## Create and upload product data

In [None]:
# create product csv
# open gzip json and write
sttime = time.time() # time the process
count = 0
with gzip.open(prodInFile, "r") as f, open(prodCsvName, 'w') as csvProd:
    # create set for ensuring only unique items
    prodSet = set()
    # create csv writer
    prod = csv.writer(csvProd)
    prod.writerow(["asin", "name", "price", "imUrl", "brand", "categories", "rankCat", "rank"])
    for line in f:
        ln = line.decode("ascii")
        ln = re.sub("\n", "", ln)
        d = ast.literal_eval(ln)
        if d.get("asin") not in prodSet:
            prodSet.add(d.get("asin"))
            tmpAs = d.get("asin")
            if tmpAs != None:
                tmpAs = re.sub("\n", " ", tmpAs)
                tmpAs = tmpAs.replace("\\", "")
                tmpAs = tmpAs.replace(",", "")
            sr = d.get("salesRank")
            if sr == None or len(sr) == 0:
                sr = {"NA": 0}
            sr2 = [list(sr.keys())[0], list(sr.values())[0]]
            nm = d.get("tmp")
            if nm != None:
                nm = re.sub("\n", " ", nm)
                nm = nm.replace("\\", "")
                nm = nm.replace(",", "")
            ti = d.get("title")
            if ti != None:
                ti = re.sub("\n", " ", ti)
                ti = ti.replace("\\", "")
                ti = ti.replace(",", "")
                ti = ti.replace("\"", "")
                ti = ti.replace("\'", "")
            prod.writerow([tmpAs, ti, d.get("price"), d.get("imUrl"),
                          d.get("brand"), d.get("categories"),
                          sr2[0], sr2[1]])
        count += 1
        if count % 1000000 == 0:
            print(count)
#         if count > 1000000: break
print(count, time.time()-sttime)

# copy the csv to the dropbox folder
shutil.copy(prodCsvName, dropLocs)
# wait until the file uploads
resp = dbx.files_list_folder("/data")
while len(resp.entries) == 0:
    print ".",
    time.sleep(5)
    resp = dbx.files_list_folder("/data")
print("On Dropbox")
# get the dropbox url for the file
for fil in resp.entries:
    fl = fil.path_lower
link = dbx.sharing_create_shared_link(fl)
url = link.url
url = url.replace("?dl=0", "")
url = url.replace("https://www.", "https://dl.")

# upload to the database
sttime = time.time() # time the process
graph.run(qProd % url)
print("upload", time.time()-sttime)
# delete the file to start again
dbx.files_delete("/" + prodCsvName)

## Deduplicate records

In [None]:
qDedup = """
MATCH (pe)-[r]->(pr)
WITH pe, pr, TAIL (COLLECT (r)) as rr
FOREACH (r IN rr | DELETE r)
"""

sttime = time.time() # time the process
graph.run(qDedup)
print("deduplicate", time.time()-sttime)