In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook: getOrCreate() returns the
# already-running session when there is one, otherwise it builds a new one
# with the application name below.
spark = (
    SparkSession.builder
    .appName("SubscriptionAnalytics")
    .getOrCreate()
)


In [0]:
# Load the two raw CSV exports. The first line of each file is treated as the
# header and column types are inferred by Spark rather than declared here.
df_subs = spark.read.csv(
    "file:/Workspace/Shared/subscriptions.csv",
    header=True,
    inferSchema=True,
)
df_activity = spark.read.csv(
    "file:/Workspace/Shared/user_activity.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Persist both raw frames as Delta tables. "overwrite" replaces whatever is
# currently stored at each path, so re-running this cell resets the table
# contents to the CSV data.
(
    df_subs.write
    .format("delta")
    .mode("overwrite")
    .save("/tmp/delta/subscriptions")
)
(
    df_activity.write
    .format("delta")
    .mode("overwrite")
    .save("/tmp/delta/user_activity")
)

A. Subscription Engagement Score

In [0]:
from pyspark.sql.functions import datediff, count, col, when

# Subscription length in days (EndDate minus StartDate).
df_subs = df_subs.withColumn("active_days", datediff("EndDate", "StartDate"))

# Number of activity rows recorded for each user.
events_per_user = df_activity.groupBy("UserID").agg(count("*").alias("events_per_user"))

# The left join keeps subscriptions whose user has no activity at all; for
# those rows the event count is missing and is defaulted to 0. The fill is
# limited to that one column so that genuine nulls elsewhere (for example a
# missing price or end date) are not silently turned into 0.
df_engagement = (
    df_subs.join(events_per_user, on="UserID", how="left")
    .fillna(0, subset=["events_per_user"])
)

# engagement_score = (events / active_days) * PriceUSD.
# The score is only computed for a positive subscription length; a zero,
# negative or unknown length yields NULL instead of a division by zero
# (which is NULL or an error depending on the session's ANSI setting).
df_engagement = df_engagement.withColumn(
    "engagement_score",
    when(
        col("active_days") > 0,
        (col("events_per_user") / col("active_days")) * col("PriceUSD"),
    ),
)
df_engagement.select("SubscriptionID", "UserID", "engagement_score").show()

+--------------+------+------------------+
|SubscriptionID|UserID|  engagement_score|
+--------------+------+------------------+
|        SUB001|  U001|2.9670329670329667|
|        SUB002|  U002|               4.0|
|        SUB003|  U003| 2.934782608695652|
|        SUB004|  U001|11.868131868131867|
|        SUB005|  U004| 1.978021978021978|
|        SUB006|  U005|               0.0|
|        SUB007|  U006|               0.0|
|        SUB008|  U006|               0.0|
|        SUB009|  U006|               0.0|
+--------------+------+------------------+



B. Anomaly Detection via SQL

Inactive subscriptions with user activity after the end date

In [0]:
from pyspark.sql.functions import to_date

# Cast EventTime to a timestamp before it is compared with EndDate below.
df_activity = df_activity.withColumn("EventTime", col("EventTime").cast("timestamp"))

# Users owning a subscription flagged as not active (IsActive is false) who
# nevertheless have at least one event dated after that subscription's EndDate.
df_anomaly_1 = (
    df_subs.where(~col("IsActive"))
    .join(df_activity, "UserID")
    .where(col("EventTime") > col("EndDate"))
    .select("UserID")
    .distinct()
)

print("Inactive Subscriptions but Recent Activity:")
df_anomaly_1.show()

Inactive Subscriptions but Recent Activity:
+------+
|UserID|
+------+
+------+



AutoRenew is true but no events in the last 30 days

In [0]:
from pyspark.sql.functions import max as max_, current_date, date_sub

# Timestamp of the most recent event per user.
latest_events = df_activity.groupBy("UserID").agg(max_("EventTime").alias("last_event"))

# A user has "no recent activity" when no event exists at all (null after the
# left join) or the latest event is earlier than 30 days before today.
no_recent_activity = col("last_event").isNull() | (
    col("last_event") < date_sub(current_date(), 30)
)

df_anomaly_2 = (
    df_subs.where(col("AutoRenew") == True)
    .join(latest_events, "UserID", "left")
    .where(no_recent_activity)
    .select("UserID")
    .distinct()
)

print(" AutoRenew True but No Activity in 30 Days:")
df_anomaly_2.show()

 AutoRenew True but No Activity in 30 Days:
+------+
|UserID|
+------+
|  U001|
|  U006|
+------+



C. Delta Lake + Merge Simulation

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit
from pyspark.sql import Row

delta_subs = DeltaTable.forPath(spark, "/tmp/delta/subscriptions")

# Corrected rows: Pro subscriptions whose StartDate falls within March 2024
# (both bounds inclusive), read from the current table with PriceUSD raised by 5.
fix_df = (
    spark.read.format("delta").load("/tmp/delta/subscriptions")
    .where(col("PlanType") == "Pro")
    .where(col("StartDate").between("2024-03-01", "2024-03-31"))
    .withColumn("PriceUSD", col("PriceUSD") + lit(5))
)

# Merge on SubscriptionID: every target row that has a matching source row is
# updated with all of the source row's columns. Unmatched rows are untouched.
(
    delta_subs.alias("target")
    .merge(fix_df.alias("source"), "target.SubscriptionID = source.SubscriptionID")
    .whenMatchedUpdateAll()
    .execute()
)

print(" Merge fix applied to Pro subscriptions in March.")

 Merge fix applied to Pro subscriptions in March.


D. Time Travel Debugging

In [0]:
from pyspark.sql.functions import col


def show_march_pro_prices(df):
    """Show id, plan, start date and price of Pro subscriptions started in March 2024.

    Args:
        df: DataFrame with PlanType, StartDate, SubscriptionID and PriceUSD columns.
    """
    df.filter(
        (col("PlanType") == "Pro") &
        (col("StartDate") >= "2024-03-01") &
        (col("StartDate") <= "2024-03-31")
    ).select("SubscriptionID", "PlanType", "StartDate", "PriceUSD").show()


history_df = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/subscriptions`")
history_df.show(truncate=False)

# Version 0 is the pre-fix snapshot only the first time the notebook runs:
# each re-run adds another overwrite commit and another MERGE commit to the
# same path. The "before" snapshot is therefore taken as the version right
# before the most recent MERGE. If the history holds no MERGE at all, fall
# back to version 0.
last_merge = (
    history_df.filter(col("operation") == "MERGE")
    .orderBy(col("version").desc())
    .first()
)
version_before_fix = last_merge["version"] - 1 if last_merge is not None else 0

print("===== BEFORE Billing Fix (Version {}) =====".format(version_before_fix))
df_before = (
    spark.read.format("delta")
    .option("versionAsOf", version_before_fix)
    .load("/tmp/delta/subscriptions")
)
show_march_pro_prices(df_before)

print("===== AFTER Billing Fix (Current Version) =====")
df_after = spark.read.format("delta").load("/tmp/delta/subscriptions")
show_march_pro_prices(df_after)


+-------+-------------------+----------------+----------------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

E. Build Tier Migration Table

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, when, concat_ws

df_subs = spark.read.format("delta").load("/tmp/delta/subscriptions")

# Each user's subscriptions, ordered by start date.
user_window = Window.partitionBy("UserID").orderBy("StartDate")

# Attach the plan of the previous and of the second-previous subscription of
# the same user; both are null when no such earlier row exists.
df_with_lags = df_subs.select(
    "*",
    lag("PlanType", 1).over(user_window).alias("prev_plan"),
    lag("PlanType", 2).over(user_window).alias("prev2_plan"),
)

# Oldest-to-newest plan sequence. concat_ws skips null inputs, so rows with
# fewer than two earlier subscriptions get a shorter path.
plan_sequence = concat_ws(" → ", col("prev2_plan"), col("prev_plan"), col("PlanType"))
df_migration = df_with_lags.withColumn("migration_path", plan_sequence)

# Keep only rows that complete the Basic, then Pro, then Premium progression.
df_upgraded = df_migration.where(col("migration_path") == "Basic → Pro → Premium")

df_upgraded.select("UserID", "migration_path", "StartDate", "PlanType").show()

+------+--------------------+----------+--------+
|UserID|      migration_path| StartDate|PlanType|
+------+--------------------+----------+--------+
|  U006|Basic → Pro → Pre...|2024-05-12| Premium|
+------+--------------------+----------+--------+



F. Power Users Detection

In [0]:
from pyspark.sql.functions import col, countDistinct, count, when

df_activity = spark.read.format("delta").load("/tmp/delta/user_activity")

is_login = col("EventType") == "login"

# Login-only view of the activity table (not used by the rest of this cell).
df_logins = df_activity.where(is_login)

# Per user: number of distinct features touched and number of login events.
# count() ignores nulls, and when() without otherwise() is null for non-login
# rows, so only login rows are counted.
df_power = df_activity.groupBy("UserID").agg(
    countDistinct("FeatureUsed").alias("unique_features"),
    count(when(is_login, True)).alias("login_count"),
)

# Power users: at least 2 distinct features and at least 3 logins.
df_power_users = (
    df_power
    .where(col("unique_features") >= 2)
    .where(col("login_count") >= 3)
)

df_power_users.write.format("delta").mode("overwrite").save("/tmp/delta/power_users")

df_power_users.show()


+------+---------------+-----------+
|UserID|unique_features|login_count|
+------+---------------+-----------+
|  U004|              3|          3|
|  U001|              4|          3|
+------+---------------+-----------+



G. Session Replay View

In [0]:
from pyspark.sql.functions import col, lead, to_timestamp, unix_timestamp
from pyspark.sql.window import Window

df_activity = spark.read.format("delta").load("/tmp/delta/user_activity")
df_activity = df_activity.withColumn("EventTime", to_timestamp("EventTime"))

# Only login/logout events are relevant for reconstructing sessions.
df_filtered = df_activity.where(col("EventType").isin("login", "logout"))

# Each user's events in chronological order.
user_window = Window.partitionBy("UserID").orderBy("EventTime")

# For every event, look ahead to the type and time of the same user's next event.
df_sessions = (
    df_filtered
    .withColumn("next_event", lead("EventType").over(user_window))
    .withColumn("next_time", lead("EventTime").over(user_window))
)

# A session is a login immediately followed by a logout.
df_sessions_matched = (
    df_sessions
    .where(col("EventType") == "login")
    .where(col("next_event") == "logout")
)

# Session length: difference of the two epoch-second values, in minutes.
session_seconds = unix_timestamp("next_time") - unix_timestamp("EventTime")
df_sessions_result = df_sessions_matched.select(
    "UserID",
    col("EventTime").alias("login_time"),
    col("next_time").alias("logout_time"),
    (session_seconds / 60).alias("session_duration_minutes"),
)

df_sessions_result.write.format("delta").mode("overwrite").save("/tmp/delta/user_sessions")

df_sessions_result.show(truncate=False)


+------+-------------------+-------------------+------------------------+
|UserID|login_time         |logout_time        |session_duration_minutes|
+------+-------------------+-------------------+------------------------+
|U001  |2024-04-07 10:22:00|2024-04-07 10:50:00|28.0                    |
|U001  |2024-04-08 09:00:00|2024-04-08 09:45:00|45.0                    |
|U001  |2024-04-09 08:00:00|2024-04-09 08:20:00|20.0                    |
|U002  |2024-04-08 11:10:00|2024-04-08 11:25:00|15.0                    |
|U003  |2024-04-09 09:45:00|2024-04-09 10:15:00|30.0                    |
|U004  |2024-04-11 12:00:00|2024-04-11 12:45:00|45.0                    |
+------+-------------------+-------------------+------------------------+

