<a href="https://colab.research.google.com/github/chetanpatil4160/Pyspark/blob/main/Spark_Coiding_Question.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise start one named "interview".
spark = (
    SparkSession.builder
    .appName("interview")
    .getOrCreate()
)

In [None]:
# Echo the session object as the cell output to confirm it was created.
spark

In [None]:
# Sample login records. login_date is still a plain "yyyy-MM-dd" string here.
# Tuple layout: (user_id, kit_id, login_date, session_count)
data = [
    (101, 1, "2024-02-01", 3),
    (101, 1, "2024-02-02", 3),
    (102, 2, "2024-02-01", 2),
    (103, 1, "2024-02-02", 5),
    (104, 3, "2024-02-03", 1),
    (105, 2, "2024-02-03", 4),
    (101, 1, "2024-02-04", 2),
    (102, 2, "2024-02-05", 3),
    (103, 1, "2024-02-06", 6),
    (104, 3, "2024-02-07", 2),
    (105, 2, "2024-02-07", 3),
    (106, 2, "2024-02-08", 1),
    (107, 1, "2024-02-08", 5),
    (108, 3, "2024-02-09", 2),
    (109, 3, "2024-02-09", 4),
    (101, 2, "2024-02-01", 1),  # Second kit used by 101 on the same first day

    # Extra rows that give some users logins on consecutive days:
    (103, 1, "2024-02-07", 7),  # follows 103's login on 2024-02-06
    (101, 1, "2024-02-05", 3),  # follows 101's login on 2024-02-04
    (105, 2, "2024-02-08", 4),  # follows 105's login on 2024-02-07
]

# Column names, in the same order as the tuple fields above.
coloumns = ["user_id", "kit_id", "login_date", "session_count"]

# Build the raw DataFrame; column types are inferred from the Python values.
df = spark.createDataFrame(data, schema=coloumns)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Parse the "yyyy-MM-dd" login_date strings into a real date column.
# (df.login_date.cast(DateType()) is an alternative way to do the same conversion.)
df1 = df.withColumn("login_date", to_date(df["login_date"], "yyyy-MM-dd"))

# Register the typed frame as the temp view "orders" so it can be queried with Spark SQL.
df1.createOrReplaceTempView("orders")

In [None]:
# Print the contents of df1 (the frame with login_date converted by to_date).
df1.show()

+-------+------+----------+-------------+
|user_id|kit_id|login_date|session_count|
+-------+------+----------+-------------+
|    101|     1|2024-02-01|            3|
|    101|     1|2024-02-02|            3|
|    102|     2|2024-02-01|            2|
|    103|     1|2024-02-02|            5|
|    104|     3|2024-02-03|            1|
|    105|     2|2024-02-03|            4|
|    101|     1|2024-02-04|            2|
|    102|     2|2024-02-05|            3|
|    103|     1|2024-02-06|            6|
|    104|     3|2024-02-07|            2|
|    105|     2|2024-02-07|            3|
|    106|     2|2024-02-08|            1|
|    107|     1|2024-02-08|            5|
|    108|     3|2024-02-09|            2|
|    109|     3|2024-02-09|            4|
|    101|     2|2024-02-01|            1|
|    103|     1|2024-02-07|            7|
|    101|     1|2024-02-05|            3|
|    105|     2|2024-02-08|            4|
+-------+------+----------+-------------+



In [None]:
# Question 1 - first login date of each user, computed three ways.
# All three read the date-typed data (df1 / its temp view "orders") so they
# operate on the same input and are directly comparable.
from pyspark.sql.window import Window

# 1) DataFrame API: group by user and take the earliest login date.
#    orderBy mirrors the ORDER BY of the SQL version so the row order is deterministic.
df1.groupBy("user_id") \
   .agg(min("login_date").alias("first_login_date")) \
   .orderBy("user_id") \
   .show()

# 2) Equivalent Spark SQL query against the "orders" temp view.
spark.sql("""
            SELECT user_id, MIN(login_date) AS first_login_date
            FROM orders
            GROUP BY user_id
            ORDER BY user_id
""").show()

# 3) Window function: number each user's rows by login_date and keep row 1.
#    Unlike 1) and 2) this keeps the full row (kit_id, session_count).
#    NOTE: when a user has several rows on the first date (user 101 does),
#    row_number() picks one of them arbitrarily.
window_spec = Window.partitionBy("user_id").orderBy("login_date")

df1.withColumn("row_num", row_number().over(window_spec)) \
   .filter(col("row_num") == 1).show()

+-------+----------------+
|user_id|first_login_date|
+-------+----------------+
|    101|      2024-02-01|
|    102|      2024-02-01|
|    103|      2024-02-02|
|    104|      2024-02-03|
|    105|      2024-02-03|
|    106|      2024-02-08|
|    107|      2024-02-08|
|    108|      2024-02-09|
|    109|      2024-02-09|
+-------+----------------+

+-------+----------------+
|user_id|first_login_date|
+-------+----------------+
|    101|      2024-02-01|
|    102|      2024-02-01|
|    103|      2024-02-02|
|    104|      2024-02-03|
|    105|      2024-02-03|
|    106|      2024-02-08|
|    107|      2024-02-08|
|    108|      2024-02-09|
|    109|      2024-02-09|
+-------+----------------+

+-------+------+----------+-------------+-------+
|user_id|kit_id|login_date|session_count|row_num|
+-------+------+----------+-------------+-------+
|    101|     1|2024-02-01|            3|      1|
|    102|     2|2024-02-01|            2|      1|
|    103|     1|2024-02-02|            5|     

**`Question 2 - Find Out The Kit Id Used by Each Player On The First Day`**

In [None]:
# Per-user window, rows ordered chronologically by login_date.
window_spec = Window.partitionBy("user_id").orderBy("login_date")

# dense_rank() gives rows sharing a login_date the same rank, so every row
# from a user's earliest date is ranked 1.
df_with_rank = df.withColumn("rank", dense_rank().over(window_spec))

# Show only the first-day rows: one row per kit used on that day.
df_with_rank.where(col("rank") == 1).show()

+-------+------+----------+-------------+----+
|user_id|kit_id|login_date|session_count|rank|
+-------+------+----------+-------------+----+
|    101|     1|2024-02-01|            3|   1|
|    101|     2|2024-02-01|            1|   1|
|    102|     2|2024-02-01|            2|   1|
|    103|     1|2024-02-02|            5|   1|
|    104|     3|2024-02-03|            1|   1|
|    105|     2|2024-02-03|            4|   1|
|    106|     2|2024-02-08|            1|   1|
|    107|     1|2024-02-08|            5|   1|
|    108|     3|2024-02-09|            2|   1|
|    109|     3|2024-02-09|            4|   1|
+-------+------+----------+-------------+----+



**`For the Same Question Above, Show the Kit Ids in an Array Instead of Separate Rows`**


In [None]:
# Per-user window, rows ordered chronologically by login_date.
window_spec = Window.partitionBy("user_id").orderBy("login_date")

# Rows sharing a user's earliest login_date all receive dense rank 1.
df_with_rank = df.withColumn("rank", dense_rank().over(window_spec))

# Keep the first-day rows and fold each user's kit ids into a single array column.
kit_used = (
    df_with_rank
    .where(col("rank") == 1)
    .groupBy("user_id")
    .agg(collect_list("kit_id").alias("kit_id"))
)

kit_used.show()

+-------+------+
|user_id|kit_id|
+-------+------+
|    101|[1, 2]|
|    102|   [2]|
|    103|   [1]|
|    104|   [3]|
|    105|   [2]|
|    106|   [2]|
|    107|   [1]|
|    108|   [3]|
|    109|   [3]|
+-------+------+



**Calculate the Running Total of Games Played by Each User**

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# Running total of session_count per user, accumulated in login order.
#
# The frame is row-based (UNBOUNDED PRECEDING .. CURRENT ROW), so the order of
# rows inside a partition decides every intermediate total. A user can have
# several rows on the same login_date (user 101 has two on 2024-02-01), which
# makes ordering by login_date alone ambiguous; kit_id is added as a tiebreaker
# so the totals are reproducible from run to run.
running_dum = (
    Window.partitionBy("user_id")
    .orderBy("login_date", "kit_id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# DataFrame API version. It reads df1, the same date-typed data that backs the
# "orders" view used by the SQL version below.
running_total = df1.withColumn("running_total", sum("session_count").over(running_dum))
running_total.show()


# Spark SQL version of the same running total, with the same tiebreaker.
spark.sql("""SELECT user_id,
       login_date,
       session_count,
       SUM(session_count) OVER (
           PARTITION BY user_id
           ORDER BY login_date, kit_id
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
       ) AS running_total
FROM orders""").show()

+-------+------+----------+-------------+-------------+
|user_id|kit_id|login_date|session_count|running_total|
+-------+------+----------+-------------+-------------+
|    101|     1|2024-02-01|            3|            3|
|    101|     2|2024-02-01|            1|            4|
|    101|     1|2024-02-02|            3|            7|
|    101|     1|2024-02-04|            2|            9|
|    101|     1|2024-02-05|            3|           12|
|    102|     2|2024-02-01|            2|            2|
|    102|     2|2024-02-05|            3|            5|
|    103|     1|2024-02-02|            5|            5|
|    103|     1|2024-02-06|            6|           11|
|    103|     1|2024-02-07|            7|           18|
|    104|     3|2024-02-03|            1|            1|
|    104|     3|2024-02-07|            2|            3|
|    105|     2|2024-02-03|            4|            4|
|    105|     2|2024-02-07|            3|            7|
|    105|     2|2024-02-08|            4|       

**Find Out the Users Who Have Logged In Continuously**

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Per-user window, rows ordered chronologically by login_date.
# It is bound to its own name on purpose: assigning it to `Window` would
# replace the imported Window class with a WindowSpec for every cell run later.
login_window = Window.partitionBy("user_id").orderBy("login_date")

# Pair every login with the same user's next login and keep the rows whose
# next login falls on the same day (diff 0, e.g. a second kit used that day)
# or on the following day (diff 1).
# df1 is used so datediff works on real dates instead of implicitly cast strings.
df1.withColumn("next_date", lead("login_date").over(login_window)) \
   .withColumn("diff", datediff(col("next_date"), col("login_date"))) \
   .filter(col("diff").isin([0, 1])).show()



# Find the users who logged in on their first day and again on the next day:
# keep rows whose next login is exactly one day after the user's first login date.
df1.withColumn("next_date", lead("login_date").over(login_window)) \
   .withColumn("first_login_date", first("login_date").over(login_window)) \
   .withColumn("diff", datediff(col("next_date"), col("first_login_date"))) \
   .filter(col("diff") == 1).show()

+-------+------+----------+-------------+----------+----+
|user_id|kit_id|login_date|session_count| next_date|diff|
+-------+------+----------+-------------+----------+----+
|    101|     1|2024-02-01|            3|2024-02-01|   0|
|    101|     2|2024-02-01|            1|2024-02-02|   1|
|    101|     1|2024-02-04|            2|2024-02-05|   1|
|    103|     1|2024-02-06|            6|2024-02-07|   1|
|    105|     2|2024-02-07|            3|2024-02-08|   1|
+-------+------+----------+-------------+----------+----+

+-------+------+----------+-------------+----------+----------------+----+
|user_id|kit_id|login_date|session_count| next_date|first_login_date|diff|
+-------+------+----------+-------------+----------+----------------+----+
|    101|     2|2024-02-01|            1|2024-02-02|      2024-02-01|   1|
+-------+------+----------+-------------+----------+----------------+----+

