In [1]:
import sys
import json
import gzip
from configparser import ConfigParser
import psycopg2

# Rows inserted per transaction: convert_json() commits (and prints a progress
# dot when VERBOSE) after this many inserts.
INSERTS_PER_COMMIT = 1024
# Print connection info, progress dots and the final row count when True.
VERBOSE            = True
# Upper bound on the number of offers convert_json() loads; the trailing
# comment suggests sys.maxsize as the "no limit" setting.
MAX_INSERT         = 20000000 # sys.maxsize

# Labels that get_top_properties() looks up (as '/<label>' keys) in an offer's
# 'identifiers' list. NOTE(review): despite the name this list holds 14 labels,
# and the first eight (brand ... title) overlap the fixed columns that
# add_offer_to_db() binds from the offer's top-level fields.
TOP15PROPERTIES = ['brand',
                   'category',
                   'description',
                   'id',
                   'keyValuePairs',
                   'price',
                   'specTableContent',
                   'title',
                   'mpn',
                   'sku',
                   'productID',
                   'gtin8',
                   'gtin12',
                   'gtin13'
]

# DDL executed by init_db(): the placeholder TABLEBASE is textually replaced by
# the table prefix, then any existing <prefix>_offers table is dropped
# (CASCADE) and recreated. The identifiers are unquoted, so PostgreSQL folds
# them to lower case. Every column except cluster_id and id is nullable TEXT.
SQL_START = """  
    DROP TABLE IF EXISTS TABLEBASE_offers CASCADE;
    CREATE TABLE TABLEBASE_offers (
          brand              TEXT   DEFAULT NULL
        , category           TEXT   DEFAULT NULL
        , cluster_id         BIGINT NOT NULL
        , description        TEXT   DEFAULT NULL
        , id                 TEXT NOT NULL
        , keyValuePairs      TEXT   DEFAULT NULL
        , price              TEXT   DEFAULT NULL
        , specTableContent   TEXT   DEFAULT NULL
        , title              TEXT   DEFAULT NULL
        , mpn                TEXT   DEFAULT NULL
        , sku                TEXT   DEFAULT NULL
        , productID          TEXT   DEFAULT NULL
        , gtin8              TEXT   DEFAULT NULL
        , gtin12             TEXT   DEFAULT NULL
        , gtin13             TEXT   DEFAULT NULL
        , PRIMARY KEY (id)
    );
"""

def init_db(tablebase=None):
    """ (Re)create the ``<tablebase>_offers`` table on the global ``conn_pg``.

    :param tablebase: table-name prefix substituted for TABLEBASE in SQL_START.
        It is inserted verbatim into the SQL text, so it must be a trusted
        identifier. None is an error.

    Any existing table of that name is dropped first. No commit is issued
    here; the DDL becomes durable with the connection's next commit.
    Any failure is printed and the process exits with status 1.
    """
    try:
        sql = SQL_START.replace('TABLEBASE',tablebase)
        # Execute directly instead of through execute_pg(): that helper prints
        # and swallows database errors, so a failed DROP/CREATE would never
        # reach the fatal handler below.
        with conn_pg.cursor() as cur:
            cur.execute(sql)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) # pretty fatal
    
def add_offer_to_db(tablebase, cursor, offer):
    """ Insert one offer into the ``<tablebase>_offers`` table.

    :param tablebase: table-name prefix; it is formatted into the SQL text
        unescaped, so it must be a trusted identifier.
    :param cursor: open psycopg2 cursor on which the INSERT is executed
        (the caller commits).
    :param offer: decoded JSON offer. The keys 'brand', 'category',
        'cluster_id', 'description', 'id', 'keyValuePairs', 'price',
        'specTableContent', 'title' and 'identifiers' are read; extra column
        values come from get_top_properties(offer).

    Any failure is printed and the process exits with status 1.
    """
    try:
        columns  = ['brand', 'category', 'cluster_id', 'description', 'id',
                    'keyValuePairs', 'price', 'specTableContent', 'title']
        # NOTE(review): keyValuePairs is stored as its Python str() repr, not
        # as JSON; a None value is stored as the text 'None', not NULL.
        all_args = [offer['brand'], offer['category'],
                    int(offer['cluster_id']), offer['description'],
                    offer['id'], str(offer['keyValuePairs']),
                    offer['price'], offer['specTableContent'],
                    offer['title']]
        # Add the identifier-derived values. TOP15PROPERTIES also contains
        # names already bound above (brand, id, title, ...); skip those so the
        # INSERT never lists the same column twice, which PostgreSQL rejects.
        for name, value in get_top_properties(offer).items():
            if name not in columns:
                columns.append(name)
                all_args.append(value)
        sql = 'INSERT INTO {}_offers({}) VALUES({});'.format(
            tablebase, ', '.join(columns), ','.join(['%s'] * len(columns)))
        cursor.execute(sql, all_args)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) # pretty fatal

def convert_json(jsonzip=None,tablebase=None):
    """ Convert the .gz JSON input to a table in the PostgreSQL database.

    :param jsonzip: path of a gzip file holding one JSON offer per line.
    :param tablebase: table-name prefix; ``<tablebase>_offers`` is (re)created
        by init_db() and filled by add_offer_to_db().

    At most MAX_INSERT offers are inserted, committing every
    INSERTS_PER_COMMIT rows on the global ``conn_pg``. Blank lines are skipped.
    """
    init_db(tablebase)
    total    = 0     # rows actually inserted
    in_batch = 0     # rows inserted since the last commit
    cursor   = None
    with gzip.open(jsonzip) as fp:
        for line in fp:
            # Stop before reading/parsing anything beyond the limit.
            if total >= MAX_INSERT:
                break
            if not line.strip():
                continue
            offer = json.loads(line)
            if cursor is None:
                cursor = conn_pg.cursor()
            add_offer_to_db(tablebase,cursor,offer)
            total    += 1
            in_batch += 1
            if in_batch >= INSERTS_PER_COMMIT:
                if VERBOSE:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                in_batch = 0
                conn_pg.commit()
                cursor.close()
                cursor = None
    # Commit the last partial batch. Done unconditionally so that the DDL
    # issued by init_db() is also committed when the input holds no offers.
    conn_pg.commit()
    if cursor is not None:
        cursor.close()
        cursor = None
    if VERBOSE:
        sys.stdout.write('\n')
        print("# Succesfully inserted {} tuples.".format(total))

def add_top_property(pl,label,new_d):
    """ Copy the identifier ``label`` from ``pl`` into ``new_d``.

    :param pl: iterable of dicts (an offer's 'identifiers' list) searched in
        order for the key '/<label>'; None is treated as empty.
    :param label: property name without the leading '/'.
    :param new_d: dict updated in place with ``new_d[label] = value`` for the
        first dict containing the key.
    :returns: True if a value was found and stored, False otherwise.

    String values starting with '[' are unwrapped: the leading '[', a trailing
    ']' (only if present) and surrounding spaces are removed, so '[ 123 ]'
    becomes '123'. Other values are stored unchanged.
    """
    key = '/'+label
    for single_d in pl or ():
        if key in single_d:
            val = single_d[key]
            if isinstance(val, str) and val.startswith('['):
                inner = val[1:].rstrip()
                # Only drop a closing bracket that is actually there.
                if inner.endswith(']'):
                    inner = inner[:-1]
                val = inner.strip()
            new_d[label] = val
            return True
    return False

def get_top_properties(offer):
    """ Gather the TOP15PROPERTIES values present in ``offer['identifiers']``.

    :param offer: decoded JSON offer; the key 'identifiers' is required.
    :returns: dict mapping each label that add_top_property() found to its
        value; labels that are absent are simply left out.
    """
    identifiers = offer['identifiers']
    found = {}
    for name in TOP15PROPERTIES:
        # add_top_property fills `found` in place; its bool result is unused.
        add_top_property(identifiers, name, found)
    return found

def analyze_json(jsonzip=None):
    """ Scratch hook for trying things out on the offers in a .gz JSON file.

    Every line of ``jsonzip`` is decoded with json.loads and then discarded;
    put exploratory code inside the loop. Returns None.
    """
    with gzip.open(jsonzip) as source:
        decoded_offers = map(json.loads, source)
        for offer in decoded_offers:
            # Inspect `offer` here, e.g. print(json.dumps(offer, indent=4)).
            pass

def config(configname='database.ini', section='postgresql'):
    """ Read database connection parameters from an INI file.

    :param configname: path of the INI file. ConfigParser.read skips files it
        cannot open, so a missing file surfaces as a missing section.
    :param section: section to return (default 'postgresql').
    :returns: dict of option name -> string value for that section.
    :raises Exception: if the section is not present.
    """
    ini = ConfigParser()
    ini.read(configname)

    if not ini.has_section(section):
        raise Exception('Section {0} not found in the {1} file'.format(section, configname))

    return {option: value for option, value in ini.items(section)}

# Module-level psycopg2 connection used by execute_pg(), init_db() and
# convert_json(); opened by connect_pg() and closed by close_pg().
conn_pg = None

def connect_pg(configname='database.ini'):
    """ Connect to the PostgreSQL database server.

    :param configname: INI file whose 'postgresql' section (read via config())
        supplies the keyword arguments for psycopg2.connect().

    The connection is stored in the module-level ``conn_pg``. As a sanity
    check the server version is queried and printed when VERBOSE. Any failure
    is printed and the process exits with status 1.
    """
    global conn_pg
    try:
        # read connection parameters
        params = config(configname=configname)

        # connect to the PostgreSQL server
        if VERBOSE:
            print('Connecting to the PostgreSQL database...')
        conn_pg = psycopg2.connect(**params)

        # Query the server version; the cursor is closed when the block exits.
        with conn_pg.cursor() as cur:
            if VERBOSE:
                print('PostgreSQL database version:')
            cur.execute('SELECT version()')
            db_version = cur.fetchone()

        # display the PostgreSQL database server version
        if VERBOSE:
            print(db_version)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) # pretty fatal


def close_pg():
    """ Close the module-level PostgreSQL connection, if one is open.

    No commit is issued here. Errors are printed, not raised. On success
    ``conn_pg`` is reset to None, so a repeated call does nothing and
    connect_pg() can open a fresh connection.
    """
    global conn_pg
    try:
        if conn_pg is not None:
            conn_pg.close()
            # Drop the stale reference to the closed connection.
            conn_pg = None
            if VERBOSE:
                print('Database connection closed.')
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

def execute_pg(sql_stat=None):
    """ Execute a single SQL command on the global ``conn_pg``.

    :param sql_stat: SQL text to execute (no parameters are bound).

    No commit is issued. Errors are printed and swallowed, not raised.
    """
    try:
        # The context manager closes the cursor even when execute() raises.
        with conn_pg.cursor() as cur:
            cur.execute(sql_stat)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

In [2]:
if __name__ == '__main__':
    connect_pg(configname='database.ini')
    try:
        # Alternatives for trying things out on the small sample file:
        # analyze_json(jsonzip='./sample_offersenglish.json.gz')
        # convert_json(jsonzip='./sample_offersenglish.json.gz',tablebase='WDC_ENG')
        convert_json(jsonzip='./offers_corpus_english_v2.json.gz',tablebase='WDC_ENG')
    finally:
        # Close the connection even if the conversion aborts (e.g. SystemExit).
        close_pg()

Connecting to the PostgreSQL database...
PostgreSQL database version:
('PostgreSQL 12.18 (Ubuntu 12.18-0ubuntu0.20.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.2) 9.4.0, 64-bit',)
......................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................