<a href="https://colab.research.google.com/github/dineshsaud/SparkDataAnalysis/blob/main/SportDataAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [1]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
from pyspark.sql.window import Window


In [2]:
spark = (
    SparkSession.builder
    .appName("DineshPractice.me")
    .getOrCreate())


In [3]:
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("kit_id", IntegerType(), True),
    StructField("login_date", StringType(), True),
    StructField("sessions_count", IntegerType(), True)
])

In [4]:
data = [
    (1, 2, "2016-03-01", 5),
    (1, 2, "2016-03-02", 6),
    (2, 3, "2017-06-25", 1),
    (3, 1, "2016-03-02", 0),
    (3, 4, "2018-07-03", 5)
]

In [5]:
input_df = spark.createDataFrame(data=data,schema=schema)

input_df.show()

+-------+------+----------+--------------+
|user_id|kit_id|login_date|sessions_count|
+-------+------+----------+--------------+
|      1|     2|2016-03-01|             5|
|      1|     2|2016-03-02|             6|
|      2|     3|2017-06-25|             1|
|      3|     1|2016-03-02|             0|
|      3|     4|2018-07-03|             5|
+-------+------+----------+--------------+



In [6]:
GroupedDF = input_df.groupBy("user_id").agg(
    f.min("login_date").alias("first_login"))

GroupedDF.show()

+-------+-----------+
|user_id|first_login|
+-------+-----------+
|      1| 2016-03-01|
|      2| 2017-06-25|
|      3| 2016-03-02|
+-------+-----------+



In [7]:
windowSpec = Window.partitionBy("user_id").orderBy("login_date")


rankDf = input_df.withColumn("rank", f.rank().over(windowSpec))

rankDf.show()


+-------+------+----------+--------------+----+
|user_id|kit_id|login_date|sessions_count|rank|
+-------+------+----------+--------------+----+
|      1|     2|2016-03-01|             5|   1|
|      1|     2|2016-03-02|             6|   2|
|      2|     3|2017-06-25|             1|   1|
|      3|     1|2016-03-02|             0|   1|
|      3|     4|2018-07-03|             5|   2|
+-------+------+----------+--------------+----+



In [16]:
windowSpec = Window.partitionBy("user_id").orderBy("login_date")

runningDf = input_df.withColumn(
    "Total_Game_session", f.sum("sessions_count").over(windowSpec)
    )

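runningDf.show()

+-------+------+----------+--------------+------------------+
|user_id|kit_id|login_date|sessions_count|Total_Game_session|
+-------+------+----------+--------------+------------------+
|      1|     2|2016-03-01|             5|                 5|
|      1|     2|2016-03-02|             6|                11|
|      2|     3|2017-06-25|             1|                 1|
|      3|     1|2016-03-02|             0|                 0|
|      3|     4|2018-07-03|             5|                 5|
+-------+------+----------+--------------+------------------+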



In [17]:
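requiredDf = (
    input_df
    # Earliest login date per user, repeated on every row of that user.
    .withColumn("first_login", f.min("login_date").over(Window.partitionBy("user_id")))
    # Date of the user's next login (null on the user's last row).
    .withColumn("lead_data", f.lead("login_date").over(Window.partitionBy("user_id").orderBy("login_date")))
    # Keep rows whose next login falls exactly one day after the first login.
    .filter(f.datediff("lead_data", "first_login") == 1)
)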


requiredDf.select("user_id").show()

+-------+
|user_id|
+-------+
|      1|
+-------+



## PySpark Code Explanation

This PySpark snippet works on the DataFrame `input_df` to find users who logged in again on the day immediately after their first login. It adds two window-derived columns, `first_login` and `lead_data`, and then filters on the number of days between them.

### Detailed Steps

1. **Window Specification**:
   - `Window.partitionBy("user_id")`: Defines a window that partitions the data by `user_id`, so any function evaluated over it (such as the minimum login date) is computed separately for each user. The snippet uses two such windows: an unordered one for `first_login`, and one with `.orderBy("login_date")` added for `lead_data`.

2. **Add `first_login` Column**:
   - `.withColumn("first_login", f.min("login_date").over(Window.partitionBy("user_id")))`: This line adds a new column called `first_login` to the DataFrame. For each user, it calculates the minimum (or first) login date. This calculation is done within the defined window (by user).

3. **Add `lead_data` Column**:
   - `.withColumn("lead_data", f.lead("login_date").over(Window.partitionBy("user_id").orderBy("login_date")))`: This line adds another column named `lead_data`. It uses the `lead` function to get the login date of the next row within each user's group of data. The data is ordered by `login_date` within each user partition to ensure the correct sequence.

4. **Filter Data**:
   - `.filter(f.datediff("lead_data","first_login") == 1)`: Keeps only the rows where `lead_data` (the date of the user's next login, which is not necessarily the next calendar day) falls exactly one day after `first_login`. Rows with no next login have a null `lead_data` and are dropped. What remains identifies users who logged in on the day after their first login.

5. **Select and Display Data**:
   - `.select("user_id")`: Keeps only the `user_id` column of the filtered DataFrame.
   - `.show()`: Displays the result in a tabular format.
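
For intuition, the window steps above can be mirrored in plain Python, without Spark. This is only a sketch under stated assumptions: the function name `next_day_returners` is hypothetical, the rows are copied from the notebook's sample data, and the loop imitates what `min`, `lead`, and `datediff` compute per user rather than how Spark executes them.

```python
from datetime import date
from itertools import groupby

# Sample rows copied from the notebook: (user_id, kit_id, login_date, sessions_count)
rows = [
    (1, 2, "2016-03-01", 5),
    (1, 2, "2016-03-02", 6),
    (2, 3, "2017-06-25", 1),
    (3, 1, "2016-03-02", 0),
    (3, 4, "2018-07-03", 5),
]


def next_day_returners(rows):
    """Return one user_id per row whose next login is one day after the first login.

    Hypothetical helper, written only to illustrate the window logic.
    """
    matches = []
    # Partition by user_id and order by login_date inside each partition.
    ordered = sorted(rows, key=lambda r: (r[0], r[2]))
    for user_id, group in groupby(ordered, key=lambda r: r[0]):
        dates = [date.fromisoformat(r[2]) for r in group]
        first_login = dates[0]  # min(login_date) over the partition
        # lead(): each row is paired with the date that follows it.
        # The last row has no successor (null in Spark), so it is skipped.
        for lead in dates[1:]:
            # datediff(lead_data, first_login) == 1
            if (lead - first_login).days == 1:
                matches.append(user_id)
    return matches


print(next_day_returners(rows))  # [1]
```

Only user 1 qualifies: user 2 has a single login, and user 3's second login comes more than two years after the first.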

### Conclusion

The resulting DataFrame, `requiredDf`, keeps all original columns plus `first_login` and `lead_data`, restricted to the rows of users whose next login came exactly one day after their first recorded login. The final `select("user_id")` displays only the matching user IDs, which for the sample data is user 1. This can be useful for analyzing user engagement patterns over time.
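
One caveat is worth illustrating: the filter works row by row, so a user with several logins on the day after their first login matches more than once. The plain-Python sketch below models that case. The user ID `7` and its dates are hypothetical and are not part of the notebook's data.

```python
from datetime import date

# Hypothetical logins for one user: a first login, then two logins on the
# following day (for example with two different kits).
logins = {7: ["2016-03-01", "2016-03-02", "2016-03-02"]}

matched = []
for user_id, raw_dates in logins.items():
    dates = sorted(date.fromisoformat(d) for d in raw_dates)
    first_login = dates[0]
    # Each row is compared through the date of the row after it, as lead() does.
    for lead in dates[1:]:
        if (lead - first_login).days == 1:
            matched.append(user_id)

# Order-preserving de-duplication of the matched user IDs.
unique_users = list(dict.fromkeys(matched))

print(matched)       # [7, 7]
print(unique_users)  # [7]
```

In the PySpark version, appending `.distinct()` after `select("user_id")` would be one way to get each user only once.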
