In [0]:
# Load the raw sales and store CSV extracts; the first row holds column names
# and column types are inferred from the data.
sales_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/sample_catalog/default/db_catalog/sales_data.csv")
)
store_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/sample_catalog/default/db_catalog/store_data.csv")
)

display(sales_df)
display(store_df)

In [0]:
# Clean the sales data: fill missing measures with 0, drop exact duplicate
# rows, then keep only rows that have every key column populated and a
# strictly positive quantity and total amount.

from pyspark.sql.functions import col, to_date

sales_df = (
    sales_df
    .fillna({"quantity": 0, "total_amount": 0})
    .dropDuplicates()
    # A sale is only usable when its id, store and date are all present.
    .where(
        col("sale_id").isNotNull()
        & col("store_id").isNotNull()
        & col("sale_date").isNotNull()
    )
    # Rows whose measures were missing (now 0) or non-positive are discarded here.
    .where((col("quantity") > 0) & (col("total_amount") > 0))
)

display(sales_df)

In [0]:
# Data clean-up for store data: de-duplicate, require a store_id and an
# open_date, and impute missing store sizes with the average store size.
from pyspark.sql.functions import avg

# Drop exact duplicate rows (as the sales clean-up does) so that the later
# join on store_id cannot multiply sales rows and inflate the totals.
store_df = store_df.dropDuplicates()
store_df = store_df.filter(col("store_id").isNotNull())

# avg() skips nulls and yields None when no remaining row has a store_size
# (e.g. an empty or fully-null column).
avg_store_size = store_df.select(avg("store_size")).first()[0]
print(avg_store_size)

# fillna() rejects None as a replacement value, so only impute when an
# average could actually be computed; otherwise leave the nulls in place.
if avg_store_size is not None:
    store_df = store_df.fillna({"store_size": avg_store_size})
store_df = store_df.filter(col("open_date").isNotNull())

display(store_df)


In [0]:
# Enrich every cleaned sale with the attributes of its store. The inner join
# keeps only sales whose store_id also appears in the cleaned store data.

combined_df = sales_df.join(store_df, ["store_id"], "inner")

display(combined_df)


In [0]:
# Derive per-sale columns on the combined data: the calendar year of the sale
# and the sale amount per square foot of the store (rounded to 2 decimals).

# NOTE: importing `round` here shadows Python's builtin round() for the rest
# of the notebook; kept as-is because other cells may rely on it.
from pyspark.sql.functions import year, col, round, when

combined_df = combined_df.withColumn("year", year(col("sale_date")))

# Guard the division: with ANSI SQL mode enabled a zero store_size raises a
# divide-by-zero error. when() without otherwise() yields NULL for those rows,
# which is what the unguarded division returns when ANSI mode is off.
combined_df = combined_df.withColumn(
    "sale_per_sqft",
    when(
        col("store_size") != 0,
        round(col("total_amount") / col("store_size"), 2),
    ),
)

display(combined_df)

In [0]:
# Expose the combined data as the temp view `combined_v`, then use SQL to
# compute total sales and total quantity for each store and region.

combined_df.createOrReplaceTempView("combined_v")

# Query text kept separate from its execution so it can be read on its own.
store_sales_query = """
    SELECT 
        store_id, 
        store_region, 
        SUM(total_amount) as total_sales, 
        SUM(quantity) as total_quantity 
    FROM combined_v
    GROUP BY store_id, store_region
"""
store_sales_sql = spark.sql(store_sales_query)

display(store_sales_sql)

In [0]:
# Find the top 5 products by total quantity sold (reads the `combined_v` view).

# product_id is a secondary sort key so that ties on total_quantity are broken
# the same way on every run; without it, LIMIT 5 may return a different set of
# rows each time products tie at the cut-off.
top_products_sql = spark.sql("""
    SELECT
        product_id,
        SUM(quantity) AS total_quantity
    FROM combined_v
    GROUP BY product_id
    ORDER BY total_quantity DESC, product_id
    LIMIT 5
""")

display(top_products_sql)

In [0]:
# Find the top 5 stores by total sales amount (the ranking uses total_sales,
# not quantity), based on the per-store totals computed earlier.

store_sales_sql.createOrReplaceTempView("store_sales")

# store_id is a secondary sort key so that ties on total_sales are broken the
# same way on every run instead of depending on row arrival order.
top_store = spark.sql("""
    SELECT
        store_id,
        total_sales
    FROM store_sales
    ORDER BY total_sales DESC, store_id
    LIMIT 5
""")

display(top_store)


In [0]:
# Save the output results as parquet to the volume path.
# The default save mode raises an error when the target path already exists,
# so every run after the first would fail; overwrite makes the notebook
# re-runnable and replaces the previous results.

top_products_sql.write.mode("overwrite").parquet("/Volumes/sample_catalog/default/db_catalog/top_products")
top_store.write.mode("overwrite").parquet("/Volumes/sample_catalog/default/db_catalog/top_store")