In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
from delta.tables import DeltaTable

In [0]:
spark = SparkSession.builder.appName("assignments").getOrCreate()
spark

In [0]:
dfSub = spark.read.csv("file:/Workspace/Shared/jun-16/subscriptions.csv", header=True, inferSchema=True)
dfUser = spark.read.csv("file:/Workspace/Shared/jun-16/user_activity.csv", header=True, inferSchema=True)

In [0]:
dfSub.write.format("delta").mode("overwrite").saveAsTable("subscriptions_delta")
subDelta = DeltaTable.forName(spark, "subscriptions_delta")

In [0]:
dfSub.printSchema()
dfUser.printSchema()

root
 |-- SubscriptionID: string (nullable = true)
 |-- UserID: string (nullable = true)
 |-- PlanType: string (nullable = true)
 |-- StartDate: date (nullable = true)
 |-- EndDate: date (nullable = true)
 |-- PriceUSD: double (nullable = true)
 |-- IsActive: boolean (nullable = true)
 |-- AutoRenew: boolean (nullable = true)

root
 |-- UserID: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)
 |-- EventType: string (nullable = true)
 |-- FeatureUsed: string (nullable = true)



#**Subscription Engagement Score**

In [0]:
# 1. Combine both datasets.
dfJoined = dfSub.join(dfUser, on="UserID", how="inner")

In [0]:
# Calculate: active_days = EndDate - StartDate
dfJoined = dfJoined.withColumn("ActiveDays", F.date_diff(F.col("EndDate"), F.col("StartDate")))
dfJoined.show()

+------+--------------+--------+----------+----------+--------+--------+---------+-------------------+---------+-----------+----------+
|UserID|SubscriptionID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|          EventTime|EventType|FeatureUsed|ActiveDays|
+------+--------------+--------+----------+----------+--------+--------+---------+-------------------+---------+-----------+----------+
|  U001|        SUB001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|2024-04-10 16:00:00|   logout|  Dashboard|        91|
|  U002|        SUB002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|2024-04-08 11:10:00|   upload|    Reports|        90|
|  U003|        SUB003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|2024-04-09 09:45:00| download|  Analytics|        92|
|  U001|        SUB004| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|2024-04-10 16:00:00|   logout|  Dashboard|        91|
|  U004|        SUB005|   Basic|2024-01-20|2024-

In [0]:
# Calculate: events_per_user = count(EventType) grouped by UserID
dfMetrics = dfJoined.groupBy("UserID").agg(
    F.count("EventType").alias("EventsPerUser"),
    F.sum("ActiveDays").alias("ActiveDays"),
    F.sum("PriceUSD").alias("Price")
)
dfMetrics.show()

+------+-------------+----------+-----+
|UserID|EventsPerUser|ActiveDays|Price|
+------+-------------+----------+-----+
|  U004|            1|        91| 30.0|
|  U002|            1|        90| 90.0|
|  U003|            1|        92| 90.0|
|  U001|            4|       364|300.0|
+------+-------------+----------+-----+



In [0]:
# Create a score: engagement_score = (events_per_user / active_days) * PriceUSD
dfMetrics = dfMetrics.withColumn(
    "EngagementScore",
    F.round((F.col("EventsPerUser") / F.col("ActiveDays")) * F.col("Price"), 2)
)
dfMetrics.show()

+------+-------------+----------+-----+---------------+
|UserID|EventsPerUser|ActiveDays|Price|EngagementScore|
+------+-------------+----------+-----+---------------+
|  U004|            1|        91| 30.0|           0.33|
|  U002|            1|        90| 90.0|            1.0|
|  U003|            1|        92| 90.0|           0.98|
|  U001|            4|       364|300.0|            3.3|
+------+-------------+----------+-----+---------------+



#**Anomaly Detection via SQL**

In [0]:
dfUser.createOrReplaceTempView("user_activity")
dfSub.createOrReplaceTempView("subscription")

In [0]:
# Identify users with:
# Subscription inactive but recent activity
spark.sql("""
          SELECT u.UserID, s.SubExpiry, u.LastLogin FROM 
          (SELECT UserID, MAX(EndDate) AS SubExpiry FROM subscription 
          GROUP BY UserID) s
          INNER JOIN (SELECT UserID, MAX(EventTime) AS LastLogin FROM user_Activity 
                      GROUP BY UserID) u
          ON s.UserID = u.UserID
          WHERE u.LastLogin > s.SubExpiry
          """).show()

# None is displayed because no activity found

+------+---------+---------+
|UserID|SubExpiry|LastLogin|
+------+---------+---------+
+------+---------+---------+



In [0]:
# AutoRenew is true but no events in 30 days
spark.sql("""
          SELECT DISTINCT(u.UserID), u.LastActivity, s.AutoRenew FROM 
          (SELECT UserID, MAX(EventTime) AS LastActivity FROM user_Activity
           GROUP BY UserID) u
          INNER JOIN 
          (SELECT UserID, AutoRenew FROM subscription) s
          ON s.UserID = u.UserID
          WHERE ABS(DATE_DIFF(CURRENT_DATE(), u.LastActivity)) > 30 AND s.AutoRenew = true
          """).show()

+------+-------------------+---------+
|UserID|       LastActivity|AutoRenew|
+------+-------------------+---------+
|  U001|2024-04-10 16:00:00|     true|
+------+-------------------+---------+



#**Delta Lake + Merge simulation**

In [0]:
# Imagine a billing fix needs to be applied:
# For all Pro plans in March, increase price by $5 retroactively.
addedCost = dfSub.filter((F.col("PlanType") == "Pro") & (F.month(F.col("StartDate")) == 3)).withColumn("PriceUSD", F.col("PriceUSD") + 5)
addedCost.show()

+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    95.0|   false|    false|
+--------------+------+--------+----------+----------+--------+--------+---------+



In [0]:
# Use MERGE INTO on Delta table to apply the change.
subDelta.alias("target").merge(
    addedCost.alias("updates"),
    "target.SubscriptionID = updates.SubscriptionID"
).whenMatchedUpdate(
    set={"PriceUSD": "updates.PriceUSD"}
).execute()


#**Time Travel Debugging**

In [0]:
# Show describe history of the table before and after the billing fix.
subDelta.history().show()

+-------+-------------------+----------------+--------------------+--------------------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|          userId|            userName|           operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+----------------+--------------------+--------------------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      6|2025-06-16 10:55:47|2089312130523823|azuser3563_mml.lo...|               MERGE|{predicate -> ["(...|NULL|{2557978635507281}|0611-042249-grg1r6w4|          5|WriteSerializable|        false|{numTargetRowsCop...|        NULL|Databr

In [0]:
# Query using VERSION AS OF to prove the issue existed.
subDelta.history().filter(F.col("version") == 1).show()

+-------+-------------------+----------------+--------------------+--------------------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|          userId|            userName|           operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+----------------+--------------------+--------------------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      1|2025-06-16 10:49:29|2089312130523823|azuser3563_mml.lo...|CREATE OR REPLACE...|{partitionBy -> [...|NULL|{2557978635507281}|0611-042249-grg1r6w4|          0|WriteSerializable|        false|{numFiles -> 1, n...|        NULL|Databr

In [0]:
# Identify users who upgraded:
# From Basic → Pro → Premium
lagf = W.partitionBy("UserID").orderBy("StartDate")

df_lagged = dfSub.withColumn("prev_plan", F.lag("PlanType", 1).over(lagf)) \
                 .withColumn("prev2_plan", F.lag("PlanType", 2).over(lagf))

df_upgrades = df_lagged.filter(
    (F.col("PlanType") == "Premium") &
    (F.col("prev_plan") == "Pro") &
    (F.col("prev2_plan") == "Basic")
)

df_upgrades.show()

+--------------+------+--------+---------+-------+--------+--------+---------+---------+----------+
|SubscriptionID|UserID|PlanType|StartDate|EndDate|PriceUSD|IsActive|AutoRenew|prev_plan|prev2_plan|
+--------------+------+--------+---------+-------+--------+--------+---------+---------+----------+
+--------------+------+--------+---------+-------+--------+--------+---------+---------+----------+



#**Power Users Detection**

In [0]:
# Define a power user as:
# Used ≥ 2 features
# Logged in ≥ 3 times
feature = dfUser.groupBy("UserID").agg(
    F.countDistinct("FeatureUsed").alias("Features")
)

login = dfUser.filter(dfUser.EventType == "login").groupBy("UserID").agg(
    F.count("*").alias("Login")
)

powerUsers = feature.join(login, on="UserID", how="inner") \
    .filter((F.col("Features") >= 2) & (F.col("Login") >= 3)) \

powerUsers.show()

+------+--------+-----+
|UserID|Features|Login|
+------+--------+-----+
+------+--------+-----+



In [0]:
# Create a separate Delta table power_users
powerUsers.write.format("delta").mode("overwrite").saveAsTable("power_users")

#**Session Replay View**

In [0]:
# Show how long each user spent between login and logout events.
tracer = W.partitionBy("UserID").orderBy("EventTime")
dfUser = dfUser.withColumn("NextEvent", F.lead("EventType").over(tracer)) \
    .withColumn("NextTime", F.lead("EventTime").over(tracer))

inOut = dfUser.filter((F.col("EventType") == "login") & (F.col("NextEvent") == "logout"))
inOut = inOut.withColumn(
    "SessionDurationMinutes",
    (F.unix_timestamp("NextTime") - F.unix_timestamp("EventTime")) / 60
)
inOut.show()

+------+-------------------+---------+-----------+---------+-------------------+----------------------+
|UserID|          EventTime|EventType|FeatureUsed|NextEvent|           NextTime|SessionDurationMinutes|
+------+-------------------+---------+-----------+---------+-------------------+----------------------+
|  U001|2024-04-07 10:22:00|    login|  Dashboard|   logout|2024-04-10 16:00:00|                4658.0|
+------+-------------------+---------+-----------+---------+-------------------+----------------------+



In [0]:
session_df = dfUser.filter((F.col("EventType") == "login") & (F.col("NextEvent") == "logout"))