In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Second set") \
    .getOrCreate()


In [0]:
from pyspark.sql.functions import *
activitydf = spark.read.csv("dbfs:/FileStore/tables/user_activity.csv", header=True, inferSchema=True)
display(activitydf)
subscriptiondf = spark.read.csv("dbfs:/FileStore/tables/subscriptions.csv", header=True, inferSchema=True)
display(subscriptiondf)


UserID,EventTime,EventType,FeatureUsed
U001,2024-04-07T10:22:00Z,login,Dashboard
U002,2024-04-08T11:10:00Z,upload,Reports
U003,2024-04-09T09:45:00Z,download,Analytics
U001,2024-04-10T16:00:00Z,logout,Dashboard
U004,2024-04-11T12:00:00Z,login,Dashboard


SubscriptionID,UserID,PlanType,StartDate,EndDate,PriceUSD,IsActive,AutoRenew
SUB001,U001,Basic,2024-01-01,2024-04-01,30.0,True,True
SUB002,U002,Pro,2024-02-15,2024-05-15,90.0,True,False
SUB003,U003,Pro,2024-03-10,2024-06-10,90.0,False,False
SUB004,U001,Premium,2024-04-05,2024-07-05,120.0,True,True
SUB005,U004,Basic,2024-01-20,2024-04-20,30.0,False,False


In [0]:
# A. Subscription Engagement Score (Real Metric Modeling)
# Combine both datasets.
from pyspark.sql.functions import *
joined = activitydf.join(subscriptiondf, "UserID")
# Calculate:
# active_days = EndDate - StartDate

active = joined.withColumn("active_days", datediff(col("EndDate"), col("StartDate")))
active.show()
# events_per_user = count(EventType) grouped by UserID
events = activitydf.groupBy("UserID").count().withColumnRenamed("count", "events")
events.show()

# Create a score: engagement_score = (events_per_user / active_days) * PriceUSD
engagement = active.join(events, on="UserID")
engagement = engagement.withColumn(
    "engagement_score",
    (col("events") / col("active_days")) * col("PriceUSD")
)
engagement.groupBy("UserID").agg(
    sum("engagement_score").alias("total_engagement_score")
).show()
engagement.show()


+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+-----------+
|UserID|          EventTime|EventType|FeatureUsed|SubscriptionID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|active_days|
+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+-----------+
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|        SUB001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|         91|
|  U002|2024-04-08 11:10:00|   upload|    Reports|        SUB002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|         90|
|  U003|2024-04-09 09:45:00| download|  Analytics|        SUB003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|         92|
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|        SUB004| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|         91|
|  U004|2024-04-11 12:00:00|    login|  D

In [0]:
# B. Anomaly Detection via SQL
# Identify users with:
# Subscription inactive but recent activity

activitydf.createOrReplaceTempView("user_activity")
subscriptiondf.createOrReplaceTempView("subscriptions")

spark.sql(
    """
    CREATE OR REPLACE TEMP VIEW Anomaly_Inactive_RecentActivity AS
    SELECT a.UserID, s.SubscriptionID, s.IsActive, a.EventTime
    FROM user_activity a
    JOIN subscriptions s ON a.UserID = s.UserID
    WHERE s.IsActive = false
    AND a.EventTime >= '2024-04-01'  
"""
)
spark.sql("SELECT * FROM Anomaly_Inactive_RecentActivity").show()

# AutoRenew is true but no events in 30 days
# Use SQL views to expose this logic.
spark.sql("""
CREATE OR REPLACE TEMP VIEW Anomaly_AutoRenew_NoRecentEvents AS
SELECT s.UserID, s.SubscriptionID, s.AutoRenew
FROM subscriptions s
LEFT JOIN (
    SELECT DISTINCT UserID
    FROM user_activity
    WHERE EventTime >= DATE_SUB(CURRENT_DATE(), 30)
) recent ON s.UserID = recent.UserID
WHERE s.AutoRenew = true
  AND recent.UserID IS NULL
""")
spark.sql("SELECT * FROM Anomaly_AutoRenew_NoRecentEvents").show()



+------+--------------+--------+-------------------+
|UserID|SubscriptionID|IsActive|          EventTime|
+------+--------------+--------+-------------------+
|  U003|        SUB003|   false|2024-04-09 09:45:00|
|  U004|        SUB005|   false|2024-04-11 12:00:00|
+------+--------------+--------+-------------------+

+------+--------------+---------+
|UserID|SubscriptionID|AutoRenew|
+------+--------------+---------+
|  U001|        SUB001|     true|
|  U001|        SUB004|     true|
+------+--------------+---------+



In [0]:
# C. Delta Lake + Merge Simulation
# Imagine a billing fix needs to be applied:
# For all Pro plans in March, increase price by $5 retroactively.
# Use MERGE INTO on Delta table to apply the change.
subscriptiondf.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/subscriptions")
spark.sql("""
MERGE INTO delta.`file:/Workspace/Shared/subscriptions` target
USING (
  SELECT * FROM delta.`file:/Workspace/Shared/subscriptions` 
  WHERE PlanType = 'Pro' AND month(StartDate) = 3
) src
ON target.SubscriptionID = src.SubscriptionID
WHEN MATCHED THEN
  UPDATE SET target.PriceUSD = target.PriceUSD + 5
""").show()
     




+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                1|               1|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [0]:
# D. Time Travel Debugging
# Show describe history of the table before and after the billing fix.
# Query using VERSION AS OF to prove the issue existed.

spark.sql("DESCRIBE HISTORY delta.`file:/Workspace/Shared/subscriptions`").show()
old=spark.read.format("delta").option("versionAsOf", 0).load("file:/Workspace/Shared/subscriptions")
old.filter(col("PlanType") == "Pro").show()


+-------+--------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|          userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      1|2025-06-16 11:04:...|1679761755594499|azuser3546_mml.lo...|    MERGE|{predicate -> ["(...|NULL|{3751191479042554}|0612-091342-i15khidz|          0|WriteSerializable|        false|{numTargetRowsCop...|        NULL|Databricks-Runtim...|
|      0|2025-06-16 11:0

In [0]:
# E. Build Tier Migration Table
# Identify users who upgraded:
# From Basic → Pro → Premium
# Use PySpark with lag() function to model this.

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

subscriptiondf.createOrReplaceTempView("subs_raw")

df_mig = spark.sql("""
  SELECT UserID, StartDate, PlanType,
         ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY StartDate) AS rn
  FROM subs_raw
""")


window = Window.partitionBy("UserID").orderBy("StartDate")
df_mig = df_mig.withColumn("prev_plan", lag("PlanType").over(window))


df_mig.filter(
    (col("prev_plan") == "Basic") & (col("PlanType") == "Pro") |
    (col("prev_plan") == "Pro") & (col("PlanType") == "Premium")
).select("UserID", "prev_plan", "PlanType", "StartDate").show()


+------+---------+--------+---------+
|UserID|prev_plan|PlanType|StartDate|
+------+---------+--------+---------+
+------+---------+--------+---------+



In [0]:
# F. Power Users Detection
# Define a power user as:
# Used ≥ 2 features
# Logged in ≥ 3 times
# Create a separate Delta table power_users
feature = activitydf.groupBy("UserID").agg(countDistinct("FeatureUsed").alias("feature_count"))
feature.show()
login = activitydf.filter(col("EventType") == "login").groupBy("UserID").count().withColumnRenamed("count", "login_count")
login.show()
power_users = feature.join(login, "UserID") \
    .filter((col("feature_count") >= 2) & (col("login_count") >= 3))
power_users.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/power_users")
    

+------+-------------+
|UserID|feature_count|
+------+-------------+
|  U004|            1|
|  U002|            1|
|  U003|            1|
|  U001|            1|
+------+-------------+

+------+-----------+
|UserID|login_count|
+------+-----------+
|  U004|          1|
|  U001|          1|
+------+-----------+



In [0]:
# G. Session Replay View
# Build a user session trace table using:
# Window.partitionBy("UserID").orderBy("EventTime")
# Show how long each user spent between login and logout events.
Window.partitionBy("UserID").orderBy("EventTime")
from pyspark.sql.window import Window
from pyspark.sql.functions import col, unix_timestamp, lead, lag, when

sessions = activitydf.filter(col("EventType").isin("login", "logout"))

w = Window.partitionBy("UserID").orderBy("EventTime")

session_trace = sessions \
    .withColumn("next_event", lead("EventType").over(w)) \
    .withColumn("next_time", lead("EventTime").over(w)) \
    .withColumn("session_duration_seconds", 
                when(col("EventType") == "login", 
                     unix_timestamp("next_time") - unix_timestamp("EventTime")))

session_trace_filtered = session_trace.filter(col("EventType") == "login")

session_trace_filtered.select("UserID", "EventTime", "next_time", "session_duration_seconds").show()


+------+-------------------+-------------------+------------------------+
|UserID|          EventTime|          next_time|session_duration_seconds|
+------+-------------------+-------------------+------------------------+
|  U001|2024-04-07 10:22:00|2024-04-10 16:00:00|                  279480|
|  U004|2024-04-11 12:00:00|               NULL|                    NULL|
+------+-------------------+-------------------+------------------------+

