In [0]:
# ==========================================================
# OTT ANALYTICS PIPELINE — WORKING VERSION
# CAST-ERROR COLUMNS BYPASSED
# ==========================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("OTT_Analytics_Working_Pipeline").getOrCreate()

# --------------------------
# 1️⃣ BRONZE — RAW DATA LOAD
# --------------------------
try:
    bronze_df = spark.table("workspace.default.ott_user_behavior_1")
    print("✅ Loaded Bronze table from Catalog")
except:
    bronze_df = spark.read.option("header", "true").option("inferSchema", "true") \
        .csv("/FileStore/tables/ott_user_behavior_1.csv")
    print("⚠️ Loaded fallback CSV")

# --------------------------
# 2️⃣ SILVER — CLEANING
# --------------------------
# Only numeric columns that are safe
safe_numeric_cols = [
    "age", "avg_watch_time_per_day", "total_watch_time",
    "watch_duration", "completion_rate", "rating_given",
    "total_sessions_per_week", "monthly_spend", "churn_flag"
]

for c in safe_numeric_cols:
    if c in bronze_df.columns:
        bronze_df = bronze_df.withColumn(c, col(c).cast(DoubleType()))

bronze_df = bronze_df.fillna({c: 0.0 for c in safe_numeric_cols if c in bronze_df.columns})

# Convert watch_date to timestamp if exists
if "watch_date" in bronze_df.columns:
    silver_df = bronze_df.withColumn("watch_ts", to_timestamp(col("watch_date"), "yyyy-MM-dd HH:mm:ss"))
else:
    silver_df = bronze_df

silver_df.write.mode("overwrite").format("delta").saveAsTable("workspace.default.silver_ott_user_behavior")
print("✅ Silver table created")

# --------------------------
# 3️⃣ GOLD — USE CASES
# --------------------------

# (1) Viewing Pattern Analysis
viewing_pattern = (
    silver_df.groupBy("region", "subscription_type")
    .agg(
        round(avg("avg_watch_time_per_day"), 2).alias("avg_watch_time_per_day"),
        round(avg("total_watch_time"), 2).alias("avg_total_watch_time"),
        countDistinct("user_id").alias("unique_users")
    )
)
viewing_pattern.write.mode("overwrite").format("delta").saveAsTable("workspace.default.gold_viewing_pattern")
print("✅ gold_viewing_pattern")

# (2) Content Recommendation
if {"content_id", "rating_given", "genre", "content_title"}.issubset(set(silver_df.columns)):
    content_stats = (
        silver_df.groupBy("content_id", "genre", "content_title")
        .agg(
            round(avg("rating_given"), 2).alias("avg_rating"),
            count("user_id").alias("num_watchers")
        )
    )
    content_stats.write.mode("overwrite").format("delta").saveAsTable("workspace.default.gold_content_stats")
    print("✅ gold_content_stats")

# (3) Churn Prediction
if "churn_flag" in silver_df.columns:
    features = [
        "avg_watch_time_per_day", "completion_rate",
        "monthly_spend", "total_sessions_per_week"
    ]
    available_features = [f for f in features if f in silver_df.columns]

    df_churn = silver_df.select(*(available_features + ["churn_flag"])).dropna()
    assembler = VectorAssembler(inputCols=available_features, outputCol="features")
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    lr = LogisticRegression(featuresCol="scaled_features", labelCol="churn_flag")
    model = Pipeline(stages=[assembler, scaler, lr]).fit(df_churn)

    preds = model.transform(df_churn)
    auc = BinaryClassificationEvaluator(labelCol="churn_flag", metricName="areaUnderROC").evaluate(preds)
    print(f"✅ Churn Model trained | AUC = {auc:.3f}")

    preds.select("avg_watch_time_per_day", "monthly_spend", "completion_rate",
                 "probability", "prediction", "churn_flag") \
         .write.mode("overwrite").format("delta").saveAsTable("workspace.default.gold_churn_predictions")

# (4) Regional Content Demand
if {"region", "genre", "watch_ts"}.issubset(set(silver_df.columns)):
    df_region = silver_df.withColumn("watch_month", date_format(col("watch_ts"), "yyyy-MM"))
    regional_demand = (
        df_region.groupBy("region", "genre", "watch_month")
        .agg(
            round(sum("total_watch_time"), 2).alias("sum_watch_time"),
            countDistinct("user_id").alias("unique_users"),
            round(avg("rating_given"), 2).alias("avg_rating")
        )
    )
    regional_demand.write.mode("overwrite").format("delta").saveAsTable("workspace.default.gold_regional_demand")
    print("✅ gold_regional_demand")

# --------------------------
# 4️⃣ SUMMARY
# --------------------------
print("""
==========================================================
🎬 OTT ANALYTICS PIPELINE COMPLETED (SAFE VERSION)
----------------------------------------------------------
🥈 Silver: silver_ott_user_behavior
🥇 Gold Tables:
    - gold_viewing_pattern
    - gold_content_stats
    - gold_churn_predictions
    - gold_regional_demand
⚠️ Peak traffic analysis skipped (unsafe columns)
==========================================================
""")


✅ Loaded Bronze table from Catalog
✅ Silver table created
✅ gold_viewing_pattern
✅ gold_content_stats
✅ Churn Model trained | AUC = 0.509
✅ gold_regional_demand

🎬 OTT ANALYTICS PIPELINE COMPLETED (SAFE VERSION)
----------------------------------------------------------
🥈 Silver: silver_ott_user_behavior
🥇 Gold Tables:
    - gold_viewing_pattern
    - gold_content_stats
    - gold_churn_predictions
    - gold_regional_demand
⚠️ Peak traffic analysis skipped (unsafe columns)



In [0]:
%sql
SHOW TABLES IN workspace.default;


database,tableName,isTemporary
default,gold_churn_predictions,False
default,gold_content_stats,False
default,gold_regional_demand,False
default,gold_viewing_pattern,False
default,ott_churn_predictions,False
default,ott_device_usage_summary,False
default,ott_genre_rating_summary,False
default,ott_genre_trend_summary,False
default,ott_peak_traffic_summary,False
default,ott_region_total_demand,False


In [0]:
%sql
SELECT * FROM workspace.default.gold_viewing_pattern
LIMIT 20;


region,subscription_type,avg_watch_time_per_day,avg_total_watch_time,unique_users
Maharashtra,Standard,102.22,5721.97,208
Uttar Pradesh,Premium,106.86,5892.33,179
Kerala,Basic,105.28,6138.37,212
Bihar,Standard,109.92,5948.74,206
Uttar Pradesh,Basic,101.72,5744.02,190
Maharashtra,Basic,111.3,6022.0,191
Bihar,Premium,102.21,5484.07,245
Karnataka,Standard,112.15,6114.73,222
West Bengal,Standard,95.49,5211.03,200
Kerala,Standard,107.77,6053.64,206


In [0]:
%sql
CREATE OR REPLACE TABLE default.gold_viewing_pattern AS
SELECT * FROM workspace.default.gold_viewing_pattern;

CREATE OR REPLACE TABLE default.silver_ott_user_behavior AS
SELECT * FROM workspace.default.silver_ott_user_behavior;


num_affected_rows,num_inserted_rows


In [0]:
%sql
SHOW TABLES IN default;


database,tableName,isTemporary
default,gold_churn_predictions,False
default,gold_content_stats,False
default,gold_regional_demand,False
default,gold_viewing_pattern,False
default,ott_churn_predictions,False
default,ott_device_usage_summary,False
default,ott_genre_rating_summary,False
default,ott_genre_trend_summary,False
default,ott_peak_traffic_summary,False
default,ott_region_total_demand,False
