In [2]:
'''
You are given an events dataset where each record contains a user_id, timestamp, and event_type.

Your task is to compute, for each calendar day, the top 3 users with the highest number of distinct event types that occurred on that day. Break ties by:

Higher total events that day (count of rows for that user-day).
Then lexicographically smaller user_id.
Return the result sorted by date (ascending), rank (ascending), and user_id (ascending) for readability.

Input Schema & Example
Column Name	Data Type
user_id	String
timestamp	String
event_type	String
Output Schema
Column Name	Data Type
date	Date
user_id	String
distinct_event_types	Integer
total_events	Integer
rank	Integer
rank ranges from 1 to 3 per date, representing the user's position for that day.
Starter Code
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

data = [
    # 2025-06-01
    ("u1", "2025-06-01 00:10:00", "click"),
    ("u1", "2025-06-01 01:20:00", "view"),
    ("u1", "2025-06-01 02:30:00", "click"),
    ("u2", "2025-06-01 10:00:00", "view"),
    ("u2", "2025-06-01 11:00:00", "purchase"),
    ("u3", "2025-06-01 12:00:00", "view"),
    ("u3", "2025-06-01 13:00:00", "view"),
    ("u4", "2025-06-01 23:59:59", "click"),

    # 2025-06-02
    ("u1", "2025-06-02 00:00:01", "view"),
    ("u1", "2025-06-02 08:00:00", "purchase"),
    ("u2", "2025-06-02 09:00:00", "click"),
    ("u2", "2025-06-02 09:05:00", "view"),
    ("u2", "2025-06-02 09:10:00", "share"),
    ("u3", "2025-06-02 22:00:00", "view"),

    ("u5", "2025-06-03 10:00:00", "click"),
    ("u5", "2025-06-03 10:01:00", "view"),
    ("u5", "2025-06-03 10:02:00", "share"),
    ("u6", "2025-06-03 10:03:00", "click"),
    ("u6", "2025-06-03 10:04:00", "view"),
    ("u6", "2025-06-03 10:05:00", "share"),
    ("u6", "2025-06-03 10:06:00", "view"),
    ("u7", "2025-06-03 11:00:00", "view"),

    ("u1", "2025-06-04 09:00:00", "click"),
    ("u2", "2025-06-04 09:05:00", "click"),

    ("u8", "2025-06-01 23:59:59", "view"),
    ("u8", "2025-06-02 00:00:00", "view"),
]

columns = ["user_id", "timestamp", "event_type"]
df = spark.createDataFrame(data, columns)

# Your logic goes here to create df_result

display(df_result)

'''

# Initialize Spark session
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

# Start (or reuse) the Spark session used by the rest of this notebook.
spark = (
  SparkSession.builder
  .appName('Spark Playground')
  .getOrCreate()
)

# Sample events, one tuple per record: (user_id, timestamp, event_type).
data = [
    # 2025-06-01
    ("u1", "2025-06-01 00:10:00", "click"),
    ("u1", "2025-06-01 01:20:00", "view"),
    ("u1", "2025-06-01 02:30:00", "click"),
    ("u2", "2025-06-01 10:00:00", "view"),
    ("u2", "2025-06-01 11:00:00", "purchase"),
    ("u3", "2025-06-01 12:00:00", "view"),
    ("u3", "2025-06-01 13:00:00", "view"),
    ("u4", "2025-06-01 23:59:59", "click"),

    # 2025-06-02
    ("u1", "2025-06-02 00:00:01", "view"),
    ("u1", "2025-06-02 08:00:00", "purchase"),
    ("u2", "2025-06-02 09:00:00", "click"),
    ("u2", "2025-06-02 09:05:00", "view"),
    ("u2", "2025-06-02 09:10:00", "share"),
    ("u3", "2025-06-02 22:00:00", "view"),

    # 2025-06-03
    ("u5", "2025-06-03 10:00:00", "click"),
    ("u5", "2025-06-03 10:01:00", "view"),
    ("u5", "2025-06-03 10:02:00", "share"),
    ("u6", "2025-06-03 10:03:00", "click"),
    ("u6", "2025-06-03 10:04:00", "view"),
    ("u6", "2025-06-03 10:05:00", "share"),
    ("u6", "2025-06-03 10:06:00", "view"),
    ("u7", "2025-06-03 11:00:00", "view"),

    # 2025-06-04
    ("u1", "2025-06-04 09:00:00", "click"),
    ("u2", "2025-06-04 09:05:00", "click"),

    # u8: one event on each side of the 06-01 / 06-02 midnight boundary
    ("u8", "2025-06-01 23:59:59", "view"),
    ("u8", "2025-06-02 00:00:00", "view"),
]

# Column names, in the same order as the tuple fields above.
columns = ["user_id", "timestamp", "event_type"]

# All three columns are loaded as strings; the timestamp is parsed later.
df = spark.createDataFrame(data, schema=columns)

# Per-user, per-day aggregates.
#   date                 - calendar day parsed from the string timestamp
#   distinct_event_types - number of different event types the user had that day
#   total_events         - number of rows (events) the user had that day
df_with_extra_columns = (
  # The parsed value is already a timestamp, so to_date needs no format of its own.
  df.withColumn("date", F.to_date(F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss")))
  .groupBy("date", "user_id")
  .agg(
    F.countDistinct("event_type").alias("distinct_event_types"),
    # count("*") counts every row in the group. count("event_type") would skip
    # rows whose event_type is null and under-report the user's total events.
    F.count("*").alias("total_events"),
  )
)

# Ranking order inside each day:
#   1. most distinct event types first
#   2. then most total events
#   3. then user_id ascending as the final tie-break
window_spec = (
  Window.partitionBy("date")
  .orderBy(
    F.desc("distinct_event_types"),
    F.desc("total_events"),
    F.asc("user_id"),
  )
)

# Number the users within each day using the ranking order, keep positions
# 1 to 3, and sort the final rows by date, rank and user_id for readability.
df_result = (
  df_with_extra_columns
  .withColumn("rank", F.row_number().over(window_spec))
  .where(F.col("rank") <= 3)
  .sort("date", "rank", "user_id")
)

# Print the result table
df_result.show()

+----------+-------+--------------------+------------+----+
|      date|user_id|distinct_event_types|total_events|rank|
+----------+-------+--------------------+------------+----+
|2025-06-01|     u1|                   2|           3|   1|
|2025-06-01|     u2|                   2|           2|   2|
|2025-06-01|     u3|                   1|           2|   3|
|2025-06-02|     u2|                   3|           3|   1|
|2025-06-02|     u1|                   2|           2|   2|
|2025-06-02|     u3|                   1|           1|   3|
|2025-06-03|     u6|                   3|           4|   1|
|2025-06-03|     u5|                   3|           3|   2|
|2025-06-03|     u7|                   1|           1|   3|
|2025-06-04|     u1|                   1|           1|   1|
|2025-06-04|     u2|                   1|           1|   2|
+----------+-------+--------------------+------------+----+

