In [0]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[?25l[K     |                                | 10 kB 16.2 MB/s eta 0:00:20[K     |                                | 20 kB 2.0 MB/s eta 0:02:35[K     |                                | 30 kB 2.4 MB/s eta 0:02:12[K     |                                | 40 kB 2.7 MB/s eta 0:01:57[K     |                                | 51 kB 2.8 MB/s eta 0:01:55[K     |                                | 61 kB 3.3 MB/s eta 0:01:37[K     |                                | 71 kB 3.6 MB/s eta 0:01:29[K     |                                | 81 kB 3.6 MB/s eta 0:01:29[K     |                                | 92 kB 4.0 MB/s eta 0:01:20[K     |                                | 102 kB 4.0 MB/s eta 0:01:19[K     |                                | 112 kB 4.0 MB/s eta 0:01:19[K     |                                | 122 kB 4.0 MB/s eta 0:01:19[K     |                                | 133 kB 4.0 MB/s eta 0:01:19[K     |  

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
import pyspark.sql.functions as f
from pyspark.sql.window import Window

Create a SparkSession

In [0]:
# Start a Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("BadmintonAnalyis")
    .getOrCreate()
)

Create Schema

In [0]:
# Explicit schema for the login records. All four columns are nullable, and
# login_date is held as a string rather than a date type.
schema = StructType(
    [
        StructField("user_id", IntegerType(), True),
        StructField("kit_id", IntegerType(), True),
        StructField("login_date", StringType(), True),
        StructField("sessions_count", IntegerType(), True),
    ]
)

In [0]:
# Sample rows laid out as (user_id, kit_id, login_date, sessions_count).
Data = [
    (1, 2, "2016-03-01", 5),
    (1, 2, "2016-03-02", 6),
    (2, 3, "2017-06-25", 1),
    (3, 1, "2016-03-02", 0),
    (3, 4, "2018-07-03", 5),
]

Create DataFrame


In [0]:
# Build the base DataFrame from the sample rows using the explicit schema.
df = spark.createDataFrame(data=Data, schema=schema)

In [0]:
# Print the base DataFrame to confirm the rows loaded as expected.
df.show()

+-------+------+----------+--------------+
|user_id|kit_id|login_date|sessions_count|
+-------+------+----------+--------------+
|      1|     2|2016-03-01|             5|
|      1|     2|2016-03-02|             6|
|      2|     3|2017-06-25|             1|
|      3|     1|2016-03-02|             0|
|      3|     4|2018-07-03|             5|
+-------+------+----------+--------------+



First approach: use groupBy and min to find each user's first login date

In [0]:
# Approach 1: aggregate per user and keep the smallest login_date.
# login_date is a string column, so min() compares the values as strings.
df1 = (
    df.groupBy("user_id")
    .agg(f.min(f.col("login_date")).alias("min_date"))
)

In [0]:
# Print the per-user minimum login_date computed by the aggregation.
df1.show()

+-------+----------+
|user_id|  min_date|
+-------+----------+
|      1|2016-03-01|
|      2|2017-06-25|
|      3|2016-03-02|
+-------+----------+



Second approach: use window functions to find each user's first login date

In [0]:
# Re-print the base DataFrame for reference before applying window functions.
df.show()

+-------+------+----------+--------------+
|user_id|kit_id|login_date|sessions_count|
+-------+------+----------+--------------+
|      1|     2|2016-03-01|             5|
|      1|     2|2016-03-02|             6|
|      2|     3|2017-06-25|             1|
|      3|     1|2016-03-02|             0|
|      3|     4|2018-07-03|             5|
+-------+------+----------+--------------+



In [0]:
# Window over each user's rows, ordered by login_date. It is reused by the
# rank, running-sum and lead computations in this notebook.
Division = (
    Window
    .partitionBy(f.col("user_id"))
    .orderBy(f.col("login_date"))
)

In [0]:
# Approach 2: rank each user's rows by login_date; "ran" == 1 marks the rows
# that carry the user's earliest login_date.
df2 = df.withColumn(
    "ran",
    f.rank().over(Division),
)

In [0]:
# Show each user's first login date.
# rank() gives rank 1 to every row tied on the earliest login_date, so a user
# with several rows on their first day would otherwise be listed more than
# once. distinct() collapses those ties to one row per user, and orderBy keeps
# the printed order stable after the extra shuffle.
(
    df2.filter(f.col("ran") == 1)
    .select("user_id", "login_date")
    .distinct()
    .orderBy("user_id")
    .show()
)

+-------+----------+
|user_id|login_date|
+-------+----------+
|      1|2016-03-01|
|      2|2017-06-25|
|      3|2016-03-02|
+-------+----------+



Total matches played: running total of sessions per user up to each login date

In [0]:
# Running total of sessions_count for each user, accumulated in login_date
# order over the per-user window.
(
    df.withColumn(
        "total_login_tilldate",
        f.sum("sessions_count").over(Division),
    )
    .show()
)

+-------+------+----------+--------------+--------------------+
|user_id|kit_id|login_date|sessions_count|total_login_tilldate|
+-------+------+----------+--------------+--------------------+
|      1|     2|2016-03-01|             5|                   5|
|      1|     2|2016-03-02|             6|                  11|
|      2|     3|2017-06-25|             1|                   1|
|      3|     1|2016-03-02|             0|                   0|
|      3|     4|2018-07-03|             5|                   5|
+-------+------+----------+--------------+--------------------+



Users who came for at least 2 consecutive days after their first login


In [0]:
# Add the login_date of the user's following row (in window order) as
# "nextdate", so each login can be compared with the one after it.
df3 = df2.withColumn(
    "nextdate",
    f.lead("login_date").over(Division),
)

In [0]:
# From each user's first-login row(s), keep those whose next login is exactly
# one day later, then list the matching users.
(
    df3.where(f.col("ran") == 1)
    .withColumn("differencedate", f.datediff("nextdate", "login_date"))
    .where(f.col("differencedate") == 1)
    .select("user_id")
    .show()
)

+-------+
|user_id|
+-------+
|      1|
+-------+

