# Phase 4: API Serving & Deployment

This notebook demonstrates the API serving layer for the Telco Churn Prediction system, focusing on:

1. **API Setup**: Starting and configuring the FastAPI service
2. **Endpoint Testing**: Testing all API endpoints (health, metadata, predict)
3. **Prediction Examples**: Making single and batch predictions
4. **SHAP Explanations**: Verifying explainability in API responses
5. **Integration Testing**: End-to-end testing of the prediction pipeline

## Objectives
- Verify API service functionality
- Test prediction accuracy and consistency
- Validate SHAP explanations
- Ensure production readiness


In [None]:
# Simple setup for API testing
import sys
from pathlib import Path

# Find project root
PROJECT_ROOT = Path().resolve().parent if (Path().resolve().parent / 'src').exists() else Path().resolve()
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

print(f"Project root: {PROJECT_ROOT}")


Project root: C:\Users\tiwar\OneDrive - The University of Melbourne\Desktop\New folder\telco-churn-retention


In [None]:
# Import libraries for API testing
import json
import time
import pandas as pd
import numpy as np
import requests
from rich.console import Console
from rich.table import Table

console = Console()
print("[OK] Libraries imported")


[OK] Libraries imported


## 2. Verify Prerequisites

Before testing the API, we need to ensure:
- Models have been trained (Phase 3 completed)
- Feature pipeline has been saved
- API service can be started


In [None]:
# Check if models exist
models_dir = PROJECT_ROOT / "models"
processed_dir = PROJECT_ROOT / "data" / "processed"

console.print("\n[bold cyan]Checking Prerequisites...[/bold cyan]")

# Check models
if models_dir.exists():
    model_dirs = [d for d in models_dir.iterdir() if d.is_dir()]
    if model_dirs:
        latest_model_dir = max(model_dirs, key=lambda p: p.name)
        console.print(f"[green][OK] Models found: {latest_model_dir}[/green]")
        
        # Check for model files
        model_files = list(latest_model_dir.rglob("*_model.pkl"))
        pipeline_files = list(latest_model_dir.rglob("feature_pipeline.pkl"))
        
        console.print(f"  - Model files: {len(model_files)}")
        console.print(f"  - Pipeline files: {len(pipeline_files)}")
        
        if not model_files:
            console.print("[red][FAIL] No model files found. Please run Phase 3 first.[/red]")
        if not pipeline_files:
            console.print("[yellow][WARN] No pipeline files found. Pipeline may not load correctly.[/yellow]")
    else:
        console.print("[red][FAIL] No model directories found. Please run Phase 3 first.[/red]")
else:
    console.print("[red][FAIL] Models directory not found. Please run Phase 3 first.[/red]")

# Check processed data
if processed_dir.exists():
    processed_dirs = [d for d in processed_dir.iterdir() if d.is_dir()]
    if processed_dirs:
        console.print(f"[green][OK] Processed data found: {len(processed_dirs)} versions[/green]")
    else:
        console.print("[yellow][WARN] No processed data found. SHAP explanations may not work.[/yellow]")
else:
    console.print("[yellow][WARN] Processed data directory not found.[/yellow]")



[1;36mChecking Prerequisites...[0m
[32m[OK] Models found: 20251129T094943Z[0m
  - Model files: 1
  - Pipeline files: 1
[32m[OK] Processed data found: 5 versions[0m


## 3. Load Test Data

We'll load sample customer data from the raw dataset to test the API.


In [None]:
# Load sample customer data for testing
raw_data_path = PROJECT_ROOT / "data" / "raw" / "telco_data_28_11_2025.csv"

if raw_data_path.exists():
    df_raw = pd.read_csv(raw_data_path)
    console.print(f"[green][OK] Loaded raw data: {len(df_raw)} customers[/green]")
    
    # Clean data (same as Phase 1)
    df_raw.columns = [c.strip() for c in df_raw.columns]
    df_raw["TotalCharges"] = pd.to_numeric(df_raw["TotalCharges"], errors="coerce")
    df_raw["SeniorCitizen"] = df_raw["SeniorCitizen"].astype("Int64")
    df_raw = df_raw.dropna(subset=["customerID"])
    df_raw["customerID"] = df_raw["customerID"].str.strip()
    
    # Select a few test customers
    test_customers = df_raw.head(5).to_dict('records')
    console.print(f"[green][OK] Prepared {len(test_customers)} test customers[/green]")
    
    # Display first customer
    console.print("\n[bold]Sample Customer Data:[/bold]")
    sample = test_customers[0]
    for key, value in list(sample.items())[:10]:
        console.print(f"  {key}: {value}")
else:
    console.print("[red][FAIL] Raw data not found. Creating sample data...[/red]")
    # Create sample data
    test_customers = [
        {
            "customerID": "1234-ABCD",
            "gender": "Male",
            "SeniorCitizen": 0,
            "Partner": "Yes",
            "Dependents": "No",
            "tenure": 12,
            "PhoneService": "Yes",
            "MultipleLines": "No",
            "InternetService": "DSL",
            "OnlineSecurity": "No",
            "OnlineBackup": "Yes",
            "DeviceProtection": "No",
            "TechSupport": "No",
            "StreamingTV": "No",
            "StreamingMovies": "No",
            "Contract": "Month-to-month",
            "PaperlessBilling": "Yes",
            "PaymentMethod": "Electronic check",
            "MonthlyCharges": 70.5,
            "TotalCharges": 845.0,
        }
    ]


[32m[OK] Loaded raw data: 7043 customers[0m
[32m[OK] Prepared 5 test customers[0m

[1mSample Customer Data:[0m
  customerID: 7590-VHVEG
  gender: Female
  SeniorCitizen: 0
  Partner: Yes
  Dependents: No
  tenure: 1
  PhoneService: No
  MultipleLines: No phone service
  InternetService: DSL
  OnlineSecurity: No


## 4. Start API Service

**Important**: The API service needs to be started separately in a terminal before running the tests below.

Run this command in a terminal:
```bash
python scripts/run_phase4_api.py --port 8001
# Or
make phase4
```

The API will be available at `http://localhost:8001`


## 5. Test API Endpoints

Now we'll test all the API endpoints to verify functionality.


In [None]:
# API base URL
API_BASE_URL = "http://localhost:8001"

def check_api_health() -> bool:
    """Check if API is running."""
    try:
        response = requests.get(f"{API_BASE_URL}/health", timeout=5)
        if response.status_code == 200:
            data = response.json()
            console.print(f"[green][OK] API is healthy[/green]")
            console.print(f"  Status: {data['status']}")
            console.print(f"  Model loaded: {data['model_loaded']}")
            console.print(f"  Pipeline loaded: {data['pipeline_loaded']}")
            return data['model_loaded'] and data['pipeline_loaded']
        else:
            console.print(f"[red][FAIL] API returned status {response.status_code}[/red]")
            return False
    except requests.exceptions.ConnectionError:
        console.print("[red][FAIL] Cannot connect to API. Is it running?[/red]")
        console.print("[yellow]  Start it with: python scripts/run_phase4_api.py --port 8001[/yellow]")
        return False
    except Exception as e:
        console.print(f"[red][FAIL] Error checking API: {e}[/red]")
        return False

# Check health
console.print("\n[bold cyan]Testing API Health...[/bold cyan]")
api_ready = check_api_health()



[1;36mTesting API Health...[0m
[31m[FAIL] Cannot connect to API. Is it running?[0m
[33m  Start it with: python scripts/run_phase4_api.py --port 8001[0m


In [None]:
# Get model metadata
if api_ready:
    console.print("\n[bold cyan]Fetching Model Metadata...[/bold cyan]")
    try:
        response = requests.get(f"{API_BASE_URL}/metadata", timeout=10)
        if response.status_code == 200:
            metadata = response.json()
            
            table = Table(title="Model Metadata")
            table.add_column("Property", style="cyan")
            table.add_column("Value", style="green")
            
            table.add_row("Model Type", metadata.get("model_type", "N/A"))
            table.add_row("Model Version", metadata.get("model_version", "N/A"))
            table.add_row("Feature Count", str(metadata.get("feature_count", 0)))
            table.add_row("Threshold", str(metadata.get("threshold", 0.5)))
            
            # Performance metrics
            perf_metrics = metadata.get("performance_metrics", {})
            for metric, value in perf_metrics.items():
                table.add_row(f"Performance: {metric}", f"{value:.4f}")
            
            console.print(table)
            
            # Show feature names (first 10)
            feature_names = metadata.get("feature_names", [])
            if feature_names:
                console.print(f"\n[dim]Features ({len(feature_names)} total): {', '.join(feature_names[:10])}...[/dim]")
        else:
            console.print(f"[red][FAIL] Failed to get metadata: {response.status_code}[/red]")
    except Exception as e:
        console.print(f"[red][FAIL] Error fetching metadata: {e}[/red]")
else:
    console.print("[yellow][WARN] Skipping metadata test - API not ready[/yellow]")


[33m[WARN] Skipping metadata test - API not ready[0m


## 6. Test Single Predictions

Test the `/predict` endpoint with individual customers.


In [None]:
if api_ready:
    console.print("\n[bold cyan]Testing Single Predictions...[/bold cyan]")
    
    for i, customer in enumerate(test_customers[:3], 1):
        console.print(f"\n[bold]Test Customer {i}: {customer.get('customerID', 'Unknown')}[/bold]")
        
        try:
            # Prepare request (ensure TotalCharges is handled)
            request_data = customer.copy()
            if pd.isna(request_data.get("TotalCharges")):
                request_data["TotalCharges"] = None
            else:
                request_data["TotalCharges"] = float(request_data["TotalCharges"])
            
            # Make prediction
            start_time = time.time()
            response = requests.post(
                f"{API_BASE_URL}/predict",
                json=request_data,
                timeout=30
            )
            elapsed_time = time.time() - start_time
            
            if response.status_code == 200:
                result = response.json()
                
                console.print(f"  [green][OK] Prediction successful ({elapsed_time:.3f}s)[/green]")
                console.print(f"  Churn Probability: {result['churn_probability']:.4f}")
                console.print(f"  Churn Prediction: {result['churn_prediction']}")
                console.print(f"  Threshold: {result['threshold']}")
                
                # Show top explanations
                explanations = result.get('explanation', [])
                if explanations:
                    console.print(f"  \n  Top Features:")
                    for exp in explanations[:5]:
                        shap_val = exp['shap_value']
                        sign = "+" if shap_val >= 0 else ""
                        console.print(f"    {exp['feature']}: {sign}{shap_val:.4f}")
                else:
                    console.print(f"  [yellow]  [WARN] No explanations available[/yellow]")
            else:
                console.print(f"  [red][FAIL] Prediction failed: {response.status_code}[/red]")
                console.print(f"  Error: {response.text}")
        except Exception as e:
            console.print(f"  [red][FAIL] Error: {e}[/red]")
else:
    console.print("[yellow][WARN] Skipping prediction tests - API not ready[/yellow]")


[33m[WARN] Skipping prediction tests - API not ready[0m


## 7. Test Batch Predictions

Test the `/predict/batch` endpoint with multiple customers.


In [None]:
if api_ready:
    console.print("\n[bold cyan]Testing Batch Predictions...[/bold cyan]")
    
    # Prepare batch request
    batch_customers = []
    for customer in test_customers[:5]:
        request_data = customer.copy()
        if pd.isna(request_data.get("TotalCharges")):
            request_data["TotalCharges"] = None
        else:
            request_data["TotalCharges"] = float(request_data["TotalCharges"])
        batch_customers.append(request_data)
    
    try:
        start_time = time.time()
        response = requests.post(
            f"{API_BASE_URL}/predict/batch",
            json={"customers": batch_customers},
            timeout=60
        )
        elapsed_time = time.time() - start_time
        
        if response.status_code == 200:
            result = response.json()
            
            console.print(f"[green][OK] Batch prediction successful ({elapsed_time:.3f}s)[/green]")
            console.print(f"  Total customers: {result['total_customers']}")
            console.print(f"  Predictions returned: {len(result['predictions'])}")
            
            # Show summary
            predictions = result['predictions']
            churn_count = sum(1 for p in predictions if p['churn_prediction'])
            avg_prob = np.mean([p['churn_probability'] for p in predictions])
            
            console.print(f"\n  Summary:")
            console.print(f"    Churn predictions: {churn_count}/{len(predictions)}")
            console.print(f"    Average probability: {avg_prob:.4f}")
            
            # Show individual results
            console.print(f"\n  Individual Results:")
            for pred in predictions:
                console.print(f"    {pred['customerID']}: {pred['churn_probability']:.4f} ({'Churn' if pred['churn_prediction'] else 'No Churn'})")
        else:
            console.print(f"[red][FAIL] Batch prediction failed: {response.status_code}[/red]")
            console.print(f"  Error: {response.text}")
    except Exception as e:
        console.print(f"[red][FAIL] Error: {e}[/red]")
else:
    console.print("[yellow][WARN] Skipping batch prediction tests - API not ready[/yellow]")


[33m[WARN] Skipping batch prediction tests - API not ready[0m


## 8. Validation Tests

Test input validation and error handling.


In [None]:
if api_ready:
    console.print("\n[bold cyan]Running Validation Tests...[/bold cyan]")
    
    test_results = []
    
    # Test 1: Invalid customer data
    console.print("\n[bold]Test 1: Invalid customer data[/bold]")
    try:
        invalid_data = {"customerID": "test", "tenure": -1}  # Invalid tenure
        response = requests.post(f"{API_BASE_URL}/predict", json=invalid_data, timeout=10)
        if response.status_code == 422:  # Validation error
            console.print("  [green][OK] Validation error correctly returned[/green]")
            test_results.append(("Validation", True))
        else:
            console.print(f"  [red][FAIL] Expected 422, got {response.status_code}[/red]")
            test_results.append(("Validation", False))
    except Exception as e:
        console.print(f"  [red][FAIL] Error: {e}[/red]")
        test_results.append(("Validation", False))
    
    # Test 2: Missing required fields
    console.print("\n[bold]Test 2: Missing required fields[/bold]")
    try:
        incomplete_data = {"customerID": "test"}
        response = requests.post(f"{API_BASE_URL}/predict", json=incomplete_data, timeout=10)
        if response.status_code == 422:
            console.print("  [green][OK] Missing fields correctly rejected[/green]")
            test_results.append(("Required Fields", True))
        else:
            console.print(f"  [red][FAIL] Expected 422, got {response.status_code}[/red]")
            test_results.append(("Required Fields", False))
    except Exception as e:
        console.print(f"  [red][FAIL] Error: {e}[/red]")
        test_results.append(("Required Fields", False))
    
    # Test 3: Empty batch
    console.print("\n[bold]Test 3: Empty batch request[/bold]")
    try:
        response = requests.post(f"{API_BASE_URL}/predict/batch", json={"customers": []}, timeout=10)
        if response.status_code == 422:
            console.print("  [green][OK] Empty batch correctly rejected[/green]")
            test_results.append(("Empty Batch", True))
        else:
            console.print(f"  [red][FAIL] Expected 422, got {response.status_code}[/red]")
            test_results.append(("Empty Batch", False))
    except Exception as e:
        console.print(f"  [red][FAIL] Error: {e}[/red]")
        test_results.append(("Empty Batch", False))
    
    # Summary
    console.print("\n[bold cyan]Validation Test Summary[/bold cyan]")
    passed = sum(1 for _, result in test_results if result)
    total = len(test_results)
    
    table = Table(title="Test Results")
    table.add_column("Test", style="cyan")
    table.add_column("Status", style="green")
    
    for test_name, result in test_results:
        status = "[green][OK] PASS[/green]" if result else "[red][FAIL] FAIL[/red]"
        table.add_row(test_name, status)
    
    console.print(table)
    console.print(f"\n[bold]Total: {passed}/{total} tests passed[/bold]")
else:
    console.print("[yellow][WARN] Skipping validation tests - API not ready[/yellow]")


[33m[WARN] Skipping validation tests - API not ready[0m


## 9. Performance Testing

Measure API latency and throughput.


In [None]:
if api_ready:
    console.print("\n[bold cyan]Performance Testing...[/bold cyan]")
    
    # Test latency
    num_requests = 10
    latencies = []
    
    test_customer = test_customers[0].copy()
    if pd.isna(test_customer.get("TotalCharges")):
        test_customer["TotalCharges"] = None
    else:
        test_customer["TotalCharges"] = float(test_customer["TotalCharges"])
    
    console.print(f"  Running {num_requests} requests...")
    
    for i in range(num_requests):
        try:
            start_time = time.time()
            response = requests.post(f"{API_BASE_URL}/predict", json=test_customer, timeout=30)
            elapsed = time.time() - start_time
            
            if response.status_code == 200:
                latencies.append(elapsed * 1000)  # Convert to ms
            else:
                console.print(f"  [yellow][WARN] Request {i+1} failed: {response.status_code}[/yellow]")
        except Exception as e:
            console.print(f"  [red][FAIL] Request {i+1} error: {e}[/red]")
    
    if latencies:
        avg_latency = np.mean(latencies)
        p50_latency = np.percentile(latencies, 50)
        p95_latency = np.percentile(latencies, 95)
        p99_latency = np.percentile(latencies, 99)
        
        console.print(f"\n  [green][OK] Performance Metrics:[/green]")
        console.print(f"    Average: {avg_latency:.2f} ms")
        console.print(f"    P50: {p50_latency:.2f} ms")
        console.print(f"    P95: {p95_latency:.2f} ms")
        console.print(f"    P99: {p99_latency:.2f} ms")
        
        # FAANG-level requirements
        if avg_latency < 100:
            console.print(f"    [green][OK] Average latency < 100ms (FAANG standard)[/green]")
        else:
            console.print(f"    [yellow][WARN] Average latency >= 100ms[/yellow]")
        
        if p95_latency < 200:
            console.print(f"    [green][OK] P95 latency < 200ms (FAANG standard)[/green]")
        else:
            console.print(f"    [yellow][WARN] P95 latency >= 200ms[/yellow]")
        
        # Create visualization
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 6))
        plt.hist(latencies, bins=20, edgecolor='black', alpha=0.7)
        plt.axvline(avg_latency, color='red', linestyle='--', label=f'Mean: {avg_latency:.2f}ms')
        plt.axvline(p95_latency, color='orange', linestyle='--', label=f'P95: {p95_latency:.2f}ms')
        plt.xlabel('Latency (ms)')
        plt.ylabel('Frequency')
        plt.title('API Request Latency Distribution')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()
    else:
        console.print("  [red][FAIL] No successful requests for performance testing[/red]")
else:
    console.print("[yellow][WARN] Skipping performance tests - API not ready[/yellow]")


[33m[WARN] Skipping performance tests - API not ready[0m


In [None]:
console.print("\n" + "=" * 80)
console.print("[bold green]Phase 4 API Testing Complete![/bold green]")
console.print("=" * 80)

console.print("\n[bold cyan]Next Steps:[/bold cyan]")
console.print("  1. Review API documentation at http://localhost:8001/docs")
console.print("  2. Integrate API with frontend applications")
console.print("  3. Set up monitoring and logging (Phase 5)")
console.print("  4. Deploy to production using Docker")
console.print("  5. Implement streaming pipeline (optional)")

console.print("\n[bold cyan]API Endpoints:[/bold cyan]")
console.print("  - Health: http://localhost:8001/health")
console.print("  - Metadata: http://localhost:8001/metadata")
console.print("  - Predict: http://localhost:8001/predict")
console.print("  - Batch: http://localhost:8001/predict/batch")
console.print("  - Docs: http://localhost:8001/docs")

console.print("\n[bold cyan]Key Features Verified:[/bold cyan]")
console.print("  [OK] Health check endpoint")
console.print("  [OK] Model metadata endpoint")
console.print("  [OK] Single prediction endpoint")
console.print("  [OK] Batch prediction endpoint")
console.print("  [OK] Input validation")
console.print("  [OK] SHAP explanations")
console.print("  [OK] Error handling")



[1;32mPhase 4 API Testing Complete![0m

[1;36mNext Steps:[0m
  1. Review API documentation at http://localhost:8001/docs
  2. Integrate API with frontend applications
  3. Set up monitoring and logging (Phase 5)
  4. Deploy to production using Docker
  5. Implement streaming pipeline (optional)

[1;36mAPI Endpoints:[0m
  - Health: http://localhost:8001/health
  - Metadata: http://localhost:8001/metadata
  - Predict: http://localhost:8001/predict
  - Batch: http://localhost:8001/predict/batch
  - Docs: http://localhost:8001/docs

[1;36mKey Features Verified:[0m
  [OK] Health check endpoint
  [OK] Model metadata endpoint
  [OK] Single prediction endpoint
  [OK] Batch prediction endpoint
  [OK] Input validation
  [OK] SHAP explanations
  [OK] Error handling
