# StreamSense — Hit Predictor Demo

**Objective:**  
Showcase the trained StreamSense model in action through interactive *What-If* predictions and visual insights.

This notebook shows how the model estimates a title’s likelihood of becoming a “hit” from its metadata: category, rating, duration, release year, and country.  
It also provides a few visual analytics to explore how hit rates vary across the Netflix catalogue.

---

### Scope
- Load the cleaned dataset (`netflix_clean`) and latest MLflow-logged model  
- Implement a simple **What-If Prediction Helper** to score hypothetical titles  
- Run example scenarios to test how metadata changes affect hit probability  
- Generate visual insights on hit trends by category, rating, and release year  

---

### Outcome
An interactive, demo-ready notebook that:
- Lets users test hypothetical titles and instantly see predicted hit probabilities  
- Displays key trends from the dataset for storytelling and presentation  
- Serves as the final showcase notebook in the StreamSense workflow:
  1. Data ingestion  
  2. Feature engineering  
  3. Model training & tracking  
  4. **Hit predictor demo**

### Load netflix_clean and preview

In [0]:
from pyspark.sql import functions as F

# Load cleaned Delta table from Notebook 02
df_spark = spark.table("netflix_clean")

print(f"Rows: {df_spark.count():,}")
print("Columns:", df_spark.columns)

display(df_spark.limit(5))
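
Later cells rely on specific columns of `netflix_clean`: the six model features plus the `is_hit` label. The sketch below is one way to fail early if any are missing. It assumes those seven columns are all the notebook needs, and `missing_columns` is a hypothetical helper that is not part of the original notebook.

```python
# Columns used later in this notebook: model features plus the is_hit label.
REQUIRED_COLUMNS = [
    "category", "rating", "country",
    "release_year", "duration_num", "is_movie", "is_hit",
]

def missing_columns(available, required=REQUIRED_COLUMNS):
    """Return the required column names that are absent, in their listed order."""
    present = set(available)
    return [name for name in required if name not in present]

# Illustrative input only; in the notebook the argument would be df_spark.columns.
example_columns = ["category", "rating", "release_year", "is_hit"]
print(missing_columns(example_columns))
```

An empty list means every required column is present, so the check can back an `assert` directly after the table is loaded.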

## Load latest model from MLflow

In [0]:
import mlflow
import mlflow.sklearn
import pandas as pd

# Retrieve experiment
experiment = mlflow.get_experiment_by_name("/Shared/StreamSense_Experiments")
assert experiment is not None, "Experiment '/Shared/StreamSense_Experiments' not found."

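# Get the most recent finished run (a failed or still-running run may have no model artifact)
runs_df = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    filter_string="attributes.status = 'FINISHED'",
    order_by=["start_time DESC"],
    max_results=1
)

assert len(runs_df) > 0, "No finished runs found in the experiment."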

latest_run_id = runs_df.iloc[0].run_id
model_uri = f"runs:/{latest_run_id}/model"

print("Loading model from:", model_uri)
loaded_model = mlflow.sklearn.load_model(model_uri)

### What-If Prediction Helper

In [0]:
# Feature configuration must match training
numeric_features = ["release_year", "duration_num", "is_movie"]
categorical_features = ["category", "rating", "country"]
feature_cols = numeric_features + categorical_features

def predict_hit_probability(
    category: str,
    rating: str,
    release_year: int,
    duration_num: int,
    is_movie: int,
    country: str
):
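    """
    Build a one-row DataFrame for a hypothetical title, print the result,
    and return (hit probability, predicted class).

    The class is the pipeline's own `predict` output (1 = hit, 0 = non-hit).
    """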
    data = {
        "category": [category],
        "rating": [rating],
        "release_year": [release_year],
        "duration_num": [duration_num],
        "is_movie": [is_movie],
        "country": [country],
    }
    input_df = pd.DataFrame(data, columns=feature_cols)
    
    proba = loaded_model.predict_proba(input_df)[0, 1]
    pred_class = loaded_model.predict(input_df)[0]
    
    print("Input:", data)
    print(f"Predicted hit probability: {proba:.2%}")
    print("Predicted class:", "HIT (1)" if pred_class == 1 else "NON-HIT (0)")
    
    return proba, pred_class
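
The helper passes its arguments straight to the model, so an inconsistent combination such as `category="TV Show"` with `is_movie=1` would still be scored. The sketch below is a possible pre-check. It assumes `is_movie` is 1 exactly when `category` is `"Movie"`, and the year bounds are arbitrary illustrative limits. `validate_title_inputs` is a hypothetical helper, not part of the original notebook.

```python
def validate_title_inputs(category, rating, release_year, duration_num, is_movie, country):
    """Return a list of problems found; an empty list means the inputs look consistent."""
    problems = []

    # Text fields must carry a non-blank value.
    for name, value in (("category", category), ("rating", rating), ("country", country)):
        if not str(value).strip():
            problems.append(f"{name} must not be empty")

    if is_movie not in (0, 1):
        problems.append("is_movie must be 0 or 1")
    # Assumption: is_movie mirrors the category column.
    elif (category == "Movie") != (is_movie == 1):
        problems.append("is_movie does not match category")

    if duration_num <= 0:
        problems.append("duration_num must be positive")

    # Arbitrary sanity bounds, chosen only for illustration.
    if not 1900 <= release_year <= 2100:
        problems.append("release_year looks out of range")

    return problems

print(validate_title_inputs("TV Show", "TV-Y7", 2012, 2, 1, "United Kingdom"))
```

Calling this before `predict_hit_probability` and stopping on a non-empty list would keep obviously malformed scenarios out of a demo.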

## Visual Exploration

Before diving into interactive predictions, this section explores overall trends in the Netflix dataset.

We visualise how the observed hit rate varies along key dimensions, together with the features the trained model weights most:

- Hit rate by **category** (Movies vs TV Shows)
- Hit rate by **rating** (content maturity)
- Hit rate over **release year** (temporal trends)
- **Feature importances** from the trained model

These charts put the model’s predictions in context and show which features it relies on most; they describe associations in the data, not causes of a title’s success.
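
Throughout this section, "hit rate" means the mean of the binary `is_hit` label within a group. The small pandas sketch below uses made-up rows, not values from the dataset, and mirrors the Spark `groupBy(...).agg(F.avg("is_hit"), F.count("*"))` pattern used in the cells that follow.

```python
import pandas as pd

# Made-up rows for illustration only.
toy = pd.DataFrame({
    "category": ["Movie", "Movie", "Movie", "TV Show", "TV Show"],
    "is_hit": [1, 0, 1, 0, 1],
})

# Mean of a 0/1 label is the share of hits; size is the number of titles per group.
hit_rate = (
    toy.groupby("category")["is_hit"]
    .agg(hit_rate="mean", count="size")
    .sort_values("hit_rate", ascending=False)
    .reset_index()
)
print(hit_rate)
```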

In [0]:
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# Aggregate hit rate by category
hit_by_category = (
    df_spark
    .groupBy("category")
    .agg(
        F.avg("is_hit").alias("hit_rate"),
        F.count("*").alias("count")
    )
    .orderBy(F.col("hit_rate").desc())
)

# Convert to pandas for plotting
pdf_cat = hit_by_category.toPandas()

# Drop rows with null category to avoid matplotlib TypeError
pdf_cat = pdf_cat[pdf_cat["category"].notna()]

# (Optional but safe) cast to string for labels
pdf_cat["category"] = pdf_cat["category"].astype(str)

# Plot
plt.figure(figsize=(8, 4))
plt.bar(pdf_cat["category"], pdf_cat["hit_rate"])
plt.xlabel("Category")
plt.ylabel("Hit rate")
plt.title("Hit rate by category")
plt.xticks(rotation=30, ha="right")
plt.tight_layout()

# Save alongside the notebook in workspace
plt.savefig(
    "hit_rate_by_category.png",
    dpi=180,
    bbox_inches="tight"
)

plt.show()

### Hit rate by Release Year

In [0]:
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# Aggregate hit rate by release year
hit_by_year = (
    df_spark
    .groupBy("release_year")
    .agg(
        F.avg("is_hit").alias("hit_rate"),
        F.count("*").alias("count")
    )
    .filter(F.col("release_year").isNotNull())
    .orderBy("release_year")
)

pdf_year = hit_by_year.toPandas()

plt.figure(figsize=(9, 4))
plt.plot(pdf_year["release_year"], pdf_year["hit_rate"], marker="o")
plt.xlabel("Release year")
plt.ylabel("Hit rate")
plt.title("Hit rate over time (by release year)")
plt.tight_layout()

plt.savefig(
    "hit_rate_by_year.png",
    dpi=180,
    bbox_inches="tight"
)

plt.show()
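
The aggregate above carries a `count` column that the plot does not use, and years with only a handful of titles can produce noisy hit rates. One possible refinement is to drop sparse years before plotting. The sketch below does this in pandas on made-up numbers; `drop_sparse_groups` and its `min_titles` threshold are hypothetical choices.

```python
import pandas as pd

def drop_sparse_groups(pdf, min_titles=20, count_col="count"):
    """Keep only rows whose group size is at least min_titles."""
    return pdf[pdf[count_col] >= min_titles].reset_index(drop=True)

# Made-up aggregate for illustration only.
toy_year = pd.DataFrame({
    "release_year": [1975, 2018, 2019],
    "hit_rate": [1.0, 0.31, 0.28],
    "count": [2, 950, 1010],
})

# The 1975 row rests on two titles and is removed.
print(drop_sparse_groups(toy_year))
```

In the notebook the same call could be applied to `pdf_year` before `plt.plot`.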

### Feature Importance Table

In [0]:
# numeric_features and categorical_features already defined above

preprocessor = loaded_model.named_steps["preprocessor"]
ohe = preprocessor.named_transformers_["cat"]
encoded_cat_features = ohe.get_feature_names_out(categorical_features)

# Assumes the preprocessor emits the numeric columns first, then the one-hot columns
all_features = numeric_features + list(encoded_cat_features)

clf = loaded_model.named_steps["classifier"]
importances = clf.feature_importances_

feat_imp = (
    pd.DataFrame({"feature": all_features, "importance": importances})
    .sort_values("importance", ascending=False)
)

display(feat_imp.head(15))
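
One-hot encoding spreads each categorical feature across many columns such as `category_Movie` or `rating_PG`, so the table can understate a feature like `country` whose importance is split over many levels. The sketch below sums the encoded columns back onto their source feature. It uses made-up importances, and `group_importances` is a hypothetical helper. It relies on the `<feature>_<level>` naming produced by `get_feature_names_out`.

```python
from collections import defaultdict

def group_importances(feature_names, importances, categorical_features):
    """Sum importances of one-hot columns back onto their source feature."""
    totals = defaultdict(float)
    for name, value in zip(feature_names, importances):
        source = name  # numeric features keep their own name
        for cat in categorical_features:
            if name.startswith(f"{cat}_"):
                source = cat
                break
        totals[source] += value
    # Largest total first.
    return dict(sorted(totals.items(), key=lambda kv: kv[1], reverse=True))

# Made-up values for illustration only.
names = ["release_year", "duration_num", "category_Movie", "category_TV Show", "rating_PG"]
values = [0.40, 0.25, 0.10, 0.05, 0.20]
print(group_importances(names, values, ["category", "rating", "country"]))
```

In the notebook the inputs would be `all_features`, `importances`, and `categorical_features`.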

### Feature importance plot + PNG export

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# If not already defined earlier in the notebook, uncomment these:
# numeric_features = ["release_year", "duration_num", "is_movie"]
# categorical_features = ["category", "rating", "country"]

# Extract feature names from the preprocessing pipeline
preprocessor = loaded_model.named_steps["preprocessor"]
ohe = preprocessor.named_transformers_["cat"]
encoded_cat_features = ohe.get_feature_names_out(categorical_features)

# Assumes the preprocessor emits the numeric columns first, then the one-hot columns
all_features = numeric_features + list(encoded_cat_features)

# Extract feature importances from the classifier
clf = loaded_model.named_steps["classifier"]
importances = clf.feature_importances_

feat_imp = (
    pd.DataFrame({"feature": all_features, "importance": importances})
    .sort_values("importance", ascending=False)
)

# Plot top N features
top_n = 15
top_feat = feat_imp.head(top_n).sort_values("importance")

plt.figure(figsize=(8, 6))
plt.barh(top_feat["feature"], top_feat["importance"])
plt.xlabel("Importance")
plt.title(f"Top {top_n} feature importances")
plt.tight_layout()

# Save PNG alongside your notebook (like the other plots)
plt.savefig(
    "feature_importance.png",
    dpi=180,
    bbox_inches="tight"
)

plt.show()

### Hit rate by Rating

In [0]:
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# Aggregate hit rate by rating
hit_by_rating = (
    df_spark
    .groupBy("rating")
    .agg(
        F.avg("is_hit").alias("hit_rate"),
        F.count("*").alias("count")
    )
    .orderBy(F.col("hit_rate").desc())
)

pdf_rating = hit_by_rating.toPandas()

# Drop nulls and cast to string (avoid matplotlib TypeError)
pdf_rating = pdf_rating[pdf_rating["rating"].notna()]
pdf_rating["rating"] = pdf_rating["rating"].astype(str)

plt.figure(figsize=(10, 4))
plt.bar(pdf_rating["rating"], pdf_rating["hit_rate"])
plt.xlabel("Rating")
plt.ylabel("Hit rate")
plt.title("Hit rate by rating")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()

plt.savefig(
    "hit_rate_by_rating.png",
    dpi=180,
    bbox_inches="tight"
)

plt.show()

### Hit rate by category and rating

In [0]:
# Hit rate by category
hit_by_category = (
    df_spark
    .groupBy("category")
    .agg(F.avg("is_hit").alias("hit_rate"), F.count("*").alias("count"))
    .orderBy(F.col("hit_rate").desc())
)
display(hit_by_category)

# Hit rate by rating
hit_by_rating = (
    df_spark
    .groupBy("rating")
    .agg(F.avg("is_hit").alias("hit_rate"), F.count("*").alias("count"))
    .orderBy(F.col("hit_rate").desc())
)
display(hit_by_rating)

### Hit rate over release year

In [0]:
hit_by_year = (
    df_spark
    .groupBy("release_year")
    .agg(F.avg("is_hit").alias("hit_rate"), F.count("*").alias("count"))
    .filter(F.col("release_year").isNotNull())
    .orderBy("release_year")
)
display(hit_by_year)

### Example scenarios

In [0]:
# Example 1: Modern, popular-style movie
predict_hit_probability(
    category="Movie",
    rating="TV-MA",
    release_year=2023,
    duration_num=110,
    is_movie=1,
    country="United States"
)

# Example 2: Older children’s show
predict_hit_probability(
    category="TV Show",
    rating="TV-Y7",
    release_year=2012,
    duration_num=2,
    is_movie=0,
    country="United Kingdom"
)

# Example 3: Recent family film
predict_hit_probability(
    category="Movie",
    rating="PG",
    release_year=2022,
    duration_num=95,
    is_movie=1,
    country="Canada"
)
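
The three scenarios above differ in several fields at once, which makes it hard to see the effect of any single one. The sketch below varies one field while holding the rest fixed. `sweep_feature` is a hypothetical helper, and `toy_score` is a made-up stand-in for the model so that the block runs without MLflow.

```python
def sweep_feature(score_fn, base_title, feature, values):
    """Score copies of base_title that differ only in one feature."""
    results = []
    for value in values:
        scenario = {**base_title, feature: value}  # base_title is left unmodified
        results.append((value, score_fn(**scenario)))
    return results

def toy_score(release_year, **_):
    # Stand-in scorer for illustration only; it is NOT the StreamSense model.
    return min(1.0, max(0.0, (release_year - 2000) / 50))

base = dict(
    category="Movie", rating="PG", release_year=2022,
    duration_num=95, is_movie=1, country="Canada",
)

for year, score in sweep_feature(toy_score, base, "release_year", [2010, 2015, 2020]):
    print(year, f"{score:.2f}")
```

In the notebook, `score_fn` could be `lambda **kw: predict_hit_probability(**kw)[0]`, because the helper returns a `(probability, class)` tuple.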

## Persist aggregates for dashboards

In [0]:
from pyspark.sql import functions as F

# Recompute the aggregates so this cell runs independently of the plotting cells above
hit_by_category = (
    df_spark.groupBy("category")
    .agg(F.avg("is_hit").alias("hit_rate"), F.count("*").alias("count"))
)

hit_by_rating = (
    df_spark.groupBy("rating")
    .agg(F.avg("is_hit").alias("hit_rate"), F.count("*").alias("count"))
)

hit_by_year = (
    df_spark.groupBy("release_year")
    .agg(F.avg("is_hit").alias("hit_rate"), F.count("*").alias("count"))
    .filter(F.col("release_year").isNotNull())
)

# Persist as Delta tables for dashboards
(
    hit_by_category.write.mode("overwrite").format("delta")
    .saveAsTable("streamsense_hit_by_category")
)
(
    hit_by_rating.write.mode("overwrite").format("delta")
    .saveAsTable("streamsense_hit_by_rating")
)
(
    hit_by_year.write.mode("overwrite").format("delta")
    .saveAsTable("streamsense_hit_by_year")
)

print("Saved: streamsense_hit_by_category, streamsense_hit_by_rating, streamsense_hit_by_year")

## Notebook Summary – 04 Hit Predictor Demo

**Objective**  
Demonstrate the StreamSense model through interactive predictions and visual insights.

**Key steps completed**

- Loaded the latest trained Random Forest model from MLflow  
- Implemented a *What-If* predictor to simulate hit probabilities for new titles  
- Explored hit rate patterns by **category**, **rating**, and **release year**  
- Generated feature importance scores to understand which signals the model relies on  
- Produced static plots (PNG) for use in the README and submission deck  

**Next steps (optional)**

- Wrap the what-if predictor in a lightweight UI (e.g. Streamlit or a Databricks dashboard)  
- Replace the heuristic `is_hit` label with one derived from external sources (IMDb, TMDb)  
- Enrich the model with text embeddings (descriptions, cast, director) for richer content understanding  