In [0]:
from pyspark.sql.functions import col, to_date, to_timestamp

# --- Subscriptions -----------------------------------------------------------
# The CSV is read with a header row and no schema, so every column arrives as a
# string; dates, price and the two flags are converted explicitly below.
subscriptions_csv = spark.read.option("header", True).csv("file:/Workspace/Shared/subscriptions.csv")
subscriptions = (
    subscriptions_csv
    .withColumn("StartDate", to_date("StartDate"))
    .withColumn("EndDate", to_date("EndDate"))
    .withColumn("PriceUSD", col("PriceUSD").cast("double"))
    .withColumn("IsActive", col("IsActive").cast("boolean"))
    .withColumn("AutoRenew", col("AutoRenew").cast("boolean"))
)

# Persist as a managed Delta table, replacing any previous contents.
subscriptions.write.format("delta").mode("overwrite").saveAsTable("subscriptions_delta")

# --- User activity -----------------------------------------------------------
# Only EventTime needs converting; the remaining columns stay as strings.
user_activity_csv = spark.read.option("header", True).csv("file:/Workspace/Shared/user_activity.csv")
user_activity = user_activity_csv.withColumn("EventTime", to_timestamp("EventTime"))

user_activity.write.format("delta").mode("overwrite").saveAsTable("user_activity_delta")


Subscription Engagement Score

In [0]:
from pyspark.sql.functions import col, count, datediff, when

# Subscription length in days (EndDate - StartDate).
subs = subscriptions.withColumn("active_days", datediff("EndDate", "StartDate"))

# Number of activity rows per user. The count is per user, not per subscription,
# so a user holding several subscriptions contributes the same count to each.
events = user_activity.groupBy("UserID").agg(count("*").alias("events_per_user"))

# engagement_score = (events_per_user / active_days) * PriceUSD
# The left join keeps subscriptions whose user has no activity; their missing
# count is filled with 0. The score is only computed for a positive
# active_days: a zero-length (or inverted) date range would otherwise divide by
# zero, which yields NULL in legacy mode but raises under ANSI mode. Such rows
# get a NULL score instead.
engagement = (
    subs.join(events, "UserID", "left")
        .fillna(0, subset=["events_per_user"])
        .withColumn(
            "engagement_score",
            when(
                col("active_days") > 0,
                (col("events_per_user") / col("active_days")) * col("PriceUSD"),
            ),
        )
)

engagement.select("SubscriptionID", "UserID", "engagement_score").show()


+--------------+------+------------------+
|SubscriptionID|UserID|  engagement_score|
+--------------+------+------------------+
|        SUB001|  U001|0.6593406593406594|
|        SUB002|  U002|               1.0|
|        SUB003|  U003|0.9782608695652174|
|        SUB004|  U001|2.6373626373626378|
|        SUB005|  U004|0.3296703296703297|
+--------------+------+------------------+



Anomaly Detection via SQL

In [0]:
%sql
-- Subscriptions marked inactive whose user has at least one event timestamped
-- after the subscription's EndDate. DISTINCT collapses multiple such events
-- into one row per (UserID, SubscriptionID).
CREATE OR REPLACE VIEW inactive_recent_activity AS
SELECT DISTINCT
  sub.UserID,
  sub.SubscriptionID
FROM subscriptions_delta AS sub
INNER JOIN user_activity_delta AS act
  ON sub.UserID = act.UserID
WHERE sub.IsActive = false
  AND act.EventTime > sub.EndDate;

-- Auto-renewing subscriptions whose user has either no events at all (the LEFT
-- JOIN leaves EventTime NULL) or whose latest event is older than 30 days
-- before the current date.
CREATE OR REPLACE VIEW autorenew_no_recent_events AS
SELECT
  sub.UserID,
  sub.SubscriptionID
FROM subscriptions_delta AS sub
LEFT JOIN user_activity_delta AS act
  ON sub.UserID = act.UserID
WHERE sub.AutoRenew = true
GROUP BY
  sub.UserID,
  sub.SubscriptionID
HAVING MAX(act.EventTime) IS NULL
    OR MAX(act.EventTime) < current_date() - interval 30 days;


Delta Lake + Merge Simulation

In [0]:
from delta.tables import DeltaTable

# Source side of the merge: Pro subscriptions from the loaded DataFrame whose
# StartDate falls in March 2024 (both bounds inclusive).
march_pro_source = subscriptions.alias("source").filter(
    (col("PlanType") == "Pro") & (col("StartDate").between("2024-03-01", "2024-03-31"))
)

subs_table = DeltaTable.forName(spark, "subscriptions_delta")

# Update-only merge keyed on SubscriptionID: matched target rows get the source
# price plus 5. There is no insert clause, so unmatched rows are left untouched.
(
    subs_table.alias("target")
    .merge(
        source=march_pro_source,
        condition="target.SubscriptionID = source.SubscriptionID",
    )
    .whenMatchedUpdate(set={"PriceUSD": "source.PriceUSD + 5"})
    .execute()
)


Time Travel Debugging



In [0]:
%sql
-- Commit history of the Delta table (one row per table version).
DESCRIBE HISTORY subscriptions_delta;

-- Pro subscriptions as stored in version 0, i.e. the table's first commit.
-- NOTE(review): if the table has been overwritten more than once, the snapshot
-- taken just before the merge is a later version; check the history above.
SELECT *
FROM subscriptions_delta VERSION AS OF 0
WHERE PlanType = 'Pro';

-- Pro subscriptions in the current version of the table.
SELECT *
FROM subscriptions_delta
WHERE PlanType = 'Pro';


SubscriptionID,UserID,PlanType,StartDate,EndDate,PriceUSD,IsActive,AutoRenew
SUB002,U002,Pro,2024-02-15,2024-05-15,90.0,True,False
SUB003,U003,Pro,2024-03-10,2024-06-10,95.0,False,False


Build Tier Migration Table

In [0]:
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

# Each user's subscriptions in chronological order. The window performs its own
# partitioning and ordering, so no global sort of the DataFrame is needed
# beforehand. StartDate is already a date column (converted with to_date when
# the CSV was loaded), so it is used as-is.
window_spec = Window.partitionBy("UserID").orderBy("StartDate")

# prev_plan is the PlanType of the same user's previous subscription, or NULL
# for the user's first subscription.
migrations = (
    subscriptions.select("UserID", "StartDate", "PlanType")
    .withColumn("prev_plan", lag("PlanType").over(window_spec))
)

# Keep only single-step upgrades: Basic -> Pro or Pro -> Premium.
# NOTE(review): a direct Basic -> Premium jump is not matched by this filter;
# confirm whether skip-tier upgrades should count as migrations too.
tier_migration = migrations.filter(
    ((col("prev_plan") == "Basic") & (col("PlanType") == "Pro"))
    | ((col("prev_plan") == "Pro") & (col("PlanType") == "Premium"))
)

tier_migration.show()


+------+---------+--------+---------+
|UserID|StartDate|PlanType|prev_plan|
+------+---------+--------+---------+
+------+---------+--------+---------+



Power Users Detection

In [0]:
# Spark's sum is imported under an alias so it does not shadow Python's builtin
# sum() for the rest of the notebook.
from pyspark.sql.functions import col, countDistinct, when
from pyspark.sql.functions import sum as spark_sum

# Thresholds a user must reach to be considered a power user.
MIN_DISTINCT_FEATURES = 2
MIN_LOGIN_EVENTS = 3

# Per user: number of distinct FeatureUsed values and number of rows whose
# EventType is exactly "login" (each such row contributes 1, all others 0).
power_users = (
    user_activity.groupBy("UserID")
    .agg(
        countDistinct("FeatureUsed").alias("feature_count"),
        spark_sum(when(col("EventType") == "login", 1).otherwise(0)).alias("login_count"),
    )
    .filter(
        (col("feature_count") >= MIN_DISTINCT_FEATURES)
        & (col("login_count") >= MIN_LOGIN_EVENTS)
    )
)

# Persist the result as a managed Delta table, replacing any previous contents.
power_users.write.format("delta").mode("overwrite").saveAsTable("power_users")


Session Replay View

In [0]:
from pyspark.sql.functions import col, lead, unix_timestamp, when
from pyspark.sql.window import Window

# Only login/logout rows take part in session reconstruction.
session_events = user_activity.filter(col("EventType").isin("login", "logout"))

# Each user's login/logout events in chronological order.
window_spec = Window.partitionBy("UserID").orderBy("EventTime")

# For every event, look at the same user's next login/logout event. A session
# duration is only meaningful when a login is followed by a logout; if the next
# event is another login (or there is none), duration_sec is left NULL rather
# than reporting the gap between two logins as a session length.
session_trace = (
    session_events
    .withColumn("next_event", lead("EventType").over(window_spec))
    .withColumn("next_time", lead("EventTime").over(window_spec))
    .withColumn(
        "duration_sec",
        when(
            col("next_event") == "logout",
            unix_timestamp("next_time") - unix_timestamp("EventTime"),
        ),
    )
    .filter(col("EventType") == "login")  # one output row per login
)

session_trace.select("UserID", "EventTime", "next_time", "duration_sec").show()


+------+-------------------+-------------------+------------+
|UserID|          EventTime|          next_time|duration_sec|
+------+-------------------+-------------------+------------+
|  U001|2024-04-07 10:22:00|2024-04-10 16:00:00|      279480|
|  U004|2024-04-11 12:00:00|               NULL|        NULL|
+------+-------------------+-------------------+------------+

