In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CourseData").getOrCreate()


def _read_shared_csv(path):
    """Read a CSV at `path`, using its first row as the header and inferring column types."""
    reader = spark.read.option("header", True).option("inferSchema", True)
    return reader.csv(path)


# Raw inputs: one row per subscription, one row per user event.
df_sub = _read_shared_csv("file:/Workspace/Shared/subscriptions.csv")
df_user = _read_shared_csv("file:/Workspace/Shared/user_activity.csv")

# Print the inferred schema followed by a preview, subscriptions first.
for _frame in (df_sub, df_user):
    _frame.printSchema()
    _frame.show()

root
 |-- SubscriptionID: string (nullable = true)
 |-- UserID: string (nullable = true)
 |-- PlanType: string (nullable = true)
 |-- StartDate: date (nullable = true)
 |-- EndDate: date (nullable = true)
 |-- PriceUSD: double (nullable = true)
 |-- IsActive: boolean (nullable = true)
 |-- AutoRenew: boolean (nullable = true)

+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|
|        SUB005|  U004|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|
+------

In [0]:
# A. Subscription Engagement Score (Real Metric Modeling)
# Combine both datasets.
# Calculate:
# active_days = EndDate - StartDate
# events_per_user = count(EventType) grouped by UserID
# Create a score: engagement_score = (events_per_user / active_days) * PriceUSD
from pyspark.sql.functions import col, datediff, count, when, sum as spark_sum

# Subscription length in days (EndDate - StartDate).
df_sub = df_sub.withColumn("active_days", datediff(col("EndDate"), col("StartDate")))

# Number of events recorded per user.
events_per_user = df_user.groupBy("UserID").agg(count("EventType").alias("events_per_user"))

# Left join keeps subscriptions whose user has no recorded events; those get 0.
df_combined = df_sub.join(events_per_user, on="UserID", how="left")
df_combined = df_combined.fillna({"events_per_user": 0})

# engagement_score = (events_per_user / active_days) * PriceUSD
# Only compute the ratio for a positive duration: a zero-length subscription
# would otherwise divide by zero (an error when ANSI mode is on), and a
# negative duration would produce a meaningless negative score. Rows that
# fail the guard get a NULL score, since `when` has no `otherwise` branch.
df_combined = df_combined.withColumn(
    "engagement_score",
    when(
        col("active_days") > 0,
        (col("events_per_user") / col("active_days")) * col("PriceUSD"),
    ),
)

df_combined.select("SubscriptionID", "UserID", "PlanType", "engagement_score").show()



In [0]:
# B. Anomaly Detection via SQL
# Identify users with:
# Subscription inactive but recent activity
# AutoRenew is true but no events in 30 days
# Use SQL views to expose this logic.

# Expose both DataFrames to Spark SQL under the view names the queries below use.
for _view_name, _frame in (("subscriptions", df_sub), ("user_activity", df_user)):
    _frame.createOrReplaceTempView(_view_name)



In [0]:
# Anomaly 1: the subscription is flagged inactive, yet the same user has an
# event within the last 30 days (relative to the day the query runs).
_inactive_with_activity_sql = """

CREATE OR REPLACE TEMP VIEW inactive_with_activity AS
SELECT DISTINCT s.UserID, s.SubscriptionID, s.IsActive, a.EventTime
FROM subscriptions s
JOIN user_activity a ON s.UserID = a.UserID
WHERE s.IsActive = false
  AND a.EventTime >= CURRENT_DATE - INTERVAL '30 days'
  """
spark.sql(_inactive_with_activity_sql)

# Display whatever the view currently matches.
_inactive_with_activity_rows = spark.sql("SELECT * FROM inactive_with_activity")
_inactive_with_activity_rows.show()


+------+--------------+--------+---------+
|UserID|SubscriptionID|IsActive|EventTime|
+------+--------------+--------+---------+
+------+--------------+--------+---------+



In [0]:
# Anomaly 2: AutoRenew is on, but the user's most recent event is either
# missing entirely or older than 30 days (relative to the day the query runs).
# The LEFT JOIN keeps auto-renewing subscriptions whose user has no events at all.
_auto_renew_no_activity_sql = """
CREATE OR REPLACE TEMP VIEW auto_renew_no_activity AS
SELECT s.UserID, s.SubscriptionID, s.AutoRenew, last_event
FROM subscriptions s
LEFT JOIN (
    SELECT UserID, MAX(EventTime) AS last_event
    FROM user_activity
    GROUP BY UserID
) ua ON s.UserID = ua.UserID
WHERE s.AutoRenew = true
  AND (ua.last_event IS NULL OR ua.last_event < current_date() - interval 30 days)
"""
spark.sql(_auto_renew_no_activity_sql)

_auto_renew_no_activity_rows = spark.sql("SELECT * FROM auto_renew_no_activity")
_auto_renew_no_activity_rows.show()



+------+--------------+---------+-------------------+
|UserID|SubscriptionID|AutoRenew|         last_event|
+------+--------------+---------+-------------------+
|  U001|        SUB001|     true|2024-04-10 16:00:00|
|  U001|        SUB004|     true|2024-04-10 16:00:00|
+------+--------------+---------+-------------------+



In [0]:
# C. Delta Lake + Merge Simulation
# Imagine a billing fix needs to be applied:
# For all Pro plans in March, increase price by $5 retroactively.
# Use MERGE INTO on Delta table to apply the change.

from delta.tables import DeltaTable
from pyspark.sql.functions import lit, month, year,col

# Persist the subscriptions as a Delta table, replacing any existing contents,
# then open a handle on it for the merge.
df_sub.write.format("delta").mode("overwrite").save("/delta/subscriptions")
delta_subs = DeltaTable.forPath(spark, "/delta/subscriptions")

# Rows affected by the billing fix: Pro plans that started in March 2024,
# each carrying the corrected price (+$5).
_is_march_2024_pro = (
    (col("PlanType") == "Pro")
    & (month("StartDate") == 3)
    & (year("StartDate") == 2024)
)
df_fix = df_sub.filter(_is_march_2024_pro).withColumn("PriceUSD", col("PriceUSD") + lit(5))

# Upsert by SubscriptionID; only the price of matching rows is overwritten.
_price_merge = delta_subs.alias("target").merge(
    source=df_fix.alias("source"),
    condition="target.SubscriptionID = source.SubscriptionID",
)
_price_merge.whenMatchedUpdate(set={"PriceUSD": "source.PriceUSD"}).execute()

# Read the table back and display the March Pro rows to confirm the new price.
_patched_subs = spark.read.format("delta").load("/delta/subscriptions")
_patched_subs.select(
    "SubscriptionID", "UserID", "PlanType", "StartDate", "PriceUSD"
).filter("PlanType = 'Pro' AND month(StartDate) = 3").show()



+--------------+------+--------+----------+--------+
|SubscriptionID|UserID|PlanType| StartDate|PriceUSD|
+--------------+------+--------+----------+--------+
|        SUB003|  U003|     Pro|2024-03-10|    95.0|
+--------------+------+--------+----------+--------+



In [0]:
# D. Time Travel Debugging
# Show describe history of the table before and after the billing fix.
# Query using VERSION AS OF to prove the issue existed.
from delta.tables import DeltaTable


def _show_march_pro_prices(snapshot):
    """Display id, plan, start date and price of the Pro plans starting in March."""
    snapshot.filter("PlanType = 'Pro' AND month(StartDate) = 3").select(
        "SubscriptionID", "PlanType", "StartDate", "PriceUSD"
    ).show()


# Full commit log of the table, untruncated so every column stays readable.
delta_subs = DeltaTable.forPath(spark, "/delta/subscriptions")
delta_subs.history().show(truncate=False)

# Snapshot at version 0, read here as the state before the billing fix.
df_before = spark.read.format("delta").option("versionAsOf", 0).load("/delta/subscriptions")
_show_march_pro_prices(df_before)

# Latest snapshot, i.e. the state after the billing fix.
df_after = spark.read.format("delta").load("/delta/subscriptions")
_show_march_pro_prices(df_after)


+-------+-------------------+----------------+----------------------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# E. Build Tier Migration Table
# Identify users who upgraded:
# From Basic → Pro → Premium
# Use PySpark with lag() function to model this.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lower, when

# Lower-case the plan names into a NEW DataFrame. Overwriting df_sub here would
# silently break any re-run of earlier cells that compare PlanType against the
# original casing (e.g. the billing fix filters on PlanType == "Pro").
df_sub_normalized = df_sub.withColumn("PlanType", lower(col("PlanType")))

# Window spec: one partition per user, subscriptions in chronological order.
windowSpec = Window.partitionBy("UserID").orderBy("StartDate")


def _tier_rank(plan):
    """Map a lower-cased plan name column to its tier order (basic < pro < premium).

    Plans outside the three known tiers map to NULL, so comparisons involving
    them are NULL rather than a false positive.
    """
    return when(plan == "basic", 1).when(plan == "pro", 2).when(plan == "premium", 3)


# PreviousPlan / TwoStepsAgo expose the user's plan history on each row;
# IsUpgrade is true when the current plan ranks above the previous one.
# It is NULL for a user's first subscription (no previous plan to compare to).
df_migration = (
    df_sub_normalized
    .withColumn("PreviousPlan", lag("PlanType", 1).over(windowSpec))
    .withColumn("TwoStepsAgo", lag("PlanType", 2).over(windowSpec))
    .withColumn("IsUpgrade", _tier_rank(col("PlanType")) > _tier_rank(col("PreviousPlan")))
)

df_migration.select("UserID", "StartDate", "PlanType", "PreviousPlan", "TwoStepsAgo").show()

# Users who actually moved up a tier; filter() drops the NULL (first-plan) rows.
df_migration.filter(col("IsUpgrade")).select(
    "UserID", "StartDate", "PreviousPlan", "PlanType"
).show()


+------+----------+--------+------------+-----------+
|UserID| StartDate|PlanType|PreviousPlan|TwoStepsAgo|
+------+----------+--------+------------+-----------+
|  U001|2024-01-01|   basic|        NULL|       NULL|
|  U001|2024-04-05| premium|       basic|       NULL|
|  U002|2024-02-15|     pro|        NULL|       NULL|
|  U003|2024-03-10|     pro|        NULL|       NULL|
|  U004|2024-01-20|   basic|        NULL|       NULL|
+------+----------+--------+------------+-----------+



In [0]:
# F. Power Users Detection
# Define a power user as:
# Used ≥ 2 features
# Logged in ≥ 3 times
# Create a separate Delta table power_users
# Import Spark's aggregate under an alias: a bare `sum` import would shadow
# Python's built-in sum() for every cell executed afterwards in this kernel.
from pyspark.sql.functions import col, countDistinct, sum as spark_sum, when

# Per-user summary: number of distinct features touched and number of login events.
df_user_summary = df_user.groupBy("UserID").agg(
    countDistinct("FeatureUsed").alias("distinct_features"),
    spark_sum(when(col("EventType") == "login", 1).otherwise(0)).alias("login_count")
)

# Power user = used at least 2 distinct features AND logged in at least 3 times.
df_power_users = df_user_summary.filter(
    (col("distinct_features") >= 2) &
    (col("login_count") >= 3)
)

# Persist as a Delta table and (re)register it under the name power_users.
df_power_users.write.format("delta").mode("overwrite").save("/delta/power_users")
spark.sql("DROP TABLE IF EXISTS power_users")
spark.sql("CREATE TABLE power_users USING DELTA LOCATION '/delta/power_users'")

# Read back from storage to show what was actually written.
spark.read.format("delta").load("/delta/power_users").show()



+------+-----------------+-----------+
|UserID|distinct_features|login_count|
+------+-----------------+-----------+
+------+-----------------+-----------+



In [0]:

# G. Session Replay View
# Build a user session trace table using:
# Window.partitionBy("UserID").orderBy("EventTime")
# Show how long each user spent between login and logout events.
from pyspark.sql.functions import col, expr, lead, to_timestamp
from pyspark.sql.window import Window

# Make sure EventTime is a timestamp before ordering and subtracting on it.
df_user = df_user.withColumn("EventTime", to_timestamp("EventTime"))

# Only login and logout events matter for session boundaries.
df_filtered = df_user.filter(col("EventType").isin("login", "logout"))

# Each user's events in chronological order.
windowSpec = Window.partitionBy("UserID").orderBy("EventTime")

# Attach the type and time of the user's next login/logout event to every row.
df_session = (
    df_filtered
    .withColumn("NextEvent", lead("EventType").over(windowSpec))
    .withColumn("NextEventTime", lead("EventTime").over(windowSpec))
)

# A session is a login whose immediately following event is a logout.
_is_login_then_logout = (col("EventType") == "login") & (col("NextEvent") == "logout")
df_sessions_only = df_session.filter(_is_login_then_logout)

# Session length in minutes, rounded to two decimals.
df_session_duration = df_sessions_only.withColumn(
    "SessionDurationMinutes",
    expr("round((unix_timestamp(NextEventTime) - unix_timestamp(EventTime)) / 60, 2)"),
)

df_session_duration.select(
    "UserID", "EventTime", "NextEventTime", "SessionDurationMinutes"
).show()


+------+-------------------+-------------------+----------------------+
|UserID|          EventTime|      NextEventTime|SessionDurationMinutes|
+------+-------------------+-------------------+----------------------+
|  U001|2024-04-07 10:22:00|2024-04-10 16:00:00|                4658.0|
+------+-------------------+-------------------+----------------------+

