<a href="https://colab.research.google.com/github/dineshkumarDE/learnPython/blob/main/pysparkScenarios.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window


spark = SparkSession.builder.appName("SparkLearning").getOrCreate()
data = [
 ("D001", "2024-03-28 10:00:00"),
 ("D001", "2024-03-28 10:04:00"),
 ("D001", "2024-03-28 10:25:00"),
 ("D001", "2024-03-28 10:29:00"),
 ("D001", "2024-03-28 11:00:00")
]


In [26]:
datadf=spark.createDataFrame(data,["device_id","logtime"]).withColumn("logtime",col("logtime").cast("timestamp"))

In [27]:
windowdata=Window.partitionBy("device_id").orderBy("logtime")

datadf=datadf.withColumn("prevtime",lag("logtime").over(windowdata))

datadf.show()

+---------+-------------------+-------------------+
|device_id|            logtime|           prevtime|
+---------+-------------------+-------------------+
|     D001|2024-03-28 10:00:00|               NULL|
|     D001|2024-03-28 10:04:00|2024-03-28 10:00:00|
|     D001|2024-03-28 10:25:00|2024-03-28 10:04:00|
|     D001|2024-03-28 10:29:00|2024-03-28 10:25:00|
|     D001|2024-03-28 11:00:00|2024-03-28 10:29:00|
+---------+-------------------+-------------------+



In [28]:
datadf=datadf.withColumn("timediff",unix_timestamp("logtime")-unix_timestamp("prevtime"))
datadf.show()

+---------+-------------------+-------------------+--------+
|device_id|            logtime|           prevtime|timediff|
+---------+-------------------+-------------------+--------+
|     D001|2024-03-28 10:00:00|               NULL|    NULL|
|     D001|2024-03-28 10:04:00|2024-03-28 10:00:00|     240|
|     D001|2024-03-28 10:25:00|2024-03-28 10:04:00|    1260|
|     D001|2024-03-28 10:29:00|2024-03-28 10:25:00|     240|
|     D001|2024-03-28 11:00:00|2024-03-28 10:29:00|    1860|
+---------+-------------------+-------------------+--------+



In [29]:
datadf=datadf.withColumn("newsession",(col("timediff")>600).cast("int"))
datadf.show()

+---------+-------------------+-------------------+--------+----------+
|device_id|            logtime|           prevtime|timediff|newsession|
+---------+-------------------+-------------------+--------+----------+
|     D001|2024-03-28 10:00:00|               NULL|    NULL|      NULL|
|     D001|2024-03-28 10:04:00|2024-03-28 10:00:00|     240|         0|
|     D001|2024-03-28 10:25:00|2024-03-28 10:04:00|    1260|         1|
|     D001|2024-03-28 10:29:00|2024-03-28 10:25:00|     240|         0|
|     D001|2024-03-28 11:00:00|2024-03-28 10:29:00|    1860|         1|
+---------+-------------------+-------------------+--------+----------+



In [30]:
datadf=datadf.withColumn("sessionid",sum("newsession").over(windowdata.rowsBetween(Window.unboundedPreceding,0)))
datadf.show()

+---------+-------------------+-------------------+--------+----------+---------+
|device_id|            logtime|           prevtime|timediff|newsession|sessionid|
+---------+-------------------+-------------------+--------+----------+---------+
|     D001|2024-03-28 10:00:00|               NULL|    NULL|      NULL|     NULL|
|     D001|2024-03-28 10:04:00|2024-03-28 10:00:00|     240|         0|        0|
|     D001|2024-03-28 10:25:00|2024-03-28 10:04:00|    1260|         1|        1|
|     D001|2024-03-28 10:29:00|2024-03-28 10:25:00|     240|         0|        1|
|     D001|2024-03-28 11:00:00|2024-03-28 10:29:00|    1860|         1|        2|
+---------+-------------------+-------------------+--------+----------+---------+



In [31]:
result=datadf.groupBy("device_id","sessionid").agg(min("logtime").alias("start_time"),max("logtime").alias("end_time")).orderBy("start_time")
result.show()

+---------+---------+-------------------+-------------------+
|device_id|sessionid|         start_time|           end_time|
+---------+---------+-------------------+-------------------+
|     D001|     NULL|2024-03-28 10:00:00|2024-03-28 10:00:00|
|     D001|        0|2024-03-28 10:04:00|2024-03-28 10:04:00|
|     D001|        1|2024-03-28 10:25:00|2024-03-28 10:29:00|
|     D001|        2|2024-03-28 11:00:00|2024-03-28 11:00:00|
+---------+---------+-------------------+-------------------+



In [32]:
# prompt: how to not get NULL sessionid because that is not actually a session

datadf = datadf.withColumn("newsession", when(col("timediff").isNull(), 1).otherwise((col("timediff") > 600).cast("int")))
datadf.show()
datadf = datadf.withColumn("sessionid", sum("newsession").over(windowdata.rowsBetween(Window.unboundedPreceding, 0)))
datadf.show()
result = datadf.groupBy("device_id", "sessionid").agg(min("logtime").alias("start_time"), max("logtime").alias("end_time")).orderBy("start_time")
result.show()


+---------+-------------------+-------------------+--------+----------+---------+
|device_id|            logtime|           prevtime|timediff|newsession|sessionid|
+---------+-------------------+-------------------+--------+----------+---------+
|     D001|2024-03-28 10:00:00|               NULL|    NULL|         1|     NULL|
|     D001|2024-03-28 10:04:00|2024-03-28 10:00:00|     240|         0|        0|
|     D001|2024-03-28 10:25:00|2024-03-28 10:04:00|    1260|         1|        1|
|     D001|2024-03-28 10:29:00|2024-03-28 10:25:00|     240|         0|        1|
|     D001|2024-03-28 11:00:00|2024-03-28 10:29:00|    1860|         1|        2|
+---------+-------------------+-------------------+--------+----------+---------+

+---------+-------------------+-------------------+--------+----------+---------+
|device_id|            logtime|           prevtime|timediff|newsession|sessionid|
+---------+-------------------+-------------------+--------+----------+---------+
|     D001|2024