####List out the folders and files in DBFS

In [0]:
dbutils.fs.ls("dbfs:/FileStore/DataProcessing using Spark/Data")

Out[5]: [FileInfo(path='dbfs:/FileStore/DataProcessing using Spark/Data/customers/', name='customers/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/DataProcessing using Spark/Data/order_items/', name='order_items/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/DataProcessing using Spark/Data/orders/', name='orders/', size=0, modificationTime=0)]

####Define the Schema

In [0]:
from pyspark.sql.types import *

customer_schema = StructType([
    StructField("customer_id",IntegerType(),True),
    StructField("customer_fname",StringType(),True),
    StructField("customer_lname",StringType(),True),
    StructField("customer_email",StringType(),True),
    StructField("customer_password",StringType(),True),
    StructField("customer_address",StringType(),True),
    StructField("customer_city",StringType(),True),
    StructField("customer_state",StringType(),True),
    StructField("customer_zipcode",IntegerType(),True),
])

In [0]:
order_schema = StructType([
    StructField("order_id",IntegerType(),True),
    StructField("order_date",DateType(),True),
    StructField("order_customer_id",IntegerType(),True),
    StructField("order_status",StringType(),True)
])

In [0]:
order_item_schema = StructType([
    StructField("order_item_id",IntegerType(),True),
    StructField("order_item_order_id",IntegerType(),True),
    StructField("order_item_product_id",IntegerType(),True),
    StructField("order_item_quantity",FloatType(),True),
    StructField("order_item_subtotal",FloatType(),True),
    StructField("order_item_product_price",FloatType(),True)
])

####Read the CSV Files from the DBFS

In [0]:
customer_df = spark.read.csv("dbfs:/FileStore/DataProcessing using Spark/Data/customers/",schema=customer_schema)

In [0]:
customer_df.show(5,truncate=False)

+-----------+--------------+--------------+--------------+-----------------+-----------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_address       |customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+-----------------------+-------------+--------------+----------------+
|1          |Richard       |Hernandez     |XXXXXXXXX     |XXXXXXXXX        |6303 Heather Plaza     |Brownsville  |TX            |78521           |
|2          |Mary          |Barrett       |XXXXXXXXX     |XXXXXXXXX        |9526 Noble Embers Ridge|Littleton    |CO            |80126           |
|3          |Ann           |Smith         |XXXXXXXXX     |XXXXXXXXX        |3422 Blue Pioneer Bend |Caguas       |PR            |725             |
|4          |Mary          |Jones         |XXXXXXXXX     |XXXXXXXXX        |8324 Little Common     |San Marcos   |CA  

In [0]:
order_df = spark.read.csv("dbfs:/FileStore/DataProcessing using Spark/Data/orders/", schema = order_schema)

In [0]:
order_df.show(5,truncate=False)

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|order_status   |
+--------+----------+-----------------+---------------+
|1       |2013-07-25|11599            |CLOSED         |
|2       |2013-07-25|256              |PENDING_PAYMENT|
|3       |2013-07-25|12111            |COMPLETE       |
|4       |2013-07-25|8827             |CLOSED         |
|5       |2013-07-25|11318            |COMPLETE       |
+--------+----------+-----------------+---------------+
only showing top 5 rows



In [0]:
order_item_df = spark.read.csv("dbfs:/FileStore/DataProcessing using Spark/Data/order_items/", schema=order_item_schema)

In [0]:
order_item_df.show(5,truncate=False)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|1            |1                  |957                  |1.0                |299.98             |299.98                  |
|2            |2                  |1073                 |1.0                |199.99             |199.99                  |
|3            |2                  |502                  |5.0                |250.0              |50.0                    |
|4            |2                  |403                  |1.0                |129.99             |129.99                  |
|5            |4                  |897                  |2.0                |49.98              |24.99                   |
+-------------+-

####Join the Customer and Order Table

In [0]:
customer_order_df = customer_df.join(order_df,customer_df['customer_id'] == order_df['order_customer_id'])

In [0]:
customer_order_df.show(5,truncate=False)

+-----------+--------------+--------------+--------------+-----------------+-------------------------+-------------+--------------+----------------+--------+----------+-----------------+---------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_address         |customer_city|customer_state|customer_zipcode|order_id|order_date|order_customer_id|order_status   |
+-----------+--------------+--------------+--------------+-----------------+-------------------------+-------------+--------------+----------------+--------+----------+-----------------+---------------+
|11599      |Mary          |Malone        |XXXXXXXXX     |XXXXXXXXX        |8708 Indian Horse Highway|Hickory      |NC            |28601           |1       |2013-07-25|11599            |CLOSED         |
|256        |David         |Rodriguez     |XXXXXXXXX     |XXXXXXXXX        |7605 Tawny Horse Falls   |Chicago      |IL            |60625           |2       |2013-07-25|256              |PE

In [0]:
from pyspark.sql.functions import struct

cust_order_struct = customer_order_df.select('customer_id',struct('order_id','order_date','order_status').alias('order_details')).orderBy('customer_id')

In [0]:
cust_order_struct.show(5,truncate=False)

+-----------+------------------------------------+
|customer_id|order_details                       |
+-----------+------------------------------------+
|1          |{22945, 2013-12-13, COMPLETE}       |
|2          |{15192, 2013-10-29, PENDING_PAYMENT}|
|2          |{33865, 2014-02-18, COMPLETE}       |
|2          |{57963, 2013-08-02, ON_HOLD}        |
|2          |{67863, 2013-11-30, COMPLETE}       |
+-----------+------------------------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import collect_list

final_df = cust_order_struct.groupBy('customer_id').agg(collect_list('order_details').alias('order_details')).orderBy('customer_id')

In [0]:
final_df.show(5,truncate=False)

+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customer_id|order_details                                                                                                                                                                                                             |
+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1          |[{22945, 2013-12-13, COMPLETE}]                                                                                                                                                                                           |
|2          |[{15192, 2013-10-29, PENDING_PAYMENT}, {33865, 2014-02-

####Export the Data into JSON file

In [0]:
final_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/DataProcessing using Spark/Data/final')

####Join the all tables

In [0]:
customer_details = customer_df \
.join(order_df,customer_df['customer_id'] == order_df['order_customer_id']) \
.join(order_item_df,order_df['order_id'] == order_item_df['order_item_order_id'])

In [0]:
customer_details.display()

customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_address,customer_city,customer_state,customer_zipcode,order_id,order_date,order_customer_id,order_status,order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price
11599,Mary,Malone,XXXXXXXXX,XXXXXXXXX,8708 Indian Horse Highway,Hickory,NC,28601,1,2013-07-25,11599,CLOSED,1,1,957,1.0,299.98,299.98
256,David,Rodriguez,XXXXXXXXX,XXXXXXXXX,7605 Tawny Horse Falls,Chicago,IL,60625,2,2013-07-25,256,PENDING_PAYMENT,4,2,403,1.0,129.99,129.99
256,David,Rodriguez,XXXXXXXXX,XXXXXXXXX,7605 Tawny Horse Falls,Chicago,IL,60625,2,2013-07-25,256,PENDING_PAYMENT,3,2,502,5.0,250.0,50.0
256,David,Rodriguez,XXXXXXXXX,XXXXXXXXX,7605 Tawny Horse Falls,Chicago,IL,60625,2,2013-07-25,256,PENDING_PAYMENT,2,2,1073,1.0,199.99,199.99
8827,Brian,Wilson,XXXXXXXXX,XXXXXXXXX,8396 High Corners,San Antonio,TX,78240,4,2013-07-25,8827,CLOSED,8,4,1014,4.0,199.92,49.98
8827,Brian,Wilson,XXXXXXXXX,XXXXXXXXX,8396 High Corners,San Antonio,TX,78240,4,2013-07-25,8827,CLOSED,7,4,502,3.0,150.0,50.0
8827,Brian,Wilson,XXXXXXXXX,XXXXXXXXX,8396 High Corners,San Antonio,TX,78240,4,2013-07-25,8827,CLOSED,6,4,365,5.0,299.95,59.99
8827,Brian,Wilson,XXXXXXXXX,XXXXXXXXX,8396 High Corners,San Antonio,TX,78240,4,2013-07-25,8827,CLOSED,5,4,897,2.0,49.98,24.99
11318,Mary,Henry,XXXXXXXXX,XXXXXXXXX,3047 Silent Embers Maze,Caguas,PR,725,5,2013-07-25,11318,COMPLETE,13,5,403,1.0,129.99,129.99
11318,Mary,Henry,XXXXXXXXX,XXXXXXXXX,3047 Silent Embers Maze,Caguas,PR,725,5,2013-07-25,11318,COMPLETE,12,5,957,1.0,299.98,299.98


####Denormalization the all tables

In [0]:
denorm_df = customer_details \
.select('customer_id','customer_fname','customer_lname','customer_email','order_id','order_date','order_status',struct('order_item_id','order_item_product_id','order_item_subtotal').alias('order_item_details')) \
.groupBy('customer_id','customer_fname','customer_lname','customer_email','order_id','order_date','order_status').agg(collect_list('order_item_details').alias('order_item_details')).orderBy('customer_id') \
.select('customer_id','customer_fname','customer_lname','customer_email',struct('order_id','order_date','order_status','order_item_details').alias('order_details')) \
.groupBy('customer_id','customer_fname','customer_lname','customer_email').agg(collect_list('order_details').alias('order_details')).orderBy('customer_id')

####Export the data as JSON File

In [0]:
denorm_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/DataProcessing using Spark/Data/denorm')

## Analyse the Denormalized data using Spark SQL
We shall perform the below analysis on our data
* Get the Details of the order placed by the customer on 2014 January 1st
* Compute the monthly customer Revenue

####Read the dataframe from json files

In [0]:
json_df = spark.read.json('dbfs:/FileStore/DataProcessing using Spark/Data/denorm')

In [0]:
json_df.display()

customer_email,customer_fname,customer_id,customer_lname,order_details
XXXXXXXXX,Richard,1,Hernandez,"List(List(2013-12-13, 22945, List(List(57439, 191, 499.95)), COMPLETE))"
XXXXXXXXX,Mary,2,Barrett,"List(List(2013-08-02, 57963, List(List(145023, 1014, 149.94), List(145022, 1014, 99.96), List(145021, 627, 199.95), List(145020, 1073, 199.99), List(145019, 365, 119.98)), ON_HOLD), List(2014-02-18, 33865, List(List(84538, 502, 50.0), List(84537, 1073, 199.99), List(84536, 957, 299.98)), COMPLETE), List(2013-10-29, 15192, List(List(38007, 1014, 99.96)), PENDING_PAYMENT), List(2013-11-30, 67863, List(List(169674, 1004, 399.98)), COMPLETE))"
XXXXXXXXX,Ann,3,Smith,"List(List(2014-07-15, 56178, List(List(140510, 957, 299.98), List(140509, 502, 150.0), List(140508, 957, 299.98), List(140507, 365, 299.95), List(140506, 502, 100.0)), PENDING), List(2014-07-24, 57617, List(List(144132, 1073, 199.99), List(144131, 1014, 99.96), List(144130, 365, 239.96), List(144129, 365, 239.96)), COMPLETE), List(2014-02-26, 35158, List(List(87813, 273, 27.99), List(87812, 1004, 399.98)), COMPLETE), List(2013-12-14, 61453, List(List(153690, 1073, 199.99), List(153689, 957, 299.98), List(153688, 957, 299.98), List(153687, 403, 129.99)), COMPLETE), List(2013-12-19, 23662, List(List(59209, 502, 50.0), List(59208, 502, 100.0), List(59207, 191, 99.99)), COMPLETE))"
XXXXXXXXX,Mary,4,Jones,"List(List(2014-05-28, 49339, List(List(123340, 365, 299.95), List(123339, 365, 119.98), List(123338, 502, 100.0)), COMPLETE), List(2014-06-10, 51157, List(List(127853, 365, 59.99), List(127852, 365, 119.98), List(127851, 365, 119.98)), CLOSED), List(2013-09-19, 9023, List(List(22498, 1014, 149.94), List(22497, 885, 24.99), List(22496, 627, 79.98), List(22495, 1014, 99.96)), COMPLETE), List(2013-09-24, 9704, List(List(24241, 365, 179.97), List(24240, 365, 59.99), List(24239, 905, 124.95), List(24238, 365, 179.97)), COMPLETE))"
XXXXXXXXX,Robert,5,Hudson,"List(List(2014-03-06, 36472, List(List(91068, 957, 299.98), List(91067, 235, 174.95), List(91066, 1014, 199.92), List(91065, 403, 129.99), List(91064, 1014, 99.96)), PROCESSING), List(2014-05-05, 45832, List(List(114561, 365, 239.96)), PENDING_PAYMENT), List(2014-04-05, 41333, List(List(103183, 403, 129.99)), COMPLETE))"
XXXXXXXXX,Mary,6,Smith,"List(List(2013-09-09, 7485, List(List(18769, 1014, 149.94), List(18768, 191, 499.95), List(18767, 502, 200.0), List(18766, 627, 199.95)), PROCESSING), List(2013-09-10, 7787, List(List(19475, 365, 179.97), List(19474, 627, 199.95), List(19473, 1014, 249.9)), PENDING), List(2013-12-10, 22457, List(List(56204, 1073, 199.99), List(56203, 403, 129.99), List(56202, 1004, 399.98), List(56201, 1014, 99.96), List(56200, 1014, 99.96)), PENDING_PAYMENT), List(2014-02-13, 32895, List(List(82281, 502, 250.0), List(82280, 502, 100.0), List(82279, 191, 299.97)), PENDING_PAYMENT))"
XXXXXXXXX,Melissa,7,Wilcox,"List(List(2014-03-01, 35559, List(List(88802, 191, 199.98), List(88801, 1014, 249.9), List(88800, 897, 49.98)), PROCESSING), List(2014-01-17, 28539, List(List(71414, 403, 129.99), List(71413, 1004, 399.98)), PENDING_PAYMENT), List(2014-01-02, 26052, List(List(65203, 1014, 199.92), List(65202, 1004, 399.98), List(65201, 1004, 399.98), List(65200, 403, 129.99)), PENDING_PAYMENT), List(2013-09-25, 9977, List(List(24928, 502, 150.0), List(24927, 502, 150.0)), PROCESSING), List(2014-01-06, 26730, List(List(66951, 502, 150.0), List(66950, 191, 499.95), List(66949, 1004, 399.98), List(66948, 191, 199.98)), ON_HOLD), List(2013-12-22, 61683, List(List(154218, 191, 499.95), List(154217, 403, 129.99)), COMPLETE), List(2014-01-09, 62132, List(List(155343, 1004, 399.98), List(155342, 957, 299.98), List(155341, 403, 129.99), List(155340, 957, 299.98), List(155339, 502, 100.0)), ON_HOLD))"
XXXXXXXXX,Megan,8,Smith,"List(List(2013-09-16, 8497, List(List(21219, 502, 250.0), List(21218, 1014, 249.9), List(21217, 1014, 149.94), List(21216, 1014, 199.92)), CLOSED), List(2013-09-10, 7688, List(List(19227, 365, 59.99), List(19226, 502, 200.0)), PROCESSING), List(2014-01-22, 29383, List(List(73519, 917, 109.95), List(73518, 502, 200.0), List(73517, 917, 43.98)), COMPLETE), List(2014-01-08, 62064, List(List(155181, 502, 150.0)), PENDING_PAYMENT), List(2014-05-30, 68507, List(List(171306, 1073, 199.99), List(171305, 365, 239.96)), PENDING), List(2013-11-24, 19801, List(List(49487, 191, 499.95)), PENDING), List(2013-12-08, 22297, List(List(55831, 365, 59.99), List(55830, 191, 199.98), List(55829, 502, 150.0), List(55828, 1073, 199.99), List(55827, 957, 299.98)), COMPLETE), List(2014-04-27, 65018, List(List(162522, 957, 299.98)), PENDING_PAYMENT))"
XXXXXXXXX,Mary,9,Perez,"List(List(2014-07-03, 54350, List(List(135917, 1004, 399.98), List(135916, 37, 174.95), List(135915, 403, 129.99)), CLOSED), List(2014-04-17, 64753, List(List(161850, 1004, 399.98)), PENDING_PAYMENT), List(2013-10-12, 12828, List(List(32074, 906, 24.99), List(32073, 627, 199.95)), COMPLETE), List(2013-11-19, 18918, List(List(47314, 502, 100.0), List(47313, 365, 299.95), List(47312, 1014, 99.96), List(47311, 957, 299.98), List(47310, 1073, 199.99)), PENDING_PAYMENT), List(2013-09-12, 67610, List(List(169042, 1004, 399.98), List(169041, 364, 299.99), List(169040, 1073, 199.99)), PENDING_PAYMENT))"
XXXXXXXXX,Melissa,10,Smith,"List(List(2014-05-01, 45239, List(List(113029, 502, 150.0), List(113028, 365, 299.95), List(113027, 502, 200.0), List(113026, 502, 200.0)), COMPLETE), List(2014-07-15, 56133, List(List(140389, 1014, 149.94), List(140388, 1014, 99.96), List(140387, 627, 39.99), List(140386, 897, 124.95)), COMPLETE))"


In [0]:
from pyspark.sql.functions import explode

json_df.select('customer_id','customer_fname',explode('order_details').alias('order_details')).filter("order_details.order_date LIKE '2014-01-01'").orderBy('customer_id').select('customer_id','customer_fname','order_details.order_id','order_details.order_status').show(3,truncate=False)

+-----------+--------------+--------+------------+
|customer_id|customer_fname|order_id|order_status|
+-----------+--------------+--------+------------+
|206        |Mary          |25966   |CLOSED      |
|279        |Anna          |25918   |COMPLETE    |
|363        |Jennifer      |25980   |COMPLETE    |
+-----------+--------------+--------+------------+
only showing top 3 rows



####Compute the monthly customer Revenue

In [0]:
from pyspark.sql.functions import col

flatten=json_df.select('customer_id','customer_fname',explode('order_details').alias('order_details')). \
select('customer_id','customer_fname',col('order_details.order_date').alias('order_date'),col('order_details.order_id').alias('order_id'),col('order_details.order_status').alias('order_status'),explode('order_details.order_item_details').alias('order_item_details')). \
select('customer_id','customer_fname','order_date','order_id','order_status','order_item_details.order_item_id','order_item_details.order_item_product_id','order_item_details.order_item_subtotal')


In [0]:
flatten.show(10)

+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|customer_id|customer_fname|order_date|order_id|   order_status|order_item_id|order_item_product_id|order_item_subtotal|
+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|          1|       Richard|2013-12-13|   22945|       COMPLETE|        57439|                  191|             499.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145023|                 1014|             149.94|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145022|                 1014|              99.96|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145021|                  627|             199.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145020|                 1073|             199.99|
|          2|          Mary|2013

In [0]:
from pyspark.sql.functions import to_date,date_format
from pyspark.sql import Row
from pyspark.sql.functions import sum as _sum

flatten.select('customer_id','customer_fname',col("order_date"),to_date(col("order_date"),"yyyy-MM-dd").alias("order_date_converted"),'order_status','order_item_subtotal'). \
filter("order_status IN ('COMPLETE','CLOSED')"). \
groupBy('customer_id','customer_fname',date_format('order_date_converted','yyyy-MM').alias('order_month')). \
agg(_sum('order_item_subtotal').alias('Revenue')). \
orderBy('order_month'). \
show()

+-----------+--------------+-----------+------------------+
|customer_id|customer_fname|order_month|           Revenue|
+-----------+--------------+-----------+------------------+
|       1478|          Anna|    2013-07|           1784.76|
|       1180|          Mary|    2013-07|           1129.94|
|         16|       Tiffany|    2013-07|             39.99|
|       2418|         Helen|    2013-07|1099.8400000000001|
|        943|          John|    2013-07| 829.8900000000001|
|       1104|         Linda|    2013-07|            699.96|
|       1265|        Albert|    2013-07|            199.99|
|        965|          Sean|    2013-07|494.95000000000005|
|       1932|       Shirley|    2013-07| 929.9100000000001|
|        121|          Mary|    2013-07| 609.9300000000001|
|         66|          Mary|    2013-07| 749.9300000000001|
|       2129|       William|    2013-07|            589.91|
|        137|      Jonathan|    2013-07|229.98000000000002|
|       2321|          Mary|    2013-07|