In [0]:
%python
sales_df_path = "/Volumes/workspace/retail_project/sales_store_data/sales_data.csv"

# Load the raw sales extract: the CSV header row supplies the column names and
# Spark infers each column's type from the data.
sales_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load(sales_df_path)
)

display(sales_df)

In [0]:
%python
from pyspark.sql.functions import col, to_date

# Clean the sales rows, in this order:
#   1. replace missing quantity / total_amount with 0,
#   2. drop exact duplicate rows,
#   3. drop rows missing sale_id, store_id or sale_date,
#   4. drop rows with a negative quantity or total_amount.
# NOTE(review): to_date is imported but not used in this cell; sale_date keeps
# whatever type inferSchema assigned to it.
sales_df = (
    sales_df
    .fillna({"quantity": 0, "total_amount": 0})
    .dropDuplicates()
    .filter(col("sale_id").isNotNull() & col("store_id").isNotNull() & col("sale_date").isNotNull())
    .filter((col("quantity") >= 0) & (col("total_amount") >= 0))
)

display(sales_df)

In [0]:
%python
store_df_path = "/Volumes/workspace/retail_project/sales_store_data/store_data.csv"

# Load the raw store master data: column names from the CSV header row,
# column types inferred by Spark from the data.
store_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load(store_df_path)
)

display(store_df)

In [0]:
from pyspark.sql.functions import avg, col

# Clean the store rows: keep only stores with a store_id and an open_date, then
# impute missing store_size values with the mean size of the remaining stores.
# `col` is imported here as well so this cell does not depend on an earlier
# cell's import having been executed.
store_df = store_df.filter(col("store_id").isNotNull() & col("open_date").isNotNull())

# avg() skips NULLs; the single result value is None when no remaining row has
# a store_size (including when store_df is empty).
avg_store_size = store_df.select(avg("store_size")).first()[0]
print(avg_store_size)

# fillna() only accepts bool/int/float/str replacement values: a None mean (no
# data) or a Decimal mean (e.g. from a decimal column) would raise TypeError.
# Impute only when a mean exists, and hand it over as a plain float.
if avg_store_size is not None:
    store_df = store_df.fillna({"store_size": float(avg_store_size)})
display(store_df)

In [0]:
# Attach store attributes to every sale. Being an inner join on store_id, it
# keeps only sales whose store_id is also present in the cleaned store table;
# all other sales rows are dropped.
combined_df = sales_df.join(store_df, "store_id", "inner")

display(combined_df)

In [0]:
from pyspark.sql.functions import col, round as spark_round, when, year

# Derive reporting columns on the joined sales/store rows.
# Spark's round is imported under an alias so it does not shadow Python's
# builtin round() for every cell that runs after this one.
# NOTE(review): year() needs sale_date to be a date/timestamp or a string Spark
# can cast to one; confirm the CSV's date format.
combined_df = combined_df.withColumn("year", year(col("sale_date")))

# Revenue per square foot, rounded to 2 decimals. A store_size of 0 yields NULL
# explicitly: unguarded, the division returns NULL only with ANSI mode off and
# raises DIVIDE_BY_ZERO with ANSI mode on. NULL store_size still yields NULL.
combined_df = combined_df.withColumn(
    "sales_per_sqft",
    when(col("store_size") != 0, spark_round(col("total_amount") / col("store_size"), 2)),
)

display(combined_df)

In [0]:
%python
# Register the joined rows as the temp view `combined` so the SQL cells can query them.
combined_df.createOrReplaceTempView("combined")
# Per-store totals: revenue (total_amount cast to DOUBLE) and units sold
# (quantity cast to INT), one row per (store_id, store_region) pair.
# NOTE(review): the explicit CASTs suggest these columns may not always be
# inferred as numeric by inferSchema; confirm the inferred types.
store_sales_sql = spark.sql("""
    SELECT
        store_id,
        store_region,
        SUM(CAST(total_amount AS DOUBLE)) as total_sales,
        SUM(CAST(quantity AS INT)) as total_quantity
    FROM combined
    GROUP BY store_id, store_region
""")

display(store_sales_sql)

In [0]:
# Top 5 products by total units sold across all joined sales rows.
# product_id is a secondary sort key so that ties on total_quantity at the
# LIMIT boundary resolve the same way on every run; this result is written to
# parquet later in the notebook, so it should be reproducible.
# NOTE(review): product_id is not null-filtered upstream, so a NULL product
# group can appear here; confirm whether it should be excluded.
top_products_sql = spark.sql("""
    SELECT
        product_id,
        SUM(quantity) AS total_quantity
    FROM combined
    GROUP BY product_id
    ORDER BY total_quantity DESC, product_id
    LIMIT 5
""")

display(top_products_sql)

In [0]:
%python
# Expose the per-store aggregates as the temp view `store_sales` and select the
# 5 stores with the highest total_sales. store_id breaks ties on total_sales so
# the selected stores do not vary between runs.
store_sales_sql.createOrReplaceTempView("store_sales")
top_store = spark.sql("""
    SELECT store_id, total_sales
    FROM store_sales
    ORDER BY total_sales DESC, store_id
    LIMIT 5
""")

display(top_store)

In [0]:
# Persist both top-5 result sets as parquet alongside the source CSVs.
# mode("overwrite") lets the notebook be re-run end to end: the default save
# mode ("errorifexists") fails as soon as the output directories exist.
# This replaces any previous output at these paths.
output_dir = "/Volumes/workspace/retail_project/sales_store_data"
top_products_sql.write.mode("overwrite").parquet(f"{output_dir}/top_products_parquet")
top_store.write.mode("overwrite").parquet(f"{output_dir}/top_store_parquet")