In [11]:
import os

import pandas as pd
from pandas.core.config_init import copy_on_write_doc
# os.environ.update({'DB_HOST':'localhost'})

In [2]:
%env DB_HOST=locahost

env: DB_HOST=locahost


In [3]:
os.environ.get('DB_HOST')

'locahost'

In [4]:
%load_ext sql

In [5]:
%env DATABASE_URL=postgresql://postgres:2254@localhost:5432/retail_db

env: DATABASE_URL=postgresql://postgres:2254@localhost:5432/retail_db


In [6]:
%env DATABASE_URL

'postgresql://postgres:2254@localhost:5432/retail_db'

In [7]:
%%sql

SELECT * FROM orders LIMIT 10

10 rows affected.


order_id,order_date,order_customer_id,order_status
1,2013-07-25 00:00:00,11599,CLOSED
2,2013-07-25 00:00:00,256,PENDING_PAYMENT
3,2013-07-25 00:00:00,12111,COMPLETE
4,2013-07-25 00:00:00,8827,CLOSED
5,2013-07-25 00:00:00,11318,COMPLETE
6,2013-07-25 00:00:00,7130,COMPLETE
7,2013-07-25 00:00:00,4530,COMPLETE
8,2013-07-25 00:00:00,2911,PROCESSING
9,2013-07-25 00:00:00,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00,5648,PENDING_PAYMENT


In [9]:
conn_url = 'postgresql://postgres:2254@localhost:5432/retail_db'

In [12]:
pd.read_sql('orders', conn_url)

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25,11599,CLOSED
1,2,2013-07-25,256,PENDING_PAYMENT
2,3,2013-07-25,12111,COMPLETE
3,4,2013-07-25,8827,CLOSED
4,5,2013-07-25,11318,COMPLETE
...,...,...,...,...
68878,68879,2014-07-09,778,COMPLETE
68879,68880,2014-07-13,1117,COMPLETE
68880,68881,2014-07-19,2518,PENDING_PAYMENT
68881,68882,2014-07-22,10000,ON_HOLD


In [13]:
query = '''
    SELECT order_status, COUNT(order_id) AS order_count
    FROM orders
    GROUP BY order_status
    ORDER BY order_count DESC
'''

In [14]:
pd.read_sql(query, conn_url)

Unnamed: 0,order_status,order_count
0,COMPLETE,22899
1,PENDING_PAYMENT,15030
2,PROCESSING,8275
3,PENDING,7610
4,CLOSED,7556
5,ON_HOLD,3798
6,SUSPECTED_FRAUD,1558
7,CANCELED,1428
8,PAYMENT_REVIEW,729


In [83]:
%%sql

TRUNCATE TABLE orders

 * postgresql://postgres:***@localhost:5432/retail_db
Done.


[]

In [15]:
import json

In [16]:
def get_column_names(schema, ds_name, sorting_key='column_position'):
    column_details = schema[ds_name]
    columns = sorted(column_details, key=lambda col: col[sorting_key])
    return [col['column_name'] for col in columns]

In [29]:
schemas = json.load(open('data/retail_db/schemas.json'))

In [30]:
# os.chdir("D:/Projects/apache-spark-with-pyspark/data-engineering-SQL-python-pyspark")
# pwd

In [48]:
ds_name = 'orders'

In [49]:
columns = get_column_names(schemas, ds_name)

In [50]:
columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [46]:
df = pd.read_csv(
    f'data/retail_db/{ds_name}/part-00000',
    names=columns)

In [47]:
type(df)

pandas.core.frame.DataFrame

In [34]:
df.head()

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25 00:00:00.0,11599,CLOSED
1,2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
2,3,2013-07-25 00:00:00.0,12111,COMPLETE
3,4,2013-07-25 00:00:00.0,8827,CLOSED
4,5,2013-07-25 00:00:00.0,11318,COMPLETE


In [43]:
df.to_sql(
    'orders',
    conn_url,
    if_exists='replace',
    index=False)

883

In [85]:
pd.read_sql('orders', conn_url)

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25 00:00:00.0,11599,CLOSED
1,2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
2,3,2013-07-25 00:00:00.0,12111,COMPLETE
3,4,2013-07-25 00:00:00.0,8827,CLOSED
4,5,2013-07-25 00:00:00.0,11318,COMPLETE
...,...,...,...,...
68878,68879,2014-07-09 00:00:00.0,778,COMPLETE
68879,68880,2014-07-13 00:00:00.0,1117,COMPLETE
68880,68881,2014-07-19 00:00:00.0,2518,PENDING_PAYMENT
68881,68882,2014-07-22 00:00:00.0,10000,ON_HOLD


In [71]:
df_reader = pd.read_csv(
    f'data/retail_db/{ds_name}/part-00000',
    names=columns,
    chunksize=10000)

In [72]:
for idx, df in enumerate(df_reader):
    print(f'Size of chunk {idx} is {len(df.shape)}')

Size of chunk 0 is 2
Size of chunk 1 is 2
Size of chunk 2 is 2
Size of chunk 3 is 2
Size of chunk 4 is 2
Size of chunk 5 is 2
Size of chunk 6 is 2


In [58]:
type(df_reader)

pandas.io.parsers.readers.TextFileReader

In [59]:
df_list = list(df_reader)

In [60]:
df_list[0]

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25 00:00:00.0,11599,CLOSED
1,2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
2,3,2013-07-25 00:00:00.0,12111,COMPLETE
3,4,2013-07-25 00:00:00.0,8827,CLOSED
4,5,2013-07-25 00:00:00.0,11318,COMPLETE
...,...,...,...,...
9995,9996,2013-09-25 00:00:00.0,11839,PENDING
9996,9997,2013-09-25 00:00:00.0,3471,PENDING_PAYMENT
9997,9998,2013-09-25 00:00:00.0,9419,PENDING
9998,9999,2013-09-25 00:00:00.0,1185,CLOSED


In [61]:
type(df_list[0])

pandas.core.frame.DataFrame

In [62]:
len(df_list)

7

In [69]:
df_reader.chunksize

10000

In [67]:
for idx, df in enumerate(df_list):
    print(f'Size of chunk {idx} is {len(df.shape)}')

Size of chunk 0 is 2
Size of chunk 1 is 2
Size of chunk 2 is 2
Size of chunk 3 is 2
Size of chunk 4 is 2
Size of chunk 5 is 2
Size of chunk 6 is 2


In [79]:
df_reader = pd.read_csv(
    f'data/retail_db/{ds_name}/part-00000',
    names=columns,
    chunksize=10000)

In [80]:
for idx, df in enumerate(df_reader):
    print(f'Processing chunk {idx} with size {df.shape[0]} of {ds_name}')
    df.to_sql(
        'orders',
        conn_url,
        if_exists='append',
        index=False
    )

Processing chunk 0 with size 10000 of orders
Processing chunk 1 with size 10000 of orders
Processing chunk 2 with size 10000 of orders
Processing chunk 3 with size 10000 of orders
Processing chunk 4 with size 10000 of orders
Processing chunk 5 with size 10000 of orders
Processing chunk 6 with size 8883 of orders


In [82]:
pd.read_sql(ds_name, conn_url)

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25 00:00:00.0,11599,CLOSED
1,2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
2,3,2013-07-25 00:00:00.0,12111,COMPLETE
3,4,2013-07-25 00:00:00.0,8827,CLOSED
4,5,2013-07-25 00:00:00.0,11318,COMPLETE
...,...,...,...,...
68878,68879,2014-07-09 00:00:00.0,778,COMPLETE
68879,68880,2014-07-13 00:00:00.0,1117,COMPLETE
68880,68881,2014-07-19 00:00:00.0,2518,PENDING_PAYMENT
68881,68882,2014-07-22 00:00:00.0,10000,ON_HOLD


In [86]:
%%sql

TRUNCATE TABLE orders

 * postgresql://postgres:***@localhost:5432/retail_db
Done.


[]

In [88]:
pd.read_sql('orders', conn_url)

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25 00:00:00.0,11599,CLOSED
1,2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
2,3,2013-07-25 00:00:00.0,12111,COMPLETE
3,4,2013-07-25 00:00:00.0,8827,CLOSED
4,5,2013-07-25 00:00:00.0,11318,COMPLETE
...,...,...,...,...
68878,68879,2014-07-09 00:00:00.0,778,COMPLETE
68879,68880,2014-07-13 00:00:00.0,1117,COMPLETE
68880,68881,2014-07-19 00:00:00.0,2518,PENDING_PAYMENT
68881,68882,2014-07-22 00:00:00.0,10000,ON_HOLD
