In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, lag, when, last, sum, coalesce
from pyspark.sql.window import Window

# Reuse the running Spark session, or start one if none exists yet.
spark = SparkSession.builder.appName("SessionIdAllocation").getOrCreate()

# Sample events as (user_id, "YYYY-MM-DD HH:MM:SS") tuples. Every event falls on
# 2024-09-13 between 10:00 and 10:59, so only the minute varies.
_EVENT_MINUTES = {
    "user1": (0, 6, 40),
    "user2": (5, 9, 13, 17, 21, 25, 29, 33, 35, 39, 43, 47, 59),
}
data = [
    (user, f"2024-09-13 10:{minute:02d}:00")
    for user, minutes in _EVENT_MINUTES.items()
    for minute in minutes
]

# Each user's events in chronological order.
windowSpec = Window.partitionBy("user_id").orderBy(col("timestamp"))

# Parse the string timestamps, then attach the previous event time and the gap in
# seconds to it (both null for a user's first event, since lag() has no predecessor).
df = (
    spark.createDataFrame(data, ["user_id", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .withColumn("prev_timestamp", lag("timestamp").over(windowSpec))
    .withColumn(
        "time_diff",
        unix_timestamp("timestamp") - unix_timestamp("prev_timestamp"),
    )
)

# user_id -> start timestamp of that user's current session; filled by update_session_id.
user_last_active = {}

# Define a UDF to update the session ID based on the map
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, TimestampType

def update_session_id(user_id, timestamp, time_diff, state=None,
                      idle_timeout_s=300, max_session_s=1800):
    """Return the session id (the session's start timestamp) for one event.

    A new session starts for this event when any of these holds:
    - the user has no session recorded in ``state`` yet;
    - ``time_diff`` is None (no previous event);
    - the gap since the previous event is at least ``idle_timeout_s`` seconds;
    - at least ``max_session_s`` seconds have elapsed since the current session began.

    Args:
        user_id: key identifying the user in ``state``.
        timestamp: the event time (a ``datetime``).
        time_diff: seconds since the user's previous event, or None.
        state: dict mapping user_id -> current session start timestamp. It is
            updated in place when a new session starts. Defaults to the
            module-level ``user_last_active`` dict.
        idle_timeout_s: inactivity gap (seconds) that closes a session.
        max_session_s: maximum session length (seconds) measured from its start.

    Returns:
        The start timestamp of the session this event belongs to.

    NOTE(review): when wrapped as a Spark UDF, ``state`` is a per-Python-worker
    copy. The result is only correct if each user's rows reach one worker in
    timestamp order, which Spark does not guarantee.
    """
    if state is None:
        state = user_last_active
    if (user_id not in state
            or time_diff is None
            or time_diff >= idle_timeout_s
            # total_seconds(), not .seconds: .seconds ignores whole days and
            # wraps negative deltas to ~86399, misjudging the elapsed time.
            or (timestamp - state[user_id]).total_seconds() >= max_session_s):
        state[user_id] = timestamp
        return timestamp
    return state[user_id]

# Wrap the per-event session logic as a Spark UDF. It keeps per-user state in a
# Python dict, so it is marked non-deterministic: Spark assumes UDFs are
# deterministic and may otherwise eliminate or repeat invocations.
# NOTE(review): the result is only correct if every row of a user reaches the
# same Python worker in timestamp order, which Spark does not guarantee. A
# deterministic alternative is to collect each user's sorted timestamps and
# sessionize the whole list in a single call.
update_session_id_udf = udf(update_session_id, TimestampType()).asNondeterministic()

# Session id = timestamp of the event that started the session.
df = df.withColumn(
    "session_id",
    update_session_id_udf(col("user_id"), col("timestamp"), col("time_diff")),
)

MAX_SESSION_SECONDS = 30 * 60  # 30 minutes

# Elapsed time since the session started. Summing time_diff per session would
# also count the idle gap that preceded the session's first event, which
# overstates the duration (e.g. user1's 10:40 event would report 2040 s).
df = df.withColumn(
    "session_duration",
    unix_timestamp("timestamp") - unix_timestamp("session_id"),
)

# The UDF already starts a new session once MAX_SESSION_SECONDS have elapsed, so
# this branch should only fire if the UDF saw a user's rows out of order. Such
# rows then get their own timestamp as session id.
df = df.withColumn(
    "final_session_id",
    when(col("session_duration") > MAX_SESSION_SECONDS, col("timestamp"))
    .otherwise(col("session_id")),
)

# Show the final DataFrame sorted by user and timestamp
df.select("user_id", "timestamp", "final_session_id").orderBy("user_id", "timestamp").show(truncate=False)

+-------+-------------------+-------------------+
|user_id|timestamp          |final_session_id   |
+-------+-------------------+-------------------+
|user1  |2024-09-13 10:00:00|2024-09-13 10:00:00|
|user1  |2024-09-13 10:06:00|2024-09-13 10:06:00|
|user1  |2024-09-13 10:40:00|2024-09-13 10:40:00|
|user2  |2024-09-13 10:05:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:09:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:13:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:17:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:21:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:25:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:29:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:33:00|2024-09-13 10:05:00|
|user2  |2024-09-13 10:35:00|2024-09-13 10:35:00|
|user2  |2024-09-13 10:39:00|2024-09-13 10:35:00|
|user2  |2024-09-13 10:43:00|2024-09-13 10:35:00|
|user2  |2024-09-13 10:47:00|2024-09-13 10:35:00|
|user2  |2024-09-13 10:59:00|2024-09-13 10:59:00|
+-------+-------------------+-------------------+


In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, session_window, min, max, unix_timestamp, expr, explode, sequence, lead, when
from pyspark.sql.window import Window

# Reuse the running Spark session, or start one if none exists yet.
spark = SparkSession.builder.appName("SessionIdAllocation").getOrCreate()

# Sample events as (user_id, "YYYY-MM-DD HH:MM:SS") tuples. Every event falls on
# 2024-09-13 between 10:00 and 10:59, so only the minute varies.
_EVENT_MINUTES = {
    "user1": (0, 6, 40),
    "user2": (5, 9, 13, 17, 21, 25, 29, 33, 35, 39, 43, 47, 59),
}
data = [
    (user, f"2024-09-13 10:{minute:02d}:00")
    for user, minutes in _EVENT_MINUTES.items()
    for minute in minutes
]

# Build the event frame and parse the string timestamps.
df = (
    spark.createDataFrame(data, ["user_id", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

# Sessionize with Spark's built-in session window: a user's session closes after
# SESSION_GAP of inactivity.
SESSION_GAP = "5 minutes"
max_session_duration = 30 * 60  # 30 minutes in seconds

sessionized_df = df.groupBy("user_id", session_window(col("timestamp"), SESSION_GAP))\
    .agg(min("timestamp").alias("session_start"), max("timestamp").alias("session_end"))

# Duration from the first to the last event of the session.
sessionized_df = sessionized_df.withColumn("session_duration",
    unix_timestamp(col("session_end")) - unix_timestamp(col("session_start")))

# Sessions that fit within the limit are kept as-is...
short_sessions_df = sessionized_df.filter(col("session_duration") <= max_session_duration)

# ...longer ones are cut into consecutive max_session_duration-long intervals.
long_sessions_df = sessionized_df.filter(col("session_duration") > max_session_duration)

# Interval starts: session_start, session_start + step, ... up to session_end.
# The step is derived from max_session_duration so the two cannot drift apart.
interval_step = expr(f"interval {max_session_duration} seconds")
long_sessions_df = long_sessions_df.withColumn("exploded_session_start",
    explode(sequence(col("session_start"), col("session_end"), interval_step)))

# Each interval ends where the next interval of the SAME session starts.
# Partitioning by user alone would chain the last interval of one session to the
# first interval of that user's next long session.
per_session = Window.partitionBy("user_id", "session_start").orderBy("exploded_session_start")
long_sessions_df = long_sessions_df.withColumn("exploded_session_end",
    lead("exploded_session_start", 1).over(per_session))

# The last interval of a session ends at session_end.
long_sessions_df = long_sessions_df.withColumn("exploded_session_end",
    when(col("exploded_session_end").isNull(), col("session_end")).otherwise(col("exploded_session_end")))

# sequence() includes session_end itself when the duration is an exact multiple
# of the step, which would yield a zero-length [end, end] interval; drop it.
long_sessions_df = long_sessions_df.filter(col("exploded_session_start") < col("session_end"))

# Result columns: user_id, session_window, session_start, session_end.
long_sessions_df = long_sessions_df.drop("session_start", "session_end", "session_duration")\
    .withColumnRenamed("exploded_session_start", "session_start")\
    .withColumnRenamed("exploded_session_end", "session_end")

# Show the results
long_sessions_df.show(truncate=False)
short_sessions_df.show(truncate=False)

# Stop the Spark session (any later cell needs to create a new one).
spark.stop()

24/09/19 23:49:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+-------+------------------------------------------+-------------------+-------------------+
|user_id|session_window                            |session_start      |session_end        |
+-------+------------------------------------------+-------------------+-------------------+
|user2  |{2024-09-13 10:05:00, 2024-09-13 10:52:00}|2024-09-13 10:05:00|2024-09-13 10:35:00|
|user2  |{2024-09-13 10:05:00, 2024-09-13 10:52:00}|2024-09-13 10:35:00|2024-09-13 10:47:00|
+-------+------------------------------------------+-------------------+-------------------+

+-------+------------------------------------------+-------------------+-------------------+----------------+
|user_id|session_window                            |session_start      |session_end        |session_duration|
+-------+------------------------------------------+-------------------+-------------------+----------------+
|user1  |{2024-09-13 10:00:00, 2024-09-13 10:05:00}|2024-09-13 10:00:00|2024-09-13 10:00:00|0               |
|