In [1]:
from pyspark.sql import SparkSession
import getpass
from pyspark.sql.functions import col, lit, to_date, date_format, dense_rank, max, min, datediff
from pyspark.sql.window import Window

In [2]:
# Login name of the OS user running this kernel; used below in the Spark
# application name and in the warehouse directory path.
user = getpass.getuser()

In [3]:
# Build (or reuse) a Hive-enabled Spark session that runs on YARN.
# The application name and the warehouse directory are both derived from `user`.
spark = (
    SparkSession.builder
    .appName(f"{user} continous login problem")
    .config("spark.sql.warehouse.dir", f"/user/{user}/warehouse")
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [4]:
# Sample login events as (user_id, login_date) pairs, listed user by user.
# Some dates repeat for the same user (user 3 and user 4 on 2024-03-04).
data = [
    (uid, login_day)
    for uid, login_days in (
        # user 1: runs of 4, 5 and 6 consecutive days, plus isolated logins
        (1, (
            '2024-03-01', '2024-03-02', '2024-03-03', '2024-03-04',
            '2024-03-06',
            '2024-03-10', '2024-03-11', '2024-03-12', '2024-03-13', '2024-03-14',
            '2024-03-20',
            '2024-03-25', '2024-03-26', '2024-03-27', '2024-03-28', '2024-03-29',
            '2024-03-30',
        )),
        # user 2: a single run of 4 consecutive days
        (2, ('2024-03-01', '2024-03-02', '2024-03-03', '2024-03-04')),
        # user 3: 5 consecutive days, with 2024-03-04 recorded three times
        (3, (
            '2024-03-01', '2024-03-02', '2024-03-03',
            '2024-03-04', '2024-03-04', '2024-03-04',
            '2024-03-05',
        )),
        # user 4: 4 consecutive days, with 2024-03-04 recorded twice
        (4, (
            '2024-03-01', '2024-03-02', '2024-03-03',
            '2024-03-04', '2024-03-04',
        )),
    )
    for login_day in login_days
]

In [5]:
# DDL-style schema string: login_date is loaded as a plain string at this point.
schema = "user_id int , login_date string"

# Turn the in-memory sample rows into a Spark DataFrame.
df = spark.createDataFrame(data, schema)

# Last expression of the cell, so the notebook renders the DataFrame.
df

user_id,login_date
1,2024-03-01
1,2024-03-02
1,2024-03-03
1,2024-03-04
1,2024-03-06
1,2024-03-10
1,2024-03-11
1,2024-03-12
1,2024-03-13
1,2024-03-14


In [6]:
# Replace the string login_date column with a proper date column so that
# date arithmetic and date ordering can be applied to it.
df = df.withColumn(
    "login_date",
    to_date(col("login_date")),
)

In [7]:
# Rank each user's logins by date. dense_rank gives identical dates the same
# rank and leaves no gaps in the ranking, so repeated logins on one day do
# not advance the rank.
df = df.withColumn(
    "dense_rank",
    dense_rank().over(
        Window.partitionBy("user_id").orderBy("login_date")
    ),
)

In [8]:
# Subtract the rank (in days) from the login date. Dates that follow one
# another day by day within a user share the same result, so this value
# identifies each run of consecutive login days.
new_df = df.withColumn(
    "windowing_Col",
    df["login_date"] - df["dense_rank"],
)

In [9]:
# Print the column names and types of new_df (in particular, the type that the
# date-minus-rank subtraction produced for windowing_Col).
new_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- login_date: date (nullable = true)
 |-- dense_rank: integer (nullable = true)
 |-- windowing_Col: date (nullable = true)



In [12]:
# One output row per (user, run of consecutive days): take the run's last and
# first login date, compute its inclusive length in days, and keep only runs
# of at least 5 days, sorted by user and run start.
(
    new_df
    .groupBy("user_id", "windowing_Col")
    .agg(
        max("login_date").alias("end_date"),
        min("login_date").alias("start_date"),
    )
    # Adding one day to end_date makes the difference count both endpoints.
    .withColumn(
        "consecutive_day_count",
        datediff(col("end_date") + 1, col("start_date")),
    )
    .where("consecutive_day_count >= 5")
    .sort("user_id", "start_date")
)

user_id,windowing_Col,end_date,start_date,consecutive_day_count
1,2024-03-04,2024-03-14,2024-03-10,5
1,2024-03-13,2024-03-30,2024-03-25,6
3,2024-02-29,2024-03-05,2024-03-01,5
