In [0]:
# Load the two monthly event exports and combine them into a single frame.
from pyspark.sql.functions import lit, concat, to_timestamp

DATA_DIR = "/Volumes/workspace/ecommerce/ecommerce_data"


def load_month(path, label):
    """Read one monthly CSV export and tag every row with its month.

    Args:
        path: location of the CSV file (with a header row).
        label: string stored in the constant `month` column, e.g. "Oct".

    Returns:
        A DataFrame with `event_time` converted via to_timestamp() and an
        extra string column `month` set to `label`.
    """
    return (
        spark.read.csv(path, header=True, inferSchema=True)
        .withColumn("event_time", to_timestamp("event_time"))
        .withColumn("month", lit(label))
    )


df_1 = load_month(f"{DATA_DIR}/2019-Oct.csv", "Oct")
df_2 = load_month(f"{DATA_DIR}/2019-Nov.csv", "Nov")

# unionByName aligns columns by name. Plain union() aligns by position and would
# silently put values in the wrong column if the two files ever differed in column order.
df_full = df_1.unionByName(df_2)

In [0]:

# Sanity-check the combined frame: column names/types first, then a handful of rows.
print("Dataset schema:")
df_full.printSchema()
df_full.show(n=5)

In [0]:
# Aggregate purchases per (user, product) for the join example.
from pyspark.sql.functions import sum as sum_, count, first, min_by

# Keep only joined rows where the user spent more than this on a product.
MIN_TOTAL_SPENT = 10

purchases = (
    df_full.filter("event_type = 'purchase'")
    .groupBy("user_id", "product_id")
    .agg(sum_("price").alias("total_spent"), count("*").alias("purchase_count"))
)

# Earliest session per user. first() inside an unordered groupBy returns whichever
# row Spark happens to encounter first, so the value can change between runs.
# min_by picks the user_session of the row with the smallest event_time instead.
user_sessions = df_full.groupBy("user_id").agg(
    min_by("user_session", "event_time").alias("first_session")
)

# Left join so every purchase row is kept even without a matching session row;
# the spend threshold is applied after the join.
df_joined = (
    purchases.join(user_sessions, "user_id", "left_outer")
    .filter(f"total_spent > {MIN_TOTAL_SPENT}")
)

df_joined.show(10)


In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, sum as sum_

# Purchase events with just the columns the running total needs.
# No orderBy here: the window below repartitions by user_id and sorts within each
# user itself, so a global sort beforehand would only add an extra full shuffle.
purchase_events = (
    df_full.filter("event_type = 'purchase'")
    .select("user_id", "event_time", "price")
)

# Running cumulative spend per user, from the first purchase up to the current row.
# price breaks ties between purchases sharing the same event_time, so the
# row-by-row running totals are deterministic.
window_spec = (
    Window.partitionBy("user_id")
    .orderBy("event_time", "price")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_running = purchase_events.withColumn("running_total", sum_("price").over(window_spec))

# Sort only for display. Combined with show()'s row limit, Spark can typically
# plan this as a top-N rather than a full sort.
df_running.orderBy("user_id", "event_time", "price").show(20)


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
import pyspark.sql.functions as F

# UDF for price tier
def price_tier(price):
    """Bucket a price into a coarse tier label.

    Returns "Premium" for price > 100, "Mid" for 50 < price <= 100 and
    "Budget" otherwise. A missing price (SQL NULL reaches a Python UDF as
    None) yields None rather than raising TypeError on `None > 100`, which
    would otherwise fail the Spark job.
    """
    if price is None:
        return None
    if price > 100:
        return "Premium"
    if price > 50:
        return "Mid"
    return "Budget"
tier_udf = udf(price_tier, returnType=StringType())  # Spark wrapper around price_tier; emits strings

# Category depth = number of dot-separated levels in category_code,
# e.g. "electronics.smartphone" -> 2. Missing or empty codes count as 0.
def cat_depth(code):
    if not code:
        return 0
    # n separators always yield n + 1 segments, same as len(code.split('.'))
    return code.count('.') + 1
depth_udf = udf(cat_depth, returnType=IntegerType())  # Spark wrapper around cat_depth; emits ints

# Attach the engineered feature columns to every event, then preview purchase rows only.
df_features = (
    df_full
    .withColumn("price_tier", tier_udf("price"))
    .withColumn("category_depth", depth_udf("category_code"))
    .withColumn("is_purchase", (F.col("event_type") == "purchase").cast("int"))
)

feature_cols = ["user_id", "price_tier", "category_depth", "is_purchase"]
df_features.filter("event_type = 'purchase'").select(*feature_cols).show(10)
