In [208]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import date_format, avg, max, desc


# Creating spark session

In [209]:
scSpark = SparkSession.builder.appName("Store Analysis").getOrCreate()

# Reading , Merging and Formatting our csv files

In [231]:
transactions_merged_df = scSpark.read.csv("./store_transactions/transactions_*.csv", inferSchema=True, header=True)

customers_df = scSpark.read.csv("customers.csv", inferSchema=True, header=True)
products_df = scSpark.read.csv("products.csv", inferSchema=True, header=True)

# Printing our data types of our columns

In [211]:
transactions_merged_df.printSchema()
products_df.printSchema()
customers_df.printSchema()

root
 |-- StoreId: integer (nullable = true)
 |-- TransactionId: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- ProductId: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- TransactionTime: timestamp (nullable = true)

root
 |-- ProductId: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- UnitPrice: double (nullable = true)

root
 |-- CustomerId: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)



# Displaying our processed File

In [212]:
transactions_merged_df.limit(5).show()
products_df.limit(5).show()
customers_df.limit(5).show()

+-------+-------------+----------+---------+--------+-------------------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|
+-------+-------------+----------+---------+--------+-------------------+
|      3|          454|        35|        3|       3|2022-12-23 17:36:11|
|      3|          524|        37|        9|      11|2022-12-23 22:02:51|
|      3|          562|         4|        3|       4|2022-12-23 02:51:50|
|      3|          581|        35|       14|      56|2022-12-23 17:05:54|
|      3|          200|        34|       15|      24|2022-12-23 07:15:01|
+-------+-------------+----------+---------+--------+-------------------+

+---------+------------+--------+---------+
|ProductId|        Name|Category|UnitPrice|
+---------+------------+--------+---------+
|        1|  Red Shorts|  Shorts|    89.75|
|        2|White Shorts|  Shorts|    89.27|
|        3| Blue Shorts|  Shorts|   118.88|
|        4|Green Shorts|  Shorts|   121.43|
|        5|Black Shorts| 

# What are the daily total sales for the store with id 1?

In [215]:
# filtering out only the rows with storeId 1 from transactions table and then selecting productid, quantity, Transactiontime columns from it
df_tran_store1 = transactions_merged_df.filter(transactions_merged_df.StoreId ==1).select("ProductId","Quantity","TransactionTime")

# this line will add the column with only date(removing time) and reformatting the date to string.
df_tran_store1 = df_tran_store1.withColumn("TransactionTime", date_format("TransactionTime" ,"yyyy-MM-dd"))

# join the products table column product ID and unit price, on product id which will state the rows in parallel
# manner according to the product id. using inner will only state those rows matched on transaction store productid

df = df_tran_store1.join(products_df.select("ProductId","UnitPrice"), on = "ProductId", how = "inner")

# will add the column Total sales with vaulues of ,product of unit price and quantity which will gives us sales of every row specified
df = df.withColumn("Total_SALES", col("UnitPrice").cast("double") * col("Quantity").cast("double"))

# this will give the output we want by grouping the dates and calculating the sum of daily sales of the same date and naming it 
#Total daily sales

daily_sales = df.groupBy("TransactionTime").agg(sum("Total_SALES").alias("Total Daily Sales"))
#df.printSchema()
#df.show()
daily_sales.show()
#df.count()

+---------------+------------------+
|TransactionTime| Total Daily Sales|
+---------------+------------------+
|     2022-12-23|41264.000000000015|
+---------------+------------------+



As we can see our total daily sales are listed above. AS there was only one day sales of store id 1 in the original data so it would list only 1 day sales with its specified Date

# What are the mean sales for the store with id 2?

In [216]:
# almost all the steps are same just some changes are made changes the table and the store id etc. 

df_tran_store2 = transactions_merged_df.filter(transactions_merged_df.StoreId ==2).select("ProductId","Quantity","TransactionTime")
df_tran_store2 = df_tran_store2.withColumn("TransactionTime", date_format("TransactionTime" ,"yyyy-MM-dd"))

df1 = df_tran_store2.join(products_df.select("ProductId","UnitPrice"), on = "ProductId", how = "inner")
df1 = df1.withColumn("Total_SALES", col("UnitPrice").cast("double") * col("Quantity").cast("double"))

# this time we will us avg function to get avg sales on each day
Avg_sale = df1.groupBy("TransactionTime").agg(avg("Total_SALES").alias("Daily Avg Sales"))
Avg_sale.show()

+---------------+-----------------+
|TransactionTime|  Daily Avg Sales|
+---------------+-----------------+
|     2022-12-23|513.4598039215689|
+---------------+-----------------+



As we can see our Avg daily sales are listed above. AS there was only one day sales of store id 2 in the original data so it would list only 1 day sales with its specified Date.

# What is the email of the client who spent the most when summing up purchases from all of the stores?

In [233]:
# almost all the steps are same just some changes are made changes the table and the store id etc. 

df_trans_all = transactions_merged_df.select("ProductId","Quantity","CustomerId")

df2 = df_trans_all.join(customers_df.select("CustomerId","Email"), on = "CustomerId", how = "inner")
df2 = df2.join(products_df.select("ProductId","UnitPrice"), on = "ProductId", how = "inner")
df2 = df2.withColumn("Customer_purchases", col("UnitPrice").cast("double") * col("Quantity").cast("double"))

# this time we have group by customer id, summing the customer purchases, which was also the same product of price and quantiy to get 
# the sales price of every row
customer_purchases = df2.groupBy("CustomerId","Email").agg(sum("Customer_purchases").alias("customer_purchases"))

# from customer purchases after grouping, we are ordering by descing order and get the email of that person with the highest value or highes purchases
highest_val = customer_purchases.orderBy(desc("customer_purchases")).first()["Email"]


#customer_purchases.show(100)
highest_val

'dwayne.johnson@gmail.com'

dwayne.johnson@gmail.com is the email of the client spent the most summing up from all stores. spent 10653.08 on his total purchases

# Which 5 products are most frequently bought across all stores?

In [234]:
# almost all the steps are same just some changes are made changes the table and the store id etc. 
# selecting all the necessary columns as done always matching and comparing on transaction table product id 
df3 = df_trans_all.join(products_df.select("ProductId","Name"), on = "ProductId", how = "inner")

#this will group the transaction on id add a column count counting the number of transactions
product_count = df3.groupBy('ProductId','Name').count()

# this will arrange in descending order
top_products = product_count.orderBy(desc("count"))
# and will retrieve top 5 rows which will be 5 most frequently bought
top_5_products = top_products.limit(5)

top_5_products.show()

+---------+-------------+-----+
|ProductId|         Name|count|
+---------+-------------+-----+
|        2| White Shorts|   20|
|        5| Black Shorts|    9|
|       19| Green jacket|    9|
|       15|White t-shirt|    8|
|        1|   Red Shorts|    7|
+---------+-------------+-----+



These 5 products are most frequently bought accross all stores.