In [0]:
import os

# Configure access to the Azure Blob Storage account used by the wasbs:// paths.
# The account key is read from the environment so that it never lives in the
# notebook source or its history. A KeyError here means the variable is unset.
# NOTE(review): a literal key was previously committed in this cell; treat it as
# compromised and rotate it. On Databricks, a secret scope
# (dbutils.secrets.get) is a suitable alternative to an environment variable.
spark.conf.set(
  "fs.azure.account.key.meenakshi04.blob.core.windows.net",
  os.environ["AZURE_STORAGE_ACCOUNT_KEY"],
)


In [0]:
# Imported here because this cell is the first to use these functions; relying
# on an import in a later cell breaks a fresh "Restart & Run All".
from pyspark.sql.functions import col, to_date, to_timestamp

subscriptions_path = "wasbs://images@meenakshi04.blob.core.windows.net/subscriptions.csv"
activity_path = "wasbs://images@meenakshi04.blob.core.windows.net/user_activity.csv"

# Both CSVs are read with a header row and schema inference.
subscriptions = spark.read.option("header", True).option("inferSchema", True).csv(subscriptions_path)
activity = spark.read.option("header", True).option("inferSchema", True).csv(activity_path)

# Pin the column types explicitly instead of trusting inference:
# StartDate/EndDate become dates and PriceUSD becomes a double.
subscriptions = subscriptions.withColumn("StartDate", to_date("StartDate")) \
                             .withColumn("EndDate", to_date("EndDate")) \
                             .withColumn("PriceUSD", col("PriceUSD").cast("double"))

# EventTime becomes a timestamp.
activity = activity.withColumn("EventTime", to_timestamp("EventTime"))


In [0]:
# NOTE(review): the wildcard import is kept because other cells use names it
# provides (e.g. `col`) without importing them. Be aware that it also rebinds
# Python builtins such as `sum`/`max` to Spark column functions.
from pyspark.sql.functions import *

# Length of each subscription in days (EndDate - StartDate).
subscriptions = subscriptions.withColumn("active_days", datediff("EndDate", "StartDate"))

# Total number of activity rows per user.
events_per_user = activity.groupBy("UserID").count().withColumnRenamed("count", "events_per_user")

# Left join keeps subscriptions whose user has no activity. Only the missing
# event count is zero-filled; a blanket fillna(0) would also turn unknown
# PriceUSD / active_days values into zeros.
engagement_df = subscriptions.join(events_per_user, on="UserID", how="left") \
                             .fillna(0, subset=["events_per_user"])

# engagement_score = (events / active_days) * price.
# The `when` guard leaves the score NULL for zero-length subscriptions rather
# than dividing by zero, which raises an error when ANSI SQL mode is enabled.
engagement_df = engagement_df.withColumn(
    "engagement_score",
    when(
        col("active_days") != 0,
        (col("events_per_user") / col("active_days")) * col("PriceUSD"),
    ),
)

engagement_df.select("SubscriptionID", "UserID", "engagement_score").show()


+--------------+------+------------------+
|SubscriptionID|UserID|  engagement_score|
+--------------+------+------------------+
|        SUB001|  U001|0.6593406593406594|
|        SUB002|  U002|               1.0|
|        SUB003|  U003|0.9782608695652174|
|        SUB004|  U001|2.6373626373626378|
|        SUB005|  U004|0.3296703296703297|
+--------------+------+------------------+



## B. Anomaly Detection via **SQL**

In [0]:
# Expose both DataFrames to Spark SQL under the view names used by the queries.
for view_name, frame in (("subscriptions", subscriptions), ("activity", activity)):
    frame.createOrReplaceTempView(view_name)


In [0]:
# Users who have at least one activity event dated after the EndDate of one of
# their subscriptions flagged IsActive = 'false'.
post_expiry_activity_sql = """
SELECT DISTINCT a.UserID
FROM activity a
JOIN subscriptions s ON a.UserID = s.UserID
WHERE s.IsActive = 'false' AND to_date(a.EventTime) > s.EndDate
"""

post_expiry_users = spark.sql(post_expiry_activity_sql)
post_expiry_users.show()


+------+
|UserID|
+------+
+------+



In [0]:
# Users with an AutoRenew = 'true' subscription and no activity in the last
# 30 days. The date filter sits in the LEFT JOIN condition so that users
# without recent events are kept with NULL activity columns; COUNT then only
# counts rows whose EventType is non-NULL.
inactive_autorenew_sql = """
SELECT s.UserID
FROM subscriptions s
LEFT JOIN activity a
  ON s.UserID = a.UserID AND to_date(a.EventTime) >= current_date() - 30
WHERE s.AutoRenew = 'true'
GROUP BY s.UserID
HAVING COUNT(a.EventType) = 0
"""

inactive_autorenew_users = spark.sql(inactive_autorenew_sql)
inactive_autorenew_users.show()


+------+
|UserID|
+------+
|  U001|
+------+



## C. Delta Lake + Merge Simulation (Billing Fix)

In [0]:
from delta.tables import *

# Persist the current subscriptions DataFrame as a Delta table, replacing any
# previous contents at this path.
subscriptions_delta_path = "/tmp/delta/subscriptions"
subscriptions.write.format("delta").mode("overwrite").save(subscriptions_delta_path)

delta_subs = DeltaTable.forPath(spark, subscriptions_delta_path)

# Source rows for the fix: Pro-plan subscriptions whose StartDate falls in
# month 3 (of any year).
march_pro_subs = subscriptions.filter("PlanType = 'Pro' AND month(StartDate) = 3")

# For every target row matching a source row on SubscriptionID, set PriceUSD
# to the source price plus 5.
billing_fix = delta_subs.alias("tgt").merge(
    march_pro_subs.alias("src"),
    "tgt.SubscriptionID = src.SubscriptionID",
)
billing_fix = billing_fix.whenMatchedUpdate(set={"PriceUSD": "src.PriceUSD + 5"})
billing_fix.execute()


## D. Time Travel **Debugging**

In [0]:
# Show the full commit history of the subscriptions Delta table.
history_df = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/subscriptions`")
history_df.show(truncate=False)

# Read the table as of version 0 and display the pricing columns.
# NOTE(review): if the write/merge cell has been executed more than once,
# version 0 is presumably the very first write rather than the snapshot right
# before the latest merge; confirm against the history output above.
version_zero_reader = spark.read.format("delta").option("versionAsOf", 0)
df_old = version_zero_reader.load("/tmp/delta/subscriptions")

pricing_snapshot = df_old.select("SubscriptionID", "PlanType", "PriceUSD")
pricing_snapshot.show()


+-------+-------------------+----------------+----------------------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## E. Tier Migration Tracking (Basic → Pro → Premium)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Per-user window ordered by subscription start, so that lag() yields the plan
# of the user's previous subscription (NULL for the first one).
windowSpec = Window.partitionBy("UserID").orderBy("StartDate")
previous_plan = lag("PlanType").over(windowSpec)
subs_with_lag = subscriptions.withColumn("prev_plan", previous_plan)

# Show each upgrade step in turn: Basic -> Pro, then Pro -> Premium.
upgrade_conditions = (
    "prev_plan = 'Basic' AND PlanType = 'Pro'",
    "prev_plan = 'Pro' AND PlanType = 'Premium'",
)
for upgrade_condition in upgrade_conditions:
    subs_with_lag.filter(upgrade_condition).show()


+--------------+------+--------+----------+----------+--------+--------+---------+-----------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|active_days|prev_plan|
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+---------+
|        SUB006|  U001|     Pro|2024-03-15|2024-04-05|    90.0|    true|    false|         21|    Basic|
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+---------+

+--------------+------+--------+----------+----------+--------+--------+---------+-----------+---------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|active_days|prev_plan|
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+---------+
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|         91|      Pro|
+--------------+------+--------+----------+----------+

## F. Power Users **Detection**

In [0]:
# `col` is imported explicitly so this cell does not depend on a wildcard
# import executed in an earlier cell.
from pyspark.sql.functions import col, countDistinct

# Thresholds that define a power user.
MIN_DISTINCT_FEATURES = 2
MIN_LOGINS = 3

# Number of distinct features each user has used.
feature_counts = activity.groupBy("UserID").agg(countDistinct("FeatureUsed").alias("feature_count"))

# Number of "login" events per user.
login_counts = activity.filter(col("EventType") == "login").groupBy("UserID").count().withColumnRenamed("count", "login_count")

# Join on UserID (users without any login event drop out of the join) and keep
# those meeting both thresholds.
power_users = feature_counts.join(login_counts, "UserID") \
                            .filter((col("feature_count") >= MIN_DISTINCT_FEATURES) & (col("login_count") >= MIN_LOGINS))

# Persist the result as a Delta table, replacing any previous contents.
power_users.write.format("delta").mode("overwrite").save("/tmp/delta/power_users")

# Display the result together with both inputs so that an empty result can be
# explained from the underlying counts.
power_users.show()
feature_counts.show()
login_counts.show()


+------+-------------+-----------+
|UserID|feature_count|login_count|
+------+-------------+-----------+
+------+-------------+-----------+

+------+-------------+
|UserID|feature_count|
+------+-------------+
|  U004|            1|
|  U002|            1|
|  U003|            1|
|  U001|            1|
+------+-------------+

+------+-----------+
|UserID|login_count|
+------+-----------+
|  U004|          1|
|  U001|          2|
+------+-----------+

