In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = (
    SparkSession.builder
    .appName("UserSessions")
    .getOrCreate()
)

# Toy dataset: one (sale_date, sales) row per consecutive calendar day.
columns = ["sale_date", "sales"]
data = [
    ("2025-01-01", 100),
    ("2025-01-02", 200),
    ("2025-01-03", 300),
    ("2025-01-04", 400),
    ("2025-01-05", 500),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

# Row-based frame: the current row plus the two rows before it, ordered by
# sale_date. It spans three calendar days only because the data has exactly
# one row per consecutive day; the first two rows average over fewer rows.
w = Window.orderBy(col("sale_date")).rowsBetween(-2, Window.currentRow)

# Round the windowed mean to zero decimals, then cast to int so the column
# prints as 100 rather than 100.0.
df1 = df.select(
    "*",
    round(avg(col("sales")).over(w), 0).cast("int").alias("rolling_3_day_avg"),
)
df1.show()

+----------+-----+
| sale_date|sales|
+----------+-----+
|2025-01-01|  100|
|2025-01-02|  200|
|2025-01-03|  300|
|2025-01-04|  400|
|2025-01-05|  500|
+----------+-----+

+----------+-----+-----------------+
| sale_date|sales|rolling_3_day_avg|
+----------+-----+-----------------+
|2025-01-01|  100|              100|
|2025-01-02|  200|              150|
|2025-01-03|  300|              200|
|2025-01-04|  400|              300|
|2025-01-05|  500|              400|
+----------+-----+-----------------+



In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("CustomerPurchases").getOrCreate()

# (customer_id, purchase_date, amount) — several purchases per customer.
data = [
    ("C1", "2025-01-01", 100),
    ("C1", "2025-01-05", 200),
    ("C1", "2025-01-10", 300),
    ("C2", "2025-01-02", 150),
    ("C2", "2025-01-06", 120),
    ("C3", "2025-01-03", 200),
    ("C3", "2025-01-09", 250),
]

columns = ["customer_id", "purchase_date", "amount"]

# Parse the ISO date strings into DateType so the window orders chronologically.
df_c = (
    spark.createDataFrame(data, columns)
    .withColumn("purchase_date", to_date("purchase_date"))
)

# Per-customer, chronological window used to look at the previous purchase.
win1 = Window.partitionBy('customer_id').orderBy('purchase_date')

# lag1 = amount of the customer's previous purchase (NULL for their first one).
df1 = df_c.withColumn("lag1", lag('amount').over(win1))
df1.show()

# Inspection only: the purchases that strictly increased over the previous one.
df2 = df1.filter(col('lag1').isNotNull() & (col('lag1') < col('amount')))
df2.show()

# A customer qualifies only if NONE of their purchases failed to increase.
# Selecting customers with at least one increasing step (the rows in df2) is
# not enough: someone who goes 100 -> 200 -> 150 has an increasing step but
# is not consistently increasing. So collect the violating customers and
# anti-join them away, matching the rule used by the SQL version of this query
# (amount <= previous amount is a violation).
violations = (
    df1.filter(col('lag1').isNotNull() & (col('amount') <= col('lag1')))
    .select('customer_id')
    .distinct()
)
df3 = (
    df_c.select('customer_id')
    .distinct()
    .join(violations, on='customer_id', how='left_anti')
    .orderBy('customer_id')  # deterministic display order
)
df3.show()

+-----------+-------------+------+----+
|customer_id|purchase_date|amount|lag1|
+-----------+-------------+------+----+
|         C1|   2025-01-01|   100|NULL|
|         C1|   2025-01-05|   200| 100|
|         C1|   2025-01-10|   300| 200|
|         C2|   2025-01-02|   150|NULL|
|         C2|   2025-01-06|   120| 150|
|         C3|   2025-01-03|   200|NULL|
|         C3|   2025-01-09|   250| 200|
+-----------+-------------+------+----+

+-----------+-------------+------+----+
|customer_id|purchase_date|amount|lag1|
+-----------+-------------+------+----+
|         C1|   2025-01-05|   200| 100|
|         C1|   2025-01-10|   300| 200|
|         C3|   2025-01-09|   250| 200|
+-----------+-------------+------+----+

+-----------+
|customer_id|
+-----------+
|         C1|
|         C3|
+-----------+



In [11]:
# Expose the purchases DataFrame to Spark SQL under the name customer_table.
df_c.createOrReplaceTempView('customer_table')

# Customers whose every purchase is strictly larger than their previous one.
#   with_lag   - each purchase alongside the customer's previous amount
#   violations - customers with at least one purchase that did not increase
# The final filter uses NOT EXISTS rather than NOT IN: with NOT IN, a single
# NULL customer_id among the violations makes the predicate UNKNOWN for every
# row and the query silently returns nothing.
result = spark.sql("""
WITH with_lag AS (
    SELECT
        customer_id,
        purchase_date,
        amount,
        LAG(amount) OVER (PARTITION BY customer_id ORDER BY purchase_date) AS lag1
    FROM customer_table
),
violations AS (
    SELECT DISTINCT customer_id
    FROM with_lag
    WHERE lag1 IS NOT NULL AND amount <= lag1
)
SELECT DISTINCT c.customer_id
FROM customer_table c
WHERE NOT EXISTS (
    SELECT 1
    FROM violations v
    WHERE v.customer_id = c.customer_id
)
ORDER BY customer_id
""")
result.show()

+-----------+
|customer_id|
+-----------+
|         C1|
|         C3|
+-----------+

