In [0]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Spark DataFrames").getOrCreate()

Subscription Engagement Score (Real Metric Modeling)

In [0]:
sub_df = spark.read.option("header", True).option("inferSchema", True).csv("dbfs:/FileStore/shared_uploads/azuser3548_mml.local@techademy.com/subscriptions.csv")
activity_df = spark.read.option("header", True).option("inferSchema", True).csv("dbfs:/FileStore/shared_uploads/azuser3548_mml.local@techademy.com/user_activity.csv")
sub_df.show()
activity_df.show()

+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|
|        SUB005|  U004|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|
+--------------+------+--------+----------+----------+--------+--------+---------+

+------+-------------------+---------+-----------+
|UserID|          EventTime|EventType|FeatureUsed|
+------+-------------------+---------+-----------+
|  U001|2024-04-07 10:22:00|    login|  Dashboard|
|  U002|2024-04-08 11:10:00|   upload|    Report

In [0]:
# Combine both datasets.
# Calculate:
# active_days = EndDate - StartDate
# events_per_user = count(EventType) grouped by UserID
# Create a score: engagement_score = (events_per_user / active_days) * PriceUSD

from pyspark.sql.functions import datediff, count, col
sub_df = sub_df.withColumn("active_days", datediff("EndDate", "StartDate"))
events_df = activity_df.groupBy("UserID").agg(count("*").alias("events_per_user"))
engagement_df = sub_df.join(events_df, "UserID", "left") \
    .withColumn("engagement_score", (col("events_per_user") / col("active_days")) * col("PriceUSD"))
engagement_df.show()


+------+--------------+--------+----------+----------+--------+--------+---------+-----------+---------------+------------------+
|UserID|SubscriptionID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|active_days|events_per_user|  engagement_score|
+------+--------------+--------+----------+----------+--------+--------+---------+-----------+---------------+------------------+
|  U001|        SUB001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|         91|              2|0.6593406593406594|
|  U002|        SUB002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|         90|              1|               1.0|
|  U003|        SUB003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|         92|              1|0.9782608695652174|
|  U001|        SUB004| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|         91|              2|2.6373626373626378|
|  U004|        SUB005|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|        

Anomaly Detection via SQL

In [0]:
# Identify users with:
# Subscription inactive but recent activity
# AutoRenew is true but no events in 30 days
# Use SQL views to expose this logic.

from pyspark.sql.functions import to_timestamp, to_date
sub_df.createOrReplaceTempView("subscriptions")
activity_df = activity_df.withColumn("EventTime", to_timestamp("EventTime"))
activity_df.createOrReplaceTempView("user_activity")

spark.sql("""create or replace temp view inactive_with_recent_activity AS
SELECT s.SubscriptionID,s.UserID,s.IsActive,max(a.EventTime) AS last_activity
FROM subscriptions s
JOIN user_activity a ON s.UserID = a.UserID
WHERE s.IsActive = false
GROUP BY s.SubscriptionID, s.UserID, s.IsActive
HAVING datediff(to_date('2024-04-15'), last_activity)<30""")

spark.sql("select * from inactive_with_recent_activity").show()

spark.sql("""CREATE OR REPLACE TEMP VIEW autorenew_no_recent_activity AS
SELECT s.SubscriptionID,s.UserID,s.AutoRenew,MAX(a.EventTime) AS last_activity
FROM subscriptions s
LEFT JOIN user_activity a ON s.UserID = a.UserID
WHERE s.AutoRenew = true
GROUP BY s.SubscriptionID, s.UserID, s.AutoRenew
HAVING last_activity IS NULL OR datediff(current_date(), last_activity) > 30""")

spark.sql("select * from autorenew_no_recent_activity").show()  

+--------------+------+--------+-------------------+
|SubscriptionID|UserID|IsActive|      last_activity|
+--------------+------+--------+-------------------+
|        SUB003|  U003|   false|2024-04-09 09:45:00|
|        SUB005|  U004|   false|2024-04-11 12:00:00|
+--------------+------+--------+-------------------+

+--------------+------+---------+-------------------+
|SubscriptionID|UserID|AutoRenew|      last_activity|
+--------------+------+---------+-------------------+
|        SUB001|  U001|     true|2024-04-10 16:00:00|
|        SUB004|  U001|     true|2024-04-10 16:00:00|
+--------------+------+---------+-------------------+



Delta Lake + Merge Simulation

In [0]:
# Imagine a billing fix needs to be applied:
# For all Pro plans in March, increase price by $5 retroactively.
# Use MERGE INTO on Delta table to apply the change.

from delta.tables import DeltaTable
sub_df.write.format("delta").mode("overwrite").save("/delta/subscriptions")
delta_subs = DeltaTable.forPath(spark, "/delta/subscriptions")
update_df = sub_df.filter((col("PlanType") == "Pro") & (col("StartDate").startswith("2024-03"))).withColumn("PriceUSD", col("PriceUSD") + 5)
delta_subs.alias("target").merge(update_df.alias("updates"),"target.SubscriptionID = updates.SubscriptionID").whenMatchedUpdate(set={"PriceUSD": "updates.PriceUSD"}).execute()
display(spark.read.format("delta").load("/delta/subscriptions"))


SubscriptionID,UserID,PlanType,StartDate,EndDate,PriceUSD,IsActive,AutoRenew,active_days
SUB001,U001,Basic,2024-01-01,2024-04-01,30.0,True,True,91
SUB002,U002,Pro,2024-02-15,2024-05-15,90.0,True,False,90
SUB004,U001,Premium,2024-04-05,2024-07-05,120.0,True,True,91
SUB005,U004,Basic,2024-01-20,2024-04-20,30.0,False,False,91
SUB003,U003,Pro,2024-03-10,2024-06-10,95.0,False,False,92


Time Travel Debugging

In [0]:
# Show describe history of the table before and after the billing fix.
# Query using VERSION AS OF to prove the issue existed.

delta_subs.history().show()

# Read previous version
print("Before the change:")
spark.read.format("delta").option("versionAsOf", 0).load("/delta/subscriptions").show()

print("After the change:")
spark.read.format("delta").load("/delta/subscriptions").show()

Before the change:
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|active_days|
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|         91|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|         90|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|         92|
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|         91|
|        SUB005|  U004|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|         91|
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+

+-------+-------------------+----------------+--------------------+---------+--------------------+----+-----------------+----

Build Tier Migration Table

In [0]:
# Identify users who upgraded:
# From Basic → Pro → Premium
# Use PySpark with lag() function to model this.

from pyspark.sql.window import Window
from pyspark.sql.functions import lag

window_spec = Window.partitionBy("UserID").orderBy("StartDate")

tier_df = sub_df.withColumn("previous_plan", lag("PlanType").over(window_spec)) \
    .filter((col("previous_plan") == "Basic") & (col("PlanType") == "Pro") |
            (col("previous_plan") == "Pro") & (col("PlanType") == "Premium"))
tier_df.show()


+--------------+------+--------+---------+-------+--------+--------+---------+-----------+-------------+
|SubscriptionID|UserID|PlanType|StartDate|EndDate|PriceUSD|IsActive|AutoRenew|active_days|previous_plan|
+--------------+------+--------+---------+-------+--------+--------+---------+-----------+-------------+
+--------------+------+--------+---------+-------+--------+--------+---------+-----------+-------------+



Power Users Detection

In [0]:
# Define a power user as:
# Used ≥ 2 features
# Logged in ≥ 3 times
# Create a separate Delta table power_users

from pyspark.sql.functions import countDistinct

power_users_df = activity_df.groupBy("UserID").agg(
    countDistinct("FeatureUsed").alias("features_used"),
    count("*").alias("login_count")
).filter((col("features_used") >= 2) & (col("login_count") >= 3))
power_users_df.write.format("delta").mode("overwrite").saveAsTable("power_users")
display(power_users_df)

UserID,features_used,login_count


Session Replay View

In [0]:
# Build a user session trace table using:
# Window.partitionBy("UserID").orderBy("EventTime")
# Show how long each user spent between login and logout events.

from pyspark.sql.functions import lead, unix_timestamp,when
from pyspark.sql.window import Window

window = Window.partitionBy("UserID").orderBy("EventTime")

session_replay = activity_df.withColumn("event_ts", unix_timestamp("EventTime")) \
    .withColumn("prev_ts", lag("event_ts").over(window)) \
    .withColumn("prev_event", lag("EventType").over(window)) \
    .withColumn("session_secs", 
                when(col("EventType") == "logout", col("event_ts") - col("prev_ts"))) \
    .filter(col("EventType") == "logout")
session_replay.select("UserID", "prev_event", "EventTime", "session_secs").show()


+------+----------+-------------------+------------+
|UserID|prev_event|          EventTime|session_secs|
+------+----------+-------------------+------------+
|  U001|     login|2024-04-10 16:00:00|      279480|
+------+----------+-------------------+------------+

