In [1]:
import os
from datetime import timedelta
from typing import Mapping, Iterable, Union

from delta.tables import DeltaTable
from loguru import logger
from pyspark import SparkConf
from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, types as T

from event_sessions.utils.spark import create_conf, DEFAULT_CONF
from event_sessions.utils.parameters import USER_ACTION_IDS, INACTIVITY_SECONDS
from event_sessions.sessions.events_to_sessions import rewrite_impacted_users

# SparkSession

In [None]:
# Build (or reuse) the notebook's SparkSession.
#
# The shared settings come from `DEFAULT_CONF` (event_sessions.utils.spark). A reference
# copy is kept below for readers; the imported value is authoritative and this copy may drift:
#
# DEFAULT_CONF = {
#     "spark.app.name": "sessions",
#     "spark.master": "local[*]",
#     "spark.sql.session.timeZone": "UTC",
#     "spark.sql.shuffle.partitions": "200",
#     "spark.default.parallelism": "200",
#     "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
#     "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
#     "spark.jars.packages": "io.delta:delta-spark_2.12:3.2.0",
#     "spark.sql.warehouse.dir": "./spark-warehouse",
#     "spark.driver.memory": "1g",
#     "spark.sql.adaptive.enabled": "true",
#     "spark.sql.adaptive.coalescePartitions.enabled": "true",
#     "spark.sql.adaptive.advisoryPartitionSizeInBytes": "268435456",  # 256MB
# }
#
# Settings are applied in this order: app name, then the shared conf (which wins for any
# key it also defines), then the local UI / networking overrides below (which win last).
# NOTE(review): static settings such as spark.driver.extraJavaOptions only take effect when
# this call starts a new Spark driver; if a session is already running in the kernel,
# restart the kernel to apply them.
# NOTE(review): "spark.publicDns" does not look like a documented Spark property — confirm it has any effect.
spark_config = create_conf(DEFAULT_CONF)

local_overrides = {
    "spark.ui.enabled": "true",
    "spark.ui.port": "4040",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.driver.host": "localhost",
    "spark.publicDns": "localhost",
    "spark.driver.extraJavaOptions": "-Djava.net.preferIPv4Stack=true",
}

session_builder = (
    SparkSession.builder
    .appName("sessions")
    .enableHiveSupport()
    .config(conf=spark_config)
)
for conf_key, conf_value in local_overrides.items():
    session_builder = session_builder.config(conf_key, conf_value)

spark = session_builder.getOrCreate()
spark

# Parameters

In [3]:
# Parameters.
# DATA_DIR defaults to the author's local demo checkout; export SESSIONS_DATA_DIR to run the
# notebook against another location without editing this cell.
DATA_DIR = os.environ.get(
    "SESSIONS_DATA_DIR", "/home/demon/jetbrains/github/sessions/data/demo"
).rstrip("/")

input_path = f"{DATA_DIR}/csv/events"  # raw CSV events, one date=YYYY-MM-DD folder per ingest day
output_path = f"{DATA_DIR}/sessions"   # Delta table of events enriched with session columns

# Which ingest partition to read; override with SESSIONS_RUN_DATE.
run_date_str = os.environ.get("SESSIONS_RUN_DATE", "2025-09-20")

# Step 0: create the Delta table (just once)

In [None]:
# Step 0: create the empty, event_date-partitioned Delta table if it does not exist yet.
# Guarded with DeltaTable.isDeltaTable so the cell is safe to re-run and Restart & Run All
# works on a fresh checkout: an existing table is left untouched instead of being overwritten.
# The column order matches the column list selected for `updates` in Step 7.
if DeltaTable.isDeltaTable(spark, output_path):
    logger.info(f"Delta table already exists at {output_path}; skipping creation")
else:
    schema = T.StructType([
        T.StructField("user_id", T.StringType(), True),
        T.StructField("event_id", T.StringType(), True),
        T.StructField("product_code", T.StringType(), True),
        T.StructField("timestamp", T.TimestampType(), True),
        T.StructField("event_date", T.DateType(), True),
        T.StructField("session_start_ts", T.TimestampType(), True),
        T.StructField("session_end_ts", T.TimestampType(), True),
        T.StructField("session_id", T.StringType(), True),
    ])
    (
        spark.createDataFrame([], schema)
        .write.format("delta")
        .mode("errorifexists")  # never clobber: fail loudly if a table appeared meanwhile
        .partitionBy("event_date")
        .save(output_path)
    )
    logger.info(f"Created empty Delta table at {output_path}")

# Step 1: Read the daily raw partition

In [6]:
# Step 1: Read the daily raw partition for run_date_str.
# The CSV is read with every column as a string (no inferSchema) and then cast explicitly.
# inferSchema would cost an extra pass over the files, and it can re-type ID-like columns
# (e.g. "007" -> 7), which the string cast below would turn into "7" and silently change keys.
# (The main job reads parquet; CSV is only used for this demo.)
read_partition_path = f"{input_path}/date={run_date_str}"
logger.debug(f"[DEBUG] read_partition_path={read_partition_path}")

df_raw = (
    spark.read.option("header", True)
    .csv(read_partition_path, inferSchema=False)
    .select("user_id", "event_id", "product_code", "timestamp")
)

# Normalize types once; event_date is the calendar day of the event (session time zone).
df = (
    df_raw.select(
        F.col("user_id").cast("string").alias("user_id"),
        F.col("event_id").cast("string").alias("event_id"),
        F.col("product_code").cast("string").alias("product_code"),
        F.col("timestamp").cast("timestamp").alias("timestamp"),
    )
    .withColumn("event_date", F.to_date("timestamp"))
)

df.show(truncate=False)

[32m2025-09-25 00:59:09.532[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m<module>[0m:[36m4[0m - [34m[1m[DEBUG] read_partition_path=/home/demon/jetbrains/github/sessions/data/demo/csv/events/date=2025-09-20[0m


+-------+--------+------------+-------------------+----------+
|user_id|event_id|product_code|timestamp          |event_date|
+-------+--------+------------+-------------------+----------+
|u1     |a       |pyc         |2025-09-18 09:00:00|2025-09-18|
|u1     |z       |pyc         |2025-09-18 09:03:00|2025-09-18|
|u1     |b       |pyc         |2025-09-18 09:04:59|2025-09-18|
|u1     |y       |pyc         |2025-09-18 09:10:30|2025-09-18|
|u1     |a       |pyc         |2025-09-18 10:00:00|2025-09-18|
|u1     |x       |pyc         |2025-09-18 10:02:00|2025-09-18|
|u1     |c       |pyc         |2025-09-18 10:07:01|2025-09-18|
|u2     |b       |pyc         |2025-09-20 23:58:00|2025-09-20|
|u2     |x       |pyc         |2025-09-20 00:02:00|2025-09-20|
|u2     |c       |pyc         |2025-09-20 00:02:59|2025-09-20|
|u2     |d       |pyc         |2025-09-20 00:10:00|2025-09-20|
|u3     |x       |idea        |2025-09-18 12:00:00|2025-09-18|
+-------+--------+------------+-------------------+----

# Step 2: Compute all dates / timestamps - dmin..dmax are the actual event_date bounds inside today's batch

In [7]:
# Step 2: Derive the date windows from the batch.
# dmin..dmax are the actual event_date bounds inside today's batch. They can lie before
# run_date_str: the 2025-09-20 partition contains 2025-09-18 events.
mm = df.agg(
    F.min("event_date").alias("dmin"),
    F.max("event_date").alias("dmax"),
).first()
dmin, dmax = mm["dmin"], mm["dmax"]
if dmin is None or dmax is None:
    # An empty partition (or one whose timestamps all failed to parse) has no bounds.
    # Fail with a clear message instead of a TypeError on `None - timedelta` below.
    raise ValueError(
        f"No events with a valid timestamp for run_date_str={run_date_str}; nothing to sessionize"
    )

# Context window for building sessions: [ctx_left_ts, ctx_right_ts_excl)
left_ctx_date = dmin - timedelta(days=1)   # look back 1 day
right_ctx_date = dmax + timedelta(days=1)  # look ahead 1 day
ctx_left_ts = f"{left_ctx_date} 00:00:00"
ctx_right_ts_excl = f"{right_ctx_date + timedelta(days=1)} 00:00:00"  # = dmax+2 00:00

# Write window: impacted users are rewritten for event dates [dmin .. dmax+1],
# i.e. timestamps [write_left_ts, write_right_ts_excl).
# NOTE(review): the context window ends at the same instant as the write window (dmax+2 00:00),
# so the "look ahead" day is itself part of the rewritten range.
write_left_date = dmin
write_right_date = dmax + timedelta(days=1)
write_left_ts = f"{write_left_date} 00:00:00"
write_right_ts_excl = f"{write_right_date + timedelta(days=1)} 00:00:00"  # = dmax+2 00:00

logger.info(
    f"Dates: run_date_str={run_date_str}, "
    f"ctx=[{ctx_left_ts} .. {ctx_right_ts_excl}), "
    f"write=[{write_left_ts} .. {write_right_ts_excl}), "
    f"new batch dates=[{write_left_date} .. {write_right_date}]"
)

[32m2025-09-25 00:59:11.697[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m20[0m - [1mDates: run_date_str=2025-09-20, ctx=[2025-09-17 00:00:00 .. 2025-09-22 00:00:00), write=[2025-09-18 00:00:00 .. 2025-09-22 00:00:00), new batch dates=[2025-09-18 .. 2025-09-21][0m


# Step 3: Detect impacted keys to limit IO

In [8]:
# Step 3: Collect the distinct (user_id, product_code) pairs present in this batch.
# Step 4 joins the Delta table against these keys, and Step 8 passes them to
# rewrite_impacted_users, so IO scales with the batch rather than with the whole table.
impacted_keys = df.select(["user_id", "product_code"]).dropDuplicates()
impacted_keys.show(truncate=False)

+-------+------------+
|user_id|product_code|
+-------+------------+
|u3     |idea        |
|u1     |pyc         |
|u2     |pyc         |
+-------+------------+



# Step 4: Read Delta once for impacted keys, then split into ctx/write

In [9]:
# Step 4: Read the Delta table once for the impacted keys, then split into ctx/write views.
# The event_date predicate is redundant with the timestamp range, because rows are written
# with event_date = to_date(timestamp). Unlike the timestamp filter, it lets Delta prune the
# event_date partitions, so only the ctx days are scanned instead of the whole table.
existing_superset = (
    spark.read.format("delta").load(output_path)
    .where(F.col("event_date").between(F.lit(left_ctx_date), F.lit(right_ctx_date)))
    .select("user_id", "event_id", "product_code", "timestamp", "event_date")
    .join(impacted_keys, ["user_id", "product_code"], "inner")
    .where(
        (F.col("timestamp") >= F.lit(ctx_left_ts)) &
        (F.col("timestamp") < F.lit(ctx_right_ts_excl))
    )
    .cache()  # reused by existing_ctx and existing_write below
)
# count() is an action: it populates the cache now. Its value is not displayed because it
# is not the cell's last expression.
existing_superset.count()

existing_ctx = existing_superset  # already restricted to the ctx window above
existing_write = existing_superset.where(
    (F.col("timestamp") >= F.lit(write_left_ts)) &
    (F.col("timestamp") < F.lit(write_right_ts_excl))
)

existing_superset.show()

25/09/25 00:59:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------+--------+---------+----------+
|user_id|product_code|event_id|timestamp|event_date|
+-------+------------+--------+---------+----------+
+-------+------------+--------+---------+----------+



# Step 5: Build sessions over (new + context) only for impacted keys - Basic session detection logic

In [10]:
# Step 5: Build sessions over (new + context) events, only for the impacted keys.
# A session is a run of user-action events (event_id in USER_ACTION_IDS) of one
# (user_id, product_code) whose consecutive actions are less than INACTIVITY_SECONDS apart.
# It ends INACTIVITY_SECONDS after its last action.
all_for_sessions = df.select(
    "user_id", "event_id", "product_code", "timestamp", "event_date"
).unionByName(
    existing_ctx.select("user_id", "event_id", "product_code", "timestamp", "event_date")
)

# Events present both in the batch and in the table appear twice here. That does not move
# session boundaries: a zero gap never opens a session (assuming INACTIVITY_SECONDS > 0),
# and min/max are unchanged.
df_window = all_for_sessions.where(
    (F.col("timestamp") >= F.lit(ctx_left_ts)) &
    (F.col("timestamp") < F.lit(ctx_right_ts_excl))
)
# No explicit repartition here. The window below already shuffles by (user_id, product_code),
# and doing that after the user-action filter moves far fewer rows than repartitioning every
# event up front. Hash-repartitioning by the same key does not reduce skew either.

w = Window.partitionBy("user_id", "product_code").orderBy("ts")
gap_seconds = F.col("ts").cast("long") - F.col("prev_ts").cast("long")

ua = (
    df_window
    .filter(F.col("event_id").isin([*USER_ACTION_IDS]))
    .select("user_id", "product_code", F.col("timestamp").alias("ts"))
    .withColumn("prev_ts", F.lag("ts").over(w))
    # 1 marks the first action of a session: no previous action, or a long enough gap.
    .withColumn(
        "is_new",
        F.when(
            F.col("prev_ts").isNull() | (gap_seconds >= INACTIVITY_SECONDS),
            F.lit(1)
        ).otherwise(F.lit(0))
    )
    # The running count of session starts numbers the sessions within each key.
    .withColumn(
        "sess_seq",
        F.sum("is_new").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow))
    )
)

# A single aggregation replaces "window min/max + dropDuplicates". It yields one row per
# session, and the data is already clustered by (user_id, product_code) from the window.
sessions = (
    ua
    .groupBy("user_id", "product_code", "sess_seq")
    .agg(
        F.min("ts").alias("session_start_ts"),
        F.max("ts").alias("last_user_action_ts"),
    )
    .withColumn(
        "session_end_ts",
        F.expr(f"last_user_action_ts + INTERVAL {INACTIVITY_SECONDS} SECONDS")
    )
    .drop("sess_seq")
    .withColumn(
        "session_id",
        F.concat_ws(
            "#",
            F.col("user_id"),
            F.col("product_code"),
            # NOTE(review): the literal 'Z' claims UTC. It is only accurate when
            # spark.sql.session.timeZone is UTC (as in the reference DEFAULT_CONF) — confirm.
            F.date_format(F.col("session_start_ts"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
        )
    )
)

sessions.show(truncate=False)

+-------+------------+-------------------+-------------------+-------------------+-------------------------------+
|user_id|product_code|session_start_ts   |last_user_action_ts|session_end_ts     |session_id                     |
+-------+------------+-------------------+-------------------+-------------------+-------------------------------+
|u1     |pyc         |2025-09-18 09:00:00|2025-09-18 09:04:59|2025-09-18 09:09:59|u1#pyc#2025-09-18T09:00:00.000Z|
|u1     |pyc         |2025-09-18 10:00:00|2025-09-18 10:00:00|2025-09-18 10:05:00|u1#pyc#2025-09-18T10:00:00.000Z|
|u1     |pyc         |2025-09-18 10:07:01|2025-09-18 10:07:01|2025-09-18 10:12:01|u1#pyc#2025-09-18T10:07:01.000Z|
|u2     |pyc         |2025-09-20 00:02:59|2025-09-20 00:02:59|2025-09-20 00:07:59|u2#pyc#2025-09-20T00:02:59.000Z|
|u2     |pyc         |2025-09-20 23:58:00|2025-09-20 23:58:00|2025-09-21 00:03:00|u2#pyc#2025-09-20T23:58:00.000Z|
+-------+------------+-------------------+-------------------+------------------

# Step 6: Full write set for [dmin .. dmax+1] - keep existing + add missing new

In [11]:
# Step 6: Full write set for [dmin .. dmax+1]: keep every existing row in the write window
# and add the batch events that are not stored yet.
# An event is identified by (user_id, product_code, event_id, timestamp). event_id must be
# part of the key because two different events of one user can share a timestamp (e.g.
# u2 "c" and "a" at 2025-09-20 00:02:59 in the 2025-09-21 sample). Keying on the timestamp
# alone would silently drop the later-arriving event.
event_key = ["user_id", "product_code", "event_id", "timestamp"]

new_only = (
    df.where(
        (F.col("timestamp") >= F.lit(write_left_ts)) &
        (F.col("timestamp") < F.lit(write_right_ts_excl))
    )
    .dropDuplicates(event_key)  # an event delivered twice within the batch is written once
    .join(
        existing_write.select(*event_key).distinct(),
        on=event_key,
        how="left_anti",
    )
)

to_write_base = existing_write.unionByName(new_only)

to_write_base.show(truncate=False)

+-------+------------+--------+-------------------+----------+
|user_id|product_code|event_id|timestamp          |event_date|
+-------+------------+--------+-------------------+----------+
|u1     |pyc         |a       |2025-09-18 09:00:00|2025-09-18|
|u1     |pyc         |z       |2025-09-18 09:03:00|2025-09-18|
|u1     |pyc         |b       |2025-09-18 09:04:59|2025-09-18|
|u1     |pyc         |y       |2025-09-18 09:10:30|2025-09-18|
|u1     |pyc         |a       |2025-09-18 10:00:00|2025-09-18|
|u1     |pyc         |x       |2025-09-18 10:02:00|2025-09-18|
|u1     |pyc         |c       |2025-09-18 10:07:01|2025-09-18|
|u2     |pyc         |b       |2025-09-20 23:58:00|2025-09-20|
|u2     |pyc         |x       |2025-09-20 00:02:00|2025-09-20|
|u2     |pyc         |c       |2025-09-20 00:02:59|2025-09-20|
|u2     |pyc         |d       |2025-09-20 00:10:00|2025-09-20|
|u3     |idea        |x       |2025-09-18 12:00:00|2025-09-18|
+-------+------------+--------+-------------------+----

# Step 7: Assign session_id to all events in the write window (broadcast range join)

In [12]:
# Step 7: Assign session_id to every event in the write window via a range join against the
# sessions built in Step 5.
# NOTE(review): the session table is broadcast, i.e. assumed to be small. Confirm for large
# batches, since a big broadcast can exhaust driver/executor memory.
#
# Membership is [session_start_ts, session_end_ts), i.e. end-exclusive. Step 5 opens a new
# session when the gap is >= INACTIVITY_SECONDS, so a user action exactly at session_end_ts
# starts the next session. An inclusive end would match that event to both sessions and
# write it twice.
s_for_join = sessions.select(
    F.col("user_id").alias("s_user_id"),
    F.col("product_code").alias("s_product_code"),
    "session_start_ts", "session_end_ts", "session_id",
)

updates = (
    to_write_base
    .join(
        F.broadcast(s_for_join),
        on=[
            F.col("user_id") == F.col("s_user_id"),
            F.col("product_code") == F.col("s_product_code"),
            F.col("timestamp") >= F.col("session_start_ts"),
            F.col("timestamp") < F.col("session_end_ts"),
        ],
        how="left",
    )
    # Unmatched events get NULL session_start_ts / session_end_ts / session_id, because all
    # three come from the same (absent) right-hand row.
    .withColumn("event_date", F.to_date("timestamp"))
    .select(
        "user_id", "event_id", "product_code", "timestamp",
        "event_date", "session_start_ts", "session_end_ts", "session_id",
    )
)

updates.show(truncate=False)

+-------+--------+------------+-------------------+----------+-------------------+-------------------+-------------------------------+
|user_id|event_id|product_code|timestamp          |event_date|session_start_ts   |session_end_ts     |session_id                     |
+-------+--------+------------+-------------------+----------+-------------------+-------------------+-------------------------------+
|u1     |a       |pyc         |2025-09-18 09:00:00|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:09:59|u1#pyc#2025-09-18T09:00:00.000Z|
|u1     |z       |pyc         |2025-09-18 09:03:00|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:09:59|u1#pyc#2025-09-18T09:00:00.000Z|
|u1     |b       |pyc         |2025-09-18 09:04:59|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:09:59|u1#pyc#2025-09-18T09:00:00.000Z|
|u1     |y       |pyc         |2025-09-18 09:10:30|2025-09-18|NULL               |NULL               |NULL                           |
|u1     |a       |pyc         |2025-09-18 10:00:00|2025

# Step 8: Delete impacted users for [dmin .. dmax+1] and append fresh result (MERGE delete)

In [13]:
# Step 8: Delete impacted users for [dmin .. dmax+1] and append the fresh result
# (performed by rewrite_impacted_users).
#
# Before writing, rows are spread over num_days * files_per_day partitions keyed by
# event_date and a hash bucket of user_id, to control how many files are written per day.
# NOTE(review): hash partitioning does not guarantee one partition per (day, bucket), since
# two keys can land in the same partition, so the files-per-day count is approximate.
# The result goes into a new name instead of rebinding `updates`, so Step 7's displayed
# frame is left unchanged and this cell can be re-run safely.
files_per_day = 1
assert files_per_day >= 1, f"files_per_day must be >= 1, got {files_per_day}"

num_days = (write_right_date - write_left_date).days + 1
updates_to_write = (
    updates
    .withColumn("day_bucket", F.pmod(F.xxhash64("user_id"), F.lit(files_per_day)))
    .repartition(num_days * files_per_day, "event_date", "day_bucket")
    .drop("day_bucket")
)

rewrite_impacted_users(
    spark=spark,
    output_path=output_path,
    impacted_keys=impacted_keys,
    updates=updates_to_write,
    write_left_date=write_left_date,
    write_right_date=write_right_date,
    run_date_str=run_date_str,
)

[32m2025-09-25 00:59:27.270[0m | [1mINFO    [0m | [36mevent_sessions.sessions.events_to_sessions[0m:[36mrewrite_impacted_users[0m:[36m50[0m - [1mRewrote impacted users for dates [2025-09-18 .. 2025-09-21] from batch date=2025-09-20; table_version=1[0m


{'status': 'ok',
 'output_path': '/home/demon/jetbrains/github/sessions/data/demo/sessions',
 'left_date': '2025-09-18',
 'right_date': '2025-09-21',
 'table_version': 1}

# Result

In [14]:
# sample20250920.csv analysis

# user_id,event_id,timestamp,product_code
# u1,a,2025-09-18 09:00:00,pyc
# u1,z,2025-09-18 09:03:00,pyc
# u1,b,2025-09-18 09:04:59,pyc

# u1,y,2025-09-18 09:10:30,pyc

# u1,a,2025-09-18 10:00:00,pyc
# u1,x,2025-09-18 10:02:00,pyc

# u1,c,2025-09-18 10:07:01,pyc

# u2,b,2025-09-20 23:58:00,pyc

# u2,x,2025-09-20 00:02:00,pyc

# u2,c,2025-09-20 00:02:59,pyc

# u2,d,2025-09-20 00:10:00,pyc

# u3,x,2025-09-18 12:00:00,idea

In [15]:
# Read the whole Delta table back and show the first 20 rows, ordered by user and time.
df_result = spark.read.load(output_path, format="delta")
df_result.orderBy(["user_id", "timestamp"]).show(n=20, truncate=False)

+-------+--------+------------+-------------------+----------+-------------------+-------------------+-------------------------------+
|user_id|event_id|product_code|timestamp          |event_date|session_start_ts   |session_end_ts     |session_id                     |
+-------+--------+------------+-------------------+----------+-------------------+-------------------+-------------------------------+
|u1     |a       |pyc         |2025-09-18 09:00:00|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:09:59|u1#pyc#2025-09-18T09:00:00.000Z|
|u1     |z       |pyc         |2025-09-18 09:03:00|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:09:59|u1#pyc#2025-09-18T09:00:00.000Z|
|u1     |b       |pyc         |2025-09-18 09:04:59|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:09:59|u1#pyc#2025-09-18T09:00:00.000Z|
|u1     |y       |pyc         |2025-09-18 09:10:30|2025-09-18|NULL               |NULL               |NULL                           |
|u1     |a       |pyc         |2025-09-18 10:00:00|2025

# spark-submit

In [16]:
# Run in terminal
# spark-submit --packages io.delta:delta-spark_2.12:3.2.0 src/event_sessions/sessions/events_to_sessions.py --run-date 2025-09-21

In [17]:
# sample20250921.csv analysis

# user_id,event_id,timestamp,product_code
# u1,a,2025-09-18 09:00:00,pyc
# u1,z,2025-09-18 09:03:00,pyc
# u1,b,2025-09-18 09:04:59,pyc
# u1,b,2025-09-18 09:08:59,pyc <- "+"
# u1,y,2025-09-18 09:10:30,pyc <- "new in session"
# u1,v,2025-09-18 09:10:50,pyc <- "new in session"

# u1,a,2025-09-18 10:00:00,pyc
# u1,x,2025-09-18 10:02:00,pyc

# u1,c,2025-09-18 10:07:01,pyc

# u2,b,2025-09-19 23:58:00,pyc <- "new session (new user event)"
# u2,x,2025-09-20 00:02:00,pyc <- "new in session"
# u2,c,2025-09-20 00:02:59,pyc <- "new in session (new user event)"
# u2,a,2025-09-20 00:02:59,pyc <- "new in session (new user event)"

# u2,d,2025-09-20 00:10:00,pyc <- "same ide event without session"

# u3,a,2025-09-18 11:58:00,idea
# u3,x,2025-09-18 12:00:00,idea

In [18]:
# Read the table back after the 2025-09-21 spark-submit run; show 20 rows by user and time.
df_result_20250921 = spark.read.load(output_path, format="delta")
df_result_20250921.orderBy(["user_id", "timestamp"]).show(n=20, truncate=False)

+-------+--------+------------+-------------------+----------+-------------------+-------------------+--------------------------------+
|user_id|event_id|product_code|timestamp          |event_date|session_start_ts   |session_end_ts     |session_id                      |
+-------+--------+------------+-------------------+----------+-------------------+-------------------+--------------------------------+
|u1     |a       |pyc         |2025-09-18 09:00:00|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:13:59|u1#pyc#2025-09-18T09:00:00.000Z |
|u1     |z       |pyc         |2025-09-18 09:03:00|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:13:59|u1#pyc#2025-09-18T09:00:00.000Z |
|u1     |b       |pyc         |2025-09-18 09:04:59|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:13:59|u1#pyc#2025-09-18T09:00:00.000Z |
|u1     |b       |pyc         |2025-09-18 09:08:59|2025-09-18|2025-09-18 09:00:00|2025-09-18 09:13:59|u1#pyc#2025-09-18T09:00:00.000Z |
|u1     |y       |pyc         |2025-09-18 09:10: