In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import psycopg2 as pg

conn = pg.connect(dbname='bdtp4', user='mard', password='1234')
cur = conn.cursor()

In [12]:
import lzma
import json

with lzma.open('../data/dataset.json.xz') as file:
    values = []    
    for line in file:
        product = json.loads(line)
        product = tuple(
            product.get(key, None) 
            for key in ['Id', 'ASIN', 'title', 'group', 'salesrank']
        )
        product = cur.mogrify('(%s,%s,%s,%s,%s)', product)
        values.append(product)

    try:
        cur.execute(
            b'INSERT INTO product(id,asin,title,"group",sales_rank) VALUES ' + 
            b','.join(values)
        )
        conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()
        conn.reset()

In [15]:
with lzma.open('../data/dataset.json.xz') as file:
    uniqueCategories = set(
        cur.mogrify('(%s,%s)', cat)
        for product in map(json.loads, file) if 'categories' in product 
        for catList in product['categories']
        for cat in catList
    )
    
    try:
        cur.execute(
            b'INSERT INTO category(id,name) VALUES ' +
            b','.join(uniqueCategories)
        )
        conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()
        conn.reset()

In [3]:
import lzma
import json
from bdtp4.util import Oven
import tqdm

with lzma.open('../data/dataset.json.xz') as file:
    values = set(
        cur.mogrify('(%s,%s,%s)', (prod.get('Id', None), catId, i))
        for prod in map(json.loads, file)
        if 'categories' in prod
        for catList in prod['categories']
        for i, (catId, catName) in enumerate(catList)
    )
    
    for batch in tqdm.tqdm(Oven(values, len(values) / 10), total=len(values) / 10):
        try:
            cur.execute(
                b'INSERT INTO product_category(prod_id,cat_id,index) VALUES ' +
                b','.join(batch)
            )
            conn.commit()
        except Exception as e:
            print(e)
            conn.rollback()
            conn.reset()

In [9]:
with lzma.open('../data/dataset.json.xz') as file:
    prods = {
        prod['ASIN']: {key: prod[key] for key in ['Id', 'similar'] if key in prod}
        for prod in map(json.loads, file)
    }

In [23]:
values = (
    cur.mogrify('(%s,%s)', (prod.get('Id', None), prods[sim].get('Id', None)))
    for prod in prods.values()
    if 'similar' in prod
    for sim in prod['similar']
    if sim in prods
)

try:
    cur.execute(
        b'INSERT INTO product_similar(first_prod_id,second_prod_id) VALUES ' +
        b','.join(values)
    )
    conn.commit()
except Exception as e:
    print(e)
    conn.rollback()
    conn.reset()

In [32]:
with lzma.open('../data/dataset.json.xz') as file:
    values = set(
        cur.mogrify(
            '(to_timestamp(%s),%s,%s,%s,%s,%s)', 
            (review['time'], prod['Id'], review['customer_id'], review['rating'], review['votes'], review['helpful'])
        )
        for prod in map(json.loads, file)
        if 'reviews' in prod
        for review in prod['reviews']
    )
    
    try:
        for batch in Oven(values, 500000):
            cur.execute(
                b'INSERT INTO review(time,prod_id,customer_id,rating,votes,helpful) VALUES ' +
                b','.join(batch)
            )
            conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()
        conn.reset()

In [36]:
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures import as_completed
import tqdm

def worker(args):
    arg1, arg2 = args
    return arg1 + arg2
    
with ProcessPoolExecutor() as executor:
    data = [(1,2), (3,4), (4,5)]
    futures = (executor.submit(worker, x) for x in data)
    for fut in tqdm.tqdm(as_completed(futures), total=len(data)):
        print(fut.result())

100%|██████████| 3/3 [00:00<00:00, 17.01it/s]


3
7
9


b'(999,497297)'

In [37]:
conn.close()

In [4]:
with lzma.open('../data/dataset.json.xz') as file:
    for prod in map(json.loads, file):
        print(prod)
        break

{'Id': 999, 'ASIN': '0312252099', 'title': 'The Cowboy and His Elephant : The Story of a Remarkable Friendship', 'group': 'Book', 'salesrank': 412546, 'similar': ['0060929510', '0226542378', '0439293138', '0385314280', '0393019624'], 'categories': [[[283155, 'Books'], [1000, 'Subjects'], [2, 'Biographies & Memoirs'], [2437, 'Specific Groups'], [2443, 'General']], [[283155, 'Books'], [1000, 'Subjects'], [75, 'Science'], [13469, 'Biological Sciences'], [13474, 'Animals'], [13499, 'Mammals']]], 'reviews': [{'time': 993700800, 'customer_id': 'A3O52OB19M6PZQ', 'rating': 5, 'votes': 4, 'helpful': 4}, {'time': 994392000, 'customer_id': 'A3ML4ZDZZLV9DO', 'rating': 4, 'votes': 2, 'helpful': 2}, {'time': 1000440000, 'customer_id': 'ADGN6LAG8XF8E', 'rating': 5, 'votes': 3, 'helpful': 3}, {'time': 1003204800, 'customer_id': 'A11YGKKKLTJ8R0', 'rating': 5, 'votes': 5, 'helpful': 5}, {'time': 1004328000, 'customer_id': 'A2DV39USD3ROKW', 'rating': 5, 'votes': 3, 'helpful': 3}, {'time': 1014782400, 'cu

In [16]:
from bdtp4.util import Oven
from bdtp4.parser import parse
from collections import Counter

conn.reset()

try:
    cur.execute('DELETE FROM category')
    cur.execute('DELETE FROM product')
    cur.execute('DELETE FROM product_category')
    conn.commit()
except Exception as e:
    print(e)
    conn.rollback()
    conn.reset()

for batch in Oven(parse('../data/amazon-meta.txt'), 10):
    dataset = batch
    
    uniqueCategories = set(z for x in dataset if 'categories' in x for y in x['categories'] for z in y)
    categoriesIds = Counter(x[1] for x in uniqueCategories).most_common()
    if len(uniqueCategories) != len(categoriesIds):
        raise AssertionError('categories have tuples with similar id but different names')

    try:
        mogValues = (cur.mogrify('(%s,%s)', (_id, name)) for name, _id in uniqueCategories)
        cur.execute(b'INSERT INTO category(id,name) VALUES ' + b','.join(mogValues))
        conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()
        conn.reset()
        
    try:
        mogValues = (
            cur.mogrify(
                '(%s,%s,%s,%s,%s)',
                (prod.get('id', None), prod.get('ASIN', None), prod.get('title', None), prod.get('group', None), prod.get('salesrank', None))
            ) 
            for prod in dataset
        )
        cur.execute(b'INSERT INTO product(id,asin,title,"group",sales_rank) VALUES ' + b','.join(mogValues))
        conn.commit()

    except Exception as e:
        print(e)
        conn.rollback()
        conn.reset()
        
    try:
        mogValues = (
            cur.mogrify('(%s,%s,%s)', (prod.get('id', None), catId, i))
            for prod in dataset
            if 'categories' in prod
            for catList in prod['categories']
            for i, (catName, catId) in enumerate(catList)
        )
        query = b'UPDATE product_category(prod_id,cat_id,index) VALUES ' + b','.join(mogValues)
        print(query)
        cur.execute(query)
        conn.commit()

    except Exception as e:
        print(e)
        conn.rollback()
        conn.reset()

    break

b"UPDATE product_category(prod_id,cat_id,index) VALUES (1,'283155',0),(1,'1000',1),(1,'22',2),(1,'12290',3),(1,'12360',4),(1,'12368',5),(1,'283155',0),(1,'1000',1),(1,'22',2),(1,'12290',3),(1,'12360',4),(1,'12370',5),(2,'283155',0),(2,'1000',1),(2,'22',2),(2,'12472',3),(2,'12484',4),(2,'283155',0),(2,'1000',1),(2,'22',2),(2,'12472',3),(2,'12486',4),(3,'283155',0),(3,'1000',1),(3,'48',2),(3,'5126',3),(3,'5144',4),(4,'283155',0),(4,'1000',1),(4,'22',2),(4,'12290',3),(4,'172810',4),(4,'12155',5),(4,'12159',6),(4,'283155',0),(4,'1000',1),(4,'22',2),(4,'12290',3),(4,'12333',4),(4,'12335',5),(4,'283155',0),(4,'1000',1),(4,'22',2),(4,'12290',3),(4,'12059',4),(4,'764432',5),(4,'572080',6),(4,'283155',0),(4,'1000',1),(4,'22',2),(4,'12056',3),(4,'764430',4),(4,'572082',5),(4,'283155',0),(4,'1000',1),(4,'22',2),(4,'12290',3),(4,'12059',4),(4,'764438',5),(4,'572094',6),(5,'283155',0),(5,'1000',1),(5,'22',2),(5,'12290',3),(5,'12465',4),(5,'12470',5),(5,'283155',0),(5,'1000',1),(5,'22',2),(5,'12290'

In [2]:
from bdtp4.parser import parse
dataset = list(parse('../data/amazon-meta.txt'))

In [3]:
import random
random.choice(dataset)

[autoreload of bdtp4.parser failed: Traceback (most recent call last):
  File "/home/mard/Documents/bdtp4/.venv/lib/python3.8/site-packages/IPython/extensions/autoreload.py", line 245, in check
    superreload(m, reload, self.old_objects)
  File "/home/mard/Documents/bdtp4/.venv/lib/python3.8/site-packages/IPython/extensions/autoreload.py", line 394, in superreload
    module = reload(module)
  File "/home/mard/Documents/bdtp4/.venv/lib/python3.8/imp.py", line 314, in reload
    return importlib.reload(module)
  File "/home/mard/Documents/bdtp4/.venv/lib/python3.8/importlib/__init__.py", line 169, in reload
    _bootstrap._exec(spec, module)
  File "<frozen importlib._bootstrap>", line 604, in _exec
  File "<frozen importlib._bootstrap_external>", line 779, in exec_module
  File "<frozen importlib._bootstrap_external>", line 916, in get_code
  File "<frozen importlib._bootstrap_external>", line 846, in source_to_code
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frame

{'id': 371831,
 'ASIN': '0789722097',
 'title': "Complete Idiot's Guide to Religions Online (Complete Idiot's Guide)",
 'group': 'Book',
 'salesrank': '1005908',
 'similar': [],
 'categories': [[[283155, 'Books'],
   [1000, 'Subjects'],
   [5, 'Computers & Internet'],
   [69765, 'Home & Office'],
   [69766, 'Internet'],
   [4112, 'Online Reference']],
  [[283155, 'Books'],
   [1000, 'Subjects'],
   [5, 'Computers & Internet'],
   [4053, 'Software'],
   [549660, 'Introductory Guides'],
   [4121, "Complete Idiot's Guide: General"]],
  [[283155, 'Books'],
   [1000, 'Subjects'],
   [22, 'Religion & Spirituality'],
   [12504, 'General']],
  [[283155, 'Books'],
   [1000, 'Subjects'],
   [22, 'Religion & Spirituality'],
   [12779, 'Religious Studies'],
   [12783, 'Comparative Religion']],
  [[283155, 'Books'],
   [1000, 'Subjects'],
   [5, 'Computers & Internet'],
   [69765, 'Home & Office'],
   [69766, 'Internet'],
   [69771, 'Online Searching']],
  [[283155, 'Books'],
   [1000, 'Subjects'],

In [8]:
uniqueCategories = set((catName, int(catId)) for x in dataset if 'categories' in x for y in x['categories'] for catName, catId in y)

In [4]:



with open('../data/amazon-meta.txt') as file:
    data = file.read().split('\n\n')

In [None]:
from bdtp4.parallelParser import parse
import pathlib

total = list(parse(pathlib.Path('../data/amazon-meta.txt')))

  0%|          | 0/548554 [00:00<?, ?it/s]

In [1]:
import lzma
import json

with lzma.open('out.json.xz', 'wb') as f:
    f.write(json.dumps({'a': 1}).encode())