In [0]:
from pyspark.sql.functions import to_date
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [0]:
from pyspark.sql import SparkSession


# Start (or reuse) the Spark session that backs this notebook
spark = (
    SparkSession.builder
    .appName("TopConsecutiveUsers")
    .getOrCreate()
)

# Column names for the sample login events (the values below are plain strings)
schema = ["user_id", "timestamp"]

# Sample login events as (user, login time) pairs
data = [
    ("user1", "2023-08-26 10:10:00"),
    ("user1", "2023-08-26 10:10:25"),
    ("user2", "2023-08-26 12:00:00"),
    ("user2", "2023-08-26 12:10:00"),
    ("user1", "2023-08-27 14:30:00"),
    ("user1", "2023-08-28 16:00:00"),
    ("user2", "2023-08-27 16:30:00"),
    ("user1", "2023-08-29 18:00:00"),
]

# Materialise the events as a DataFrame and preview them
df = spark.createDataFrame(data, schema)
df.show()

+-------+-------------------+
|user_id|          timestamp|
+-------+-------------------+
|  user1|2023-08-26 10:10:00|
|  user1|2023-08-26 10:10:25|
|  user2|2023-08-26 12:00:00|
|  user2|2023-08-26 12:10:00|
|  user1|2023-08-27 14:30:00|
|  user1|2023-08-28 16:00:00|
|  user2|2023-08-27 16:30:00|
|  user1|2023-08-29 18:00:00|
+-------+-------------------+



In [0]:
# Derive a calendar-date column from the textual login timestamp so that
# several logins on the same day collapse onto one date value
df = df.withColumn(
    "date",
    to_date(F.col("timestamp")),
)
df.show()

+-------+-------------------+----------+
|user_id|          timestamp|      date|
+-------+-------------------+----------+
|  user1|2023-08-26 10:10:00|2023-08-26|
|  user1|2023-08-26 10:10:25|2023-08-26|
|  user2|2023-08-26 12:00:00|2023-08-26|
|  user2|2023-08-26 12:10:00|2023-08-26|
|  user1|2023-08-27 14:30:00|2023-08-27|
|  user1|2023-08-28 16:00:00|2023-08-28|
|  user2|2023-08-27 16:30:00|2023-08-27|
|  user1|2023-08-29 18:00:00|2023-08-29|
+-------+-------------------+----------+



In [0]:
# Per-user window with rows ordered by login date
window_spec = Window.partitionBy("user_id").orderBy("date")

df = (
    df
    # Previous login date of the same user (null on the user's first row)
    .withColumn("lag_dt", F.lag(F.col("date")).over(window_spec))
    # Whole days elapsed between that previous login and the current one
    .withColumn("date_diff", F.datediff(F.col("date"), F.col("lag_dt")))
)
df.show()

+-------+-------------------+----------+----------+---------+
|user_id|          timestamp|      date|    lag_dt|date_diff|
+-------+-------------------+----------+----------+---------+
|  user1|2023-08-26 10:10:00|2023-08-26|      null|     null|
|  user1|2023-08-26 10:10:25|2023-08-26|2023-08-26|        0|
|  user1|2023-08-27 14:30:00|2023-08-27|2023-08-26|        1|
|  user1|2023-08-28 16:00:00|2023-08-28|2023-08-27|        1|
|  user1|2023-08-29 18:00:00|2023-08-29|2023-08-28|        1|
|  user2|2023-08-26 12:00:00|2023-08-26|      null|     null|
|  user2|2023-08-26 12:10:00|2023-08-26|2023-08-26|        0|
|  user2|2023-08-27 16:30:00|2023-08-27|2023-08-26|        1|
+-------+-------------------+----------+----------+---------+



In [0]:
# Flag rows whose login falls exactly one day after the user's previous login.
# Same-day repeats (date_diff == 0), longer gaps and the first row of each user
# (date_diff is null) all fall through to 0.
follows_previous_day = F.col("date_diff") == F.lit(1)
df = df.withColumn(
    "is_consecutive",
    F.when(follows_previous_day, F.lit(1)).otherwise(F.lit(0)),
)
df.show()

+-------+-------------------+----------+----------+---------+--------------+
|user_id|          timestamp|      date|    lag_dt|date_diff|is_consecutive|
+-------+-------------------+----------+----------+---------+--------------+
|  user1|2023-08-26 10:10:00|2023-08-26|      null|     null|             0|
|  user1|2023-08-26 10:10:25|2023-08-26|2023-08-26|        0|             0|
|  user1|2023-08-27 14:30:00|2023-08-27|2023-08-26|        1|             1|
|  user1|2023-08-28 16:00:00|2023-08-28|2023-08-27|        1|             1|
|  user1|2023-08-29 18:00:00|2023-08-29|2023-08-28|        1|             1|
|  user2|2023-08-26 12:00:00|2023-08-26|      null|     null|             0|
|  user2|2023-08-26 12:10:00|2023-08-26|2023-08-26|        0|             0|
|  user2|2023-08-27 16:30:00|2023-08-27|2023-08-26|        1|             1|
+-------+-------------------+----------+----------+---------+--------------+



In [0]:
# Calculate the running count of consecutive-day logins *within the current streak*.
#
# A plain cumulative sum over the whole user partition never resets, so separate
# streaks get added together: logins on days 1, 2, 4, 5, 6 would yield 3 even
# though the longest run contains only 2 day-to-day transitions.
#
# To reset the counter, every row first gets a streak id that increments each
# time the gap to the previous login exceeds one day. Same-day repeats
# (date_diff == 0) and the first row (date_diff is null) do not start a new
# streak. The running sum is then taken per (user, streak).
streak_break = F.when(F.col("date_diff") > 1, 1).otherwise(0)
streak_window = Window.partitionBy("user_id", "_streak_id").orderBy("date")

df = (
    df
    .withColumn("_streak_id", F.sum(streak_break).over(window_spec))
    .withColumn("consecutive_sum", F.sum("is_consecutive").over(streak_window))
    # The streak id is only a helper; drop it so the schema stays unchanged
    .drop("_streak_id")
)
df.show()


+-------+-------------------+----------+----------+---------+--------------+---------------+
|user_id|          timestamp|      date|    lag_dt|date_diff|is_consecutive|consecutive_sum|
+-------+-------------------+----------+----------+---------+--------------+---------------+
|  user1|2023-08-26 10:10:00|2023-08-26|      null|     null|             0|              0|
|  user1|2023-08-26 10:10:25|2023-08-26|2023-08-26|        0|             0|              0|
|  user1|2023-08-27 14:30:00|2023-08-27|2023-08-26|        1|             1|              1|
|  user1|2023-08-28 16:00:00|2023-08-28|2023-08-27|        1|             1|              2|
|  user1|2023-08-29 18:00:00|2023-08-29|2023-08-28|        1|             1|              3|
|  user2|2023-08-26 12:00:00|2023-08-26|      null|     null|             0|              0|
|  user2|2023-08-26 12:10:00|2023-08-26|2023-08-26|        0|             0|              0|
|  user2|2023-08-27 16:30:00|2023-08-27|2023-08-26|        1|             1|              1|
+-------+-------------------+----------+----------+---------+--------------+---------------+

In [0]:

# Per user, keep the peak value reached by the running consecutive-login counter.
# Note: the counter adds 1 for each login that follows the previous one by exactly
# one day, so the result counts day-to-day transitions.
max_consecutive_df = (
    df
    .groupBy(F.col("user_id"))
    .agg(F.max(F.col("consecutive_sum")).alias("max_consecutive_days"))
)
max_consecutive_df.show()

+-------+--------------------+
|user_id|max_consecutive_days|
+-------+--------------------+
|  user1|                   3|
|  user2|                   1|
+-------+--------------------+



In [0]:
# Find the top 1 user with the most consecutive logins.
# Ties on max_consecutive_days are broken by user_id so that limit(1) returns
# the same row on every run rather than an arbitrary one of the tied users.
top_1_users = (
    max_consecutive_df
    .orderBy(F.desc("max_consecutive_days"), F.asc("user_id"))
    .limit(1)
)

top_1_users.show()

+-------+--------------------+
|user_id|max_consecutive_days|
+-------+--------------------+
|  user1|                   3|
+-------+--------------------+

