In [0]:
from pyspark.sql import SparkSession
spark

In [0]:
#Load both datasets with schema inference
df_subs = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("file:/Workspace/Shared/subscriptions.csv")

df_activity = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("file:/Workspace/Shared/user_activity.csv")

df_subs.show()
df_activity.show()

+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|
|        SUB005|  U004|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|
|        SUB006|  U005|   Basic|2024-01-05|2024-02-05|    30.0|    true|     true|
|        SUB007|  U005|     Pro|2024-02-06|2024-03-06|    90.0|    true|     true|
|        SUB008|  U005| Premium|2024-03-07|2024-04-07|   120.0|    true|     true|
|        SUB009|  U006|   Basic|2024-01-10|2024-04-10|    30.0|    true|    false|
|   

In [0]:
#A – Subscription Engagement Score
from pyspark.sql.functions import col, count, datediff

df_subs_engaged = df_subs.withColumn("active_days", datediff("EndDate", "StartDate"))

df_events_per_user = df_activity.groupBy("UserID") \
    .agg(count("EventType").alias("events_per_user"))

df_engagement = df_subs_engaged.join(df_events_per_user, "UserID", "left") \
    .withColumn("engagement_score", (col("events_per_user") / col("active_days")) * col("PriceUSD"))

df_engagement.show()

+------+--------------+--------+----------+----------+--------+--------+---------+-----------+---------------+------------------+
|UserID|SubscriptionID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|active_days|events_per_user|  engagement_score|
+------+--------------+--------+----------+----------+--------+--------+---------+-----------+---------------+------------------+
|  U001|        SUB001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|         91|              2|0.6593406593406594|
|  U002|        SUB002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|         90|              1|               1.0|
|  U003|        SUB003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|         92|              1|0.9782608695652174|
|  U001|        SUB004| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|         91|              2|2.6373626373626378|
|  U004|        SUB005|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|        

In [0]:
# B - Anomaly Detection via SQL
#Users who have inactive subscriptions but recent activity
#Users who have AutoRenew = true but no activity in last 30 days
df_subs.createOrReplaceTempView("subscriptions")
df_activity.createOrReplaceTempView("user_activity")


spark.sql("""
CREATE OR REPLACE TEMP VIEW anomaly_users_combined AS
(
    SELECT DISTINCT s.UserID, 'InactiveButActive' AS AnomalyType
    FROM subscriptions s
    JOIN user_activity u ON s.UserID = u.UserID
    WHERE s.IsActive = false

    UNION

    SELECT s.UserID, 'AutoRenewNoActivity30Days' AS AnomalyType
    FROM subscriptions s
    LEFT JOIN user_activity u ON s.UserID = u.UserID
    WHERE s.AutoRenew = true
    GROUP BY s.UserID
    HAVING MAX(to_timestamp(u.EventTime)) < current_timestamp() - INTERVAL 30 DAYS
           OR MAX(to_timestamp(u.EventTime)) IS NULL
)
""")

# View anomalies
spark.sql("SELECT * FROM anomaly_users_combined").show()



+------+--------------------+
|UserID|         AnomalyType|
+------+--------------------+
|  U004|   InactiveButActive|
|  U003|   InactiveButActive|
|  U001|AutoRenewNoActivi...|
|  U005|AutoRenewNoActivi...|
+------+--------------------+



In [0]:
#C – Delta Lake Merge Simulation (Pro Plan Price Fix)
from pyspark.sql.functions import month, lit

# Save to Delta
df_subs.write.format("delta").mode("overwrite") \
    .save("/Workspace/Shared/delta_subscriptions")

# Load from Delta
df_delta = spark.read.format("delta").load("/Workspace/Shared/delta_subscriptions")
df_delta.createOrReplaceTempView("delta_subscriptions")

# Create fix DataFrame
df_fix = df_delta.filter((col("PlanType") == "Pro") & (month("StartDate") == 3)) \
    .withColumn("PriceUSD", col("PriceUSD") + lit(5))

df_fix.createOrReplaceTempView("fix_table")
df_fix.show()
# Apply merge
spark.sql("""
MERGE INTO delta_subscriptions AS main
USING fix_table AS fix
ON main.SubscriptionID = fix.SubscriptionID
WHEN MATCHED THEN UPDATE SET main.PriceUSD = fix.PriceUSD
""")

+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    95.0|   false|    false|
+--------------+------+--------+----------+----------+--------+--------+---------+



DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
#D – Time Travel Debugging
# View Delta version history
spark.sql("DESCRIBE HISTORY delta.`/Workspace/Shared/delta_subscriptions`").show()

# Read before merge using version
df_old = spark.read.format("delta").option("versionAsOf", 0) \
    .load("/Workspace/Shared/delta_subscriptions")

df_old.show()

+-------+-------------------+----------------+--------------------+---------+--------------------+----+-----------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|          userId|            userName|operation| operationParameters| job|         notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+----------------+--------------------+---------+--------------------+----+-----------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      7|2025-06-16 10:49:08|8778822765517627|azuser3551_mml.lo...|    MERGE|{predicate -> ["(...|NULL|{848225530731555}|0611-043435-vg20yowf|          6|WriteSerializable|        false|{numTargetRowsCop...|        NULL|Databricks-Runtim...|
|      6|2025-06-16 10:49:04|877

In [0]:
#E – Tier Migration (Basic → Pro → Premium)
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

win = Window.partitionBy("UserID").orderBy("StartDate")

# Detect plan migration
df_migration = df_subs.withColumn("prev_plan", lag("PlanType").over(win))

# Filter: Basic to Pro, or Pro to Premium
df_migration.filter(
    ((col("prev_plan") == "Basic") & (col("PlanType") == "Pro")) |
    ((col("prev_plan") == "Pro") & (col("PlanType") == "Premium"))
).select("UserID", "prev_plan", "PlanType", "StartDate").show()

+------+---------+--------+----------+
|UserID|prev_plan|PlanType| StartDate|
+------+---------+--------+----------+
|  U005|    Basic|     Pro|2024-02-06|
|  U005|      Pro| Premium|2024-03-07|
|  U006|    Basic|     Pro|2024-04-11|
+------+---------+--------+----------+



In [0]:
#F – Power Users Detection
from pyspark.sql.functions import countDistinct, sum

# Power user = Used ≥2 features and logged in ≥3 times
df_power_users = df_activity.groupBy("UserID") \
    .agg(
        countDistinct("FeatureUsed").alias("features_used"),
        sum((col("EventType") == "login").cast("int")).alias("login_count")
    ).filter((col("features_used") >= 2) & (col("login_count") >= 3))

# Save to Delta table
df_power_users.write.format("delta").mode("overwrite") \
    .save("/Workspace/Shared/power_users")

df_power_users.show()

+------+-------------+-----------+
|UserID|features_used|login_count|
+------+-------------+-----------+
|  U005|            3|          3|
+------+-------------+-----------+



In [0]:
#G – Session Replay View (Session Duration)

from pyspark.sql.functions import when,unix_timestamp

window_spec = Window.partitionBy("UserID").orderBy("EventTime")

df_session = df_activity.withColumn("event_ts", unix_timestamp("EventTime")) \
    .withColumn("prev_ts", lag("event_ts").over(window_spec)) \
    .withColumn("prev_event", lag("EventType").over(window_spec)) \
    .withColumn("session_secs", 
                when(col("EventType") == "logout", col("event_ts") - col("prev_ts"))) \
    .filter(col("EventType") == "logout")

df_session.select("UserID", "prev_event", "EventTime", "session_secs").show()

+------+----------+-------------------+------------+
|UserID|prev_event|          EventTime|session_secs|
+------+----------+-------------------+------------+
|  U001|     login|2024-04-10 16:00:00|      279480|
+------+----------+-------------------+------------+

