In [0]:
# List the contents of the raw-data directory on DBFS.
display(dbutils.fs.ls('dbfs:/FileStore/tables/data'))

path,name,size,modificationTime
dbfs:/FileStore/tables/data/customer/,customer/,0,0
dbfs:/FileStore/tables/data/order/,order/,0,0
dbfs:/FileStore/tables/data/order_item/,order_item/,0,0
dbfs:/FileStore/tables/data/orders/,orders/,0,0
dbfs:/FileStore/tables/data/orders.csv,orders.csv,2999944,1678731221000


In [0]:
# Load the order-item CSV with an explicit schema rather than inferring types.
orders_items_df = spark.read.schema(
    "order_item_id INT,"
    "order_item_order_id INT,"
    "order_item_product_id INT,"
    "order_item_quantity INT,"
    "order_item_subtotal FLOAT,"
    "order_item_product_price FLOAT"
).csv('dbfs:/FileStore/tables/data/order_item/order_items.csv')


In [0]:
# Load the orders CSV, declaring the column types up front.
orders_df = spark.read.schema(
    "order_id INT,"
    "order_date DATE,"
    "order_customer_id INT,"
    "order_status STRING"
).csv('dbfs:/FileStore/tables/data/order/orders.csv')



In [0]:
# Load the customer CSV, declaring the column types up front.
customer_df = spark.read.schema(
    "customer_id INT,"
    "customer_fname STRING,"
    "customer_lname STRING,"
    "customer_email STRING,"
    "customer_password STRING,"
    "customer_street STRING,"
    "customer_city STRING,"
    "customer_state STRING,"
    "customer_zipcode INT"
).csv('dbfs:/FileStore/tables/data/customer/customer.csv')


In [0]:
# Peek at the first two order-item rows to sanity-check the schema.
orders_items_df.show(n=2)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
only showing top 2 rows



In [0]:
# Inner-join each customer to the orders that reference them.
customers_orders_df = customer_df.join(
    orders_df,
    on=customer_df['customer_id'] == orders_df['order_customer_id'],
    how='inner',
)

In [0]:
# Show the order columns for the first ten rows, sorted by customer.
(
    customers_orders_df
    .select('customer_id', 'order_id', 'order_date', 'order_status')
    .orderBy('customer_id')
    .show(10)
)

+-----------+--------+----------+---------------+
|customer_id|order_id|order_date|   order_status|
+-----------+--------+----------+---------------+
|          1|   22945|2013-12-13|       COMPLETE|
|          2|   33865|2014-02-18|       COMPLETE|
|          2|   15192|2013-10-29|PENDING_PAYMENT|
|          2|   57963|2013-08-02|        ON_HOLD|
|          2|   67863|2013-11-30|       COMPLETE|
|          3|   35158|2014-02-26|       COMPLETE|
|          3|   57617|2014-07-24|       COMPLETE|
|          3|   22646|2013-12-11|       COMPLETE|
|          3|   23662|2013-12-19|       COMPLETE|
|          3|   56178|2014-07-15|        PENDING|
+-----------+--------+----------+---------------+
only showing top 10 rows



In [0]:
# Pack order_id, order_date and order_status into a single struct column
# named order_details, then preview ten rows sorted by customer.
from pyspark.sql.functions import struct

(
    customers_orders_df
    .select(
        'customer_id',
        struct('order_id', 'order_date', 'order_status').alias('order_details'),
    )
    .orderBy('customer_id')
    .show(10)
)

+-----------+--------------------+
|customer_id|       order_details|
+-----------+--------------------+
|          1|{22945, 2013-12-1...|
|          2|{33865, 2014-02-1...|
|          2|{15192, 2013-10-2...|
|          2|{57963, 2013-08-0...|
|          2|{67863, 2013-11-3...|
|          3|{35158, 2014-02-2...|
|          3|{57617, 2014-07-2...|
|          3|{22646, 2013-12-1...|
|          3|{23662, 2013-12-1...|
|          3|{56178, 2014-07-1...|
+-----------+--------------------+
only showing top 10 rows



In [0]:
# Keep the (customer_id, order_details struct) projection for the aggregation below.
customer_order_struct = customers_orders_df.select(
    'customer_id',
    struct('order_id', 'order_date', 'order_status').alias('order_details'),
)

In [0]:
# Collapse to one row per customer, gathering that customer's order structs
# into an array column, and sort the result by customer_id.
from pyspark.sql.functions import collect_list

final_df = (
    customer_order_struct
    .groupBy('customer_id')
    .agg(collect_list('order_details').alias('order_details'))
    .orderBy('customer_id')
)

In [0]:
# Preview the first ten customers with their collected orders.
final_df.show(n=10)

+-----------+--------------------+
|customer_id|       order_details|
+-----------+--------------------+
|          1|[{22945, 2013-12-...|
|          2|[{15192, 2013-10-...|
|          3|[{22646, 2013-12-...|
|          4|[{9023, 2013-09-1...|
|          5|[{13705, 2013-10-...|
|          6|[{7485, 2013-09-0...|
|          7|[{9977, 2013-09-2...|
|          8|[{7688, 2013-09-1...|
|          9|[{12828, 2013-10-...|
|         10|[{45239, 2014-05-...|
+-----------+--------------------+
only showing top 10 rows



In [0]:
# Export the per-customer order DataFrame as JSON.
# coalesce(1) reduces the DataFrame to a single partition before writing.
# mode('overwrite') keeps the cell re-runnable: with the default save mode the
# write fails once the target directory already exists.
# NOTE(review): this path uses `Data` while the inputs are read from `data` —
# confirm the different casing is intentional.
final_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/final')

In [0]:
# Join customers -> orders -> order items into one wide DataFrame.
customer_details = (
    customer_df
    .join(orders_df, customer_df['customer_id'] == orders_df['order_customer_id'])
    .join(orders_items_df, orders_df['order_id'] == orders_items_df['order_item_order_id'])
)

In [0]:
# Create a denormalized DataFrame with one row per customer, nesting all the
# required fields under `order_details` as struct types. Two aggregation passes:
#   1. per (customer, order): collect the order's items into an
#      `order_item_details` array of structs;
#   2. per customer: collect the orders (each carrying its item array) into an
#      `order_details` array of structs.
# No sort is applied between the two passes: ordering rows by customer_id
# before grouping by customer_id cannot change the grouped result, so only the
# final orderBy is kept.
denorm_df = (
    customer_details
    .select(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        'order_id', 'order_date', 'order_status',
        struct('order_item_id', 'order_item_product_id', 'order_item_subtotal')
        .alias('order_item_details'),
    )
    .groupBy(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        'order_id', 'order_date', 'order_status',
    )
    .agg(collect_list('order_item_details').alias('order_item_details'))
    .select(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        struct('order_id', 'order_date', 'order_status', 'order_item_details')
        .alias('order_details'),
    )
    .groupBy('customer_id', 'customer_fname', 'customer_lname', 'customer_email')
    .agg(collect_list('order_details').alias('order_details'))
    .orderBy('customer_id')
)

In [0]:
# Export the denormalized DataFrame as JSON.
# coalesce(1) reduces the DataFrame to a single partition before writing.
# mode('overwrite') keeps the cell re-runnable: with the default save mode the
# write fails once the target directory already exists.
denorm_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/denorm')

In [0]:
# Read the exported denormalized JSON back in so it can be analyzed with Spark.
json_df = spark.read.format('json').load('dbfs:/FileStore/tables/Data/denorm')

In [0]:
# Peek at the first two denormalized customer rows.
json_df.show(n=2)

+--------------+--------------+-----------+--------------+--------------------+
|customer_email|customer_fname|customer_id|customer_lname|       order_details|
+--------------+--------------+-----------+--------------+--------------------+
|     XXXXXXXXX|       Richard|          1|     Hernandez|[{2013-12-13, 229...|
|     XXXXXXXXX|          Mary|          2|       Barrett|[{2013-08-02, 579...|
+--------------+--------------+-----------+--------------+--------------------+
only showing top 2 rows



In [0]:
# Details of the orders placed on 1 January 2014.
# explode() yields one row per element of the order_details array, so the
# date filter applies to individual orders.
from pyspark.sql.functions import explode

(
    json_df
    .select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))
    .filter('order_details.order_date LIKE "2014-01-01%"')
    .orderBy('customer_id')
    .select(
        'customer_id',
        'customer_fname',
        'order_details.order_id',
        'order_details.order_date',
        'order_details.order_status',
    )
    .show(10)
)

+-----------+--------------+--------+----------+---------------+
|customer_id|customer_fname|order_id|order_date|   order_status|
+-----------+--------------+--------+----------+---------------+
|        206|          Mary|   25966|2014-01-01|         CLOSED|
|        279|          Anna|   25918|2014-01-01|       COMPLETE|
|        363|      Jennifer|   25980|2014-01-01|       COMPLETE|
|        387|          Mary|   25970|2014-01-01|       COMPLETE|
|        470|          Mary|   61904|2014-01-01|       COMPLETE|
|        492|          Mary|   25964|2014-01-01|     PROCESSING|
|        505|          Mary|   25925|2014-01-01|PENDING_PAYMENT|
|        522|       William|   25972|2014-01-01|PENDING_PAYMENT|
|       1044|         Linda|   25895|2014-01-01|       COMPLETE|
|       1117|        Arthur|   25942|2014-01-01|     PROCESSING|
+-----------+--------------+--------+----------+---------------+
only showing top 10 rows



In [0]:
# Compute the monthly customer revenue.
# Revenue is the SUM of order_item_subtotal, so the nested JSON is first
# flattened to one row per order item: explode the orders array, then the
# items array inside each order, then lift the item fields to top-level columns.
from pyspark.sql.functions import col

flatten = (
    json_df
    .select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))
    .select(
        'customer_id',
        'customer_fname',
        col('order_details.order_date').alias('order_date'),
        col('order_details.order_id').alias('order_id'),
        col('order_details.order_status').alias('order_status'),
        explode('order_details.order_item_details').alias('order_item_details'),
    )
    .select(
        'customer_id',
        'customer_fname',
        'order_date',
        'order_id',
        'order_status',
        'order_item_details.order_item_id',
        'order_item_details.order_item_product_id',
        'order_item_details.order_item_subtotal',
    )
)

In [0]:
# Peek at the first two flattened order-item rows.
flatten.show(n=2)

+-----------+--------------+----------+--------+------------+-------------+---------------------+-------------------+
|customer_id|customer_fname|order_date|order_id|order_status|order_item_id|order_item_product_id|order_item_subtotal|
+-----------+--------------+----------+--------+------------+-------------+---------------------+-------------------+
|          1|       Richard|2013-12-13|   22945|    COMPLETE|        57439|                  191|             499.95|
|          2|          Mary|2013-08-02|   57963|     ON_HOLD|       145023|                 1014|             149.94|
+-----------+--------------+----------+--------+------------+-------------+---------------------+-------------------+
only showing top 2 rows



In [0]:
# With the data flattened, compute the monthly revenue per customer:
# keep only COMPLETE/CLOSED orders, bucket them by the yyyy-MM of the order
# date, and sum order_item_subtotal within each (customer, month) bucket.
# date_format and col are imported here so the cell runs on a fresh kernel;
# date_format in particular is needed by the groupBy below.
from pyspark.sql import Row
from pyspark.sql.functions import col, date_format, to_date
from pyspark.sql.functions import sum as _sum

(
    flatten
    .select(
        'customer_id',
        'customer_fname',
        col("order_date"),
        # Parse the order_date value into a date so date_format can bucket it by month.
        to_date(col("order_date"), "yyyy-MM-dd").alias("order_date_converted"),
        'order_status',
        'order_item_subtotal',
    )
    .filter("order_status IN ('COMPLETE','CLOSED')")
    .groupBy(
        'customer_id',
        'customer_fname',
        date_format('order_date_converted', 'yyyy-MM').alias('order_month'),
    )
    .agg(_sum('order_item_subtotal').alias('Revenue'))
    .orderBy('order_month')
    .show()
)




+-----------+--------------+-----------+------------------+
|customer_id|customer_fname|order_month|           Revenue|
+-----------+--------------+-----------+------------------+
|       1478|          Anna|    2013-07|           1784.76|
|       1180|          Mary|    2013-07|           1129.94|
|         16|       Tiffany|    2013-07|             39.99|
|       2418|         Helen|    2013-07|1099.8400000000001|
|        943|          John|    2013-07| 829.8900000000001|
|       1104|         Linda|    2013-07|            699.96|
|       1265|        Albert|    2013-07|            199.99|
|        965|          Sean|    2013-07|494.95000000000005|
|       1932|       Shirley|    2013-07| 929.9100000000001|
|        121|          Mary|    2013-07| 609.9300000000001|
|         66|          Mary|    2013-07| 749.9300000000001|
|       2129|       William|    2013-07|            589.91|
|        137|      Jonathan|    2013-07|229.98000000000002|
|       2321|          Mary|    2013-07|