In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("Subscription-Based SaaS Platform").getOrCreate()
spark

In [0]:
df_act = spark.read.csv("dbfs:/FileStore/shared_uploads/azuser3559_mml.local@techademy.com/user_activity.csv", header=True, inferSchema=True)
df_act.show()
df_sub = spark.read.csv("dbfs:/FileStore/shared_uploads/azuser3559_mml.local@techademy.com/subscriptions.csv", header=True, inferSchema=True)
df_sub.show()

+------+-------------------+---------+-----------+
|UserID|          EventTime|EventType|FeatureUsed|
+------+-------------------+---------+-----------+
|  U001|2024-04-07 10:22:00|    login|  Dashboard|
|  U002|2024-04-08 11:10:00|   upload|    Reports|
|  U003|2024-04-09 09:45:00| download|  Analytics|
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|
|  U004|2024-04-11 12:00:00|    login|  Dashboard|
+------+-------------------+---------+-----------+

+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|
|        SUB004|  U001| Premium|2024-04-05

In [0]:
df_combine = df_act.join(df_sub, df_act.UserID == df_sub.UserID).drop(df_sub.UserID)
df_combine.show()


+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+
|UserID|          EventTime|EventType|FeatureUsed|SubscriptionID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|        SUB001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|
|  U002|2024-04-08 11:10:00|   upload|    Reports|        SUB002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|
|  U003|2024-04-09 09:45:00| download|  Analytics|        SUB003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|        SUB004| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|
|  U004|2024-04-11 12:00:00|    login|  Dashboard|        SUB005|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|


In [0]:
df_active = df_combine.withColumn("Active Days", datediff("EndDate","StartDate"))
df_active.show()

+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+-----------+
|UserID|          EventTime|EventType|FeatureUsed|SubscriptionID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|Active Days|
+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+-----------+
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|        SUB001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|         91|
|  U002|2024-04-08 11:10:00|   upload|    Reports|        SUB002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|         90|
|  U003|2024-04-09 09:45:00| download|  Analytics|        SUB003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|         92|
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|        SUB004| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|         91|
|  U004|2024-04-11 12:00:00|    login|  D

In [0]:
df_eve = df_active.groupBy("UserID").agg(count("EventType").alias("events_per_user"))
df_eve.show()

+------+---------------+
|UserID|events_per_user|
+------+---------------+
|  U004|              1|
|  U002|              1|
|  U003|              1|
|  U001|              4|
+------+---------------+



In [0]:
df_score = df_active.join(df_eve, df_active.UserID == df_eve.UserID).withColumn("engagement_score",(col("events_per_user")/col("Active Days"))*col("PriceUSD"))
df_score.show()

+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+-----------+------+---------------+------------------+
|UserID|          EventTime|EventType|FeatureUsed|SubscriptionID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|Active Days|UserID|events_per_user|  engagement_score|
+------+-------------------+---------+-----------+--------------+--------+----------+----------+--------+--------+---------+-----------+------+---------------+------------------+
|  U001|2024-04-10 16:00:00|   logout|  Dashboard|        SUB001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|         91|  U001|              4|1.3186813186813189|
|  U002|2024-04-08 11:10:00|   upload|    Reports|        SUB002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|         90|  U002|              1|               1.0|
|  U003|2024-04-09 09:45:00| download|  Analytics|        SUB003|     Pro|2024-03-10|2024-06-10|    90.0|

In [0]:
df_sub.createOrReplaceTempView("subscriptions")
df_act.createOrReplaceTempView("user_activity")


In [0]:
spark.sql("""
CREATE OR REPLACE TEMP VIEW anomaly_inactive_active AS
SELECT DISTINCT s.UserID
FROM subscriptions s
JOIN user_activity u ON s.UserID = u.UserID
WHERE s.IsActive = false
""")

spark.sql("SELECT * FROM anomaly_inactive_active").show()


+------+
|UserID|
+------+
|  U004|
|  U003|
+------+



In [0]:
spark.sql("""
CREATE OR REPLACE TEMP VIEW anomaly_autorenew_noactivity AS
SELECT s.UserID
FROM subscriptions s
LEFT JOIN user_activity u ON s.UserID = u.UserID
WHERE s.AutoRenew = true
GROUP BY s.UserID
HAVING MAX(u.EventTime) < current_date() - INTERVAL 30 DAYS
""")

spark.sql("SELECT * FROM anomaly_autorenew_noactivity").show()


+------+
|UserID|
+------+
|  U001|
+------+



In [0]:
df_sub.write.format("delta").mode("overwrite").saveAsTable("subscriptions_delta")

spark.sql("""
MERGE INTO subscriptions_delta t
USING (
  SELECT * FROM subscriptions_delta
  WHERE PlanType = 'Pro' AND month(StartDate) = 3
) updates
ON t.SubscriptionID = updates.SubscriptionID
WHEN MATCHED THEN
  UPDATE SET t.PriceUSD = t.PriceUSD + 5
""")

spark.sql("SELECT * FROM subscriptions_delta").show()


+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|
|        SUB005|  U004|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    95.0|   false|    false|
+--------------+------+--------+----------+----------+--------+--------+---------+



In [0]:
spark.sql('''
DESCRIBE HISTORY subscriptions_delta
''').show()

+-------+-------------------+----------------+--------------------+--------------------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|          userId|            userName|           operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+----------------+--------------------+--------------------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      2|2025-06-16 11:31:27|6267232536945943|azuser3559_mml.lo...|            OPTIMIZE|{predicate -> [],...|NULL|{2601386153271675}|0612-123310-2108yh11|          1|SnapshotIsolation|        false|{numRemovedFiles ...|        NULL|Databr

In [0]:
spark.sql('''
SELECT * FROM subscriptions_delta VERSION AS OF 0
''').show()

+--------------+------+--------+----------+----------+--------+--------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|
+--------------+------+--------+----------+----------+--------+--------+---------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|
|        SUB005|  U004|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|
+--------------+------+--------+----------+----------+--------+--------+---------+



In [0]:
from pyspark.sql.window import Window

m_window = Window.partitionBy("UserID").orderBy("StartDate")

df_mig = df_sub.withColumn("PrevPlan", lag("PlanType").over(m_window))

df_mig.filter((col("PrevPlan") == "Basic") & (col("PlanType") == "Pro")).display()


SubscriptionID,UserID,PlanType,StartDate,EndDate,PriceUSD,IsActive,AutoRenew,PrevPlan


In [0]:
df_features = df_act.select("UserID", "FeatureUsed").distinct().groupBy("UserID").count().withColumnRenamed("count", "unique_features")


In [0]:

df_log = df_act.filter(col("EventType") == "login").groupBy("UserID").count().withColumnRenamed("count", "login_count")


In [0]:
df_power = df_features.join(df_log, "UserID", "inner").filter((col("unique_features") >= 2) & (col("login_count") >= 3))

df_power.write.format("delta").mode("overwrite").saveAsTable("power_users")


In [0]:

session_window = Window.partitionBy("UserID").orderBy("EventTime")

df_session = df_act.withColumn("prev_event", lag("EventType").over(session_window)).withColumn("prev_time", lag("EventTime").over(session_window)).withColumn("duration", (unix_timestamp("EventTime") - unix_timestamp("prev_time")) / 60).filter((col("prev_event") == "login") & (col("EventType") == "logout"))

df_session.select("UserID", "prev_time", "EventTime", "duration").display()


UserID,prev_time,EventTime,duration
U001,2024-04-07T10:22:00Z,2024-04-10T16:00:00Z,4658.0
