Four phases:
1. Read in the zipped json files from the s3 bucket
2. Establish the PSQL connection
3. Transform the data
4. Load the data

In [6]:
import requests
from io import BytesIO
from zipfile import ZipFile
import sys

import psycopg2
from sqlalchemy import create_engine
from sqlalchemy import String, Boolean, BIGINT, DateTime, Float
import creds


import pandas as pd
import json
import datetime
from collections import Counter

# import os

Read in the zipped json files from the s3 bucket

In [4]:
def download_extract_zip(url):
    r=requests.get(url)
    if (r.status_code=='200'):
        print('Loading data...')
    with ZipFile(BytesIO(r.content)) as thezip:
        for zipinfo in thezip.infolist():
            with thezip.open(zipinfo) as thefile:
                yield zipinfo.filename, thefile

def main(args):
    print(args)
    url=args[0]
    datafiles=download_extract_zip(url)
    
    # Connecting to PostgreSQL by providing a sqlachemy engine
    engine = create_engine('postgresql://'+creds.username+':'
                           +creds.password+'@'+creds.host+':'
                           +creds.port+'/'+creds.db,echo=False)

if __name__ == "__main__":
    main(sys.argv)

In [None]:
  
def convert_timestring_utc(x):
    if x is None:
        return x
    x=x[:-6]+''.join(x[-6:].split(':'))
    return datetime.datetime.strptime(x,'%Y-%m-%dT%H:%M:%S%z').astimezone(tz=datetime.timezone.utc)

def convert_datetime_to_str(x):
    if x is None:
        return x
    else:
        return x.strftime('%Y-%m-%d %H:%M:%S%z')

exclude_list=['closed_at','created_at','updated_at',
              'processed_at','cancelled_at','line_items',
             'id']
include_list=['email','number','total_price','confirmed']
user_include_list=['customer_locale','buyer_accepts_marketing','contact_email','phone']

In [None]:
order_dict={}
line_item_dict={}
user_dict={}
for (fn,fobj) in datafiles:
    print(fn)
    data=json.load(fobj)
    orders=data['orders']
    for order in orders:
        order_id=order['id']
        line_item_list=order['line_items']
        line_item_total=len(line_item_list)
        item_total=sum([x['quantity'] for x in line_item_list])
        order_dict[order_id]={k:v for k,v in order.items() 
                              if( k not in set(exclude_list) 
                                 and k not in set(user_include_list))}
        order_dict[order_id]['line_item_total']=line_item_total
        for line_item in line_item_list:
            line_item_id=line_item['id']
            line_item_dict[line_item_id]={k:v for k,v in line_item.items() if k!='id'}
            line_item_dict[line_item_id]['order_id']=order_id
    break

In [None]:
order_df=pd.DataFrame.from_dict(order_dict,orient='index')
order_df.head()

In [None]:


order_type_dict={
    'email':String,
    'number':BIGINT,
    'total_price':Float,
    'total_tax':Float,
    'subtotal_price':Float,
    'total_discount':Float,
    'total_line_items_price':Float,
    'confirmed':Boolean,
    'checkout_id':BIGINT,
    'line_item_total':BIGINT,
    'processed_at':DateTime(timezone=True),
    'cancelled_at':DateTime(timezone=True),
    'created_at':DateTime(timezone=True),
    'updated_at':DateTime(timezone=True),
    'closed_at':DateTime(timezone=True)   
}

# order_df.to_sql(name='order_test2', con=engine, if_exists = 'append', index=True,dtype=order_type_dict)

In [None]:
main:
    load the file from s3 link
    connect to psql
    

In [7]:
    engine = create_engine('postgresql://'+creds.username+':'
                           +creds.password+'@'+creds.host+':'
                           +creds.port+'/'+creds.db,echo=False)

In [11]:
pd.read_sql('orders__2017_10',engine,columns=['index'])

Unnamed: 0,index
0,11674779651
1,11675992067
2,11676844035
3,11677827075
4,11678711811
5,11680219139
6,11682414595
7,11683430403
8,11685265411
9,11686838275


In [13]:
df=pd.read_sql('user_stats',engine,columns=['index'])

In [18]:
31994179 in set(df['index'])

True

In [46]:
existing_order_ids=set(pd.read_sql('order_line_item_mapping',engine,columns=['order_id'])['order_id'])


In [47]:
existing_order_ids

{75062640643,
 75723309059,
 74336010243,
 74273095683,
 74265460739,
 11675992067,
 11676844035,
 11674779651,
 11678711811,
 11677827075,
 74293182467,
 74309959683,
 75183947779,
 75058905091,
 74249666563,
 74274832387,
 74281123843,
 74297901059,
 74335649795,
 74323066883,
 75717607427,
 74965811203,
 74474061827,
 74927046659,
 74952212483,
 75086430211,
 75698798595,
 74977378307,
 75044487171,
 75589746691,
 74994155523,
 75212259331,
 75592925187,
 75131551747,
 74287480835,
 74312646659,
 74404921347,
 75008901123,
 75143118851,
 75688378371,
 75168284675,
 75570937859,
 75654823939,
 74282270723,
 75557339139,
 75725111299,
 75649613827,
 74520330243,
 75745067011,
 75099144195,
 75652792323,
 74431234051,
 74439622659,
 74934550531,
 75244929027,
 75622416387,
 75055169539,
 11712790531,
 74979672067,
 75063558147,
 75701092355,
 75105501187,
 75172610051,
 74445979651,
 74957684739,
 75603607555,
 75150622723,
 75570053123,
 75779768323,
 74432380931,
 74491101187,
 74969

In [53]:
def read_table_index(table_name,colname):
    try: 
        return set(pd.read_sql(table_name,engine,columns=[colname])[colname])
    except:
        return ()

In [54]:
p=set(read_table_index('orders__2017_10','index'))

In [44]:
11674779651 in set(read_table_index('orders__2017_10'))

False