In [0]:
# Step 1: Check that all the required input files are in place.
# The listing below shows the contents of the Data directory on DBFS.

%fs ls dbfs:/FileStore/tables/Data

In [0]:
# Create DataFrames for customers, orders and order_items.
# The CSV part files are read without a header row, so each schema is declared
# explicitly (as a DDL string) instead of being inferred.

# customer_zipcode is declared STRING: a ZIP code is an identifier, not a
# number, and an INT column would silently strip leading zeros ("00725" -> 725).
customer_df = spark.read.csv(
    'dbfs:/FileStore/tables/Data/customer/part_00000',
    schema=(
        'customer_id INT, customer_fname STRING, customer_lname STRING, '
        'customer_email STRING, customer_password STRING, customer_street STRING, '
        'customer_city STRING, customer_state STRING, customer_zipcode STRING'
    ),
)

orders_df = spark.read.csv(
    'dbfs:/FileStore/tables/Data/orders/part_00000',
    schema='order_id INT, order_date DATE, order_customer_id INT, order_status STRING',
)

# Monetary columns use DOUBLE rather than FLOAT: single-precision values pick up
# visible rounding noise once they are widened and summed.
order_items_df = spark.read.csv(
    'dbfs:/FileStore/tables/Data/order_items/part_00000',
    schema=(
        'order_item_id INT, order_item_order_id INT, order_item_product_id INT, '
        'order_item_quantity INT, order_item_subtotal DOUBLE, '
        'order_item_product_price DOUBLE'
    ),
)

In [0]:
# Preview the first two rows of the order_items DataFrame.
order_items_df.show(n=2)

In [0]:
# Step 4: Join our tables into a new DataFrame (order_details) to build a
# denormalized DataFrame.

# Begin with customers and orders, matched on the customer key (inner join).
customers_orders_df = customer_df.join(
    orders_df,
    on=customer_df['customer_id'] == orders_df['order_customer_id'],
    how='inner',
)

# Project only the columns of interest and preview ten rows ordered by customer.
(
    customers_orders_df
    .select('customer_id', 'order_id', 'order_date', 'order_status')
    .orderBy('customer_id')
    .show(10)
)

In [0]:
# Pack order_id, order_date and order_status into a single struct column
# named order_details, then preview ten rows ordered by customer.

from pyspark.sql.functions import struct

(
    customers_orders_df
    .select(
        'customer_id',
        struct('order_id', 'order_date', 'order_status').alias('order_details'),
    )
    .orderBy('customer_id')
    .show(10)
)

In [0]:
# Build an array-of-structs column from order_details: group by customer_id and
# gather every order_details struct of that customer into one array.

from pyspark.sql.functions import collect_list

# One row per (customer, order), with the order fields wrapped in a struct.
customer_order_struct = customers_orders_df.select(
    'customer_id',
    struct('order_id', 'order_date', 'order_status').alias('order_details'),
)

# One row per customer, carrying the list of that customer's orders.
final_df = (
    customer_order_struct
    .groupBy('customer_id')
    .agg(collect_list('order_details').alias('order_details'))
    .orderBy('customer_id')
)

In [0]:
# Step 5: Export the DataFrame as JSON.
# coalesce(1) reduces the result to a single partition so one part file is written.
# mode('overwrite') keeps the cell re-runnable: with the default save mode the
# write raises an error when the target path already exists.

final_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/final')

In [0]:
# Previously we denormalized only orders and customers. Now we do the same for
# all three tables.

# Imported here so this cell does not depend on imports made in earlier cells.
from pyspark.sql.functions import collect_list, struct

## Join the tables: customers -> orders -> order_items (inner joins).

customer_details = (
    customer_df
    .join(orders_df, customer_df['customer_id'] == orders_df['order_customer_id'])
    .join(order_items_df, orders_df['order_id'] == order_items_df['order_item_order_id'])
)

## Create the denormalized DataFrame by nesting all the required fields under
## order_details as a struct, in two roll-ups.

denorm_df = (
    customer_details
    # Wrap the item-level fields in a struct.
    .select(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        'order_id', 'order_date', 'order_status',
        struct('order_item_id', 'order_item_product_id', 'order_item_subtotal')
        .alias('order_item_details'),
    )
    # Roll-up 1: one row per order, with its items collected into an array.
    .groupBy(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        'order_id', 'order_date', 'order_status',
    )
    .agg(collect_list('order_item_details').alias('order_item_details'))
    # No sort at this point: the next groupBy reshuffles the rows, so ordering
    # here would only cost an extra global sort. The result is sorted once, at the end.
    .select(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        struct('order_id', 'order_date', 'order_status', 'order_item_details')
        .alias('order_details'),
    )
    # Roll-up 2: one row per customer, with that customer's orders collected into an array.
    .groupBy('customer_id', 'customer_fname', 'customer_lname', 'customer_email')
    .agg(collect_list('order_details').alias('order_details'))
    .orderBy('customer_id')
)

In [0]:
# Export the denormalized DataFrame as JSON.
# coalesce(1) reduces the result to a single partition so one part file is written.
# mode('overwrite') keeps the cell re-runnable: with the default save mode the
# write raises an error when the target path already exists.

denorm_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/denorm')

In [0]:
# We now have the data required for our analysis, so it is time to analyze the denormalized data using Spark.

# We shall perform the following analyses on our data:

# 1. Get the details of the orders placed by customers on January 1st, 2014.
# 2. Compute the monthly customer revenue.


In [0]:
# Problem Statement — 1:
# Read the denormalized JSON export back into a DataFrame.
# The path points at the export directory rather than at one specific part
# file: the part-file name contains generated ids, so a hard-coded file name
# stops working as soon as the export is written again. Spark reads the data
# files found in the directory.

json_df = spark.read.json('dbfs:/FileStore/tables/Data/denorm')
json_df.show(2)

In [0]:
# List the orders whose order_date starts with 2014-01-01.
# explode is imported explicitly so the cell does not rely on a name that
# happens to be defined by the environment or by an earlier session.
from pyspark.sql.functions import explode

(
    json_df
    # One row per (customer, order): unnest the order_details array.
    .select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))
    # Prefix match on the order_date field of the exploded struct.
    .filter('order_details.order_date LIKE "2014-01-01%"')
    .orderBy('customer_id')
    .select(
        'customer_id',
        'customer_fname',
        'order_details.order_id',
        'order_details.order_date',
        'order_details.order_status',
    )
    .show(10)
)

In [0]:
# Problem Statement — 2:
# To calculate the monthly customer revenue we need to aggregate (SUM)
# order_item_subtotal, which originates from the order_items table.
# In the input data all of these details are nested inside struct/array
# columns, so they are flattened first.

# col and explode are imported explicitly so the cell does not rely on names
# that happen to be defined by the environment or by an earlier session.
from pyspark.sql.functions import col, explode

flatten = (
    json_df
    # Level 1: one row per (customer, order).
    .select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))
    # Level 2: lift the order fields to top level and produce one row per order item.
    .select(
        'customer_id',
        'customer_fname',
        col('order_details.order_date').alias('order_date'),
        col('order_details.order_id').alias('order_id'),
        col('order_details.order_status').alias('order_status'),
        explode('order_details.order_item_details').alias('order_item_details'),
    )
    # Level 3: lift the item fields to top level.
    .select(
        'customer_id',
        'customer_fname',
        'order_date',
        'order_id',
        'order_status',
        'order_item_details.order_item_id',
        'order_item_details.order_item_product_id',
        'order_item_details.order_item_subtotal',
    )
)

In [0]:
# After flattening the data, compute the monthly revenue per customer.

# col and date_format are used below; they are imported here together with
# to_date so the cell does not rely on names that happen to be defined by the
# environment or by an earlier session.
from pyspark.sql.functions import col, date_format, to_date
from pyspark.sql import Row
from pyspark.sql.functions import sum as _sum  # aliased so the builtin sum is not shadowed

(
    flatten
    .select(
        'customer_id',
        'customer_fname',
        col("order_date"),
        # Parse order_date with the yyyy-MM-dd pattern into a date column.
        to_date(col("order_date"), "yyyy-MM-dd").alias("order_date_converted"),
        'order_status',
        'order_item_subtotal',
    )
    # Only orders in status COMPLETE or CLOSED contribute to revenue.
    .filter("order_status IN ('COMPLETE','CLOSED')")
    # Bucket by customer and calendar month (yyyy-MM).
    .groupBy(
        'customer_id',
        'customer_fname',
        date_format('order_date_converted', 'yyyy-MM').alias('order_month'),
    )
    .agg(_sum('order_item_subtotal').alias('Revenue'))
    .orderBy('order_month')
    .show()
)