In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    ArrayType,
    DateType,
    BooleanType,
    TimestampType,
    DoubleType,
)
from pyspark.sql.functions import (
    col,
    to_timestamp,
    to_date,
    when,
    broadcast,
    count,
    sum as spark_sum,
    count_distinct,
    datediff,
    lit,
)
from pyspark.sql.window import Window

In [None]:
# Build (or reuse) a local Spark session running on 8 threads, with
# Arrow-based transfer enabled for Spark <-> pandas conversions.
spark = (
    SparkSession.builder.appName("Revenue Analysis")
    .master("local[8]")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # .config("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:9000")
    .getOrCreate()
)

# The session already exposes its context; calling the `getOrCreate`
# classmethod through that instance only returned the same object again.
spark_context = spark.sparkContext

# Quick sanity dump so the reader can find the UI and confirm versions.
print("\n===== Spark Context Info =====")
print(f"App Name      : {spark_context.appName}")
print(f"Master        : {spark_context.master}")
print(f"Application ID: {spark_context.applicationId}")
print(f"UI Web URL    : {spark_context.uiWebUrl}")
print(f"Version       : {spark_context.version}")
print(f"Python Ver    : {spark_context.pythonVer}")

In [None]:
# Peek at the first record of each dimension file and list its top-level
# JSON keys in file order. Requires the external `jq` tool on PATH.
!head -n 1 data/items.jsonl | jq "keys_unsorted" 
!head -n 1 data/users.jsonl | jq "keys_unsorted" 

In [None]:
# Decompress the first event shard to stdout and pretty-print its first record.
# Requires the external `gunzip` and `jq` tools on PATH.
!gunzip -c data/events/part-00.jsonl.gz | head -n 1 | jq "."

In [None]:
# Explicit schemas for the three JSON inputs, so Spark does not have to
# infer column types from the data.

# Item dimension: id, category and a list of free-form tags.
items_schema = (
    StructType()
    .add("item_id", IntegerType(), False)
    .add("category", StringType(), False)
    .add("tags", ArrayType(StringType()), False)
)

# User dimension; note the key column is named "id", not "user_id".
user_schema = (
    StructType()
    .add("id", IntegerType(), False)
    .add("signup_date", DateType(), False)
    .add("plan", StringType(), False)
    .add("country", StringType(), False)
    .add("marketing_opt_in", BooleanType(), False)
)

# Nested structs of an event record, declared separately for readability.
context_struct = (
    StructType()
    .add("country", StringType(), False)
    .add("device", StringType(), False)
    .add("locale", StringType(), False)
    .add("session_id", StringType(), False)
)

props_struct = (
    StructType()
    .add("price", DoubleType(), True)
    .add("payment_method", StringType(), True)
    .add("dwell_ms", IntegerType(), True)
)

exp_struct = StructType().add("ab_group", StringType(), True)

# Event fact records: four scalar columns plus the three nested structs,
# which are themselves nullable.
event_schema = (
    StructType()
    .add("ts", TimestampType(), False)
    .add("event", StringType(), False)
    .add("user_id", IntegerType(), False)
    .add("item_id", IntegerType(), False)
    .add("context", context_struct)
    .add("props", props_struct)
    .add("exp", exp_struct)
)

In [None]:
# Load the two dimension tables with their explicit schemas.
# The event shards are loaded in the next cell; reading them here as well
# only created a DataFrame that was immediately overwritten.
df_users = spark.read.schema(user_schema).json("data/users.jsonl")
df_items = spark.read.schema(items_schema).json("data/items.jsonl")

# Row count and partition count for each table, followed by a small preview.
# Each count() triggers a Spark job over the corresponding file.
print(
    f"users | total - {df_users.count()} | partitions - {df_users.rdd.getNumPartitions()}"
)
df_users.show(5)
print(
    f"\nitems | total - {df_items.count()} | partitions - {df_items.rdd.getNumPartitions()}"
)
df_items.show(5)

In [None]:
# The four gzip-compressed event shards, read together as one DataFrame.
event_paths = [
    "data/events/part-00.jsonl.gz",
    "data/events/part-01.jsonl.gz",
    "data/events/part-02.jsonl.gz",
    "data/events/part-03.jsonl.gz",
]
events_reader = spark.read.schema(event_schema)
df_events = events_reader.json(event_paths)

# Row count and partition count, then a small preview.
print(
    f"events | total - {df_events.count()} | partitions - {df_events.rdd.getNumPartitions()}"
)
df_events.show(5)

In [None]:
# Derive a `timestamp` column and a calendar `date` column from the raw `ts`
# field; `date` is the grouping key used further down.
event_ts = to_timestamp("ts")
event_date = to_date("ts")

df_events = (
    df_events
    .withColumn("timestamp", event_ts)
    .withColumn("date", event_date)
)

In [None]:
# Revenue is the purchase price for "purchase" events and 0.0 for everything else.
is_purchase = col("event") == "purchase"
purchase_price = col("props.price").cast(DoubleType())
revenue = when(is_purchase, purchase_price).otherwise(0.0).cast(DoubleType())

# Keep only rows with non-negative revenue.
# NOTE(review): a purchase whose price is null yields a null revenue, and the
# comparison below is then not true, so such rows are dropped as well -
# confirm that this is intended.
df_events = df_events.withColumn("revenue", revenue).filter(col("revenue") >= 0.0)

print(
    f"events | total - {df_events.count()} | partitions - {df_events.rdd.getNumPartitions()}"
)

#### Broadcast joins

In [None]:
# Without a hint, Spark may shuffle both sides of a join; shuffles are
# computationally expensive and involve extra I/O.
# broadcast() asks Spark to ship the small table to every executor instead,
# so each executor can join its own partitions of the large table locally.

# Step 1: attach item attributes to each event.
events_with_items = df_events.join(broadcast(df_items), on="item_id", how="left")

# Step 2: attach user attributes; the key is `user_id` on events and `id` on users.
df_joined = events_with_items.join(
    broadcast(df_users), df_events.user_id == df_users.id, how="left"
)

# Print the query plans to check how the joins were planned.
df_joined.explain("extended")

In [None]:
# Daily KPIs per (date, country, category).
group_keys = ["date", "country", "category"]

kpi_columns = [
    count("*").alias("total_events"),
    # when() without otherwise() is null for non-purchases, and count() skips nulls.
    count(when(col("event") == "purchase", 1)).alias("num_purchases"),
    spark_sum("revenue").alias("total_revenue"),
    count_distinct("user_id").alias("unique_users"),
]

df_aggregate = df_joined.groupBy(*group_keys).agg(*kpi_columns)

print(
    f"aggregations | total - {df_aggregate.count()} | partitions - {df_aggregate.rdd.getNumPartitions()}"
)
df_aggregate.show(5)

In [None]:
# Trailing 7-day revenue per (country, category).
#
# A row-based frame (rowsBetween(-6, 0)) sums the previous six *rows*, so a
# group with missing dates would silently reach back more than seven calendar
# days. Ordering by an integer day index and using a range-based frame makes
# the window cover exactly the current day and the six days before it.
day_index = datediff(col("date"), to_date(lit("1970-01-01")))

w = (
    Window.partitionBy("country", "category")
    .orderBy(day_index)
    .rangeBetween(-6, 0)
)

df_final = df_aggregate.withColumn("revenue_7d", spark_sum("total_revenue").over(w))

In [None]:
# Persist the KPIs as Parquet, replacing any previous output.
# partitionBy("date") lays the data out as one sub-folder per date value.
kpi_writer = df_final.write.mode("overwrite").partitionBy("date")
kpi_writer.parquet("out/daily_kpi/")

In [None]:
# Read one day back from the partitioned output.
#
# Pointing the reader at the `date=...` sub-folder drops the `date` partition
# column from the result and raises if that folder does not exist. Reading the
# dataset root and filtering on the partition column keeps `date` in the
# output and simply returns no rows when the day is absent.
sample_date = "2025-10-23"
df_sample = spark.read.parquet("out/daily_kpi/").filter(col("date") == sample_date)

print(
    f"out/daily_kpi/date={sample_date} | total - {df_sample.count()} | partitions - {df_sample.rdd.getNumPartitions()}"
)
df_sample.show(10, truncate=False)

In [None]:
# TODO: repartition findings