In [0]:
from pyspark.sql.types import StructField,StructType,IntegerType,StringType
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("kit_id", IntegerType(), True),
    StructField("login_date", StringType(), True),
    StructField("sessions_count", IntegerType(), True)
])
data = [
    (1, 2, "2016-03-01", 5),
    (1, 2, "2016-03-02", 6),
    (2, 3, "2017-06-25", 1),
    (3, 1, "2016-03-02", 0),
    (3, 4, "2018-07-03", 5)
]

sport_df = spark.createDataFrame(data, schema=schema)
sport_df.show()

+-------+------+----------+--------------+
|user_id|kit_id|login_date|sessions_count|
+-------+------+----------+--------------+
|      1|     2|2016-03-01|             5|
|      1|     2|2016-03-02|             6|
|      2|     3|2017-06-25|             1|
|      3|     1|2016-03-02|             0|
|      3|     4|2018-07-03|             5|
+-------+------+----------+--------------+



In [0]:
from pyspark.sql.functions import min
df = sport_df.groupBy("user_id").agg(min("login_date"))
df.show()

+-------+---------------+
|user_id|min(login_date)|
+-------+---------------+
|      1|     2016-03-01|
|      2|     2017-06-25|
|      3|     2016-03-02|
+-------+---------------+



In [0]:
from pyspark.sql.functions import dense_rank,col
from pyspark.sql.window import Window
window_spec = Window.partitionBy("user_id").orderBy("login_date")
df = sport_df.withColumn("rnk",dense_rank().over(window_spec))
df.filter(col("rnk") == 1).show()

+-------+------+----------+--------------+---+
|user_id|kit_id|login_date|sessions_count|rnk|
+-------+------+----------+--------------+---+
|      1|     2|2016-03-01|             5|  1|
|      2|     3|2017-06-25|             1|  1|
|      3|     1|2016-03-02|             0|  1|
+-------+------+----------+--------------+---+



In [0]:
from pyspark.sql.functions import dense_rank,col
from pyspark.sql.window import Window
window_spec = Window.partitionBy("user_id").orderBy("login_date")
df = sport_df.withColumn("rnk",dense_rank().over(window_spec))
df.filter(col("rnk") == 1).show()

+-------+------+----------+--------------+---+
|user_id|kit_id|login_date|sessions_count|rnk|
+-------+------+----------+--------------+---+
|      1|     2|2016-03-01|             5|  1|
|      2|     3|2017-06-25|             1|  1|
|      3|     1|2016-03-02|             0|  1|
+-------+------+----------+--------------+---+



In [0]:
sport_df.show()
from pyspark.sql.functions import lag,asc,sum
window_spec = Window.partitionBy("user_id").orderBy(asc("login_date"))
df = sport_df.withColumn("lag_count" , lag("sessions_count",1,0).over(window_spec))
df = df.withColumn("games_played_so_far",col("sessions_count") + col("lag_count"))
df.show()
#another approach
df1 = sport_df.withColumn("games_played_so_far",sum("sessions_count").over(window_spec))
df1.show()

+-------+------+----------+--------------+
|user_id|kit_id|login_date|sessions_count|
+-------+------+----------+--------------+
|      1|     2|2016-03-01|             5|
|      1|     2|2016-03-02|             6|
|      2|     3|2017-06-25|             1|
|      3|     1|2016-03-02|             0|
|      3|     4|2018-07-03|             5|
+-------+------+----------+--------------+

+-------+------+----------+--------------+---------+-------------------+
|user_id|kit_id|login_date|sessions_count|lag_count|games_played_so_far|
+-------+------+----------+--------------+---------+-------------------+
|      1|     2|2016-03-01|             5|        0|                  5|
|      1|     2|2016-03-02|             6|        5|                 11|
|      2|     3|2017-06-25|             1|        0|                  1|
|      3|     1|2016-03-02|             0|        0|                  0|
|      3|     4|2018-07-03|             5|        0|                  5|
+-------+------+----------+-

In [0]:
from pyspark.sql.functions import datediff,min
window_spec = Window.partitionBy("user_id").orderBy("login_date")
df = sport_df.withColumn("lag_date",min("login_date").over(window_spec))
df = df.withColumn("diff_col",datediff("login_date","lag_date"))
df.show()
df.filter(col("diff_col")==1).select(col("user_id")).show()

+-------+------+----------+--------------+----------+--------+
|user_id|kit_id|login_date|sessions_count|  lag_date|diff_col|
+-------+------+----------+--------------+----------+--------+
|      1|     2|2016-03-01|             5|2016-03-01|       0|
|      1|     2|2016-03-02|             6|2016-03-01|       1|
|      2|     3|2017-06-25|             1|2017-06-25|       0|
|      3|     1|2016-03-02|             0|2016-03-02|       0|
|      3|     4|2018-07-03|             5|2016-03-02|     853|
+-------+------+----------+--------------+----------+--------+

+-------+
|user_id|
+-------+
|      1|
+-------+

