In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [None]:
# Build the SparkSession used by the cells below; getOrCreate() returns the
# already-running session if there is one instead of starting a second.
spark = (
    SparkSession.builder
    .appName("EcommerceAnalysis")
    .getOrCreate()
)

In [3]:
# Relative path to a single Parquet part file under dataset_pq/2019-Oct.
file = (
    "dataset_pq/2019-Oct/part-00000-6a99fc51-2c5d-4c07-8a45-fd3d071b37a8-c000.snappy.parquet"
)

In [None]:
# Load the event data from the Parquet part file named by `file` (defined in
# the cell above).
# NOTE(review): the previous statement was a truncated `spark.read.p` followed
# by a dangling `.option("table", ...).load()` line, which is a SyntaxError.
# It is assumed here that the Parquet file is the intended source. To read the
# BigQuery table "fleet-aleph-447822-a2.e_commerce_dataset.e-com-sample"
# instead, use spark.read.format("bigquery").option("table", <table>).load()
# with the BigQuery connector on the classpath -- TODO confirm which is wanted.
df = spark.read.parquet(file)

# One window per (user_id, product_id) pair, rows ordered by event_time.
# No explicit frame is given, so Spark applies its default frame for an
# ordered window: from the start of the partition through the current row.
window_spec = Window.partitionBy("user_id", "product_id").orderBy("event_time")


def _running_event_count(event_type):
    """Return a Column holding the running number of `event_type` rows.

    Each row contributes 1 when its event_type equals `event_type` and 0
    otherwise; the contributions are summed over `window_spec`.
    """
    is_match = F.when(F.col("event_type") == event_type, 1).otherwise(0)
    return F.sum(is_match).over(window_spec)


# Attach one running counter per event type of interest.
df = (
    df
    .withColumn("cumulative_views", _running_event_count("view"))
    .withColumn("cumulative_carts", _running_event_count("cart"))
    .withColumn("cumulative_purchases", _running_event_count("purchase"))
)

# Keep only cart and purchase events, then, per (user_id, product_id), take the
# smallest running view count seen on a cart row and on a purchase row, i.e.
# the views accumulated by the first cart / first purchase.
#
# Each aggregate is conditional on its own event type. F.when(...) without an
# otherwise() yields null for non-matching rows, and min() ignores nulls, so a
# pair that was carted but never purchased gets null in
# views_before_purchasing (and vice versa). Previously both columns were
# F.min("cumulative_views") over the same rows and were always identical.
is_cart = F.col("event_type") == "cart"
is_purchase = F.col("event_type") == "purchase"

result = (
    df.filter(F.col("event_type").isin(["cart", "purchase"]))
    .groupBy("user_id", "product_id")
    .agg(
        F.min(F.when(is_cart, F.col("cumulative_views"))).alias("views_before_carting"),
        F.min(F.when(is_purchase, F.col("cumulative_views"))).alias("views_before_purchasing"),
    )
)

result.show()
