In [1]:
import json
from urllib.parse import quote_plus

import pandas as pd

In [8]:
from environs import Env

# Environment-variable reader used by the configuration cell below.
env = Env()
# Load variables from the '.env' file; the value shown under this cell is
# read_env's return value.
env.read_env('.env')

True

In [11]:
# PostgreSQL connection settings. Each call has no default, so environs
# raises if the variable is unset; the port is parsed as an int.
PSQL_DB_HOST = env.str('PSQL_DB_HOST')
PSQL_DB_PORT = env.int('PSQL_DB_PORT')
PSQL_DB_USER = env.str('PSQL_DB_USER')
PSQL_DB_PASSWORD = env.str('PSQL_DB_PASSWORD')
PSQL_DB_DATABASE = env.str('PSQL_DB_DATABASE')

# Source / target directory settings.
SOURCE_DIR = env.str('SOURCE_DIR')
TARGET_DIR = env.str('TARGET_DIR')

In [20]:
# PostgreSQL URL passed as `con=` to pandas' read_sql/to_sql. The user and
# password are percent-encoded so characters such as '@', ':' or '/' in them
# cannot be misread as URL delimiters. Avoid displaying this value: it
# contains the password.
conn_string = (
    f'postgresql://{quote_plus(PSQL_DB_USER)}:{quote_plus(PSQL_DB_PASSWORD)}'
    f'@{PSQL_DB_HOST}:{PSQL_DB_PORT}/{PSQL_DB_DATABASE}'
)

In [2]:
# Table -> column-definition mapping used by get_column_name. The `with`
# block closes the file instead of leaving the handle open in the kernel;
# JSON text is read explicitly as UTF-8 rather than the platform default.
with open('data/retail_db/schemas.json', 'r', encoding='utf-8') as schemas_file:
    schemas = json.load(schemas_file)

In [5]:
def get_column_name(schema, category, sort_key='column_position'):
    """Return one table's column names, ordered by ``sort_key``.

    Args:
        schema: Mapping loaded from schemas.json; each value is a list of
            column-definition dicts for one table.
        category: Name of the table to look up (e.g. orders, customers etc.).
        sort_key: Key inside each column-definition dict used for ordering;
            defaults to 'column_position'.

    Returns:
        List of the ``column_name`` values sorted by ``sort_key``, or None when
        ``category`` is not present in ``schema``.

    Raises:
        KeyError: If a column definition lacks ``sort_key`` or 'column_name'.
    """
    # Errors are deliberately not caught here: returning None for a malformed
    # schema would make pd.read_csv(names=None) silently treat the first data
    # row as a header.
    category_data = schema.get(category)
    if category_data is None:
        return None
    columns = sorted(category_data, key=lambda col: col[sort_key])
    return [column['column_name'] for column in columns]

In [6]:
order_columns = get_column_name(schema=schemas, category='orders')
# get_column_name returns None for an unknown table; passing names=None to
# read_csv would silently consume the first data row as the header.
if order_columns is None:
    raise KeyError("table 'orders' not found in schemas.json")

In [7]:
# Supplying `names` makes pandas read the file as headerless (header=None),
# so the first line is kept as data rather than consumed as column labels.
order_df = pd.read_csv(filepath_or_buffer='data/retail_db/orders/part-00000',
                       names=order_columns)

In [17]:
order_df.head(n=5)

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25 00:00:00.0,11599,CLOSED
1,2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
2,3,2013-07-25 00:00:00.0,12111,COMPLETE
3,4,2013-07-25 00:00:00.0,8827,CLOSED
4,5,2013-07-25 00:00:00.0,11318,COMPLETE


In [21]:
# Full (single-shot) load of the orders frame into the `orders` table.
# NOTE: if_exists='append' is not idempotent; every re-run of this cell
# inserts all rows again.
order_df.to_sql('orders', conn_string, if_exists='append', index=False)

883

In [24]:
# Row count of the `orders` table. An aggregate count already yields a single
# row, so no LIMIT clause is needed.
sql_query = '''
SELECT count(*) FROM orders
'''

pd.read_sql(sql=sql_query, con=conn_string)

Unnamed: 0,count
0,68883


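## Chunked load: read and insert `orders` in batches

Passing `chunksize` to `pd.read_csv` returns an iterator of DataFrames, so the file is processed in fixed-size batches instead of being held in memory all at once. Note that the loop below appends to the same `orders` table as the full load above; running both inserts every row twice.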

In [25]:
# Iterator yielding DataFrames of at most 10,000 rows each. It is single-pass:
# once the loop below has consumed it, re-run this cell before looping again.
df_order_reader = pd.read_csv('data/retail_db/orders/part-00000',
                              names=order_columns,
                              chunksize=10_000)

In [None]:
# Append each chunk of the orders file to the `orders` table.
# df_order_reader is exhausted after one pass, so re-running this cell without
# re-creating the reader inserts nothing.
# NOTE(review): the full load earlier in the notebook already appended this
# file to `orders`; running both duplicates every row.
for idx, df in enumerate(df_order_reader):
    # DataFrame.size is rows * columns; len() is the row count of the chunk.
    print(f'Processing chunk {idx} with {len(df)} rows')
    df.to_sql(
        'orders',
        conn_string,
        if_exists='append',
        index=False,
    )