In [0]:
# Load the raw store and sales CSV extracts. The first row of each file is
# treated as the header and column types are inferred from the data.
store_df = spark.read.csv(
    path="/Volumes/workspace/retail/retaildata/store_data.csv",
    header=True,
    inferSchema=True,
)
sales_df = spark.read.csv(
    path="/Volumes/workspace/retail/retaildata/sales_data.csv",
    header=True,
    inferSchema=True,
)

# Preview both inputs: sales first, then stores.
display(sales_df)
display(store_df)

In [0]:
from pyspark.sql.functions import col, to_date

# Clean sales_data in a single pipeline:
#   1. missing quantity / total_amount values become 0
#   2. exact duplicate rows are removed
#   3. rows lacking a sale_id, store_id or sale_date are dropped
#   4. rows with a negative quantity or total_amount are dropped
sales_df = (
    sales_df
    .fillna({"quantity": 0, "total_amount": 0})
    .dropDuplicates()
    .filter(
        col("sale_id").isNotNull()
        & col("store_id").isNotNull()
        & col("sale_date").isNotNull()
    )
    .filter((col("quantity") >= 0) & (col("total_amount") >= 0))
)

display(sales_df)

In [0]:
from pyspark.sql.functions import avg, col

# Clean store_data:
#   - drop rows without a store_id
#   - replace missing store_size values with the average store_size
#   - drop rows without an open_date
store_df = store_df.filter(col("store_id").isNotNull())

# avg() yields None when there is no non-null store_size to average
# (for example, an empty frame). fillna() does not accept None as a fill
# value, so the imputation is skipped in that case.
avg_store_size = store_df.select(avg("store_size")).first()[0]
if avg_store_size is not None:
    store_df = store_df.fillna({"store_size": avg_store_size})

store_df = store_df.filter(col("open_date").isNotNull())

display(store_df)

In [0]:
# Data transformation: inner-join each sale to its store on the shared
# store_id column, keeping only sales whose store_id exists in store_df.
combined_df = sales_df.join(store_df, "store_id", "inner")
display(combined_df)


In [0]:
# NOTE: importing `round` from pyspark shadows Python's built-in round()
# for every cell that runs after this one.
from pyspark.sql.functions import year, col, round, when

# Calendar year extracted from the sale date.
combined_df = combined_df.withColumn("year", year(col("sale_date")))

# total_amount divided by store_size, rounded to 2 decimal places.
# A store_size of 0 is mapped to NULL before dividing, so the result is NULL
# instead of a DIVIDE_BY_ZERO error when ANSI SQL mode is enabled. With ANSI
# mode off the result is NULL as well, so existing output is unchanged.
nonzero_store_size = when(col("store_size") != 0, col("store_size"))
combined_df = combined_df.withColumn(
    "sales_per_sqft",
    round(col("total_amount") / nonzero_store_size, 2),
)
display(combined_df)

In [0]:
# Summed total_amount and quantity for every (store_id, store_region) pair.
# The joined frame is exposed to Spark SQL under the view name "combined".
combined_df.createOrReplaceTempView("combined")

store_sales_query = """
    SELECT 
        store_id, 
        store_region, 
        SUM(total_amount) AS total_sales, 
        SUM(quantity) AS total_quantity
    FROM combined
    GROUP BY store_id, store_region
"""
store_sales_sql = spark.sql(store_sales_query)

display(store_sales_sql)


In [0]:
# The five product_ids with the largest summed quantity, read from the
# "combined" temp view.
top_products_query = """
    SELECT 
        product_id, 
        SUM(quantity) AS total_quantity
    FROM combined
    GROUP BY product_id
    ORDER BY total_quantity DESC
    LIMIT 5
"""
top_products_sql = spark.sql(top_products_query)
display(top_products_sql)

In [0]:
# Top five stores by total sales, read from the per-store aggregate
# registered here as the "store_sales" temp view.
store_sales_sql.createOrReplaceTempView("store_sales")

# The query must be triple-quoted: a string opened with a single `"` cannot
# span multiple lines and is a SyntaxError.
top_store = spark.sql("""
    select 
        store_id, 
        total_sales 
    from store_sales 
    order by total_sales 
    desc limit 5"""
)

display(top_store)

In [0]:
# Persist the three result sets as Parquet.
# The save mode is set to "overwrite" so the notebook can be re-run from the
# top: the writer's default mode raises an error when the target path already
# exists, which would make every run after the first fail here.
top_products_sql.write.mode("overwrite").parquet("/Volumes/workspace/retail/retaildata/top_products.parquet")
store_sales_sql.write.mode("overwrite").parquet("/Volumes/workspace/retail/retaildata/store_sales.parquet")
top_store.write.mode("overwrite").parquet("/Volumes/workspace/retail/retaildata/top_store.parquet")