In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import sum, mean, desc, col


In [2]:
scSpark = SparkSession.builder.appName("Spark Example").getOrCreate()

In [3]:
transactions_merged_df = scSpark.read.csv("transactions_*.csv", header=True)
products_df = scSpark.read.csv("products.csv", header=True)
customers_df = scSpark.read.csv("customers.csv", header=True)

In [4]:
transactions_merged_df.limit(5).show()

+-------+-------------+----------+---------+--------+-------------------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|
+-------+-------------+----------+---------+--------+-------------------+
|      3|          454|        35|        3|       3|2022-12-23 17:36:11|
|      3|          524|        37|        9|      11|2022-12-23 22:02:51|
|      3|          562|         4|        3|       4|2022-12-23 02:51:50|
|      3|          581|        35|       14|      56|2022-12-23 17:05:54|
|      3|          200|        34|       15|      24|2022-12-23 07:15:01|
+-------+-------------+----------+---------+--------+-------------------+



In [5]:
products_df.limit(5).show()

+---------+------------+--------+---------+
|ProductId|        Name|Category|UnitPrice|
+---------+------------+--------+---------+
|        1|  Red Shorts|  Shorts|    89.75|
|        2|White Shorts|  Shorts|    89.27|
|        3| Blue Shorts|  Shorts|   118.88|
|        4|Green Shorts|  Shorts|   121.43|
|        5|Black Shorts|  Shorts|    74.58|
+---------+------------+--------+---------+



In [6]:
customers_df.limit(5).show()

+----------+--------------+--------------------+
|CustomerId|          Name|               Email|
+----------+--------------+--------------------+
|         1|Emilia Pedraza|emilia.pedraza@ex...|
|         2|  Thies Blümel|thies.blumel@exam...|
|         3| بهاره علیزاده|bhrh.aalyzdh@exam...|
|         4| Alevtin Paska|alevtin.paska@exa...|
|         5|Charlotte Wong|charlotte.wong@ex...|
+----------+--------------+--------------------+



In [7]:
joined_tran_pro_cus_df = transactions_merged_df.join(customers_df, on='CustomerId', how='inner').join(products_df, on='ProductId', how='inner')

In [8]:
joined_tran_pro_cus_df.limit(5).show()

+---------+----------+-------+-------------+--------+-------------------+--------------+--------------------+-------------+--------+---------+
|ProductId|CustomerId|StoreId|TransactionId|Quantity|    TransactionTime|          Name|               Email|         Name|Category|UnitPrice|
+---------+----------+-------+-------------+--------+-------------------+--------------+--------------------+-------------+--------+---------+
|        3|        35|      3|          454|       3|2022-12-23 17:36:11|Dwayne Johnson|dwayne.johnson@gm...|  Blue Shorts|  Shorts|   118.88|
|        9|        37|      3|          524|      11|2022-12-23 22:02:51| Brittany Holt|brittany.holt@exa...|Green Sandals|   Shoes|   137.53|
|        3|         4|      3|          562|       4|2022-12-23 02:51:50| Alevtin Paska|alevtin.paska@exa...|  Blue Shorts|  Shorts|   118.88|
|       14|        35|      3|          581|      56|2022-12-23 17:05:54|Dwayne Johnson|dwayne.johnson@gm...|  Red t-shirt|T-Shirts|   121.58|

In [9]:
# Convert TransactionTime column to DateType
df = joined_tran_pro_cus_df.withColumn("TransactionTime", joined_tran_pro_cus_df["TransactionTime"].cast(DateType()))

In [10]:
# Convert Quantity and UnitPrice columns to appropriate data types
df = df.withColumn("Quantity", df["Quantity"].cast(IntegerType()))
df = df.withColumn("UnitPrice", df["UnitPrice"].cast(IntegerType()))

# Calculate total sales for each transaction
sales_df = df.withColumn("Sales", df["Quantity"] * df["UnitPrice"])

# Group by TransactionTime and calculate daily total sales
daily_total_sales_StoreId_1 = sales_df.filter(df["StoreId"] == 1).groupBy("TransactionTime").agg(sum("Sales").alias("TotalSales"))

# Show the result
daily_total_sales_StoreId_1.show()

+---------------+----------+
|TransactionTime|TotalSales|
+---------------+----------+
|     2022-12-23|     41070|
+---------------+----------+



In [11]:
# Group by TransactionTime and calculate daily total sales
mean_sales_StoreId_2 = sales_df.filter(df["StoreId"] == 2).agg(mean("Sales").alias("TotalSales"))

# Show the result
mean_sales_StoreId_2.show()

+----------------+
|      TotalSales|
+----------------+
|511.921568627451|
+----------------+



In [12]:
# Group by CustomerId and calculate the total purchase amount for each customer
customer_total_purchase = sales_df.groupBy("CustomerId").agg(sum("Sales").alias("TotalPurchase"))

In [13]:
# Sort the data in descending order of the total purchase amount
customer_total_purchase = customer_total_purchase.orderBy(desc("TotalPurchase")).limit(1)


In [14]:
# Get the email of the customer who spent the most
most_spent_customer_email =customer_total_purchase.join(customers_df, on='CustomerId', how='inner').select("CustomerId", "TotalPurchase", "Email")
most_spent_customer_email.show()

+----------+-------------+--------------------+
|CustomerId|TotalPurchase|               Email|
+----------+-------------+--------------------+
|        35|        10598|dwayne.johnson@gm...|
+----------+-------------+--------------------+



In [15]:
frequently_bought_products = joined_tran_pro_cus_df.groupBy("ProductId", products_df["Name"]).agg(sum("Quantity").alias("TotalQuantity")).orderBy(desc("TotalQuantity")).limit(5)
frequently_bought_products.show()

+---------+-------------+-------------+
|ProductId|         Name|TotalQuantity|
+---------+-------------+-------------+
|       14|  Red t-shirt|         82.0|
|       24|   Blue Jeans|         77.0|
|       15|White t-shirt|         76.0|
|        5| Black Shorts|         75.0|
|       19| Green jacket|         74.0|
+---------+-------------+-------------+

