In [0]:
# =====================================================
# USE CASE 3: Real-Time Personalized Retention Offer Triggering
# Project: Viewer Churn Prediction for OTT Platforms
# Platform: Databricks (Free Edition)
# Author: Erugurala Teja (24MBMB19)
# =====================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, current_timestamp, regexp_replace, lower, length
import datetime

# Step 1️⃣: Initialize Spark Session
spark = SparkSession.builder.appName("OTT_Retention_Offers_Streaming_Final").getOrCreate()


In [0]:
# Step 2️⃣: Define Input and Output Paths
input_path = "/Volumes/workspace/default/dataset/ott_reviews.csv"
temp_stream_input = f"/Volumes/workspace/default/dataset/stream_input_batch_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
output_stream_path = "/Volumes/workspace/default/dataset/ott_retention_offers_stream/"
checkpoint_path = "/Volumes/workspace/default/dataset/ott_retention_checkpoint/"
final_export_path = "/Volumes/workspace/default/dataset/ott_retention_offers_export"


In [0]:
print("✅ PATHS SET SUCCESSFULLY")
print(f"Input CSV: {input_path}")
print(f"Temporary Stream Input Folder: {temp_stream_input}")
print(f"Output Stream Folder: {output_stream_path}")
print(f"Export Folder: {final_export_path}")


✅ PATHS SET SUCCESSFULLY
Input CSV: /Volumes/workspace/default/dataset/ott_reviews.csv
Temporary Stream Input Folder: /Volumes/workspace/default/dataset/stream_input_batch_20251111_185643
Output Stream Folder: /Volumes/workspace/default/dataset/ott_retention_offers_stream/
Export Folder: /Volumes/workspace/default/dataset/ott_retention_offers_export


In [0]:
# =====================================================
# Step 3️⃣: Load the Base OTT Reviews Dataset
# =====================================================
df = spark.read.csv(input_path, header=True, inferSchema=True)
print(f"✅ Loaded {df.count()} records from {input_path}")
df.printSchema()

✅ Loaded 6000 records from /Volumes/workspace/default/dataset/ott_reviews.csv
root
 |-- app_name: string (nullable = true)
 |-- reviewId: string (nullable = true)
 |-- userName: string (nullable = true)
 |-- content: string (nullable = true)
 |-- score: string (nullable = true)
 |-- at: string (nullable = true)



In [0]:
# Step 4️⃣: Basic Cleaning
df_clean = df.dropna(subset=["app_name", "score", "content"])
df_clean = df_clean.withColumn("content", regexp_replace("content", "[^a-zA-Z ]", " "))
df_clean = df_clean.withColumn("content", lower(col("content")))
df_clean = df_clean.filter(length(col("content")) > 20)


In [0]:
# Step 5️⃣: Select required columns and save as a temporary streaming batch (simulate real-time data)
df_clean.select("app_name", "userName", "score", "content").write.mode("overwrite").option("header", True).csv(temp_stream_input)
print(f"✅ Temporary streaming batch created at: {temp_stream_input}")


✅ Temporary streaming batch created at: /Volumes/workspace/default/dataset/stream_input_batch_20251111_185643


In [0]:
# =====================================================
# Step 6️⃣: Define Streaming Source (Simulated)
# =====================================================
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("app_name", StringType(), True),
    StructField("userName", StringType(), True),
    StructField("score", DoubleType(), True),
    StructField("content", StringType(), True)
])

stream_df = (
    spark.readStream
        .option("header", True)
        .schema(schema)
        .csv(temp_stream_input)
)

In [0]:
# =====================================================
# Step 7️⃣: Process Stream — Detect At-Risk Users & Trigger Offers
# =====================================================
offers_df = (
    stream_df.withColumn(
        "risk_status",
        when(col("score") <= 2, "At Risk").otherwise("Retained")
    ).withColumn(
        "offer_message",
        when(col("risk_status") == "At Risk", "🎁 Offer: 50% Off Next Month!").otherwise("Thank you for staying with us 💖")
    ).withColumn(
        "trigger_time", current_timestamp()
    )
)

In [0]:
# =====================================================
# Step 8️⃣: WriteStream with Trigger Once (Free Edition Compatible)
# =====================================================
query = (
    offers_df.writeStream
        .format("csv")
        .option("path", output_stream_path)
        .option("checkpointLocation", checkpoint_path)
        .option("header", True)
        .outputMode("append")
        .trigger(once=True)
        .start()
)

print("✅ Streaming job started — processing simulated OTT data once.")
query.awaitTermination()
print("✅ Streaming job completed successfully!")

✅ Streaming job started — processing simulated OTT data once.
✅ Streaming job completed successfully!


In [0]:
# =====================================================
# Step 9️⃣: Read Stream Output (Filter Valid Files Only)
# =====================================================
files = [f.path for f in dbutils.fs.ls(output_stream_path) if f.name.startswith("part") and f.name.endswith(".csv")]

if len(files) == 0:
    print("⚠️ No streaming output found. Check the previous step.")
else:
    from pyspark.sql.types import StructType, StructField, StringType

    output_schema = StructType([
        StructField("app_name", StringType(), True),
        StructField("userName", StringType(), True),
        StructField("score", StringType(), True),
        StructField("content", StringType(), True),
        StructField("risk_status", StringType(), True),
        StructField("offer_message", StringType(), True),
        StructField("trigger_time", StringType(), True)
    ])

    offers_output = (
        spark.read
            .option("header", True)
            .schema(output_schema)
            .csv(files)
            .filter(col("app_name").isNotNull())
    )

    print(f"✅ Total Offers Generated: {offers_output.count()}")
    display(offers_output)

✅ Total Offers Generated: 3051


app_name,userName,score,content,risk_status,offer_message,trigger_time
Netflix,Sa'adatu Bashir,5.0,am sure netflix is a very good app,Retained,Thank you for staying with us 💖,2025-11-11T18:58:56.184Z
Netflix,Dim Sum,1.0,nothing worth watching waste of money,At Risk,🎁 Offer: 50% Off Next Month!,2025-11-11T18:58:56.184Z
Netflix,Rohan Tiwari,1.0,i had an issue related to video quality of the app but the representative didn t even bother to tell me her name and told me to contact my isp i mean what every other thing is working just fine all the other ott apps on my phone and everybody s phone in my house i m the only one who used netflix now i ve uninstall it obviously and ended the subscription tell your customer care people to give their names to people so people can address them and fix bugs in your app instead of blaming isp,At Risk,🎁 Offer: 50% Off Next Month!,2025-11-11T18:58:56.184Z
Netflix,El Dio,,it s annoying that i cannot watch continuously without having to reset my phonebecause of the error,Retained,Thank you for staying with us 💖,2025-11-11T18:58:56.184Z
Netflix,N,1.0,oct s update doesn t even allow users to open netflix wont go past the splashscreen aug update no longer enables control over literal video resolution i m for sure canceling if this doesn t get fixed,At Risk,🎁 Offer: 50% Off Next Month!,2025-11-11T18:58:56.184Z
Netflix,Eli Aaron,5.0,best app i love netflix it is cheap,Retained,Thank you for staying with us 💖,2025-11-11T18:58:56.184Z
Netflix,Sara Costa,2.0,sinceramente uma falta de respeito com os assinantes a gente paga todo m s e ainda assim precisa lidar com essa enrola o de ter que pagar mais pra assistir certas s ries e filmes bloquear conte dos pra quem tem o plano com an ncios absurdo o m nimo seria oferecer igualdade de acesso j que todos est o pagando decepcionante ver uma empresa desse porte tratar os assinantes dessa forma,At Risk,🎁 Offer: 50% Off Next Month!,2025-11-11T18:58:56.184Z
Netflix,Jake Hendel,2.0,let people use an account within the same family you will lose subscribers like myself not gain an additional if you continue to enforce this awful junk,At Risk,🎁 Offer: 50% Off Next Month!,2025-11-11T18:58:56.184Z
Netflix,Donna Shields,5.0,full k movies ultra hd p movies,Retained,Thank you for staying with us 💖,2025-11-11T18:58:56.184Z
Netflix,Dawn Espinoza,1.0,what is the point of paying subscription when every episodes needs to be stopped and won t turn up after closing this app for about tenth times tsk and the endings are mostly unsatisfying not working properly glitches none stop,At Risk,🎁 Offer: 50% Off Next Month!,2025-11-11T18:58:56.184Z


In [0]:
# =====================================================
# Step 🔟: Export Clean Final CSV to Volumes for Viva (No Indentation Errors)
# =====================================================

export_path = "/Volumes/workspace/default/dataset/ott_retention_offers_export"

if len(files) == 0:
    print("⚠️ No streaming part files found to export. Please re-run the stream.")
else:
    # Export single CSV file
    offers_output.coalesce(1).write.mode("overwrite").option("header", True).csv(export_path)

    print("✅ Final Offers Exported Successfully!")
    print("📂 Download Instructions:")
    print("1️⃣  Go to: Data ➜ Volumes ➜ workspace ➜ default ➜ dataset ➜ ott_retention_offers_export")
    print("2️⃣  Inside, open the folder and find a file like: part-00000-xxxxx.csv")
    print("3️⃣  Right-click ➜ Download")
    print("===============================================")

    print("📈 HUMANIZED OUTPUT SUMMARY")
    offers_output.groupBy("risk_status").count().show()
    print("===============================================")


✅ Final Offers Exported Successfully!
📂 Download Instructions:
1️⃣  Go to: Data ➜ Volumes ➜ workspace ➜ default ➜ dataset ➜ ott_retention_offers_export
2️⃣  Inside, open the folder and find a file like: part-00000-xxxxx.csv
3️⃣  Right-click ➜ Download
📈 HUMANIZED OUTPUT SUMMARY
+-----------+-----+
|risk_status|count|
+-----------+-----+
|   Retained|  825|
|    At Risk| 2226|
+-----------+-----+

