In [0]:
%fs ls dbfs:/FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/,Data/,0,0
dbfs:/FileStore/tables/customers.txt,customers.txt,953719,1727672700000
dbfs:/FileStore/tables/order_items.txt,order_items.txt,5408880,1727672703000
dbfs:/FileStore/tables/orders.txt,orders.txt,2999944,1727672703000


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
# Define the schema for the customers DataFrame
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_fname", StringType(), True),
    StructField("customer_lname", StringType(), True),
    StructField("customer_email", StringType(), True),
    StructField("customer_password", StringType(), True),
    StructField("customer_street", StringType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True),
    StructField("customer_zipcode", IntegerType(), True)
])

# Define the schema for the orders DataFrame
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("order_customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True)
])

# Define the schema for the order items DataFrame
order_items_schema = StructType([
    StructField("order_item_id", IntegerType(), True),
    StructField("order_item_order_id", IntegerType(), True),
    StructField("order_item_product_id", IntegerType(), True),
    StructField("order_item_quantity", IntegerType(), True),
    StructField("order_item_subtotal", FloatType(), True),
    StructField("order_item_product_price", FloatType(), True)
])


In [0]:
customer_df = spark.read.format("csv").schema(customer_schema).load("/FileStore/tables/customers.txt")

In [0]:
customer_df.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|             725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

In [0]:
orders_df =  spark.read.format("csv").schema(orders_schema).load("/FileStore/tables/orders.txt")

In [0]:
orders_df.show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-25|              256|PENDING_PAYMENT|
|       3|2013-07-25|            12111|       COMPLETE|
|       4|2013-07-25|             8827|         CLOSED|
|       5|2013-07-25|            11318|       COMPLETE|
|       6|2013-07-25|             7130|       COMPLETE|
|       7|2013-07-25|             4530|       COMPLETE|
|       8|2013-07-25|             2911|     PROCESSING|
|       9|2013-07-25|             5657|PENDING_PAYMENT|
|      10|2013-07-25|             5648|PENDING_PAYMENT|
|      11|2013-07-25|              918| PAYMENT_REVIEW|
|      12|2013-07-25|             1837|         CLOSED|
|      13|2013-07-25|             9149|PENDING_PAYMENT|
|      14|2013-07-25|             9842|     PROCESSING|
|      15|2013-07-25|             2568|       CO

In [0]:
order_items_df =  spark.read.format("csv").schema(order_items_schema).load("/FileStore/tables/order_items.txt")

In [0]:
order_items_df.show()

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
|            6| 

In [0]:
customer_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: integer (nullable = true)



In [0]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [0]:
order_items_df.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)



In [0]:
from pyspark.sql import functions as F

In [0]:
# Join customer and orders DataFrames
customers_orders_df = customer_df.join(orders_df, customer_df['customer_id'] == orders_df['order_customer_id'])

In [0]:
customers_orders_df.show(10, truncate=False)

+-----------+--------------+--------------+--------------+-----------------+-------------------------+-------------+--------------+----------------+--------+----------+-----------------+---------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street          |customer_city|customer_state|customer_zipcode|order_id|order_date|order_customer_id|order_status   |
+-----------+--------------+--------------+--------------+-----------------+-------------------------+-------------+--------------+----------------+--------+----------+-----------------+---------------+
|11599      |Mary          |Malone        |XXXXXXXXX     |XXXXXXXXX        |8708 Indian Horse Highway|Hickory      |NC            |28601           |1       |2013-07-25|11599            |CLOSED         |
|256        |David         |Rodriguez     |XXXXXXXXX     |XXXXXXXXX        |7605 Tawny Horse Falls   |Chicago      |IL            |60625           |2       |2013-07-25|256              |PE

In [0]:
# Select and show specific order details
customers_orders_df.select('customer_id', 'order_id', 'order_date', 'order_status').orderBy('customer_id').show(10)

+-----------+--------+----------+---------------+
|customer_id|order_id|order_date|   order_status|
+-----------+--------+----------+---------------+
|          1|   22945|2013-12-13|       COMPLETE|
|          2|   33865|2014-02-18|       COMPLETE|
|          2|   15192|2013-10-29|PENDING_PAYMENT|
|          2|   57963|2013-08-02|        ON_HOLD|
|          2|   67863|2013-11-30|       COMPLETE|
|          3|   35158|2014-02-26|       COMPLETE|
|          3|   57617|2014-07-24|       COMPLETE|
|          3|   22646|2013-12-11|       COMPLETE|
|          3|   23662|2013-12-19|       COMPLETE|
|          3|   56178|2014-07-15|        PENDING|
+-----------+--------+----------+---------------+
only showing top 10 rows



In [0]:
# Create a struct for order details
order_details_df = customers_orders_df.select('customer_id',F.struct('order_id', 'order_date', 'order_status').alias('order_details')).orderBy('customer_id')

In [0]:
order_details_df.show(10)

+-----------+--------------------+
|customer_id|       order_details|
+-----------+--------------------+
|          1|{22945, 2013-12-1...|
|          2|{33865, 2014-02-1...|
|          2|{15192, 2013-10-2...|
|          2|{57963, 2013-08-0...|
|          2|{67863, 2013-11-3...|
|          3|{35158, 2014-02-2...|
|          3|{57617, 2014-07-2...|
|          3|{22646, 2013-12-1...|
|          3|{23662, 2013-12-1...|
|          3|{56178, 2014-07-1...|
+-----------+--------------------+
only showing top 10 rows



In [0]:
# Aggregate order details per customer
final_df = order_details_df.groupBy('customer_id').agg(F.collect_list('order_details').alias('order_details')).orderBy('customer_id')

In [0]:
final_df.show(2, truncate=False)

+-----------+----------------------------------------------------------------------------------------------------------------------------------+
|customer_id|order_details                                                                                                                     |
+-----------+----------------------------------------------------------------------------------------------------------------------------------+
|1          |[{22945, 2013-12-13, COMPLETE}]                                                                                                   |
|2          |[{15192, 2013-10-29, PENDING_PAYMENT}, {33865, 2014-02-18, COMPLETE}, {57963, 2013-08-02, ON_HOLD}, {67863, 2013-11-30, COMPLETE}]|
+-----------+----------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows



In [0]:

final_df.coalesce(1).write.mode("overwrite").json('dbfs:/FileStore/tables/Data/final')

In [0]:
%fs ls dbfs:/FileStore/tables/Data/final

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/final/_SUCCESS,_SUCCESS,0,1727679259000
dbfs:/FileStore/tables/Data/final/_committed_5690608636388892642,_committed_5690608636388892642,114,1727678070000
dbfs:/FileStore/tables/Data/final/_committed_6190298265112221589,_committed_6190298265112221589,214,1727678817000
dbfs:/FileStore/tables/Data/final/_committed_8323686024413630521,_committed_8323686024413630521,203,1727679259000
dbfs:/FileStore/tables/Data/final/_started_5690608636388892642,_started_5690608636388892642,0,1727678069000
dbfs:/FileStore/tables/Data/final/_started_6190298265112221589,_started_6190298265112221589,0,1727678816000
dbfs:/FileStore/tables/Data/final/_started_8323686024413630521,_started_8323686024413630521,0,1727679258000
dbfs:/FileStore/tables/Data/final/part-00000-tid-8323686024413630521-63ddcab8-4f31-4d08-a989-0a9b4ea66199-265-1-c000.json,part-00000-tid-8323686024413630521-63ddcab8-4f31-4d08-a989-0a9b4ea66199-265-1-c000.json,5475226,1727679258000


In [0]:
# Join customer details with order items
customer_details = customer_df.join(orders_df, customer_df['customer_id'] == orders_df['order_customer_id']).join(order_items_df, orders_df['order_id'] == order_items_df['order_item_order_id'])

In [0]:
# Denormalize data
denorm_df = (customer_details
    .select(
        'customer_id', 
        'customer_fname', 
        'customer_lname', 
        'customer_email',
        'order_id', 
        'order_date', 
        'order_status',
        F.struct('order_item_id', 'order_item_product_id', 'order_item_subtotal').alias('order_item_details')
    )
    .groupBy(
        'customer_id', 
        'customer_fname', 
        'customer_lname', 
        'customer_email', 
        'order_id', 
        'order_date', 
        'order_status'
    )
    .agg(F.collect_list('order_item_details').alias('order_item_details'))
    .select(
        'customer_id', 
        'customer_fname', 
        'customer_lname', 
        'customer_email',
        F.struct('order_id', 'order_date', 'order_status', 'order_item_details').alias('order_details')
    )
    .groupBy('customer_id', 'customer_fname', 'customer_lname', 'customer_email')
    .agg(F.collect_list('order_details').alias('order_details'))
    .orderBy('customer_id')
)

In [0]:
denorm_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- order_details: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- order_id: integer (nullable = true)
 |    |    |-- order_date: date (nullable = true)
 |    |    |-- order_status: string (nullable = true)
 |    |    |-- order_item_details: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- order_item_id: integer (nullable = true)
 |    |    |    |    |-- order_item_product_id: integer (nullable = true)
 |    |    |    |    |-- order_item_subtotal: float (nullable = true)



In [0]:

denorm_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/denorm')

In [0]:
%fs ls dbfs:/FileStore/tables/Data/denorm

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/denorm/_SUCCESS,_SUCCESS,0,1727679271000
dbfs:/FileStore/tables/Data/denorm/_committed_3019955075359224903,_committed_3019955075359224903,203,1727679271000
dbfs:/FileStore/tables/Data/denorm/_committed_6507255124505320869,_committed_6507255124505320869,214,1727678829000
dbfs:/FileStore/tables/Data/denorm/_committed_7516740248076485415,_committed_7516740248076485415,114,1727678291000
dbfs:/FileStore/tables/Data/denorm/_started_3019955075359224903,_started_3019955075359224903,0,1727679269000
dbfs:/FileStore/tables/Data/denorm/_started_6507255124505320869,_started_6507255124505320869,0,1727678827000
dbfs:/FileStore/tables/Data/denorm/_started_7516740248076485415,_started_7516740248076485415,0,1727678289000
dbfs:/FileStore/tables/Data/denorm/part-00000-tid-3019955075359224903-486ecdd6-4482-41c6-bc79-d9cdae15a218-284-1-c000.json,part-00000-tid-3019955075359224903-486ecdd6-4482-41c6-bc79-d9cdae15a218-284-1-c000.json,20949617,1727679271000


In [0]:
# Read the JSON file
json_df = spark.read.format("json").load('dbfs:/FileStore/tables/Data/denorm/')

In [0]:
json_df.show()

+--------------+--------------+-----------+--------------+--------------------+
|customer_email|customer_fname|customer_id|customer_lname|       order_details|
+--------------+--------------+-----------+--------------+--------------------+
|     XXXXXXXXX|       Richard|          1|     Hernandez|[{2013-12-13, 229...|
|     XXXXXXXXX|          Mary|          2|       Barrett|[{2013-08-02, 579...|
|     XXXXXXXXX|           Ann|          3|         Smith|[{2014-07-15, 561...|
|     XXXXXXXXX|          Mary|          4|         Jones|[{2014-05-28, 493...|
|     XXXXXXXXX|        Robert|          5|        Hudson|[{2014-03-06, 364...|
|     XXXXXXXXX|          Mary|          6|         Smith|[{2013-09-09, 748...|
|     XXXXXXXXX|       Melissa|          7|        Wilcox|[{2014-03-01, 355...|
|     XXXXXXXXX|         Megan|          8|         Smith|[{2013-09-16, 849...|
|     XXXXXXXXX|          Mary|          9|         Perez|[{2014-07-03, 543...|
|     XXXXXXXXX|       Melissa|         

In [0]:
# Flatten the nested JSON structure and extract order and order item details
flatten = (json_df
    .select('customer_id', 'customer_fname',
            F.explode('order_details').alias('order_details'))  # Exploding order details
    .select('customer_id', 'customer_fname',
            F.col('order_details.order_date').alias('order_date'),
            F.col('order_details.order_id').alias('order_id'),
            F.col('order_details.order_status').alias('order_status'),
            F.explode('order_details.order_item_details').alias('order_item_details'))  # Exploding order item details
    .select('customer_id', 'customer_fname', 'order_date', 'order_id', 
            'order_status', 'order_item_details.order_item_id',
            'order_item_details.order_item_product_id', 
            'order_item_details.order_item_subtotal')
)

In [0]:
flatten.show(10)

+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|customer_id|customer_fname|order_date|order_id|   order_status|order_item_id|order_item_product_id|order_item_subtotal|
+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|          1|       Richard|2013-12-13|   22945|       COMPLETE|        57439|                  191|             499.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145023|                 1014|             149.94|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145022|                 1014|              99.96|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145021|                  627|             199.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145020|                 1073|             199.99|
|          2|          Mary|2013

In [0]:
# Calculate revenue grouped by order month
revenue_group_df = flatten.select('customer_id', 'customer_fname', 'order_date', 
               F.to_date('order_date', "yyyy-MM-dd").alias("order_date_converted"),
               'order_status', 'order_item_subtotal') \
    .filter("order_status IN ('COMPLETE', 'CLOSED')") \
    .groupBy(F.date_format('order_date_converted', 'yyyy-MM').alias('order_month')) \
    .agg(F.sum('order_item_subtotal').alias('Revenue')) \
    .orderBy('order_month')

In [0]:
revenue_group_df.show(10)

+-----------+------------------+
|order_month|           Revenue|
+-----------+------------------+
|    2013-07| 333465.4500000003|
|    2013-08|1221828.9000000027|
|    2013-09|1302255.8000000028|
|    2013-10|1171686.9200000018|
|    2013-11|1379935.3300000008|
|    2013-12| 1277719.600000003|
|    2014-01|1230221.7400000019|
|    2014-02|1217770.0900000029|
|    2014-03|1271350.9700000028|
|    2014-04|1249723.5200000028|
+-----------+------------------+
only showing top 10 rows

