In [5]:
import glob
import os
from datetime import datetime, timedelta
import pandas_profiling
import pandas as pd
from sqlalchemy import create_engine


Matplotlib is building the font cache; this may take a moment.


In [6]:
# Adapter layer

def get_file_path(root_path,batch_date,file_date_format,arg_file_name):
    arg_date_dt = datetime.strptime(batch_date,"%Y-%m-%d").date() - timedelta(days=1)
    arg_batch_date = arg_date_dt.strftime(file_date_format)
    return glob.glob(os.path.join(root_path ,arg_file_name.format(arg_batch_date)))

def read_csv(file_path_list,columns):
    dfs_list = []
    for file_path in file_path_list:
        dfs_list.append(pd.read_csv(file_path,header=None,names=columns))
    df = pd.concat(dfs_list,ignore_index=True)
    return df

def get_db_connection(conn_string):
    db = create_engine(conn_string)
    conn = db.connect()
    return conn

def write_to_postgres(df,conn,table_name):
    df.to_sql(table_name, con=conn, if_exists='append',index=False,chunksize=1000)

In [17]:
# Application layer

def extract(root_path,batch_date,file_name,file_date_format,columns):
    file_paths = get_file_path(root_path,batch_date,file_date_format,file_name)
    print(file_paths)
    df = read_csv(file_paths,columns)
    return df

def general_formatting(df,batch_date_str):
    df["batch_date"] = datetime.strptime(batch_date_str,"%Y-%m-%d").date()
    return df

def sales_formatting(df):
    df["transaction_time"] = pd.to_datetime(df["transaction_time"])
    return df

def load(df,table_name,conn_string):
    conn = get_db_connection(conn_string)
    write_to_postgres(df,conn,table_name)
    return True

def el_sales(batch_date_str,date_format,root_path,sales_file_name,sales_columns,conn_string,sales_table_name):
    sales_df = extract(root_path,batch_date_str,sales_file_name,date_format,sales_columns)
    formatted_sales_df = general_formatting(sales_df,batch_date_str)
    formatted_sales_df = sales_formatting(formatted_sales_df)
    profile = formatted_sales_df.profile_report(title = "Profiling Report")
    profile.to_file("/Users/brayanjules/etl_practice1/ingestion_tool/ingestion/{0}.html".format(sales_table_name))
   ## load(formatted_sales_df,sales_table_name,conn_string)

def el_stores(batch_date_str,date_format,root_path,store_file_name,store_columns,conn_string,stores_table_name):
    store_df = extract(root_path,batch_date_str,store_file_name,date_format,store_columns)
    formatted_store_df = general_formatting(store_df,batch_date_str)
    profile = formatted_store_df.profile_report(title = "Profiling Report")
    profile.to_file("/Users/brayanjules/etl_practice1/ingestion_tool/ingestion/{0}.html".format(stores_table_name))
    ##load(formatted_store_df,stores_table_name,conn_string)

def main():
    batch_date_str = "2020-01-27"
    date_format = '%Y%m%d'
    file_path = '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/'
    sales_columns = ["store_token","transaction_id","receipt_token","transaction_time","amount","source_id","user_role"]
    sales_file_name = 'sales_*.csv.gz'
    store_columns = ["store_group","store_token","store_name"]
    store_file_name = 'stores_*.csv.gz'
    conn_string = 'postgresql://postgres:000000@localhost:5433/postgres'
    sales_table_name = 'sales_t'
    store_table_name = 'stores_t'
    el_sales(batch_date_str,date_format,file_path,sales_file_name,sales_columns,conn_string,sales_table_name)
    el_stores(batch_date_str,date_format,file_path,store_file_name,store_columns,conn_string,store_table_name)


In [18]:
main()

['/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200119.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200216.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200202.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200302.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200223.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200209.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200126.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/sales_20200112.csv.gz']


Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

['/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200112.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200126.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200209.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200223.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200302.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200202.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200119.csv.gz', '/Users/brayanjules/Projects/personal/data_engineer/datasets/sales_marketplace/stores_20200216.csv.gz']


Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]