In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import *

# Initialize Spark with Delta Lake
spark = SparkSession.builder \
    .appName("SaaSAnalytics") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [0]:
subscriptions = spark.read.csv("/FileStore/tables/subscriptions.csv", header=True, inferSchema=True)
user_activity = spark.read.csv("/FileStore/tables/user_activity.csv", header=True, inferSchema=True) \
.withColumn("EventTime", to_timestamp(col("EventTime")))

In [0]:
# Calculate active days for each subscription
subscriptions_with_days = subscriptions.withColumn("active_days", datediff(col("EndDate"), col("StartDate")))

# Count events per user
events_per_user = user_activity.groupBy("UserID").agg(count("EventType").alias("event_count"))

# Calculate engagement score
engagement_scores = subscriptions_with_days.join(events_per_user, "UserID", "left").withColumn("engagement_score",
(col("event_count") / col("active_days")) * col("PriceUSD"))

display(engagement_scores)

UserID,SubscriptionID,PlanType,StartDate,EndDate,PriceUSD,IsActive,AutoRenew,active_days,event_count,engagement_score
U001,SUB001,Basic,2024-01-01,2024-04-01,30.0,True,True,91,2,0.6593406593406594
U002,SUB002,Pro,2024-02-15,2024-05-15,90.0,True,False,90,1,1.0
U003,SUB003,Pro,2024-03-10,2024-06-10,90.0,False,False,92,1,0.9782608695652174
U001,SUB004,Premium,2024-04-05,2024-07-05,120.0,True,True,91,2,2.6373626373626378
U004,SUB005,Basic,2024-01-20,2024-04-20,30.0,False,False,91,1,0.3296703296703297


In [0]:
subscriptions.createOrReplaceTempView("subscriptions")
user_activity.createOrReplaceTempView("user_activity")

# Identify inactive subscriptions with recent activity
spark.sql("""create or replace TEMP view inactive_with_activity as
select s.SubscriptionID, s.UserID, s.IsActive, MAX(a.EventTime) as last_activity
from subscriptions s join  user_activity a on s.UserID = a.UserID
where s.IsActive = false group by s.SubscriptionID, s.UserID, s.IsActive
having datediff(to_date('2024-04-15'), last_activity) < 30""")

spark.sql("SELECT * FROM inactive_with_activity").show()

# Identify autorenew with no recent activity
spark.sql("""create or replace TEMP view autorenew_no_activity as
select s.SubscriptionID, s.UserID, s.AutoRenew, MAX(a.EventTime) as last_activity
from subscriptions s left join user_activity a on s.UserID = a.UserID
where s.AutoRenew = true group by s.SubscriptionID, s.UserID, s.AutoRenew
having last_activity is null or datediff(current_date(), last_activity) > 30""")

spark.sql("SELECT * FROM autorenew_no_activity").show()

+--------------+------+--------+-------------------+
|SubscriptionID|UserID|IsActive|      last_activity|
+--------------+------+--------+-------------------+
|        SUB005|  U004|   false|2024-04-11 12:00:00|
|        SUB003|  U003|   false|2024-04-09 09:45:00|
+--------------+------+--------+-------------------+

+--------------+------+---------+-------------------+
|SubscriptionID|UserID|AutoRenew|      last_activity|
+--------------+------+---------+-------------------+
|        SUB004|  U001|     true|2024-04-10 16:00:00|
|        SUB001|  U001|     true|2024-04-10 16:00:00|
+--------------+------+---------+-------------------+



In [0]:
# Convert to Delta table
delta_path = "/dbfs/deltalake/subscriptions"
subscriptions.write.format("delta").mode("overwrite").save(delta_path)
delta_table = DeltaTable.forPath(spark, delta_path)

delta_table.alias("target").merge(subscriptions.alias("source").filter(col("PlanType") == "Pro").filter(month(col("StartDate")) == 3),
"target.SubscriptionID = source.SubscriptionID").whenMatchedUpdate(set={"PriceUSD": "source.PriceUSD + 5"}).execute()

display(spark.read.format("delta").load(delta_path))

SubscriptionID,UserID,PlanType,StartDate,EndDate,PriceUSD,IsActive,AutoRenew
SUB001,U001,Basic,2024-01-01,2024-04-01,30.0,True,True
SUB002,U002,Pro,2024-02-15,2024-05-15,90.0,True,False
SUB004,U001,Premium,2024-04-05,2024-07-05,120.0,True,True
SUB005,U004,Basic,2024-01-20,2024-04-20,30.0,False,False
SUB003,U003,Pro,2024-03-10,2024-06-10,95.0,False,False


In [0]:
# Describe history
delta_table.history().show()

# Query before the change
print("Before the change:")
spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load(delta_path) \
    .filter((col("PlanType") == "Pro") & (month(col("StartDate")) == 3)) \
    .show()

# Query after the change 
print("After the change:")
spark.read.format("delta") \
    .load(delta_path) \
    .filter((col("PlanType") == "Pro") & (month(col("StartDate")) == 3)) \
    .show()

+-------+-------------------+---------------+--------------------+---------+--------------------+----+--------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|            userName|operation| operationParameters| job|notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+--------------------+---------+--------------------+----+--------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     12|2025-06-16 11:19:27|442779750304833|azuser3547_mml.lo...| OPTIMIZE|{predicate -> [],...|NULL|    NULL|0616-103841-l1w2z...|         11|SnapshotIsolation|        false|{numRemovedFiles ...|        NULL|Databricks-Runtim...|
|     11|2025-06-16 11:19:26|442779750304833|azuser3547_mml.lo...|    ME

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

win = Window.partitionBy("UserID").orderBy("StartDate")

# Detect plan migration
df_migration = subscriptions.withColumn("prev_plan", lag("PlanType").over(win))

# Filter: Basic to Pro, or Pro to Premium
df_migration.filter(
    ((col("prev_plan") == "Basic") & (col("PlanType") == "Pro")) |
    ((col("prev_plan") == "Pro") & (col("PlanType") == "Premium"))
).select("UserID", "prev_plan", "PlanType", "StartDate").show()

+------+---------+--------+---------+
|UserID|prev_plan|PlanType|StartDate|
+------+---------+--------+---------+
+------+---------+--------+---------+



In [0]:
# Calculate feature usage
feature_usage = user_activity.groupBy("UserID", "FeatureUsed") \
    .agg(count("*").alias("feature_count"))

# Calculate login counts
login_counts = user_activity.filter(col("EventType") == "login") \
    .groupBy("UserID") \
    .agg(count("*").alias("login_count"))

# Identify power users
power_users = feature_usage.groupBy("UserID") \
    .agg(countDistinct("FeatureUsed").alias("distinct_features")) \
    .join(login_counts, "UserID") \
    .filter((col("distinct_features") >= 2) & (col("login_count") >= 3)) \
    .select("UserID", lit("power_user").alias("user_type"))

# Save as Delta table
power_users.write.format("delta") \
    .mode("overwrite") \
    .save("dbfs:/deltalake/power_users")
display(power_users)

UserID,user_type


In [0]:
from pyspark.sql.functions import when,unix_timestamp

window_spec = Window.partitionBy("UserID").orderBy("EventTime")

df_session = user_activity.withColumn("event_ts", unix_timestamp("EventTime")) \
    .withColumn("prev_ts", lag("event_ts").over(window_spec)) \
    .withColumn("prev_event", lag("EventType").over(window_spec)) \
    .withColumn("session_secs", 
                when(col("EventType") == "logout", col("event_ts") - col("prev_ts"))) \
    .filter(col("EventType") == "logout")

df_session.select("UserID", "prev_event", "EventTime", "session_secs").show()

+------+----------+-------------------+------------+
|UserID|prev_event|          EventTime|session_secs|
+------+----------+-------------------+------------+
|  U001|     login|2024-04-10 16:00:00|      279480|
+------+----------+-------------------+------------+

