In [None]:
# Location of the raw sales CSV in the data lake.
file_path = "abfss://raw-data@icezydatalake01.dfs.core.windows.net/raw-data/icezysales.csv"

# Read the CSV, treating the first row as column names and letting Spark
# infer each column's type from the data.
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Preview the first 10 rows.
df.show(n=10)

In [None]:
from pyspark.sql.functions import col, lit, lower

# Build the cleaned event frame used by every aggregation below.
#
# Order matters: `price` is cast to double BEFORE nulls are filled. A cast can
# itself produce nulls (with ANSI mode off, a value that does not parse as a
# number becomes null), so filling first would let those nulls slip through
# into `cleaned_df`.
cleaned_df = (
    df
    .withColumn("price", col("price").cast("double"))
    # Lower-case event types so later equality filters ("view", "cart")
    # match regardless of the casing in the raw file.
    .withColumn("event_type", lower(col("event_type")))
    # Replace missing values with explicit placeholders.
    .fillna({
        "category_code": "unknown",
        "brand": "unknown",
        "price": 0.0,
    })
)

In [None]:
from pyspark.sql.functions import count

# Number of event rows per user session.
session_activity = (
    cleaned_df
    .groupBy("user_session")
    .count()
    .withColumnRenamed("count", "events_in_session")
)

In [None]:
from pyspark.sql.functions import sum

# Total of `price` for each event type.
event_spending = (
    cleaned_df
    .groupBy("event_type")
    .sum("price")
    # GroupedData.sum names its output column "sum(price)".
    .withColumnRenamed("sum(price)", "total_value")
)

In [None]:
# Count "view" events per product, most-viewed products first.
viewed_products = (
    cleaned_df
    .where(col("event_type") == "view")
    .groupBy("product_id")
    .count()
    .orderBy(col("count").desc())
)

In [None]:
# Namespaced import so this cell never falls back to Python's builtin `sum`:
# a bare `sum("price")` only means Spark's aggregate if the cell that imports
# it has already been run; otherwise it raises a TypeError.
from pyspark.sql import functions as F

# Total `price` of "cart" events per brand, highest total first.
# NOTE(review): the result is named `brand_sales` but the filter selects
# "cart" events, not purchases; confirm which event type is intended.
brand_sales = (
    cleaned_df
    .filter(F.col("event_type") == "cart")
    .groupBy("brand")
    .agg(F.sum("price").alias("total_cart_value"))
    .orderBy("total_cart_value", ascending=False)
)

In [None]:
# Root folder for the aggregated outputs.
# NOTE(review): this points at the `raw-data` container; confirm that
# analytics output is meant to live alongside the raw files.
base_path = "abfss://raw-data@icezydatalake01.dfs.core.windows.net/analytics_output/"

# Persist each aggregate as Parquet, replacing any previous run's output.
session_activity.write.parquet(f"{base_path}session_activity", mode="overwrite")
event_spending.write.parquet(f"{base_path}event_spending", mode="overwrite")
viewed_products.write.parquet(f"{base_path}viewed_products", mode="overwrite")
brand_sales.write.parquet(f"{base_path}brand_sales", mode="overwrite")