In [1539]:
import time

pipeline_start = time.time()

#1 describe schemas


In [1540]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    IntegerType,
    BooleanType,
)

In [1541]:
users_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("signup_date", StringType(), True),
        StructField("plan", StringType(), True),
        StructField("country", StringType(), True),
        StructField("marketing_opt_in", BooleanType(), True),
    ]
)


In [1542]:
items_schema = StructType(
    [
        StructField("item_id", IntegerType(), True),
        StructField("category", StringType(), True),
        StructField("price", DoubleType(), True),
    ]
)

In [1543]:
events_schema = StructType(
    [
        StructField("ts", StringType()),
        StructField("event", StringType()),
        StructField("user_id", IntegerType()),
        StructField("item_id", IntegerType()),
        StructField(
            "context",
            StructType(
                [
                    StructField("country", StringType()),
                    StructField("device", StringType()),
                    StructField("locale", StringType()),
                    StructField("session_id", StringType()),
                ]
            ),
        ),
        StructField(
            "props",
            StructType(
                [
                    StructField("price", DoubleType()),
                    StructField("payment_method", StringType()),
                    StructField("dwell_ms", IntegerType()),
                ]
            ),
        ),
        StructField("exp", StructType([StructField("ab_group", StringType())])),
    ]
)

#2 read data


In [1544]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Revenue Analysis").getOrCreate()

spark.sparkContext.setJobGroup("revenue-analysis", "Revenue Analysis Pipeline - Data Processing and KPI Calculation")

spark.sparkContext.setJobDescription("Stage 1: Loading raw data (users, items, events)")



In [1545]:
users_df = spark.read.schema(users_schema).json("./data/users.jsonl")
users_df.show(5, truncate=False)
print(f"Total users: {users_df.count()}")
print(f"Users partitions: {users_df.rdd.getNumPartitions()}")

+---+-----------+----+-------+----------------+
|id |signup_date|plan|country|marketing_opt_in|
+---+-----------+----+-------+----------------+
|1  |2024-10-01 |free|TH     |false           |
|2  |2025-05-02 |free|DE     |true            |
|3  |2024-08-10 |pro |DE     |true            |
|4  |2025-01-20 |free|DE     |false           |
|5  |2025-04-27 |free|US     |true            |
+---+-----------+----+-------+----------------+
only showing top 5 rows
Total users: 20000
Users partitions: 1


In [1546]:
items_df = spark.read.schema(items_schema).json("./data/items.jsonl")
items_df.show(5, truncate=False)

print(f"Total items: {items_df.count()}")
print(f"Items partitions: {items_df.rdd.getNumPartitions()}")


+-------+-----------+-----+
|item_id|category   |price|
+-------+-----------+-----+
|1      |sports     |NULL |
|2      |sports     |NULL |
|3      |sports     |NULL |
|4      |books      |NULL |
|5      |electronics|NULL |
+-------+-----------+-----+
only showing top 5 rows


Total items: 5000
Items partitions: 1


In [1547]:
events_df = spark.read.schema(events_schema).json("./data/events/")
events_df.show(5, truncate=False)
print(f"Total events: {events_df.count()}")
print(f"Events partitions: {events_df.rdd.getNumPartitions()}")


+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+
|ts                              |event   |user_id|item_id|context                                                   |props               |exp|
+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+
|2025-09-27T15:26:07.741915+00:00|view    |2484   |3640   |{DE, web, pt_BR, 8abdca9e-195d-41b0-ab3f-68cbeae1ad8a}    |{NULL, NULL, 2953}  |{A}|
|2025-09-13T18:06:24.741915+00:00|purchase|13203  |1841   |{US, android, de_DE, 662e452a-ae57-49a1-a222-60f66ba7b925}|{159.61, card, NULL}|{A}|
|2025-09-09T02:59:53.741915+00:00|view    |3166   |4991   |{DE, ios, de_DE, 031db753-abab-40fc-b105-86892dd724f6}    |{NULL, NULL, 4129}  |{B}|
|2025-09-21T20:33:59.741915+00:00|view    |11992  |4647   |{VN, ios, en_US, 60a5c78b-fc7a-42a7-949d-862871aba4ff}    |{NULL, NULL, 4028}

#3 fix some datatypes


In [1548]:
from pyspark.sql.functions import col, to_date, to_timestamp, when

In [1549]:
users_df = users_df.withColumn("signup_date", to_date(col("signup_date")))
users_df = users_df.withColumnRenamed("id", "user_id")
users_df.show(5, truncate=False)
print(f"Total users: {users_df.count()}")

+-------+-----------+----+-------+----------------+
|user_id|signup_date|plan|country|marketing_opt_in|
+-------+-----------+----+-------+----------------+
|1      |2024-10-01 |free|TH     |false           |
|2      |2025-05-02 |free|DE     |true            |
|3      |2024-08-10 |pro |DE     |true            |
|4      |2025-01-20 |free|DE     |false           |
|5      |2025-04-27 |free|US     |true            |
+-------+-----------+----+-------+----------------+
only showing top 5 rows
Total users: 20000


In [1550]:
events_df = events_df.withColumn("timestamp", to_timestamp(col("ts")))
events_df = events_df.withColumn("date", to_date(col("ts")))
events_df.show(5, truncate=False)
print(f"Total events: {events_df.count()}")

+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+--------------------------+----------+
|ts                              |event   |user_id|item_id|context                                                   |props               |exp|timestamp                 |date      |
+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+--------------------------+----------+
|2025-09-27T15:26:07.741915+00:00|view    |2484   |3640   |{DE, web, pt_BR, 8abdca9e-195d-41b0-ab3f-68cbeae1ad8a}    |{NULL, NULL, 2953}  |{A}|2025-09-27 22:26:07.741915|2025-09-27|
|2025-09-13T18:06:24.741915+00:00|purchase|13203  |1841   |{US, android, de_DE, 662e452a-ae57-49a1-a222-60f66ba7b925}|{159.61, card, NULL}|{A}|2025-09-14 01:06:24.741915|2025-09-13|
|2025-09-09T02:59:53.741915+00:00|view    |3166   |4991   |{DE, ios, de_DE, 031db753-abab-

#4 add revenue column


In [1551]:
events_df = events_df.withColumn(
    "revenue",
    when(col("event") == "purchase", col("props.price").cast(DoubleType())).otherwise(
        0.0
    ),
)

events_df.show(5, truncate=False)
print(f"Total events: {events_df.count()}")
print(f"Events partitions: {events_df.rdd.getNumPartitions()}")

+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+--------------------------+----------+-------+
|ts                              |event   |user_id|item_id|context                                                   |props               |exp|timestamp                 |date      |revenue|
+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+--------------------------+----------+-------+
|2025-09-27T15:26:07.741915+00:00|view    |2484   |3640   |{DE, web, pt_BR, 8abdca9e-195d-41b0-ab3f-68cbeae1ad8a}    |{NULL, NULL, 2953}  |{A}|2025-09-27 22:26:07.741915|2025-09-27|0.0    |
|2025-09-13T18:06:24.741915+00:00|purchase|13203  |1841   |{US, android, de_DE, 662e452a-ae57-49a1-a222-60f66ba7b925}|{159.61, card, NULL}|{A}|2025-09-14 01:06:24.741915|2025-09-13|159.61 |
|2025-09-09T02:59:53.741915+00:00|view    |3166   

In [1552]:
events_df = events_df.filter(col("revenue") >= 0.0)

events_df.show(5, truncate=False)
print(f"Total events: {events_df.count()}")
print(f"Events partitions: {events_df.rdd.getNumPartitions()}")

+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+--------------------------+----------+-------+
|ts                              |event   |user_id|item_id|context                                                   |props               |exp|timestamp                 |date      |revenue|
+--------------------------------+--------+-------+-------+----------------------------------------------------------+--------------------+---+--------------------------+----------+-------+
|2025-09-27T15:26:07.741915+00:00|view    |2484   |3640   |{DE, web, pt_BR, 8abdca9e-195d-41b0-ab3f-68cbeae1ad8a}    |{NULL, NULL, 2953}  |{A}|2025-09-27 22:26:07.741915|2025-09-27|0.0    |
|2025-09-13T18:06:24.741915+00:00|purchase|13203  |1841   |{US, android, de_DE, 662e452a-ae57-49a1-a222-60f66ba7b925}|{159.61, card, NULL}|{A}|2025-09-14 01:06:24.741915|2025-09-13|159.61 |
|2025-09-09T02:59:53.741915+00:00|view    |3166   

#5 join data


In [1553]:
spark.sparkContext.setJobDescription("Stage 2: Joining events with items and users")


In [1554]:
# events_df = events_df.repartition(8)
# events_df = events_df.repartition(4, col("date"))
# events_df = events_df.repartition(2, col("date"))
# events_df = events_df.repartition(4)
# events_df = events_df.repartition(8)
events_df = events_df.repartition(12, col("date"))

In [1555]:
from pyspark.sql.functions import broadcast, count, sum, countDistinct


In [1556]:
events_df = events_df.join(broadcast(items_df), on="item_id", how="inner").join(
    broadcast(users_df), on="user_id", how="inner"
)

# broadcast helps here because items table and users table
# are much smaller (20k and 5k)
# each worker can do its join without sharing data (shuffling) over other workers


events_df.show(5, truncate=False)
print(f"Total events: {events_df.count()}")
print(f"Events partitions: {events_df.rdd.getNumPartitions()}")

+-------+-------+--------------------------------+--------+----------------------------------------------------------+--------------------+---+--------------------------+----------+-------+--------+-----+-----------+----+-------+----------------+
|user_id|item_id|ts                              |event   |context                                                   |props               |exp|timestamp                 |date      |revenue|category|price|signup_date|plan|country|marketing_opt_in|
+-------+-------+--------------------------------+--------+----------------------------------------------------------+--------------------+---+--------------------------+----------+-------+--------+-----+-----------+----+-------+----------------+
|5368   |2147   |2025-09-11T10:33:47.741915+00:00|view    |{IN, web, en_US, 7a15820f-913d-4be2-b426-2701516e96be}    |{NULL, NULL, 4917}  |{A}|2025-09-11 17:33:47.741915|2025-09-11|0.0    |toys    |NULL |2024-09-22 |free|IN     |false           |
|6507   |570

In [1557]:
spark.sparkContext.setJobDescription("Stage 3: Calculating daily KPIs and 7-day rolling revenue")


In [1558]:
# events_df = events_df.repartition(4, col("date"))


In [1559]:
daily_kpi_df = events_df.groupBy("date", "country", "category").agg(
    count("*").alias("events_total"),
    count(when(col("event") == "purchase", True)).alias("purchases"),
    sum("revenue").alias("revenue"),
    countDistinct("user_id").alias("unique_users")
)

daily_kpi_df.show(20, truncate=False)
print(f"Total grouped records: {daily_kpi_df.count()}")
print(f"Daily KPI partitions: {daily_kpi_df.rdd.getNumPartitions()}")

+----------+-------+-----------+------------+---------+------------------+------------+
|date      |country|category   |events_total|purchases|revenue           |unique_users|
+----------+-------+-----------+------------+---------+------------------+------------+
|2025-09-11|IN     |toys       |87          |5        |1031.24           |84          |
|2025-08-26|IN     |sports     |240         |4        |588.4             |226         |
|2025-08-26|BR     |fashion    |247         |11       |1489.74           |236         |
|2025-08-26|TH     |fashion    |229         |7        |925.9300000000001 |218         |
|2025-08-26|BR     |toys       |261         |12       |1969.3600000000001|241         |
|2025-09-11|TH     |books      |90          |1        |209.65            |86          |
|2025-09-11|BR     |electronics|76          |0        |0.0               |76          |
|2025-08-26|DE     |sports     |216         |3        |452.44            |208         |
|2025-08-26|VN     |sports     |

#6 combine data

In [1560]:
from pyspark.sql import Window

In [1561]:
window_spec = Window.partitionBy("country", "category").orderBy("date").rowsBetween(-6, 0)

daily_kpi_df = daily_kpi_df.withColumn("revenue_7d", sum("revenue").over(window_spec))

daily_kpi_df.show(20, truncate=False)
print(f"Total weekly records: {daily_kpi_df.count()}")
print(f"Weekly KPI partitions: {daily_kpi_df.rdd.getNumPartitions()}")


+----------+-------+--------+------------+---------+------------------+------------+------------------+
|date      |country|category|events_total|purchases|revenue           |unique_users|revenue_7d        |
+----------+-------+--------+------------+---------+------------------+------------+------------------+
|2025-08-25|BR     |books   |233         |5        |949.1000000000001 |225         |949.1000000000001 |
|2025-08-26|BR     |books   |216         |11       |1681.91           |211         |2631.01           |
|2025-08-27|BR     |books   |195         |3        |280.39            |189         |2911.4            |
|2025-08-28|BR     |books   |185         |9        |1171.16           |180         |4082.5600000000004|
|2025-08-29|BR     |books   |160         |7        |1099.1            |153         |5181.66           |
|2025-08-30|BR     |books   |149         |8        |1203.58           |146         |6385.24           |
|2025-08-31|BR     |books   |151         |5        |822.28      

#8 write to parquet

In [1562]:
# daily_kpi_df = daily_kpi_df.coalesce(4)
# daily_kpi_df = daily_kpi_df.repartition(4, col("date"))
daily_kpi_df = daily_kpi_df.repartition(12, col("date"))

In [1563]:
spark.sparkContext.setJobDescription("Stage 4: Writing results to parquet")

In [1564]:
daily_kpi_df.write.partitionBy("date").mode("overwrite").parquet("out/daily_kpi/")

test_df = spark.read.parquet("out/daily_kpi/date=2025-08-25")
test_df.show(5)


+-------+-----------+------------+---------+-----------------+------------+-----------------+
|country|   category|events_total|purchases|          revenue|unique_users|       revenue_7d|
+-------+-----------+------------+---------+-----------------+------------+-----------------+
|     BR|      books|         233|        5|949.1000000000001|         225|949.1000000000001|
|     BR|electronics|         215|        6|873.1999999999999|         211|873.1999999999999|
|     BR|    fashion|         234|        7|           608.23|         216|           608.23|
|     BR|       home|         230|       10|          1468.25|         218|          1468.25|
|     BR|     sports|         230|        5|           629.35|         220|           629.35|
+-------+-----------+------------+---------+-----------------+------------+-----------------+
only showing top 5 rows


In [1565]:
total_time = time.time() - pipeline_start

print(f"Total pipeline time: {total_time:.2f} seconds")

Total pipeline time: 8.61 seconds


In [1566]:
# without repartition: 
#     12.26 secondsm, nothing changed


# before join
# 12.24, repartition(4)
# 11.90, repartition(8)
# 10.53 seconds, repartition(4, col("date"))
# 9.81 seconds, repartition(8, col("date"))

# better to repartition by date, before join

# before aggregation:
# 11.75 seconds, no so good

# before write:
# 9.68, repartition(4, col("date"))
# 10.47, coalesce(4) alone


# best time (not avg):
# 8.27,  with repartition before join, and before write, sometimes 10s

# i think would be better to use all 12 cores that i have
# theres not much difference between 8 and 12 partitions

# but with 12 partitions doesnt get more than 10 seconds
# sometimes 8 partitions is faster, sometimes slower

# i would say 12 for stability