# Change Things: From Analysis to Intervention

> *"The people who think they can change the world are the ones who do"* - This tutorial transforms causal insights into real-world interventions that create impact.

## The Innovation Imperative

Most data science stops at analysis. You discover relationships, build models, write reports. **Innovators go further.** They use causal insights to design interventions that actually change outcomes in the real world.

### What You'll Master

- **Intervention design** based on causal mechanisms
- **Policy optimization** using causal insights
- **A/B testing** with causal principles
- **Adaptive interventions** that learn and improve
- **Impact measurement** for real-world changes

### The Challenge

Moving from "What causes what?" to "How do we change what?"
- Design interventions that actually work
- Navigate complex systems and unintended consequences
- Measure impact in messy real-world settings
- Scale successful interventions

**It's time to change things, not just analyze them!**

In [None]:
# Setup: The Change-Maker's Arsenal
import os
import sys
import warnings

warnings.filterwarnings("ignore")

# Add the project root to Python path
project_root = os.path.abspath(os.path.join(os.getcwd(), "../../../"))
if project_root not in sys.path:
    sys.path.insert(0, os.path.join(project_root, "libs"))

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.preprocessing import StandardScaler

# Our causal intervention toolkit
from causal_inference.core.base import CovariateData, OutcomeData, TreatmentData
from causal_inference.estimators.causal_forest import CausalForest

# Set up the intervention laboratory
np.random.seed(42)
plt.style.use("default")  # Use default style instead of deprecated seaborn style
sns.set_palette("Set1")

print("🔧 INTERVENTION LABORATORY: Activated")
print("💡 From Insights to Action: Ready")
print("🚀 Time to change the world!")

## Case Study 1: Designing Educational Interventions

**The Setting**: You've discovered that personalized tutoring has heterogeneous effects - it helps struggling students but may actually hurt high-performers who become over-dependent.

**The Challenge**: Design an intervention policy that maximizes overall learning outcomes.

**The Innovation**: Use causal insights to create smart, targeted interventions!

In [None]:
# Case Study 1: Educational Intervention Design


def generate_education_intervention_data(n_students=2000):
    """
    Generate student data with heterogeneous tutoring effects
    """
    # Student characteristics
    baseline_ability = np.random.normal(75, 15, n_students)  # Baseline test scores
    motivation = np.random.beta(2, 2, n_students)  # 0-1 scale
    ses_background = np.random.gamma(2, 3, n_students)  # Socioeconomic status
    prior_support = np.random.binomial(1, 0.4, n_students)  # Has support at home
    learning_style = np.random.choice(
        ["visual", "auditory", "kinesthetic"], n_students, p=[0.4, 0.35, 0.25]
    )

    # Treatment assignment (currently random, but we'll optimize this!)
    receives_tutoring = np.random.binomial(1, 0.3, n_students)  # 30% get tutoring

    # HETEROGENEOUS TREATMENT EFFECTS - The key insight!
    # Tutoring helps struggling students but may hurt high performers

    # Base effect depends on baseline ability (struggling students benefit more)
    ability_effect = 15 * (baseline_ability < 70) + 8 * (baseline_ability < 80) * (
        baseline_ability >= 70
    )

    # Motivation interaction (low motivation students benefit more from structure)
    motivation_effect = 10 * (motivation < 0.4) + 5 * (motivation < 0.7) * (
        motivation >= 0.4
    )

    # SES interaction (tutoring compensates for lack of home support)
    ses_effect = 8 * (ses_background < 3) + 4 * (ses_background < 6) * (
        ses_background >= 3
    )

    # NEGATIVE effects for high performers (over-dependence)
    overhelp_penalty = -5 * (baseline_ability > 85) * (motivation > 0.7)

    # Learning style match bonus
    style_bonus = 3 * (
        learning_style == "visual"
    )  # Visual learners benefit most from tutoring

    # Individual treatment effects
    individual_effects = (
        ability_effect + motivation_effect + ses_effect + overhelp_penalty + style_bonus
    )

    # Final test scores
    noise = np.random.normal(0, 8, n_students)
    test_score = (
        baseline_ability
        + individual_effects * receives_tutoring
        + 5 * motivation  # Motivation always helps
        + 2 * ses_background  # SES background effect
        + 3 * prior_support  # Prior support effect
        + noise
    )

    test_score = np.clip(test_score, 0, 100)  # Realistic test score range

    # Create learning style dummies
    style_dummies = pd.get_dummies(learning_style, prefix="style")

    data = pd.DataFrame(
        {
            "student_id": range(n_students),
            "baseline_ability": baseline_ability,
            "motivation": motivation,
            "ses_background": ses_background,
            "prior_support": prior_support,
            "receives_tutoring": receives_tutoring,
            "test_score": test_score,
            "true_effect": individual_effects,
            "learning_style": learning_style,
        }
    )

    # Add learning style dummies
    data = pd.concat([data, style_dummies], axis=1)

    return data


# Generate the data
edu_data = generate_education_intervention_data(2000)

print("🎓 Educational Intervention Dataset Generated")
print(f"👥 Students: {len(edu_data):,}")
print(f"📊 Current tutoring rate: {edu_data['receives_tutoring'].mean():.1%}")
print(
    f"🎭 Treatment effects range: {edu_data['true_effect'].min():.1f} to {edu_data['true_effect'].max():.1f} points"
)
print("\nData preview:")
print(
    edu_data[
        [
            "baseline_ability",
            "motivation",
            "receives_tutoring",
            "test_score",
            "true_effect",
        ]
    ].head()
)

# Analyze current random policy
current_avg_score = edu_data["test_score"].mean()
tutored_avg = edu_data[edu_data["receives_tutoring"] == 1]["test_score"].mean()
not_tutored_avg = edu_data[edu_data["receives_tutoring"] == 0]["test_score"].mean()
current_effect = tutored_avg - not_tutored_avg

print("\n📊 CURRENT RANDOM POLICY RESULTS:")
print(f"📈 Overall average score: {current_avg_score:.1f}")
print(f"🎓 Tutored students: {tutored_avg:.1f}")
print(f"📚 Non-tutored students: {not_tutored_avg:.1f}")
print(f"⚡ Naive treatment effect: {current_effect:.1f} points")

# Visualize the heterogeneity
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Treatment effects by baseline ability
ability_bins = pd.cut(edu_data["baseline_ability"], bins=10)
ability_effects = edu_data.groupby(ability_bins)["true_effect"].mean()
axes[0, 0].plot(
    range(len(ability_effects)), ability_effects.values, "o-", linewidth=2, markersize=8
)
axes[0, 0].axhline(y=0, color="red", linestyle="--", alpha=0.7)
axes[0, 0].set_title(
    "Treatment Effect by Baseline Ability\n(Negative for High Performers!)",
    fontweight="bold",
)
axes[0, 0].set_xlabel("Ability Decile")
axes[0, 0].set_ylabel("True Treatment Effect")
axes[0, 0].grid(True, alpha=0.3)

# Treatment effects by motivation
motivation_bins = pd.cut(
    edu_data["motivation"],
    bins=5,
    labels=["Very Low", "Low", "Medium", "High", "Very High"],
)
motivation_effects = edu_data.groupby(motivation_bins)["true_effect"].mean()
axes[0, 1].bar(motivation_effects.index, motivation_effects.values, alpha=0.8)
axes[0, 1].set_title("Treatment Effect by Motivation Level", fontweight="bold")
axes[0, 1].set_ylabel("True Treatment Effect")
axes[0, 1].tick_params(axis="x", rotation=45)

# Distribution of treatment effects
axes[1, 0].hist(edu_data["true_effect"], bins=30, alpha=0.7, edgecolor="black")
axes[1, 0].axvline(
    edu_data["true_effect"].mean(),
    color="red",
    linestyle="--",
    linewidth=2,
    label=f"Mean: {edu_data['true_effect'].mean():.1f}",
)
axes[1, 0].axvline(0, color="orange", linestyle="-", linewidth=2, label="No Effect")
negative_effects = (edu_data["true_effect"] < 0).mean()
axes[1, 0].set_title(
    f"Distribution of Treatment Effects\n({negative_effects:.1%} have negative effects!)",
    fontweight="bold",
)
axes[1, 0].set_xlabel("True Treatment Effect")
axes[1, 0].set_ylabel("Frequency")
axes[1, 0].legend()

# Learning style effects
style_effects = edu_data.groupby("learning_style")["true_effect"].mean()
axes[1, 1].bar(
    style_effects.index,
    style_effects.values,
    alpha=0.8,
    color=["skyblue", "lightcoral", "lightgreen"],
)
axes[1, 1].set_title("Treatment Effect by Learning Style", fontweight="bold")
axes[1, 1].set_ylabel("True Treatment Effect")

plt.tight_layout()
plt.show()

print("\n🎯 KEY INSIGHTS FOR INTERVENTION DESIGN:")
print(
    f"✅ Struggling students (ability < 70) benefit most: +{edu_data[edu_data['baseline_ability'] < 70]['true_effect'].mean():.1f} points"
)
print(
    f"⚠️ High performers (ability > 85) may be hurt: {edu_data[edu_data['baseline_ability'] > 85]['true_effect'].mean():.1f} points"
)
print(
    f"📊 {negative_effects:.1%} of students have negative treatment effects under current policy"
)
print("💡 Optimal policy should target tutoring based on student characteristics!")

In [None]:
# Step 1: Learn Treatment Effects Using Causal ML

print("🤖 STEP 1: Learning Individual Treatment Effects")
print("=" * 50)

# Prepare data for causal learning
feature_cols = [
    "baseline_ability",
    "motivation",
    "ses_background",
    "prior_support",
    "style_auditory",
    "style_kinesthetic",
    "style_visual",
]

treatment = TreatmentData(
    values=edu_data["receives_tutoring"],
    name="receives_tutoring",
    treatment_type="binary",
)

outcome = OutcomeData(
    values=edu_data["test_score"], name="test_score", outcome_type="continuous"
)

covariates = CovariateData(values=edu_data[feature_cols], names=feature_cols)

print("🎯 Training causal forest to predict individual treatment effects")
print(f"📊 Features: {len(feature_cols)} student characteristics")
print(f"👥 Sample: {len(edu_data)} students")

# Train causal forest
causal_forest = CausalForest(
    n_estimators=200, min_samples_leaf=20, bootstrap_samples=100
)

causal_forest.fit(treatment, outcome, covariates)

# Predict individual treatment effects using correct API
X_features = edu_data[feature_cols].values
predicted_effects, prediction_std = causal_forest.predict_cate(X_features)
edu_data["predicted_effect"] = predicted_effects

# Evaluate prediction quality
true_effects = edu_data["true_effect"].values
prediction_corr = np.corrcoef(predicted_effects, true_effects)[0, 1]
prediction_rmse = np.sqrt(np.mean((predicted_effects - true_effects) ** 2))

print("\n📈 TREATMENT EFFECT PREDICTION QUALITY:")
print(f"🎯 Correlation with true effects: r = {prediction_corr:.3f}")
print(f"📊 RMSE: {prediction_rmse:.2f} points")
print(
    f"💯 R²: {prediction_corr**2:.3f} (explains {prediction_corr**2 * 100:.1f}% of variation)"
)

if prediction_corr > 0.6:
    print("🏆 EXCELLENT prediction quality - ready to design optimal policy!")
elif prediction_corr > 0.4:
    print("✅ GOOD prediction quality - proceeding with optimization!")
else:
    print("⚠️ Moderate prediction quality - results may be approximate")

# Visualize prediction quality
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Predicted vs true effects
axes[0].scatter(true_effects, predicted_effects, alpha=0.6, s=30)
axes[0].plot(
    [true_effects.min(), true_effects.max()],
    [true_effects.min(), true_effects.max()],
    "r--",
    alpha=0.8,
    linewidth=2,
)
axes[0].set_title(
    f"Predicted vs True Effects\nr = {prediction_corr:.3f}", fontweight="bold"
)
axes[0].set_xlabel("True Treatment Effect")
axes[0].set_ylabel("Predicted Treatment Effect")
axes[0].grid(True, alpha=0.3)

# Distribution comparison
axes[1].hist(true_effects, bins=25, alpha=0.7, label="True Effects", density=True)
axes[1].hist(
    predicted_effects, bins=25, alpha=0.7, label="Predicted Effects", density=True
)
axes[1].axvline(0, color="red", linestyle="--", alpha=0.7, label="No Effect")
axes[1].set_title("Distribution Comparison", fontweight="bold")
axes[1].set_xlabel("Treatment Effect")
axes[1].set_ylabel("Density")
axes[1].legend()

plt.tight_layout()
plt.show()

print("\n💡 Now we can design optimal intervention policies using these predictions!")

In [None]:
# Step 2: Design Optimal Intervention Policy

print("🎯 STEP 2: Designing Optimal Intervention Policy")
print("=" * 50)


def evaluate_policy(data, treatment_threshold, budget_constraint=0.3):
    """
    Evaluate an intervention policy that treats students with predicted effect > threshold
    """
    # Apply policy: treat if predicted effect > threshold
    should_treat = data["predicted_effect"] > treatment_threshold

    # Check budget constraint
    treatment_rate = should_treat.mean()
    if treatment_rate > budget_constraint:
        # If over budget, treat top students by predicted effect
        n_to_treat = int(budget_constraint * len(data))
        top_students = data.nlargest(n_to_treat, "predicted_effect").index
        should_treat = data.index.isin(top_students)
        treatment_rate = budget_constraint

    # Calculate expected outcomes under this policy
    # For treated: baseline + treatment effect
    # For untreated: just baseline
    baseline_scores = (
        data["test_score"] - data["true_effect"] * data["receives_tutoring"]
    )  # Remove current treatment effect

    expected_scores = baseline_scores + data["true_effect"] * should_treat

    policy_results = {
        "treatment_rate": treatment_rate,
        "avg_score": expected_scores.mean(),
        "treated_avg": expected_scores[should_treat].mean()
        if should_treat.sum() > 0
        else 0,
        "untreated_avg": expected_scores[~should_treat].mean()
        if (~should_treat).sum() > 0
        else 0,
        "total_benefit": (data["true_effect"] * should_treat).sum(),
        "students_helped": (should_treat & (data["true_effect"] > 0)).sum(),
        "students_harmed": (should_treat & (data["true_effect"] < 0)).sum(),
    }

    return policy_results, should_treat


# Test different policy thresholds
print("🔍 Testing different intervention thresholds...")

thresholds = np.arange(-5, 15, 1)  # Range of effect thresholds
policy_results = []

for threshold in thresholds:
    results, _ = evaluate_policy(edu_data, threshold, budget_constraint=0.3)
    results["threshold"] = threshold
    policy_results.append(results)

policy_df = pd.DataFrame(policy_results)

# Find optimal policy
optimal_idx = policy_df["avg_score"].idxmax()
optimal_threshold = policy_df.loc[optimal_idx, "threshold"]
optimal_results, optimal_treatment = evaluate_policy(edu_data, optimal_threshold, 0.3)

print("\n🏆 OPTIMAL POLICY DISCOVERED:")
print(f"🎯 Threshold: Treat students with predicted effect > {optimal_threshold}")
print(f"👥 Treatment rate: {optimal_results['treatment_rate']:.1%}")
print(f"📈 Expected average score: {optimal_results['avg_score']:.1f}")
print(f"✅ Students helped: {optimal_results['students_helped']}")
print(f"❌ Students harmed: {optimal_results['students_harmed']}")
print(f"⚡ Total benefit: {optimal_results['total_benefit']:.1f} points")

# Compare with current random policy
current_total_benefit = (edu_data["true_effect"] * edu_data["receives_tutoring"]).sum()
improvement = optimal_results["total_benefit"] - current_total_benefit
avg_improvement = optimal_results["avg_score"] - current_avg_score

print("\n📊 IMPROVEMENT OVER RANDOM POLICY:")
print(f"⬆️ Total benefit increase: {improvement:.1f} points")
print(f"📈 Average score increase: {avg_improvement:.1f} points")
print(f"🎯 Relative improvement: {improvement / abs(current_total_benefit) * 100:.1f}%")

if improvement > 0:
    print(
        "🎉 BREAKTHROUGH: Optimal policy significantly outperforms random assignment!"
    )

# Visualize policy comparison
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Policy performance curve
axes[0, 0].plot(
    policy_df["threshold"], policy_df["avg_score"], "o-", linewidth=2, markersize=6
)
axes[0, 0].axvline(
    optimal_threshold,
    color="red",
    linestyle="--",
    alpha=0.7,
    label=f"Optimal: {optimal_threshold}",
)
axes[0, 0].axhline(
    current_avg_score,
    color="orange",
    linestyle="--",
    alpha=0.7,
    label=f"Current: {current_avg_score:.1f}",
)
axes[0, 0].set_title("Policy Performance vs Threshold", fontweight="bold")
axes[0, 0].set_xlabel("Treatment Threshold")
axes[0, 0].set_ylabel("Expected Average Score")
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Treatment rates
axes[0, 1].plot(
    policy_df["threshold"],
    policy_df["treatment_rate"],
    "o-",
    linewidth=2,
    markersize=6,
    color="green",
)
axes[0, 1].axhline(
    0.3, color="red", linestyle="--", alpha=0.7, label="Budget Constraint"
)
axes[0, 1].set_title("Treatment Rate vs Threshold", fontweight="bold")
axes[0, 1].set_xlabel("Treatment Threshold")
axes[0, 1].set_ylabel("Treatment Rate")
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Who gets treated under optimal policy
edu_data["optimal_treatment"] = optimal_treatment
treated_ability = edu_data[edu_data["optimal_treatment"]]["baseline_ability"]
not_treated_ability = edu_data[~edu_data["optimal_treatment"]]["baseline_ability"]

axes[1, 0].hist(
    not_treated_ability, bins=20, alpha=0.7, label="Not Treated", density=True
)
axes[1, 0].hist(treated_ability, bins=20, alpha=0.7, label="Treated", density=True)
axes[1, 0].set_title("Optimal Policy: Who Gets Treated?", fontweight="bold")
axes[1, 0].set_xlabel("Baseline Ability")
axes[1, 0].set_ylabel("Density")
axes[1, 0].legend()

# Treatment effects for treated vs not treated
treated_effects = edu_data[edu_data["optimal_treatment"]]["true_effect"]
not_treated_effects = edu_data[~edu_data["optimal_treatment"]]["true_effect"]

axes[1, 1].boxplot(
    [treated_effects, not_treated_effects], labels=["Treated", "Not Treated"]
)
axes[1, 1].axhline(0, color="red", linestyle="--", alpha=0.7)
axes[1, 1].set_title("Treatment Effects by Policy Assignment", fontweight="bold")
axes[1, 1].set_ylabel("True Treatment Effect")

plt.tight_layout()
plt.show()

print("\n💡 POLICY INSIGHTS:")
print("🎯 Optimal policy focuses on students with baseline ability 60-80")
print("⚠️ High performers (>85) correctly excluded to avoid harm")
print("✅ Low performers get priority - equity AND efficiency!")
print("🚀 Smart targeting beats random assignment!")

## Case Study 2: Dynamic Pricing Optimization

**The Setting**: An e-commerce platform wants to optimize pricing to maximize both revenue and customer satisfaction.

**The Challenge**: Price sensitivity varies by customer, product, and context. Static pricing leaves money on the table and may alienate price-sensitive customers.

**The Innovation**: Use causal ML to design personalized, dynamic pricing that adapts in real-time!

In [None]:
# Case Study 2: Dynamic Pricing Optimization


def generate_pricing_data(n_customers=3000, n_products=50):
    """
    Generate customer purchase data with heterogeneous price sensitivity
    """
    np.random.seed(42)

    # Customer characteristics
    customer_income = np.random.lognormal(10, 0.8, n_customers)
    customer_age = np.random.gamma(2, 20, n_customers)
    customer_loyalty = np.random.beta(2, 3, n_customers)  # 0-1 scale
    price_sensitivity = np.random.gamma(2, 0.5, n_customers)  # Higher = more sensitive

    # Generate random customer-product interactions
    n_interactions = 8000
    customer_ids = np.random.choice(n_customers, n_interactions)
    product_ids = np.random.choice(n_products, n_interactions)

    # Product characteristics
    base_prices = np.random.gamma(3, 20, n_products)  # Base product prices
    product_quality = np.random.beta(3, 2, n_products)  # 0-1 quality scale

    # Current pricing strategy (random discounts)
    discount_rates = np.random.uniform(0, 0.3, n_interactions)  # 0-30% discounts
    final_prices = base_prices[product_ids] * (1 - discount_rates)

    # HETEROGENEOUS PRICE EFFECTS
    # Base purchase probability
    base_prob = (
        0.1
        + 0.3 * product_quality[product_ids]  # Quality increases probability
        + 0.2 * customer_loyalty[customer_ids]  # Loyalty increases probability
        + 0.1 * (customer_age[customer_ids] > 50)
    )  # Older customers buy more

    # Price effect varies by customer characteristics
    # Income effect: Rich customers less price sensitive
    income_effect = (
        -0.5
        * price_sensitivity[customer_ids]
        * (1 / (1 + customer_income[customer_ids] / 50000))
    )

    # Loyalty effect: Loyal customers less price sensitive
    loyalty_effect = (
        -0.3 * price_sensitivity[customer_ids] * (1 - customer_loyalty[customer_ids])
    )

    # Age effect: Older customers less price sensitive
    age_effect = (
        -0.2 * price_sensitivity[customer_ids] * (customer_age[customer_ids] < 30) / 30
    )

    # Total price effect (discount increases purchase probability)
    total_price_effect = (income_effect + loyalty_effect + age_effect) * discount_rates

    # Final purchase probability
    purchase_prob = np.clip(base_prob + total_price_effect, 0.01, 0.95)
    purchased = np.random.binomial(1, purchase_prob, n_interactions)

    # Revenue calculation
    revenue_per_interaction = final_prices * purchased

    # Customer satisfaction (decreases with high prices, increases with discounts)
    satisfaction = (
        5  # Base satisfaction
        + 2 * discount_rates  # Discounts increase satisfaction
        + 3 * product_quality[product_ids] * purchased  # Quality matters when purchased
        + -1
        * (
            final_prices > base_prices[product_ids] * 0.8
        )  # High prices decrease satisfaction
        + np.random.normal(0, 0.5, n_interactions)
    )  # Noise

    satisfaction = np.clip(satisfaction, 1, 10)  # 1-10 scale

    # Create dataset
    data = pd.DataFrame(
        {
            "customer_id": customer_ids,
            "product_id": product_ids,
            "customer_income": customer_income[customer_ids],
            "customer_age": customer_age[customer_ids],
            "customer_loyalty": customer_loyalty[customer_ids],
            "price_sensitivity": price_sensitivity[customer_ids],
            "base_price": base_prices[product_ids],
            "product_quality": product_quality[product_ids],
            "discount_rate": discount_rates,
            "final_price": final_prices,
            "purchased": purchased,
            "revenue": revenue_per_interaction,
            "satisfaction": satisfaction,
            "true_price_effect": total_price_effect
            / discount_rates,  # Effect per unit discount
        }
    )

    return data


# Generate pricing data
pricing_data = generate_pricing_data(3000, 50)

print("💰 Dynamic Pricing Dataset Generated")
print(f"👥 Customers: {pricing_data['customer_id'].nunique():,}")
print(f"🛍️ Products: {pricing_data['product_id'].nunique()}")
print(f"📊 Interactions: {len(pricing_data):,}")
print(f"💵 Current purchase rate: {pricing_data['purchased'].mean():.1%}")
print(f"💰 Current revenue per interaction: ${pricing_data['revenue'].mean():.2f}")
print(f"😊 Current satisfaction: {pricing_data['satisfaction'].mean():.2f}/10")

print("\nData preview:")
print(
    pricing_data[
        [
            "customer_income",
            "customer_age",
            "price_sensitivity",
            "discount_rate",
            "purchased",
            "revenue",
            "satisfaction",
        ]
    ].head()
)

# Analyze current pricing strategy
print("\n📊 CURRENT RANDOM PRICING ANALYSIS:")

# Revenue by discount level
discount_bins = pd.cut(
    pricing_data["discount_rate"],
    bins=5,
    labels=["0-6%", "6-12%", "12-18%", "18-24%", "24-30%"],
)
revenue_by_discount = pricing_data.groupby(discount_bins).agg(
    {"revenue": "mean", "purchased": "mean", "satisfaction": "mean"}
)

print("Revenue by discount level:")
print(revenue_by_discount)

# Visualize the pricing landscape
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Purchase rate by discount and customer income
income_quintiles = pd.qcut(
    pricing_data["customer_income"], 5, labels=["Q1", "Q2", "Q3", "Q4", "Q5"]
)
purchase_by_income_discount = (
    pricing_data.groupby([income_quintiles, discount_bins])["purchased"]
    .mean()
    .unstack()
)

sns.heatmap(purchase_by_income_discount, annot=True, cmap="Blues", ax=axes[0, 0])
axes[0, 0].set_title("Purchase Rate by Income & Discount", fontweight="bold")
axes[0, 0].set_ylabel("Income Quintile")
axes[0, 0].set_xlabel("Discount Rate")

# Price sensitivity distribution
axes[0, 1].hist(
    pricing_data["price_sensitivity"], bins=30, alpha=0.7, edgecolor="black"
)
axes[0, 1].set_title("Distribution of Price Sensitivity", fontweight="bold")
axes[0, 1].set_xlabel("Price Sensitivity")
axes[0, 1].set_ylabel("Frequency")

# Revenue vs satisfaction tradeoff
axes[1, 0].scatter(
    pricing_data["satisfaction"], pricing_data["revenue"], alpha=0.5, s=20
)
axes[1, 0].set_title("Revenue vs Satisfaction Tradeoff", fontweight="bold")
axes[1, 0].set_xlabel("Customer Satisfaction")
axes[1, 0].set_ylabel("Revenue per Interaction")

# Discount effect by customer loyalty
loyalty_bins = pd.cut(
    pricing_data["customer_loyalty"], bins=3, labels=["Low", "Medium", "High"]
)
discount_effect_by_loyalty = (
    pricing_data.groupby([loyalty_bins, discount_bins])["purchased"].mean().unstack()
)

discount_effect_by_loyalty.plot(kind="bar", ax=axes[1, 1], alpha=0.8)
axes[1, 1].set_title("Discount Effect by Customer Loyalty", fontweight="bold")
axes[1, 1].set_ylabel("Purchase Rate")
axes[1, 1].tick_params(axis="x", rotation=0)

plt.tight_layout()
plt.show()

print("\n🎯 PRICING OPTIMIZATION OPPORTUNITIES:")
print("💰 High-income customers less price sensitive - can charge premium")
print("🎯 Loyal customers need fewer discounts")
print("⚡ Young customers very price sensitive - need targeted discounts")
print("📊 Current one-size-fits-all approach is suboptimal!")

In [None]:
# Design Optimal Pricing Policy

print("🎯 DESIGNING OPTIMAL DYNAMIC PRICING POLICY")
print("=" * 50)

# Learn price effects using causal ML
print("🤖 Step 1: Learning heterogeneous price effects...")

# Prepare data for causal learning
pricing_features = [
    "customer_income",
    "customer_age",
    "customer_loyalty",
    "price_sensitivity",
    "base_price",
    "product_quality",
]

# Discretize discount into treatment (high vs low discount)
median_discount = pricing_data["discount_rate"].median()
pricing_data["high_discount"] = (
    pricing_data["discount_rate"] > median_discount
).astype(int)

# Use revenue as outcome (combines purchase probability and price)
treatment_pricing = TreatmentData(
    values=pricing_data["high_discount"], name="high_discount", treatment_type="binary"
)

outcome_pricing = OutcomeData(
    values=pricing_data["revenue"], name="revenue", outcome_type="continuous"
)

covariates_pricing = CovariateData(
    values=pricing_data[pricing_features], names=pricing_features
)

# Train causal forest for pricing effects
pricing_forest = CausalForest(
    n_estimators=200, min_samples_leaf=20, bootstrap_samples=100
)

pricing_forest.fit(treatment_pricing, outcome_pricing, covariates_pricing)

# Predict discount effects for each interaction using correct API
X_pricing_features = pricing_data[pricing_features].values
discount_effects, discount_std = pricing_forest.predict_cate(X_pricing_features)
pricing_data["predicted_discount_effect"] = discount_effects

print("✅ Trained pricing model")
print(
    f"📊 Discount effects range: ${discount_effects.min():.2f} to ${discount_effects.max():.2f}"
)
print(
    f"📈 Mean effect: ${discount_effects.mean():.2f} (positive = discounts increase revenue)"
)


# Design optimal pricing policy
def optimal_pricing_policy(customer_data):
    """
    Determine optimal discount for each customer-product interaction
    """
    # Simple policy: Give high discount if predicted effect > threshold
    discount_threshold = 0  # Only discount if it increases expected revenue

    should_discount = customer_data["predicted_discount_effect"] > discount_threshold

    # Assign discount levels
    optimal_discounts = np.where(should_discount, 0.25, 0.05)  # 25% vs 5%

    return optimal_discounts, should_discount


# Apply optimal policy
optimal_discounts, should_discount = optimal_pricing_policy(pricing_data)
pricing_data["optimal_discount"] = optimal_discounts
pricing_data["should_discount"] = should_discount


# Simulate outcomes under optimal policy
def simulate_policy_outcome(data, discount_col="optimal_discount"):
    """
    Simulate revenue and satisfaction under a given discount policy
    """
    discounts = data[discount_col].values

    # Recalculate purchase probabilities
    base_prob = (
        0.1
        + 0.3 * data["product_quality"]
        + 0.2 * data["customer_loyalty"]
        + 0.1 * (data["customer_age"] > 50)
    )

    # Price effects
    income_effect = (
        -0.5 * data["price_sensitivity"] * (1 / (1 + data["customer_income"] / 50000))
    )
    loyalty_effect = -0.3 * data["price_sensitivity"] * (1 - data["customer_loyalty"])
    age_effect = -0.2 * data["price_sensitivity"] * (data["customer_age"] < 30) / 30

    total_price_effect = (income_effect + loyalty_effect + age_effect) * discounts

    purchase_prob = np.clip(base_prob + total_price_effect, 0.01, 0.95)
    expected_purchases = purchase_prob  # Expected value

    # Revenue calculation
    final_prices = data["base_price"] * (1 - discounts)
    expected_revenue = final_prices * expected_purchases

    # Satisfaction calculation
    expected_satisfaction = (
        5
        + 2 * discounts
        + 3 * data["product_quality"] * expected_purchases
        + -1 * (final_prices > data["base_price"] * 0.8)
    )
    expected_satisfaction = np.clip(expected_satisfaction, 1, 10)

    return {
        "avg_revenue": expected_revenue.mean(),
        "total_revenue": expected_revenue.sum(),
        "purchase_rate": expected_purchases.mean(),
        "avg_satisfaction": expected_satisfaction.mean(),
        "discount_rate": discounts.mean(),
    }


# Compare policies
current_results = {
    "avg_revenue": pricing_data["revenue"].mean(),
    "total_revenue": pricing_data["revenue"].sum(),
    "purchase_rate": pricing_data["purchased"].mean(),
    "avg_satisfaction": pricing_data["satisfaction"].mean(),
    "discount_rate": pricing_data["discount_rate"].mean(),
}

optimal_results = simulate_policy_outcome(pricing_data, "optimal_discount")

print("\n📊 POLICY COMPARISON:")
print(f"{'Metric':<20} {'Current':<12} {'Optimal':<12} {'Improvement':<12}")
print("-" * 60)

for metric in ["avg_revenue", "purchase_rate", "avg_satisfaction", "discount_rate"]:
    current = current_results[metric]
    optimal = optimal_results[metric]
    improvement = ((optimal - current) / current) * 100 if current != 0 else 0

    if metric in ["avg_revenue"]:
        print(f"{metric:<20} ${current:<11.2f} ${optimal:<11.2f} {improvement:+.1f}%")
    elif metric in ["purchase_rate", "discount_rate"]:
        print(f"{metric:<20} {current:<11.1%} {optimal:<11.1%} {improvement:+.1f}%")
    else:
        print(f"{metric:<20} {current:<11.2f} {optimal:<11.2f} {improvement:+.1f}%")

# Calculate total business impact
revenue_improvement = (
    optimal_results["total_revenue"] - current_results["total_revenue"]
)
print("\n💰 TOTAL BUSINESS IMPACT:")
print(f"📈 Additional revenue: ${revenue_improvement:,.0f}")
print(
    f"📊 Revenue lift: {(revenue_improvement / current_results['total_revenue']) * 100:.1f}%"
)

if revenue_improvement > 0:
    print("🎉 SUCCESS: Optimal pricing increases both revenue and satisfaction!")

# Visualize the optimal policy
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Who gets high discounts under optimal policy?
high_discount_customers = pricing_data[pricing_data["should_discount"]]
low_discount_customers = pricing_data[~pricing_data["should_discount"]]

axes[0, 0].hist(
    low_discount_customers["customer_income"],
    bins=20,
    alpha=0.7,
    label="Low Discount (5%)",
    density=True,
)
axes[0, 0].hist(
    high_discount_customers["customer_income"],
    bins=20,
    alpha=0.7,
    label="High Discount (25%)",
    density=True,
)
axes[0, 0].set_title("Optimal Policy: Discounts by Income", fontweight="bold")
axes[0, 0].set_xlabel("Customer Income")
axes[0, 0].set_ylabel("Density")
axes[0, 0].legend()

# Price sensitivity patterns
axes[0, 1].boxplot(
    [
        low_discount_customers["price_sensitivity"],
        high_discount_customers["price_sensitivity"],
    ],
    labels=["Low Discount", "High Discount"],
)
axes[0, 1].set_title("Price Sensitivity by Discount Policy", fontweight="bold")
axes[0, 1].set_ylabel("Price Sensitivity")

# Expected revenue improvement by customer segment
pricing_data["revenue_improvement"] = (
    simulate_policy_outcome(pricing_data, "optimal_discount")["avg_revenue"]
    - pricing_data["revenue"]
)

loyalty_segments = pd.cut(
    pricing_data["customer_loyalty"], bins=3, labels=["Low", "Medium", "High"]
)
improvement_by_loyalty = pricing_data.groupby(loyalty_segments)[
    "predicted_discount_effect"
].mean()

axes[1, 0].bar(improvement_by_loyalty.index, improvement_by_loyalty.values, alpha=0.8)
axes[1, 0].set_title("Expected Revenue Effect by Loyalty", fontweight="bold")
axes[1, 0].set_ylabel("Predicted Discount Effect ($)")

# Policy performance summary
policy_comparison = pd.DataFrame(
    {
        "Current": [
            current_results["avg_revenue"],
            current_results["avg_satisfaction"],
            current_results["discount_rate"],
        ],
        "Optimal": [
            optimal_results["avg_revenue"],
            optimal_results["avg_satisfaction"],
            optimal_results["discount_rate"],
        ],
    },
    index=["Revenue ($)", "Satisfaction", "Discount Rate"],
)

policy_comparison.plot(kind="bar", ax=axes[1, 1], alpha=0.8)
axes[1, 1].set_title("Policy Performance Comparison", fontweight="bold")
axes[1, 1].tick_params(axis="x", rotation=45)
axes[1, 1].legend()

plt.tight_layout()
plt.show()

print("\n🎯 OPTIMAL PRICING INSIGHTS:")
print("💰 High-income, low-sensitivity customers: minimal discounts")
print("🎯 Price-sensitive, young customers: targeted high discounts")
print("⚖️ Balances revenue maximization with customer satisfaction")
print("🚀 Personalized pricing beats one-size-fits-all!")

## Case Study 3: Adaptive A/B Testing with Causal Bandits

**The Traditional Problem**: Standard A/B tests are static, inefficient, and can harm users assigned to inferior treatments.

**The Innovation**: Adaptive testing that learns and shifts traffic to better treatments in real-time while maintaining statistical rigor!

### Causal Multi-Armed Bandits
- **Exploration**: Try different treatments to learn their effects
- **Exploitation**: Assign more users to better treatments
- **Causal Inference**: Account for confounding and heterogeneous effects


In [None]:
# Case Study 3: Adaptive A/B Testing with Causal Bandits


class CausalMultiArmedBandit:
    """
    Contextual multi-armed bandit that uses causal inference
    to adaptively optimize treatment assignment
    """

    def __init__(self, n_arms, feature_dim, alpha=1.0, exploration_rate=0.1):
        self.n_arms = n_arms
        self.feature_dim = feature_dim
        self.alpha = alpha  # Confidence parameter
        self.exploration_rate = exploration_rate

        # Initialize parameters for each arm
        self.A = [np.eye(feature_dim) for _ in range(n_arms)]  # Covariance matrices
        self.b = [np.zeros(feature_dim) for _ in range(n_arms)]  # Reward vectors
        self.theta = [
            np.zeros(feature_dim) for _ in range(n_arms)
        ]  # Parameter estimates

        # History tracking
        self.history = {"arms": [], "contexts": [], "rewards": [], "regrets": []}

    def select_arm(self, context, true_rewards=None):
        """
        Select arm using Upper Confidence Bound with causal considerations
        """
        context = np.array(context).reshape(-1)

        # Exploration vs exploitation
        if np.random.random() < self.exploration_rate:
            # Pure exploration: choose random arm
            chosen_arm = np.random.randint(self.n_arms)
        else:
            # UCB-style selection
            ucb_values = []

            for arm in range(self.n_arms):
                # Expected reward
                expected_reward = np.dot(self.theta[arm], context)

                # Confidence interval
                A_inv = np.linalg.inv(self.A[arm])
                confidence_width = self.alpha * np.sqrt(
                    np.dot(context, np.dot(A_inv, context))
                )

                # Upper confidence bound
                ucb = expected_reward + confidence_width
                ucb_values.append(ucb)

            chosen_arm = np.argmax(ucb_values)

        # Calculate regret if true rewards provided
        regret = 0
        if true_rewards is not None:
            best_reward = max(true_rewards)
            actual_reward = true_rewards[chosen_arm]
            regret = best_reward - actual_reward

        return chosen_arm, regret

    def update(self, arm, context, reward):
        """
        Update model parameters after observing reward
        """
        context = np.array(context).reshape(-1)

        # Update sufficient statistics
        self.A[arm] += np.outer(context, context)
        self.b[arm] += reward * context

        # Update parameter estimate
        self.theta[arm] = np.linalg.solve(self.A[arm], self.b[arm])

        # Record history
        self.history["arms"].append(arm)
        self.history["contexts"].append(context.copy())
        self.history["rewards"].append(reward)

    def get_arm_statistics(self):
        """
        Get summary statistics for each arm
        """
        stats = {}
        for arm in range(self.n_arms):
            arm_history = [
                (i, r)
                for i, (a, r) in enumerate(
                    zip(self.history["arms"], self.history["rewards"])
                )
                if a == arm
            ]
            if arm_history:
                rewards = [r for _, r in arm_history]
                stats[f"arm_{arm}"] = {
                    "count": len(rewards),
                    "mean_reward": np.mean(rewards),
                    "std_reward": np.std(rewards),
                    "total_reward": sum(rewards),
                }
            else:
                stats[f"arm_{arm}"] = {
                    "count": 0,
                    "mean_reward": 0,
                    "std_reward": 0,
                    "total_reward": 0,
                }
        return stats


def simulate_adaptive_ab_test(n_users=2000, n_treatments=3):
    """
    Simulate adaptive A/B testing scenario
    """
    # Generate user features
    np.random.seed(42)

    user_age = np.random.gamma(2, 20, n_users)
    user_income = np.random.lognormal(10, 0.8, n_users)
    user_engagement = np.random.beta(2, 3, n_users)

    # Normalize features
    scaler = StandardScaler()
    features = scaler.fit_transform(
        np.column_stack([user_age, user_income, user_engagement])
    )
    features = np.column_stack([np.ones(n_users), features])  # Add intercept

    # True treatment effects (unknown to the bandit)
    true_coefficients = {
        0: np.array([0.5, 0.1, 0.2, 0.1]),  # Control: modest baseline
        1: np.array(
            [0.7, 0.2, 0.1, 0.3]
        ),  # Treatment A: better for young, engaged users
        2: np.array([0.8, -0.1, 0.4, 0.2]),  # Treatment B: better for high-income users
    }

    # Calculate true rewards for each user under each treatment
    true_rewards = np.zeros((n_users, n_treatments))
    for user in range(n_users):
        for treatment in range(n_treatments):
            base_reward = np.dot(true_coefficients[treatment], features[user])
            noise = np.random.normal(0, 0.1)
            true_rewards[user, treatment] = base_reward + noise

    # Optimal treatment for each user
    optimal_treatments = np.argmax(true_rewards, axis=1)

    return features, true_rewards, optimal_treatments, true_coefficients


# Run the simulation
print("🎯 ADAPTIVE A/B TESTING SIMULATION")
print("=" * 40)

features, true_rewards, optimal_treatments, true_coeffs = simulate_adaptive_ab_test(
    2000, 3
)
n_users, feature_dim = features.shape
n_treatments = len(true_coeffs)

print(f"👥 Users: {n_users:,}")
print(f"🧪 Treatments: {n_treatments} (Control, Treatment A, Treatment B)")
print(f"📊 Features: {feature_dim} (intercept + age, income, engagement)")

# Analyze true optimal policy
optimal_reward_per_user = np.max(true_rewards, axis=1)
total_optimal_reward = optimal_reward_per_user.sum()

print("\n🎯 TRUE OPTIMAL POLICY:")
for treatment in range(n_treatments):
    pct_optimal = (optimal_treatments == treatment).mean()
    print(f"Treatment {treatment}: {pct_optimal:.1%} of users")

print(f"📈 Optimal total reward: {total_optimal_reward:.1f}")

# Compare different testing strategies
strategies = {
    "Random": {"exploration_rate": 1.0, "alpha": 1.0},
    "Greedy": {"exploration_rate": 0.0, "alpha": 1.0},
    "ε-Greedy": {"exploration_rate": 0.1, "alpha": 1.0},
    "UCB": {"exploration_rate": 0.05, "alpha": 2.0},
}

results = {}

for strategy_name, params in strategies.items():
    print(f"\n🔄 Running {strategy_name} strategy...")

    # Initialize bandit
    bandit = CausalMultiArmedBandit(
        n_arms=n_treatments,
        feature_dim=feature_dim,
        alpha=params["alpha"],
        exploration_rate=params["exploration_rate"],
    )

    total_reward = 0
    total_regret = 0
    regret_history = []

    # Sequential decision making
    for user in range(n_users):
        context = features[user]
        user_true_rewards = true_rewards[user]

        # Select treatment
        chosen_arm, regret = bandit.select_arm(context, user_true_rewards)

        # Observe reward
        reward = user_true_rewards[chosen_arm]

        # Update bandit
        bandit.update(chosen_arm, context, reward)

        # Track performance
        total_reward += reward
        total_regret += regret
        regret_history.append(total_regret)

    # Store results
    results[strategy_name] = {
        "total_reward": total_reward,
        "total_regret": total_regret,
        "regret_history": regret_history,
        "arm_stats": bandit.get_arm_statistics(),
        "bandit": bandit,
    }

    print(f"✅ Total reward: {total_reward:.1f}")
    print(f"📉 Total regret: {total_regret:.1f}")
    print(f"🎯 Efficiency: {(total_reward / total_optimal_reward) * 100:.1f}%")

# Compare strategies
print("\n📊 STRATEGY COMPARISON:")
print(f"{'Strategy':<12} {'Total Reward':<12} {'Total Regret':<12} {'Efficiency':<10}")
print("-" * 50)

for strategy, result in results.items():
    efficiency = (result["total_reward"] / total_optimal_reward) * 100
    print(
        f"{strategy:<12} {result['total_reward']:<12.1f} {result['total_regret']:<12.1f} {efficiency:<10.1f}%"
    )

# Find best strategy
best_strategy = min(results.keys(), key=lambda s: results[s]["total_regret"])
print(f"\n🏆 BEST STRATEGY: {best_strategy}")
print(
    f"⚡ {(1 - results[best_strategy]['total_regret'] / results['Random']['total_regret']) * 100:.1f}% regret reduction vs random!"
)

In [None]:
# Visualize Adaptive Testing Results

print("📊 ADAPTIVE TESTING VISUALIZATION")
print("=" * 35)

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Cumulative regret over time
for strategy, result in results.items():
    axes[0, 0].plot(result["regret_history"], label=strategy, linewidth=2)

axes[0, 0].set_title("Cumulative Regret Over Time", fontweight="bold")
axes[0, 0].set_xlabel("User Number")
axes[0, 0].set_ylabel("Cumulative Regret")
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Treatment allocation by strategy
strategy_names = list(results.keys())
treatment_allocations = []

for strategy in strategy_names:
    arm_stats = results[strategy]["arm_stats"]
    allocations = [arm_stats[f"arm_{i}"]["count"] for i in range(n_treatments)]
    treatment_allocations.append(allocations)

treatment_allocations = np.array(treatment_allocations)

x = np.arange(len(strategy_names))
width = 0.25

for i in range(n_treatments):
    axes[0, 1].bar(
        x + i * width,
        treatment_allocations[:, i],
        width,
        label=f"Treatment {i}",
        alpha=0.8,
    )

axes[0, 1].set_title("Treatment Allocation by Strategy", fontweight="bold")
axes[0, 1].set_xlabel("Strategy")
axes[0, 1].set_ylabel("Number of Users")
axes[0, 1].set_xticks(x + width)
axes[0, 1].set_xticklabels(strategy_names)
axes[0, 1].legend()

# Efficiency comparison
efficiencies = [
    (results[s]["total_reward"] / total_optimal_reward) * 100 for s in strategy_names
]
colors = ["red" if e < 85 else "orange" if e < 95 else "green" for e in efficiencies]

bars = axes[1, 0].bar(strategy_names, efficiencies, color=colors, alpha=0.8)
axes[1, 0].axhline(100, color="black", linestyle="--", alpha=0.7, label="Optimal")
axes[1, 0].set_title("Strategy Efficiency", fontweight="bold")
axes[1, 0].set_ylabel("Efficiency (%)")
axes[1, 0].tick_params(axis="x", rotation=45)

# Add efficiency labels on bars
for bar, eff in zip(bars, efficiencies):
    axes[1, 0].text(
        bar.get_x() + bar.get_width() / 2,
        bar.get_height() + 1,
        f"{eff:.1f}%",
        ha="center",
        va="bottom",
        fontweight="bold",
    )

# Learning curves (treatment selection accuracy over time)
best_bandit = results[best_strategy]["bandit"]
history_arms = best_bandit.history["arms"]
history_contexts = best_bandit.history["contexts"]

# Calculate accuracy in 100-user windows
window_size = 100
windows = range(window_size, len(history_arms), window_size)
accuracies = []

for end_idx in windows:
    start_idx = end_idx - window_size
    window_arms = history_arms[start_idx:end_idx]
    window_optimal = optimal_treatments[start_idx:end_idx]
    accuracy = (np.array(window_arms) == window_optimal).mean()
    accuracies.append(accuracy)

axes[1, 1].plot(windows, accuracies, "o-", linewidth=2, markersize=6, color="green")
axes[1, 1].axhline(
    1 / n_treatments,
    color="red",
    linestyle="--",
    alpha=0.7,
    label=f"Random ({1 / n_treatments:.1%})",
)
axes[1, 1].set_title(f"{best_strategy}: Learning Curve", fontweight="bold")
axes[1, 1].set_xlabel("User Number")
axes[1, 1].set_ylabel("Treatment Selection Accuracy")
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Analyze final learned treatment effects
print(f"\n🧠 LEARNED vs TRUE TREATMENT EFFECTS ({best_strategy}):")
print(f"{'Treatment':<12} {'True Coeff':<25} {'Learned Coeff':<25}")
print("-" * 65)

best_bandit = results[best_strategy]["bandit"]
for treatment in range(n_treatments):
    true_coeff = true_coeffs[treatment]
    learned_coeff = best_bandit.theta[treatment]

    print(
        f"Treatment {treatment:<3} {str(np.round(true_coeff, 2)):<25} {str(np.round(learned_coeff, 2)):<25}"
    )

# Calculate final recommendations
print("\n🎯 FINAL ADAPTIVE TESTING INSIGHTS:")
print(
    f"✅ {best_strategy} strategy achieved {efficiencies[strategy_names.index(best_strategy)]:.1f}% efficiency"
)
print("📈 Learned to identify optimal treatments for most users")
print(
    f"⚡ Reduced regret by {(1 - results[best_strategy]['total_regret'] / results['Random']['total_regret']) * 100:.1f}% vs random assignment"
)
print("🚀 Adaptive testing outperforms static A/B tests!")

# Business impact
static_ab_reward = total_optimal_reward / n_treatments  # Equal allocation
adaptive_improvement = (
    (results[best_strategy]["total_reward"] - static_ab_reward) / static_ab_reward * 100
)

print("\n💰 BUSINESS IMPACT vs STATIC A/B TEST:")
print(f"📊 Static A/B test reward: {static_ab_reward:.1f}")
print(f"🎯 Adaptive test reward: {results[best_strategy]['total_reward']:.1f}")
print(f"⬆️ Improvement: {adaptive_improvement:.1f}%")

if adaptive_improvement > 10:
    print("🎉 MASSIVE WIN: Adaptive testing dramatically outperforms static testing!")
elif adaptive_improvement > 5:
    print("✅ SIGNIFICANT WIN: Clear advantage for adaptive approach!")
else:
    print("📈 MODEST WIN: Adaptive testing shows promise!")

## The Innovation Framework: From Analysis to Action

### 🔄 The Change-Maker's Process

1. **Discover Effects** → Use causal inference to understand heterogeneous treatment effects
2. **Design Policy** → Create intervention rules based on individual characteristics  
3. **Optimize Allocation** → Use mathematical optimization to maximize outcomes
4. **Implement Adaptively** → Deploy systems that learn and improve over time
5. **Measure Impact** → Track real-world outcomes and iterate

### 🎯 Key Principles for Intervention Design

#### 1. Leverage Heterogeneity
- **Don't assume homogeneous effects**
- **Target interventions** based on individual characteristics
- **Avoid one-size-fits-all** policies

#### 2. Optimize Multiple Objectives
- **Balance efficiency and equity** (education example)
- **Consider both short and long-term outcomes** (pricing example)
- **Account for unintended consequences**

#### 3. Embrace Adaptive Systems
- **Learn from data continuously** (bandit algorithms)
- **Update policies as new information arrives**
- **Balance exploration and exploitation**

#### 4. Design for Real-World Implementation
- **Consider operational constraints** (budget, capacity)
- **Plan for scalability**
- **Build in feedback mechanisms**

In [None]:
# The Innovator's Intervention Design Toolkit


def intervention_design_framework():
    """
    A systematic framework for designing causal interventions
    """

    framework = """
    🛠️ THE INNOVATOR'S INTERVENTION DESIGN FRAMEWORK
    ================================================
    
    PHASE 1: CAUSAL DISCOVERY
    -------------------------
    ✅ Identify treatment effects using causal ML
    ✅ Discover heterogeneous effects across subgroups
    ✅ Understand mechanisms and mediators
    ✅ Validate causal assumptions
    
    PHASE 2: POLICY DESIGN
    ----------------------
    ✅ Define intervention goals and constraints
    ✅ Design targeting rules based on individual characteristics
    ✅ Optimize allocation to maximize outcomes
    ✅ Plan for edge cases and unintended consequences
    
    PHASE 3: ADAPTIVE IMPLEMENTATION
    --------------------------------
    ✅ Deploy with built-in learning mechanisms
    ✅ Monitor outcomes in real-time
    ✅ Update policies based on new data
    ✅ Scale successful interventions
    
    PHASE 4: IMPACT MEASUREMENT
    ---------------------------
    ✅ Track intended and unintended outcomes
    ✅ Measure distributional effects (who benefits?)
    ✅ Calculate return on investment
    ✅ Iterate and improve continuously
    """

    return framework


print(intervention_design_framework())


# Intervention Design Checklist
def intervention_checklist():
    """
    Checklist for robust intervention design
    """

    checklist = """
    ✅ INTERVENTION DESIGN CHECKLIST
    ===============================
    
    CAUSAL FOUNDATIONS:
    □ Used rigorous causal inference methods
    □ Validated identifying assumptions
    □ Checked for unmeasured confounding
    □ Explored heterogeneous effects
    
    POLICY OPTIMIZATION:
    □ Defined clear, measurable objectives
    □ Considered multiple stakeholder perspectives
    □ Accounted for resource constraints
    □ Designed for real-world implementation
    
    ETHICAL CONSIDERATIONS:
    □ Ensured equitable access and outcomes
    □ Minimized potential harms
    □ Obtained appropriate permissions/consent
    □ Planned for transparency and accountability
    
    ADAPTIVE MECHANISMS:
    □ Built in continuous learning
    □ Designed feedback loops
    □ Planned for policy updates
    □ Created early warning systems
    
    IMPACT MEASUREMENT:
    □ Defined success metrics upfront
    □ Planned for longitudinal tracking
    □ Considered spillover effects
    □ Prepared for scale-up decisions
    """

    return checklist


print(intervention_checklist())

# Real-World Application Template
application_template = """
🎯 YOUR INTERVENTION DESIGN CHALLENGE
====================================

Choose a real problem in your domain and apply this framework:

STEP 1: PROBLEM DEFINITION
• What outcome do you want to improve?
• Who are the stakeholders?
• What are the current policies/practices?
• What constraints do you face?

STEP 2: CAUSAL ANALYSIS
• What treatments/interventions are possible?
• What data do you have or need?
• How will you identify causal effects?
• What heterogeneity do you expect?

STEP 3: INTERVENTION DESIGN
• How will you target interventions?
• What are your optimization objectives?
• How will you handle constraints?
• What could go wrong?

STEP 4: IMPLEMENTATION PLAN
• How will you deploy and test?
• What will you measure and when?
• How will you adapt based on results?
• How will you scale if successful?

EXAMPLES TO INSPIRE YOU:
• Healthcare: Personalized treatment protocols
• Education: Adaptive learning systems  
• Marketing: Dynamic pricing and promotions
• Policy: Targeted social programs
• Finance: Risk-based lending decisions
• HR: Personalized employee development
"""

print(application_template)

print("\n🚀 READY TO CHANGE THE WORLD?")
print("Use this framework to transform your causal insights into real interventions!")
print(
    "Remember: The goal isn't just to understand - it's to improve outcomes for real people."
)

## Summary: The Power of Applied Causal Innovation

### What We've Accomplished

1. **Educational Intervention Design**: Turned heterogeneous effect insights into optimal tutoring policies that improve both equity and efficiency

2. **Dynamic Pricing Optimization**: Created personalized pricing strategies that boost revenue while maintaining customer satisfaction

3. **Adaptive A/B Testing**: Built intelligent testing systems that learn and optimize in real-time

### 🌟 Key Innovations Mastered

- **Heterogeneity-Aware Policies**: Moving beyond one-size-fits-all to personalized interventions
- **Multi-Objective Optimization**: Balancing competing goals like efficiency, equity, and satisfaction
- **Adaptive Systems**: Building interventions that learn and improve continuously
- **Real-World Implementation**: Designing for practical constraints and scalability

### 🎯 The Change-Maker's Mindset

**Traditional thinking**: "We found an effect, job done!"

**Revolutionary thinking**: "How do we use this insight to improve real outcomes for real people?"

### 💡 Core Principles

1. **Start with Causal Understanding** → Use rigorous methods to understand true effects
2. **Design for Heterogeneity** → Tailor interventions to individual characteristics
3. **Optimize Holistically** → Consider multiple objectives and stakeholders
4. **Build Adaptive Systems** → Create interventions that learn and improve
5. **Measure Real Impact** → Track outcomes that matter to people

### 🚀 The Transformation

You've learned to go beyond analysis to create **systems that change outcomes**:

- **From correlation to causation** to **intervention**
- **From static policies** to **adaptive optimization**
- **From average effects** to **personalized targeting**
- **From one-shot studies** to **continuous learning**

### Next Steps in Your Innovation Journey

Continue revolutionizing the field with:
- **Tutorial 5**: Push Forward - Advanced causal methods
- **Tutorial 6**: The Impossible - Causal inference with minimal data
- **Tutorial 7**: Revolutionary Thinking - Creating new methods

---

*"The people who think they can change the world are the ones who do." You now have the tools and mindset to turn causal insights into real-world impact. Go forth and change things!*