<a href="https://colab.research.google.com/github/ankitarm/PySpark/blob/main/Chatgpt_Pyspark_questions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. Question 1: Customer Transactions Analysis
Write a PySpark program to find the first transaction date and total transaction amount for each customer.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
spark = SparkSession.builder.master("local[*]").appName("CustomerTransactions").getOrCreate()
from pyspark.sql import Row

# Sample data
data = [
    Row(customer_id=101, transaction_date="2023-01-10", amount=150.0),
    Row(customer_id=102, transaction_date="2023-01-12", amount=200.0),
    Row(customer_id=101, transaction_date="2023-01-15", amount=100.0),
    Row(customer_id=101, transaction_date="2023-01-05", amount=50.0),
    Row(customer_id=102, transaction_date="2023-01-14", amount=300.0)
]

# Create DataFrame
transactions = spark.createDataFrame(data)


# Show input data
transactions.show()


+-----------+----------------+------+
|customer_id|transaction_date|amount|
+-----------+----------------+------+
|        101|      2023-01-10| 150.0|
|        102|      2023-01-12| 200.0|
|        101|      2023-01-15| 100.0|
|        101|      2023-01-05|  50.0|
|        102|      2023-01-14| 300.0|
+-----------+----------------+------+



In [None]:
from pyspark.sql.types import DateType

# Cast transaction_date from string to date
# to_date(column, "yyyy-MM-dd")     column.cast(StringType())
transactions.withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd")).show()
transactions.select( "*" , col("transaction_date").cast(DateType())).show()


+-----------+----------------+------+
|customer_id|transaction_date|amount|
+-----------+----------------+------+
|        101|      2023-01-10| 150.0|
|        102|      2023-01-12| 200.0|
|        101|      2023-01-15| 100.0|
|        101|      2023-01-05|  50.0|
|        102|      2023-01-14| 300.0|
+-----------+----------------+------+

+-----------+----------------+------+----------------+
|customer_id|transaction_date|amount|transaction_date|
+-----------+----------------+------+----------------+
|        101|      2023-01-10| 150.0|      2023-01-10|
|        102|      2023-01-12| 200.0|      2023-01-12|
|        101|      2023-01-15| 100.0|      2023-01-15|
|        101|      2023-01-05|  50.0|      2023-01-05|
|        102|      2023-01-14| 300.0|      2023-01-14|
+-----------+----------------+------+----------------+



In [None]:
display(transactions)

DataFrame[customer_id: bigint, transaction_date: string, amount: double]

In [None]:
transactions = transactions.withColumn( "transaction_date" , col("transaction_date").cast(DateType()))
transactions.show()

+-----------+----------------+------+
|customer_id|transaction_date|amount|
+-----------+----------------+------+
|        101|      2023-01-10| 150.0|
|        102|      2023-01-12| 200.0|
|        101|      2023-01-15| 100.0|
|        101|      2023-01-05|  50.0|
|        102|      2023-01-14| 300.0|
+-----------+----------------+------+



In [None]:
from pyspark.sql.functions import sum, min
transactions.groupBy(col("customer_id")).agg(min(transactions.transaction_date).alias("transaction_date"), sum("amount").alias("amount")).show()

+-----------+----------------+------+
|customer_id|transaction_date|amount|
+-----------+----------------+------+
|        101|      2023-01-05| 300.0|
|        102|      2023-01-12| 500.0|
+-----------+----------------+------+



2.  Question Active User Streaks
For each user, find the longest streak of consecutive login days.



In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import to_date, col

# Sample login data
login_data = [
    Row(user_id=1, login_date="2023-01-01"),
    Row(user_id=1, login_date="2023-01-02"),
    Row(user_id=1, login_date="2023-01-04"),
    Row(user_id=1, login_date="2023-01-05"),
    Row(user_id=1, login_date="2023-01-06"),
    Row(user_id=2, login_date="2023-01-01"),
    Row(user_id=2, login_date="2023-01-03"),
    Row(user_id=2, login_date="2023-01-04")
]

# Create the DataFrame
logins = spark.createDataFrame(login_data)
logins.show()

+-------+----------+
|user_id|login_date|
+-------+----------+
|      1|2023-01-01|
|      1|2023-01-02|
|      1|2023-01-04|
|      1|2023-01-05|
|      1|2023-01-06|
|      2|2023-01-01|
|      2|2023-01-03|
|      2|2023-01-04|
+-------+----------+



In [None]:
display(logins)

DataFrame[user_id: bigint, login_date: string]

In [None]:
from pyspark.sql.types import DateType
logins = logins.withColumn("login_date", col("login_date").cast(DateType()))
logins.show()

+-------+----------+
|user_id|login_date|
+-------+----------+
|      1|2023-01-01|
|      1|2023-01-02|
|      1|2023-01-04|
|      1|2023-01-05|
|      1|2023-01-06|
|      2|2023-01-01|
|      2|2023-01-03|
|      2|2023-01-04|
+-------+----------+



In [None]:
display(logins)

DataFrame[user_id: bigint, login_date: date]

have to use row_number() which is window function so import window from window


dense_rank cannot be used - it will over count the entries.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_row = Window.partitionBy(col("user_id")).orderBy(col("login_date"))

logins = logins.withColumn("row_num", row_number().over(window_row))
logins.show()


+-------+----------+-------+
|user_id|login_date|row_num|
+-------+----------+-------+
|      1|2023-01-01|      1|
|      1|2023-01-02|      2|
|      1|2023-01-04|      3|
|      1|2023-01-05|      4|
|      1|2023-01-06|      5|
|      2|2023-01-01|      1|
|      2|2023-01-03|      2|
|      2|2023-01-04|      3|
+-------+----------+-------+



error because login_date and row_num are different datatype.

In [None]:
from pyspark.sql.functions import date_diff
logins = logins.withColumn("Difference", date_diff(("login_date"),col("row_num").cast("int")))
logins.show()

AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "date_diff(login_date, CAST(row_num AS INT))" due to data type mismatch: Parameter 2 requires the "DATE" type, however "CAST(row_num AS INT)" has the type "INT".;
'Project [user_id#227L, login_date#240, row_num#255, date_diff(login_date#240, cast(row_num#255 as int)) AS Difference#275]
+- Project [user_id#227L, login_date#240, row_num#255]
   +- Project [user_id#227L, login_date#240, row_num#255, row_num#255]
      +- Window [row_number() windowspecdefinition(user_id#227L, login_date#240 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_num#255], [user_id#227L], [login_date#240 ASC NULLS FIRST]
         +- Project [user_id#227L, login_date#240]
            +- Project [user_id#227L, cast(login_date#228 as date) AS login_date#240]
               +- LogicalRDD [user_id#227L, login_date#228], false


expr("  ") --- sql like operations directly on dataframe

In [None]:
from pyspark.sql.functions import date_diff, expr
logins = logins.withColumn("Difference", expr("date_sub(login_date,row_num)"))
logins.show()

+-------+----------+-------+----------+
|user_id|login_date|row_num|Difference|
+-------+----------+-------+----------+
|      1|2023-01-01|      1|2022-12-31|
|      1|2023-01-02|      2|2022-12-31|
|      1|2023-01-04|      3|2023-01-01|
|      1|2023-01-05|      4|2023-01-01|
|      1|2023-01-06|      5|2023-01-01|
|      2|2023-01-01|      1|2022-12-31|
|      2|2023-01-03|      2|2023-01-01|
|      2|2023-01-04|      3|2023-01-01|
+-------+----------+-------+----------+



In [None]:
from pyspark.sql.functions import count
logins = logins.groupBy(col("user_id"),col("Difference")).agg(count("Difference").alias("Streak"))
logins.show()

+-------+----------+------+
|user_id|Difference|Streak|
+-------+----------+------+
|      1|2022-12-31|     2|
|      1|2023-01-01|     3|
|      2|2022-12-31|     1|
|      2|2023-01-01|     2|
+-------+----------+------+



In [None]:
from pyspark.sql.functions import max
logins = logins.groupBy("user_id").agg((max("Streak")).alias("Max_Streak"))
logins.show()

+-------+----------+
|user_id|Max_Streak|
+-------+----------+
|      1|         3|
|      2|         2|
+-------+----------+



 Question 3: Identify Top 2 Products by Revenue per Category

In [None]:
# ✅ Start a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("TopProductsByRevenue").getOrCreate()
# ✅ Sample input data
data = [
    (1, "A", "Phones", 500, 2),
    (2, "B", "Phones", 300, 3),
    (3, "C", "Laptops", 1000, 1),
    (4, "D", "Phones", 700, 1),
    (5, "E", "Laptops", 800, 2),
    (6, "F", "Laptops", 1000, 1),
]

columns = ["order_id", "product_id", "category", "price", "quantity"]

orders = spark.createDataFrame(data, columns)
orders.show()


+--------+----------+--------+-----+--------+
|order_id|product_id|category|price|quantity|
+--------+----------+--------+-----+--------+
|       1|         A|  Phones|  500|       2|
|       2|         B|  Phones|  300|       3|
|       3|         C| Laptops| 1000|       1|
|       4|         D|  Phones|  700|       1|
|       5|         E| Laptops|  800|       2|
|       6|         F| Laptops| 1000|       1|
+--------+----------+--------+-----+--------+



In [None]:
from pyspark.sql.functions import col
orders = orders.withColumn("total_revenue",col("price")*col("quantity"))
orders.show()

+--------+----------+--------+-----+--------+-------------+
|order_id|product_id|category|price|quantity|total_revenue|
+--------+----------+--------+-----+--------+-------------+
|       1|         A|  Phones|  500|       2|         1000|
|       2|         B|  Phones|  300|       3|          900|
|       3|         C| Laptops| 1000|       1|         1000|
|       4|         D|  Phones|  700|       1|          700|
|       5|         E| Laptops|  800|       2|         1600|
|       6|         F| Laptops| 1000|       1|         1000|
+--------+----------+--------+-----+--------+-------------+



In [None]:
# divide categorically and give top 2 for each means top 2 ranks for each category hence window functions.

from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window
windowCat = Window.partitionBy(col("category")).orderBy(desc(col("total_revenue")))
orders = orders.withColumn("row_num", row_number().over(windowCat))
orders.show()

+--------+----------+--------+-----+--------+-------------+-------+
|order_id|product_id|category|price|quantity|total_revenue|row_num|
+--------+----------+--------+-----+--------+-------------+-------+
|       5|         E| Laptops|  800|       2|         1600|      1|
|       3|         C| Laptops| 1000|       1|         1000|      2|
|       6|         F| Laptops| 1000|       1|         1000|      3|
|       1|         A|  Phones|  500|       2|         1000|      1|
|       2|         B|  Phones|  300|       3|          900|      2|
|       4|         D|  Phones|  700|       1|          700|      3|
+--------+----------+--------+-----+--------+-------------+-------+



In [None]:
orders.select(col("category"), col("product_id"), col("total_revenue")) \
.where(col("row_num")<=2) \
.orderBy("category", desc("total_revenue")).show()

"""
orders.filter(col("row_num") <= 2) \
      .select("category", "product_id", "total_revenue") \
      .orderBy("category", desc("total_revenue")) \
      .show()
      """

+--------+----------+-------------+
|category|product_id|total_revenue|
+--------+----------+-------------+
| Laptops|         E|         1600|
| Laptops|         C|         1000|
|  Phones|         A|         1000|
|  Phones|         B|          900|
+--------+----------+-------------+



Question 4: Compute Running Revenue per Customer

In [None]:
# Import necessary libraries
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import col

# Sample transaction data
data = [
    (1, 101, "2024-01-01", 200),
    (2, 101, "2024-01-03", 150),
    (3, 102, "2024-01-02", 300),
    (4, 101, "2024-01-05", 100),
    (5, 102, "2024-01-04", 200),
]

# Define schema
schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("transaction_date", StringType(), True),
    StructField("amount", IntegerType(), True),
])

# Create DataFrame
transaction = spark.createDataFrame(data, schema)

# Show input
transaction.show()


+--------------+-----------+----------------+------+
|transaction_id|customer_id|transaction_date|amount|
+--------------+-----------+----------------+------+
|             1|        101|      2024-01-01|   200|
|             2|        101|      2024-01-03|   150|
|             3|        102|      2024-01-02|   300|
|             4|        101|      2024-01-05|   100|
|             5|        102|      2024-01-04|   200|
+--------------+-----------+----------------+------+



In [None]:
display(transaction)

DataFrame[transaction_id: int, customer_id: int, transaction_date: string, amount: int]

In [None]:
from pyspark.sql.functions import to_date
transaction = transaction.withColumn("transaction_date", to_date("transaction_date",'yyyy-MM-dd'))
transaction.show()

+--------------+-----------+----------------+------+
|transaction_id|customer_id|transaction_date|amount|
+--------------+-----------+----------------+------+
|             1|        101|      2024-01-01|   200|
|             2|        101|      2024-01-03|   150|
|             3|        102|      2024-01-02|   300|
|             4|        101|      2024-01-05|   100|
|             5|        102|      2024-01-04|   200|
+--------------+-----------+----------------+------+



 ❌ Incorrect logic:
python
Copy
Edit
transaction = transaction.withColumn("row_num", row_number().over(window_cus))
row_number() just gives you the position of the row, not the cumulative sum. However, to compute the running total of amount (as per the question), we need to use sum() as a window function, not row_number().


In [None]:
window_cus = Window.partitionBy("customer_id").orderBy("transaction_date")
transaction = transaction.withColumn("row_num",row_number().over(window_cus))
transaction.show()

+--------------+-----------+----------------+------+-------+
|transaction_id|customer_id|transaction_date|amount|row_num|
+--------------+-----------+----------------+------+-------+
|             1|        101|      2024-01-01|   200|      1|
|             2|        101|      2024-01-03|   150|      2|
|             4|        101|      2024-01-05|   100|      3|
|             3|        102|      2024-01-02|   300|      1|
|             5|        102|      2024-01-04|   200|      2|
+--------------+-----------+----------------+------+-------+



In [None]:
# sum in window giving cumulative
from pyspark.sql.functions import sum
window_cus = Window.partitionBy("customer_id").orderBy("transaction_date")
transaction = transaction.withColumn("running_tot" ,sum("amount").over(window_cus))
transaction.show()

+--------------+-----------+----------------+------+-------+-----------+
|transaction_id|customer_id|transaction_date|amount|row_num|running_tot|
+--------------+-----------+----------------+------+-------+-----------+
|             1|        101|      2024-01-01|   200|      1|        200|
|             2|        101|      2024-01-03|   150|      2|        350|
|             4|        101|      2024-01-05|   100|      3|        450|
|             3|        102|      2024-01-02|   300|      1|        300|
|             5|        102|      2024-01-04|   200|      2|        500|
+--------------+-----------+----------------+------+-------+-----------+



In [None]:
display(transaction)

DataFrame[transaction_id: int, customer_id: int, transaction_date: date, amount: int, row_num: int]

In [None]:
window_cus = Window.partitionBy("customer_id").orderBy("transaction_date") \
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow)
'''What does .rowsBetween(Window.unboundedPreceding, Window.currentRow) mean?
Window.unboundedPreceding: start from the first row in the partition

Window.currentRow: include up to the current row
➡️ So this includes all rows up to and including the current row, which is exactly what you want for a cumulative or running sum.'''

Question 5 — User Purchase Frequency Change
For each user, find the number of days between consecutive purchases and the difference in amount spent.

In [None]:
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("purchase_date", StringType(), True),
    StructField("amount", IntegerType(), True)
])

# Step 4: Create sample data
data = [
    (1, "2024-01-01", 100),
    (1, "2024-01-15", 200),
    (2, "2024-01-03", 150),
    (1, "2024-02-01", 300),
    (2, "2024-01-20", 100),
    (3, "2024-01-10", 400),
    (3, "2024-02-11", 200)
]

# Step 5: Create DataFrame
df = spark.createDataFrame(data, schema)

# Step 6: Convert string to date type
from pyspark.sql.functions import col
df = df.withColumn("purchase_date", to_date(col("purchase_date"), "yyyy-MM-dd"))

# Step 7: Show the input
df.show()

+-------+-------------+------+
|user_id|purchase_date|amount|
+-------+-------------+------+
|      1|   2024-01-01|   100|
|      1|   2024-01-15|   200|
|      2|   2024-01-03|   150|
|      1|   2024-02-01|   300|
|      2|   2024-01-20|   100|
|      3|   2024-01-10|   400|
|      3|   2024-02-11|   200|
+-------+-------------+------+



In [None]:
#For each user, find the number of days between consecutive purchases and the difference in amount spent.
# So we need to find diff b/w previous and current  days, and purchase
# to get previous use lag

from pyspark.sql.functions import lag
window_lag = Window.partitionBy("user_id").orderBy("purchase_date")

df = df.withColumn("previous_date", lag("purchase_date").over(window_lag)) \
       .withColumn("previous_amount", lag("amount").over(window_lag))
df.show()


+-------+-------------+------+-------------+---------------+
|user_id|purchase_date|amount|previous_date|previous_amount|
+-------+-------------+------+-------------+---------------+
|      1|   2024-01-01|   100|         NULL|           NULL|
|      1|   2024-01-15|   200|   2024-01-01|            100|
|      1|   2024-02-01|   300|   2024-01-15|            200|
|      2|   2024-01-03|   150|         NULL|           NULL|
|      2|   2024-01-20|   100|   2024-01-03|            150|
|      3|   2024-01-10|   400|         NULL|           NULL|
|      3|   2024-02-11|   200|   2024-01-10|            400|
+-------+-------------+------+-------------+---------------+



In [None]:
display(df)

DataFrame[user_id: int, purchase_date: date, amount: int, previous_date: date, previous_amount: int]

In [None]:
#Calculating diff
from pyspark.sql.functions import date_diff
df = df.withColumn("Num_days",date_diff("purchase_date","previous_date")).withColumn("Amount_diff",col("amount")-col("previous_amount"))
df.show()

+-------+-------------+------+-------------+---------------+--------+-----------+
|user_id|purchase_date|amount|previous_date|previous_amount|Num_days|Amount_diff|
+-------+-------------+------+-------------+---------------+--------+-----------+
|      1|   2024-01-01|   100|         NULL|           NULL|    NULL|       NULL|
|      1|   2024-01-15|   200|   2024-01-01|            100|      14|        100|
|      1|   2024-02-01|   300|   2024-01-15|            200|      17|        100|
|      2|   2024-01-03|   150|         NULL|           NULL|    NULL|       NULL|
|      2|   2024-01-20|   100|   2024-01-03|            150|      17|        -50|
|      3|   2024-01-10|   400|         NULL|           NULL|    NULL|       NULL|
|      3|   2024-02-11|   200|   2024-01-10|            400|      32|       -200|
+-------+-------------+------+-------------+---------------+--------+-----------+



In [None]:
df.select("user_id","purchase_date","amount","Num_days","Amount_diff").show()

+-------+-------------+------+--------+-----------+
|user_id|purchase_date|amount|Num_days|Amount_diff|
+-------+-------------+------+--------+-----------+
|      1|   2024-01-01|   100|    NULL|       NULL|
|      1|   2024-01-15|   200|      14|        100|
|      1|   2024-02-01|   300|      17|        100|
|      2|   2024-01-03|   150|    NULL|       NULL|
|      2|   2024-01-20|   100|      17|        -50|
|      3|   2024-01-10|   400|    NULL|       NULL|
|      3|   2024-02-11|   200|      32|       -200|
+-------+-------------+------+--------+-----------+



 Question 6: Repeat Purchasers Within 30 Days
 You’re given a transactions dataset with customer purchases. Find customers who made more than one purchase within a 30-day window.

In [None]:

df.drop()
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("transaction_date", StringType(), True),
    StructField("amount", IntegerType(), True)
])

# Step 5: Create sample data
data = [
    (1, "2024-01-01", 100),
    (1, "2024-01-20", 200),
    (1, "2024-03-01", 300),
    (2, "2024-01-01", 150),
    (2, "2024-02-10", 100)
]

# Step 6: Create DataFrame
df = spark.createDataFrame(data, schema)

# Step 7: Convert transaction_date to DateType
df = df.withColumn("transaction_date", to_date("transaction_date", "yyyy-MM-dd"))

# Step 8: Show input data
df.show()

+-----------+----------------+------+
|customer_id|transaction_date|amount|
+-----------+----------------+------+
|          1|      2024-01-01|   100|
|          1|      2024-01-20|   200|
|          1|      2024-03-01|   300|
|          2|      2024-01-01|   150|
|          2|      2024-02-10|   100|
+-----------+----------------+------+



In [None]:
# previous purchase using lag
from pyspark.sql.functions import lead
window_30lead = Window.partitionBy("customer_id").orderBy("transaction_date")

df = df.withColumn("second_transaction_date", lead("transaction_date").over(window_30lead))\
      .withColumnRenamed("transaction_date","first_transaction_date") \
      .drop("amount")
df.show()

+-----------+----------------------+-----------------------+
|customer_id|first_transaction_date|second_transaction_date|
+-----------+----------------------+-----------------------+
|          1|            2024-01-01|             2024-01-20|
|          1|            2024-01-20|             2024-03-01|
|          1|            2024-03-01|                   NULL|
|          2|            2024-01-01|             2024-02-10|
|          2|            2024-02-10|                   NULL|
+-----------+----------------------+-----------------------+



In [None]:
#calculating days diff

df = df.withColumn("Days_diff",date_diff("second_transaction_date","first_transaction_date"))
df.show()

+-----------+----------------------+-----------------------+---------+
|customer_id|first_transaction_date|second_transaction_date|Days_diff|
+-----------+----------------------+-----------------------+---------+
|          1|            2024-01-01|             2024-01-20|       19|
|          1|            2024-01-20|             2024-03-01|       41|
|          1|            2024-03-01|                   NULL|     NULL|
|          2|            2024-01-01|             2024-02-10|       40|
|          2|            2024-02-10|                   NULL|     NULL|
+-----------+----------------------+-----------------------+---------+



In [None]:
df.filter(col("Days_diff") <= 30).show()

+-----------+----------------------+-----------------------+---------+
|customer_id|first_transaction_date|second_transaction_date|Days_diff|
+-----------+----------------------+-----------------------+---------+
|          1|            2024-01-01|             2024-01-20|       19|
+-----------+----------------------+-----------------------+---------+



 Question 7: Sessionize User Activity with Idle Timeout

 Group each user’s activities into sessions, where a session is defined as a series of events separated by no more than 15 minutes of inactivity.

Assign a session_id to each event, such that events within the same session have the same session_id.



In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import to_timestamp

# Step 3: Start Spark session
spark = SparkSession.builder.appName("SessionizeUserActivity").getOrCreate()

# Step 4: Define schema
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("activity_timestamp", StringType(), True),
])

# Step 5: Sample data
data = [
    (1, "2024-06-01 10:00:00"),
    (1, "2024-06-01 10:05:00"),
    (1, "2024-06-01 10:40:00"),
    (2, "2024-06-01 09:00:00"),
    (2, "2024-06-01 09:20:00"),
    (2, "2024-06-01 09:55:00"),
]

# Step 6: Create DataFrame
df = spark.createDataFrame(data, schema)
df.show()

+-------+-------------------+
|user_id| activity_timestamp|
+-------+-------------------+
|      1|2024-06-01 10:00:00|
|      1|2024-06-01 10:05:00|
|      1|2024-06-01 10:40:00|
|      2|2024-06-01 09:00:00|
|      2|2024-06-01 09:20:00|
|      2|2024-06-01 09:55:00|
+-------+-------------------+



In [15]:
display(df)

DataFrame[user_id: int, activity_timestamp: string]

In [64]:
df = df.withColumn("activity_timestamp",to_timestamp("activity_timestamp","yyyy-MM-dd HH:mm:ss"))
display(df)
df.show()

DataFrame[user_id: int, activity_timestamp: timestamp]

+-------+-------------------+
|user_id| activity_timestamp|
+-------+-------------------+
|      1|2024-06-01 10:00:00|
|      1|2024-06-01 10:05:00|
|      1|2024-06-01 10:40:00|
|      2|2024-06-01 09:00:00|
|      2|2024-06-01 09:20:00|
|      2|2024-06-01 09:55:00|
+-------+-------------------+



In [65]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
window_15lag = Window.partitionBy("user_id").orderBy("activity_timestamp")
df = df.withColumn("previous_activity",lag("activity_timestamp").over(window_15lag))
df.show()


+-------+-------------------+-------------------+
|user_id| activity_timestamp|  previous_activity|
+-------+-------------------+-------------------+
|      1|2024-06-01 10:00:00|               NULL|
|      1|2024-06-01 10:05:00|2024-06-01 10:00:00|
|      1|2024-06-01 10:40:00|2024-06-01 10:05:00|
|      2|2024-06-01 09:00:00|               NULL|
|      2|2024-06-01 09:20:00|2024-06-01 09:00:00|
|      2|2024-06-01 09:55:00|2024-06-01 09:20:00|
+-------+-------------------+-------------------+



In [66]:
from pyspark.sql.functions import unix_timestamp, col
df = df.withColumn("Time_Diff",((unix_timestamp(col("activity_timestamp"))-unix_timestamp(col("previous_activity"))))/60)
df.show(truncate = False)

+-------+-------------------+-------------------+---------+
|user_id|activity_timestamp |previous_activity  |Time_Diff|
+-------+-------------------+-------------------+---------+
|1      |2024-06-01 10:00:00|NULL               |NULL     |
|1      |2024-06-01 10:05:00|2024-06-01 10:00:00|5.0      |
|1      |2024-06-01 10:40:00|2024-06-01 10:05:00|35.0     |
|2      |2024-06-01 09:00:00|NULL               |NULL     |
|2      |2024-06-01 09:20:00|2024-06-01 09:00:00|20.0     |
|2      |2024-06-01 09:55:00|2024-06-01 09:20:00|35.0     |
+-------+-------------------+-------------------+---------+



In [67]:
from pyspark.sql.functions import when
df = df.withColumn("Session_strt", when((col("Time_Diff") > 15) | (col("Time_Diff").isNull()), 1).otherwise(0))
df.show()

+-------+-------------------+-------------------+---------+------------+
|user_id| activity_timestamp|  previous_activity|Time_Diff|Session_strt|
+-------+-------------------+-------------------+---------+------------+
|      1|2024-06-01 10:00:00|               NULL|     NULL|           1|
|      1|2024-06-01 10:05:00|2024-06-01 10:00:00|      5.0|           0|
|      1|2024-06-01 10:40:00|2024-06-01 10:05:00|     35.0|           1|
|      2|2024-06-01 09:00:00|               NULL|     NULL|           1|
|      2|2024-06-01 09:20:00|2024-06-01 09:00:00|     20.0|           1|
|      2|2024-06-01 09:55:00|2024-06-01 09:20:00|     35.0|           1|
+-------+-------------------+-------------------+---------+------------+



In [68]:
from pyspark.sql.functions import sum
df = df.withColumn("Session_id", sum("Session_strt").over(window_15lag) )
df.show()

+-------+-------------------+-------------------+---------+------------+----------+
|user_id| activity_timestamp|  previous_activity|Time_Diff|Session_strt|Session_id|
+-------+-------------------+-------------------+---------+------------+----------+
|      1|2024-06-01 10:00:00|               NULL|     NULL|           1|         1|
|      1|2024-06-01 10:05:00|2024-06-01 10:00:00|      5.0|           0|         1|
|      1|2024-06-01 10:40:00|2024-06-01 10:05:00|     35.0|           1|         2|
|      2|2024-06-01 09:00:00|               NULL|     NULL|           1|         1|
|      2|2024-06-01 09:20:00|2024-06-01 09:00:00|     20.0|           1|         2|
|      2|2024-06-01 09:55:00|2024-06-01 09:20:00|     35.0|           1|         3|
+-------+-------------------+-------------------+---------+------------+----------+



In [69]:
df.select("user_id","activity_timestamp","Session_id").show()

+-------+-------------------+----------+
|user_id| activity_timestamp|Session_id|
+-------+-------------------+----------+
|      1|2024-06-01 10:00:00|         1|
|      1|2024-06-01 10:05:00|         1|
|      1|2024-06-01 10:40:00|         2|
|      2|2024-06-01 09:00:00|         1|
|      2|2024-06-01 09:20:00|         2|
|      2|2024-06-01 09:55:00|         3|
+-------+-------------------+----------+



Question 8 : Calculate Rolling 3-Day Revenue per User
You are given a transactions dataset. For each user and each transaction date, compute the rolling total revenue for that user over the past 3 days, including the current day.

In [70]:

data = [
    (1, "2024-06-01", 100),
    (1, "2024-06-02", 200),
    (1, "2024-06-03", 300),
    (1, "2024-06-05", 400),
    (2, "2024-06-01", 150),
    (2, "2024-06-04", 250),
    (2, "2024-06-05", 350),
]
df = spark.createDataFrame(data, ["user_id", "transaction_date", "revenue"])
df.show()

+-------+----------------+-------+
|user_id|transaction_date|revenue|
+-------+----------------+-------+
|      1|      2024-06-01|    100|
|      1|      2024-06-02|    200|
|      1|      2024-06-03|    300|
|      1|      2024-06-05|    400|
|      2|      2024-06-01|    150|
|      2|      2024-06-04|    250|
|      2|      2024-06-05|    350|
+-------+----------------+-------+



In [71]:
display(df)

DataFrame[user_id: bigint, transaction_date: string, revenue: bigint]

In [73]:
from pyspark.sql.types import DateType
df = df.withColumn("transaction_date",col("transaction_date").cast(DateType()))
df.show()

+-------+----------------+-------+
|user_id|transaction_date|revenue|
+-------+----------------+-------+
|      1|      2024-06-01|    100|
|      1|      2024-06-02|    200|
|      1|      2024-06-03|    300|
|      1|      2024-06-05|    400|
|      2|      2024-06-01|    150|
|      2|      2024-06-04|    250|
|      2|      2024-06-05|    350|
+-------+----------------+-------+



In [74]:
display(df)

DataFrame[user_id: bigint, transaction_date: date, revenue: bigint]

In [81]:
from pyspark.sql.functions import date_sub
window_3lag = Window.partitionBy("user_id").orderBy("transaction_date").rangeBetween(-3*86400 + 1,0)
df.withColumn(" ",sum("revenue").over(window_3lag)).show()

AnalysisException: [DATATYPE_MISMATCH.RANGE_FRAME_INVALID_TYPE] Cannot resolve "(PARTITION BY user_id ORDER BY transaction_date ASC NULLS FIRST RANGE BETWEEN -259199 FOLLOWING AND CURRENT ROW)" due to data type mismatch: The data type "DATE" used in the order specification does not match the data type "BIGINT" which is used in the range frame.;
'Project [user_id#908L, transaction_date#927, revenue#910L, sum(revenue#910L) windowspecdefinition(user_id#908L, transaction_date#927 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -259199, currentrow$())) AS  #972]
+- Project [user_id#908L, cast(transaction_date#909 as date) AS transaction_date#927, revenue#910L]
   +- LogicalRDD [user_id#908L, transaction_date#909, revenue#910L], false


.orderBy("transaction_date").rangeBetween(-3*86400 + 1,0)
here the range Between is applied on orderBy column and also it takes only numeric values like int.
here rangeBetween is applied on transaction_date which is in date format.
so convert it to numeric.
unix_timestamp converts into int

In [82]:
df = df.withColumn("transaction_date",unix_timestamp(col("transaction_date")))
df.show()
display(df)

+-------+----------------+-------+
|user_id|transaction_date|revenue|
+-------+----------------+-------+
|      1|      1717200000|    100|
|      1|      1717286400|    200|
|      1|      1717372800|    300|
|      1|      1717545600|    400|
|      2|      1717200000|    150|
|      2|      1717459200|    250|
|      2|      1717545600|    350|
+-------+----------------+-------+



DataFrame[user_id: bigint, transaction_date: bigint, revenue: bigint]

Summary
Use +1 if you want to exclude the earliest boundary of your 3-day range.

Don’t use +1 if you want a fully inclusive 3-day window.

In [85]:
window_3lag = Window.partitionBy("user_id").orderBy("transaction_date").rangeBetween(-3*86400 +1 ,0)
df.withColumn(" ",sum("revenue").over(window_3lag)).show()

+-------+----------------+-------+---+
|user_id|transaction_date|revenue|   |
+-------+----------------+-------+---+
|      1|      1717200000|    100|100|
|      1|      1717286400|    200|300|
|      1|      1717372800|    300|600|
|      1|      1717545600|    400|700|
|      2|      1717200000|    150|150|
|      2|      1717459200|    250|250|
|      2|      1717545600|    350|600|
+-------+----------------+-------+---+

