In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum,desc,countDistinct,weekofyear,date_format,avg


spark = SparkSession.builder \
    .appName("Retail System Analysis") \
    .enableHiveSupport() \
    .getOrCreate() 


In [7]:
# read data
ordersDf = spark.read.json("hdfs:///RetailSys_Staging_zone/orders.json")
productsDf = spark.read.json("hdfs:///RetailSys_Staging_zone/products.json")
salesDf = spark.read.json("hdfs:///RetailSys_Staging_zone/sales.json")


In [4]:
ordersDf.show(5)

                                                                                

+-------+-----------+--------------------+---------+------------+-----------+
|channel|customer_id|          order_date| order_id|total_amount|total_items|
+-------+-----------+--------------------+---------+------------+-----------+
|instore|  cust_0331|2025-10-01T00:00:00Z|ord_00001|      318.09|          2|
| online|  cust_0097|2025-10-01T00:00:00Z|ord_00002|      506.02|          3|
| online|  cust_1649|2025-10-05T00:00:00Z|ord_00003|      382.21|          5|
|instore|  cust_1093|2025-10-10T00:00:00Z|ord_00004|      952.95|          3|
|instore|  cust_1129|2025-10-14T00:00:00Z|ord_00005|      600.28|          2|
+-------+-----------+--------------------+---------+------------+-----------+
only showing top 5 rows



In [8]:
# a) Total revenue per product 

sales_with_revenue = salesDf.withColumn("revenue", col("price") * col("quantity"))

# Join with products to include product name and id 
product_revenue = sales_with_revenue.join(productsDf, "product_id", "inner") \
    .groupBy("product_id", "name") \
    .agg(_sum("revenue").alias("total_revenue")) \
    .orderBy(col("total_revenue").desc())

product_revenue.show(5)

                                                                                

+----------+-------------------+------------------+
|product_id|               name|     total_revenue|
+----------+-------------------+------------------+
|  prod_082|   Footwear Item 82|187030.55999999997|
|  prod_013|       Home Item 13|         171292.21|
|  prod_076|Electronics Item 76|         168699.96|
|  prod_085|       Home Item 85|160949.61999999997|
|  prod_018|    Apparel Item 18|          158062.3|
+----------+-------------------+------------------+
only showing top 5 rows



In [9]:
# b) Revenue per channel 
channel_revenue = sales_with_revenue.groupBy("channel") \
    .agg(_sum("revenue").alias("channel_revenue")).orderBy(col("channel_revenue").desc())

channel_revenue.show()


                                                                                

+-------+------------------+
|channel|   channel_revenue|
+-------+------------------+
|instore| 3977718.100000008|
| online|3899408.3400000017|
+-------+------------------+



In [13]:
# c) Daily sales revenue 
daily_revenue = sales_with_revenue.join(ordersDf, "order_id", "inner") \
    .groupBy("order_date") \
    .agg(_sum("revenue").alias("daily_revenue")) \
    .orderBy("order_date")
daily_revenue.show()




+--------------------+------------------+
|          order_date|     daily_revenue|
+--------------------+------------------+
|2025-10-01T00:00:00Z|176439.01999999993|
|2025-10-02T00:00:00Z| 184181.8000000001|
|2025-10-03T00:00:00Z|184849.48000000004|
|2025-10-04T00:00:00Z|154558.65000000008|
|2025-10-05T00:00:00Z|180879.57000000007|
|2025-10-06T00:00:00Z|         158469.31|
|2025-10-07T00:00:00Z|185637.04999999996|
|2025-10-08T00:00:00Z|210891.19999999998|
|2025-10-09T00:00:00Z|         175363.28|
|2025-10-10T00:00:00Z| 175994.7600000001|
|2025-10-11T00:00:00Z|         135857.48|
|2025-10-12T00:00:00Z|198013.79000000012|
|2025-10-13T00:00:00Z|200232.56999999998|
|2025-10-14T00:00:00Z|234739.06999999998|
|2025-10-15T00:00:00Z|192677.19000000006|
|2025-10-16T00:00:00Z| 217754.6900000001|
|2025-10-17T00:00:00Z|         157418.83|
|2025-10-18T00:00:00Z|161256.21999999994|
|2025-10-19T00:00:00Z|184489.56000000006|
|2025-10-20T00:00:00Z|216502.96999999988|
+--------------------+------------

                                                                                

In [18]:
# d) Top customers by spending 
top_customers = ordersDf.groupBy("customer_id") \
    .agg(_sum("total_amount").alias("total_per_customer")) \
    .orderBy(desc("total_per_customer")) \
    .limit(5)

top_customers.show()



+-----------+------------------+
|customer_id|total_per_customer|
+-----------+------------------+
|  cust_0646|2030.9900000000002|
|  cust_1243|           1804.23|
|  cust_1499|           1785.53|
|  cust_0465|           1748.54|
|  cust_1125|           1726.79|
+-----------+------------------+



                                                                                

In [21]:

# e) Category-level metrics
category_metrics = sales_with_revenue.join(productsDf, "product_id", "inner") \
    .groupBy("category") \
    .agg(
        _sum("revenue").alias("total_revenue"),
        _sum("quantity").alias("total_units_sold"),
        countDistinct("product_id").alias("num_products")
    ) \
    .orderBy(col("total_revenue").desc())

category_metrics.show(truncate=False)




+-----------+------------------+----------------+------------+
|category   |total_revenue     |total_units_sold|num_products|
+-----------+------------------+----------------+------------+
|Home       |1872506.72        |7408            |25          |
|Footwear   |1795285.2800000003|6380            |21          |
|Electronics|1526560.33        |6367            |22          |
|Accessories|1348166.02        |4899            |16          |
|Apparel    |1334608.09        |4765            |16          |
+-----------+------------------+----------------+------------+



                                                                                

In [24]:
from pyspark.sql.functions import to_timestamp
#i transformed date from string to timestamp in order to be able to apply to it weekofYear and dateformat methods
ordersdf = ordersdf.withColumn("order_date", to_timestamp("order_date"))
weekly_revenue = sales_with_revenue.join(ordersdf, "order_id", "inner") \
    .groupBy(weekofyear(col("order_date")).alias("week_number")) \
    .agg(_sum("revenue").alias("weekly_revenue")) \
    .orderBy("week_number")

weekly_revenue.show(truncate=False)



+-----------+------------------+
|week_number|weekly_revenue    |
+-----------+------------------+
|40         |880908.5200000003 |
|41         |1240226.8699999985|
|42         |1348568.1300000031|
|43         |430351.60999999964|
+-----------+------------------+



                                                                                

In [27]:
#G) Top selling products 
top_selling_products = sales_with_revenue.join(productsdf, "product_id", "inner") \
    .groupBy("product_id", "name", "category") \
    .agg(_sum("revenue").alias("total_revenue")) \
    .orderBy(desc("total_revenue")) \
    .limit(3)
    

top_selling_products.show(truncate=False)



+----------+-------------------+-----------+------------------+
|product_id|name               |category   |total_revenue     |
+----------+-------------------+-----------+------------------+
|prod_082  |Footwear Item 82   |Footwear   |187030.55999999997|
|prod_013  |Home Item 13       |Home       |171292.21         |
|prod_076  |Electronics Item 76|Electronics|168699.96         |
+----------+-------------------+-----------+------------------+



                                                                                

In [29]:
# J) Products with low sales (inventory alert) 
low_selling_products = sales_with_revenue.join(productsdf, "product_id", "inner") \
    .groupBy("product_id", "name", "category") \
    .agg(
        _sum("quantity").alias("total_units_sold"),
        _sum("revenue").alias("total_revenue")
    ) \
    .orderBy(col("total_units_sold").asc()) \
    .limit(5)

low_selling_products.show()




+----------+-------------------+-----------+----------------+-----------------+
|product_id|               name|   category|total_units_sold|    total_revenue|
+----------+-------------------+-----------+----------------+-----------------+
|  prod_012|    Apparel Item 12|    Apparel|             222|31688.11000000001|
|  prod_007|        Home Item 7|       Home|             234|         18648.87|
|  prod_006|        Home Item 6|       Home|             237|75112.13000000003|
|  prod_055|Electronics Item 55|Electronics|             238|90216.30000000002|
|  prod_054|Electronics Item 54|Electronics|             249|         58302.34|
+----------+-------------------+-----------+----------------+-----------------+



                                                                                

In [32]:
# H) Discount analysis 

discount_analysis = sales_with_revenue.join(productsdf, "product_id", "inner") \
    .withColumn("discount_amount", col("base_price") - col("price")) \
    .withColumn("discount_percentage", (col("discount_amount") / col("base_price")) * 100) \
    .groupBy("product_id", "name", "category") \
    .agg(
        avg("discount_percentage").alias("avg_discount_percent"),
        _sum("quantity").alias("total_units_sold"),
        _sum("revenue").alias("total_revenue")
    ) \
    .orderBy(col("avg_discount_percent").desc())

discount_analysis.show(truncate=False)




+----------+-------------------+-----------+--------------------+----------------+------------------+
|product_id|name               |category   |avg_discount_percent|total_units_sold|total_revenue     |
+----------+-------------------+-----------+--------------------+----------------+------------------+
|prod_011  |Accessories Item 11|Accessories|93.32862326732165   |288             |6737.879999999998 |
|prod_080  |Electronics Item 80|Electronics|91.24785955345712   |278             |10619.279999999997|
|prod_040  |Electronics Item 40|Electronics|88.97152790911836   |257             |14015.429999999998|
|prod_060  |Accessories Item 60|Accessories|87.56622145802312   |311             |9180.419999999996 |
|prod_058  |Accessories Item 58|Accessories|85.06251488228109   |287             |19269.870000000006|
|prod_034  |Footwear Item 34   |Footwear   |84.4866491959985    |286             |10772.64          |
|prod_036  |Apparel Item 36    |Apparel    |82.54231052244297   |276             |

                                                                                

In [34]:
spark.sql("CREATE DATABASE IF NOT EXISTS retail_analytics")
spark.sql("USE retail_analytics")
top_customers.write.mode("overwrite").saveAsTable("retail_analytics.top_customers")
discount_analysis.write.mode("overwrite").saveAsTable("retail_analytics.discount_analysis")


2025-10-29 10:05:59,198 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2025-10-29 10:05:59,204 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2025-10-29 10:06:07,872 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
2025-10-29 10:06:07,928 WARN metastore.ObjectStore: Failed to get database retail_analytics, returning NoSuchObjectException
2025-10-29 10:06:17,034 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2025-10-29 10:06:17,240 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2025-10-29 10:06:17,240 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2025-10-29 10:06:17,243 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
                                                       

In [36]:
spark.sql("SHOW TABLES IN retail_analytics").show()


+----------------+-----------------+-----------+
|        database|        tableName|isTemporary|
+----------------+-----------------+-----------+
|retail_analytics|discount_analysis|      false|
|retail_analytics|    top_customers|      false|
+----------------+-----------------+-----------+



In [37]:
spark.sql("SELECT * FROM retail_analytics.top_customers").show()
spark.sql("SELECT * FROM retail_analytics.discount_analysis").show()


+-----------+------------------+
|customer_id|total_per_customer|
+-----------+------------------+
|  cust_0646|2030.9900000000002|
|  cust_1243|           1804.23|
|  cust_1499|           1785.53|
|  cust_0465|           1748.54|
|  cust_1125|           1726.79|
+-----------+------------------+



[Stage 54:>                                                         (0 + 1) / 1]

+----------+-------------------+-----------+--------------------+----------------+------------------+
|product_id|               name|   category|avg_discount_percent|total_units_sold|     total_revenue|
+----------+-------------------+-----------+--------------------+----------------+------------------+
|  prod_011|Accessories Item 11|Accessories|   93.32862326732165|             288| 6737.879999999998|
|  prod_080|Electronics Item 80|Electronics|   91.24785955345712|             278|10619.279999999997|
|  prod_040|Electronics Item 40|Electronics|   88.97152790911836|             257|14015.429999999998|
|  prod_060|Accessories Item 60|Accessories|   87.56622145802312|             311| 9180.419999999996|
|  prod_058|Accessories Item 58|Accessories|   85.06251488228109|             287|19269.870000000006|
|  prod_052|Electronics Item 52|Electronics|   70.28705014731486|             253|          36630.87|
|  prod_087|Electronics Item 87|Electronics|   61.26321428508491|             274|

                                                                                