**Initialize SparkSession**

In [1]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session via getOrCreate(), registered under
# this application name.
spark = (
    SparkSession.builder
    .appName("Subscription based Saas-platform")
    .getOrCreate()
)
# Bare last expression so the notebook renders the session summary.
spark

**Subscription Engagement Score**

In [None]:
# Load the subscription and user-activity CSV exports
from pyspark.sql.functions import col

# Both files have a header row; let Spark infer the column types.
csv_options = {"header": True, "inferSchema": True}
subscriptions = spark.read.options(**csv_options).csv("file:/Workspace/Shared/subscriptions.csv")
activity = spark.read.options(**csv_options).csv("file:/Workspace/Shared/user_activity.csv")

# Print the schemas as inferred, i.e. before the explicit casts below.
subscriptions.printSchema()
activity.printSchema()

# Pin the types that the later date arithmetic and pricing maths rely on.
subscriptions = (
    subscriptions
    .withColumn("StartDate", col("StartDate").cast("date"))
    .withColumn("EndDate", col("EndDate").cast("date"))
    .withColumn("PriceUSD", col("PriceUSD").cast("double"))
)
activity = activity.withColumn("EventTime", col("EventTime").cast("timestamp"))

# Register both frames under the names used by the SQL cells.
subscriptions.createOrReplaceTempView("subscriptions")
activity.createOrReplaceTempView("activity")

root
 |-- SubscriptionID: string (nullable = true)
 |-- UserID: string (nullable = true)
 |-- PlanType: string (nullable = true)
 |-- StartDate: date (nullable = true)
 |-- EndDate: date (nullable = true)
 |-- PriceUSD: double (nullable = true)
 |-- IsActive: boolean (nullable = true)
 |-- AutoRenew : string (nullable = true)

root
 |-- UserID: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)
 |-- EventType: string (nullable = true)
 |-- FeatureUsed : string (nullable = true)



In [None]:
# NOTE: this import also binds `sum`, shadowing Python's builtin `sum` for the
# rest of the notebook. Later cells use lag/when/unix_timestamp from this line.
from pyspark.sql.functions import col,datediff,count,sum,avg,expr,when,lit,lag,unix_timestamp

# subscription_days_active = EndDate - StartDate, in whole days
subs_days = subscriptions.withColumn("subscription_days_active", datediff("EndDate", "StartDate"))

# activity_event_count = number of activity rows with a non-null EventType per user
activity_event_count = activity.groupBy("UserID").agg(count("EventType").alias("activity_event_count"))

# Left join keeps subscriptions whose user has no recorded activity.
# Only the missing event count defaults to 0; a blanket fillna(0) would also
# zero-fill PriceUSD and subscription_days_active and hide missing data.
# The score stays NULL when the active-day span is missing or not positive,
# which avoids a division by zero (an error under ANSI mode).
engagement = (
    subs_days
    .join(activity_event_count, "UserID", "left")
    .fillna(0, subset=["activity_event_count"])
    .withColumn(
        "user_engagement_score",
        when(
            col("subscription_days_active") > 0,
            (col("activity_event_count") / col("subscription_days_active")) * col("PriceUSD"),
        ),
    )
)
print("The engagement score:")
engagement.select("SubscriptionID", "UserID", "user_engagement_score").show()

The engagement score:
+--------------+------+------------------+
|SubscriptionID|UserID|  engagement_score|
+--------------+------+------------------+
|        SUB001|  U001|0.6593406593406594|
|        SUB002|  U002|               1.0|
|        SUB003|  U003|0.9782608695652174|
|        SUB004|  U001|2.6373626373626378|
|        SUB005|  U004|0.3296703296703297|
+--------------+------+------------------+



**Anomaly Detection via SQL**

In [None]:
# Anomaly 1: users on a subscription flagged IsActive = false whose latest
# recorded event falls within the 7 days before the current date.
inactive_recent_sql = """
SELECT s.UserID, s.IsActive, MAX(a.EventTime) AS LastEvent
FROM subscriptions s
LEFT JOIN activity a ON s.UserID = a.UserID
GROUP BY s.UserID, s.IsActive
HAVING s.IsActive = false AND LastEvent > current_date() - INTERVAL 7 DAYS
"""

# Anomaly 2: users with AutoRenew = true whose latest event is more than
# 30 days before the current date, or who have no events at all.
autorenew_idle_sql = """
SELECT s.UserID, MAX(a.EventTime) AS LastEvent
FROM subscriptions s
LEFT JOIN activity a ON s.UserID = a.UserID
WHERE s.AutoRenew = true
GROUP BY s.UserID
HAVING LastEvent < current_date() - INTERVAL 30 DAYS OR LastEvent IS NULL
"""

print("Inactive but recently active:")
spark.sql(inactive_recent_sql).show()

print("AutoRenew true but no activity in last 30 days:")
spark.sql(autorenew_idle_sql).show()

Inactive but recently active:
+------+--------+---------+
|UserID|IsActive|LastEvent|
+------+--------+---------+
+------+--------+---------+

AutoRenew true but no activity in last 30 days:
+------+-------------------+
|UserID|          LastEvent|
+------+-------------------+
|  U001|2024-04-10 16:00:00|
+------+-------------------+



**Delta Table (Lakehouse) + Merge Simulation**

In [None]:
# Write the current subscriptions frame as a Delta table, replacing any
# existing contents at this location.
delta_location = "file:/Workspace/Shared/subscriptions"
subscriptions.write.format("delta").mode("overwrite").save(delta_location)

# Merge the table against a filtered view of itself: every 'Pro' plan that
# started in March gets its price raised by 5 USD.
price_bump_merge = """
MERGE INTO delta.`file:/Workspace/Shared/subscriptions` target
USING (
  SELECT * FROM delta.`file:/Workspace/Shared/subscriptions`
  WHERE PlanType = 'Pro' AND month(StartDate) = 3
) src
ON target.SubscriptionID = src.SubscriptionID
WHEN MATCHED THEN
  UPDATE SET target.PriceUSD = target.PriceUSD + 5
"""
# show() prints the merge metrics (affected / updated / deleted / inserted rows).
spark.sql(price_bump_merge).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                1|               1|               0|                0|
+-----------------+----------------+----------------+-----------------+



**Time Travel Debugging**

In [None]:
# View history
table_history = spark.sql("DESCRIBE HISTORY delta.`file:/Workspace/Shared/subscriptions`")
table_history.show()

# Previous version: the table state immediately before the most recent MERGE.
# Re-running the write/merge cell appends new commits, so a hard-coded
# version 0 is not necessarily the snapshot that the latest merge modified.
last_merge = (
    table_history
    .filter(col("operation") == "MERGE")
    .orderBy(col("version").desc())
    .first()
)
# Fall back to the first commit when the history contains no MERGE.
if last_merge is not None and last_merge["version"] > 0:
    pre_merge_version = last_merge["version"] - 1
else:
    pre_merge_version = 0

old=spark.read.format("delta").option("versionAsOf", pre_merge_version).load("file:/Workspace/Shared/subscriptions")
old.filter(col("PlanType") == "Pro").show()

+-------+--------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|          userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      3|2025-06-16 10:07:...|4833629471493945|azuser3545_mml.lo...|    MERGE|{predicate -> ["(...|NULL|{3587617172970376}|0611-043339-3vb7b9iv|          2|WriteSerializable|        false|{numTargetRowsCop...|        NULL|Databricks-Runtim...|
|      2|2025-06-16 10:0

**Tier Migration Table using lag**

In [None]:
from pyspark.sql.window import Window

# One window per user, with that user's subscriptions ordered by start date.
w = Window.partitionBy("UserID").orderBy("StartDate")

# Plan type of the same user's immediately preceding subscription (NULL for the first).
previous_plan = lag("PlanType").over(w)
migration = subscriptions.withColumn("PrevPlan", previous_plan)

# Keep only rows where the user moved from Basic to Pro.
basic_to_pro = (col("PrevPlan") == "Basic") & (col("PlanType") == "Pro")
migration.where(basic_to_pro).show()

+--------------+------+--------+---------+-------+--------+--------+---------+--------+
|SubscriptionID|UserID|PlanType|StartDate|EndDate|PriceUSD|IsActive|AutoRenew|PrevPlan|
+--------------+------+--------+---------+-------+--------+--------+---------+--------+
+--------------+------+--------+---------+-------+--------+--------+---------+--------+



**Power Users Detection**

In [None]:
from pyspark.sql.functions import countDistinct, col

# Thresholds that define a "power user".
MIN_DISTINCT_FEATURES = 2
MIN_LOGINS = 3

# The CSV header may carry stray whitespace (e.g. "FeatureUsed "), so resolve
# the actual column name instead of hard-coding one spelling of it.
feature_column = next((name for name in activity.columns if name.strip() == "FeatureUsed"), None)
if feature_column is None:
    raise ValueError(f"activity has no 'FeatureUsed' column; found {activity.columns}")

# Number of distinct features each user touched.
feature = activity.groupBy("UserID").agg(countDistinct(feature_column).alias("feature_count"))
feature.show()

# Number of login events per user.
login = activity.filter(col("EventType") == "login").groupBy("UserID").count().withColumnRenamed("count", "login_count")
login.show()

# Inner join: users without any login cannot meet the login threshold anyway.
high_usage_users = feature.join(login, "UserID") \
    .filter((col("feature_count") >= MIN_DISTINCT_FEATURES) & (col("login_count") >= MIN_LOGINS))
high_usage_users.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/high_usage_users")

+------+-------------+
|UserID|feature_count|
+------+-------------+
|  U004|            1|
|  U002|            1|
|  U003|            1|
|  U001|            1|
+------+-------------+

+------+-----------+
|UserID|login_count|
+------+-----------+
|  U004|          1|
|  U001|          1|
+------+-----------+



**Session Replay View**

In [None]:
# One window per user, with that user's events in chronological order.
window = Window.partitionBy("UserID").orderBy("EventTime")

# Type and time of the same user's immediately preceding event.
previous_event = lag("EventType").over(window)
previous_time = lag("EventTime").over(window)

# A duration is reported only on a logout directly preceded by a login;
# every other row gets NULL because the when() has no otherwise().
closes_login = (col("PrevEvent") == "login") & (col("EventType") == "logout")
elapsed_seconds = unix_timestamp("EventTime") - unix_timestamp("PrevTime")

sessions = (
    activity
    .withColumn("PrevEvent", previous_event)
    .withColumn("PrevTime", previous_time)
    .withColumn("SessionDuration", when(closes_login, elapsed_seconds))
)
print("Sessions:")
sessions.select("UserID", "EventTime", "EventType", "SessionDuration").show()

Sessions:
+------+-------------------+---------+---------------+
|UserID|          EventTime|EventType|SessionDuration|
+------+-------------------+---------+---------------+
|  U001|2024-04-07 10:22:00|    login|           NULL|
|  U001|2024-04-10 16:00:00|   logout|         279480|
|  U002|2024-04-08 11:10:00|   upload|           NULL|
|  U003|2024-04-09 09:45:00| download|           NULL|
|  U004|2024-04-11 12:00:00|    login|           NULL|
+------+-------------------+---------+---------------+

