https://stackoverflow.com/questions/54662219/spark-advanced-window-with-dynamic-last

Problem: Given time-series clickstream data of user activity stored in Hive, the task is to enrich the data with a session ID using Spark.

Session Definition

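- A session expires after 1 hour of inactivity: a gap of more than 1 hour between consecutive clicks starts a new session.
- A session lasts at most 2 hours in total: a click more than 2 hours after the session's first click starts a new session.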

In [32]:
from pyspark.sql.functions import unix_timestamp, from_unixtime,col, lit, udf, datediff, lead, explode,to_date
from pyspark.sql import SparkSession,Window,DataFrame
import datetime
from pyspark.sql.types import StringType,BooleanType,DateType,LongType,ArrayType
from typing import List
import pyspark.sql.functions as f
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Sample clickstream, listed per user for readability. It is flattened below into
# (UserId, Click_Time) rows in exactly the order listed. Times are ISO-8601 UTC strings.
_clicks_by_user = {
    "U1": [
        "2019-01-01T11:00:00Z",
        "2019-01-01T11:15:00Z",
        "2019-01-01T12:00:00Z",
        "2019-01-01T12:20:00Z",
        "2019-01-01T15:00:00Z",
    ],
    "U2": [
        "2019-01-01T11:00:00Z",
        "2019-01-02T11:00:00Z",
        "2019-01-02T11:25:00Z",
        "2019-01-02T11:50:00Z",
        "2019-01-02T12:15:00Z",
        "2019-01-02T12:40:00Z",
        "2019-01-02T13:05:00Z",
        "2019-01-02T13:20:00Z",
    ],
}
streaming_data = [(user, stamp) for user, stamps in _clicks_by_user.items() for stamp in stamps]
schema = ("UserId", "Click_Time")

# Click_Time parsed into a Spark timestamp. The displayed values depend on the Spark
# session time zone (the output below suggests UTC+05:30: 11:00Z shows as 16:30).
df_stream = (
    spark.createDataFrame(streaming_data, schema)
    .withColumn("Click_Time", f.col("Click_Time").cast("timestamp"))
)

# Same rows with Click_Time replaced by epoch seconds.
df_stream1 = df_stream.withColumn("Click_Time", f.unix_timestamp("Click_Time"))

In [35]:
df_stream.show(n=20, truncate=True)  # parsed timestamps, rendered in the session time zone

+------+-------------------+
|UserId|         Click_Time|
+------+-------------------+
|    U1|2019-01-01 16:30:00|
|    U1|2019-01-01 16:45:00|
|    U1|2019-01-01 17:30:00|
|    U1|2019-01-01 17:50:00|
|    U1|2019-01-01 20:30:00|
|    U2|2019-01-01 16:30:00|
|    U2|2019-01-02 16:30:00|
|    U2|2019-01-02 16:55:00|
|    U2|2019-01-02 17:20:00|
|    U2|2019-01-02 17:45:00|
|    U2|2019-01-02 18:10:00|
|    U2|2019-01-02 18:35:00|
|    U2|2019-01-02 18:50:00|
+------+-------------------+



In [33]:
df_stream1.show(n=20, truncate=True)  # the same clicks as epoch seconds

+------+----------+
|UserId|Click_Time|
+------+----------+
|    U1|1546340400|
|    U1|1546341300|
|    U1|1546344000|
|    U1|1546345200|
|    U1|1546354800|
|    U2|1546340400|
|    U2|1546426800|
|    U2|1546428300|
|    U2|1546429800|
|    U2|1546431300|
|    U2|1546432800|
|    U2|1546434300|
|    U2|1546435200|
+------+----------+



In [29]:
# Pass 1 (inactivity rule): per user, in Click_Time order, measure the gap to the previous
# click in hours. Gaps above 1 hour are flagged in cond_, and a running total of the flags
# numbers the inactivity-based sessions in temp_session (starting at 0).
window_spec = Window.partitionBy("UserId").orderBy("Click_Time")

prev_click = f.lag(f.col("Click_Time"), 1).over(window_spec)
hours_since_prev = (f.unix_timestamp("Click_Time") - f.unix_timestamp(prev_click)) / (60 * 60)

df_stream = (
    df_stream
    .withColumn("time_diff", hours_since_prev)
    # The first click of each user has no previous click (null); this turns it into 0.
    # Note that na.fill(0) applies to every numeric column of the frame, not only time_diff.
    .na.fill(0)
    .withColumn("cond_", f.when(f.col("time_diff") > 1, 1).otherwise(0))
    .withColumn("temp_session", f.sum(f.col("cond_")).over(window_spec))
)
df_stream.show(20)

+------+-------------------+------------------+-----+------------+
|UserId|         Click_Time|         time_diff|cond_|temp_session|
+------+-------------------+------------------+-----+------------+
|    U2|2019-01-01 16:30:00|               0.0|    0|           0|
|    U2|2019-01-02 16:30:00|              24.0|    1|           1|
|    U2|2019-01-02 16:55:00|0.4166666666666667|    0|           1|
|    U2|2019-01-02 17:20:00|0.4166666666666667|    0|           1|
|    U2|2019-01-02 17:45:00|0.4166666666666667|    0|           1|
|    U2|2019-01-02 18:10:00|0.4166666666666667|    0|           1|
|    U2|2019-01-02 18:35:00|0.4166666666666667|    0|           1|
|    U2|2019-01-02 18:50:00|              0.25|    0|           1|
|    U1|2019-01-01 16:30:00|               0.0|    0|           0|
|    U1|2019-01-01 16:45:00|              0.25|    0|           0|
|    U1|2019-01-01 17:30:00|              0.75|    0|           0|
|    U1|2019-01-01 17:50:00|0.3333333333333333|    0|         

In [30]:
# Pass 2 (2-hour rule): inside each inactivity session (UserId, temp_session), measure the
# time elapsed since the session's first click. Clicks more than 2 hours in are flagged.
new_window = Window.partitionBy("UserId", "temp_session").orderBy("Click_Time")
new_spec = new_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Seconds since the previous click of the same temp_session (null on its first click;
# na.fill(0) below turns that into 0).
cond_2hr = (
    f.unix_timestamp("Click_Time")
    - f.unix_timestamp(f.lag(f.col("Click_Time"), 1).over(new_window))
)

# Running sum of those gaps, i.e. seconds elapsed since the temp_session's first click.
elapsed_secs = f.sum(f.col("2hr_time_diff")).over(new_spec)

# NOTE(review): temp_session_2hr is a 0/1 flag rather than a counter, so each temp_session
# is split at most once here, however long it stays active.
df_stream = (
    df_stream
    .withColumn("2hr_time_diff", cond_2hr)
    .na.fill(0)
    .withColumn("temp_session_2hr", f.when(elapsed_secs > 2 * 60 * 60, 1).otherwise(0))
)
df_stream.show(20)

+------+-------------------+------------------+-----+------------+-------------+----------------+
|UserId|         Click_Time|         time_diff|cond_|temp_session|2hr_time_diff|temp_session_2hr|
+------+-------------------+------------------+-----+------------+-------------+----------------+
|    U2|2019-01-01 16:30:00|               0.0|    0|           0|            0|               0|
|    U2|2019-01-02 16:30:00|              24.0|    1|           1|            0|               0|
|    U2|2019-01-02 16:55:00|0.4166666666666667|    0|           1|         1500|               0|
|    U2|2019-01-02 17:20:00|0.4166666666666667|    0|           1|         1500|               0|
|    U2|2019-01-02 17:45:00|0.4166666666666667|    0|           1|         1500|               0|
|    U2|2019-01-02 18:10:00|0.4166666666666667|    0|           1|         1500|               0|
|    U2|2019-01-02 18:35:00|0.4166666666666667|    0|           1|         1500|               1|
|    U2|2019-01-02 1

In [31]:
# Pass 3: within each (UserId, temp_session, temp_session_2hr) group, flag clicks that
# fall more than 2 hours after the group's first click.
new_window_2hr = Window.partitionBy("UserId", "temp_session", "temp_session_2hr").orderBy("Click_Time")

group_start = f.first(f.col("Click_Time")).over(new_window_2hr)
secs_since_group_start = f.unix_timestamp(f.col("Click_Time")) - f.unix_timestamp(group_start)

# NOTE(review): this is again a 0/1 flag, so it adds at most one more split per group.
hrs_cond_ = f.when(secs_since_group_start > 2 * 60 * 60, 1).otherwise(0)

df_stream = df_stream.withColumn("final_session_groups", hrs_cond_)
df_stream.show(20)

+------+-------------------+------------------+-----+------------+-------------+----------------+--------------------+
|UserId|         Click_Time|         time_diff|cond_|temp_session|2hr_time_diff|temp_session_2hr|final_session_groups|
+------+-------------------+------------------+-----+------------+-------------+----------------+--------------------+
|    U2|2019-01-01 16:30:00|               0.0|    0|           0|            0|               0|                   0|
|    U2|2019-01-02 16:30:00|              24.0|    1|           1|            0|               0|                   0|
|    U2|2019-01-02 16:55:00|0.4166666666666667|    0|           1|         1500|               0|                   0|
|    U2|2019-01-02 17:20:00|0.4166666666666667|    0|           1|         1500|               0|                   0|
|    U2|2019-01-02 17:45:00|0.4166666666666667|    0|           1|         1500|               0|                   0|
|    U2|2019-01-02 18:10:00|0.4166666666666667| 

In [15]:
# Session rules from the problem statement, in seconds.
SESSION_INACTIVITY_SECS = 60 * 60        # a gap strictly longer than this starts a new session
SESSION_MAX_DURATION_SECS = 2 * 60 * 60  # a click strictly later than this after the session's first click starts a new one

# Why not derive final_session from the flag columns of the previous cells:
# temp_session_2hr and final_session_groups are 0/1 flags. That means:
#   - each temp_session is cut into at most three sessions, however long it runs;
#   - summing the flags with temp_session can reuse a number. Clicks at 0:00, 0:45,
#     1:30, 2:15 and 5:00 give both the 2:15 and the 5:00 click final_session = 2.
# Where a session starts depends on where the previous one ended, so the clicks are
# walked in order per user instead.


def assign_session_numbers(
    click_secs: List[int],
    inactivity_secs: int = SESSION_INACTIVITY_SECS,
    max_duration_secs: int = SESSION_MAX_DURATION_SECS,
) -> List[List[int]]:
    """Number the sessions of one user's clicks.

    Args:
        click_secs: Epoch seconds of one user's clicks, sorted ascending.
        inactivity_secs: A gap to the previous click strictly longer than this
            starts a new session.
        max_duration_secs: A click strictly more than this after the current
            session's first click starts a new session.

    Returns:
        ``[click_sec, session_number]`` pairs in input order. Session numbers
        start at 1. An empty input gives an empty list.
    """
    pairs = []
    session_no = 0
    session_start = previous = None
    for click in click_secs:
        if (
            previous is None
            or click - previous > inactivity_secs
            or click - session_start > max_duration_secs
        ):
            session_no += 1
            session_start = click
        pairs.append([click, session_no])
        previous = click
    return pairs


session_numbers_udf = udf(assign_session_numbers, ArrayType(ArrayType(LongType())))

# One row per (user, distinct click second) with its session number. Identical click
# times always belong to the same session, so distinct seconds are enough as a join key.
session_lookup = (
    df_stream
    .groupBy("UserId")
    .agg(f.sort_array(f.collect_set(f.unix_timestamp("Click_Time"))).alias("click_secs"))
    .select("UserId", explode(session_numbers_udf("click_secs")).alias("pair"))
    .select(
        "UserId",
        f.col("pair")[0].alias("_click_sec"),
        f.col("pair")[1].alias("final_session"),
    )
)

# Attach the session number to every original row. Dropping final_session/session_id
# first makes the cell safe to re-run. drop() ignores names that are not present, so
# the helper columns of the earlier cells are removed only if they exist.
# NOTE(review): UserId and the number are concatenated without a separator, so e.g.
# "U1" + "11" and "U11" + "1" both give "U111"; consider a separator if user ids can
# end in digits.
df_stream = (
    df_stream
    .drop("final_session", "session_id")
    .withColumn("_click_sec", f.unix_timestamp("Click_Time"))
    .join(session_lookup, on=["UserId", "_click_sec"], how="left")
    .drop("_click_sec", "temp_session", "final_session_groups", "time_diff", "temp_session_2hr")
    .withColumn("session_id", f.concat(f.col("UserId"), f.col("final_session").cast("string")))
)

# Sort only for display; row order after the join is otherwise arbitrary.
df_stream.orderBy("UserId", "Click_Time").show(20, truncate=False)

+------+-------------------+-----+-------------+-------------+----------+
|UserId|Click_Time         |cond_|2hr_time_diff|final_session|session_id|
+------+-------------------+-----+-------------+-------------+----------+
|U2    |2019-01-01 16:30:00|0    |0            |1            |U21       |
|U2    |2019-01-02 16:30:00|1    |0            |2            |U22       |
|U2    |2019-01-02 16:55:00|0    |1500         |2            |U22       |
|U2    |2019-01-02 17:20:00|0    |1500         |2            |U22       |
|U2    |2019-01-02 17:45:00|0    |1500         |2            |U22       |
|U2    |2019-01-02 18:10:00|0    |1500         |2            |U22       |
|U2    |2019-01-02 18:35:00|0    |1500         |3            |U23       |
|U2    |2019-01-02 18:50:00|0    |900          |3            |U23       |
|U1    |2019-01-01 16:30:00|0    |0            |1            |U11       |
|U1    |2019-01-01 16:45:00|0    |900          |1            |U11       |
|U1    |2019-01-01 17:30:00|0    |2700