In [0]:
'''Import functions from pyspark.sql.functions'''
from pyspark.sql.functions import col, count, avg, sum as spark_sum, desc, when, year, spark_partition_id

In [0]:
'''Read the customers, orders, order_items, products, and product_reviews tables in as spark dataframes''' 
customers_df = spark.read.csv("/Volumes/workspace/default/synthetic_e_commerce_data/customers.csv", header=True, inferSchema=True)
orders_df = spark.read.csv("/Volumes/workspace/default/synthetic_e_commerce_data/orders.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv("/Volumes/workspace/default/synthetic_e_commerce_data/order_items.csv", header=True, inferSchema=True)
products_df = spark.read.csv("/Volumes/workspace/default/synthetic_e_commerce_data/products.csv", header=True, inferSchema=True)
product_reviews_df = spark.read.csv("/Volumes/workspace/default/synthetic_e_commerce_data/product_reviews.csv", header=True, inferSchema=True)

customers_df.show(5)
orders_df.show(5)
order_items_df.show(5)
products_df.show(5)
product_reviews_df.show(5)

+-----------+---------------+--------------------+------+-----------+----------+
|customer_id|           name|               email|gender|signup_date|   country|
+-----------+---------------+--------------------+------+-----------+----------+
|          1|   Allison Hill|donaldgarcia@exam...| Other| 2025-02-15|     Benin|
|          2|   Amanda Davis|williamjohnson@ex...|  Male| 2024-03-30|   Reunion|
|          3|Connie Lawrence|blakeerik@example...|  Male| 2022-05-13|Cape Verde|
|          4| Phillip Garcia|  wdavis@example.net| Other| 2021-12-26|     Aruba|
|          5|Kimberly Dudley| smiller@example.net|Female| 2023-10-04|   Tokelau|
+-----------+---------------+--------------------+------+-----------+----------+
only showing top 5 rows
+--------+-----------+----------+------------+--------------+----------------+
|order_id|customer_id|order_date|total_amount|payment_method|shipping_country|
+--------+-----------+----------+------------+--------------+----------------+
|       1|

In [0]:
'''Filter the orders dataframe to only include orders made in 2024 with either cash or a credit card'''
filtered_order_pm_df = orders_df.filter((col("payment_method") == "Cash") | ((col("payment_method") == "Credit Card")))
filtered_orders_pm_and_year_df = filtered_order_pm_df.filter(year(col("order_date")) == 2024)
filtered_orders_pm_and_year_df.show(5)

+--------+-----------+----------+------------+--------------+----------------+
|order_id|customer_id|order_date|total_amount|payment_method|shipping_country|
+--------+-----------+----------+------------+--------------+----------------+
|       4|    1919911|2024-10-21|      776.77|   Credit Card|       Sri Lanka|
|       5|     281685|2024-10-26|         0.0|          Cash|      Costa Rica|
|      19|     492078|2024-09-18|     1645.81|   Credit Card|          Belize|
|      28|     395661|2024-04-26|     3786.49|          Cash|      Costa Rica|
|      43|     220998|2024-09-09|     2053.98|          Cash|  Cayman Islands|
+--------+-----------+----------+------------+--------------+----------------+
only showing top 5 rows


In [0]:
'''Merge the customers, order items, product, product review, and filtered orders dataframes into a single dataframe'''
customer_order_df = customers_df.join(filtered_orders_pm_and_year_df, ['customer_id'], 'inner')
customer_order_review_df = customer_order_df.join(product_reviews_df, ['customer_id'], 'inner')
customer_order_review_product_df = customer_order_review_df.join(products_df, ['product_id'], 'inner')
merged_df = customer_order_review_product_df.join(order_items_df,['order_id'],'inner')
merged_df.show(5)

+--------+----------+-----------+-----------------+--------------------+------+-----------+--------------------+----------+------------+--------------+----------------+---------+------+--------------------+-----------+---------------+--------+------+--------------+------+-------------+----------+--------+----------+
|order_id|product_id|customer_id|             name|               email|gender|signup_date|             country|order_date|total_amount|payment_method|shipping_country|review_id|rating|         review_text|review_date|   product_name|category| price|stock_quantity| brand|order_item_id|product_id|quantity|unit_price|
+--------+----------+-----------+-----------------+--------------------+------+-----------+--------------------+----------+------------+--------------+----------------+---------+------+--------------------+-----------+---------------+--------+------+--------------+------+-------------+----------+--------+----------+
|   15277|      1547|    1657261|     Stacy La

In [0]:
'''Create a column for the revenue. Calculate the revenue by taking the product of the quantity of the product that was purchased and the unit price of that product'''
merged_df = merged_df.withColumn('revenue',merged_df.quantity*merged_df.unit_price)
merged_df.select('order_item_id','revenue').show(5)

+-------------+-------+
|order_item_id|revenue|
+-------------+-------+
|         6255|  226.7|
|         9653| 570.12|
|        15775|1628.92|
|        15884| 841.26|
|        26798|1680.56|
+-------------+-------+
only showing top 5 rows


In [0]:
'''Find the 5 products with the highest average rating'''
avg_product_rating = merged_df.select(['product_name','rating']).groupBy('product_name').agg(avg('rating').alias('avg_rating')).orderBy(desc('avg_rating'))
avg_product_rating.show(5)

+----------------+------------------+
|    product_name|        avg_rating|
+----------------+------------------+
|However Director|               3.9|
|      Night Word|3.7718446601941746|
|     Trip Nation| 3.754166666666667|
|   Wide Remember| 3.746192893401015|
|      Wall Adult|3.7358490566037736|
+----------------+------------------+
only showing top 5 rows


In [0]:
'''Show number of partitions'''
num_partitions = merged_df.select(spark_partition_id()).distinct().count()
print(num_partitions)

8


In [0]:
'''Query 1: Find the 5 categories that have the most products in stock'''
merged_df = merged_df.repartition("category")
merged_df.createOrReplaceTempView("merged_df")

most_in_stock_categories = spark.sql("""
    SELECT category, SUM(stock_quantity) AS total_stock_quantity
    FROM merged_df
    GROUP BY category
    ORDER BY total_stock_quantity DESC
    LIMIT 5
""")

display(most_in_stock_categories)

category,total_stock_quantity
Clothing,424487981
Books,421463771
Electronics,420787899
Toys,416349413
Beauty,412862861


In [0]:
'''Explain the query plan'''
most_in_stock_categories.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonTopK(sortOrder=[total_stock_quantity#11812L DESC NULLS LAST], partitionOrderCount=0)
         +- PhotonShuffleExchangeSource
            +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#16884]
               +- PhotonShuffleExchangeSink SinglePartition
                  +- PhotonTopK(sortOrder=[total_stock_quantity#11812L DESC NULLS LAST], partitionOrderCount=0)
                     +- PhotonGroupingAgg(keys=[category#11183], functions=[sum(stock_quantity#11185)])
                        +- PhotonShuffleExchangeSource
                           +- PhotonShuffleMapStage REPARTITION_BY_COL, [id=#16876]
                              +- PhotonShuffleExchangeSink hashpartitioning(category#11183, 1024)
                                 +- PhotonProject [category#11183, stock_quantity#11185]
                                    +- PhotonShuffledHashJoin [order_

In [0]:
'''Write the results to a parquet file'''
most_in_stock_categories.write.mode("overwrite").parquet("/Volumes/workspace/default/synthetic_e_commerce_outputs/most_in_statck_categories")

In [0]:
'''Query 2: Find the 3 products that have generated the most revenue among female customers'''
merged_df = merged_df.repartition("gender", "product_name")
merged_df.createOrReplaceTempView("merged_df")

most_money_generating_products_women = spark.sql("""
    SELECT product_name, SUM(revenue) AS total_revenue
    FROM merged_df
    WHERE gender = 'Female'
    GROUP BY product_name
    ORDER BY total_revenue DESC
    LIMIT 3
""")

display(most_money_generating_products_women)

product_name,total_revenue
Decide Morning,235148.20000000013
Under Break,215501.27
Alone National,198856.4


In [0]:
'''Explain the query plan'''
most_money_generating_products_women.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonTopK(sortOrder=[total_revenue#12068 DESC NULLS LAST], partitionOrderCount=0)
         +- PhotonShuffleExchangeSource
            +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#22185]
               +- PhotonShuffleExchangeSink SinglePartition
                  +- PhotonTopK(sortOrder=[total_revenue#12068 DESC NULLS LAST], partitionOrderCount=0)
                     +- PhotonGroupingAgg(keys=[product_name#11182], functions=[finalmerge_sum(merge sum#12245) AS sum(revenue)#12243])
                        +- PhotonShuffleExchangeSource
                           +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#22177]
                              +- PhotonShuffleExchangeSink hashpartitioning(product_name#11182, 1024)
                                 +- PhotonGroupingAgg(keys=[product_name#11182], functions=[partial_sum(revenue#11563) AS sum#12245])
   

In [0]:
'''Write the results to a parquet file'''
most_money_generating_products_women.write.mode("overwrite").parquet("/Volumes/workspace/default/synthetic_e_commerce_outputs/most_money_generating_products_women")

In [0]:
'''Transformation vs Action'''

# Transformation: Filter the merged dataframe to only include US customers
us_customers = merged_df.filter(merged_df['country'] == 'United States of America')

# Action: Count the number of US customers
us_customers.count()

19876