Question:
You are given a large dataset of user activity logs with the following schema:
user_id (string), event_type (string), event_timestamp (timestamp), device_type (string)
 
Sample Data:
+---------+------------+-------------------+-------------+
| user_id | event_type | event_timestamp   | device_type |
+---------+------------+-------------------+-------------+
| u1      | login      | 2023-08-01 08:00  | mobile      |
| u1      | purchase   | 2023-08-01 08:10  | mobile      |
| u2      | login      | 2023-08-01 09:00  | desktop     |
| u2      | logout     | 2023-08-01 09:30  | desktop     |
| u1      | logout     | 2023-08-01 08:30  | mobile      |
| u3      | login      | 2023-08-01 10:00  | tablet      |
| u3      | purchase   | 2023-08-01 10:05  | tablet      |
+---------+------------+-------------------+-------------+
 
Task:
For each user, calculate the average session duration (logout - login) in minutes, considering only sessions where both login and logout events exist. Ignore overlapping sessions.
 
Expected Output:
+---------+----------------------+
| user_id | avg_session_minutes  |
+---------+----------------------+
| u1      | 30.0                 |
| u2      | 30.0                 |
+---------+----------------------+

In [None]:
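from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python builtins (min, max, sum, round) with Spark functions
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import avg, col, to_timestamp, unix_timestamp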

spark = SparkSession.builder.appName("Interview").getOrCreate()

In [None]:
schema = StructType([
    StructField('user_id', StringType(), False),
    StructField('event_type', StringType(), False),
    StructField('event_timestamp', StringType(), False),
    StructField('device_type', StringType(), False),
])
data = [
    ("u1", "login", "2023-08-01 08:00", "mobile"),
    ("u1", "purchase", "2023-08-01 08:10", "mobile"),
    ("u2", "login", "2023-08-01 09:00", "desktop"),
    ("u2", "logout", "2023-08-01 09:30", "desktop"),
    ("u1", "logout", "2023-08-01 08:30", "mobile"),
    ("u3", "login", "2023-08-01 10:00", "tablet"),
    ("u3", "purchase", "2023-08-01 10:05", "tablet"),
]

df = spark.createDataFrame(data, schema)

In [None]:
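# Parse the string column with an explicit pattern, since the sample values carry no seconds
df = df.withColumn('event_timestamp', to_timestamp(col('event_timestamp'), 'yyyy-MM-dd HH:mm'))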
df.printSchema()
df.show()

In [None]:
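from pyspark.sql.functions import min as spark_min  # aliased so the builtin min is not shadowed

# Keep only the columns needed for pairing, so event_type/device_type are not duplicated by the join
login = df.filter(col('event_type') == 'login').select('user_id', col('event_timestamp').alias('login_time'))
logout = df.filter(col('event_type') == 'logout').select('user_id', col('event_timestamp').alias('logout_time'))

# Pair each login with the earliest later logout of the same user; overlapping logins are not filtered here
session = (login.join(logout, 'user_id', 'inner').filter(col('logout_time') > col('login_time'))
           .groupBy('user_id', 'login_time').agg(spark_min('logout_time').alias('logout_time')))
session.show()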

In [None]:
session = session.withColumn('session_time', (unix_timestamp("logout_time") - unix_timestamp("login_time"))/60)

In [None]:
session.show()

In [None]:
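# Average the per-session durations for each user, matching the expected output column
result = session.groupBy("user_id").agg(avg("session_time").alias("avg_session_minutes")).orderBy("user_id")
result.show()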
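
For small inputs, the pairing rule can be cross-checked in plain Python without Spark. This is a minimal sketch, not the notebook's method: the helper name `avg_session_minutes` is hypothetical, and it assumes one reading of "ignore overlapping sessions", namely that a login followed by another login before any logout is discarded and only the most recent open login is paired with the next logout.

```python
from collections import defaultdict
from datetime import datetime

# Same rows as the sample data: (user_id, event_type, event_timestamp, device_type)
events = [
    ("u1", "login", "2023-08-01 08:00", "mobile"),
    ("u1", "purchase", "2023-08-01 08:10", "mobile"),
    ("u2", "login", "2023-08-01 09:00", "desktop"),
    ("u2", "logout", "2023-08-01 09:30", "desktop"),
    ("u1", "logout", "2023-08-01 08:30", "mobile"),
    ("u3", "login", "2023-08-01 10:00", "tablet"),
    ("u3", "purchase", "2023-08-01 10:05", "tablet"),
]


def avg_session_minutes(rows):
    """Hypothetical helper: average login-to-logout duration per user, in minutes."""
    # Collect only login/logout events, grouped by user
    by_user = defaultdict(list)
    for user_id, event_type, ts, _device in rows:
        if event_type in ("login", "logout"):
            parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M")
            by_user[user_id].append((parsed, event_type))

    result = {}
    for user_id, user_events in by_user.items():
        durations = []
        open_login = None
        for ts, event_type in sorted(user_events):
            if event_type == "login":
                # Assumption: a newer login replaces an unclosed one (the overlap is dropped)
                open_login = ts
            elif open_login is not None:
                durations.append((ts - open_login).total_seconds() / 60)
                open_login = None
        # Users with no completed session (such as u3) are left out
        if durations:
            result[user_id] = sum(durations) / len(durations)
    return result


print(avg_session_minutes(events))  # {'u1': 30.0, 'u2': 30.0}
```

On the sample rows this agrees with the expected output. Under the stated assumption, a sequence such as login 08:00, login 08:10, logout 08:30 counts as a single 20-minute session, whereas the join-based Spark cells above would pair both logins with that logout.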