In [46]:
#!pip3 install pandas --quiet
import json
import pandas as pd

Read orders data into the orders_data pandas dataframe

In [47]:
# example to read data from a csv file into a pandas dataframe
# order_columns = ['order_id', 'order_date', 'order_customer_id', 'order_status']

# function to get column names from a schema.json file

def get_column_names(schema_file, table_name, sorting_key='column_position'):
    """Return the column names of one table, ordered by a schema attribute.

    Args:
        schema_file: Mapping of table name -> iterable of column-detail
            mappings. Each column detail must provide 'column_name' and the
            key named by ``sorting_key``.
        table_name: Key of the table to look up in ``schema_file``.
        sorting_key: Column-detail key used to order the columns
            (defaults to 'column_position').

    Returns:
        list: The 'column_name' values in ascending ``sorting_key`` order.

    Raises:
        KeyError: If the table, the sorting key, or 'column_name' is missing.
    """
    def sort_value(column):
        return column[sorting_key]

    # Sort a copy so the caller's schema object is left untouched.
    ordered_columns = list(schema_file[table_name])
    ordered_columns.sort(key=sort_value)

    names = []
    for column in ordered_columns:
        names.append(column['column_name'])
    return names

# Read the schemas file into the schema_file object.
# The context manager closes the file handle as soon as parsing is done
# (a bare json.load(open(...)) leaves it open until garbage collection).
with open('../data-engineering-spark/data/retail_db/schemas.json') as schema_fp:
    schema_file = json.load(schema_fp)

# Column names for the orders file come from the schema, in schema order.
order_columns = get_column_names(schema_file, 'orders')
orders_data = pd.read_csv('../data-engineering-spark/data/retail_db/orders/part-00000', names=order_columns)
orders_data.shape

(68883, 4)

In [48]:
# Example: filter a pandas dataframe with query(), then report the shape
# (rows, columns) of the rows whose order_status is COMPLETE.
completed_orders = orders_data.query('order_status == "COMPLETE"')
completed_orders.shape

(22899, 4)

In [49]:
# Example: generate a derived column and aggregate the dataframe.

# The first 7 characters of order_date are used as the year_month key.
# The vectorised .str accessor does the slice for the whole column at once,
# replacing a row-by-row apply(..., axis=1) over the entire frame.
orders_data['year_month'] = orders_data['order_date'].str[:7]

# Count orders per (year_month, order_status) pair; reset_index() turns the
# group keys back into regular columns.
(
    orders_data
    .groupby(['year_month', 'order_status'])['order_id']
    .agg(orders_count='count')
    .reset_index()
)

# orders_data.shape

Unnamed: 0,year_month,order_status,orders_count
0,2013-07,CANCELED,22
1,2013-07,CLOSED,161
2,2013-07,COMPLETE,515
3,2013-07,ON_HOLD,81
4,2013-07,PAYMENT_REVIEW,19
...,...,...,...
112,2014-07,PAYMENT_REVIEW,54
113,2014-07,PENDING,517
114,2014-07,PENDING_PAYMENT,979
115,2014-07,PROCESSING,561


Read customers data into customers_data pandas dataframe

In [50]:
# Column names for the customers file come from the schema, in schema order.
customer_columns = get_column_names(schema_file, table_name='customers')
customers_data = pd.read_csv(
    '../data-engineering-spark/data/retail_db/customers/part-00000',
    header=None,  # already implied by passing names=; stated explicitly
    names=customer_columns,
)
customers_data.shape

(12435, 9)

Join the customers and orders dataframes to create customer_orders: the number of orders per customer, keeping only customers with at least 10 orders

In [51]:
# Index both dataframes on the customer key so join() can match index-to-index.
# The guards keep this cell re-runnable: after the first run the key column
# already lives in the index, and calling set_index() again would raise KeyError.
if 'order_customer_id' in orders_data.columns:
    orders_data = orders_data.set_index('order_customer_id')
if 'customer_id' in customers_data.columns:
    customers_data = customers_data.set_index('customer_id')

# Inner join keeps only customer ids present in both frames. The joined index
# is turned back into a 'customer_id' column, orders are counted per customer,
# and only customers with at least 10 orders are kept.
customer_orders = (
    customers_data
    .join(orders_data, how='inner')
    .reset_index(names='customer_id')
    .groupby('customer_id')['order_id']
    .agg(count_of_orders='count')
    .reset_index()
    .query('count_of_orders >=10')
)

customer_orders

Unnamed: 0,customer_id,count_of_orders
70,71,10
171,172,10
173,174,12
196,197,11
219,221,15
...,...,...
12311,12341,10
12317,12347,10
12375,12406,10
12400,12431,16
