In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, datediff
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("SessionGap").getOrCreate()

data = [
    ("U1", "2025-01-01"),
    ("U1", "2025-01-05"),
    ("U1", "2025-01-10"),
    ("U2", "2025-01-02"),
    ("U2", "2025-01-04"),
]

df = spark.createDataFrame(data, ["user_id", "session_date"])

window_spec = Window.partitionBy("user_id").orderBy("session_date")

df_with_gap = df.withColumn(
    "prev_session_date", lag("session_date").over(window_spec)
).withColumn(
    "days_since_last_session",
    datediff(col("session_date"), col("prev_session_date"))
)

df_with_gap.show()


+-------+------------+-----------------+-----------------------+
|user_id|session_date|prev_session_date|days_since_last_session|
+-------+------------+-----------------+-----------------------+
|     U1|  2025-01-01|             NULL|                   NULL|
|     U1|  2025-01-05|       2025-01-01|                      4|
|     U1|  2025-01-10|       2025-01-05|                      5|
|     U2|  2025-01-02|             NULL|                   NULL|
|     U2|  2025-01-04|       2025-01-02|                      2|
+-------+------------+-----------------+-----------------------+



In [None]:
"""
SELECT customer_id
FROM purchases
GROUP BY customer_id
HAVING COUNT(*) = 1;
"""