In [14]:
import json
import pickle
import warnings
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score, classification_report, confusion_matrix,
    f1_score, precision_score, recall_score, roc_auc_score,
    roc_curve, auc
)
from sklearn.model_selection import cross_val_predict
from sklearn.preprocessing import label_binarize
warnings.filterwarnings('ignore')

# Set professional plotting style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

### LOAD ALL THREE MODELS

In [21]:
# Directory holding the preprocessed CSV splits, and the root under which this
# notebook writes its own artefacts.
DATA_PATH = '../../data'
MODEL_PATH = '.'

def load_model(model_path):
    """Unpickle a trained model from disk.

    Args:
        model_path: Path of the pickle file to read.

    Returns:
        The unpickled object, or None when the file does not exist (a warning
        line is printed in that case).
    """
    try:
        with open(model_path, 'rb') as model_file:
            model = pickle.load(model_file)
    except FileNotFoundError:
        # A missing model is reported, not raised, so the caller can carry on
        # with whatever models are available.
        print(f"   ⚠️  Model not found: {model_path}")
        model = None
    return model

# Load models
# The sibling model directories are addressed with pathlib rather than
# backslash-separated literals: a backslash is a directory separator only on
# Windows, so on Linux/macOS the original strings named files that cannot
# exist and every model was reported as missing.
print("Loading trained models...")
models_root = Path('..')
lgb_model = load_model(models_root / 'lightgbm_model' / 'lgb_model_v1.pkl')
xgb_model = load_model(models_root / 'xgboost_model' / 'xgb_model_v1.pkl')
cat_model = load_model(models_root / 'catboost_model' / 'cat_model_v1.pkl')

# Check which models loaded successfully (load_model returns None on a miss)
models_loaded = {
    'LightGBM': lgb_model is not None,
    'XGBoost': xgb_model is not None,
    'CatBoost': cat_model is not None
}

print("\n✅ Model Loading Status:")
for name, loaded in models_loaded.items():
    status = "✓ Loaded" if loaded else "✗ Missing"
    print(f"   {name:12s}: {status}")

if not all(models_loaded.values()):
    print("\n⚠️  Warning: Not all models loaded. Train missing models first!")
    print("   Ensemble will use only available models.\n")

Loading trained models...
   ⚠️  Model not found: ..\xgboost_model\xgb_model_v1.pkl
   ⚠️  Model not found: ..\catboost_model\cat_model_v1.pkl

✅ Model Loading Status:
   LightGBM    : ✓ Loaded
   XGBoost     : ✗ Missing
   CatBoost    : ✗ Missing

   Ensemble will use only available models.



### LOAD VALIDATION & TEST DATA

In [None]:
# Read the three preprocessed splits. Each label CSV is squeezed after loading
# (a single-column frame becomes a Series).
X_train, y_train = (
    pd.read_csv(f'{DATA_PATH}/step6_X_train.csv'),
    pd.read_csv(f'{DATA_PATH}/step6_y_train.csv').squeeze(),
)

X_validate, y_validate = (
    pd.read_csv(f'{DATA_PATH}/step6_X_val.csv'),
    pd.read_csv(f'{DATA_PATH}/step6_y_val.csv').squeeze(),
)

X_test, y_test = (
    pd.read_csv(f'{DATA_PATH}/step6_X_test.csv'),
    pd.read_csv(f'{DATA_PATH}/step6_y_test.csv').squeeze(),
)

# Apply feature engineering (MUST match training!)
def engineer_transit_features(X, depth_median=None):
    """Apply transit method feature engineering - MUST MATCH TRAINING!

    Derived columns are appended to a copy of ``X`` (the input frame is not
    modified). Each group is added only when its source columns are present:

    * ``transit_depth_duration_ratio`` = koi_depth / (koi_duration + 1e-6)
    * ``snr_log`` = log1p(koi_model_snr), ``snr_squared`` = koi_model_snr ** 2
    * ``transit_detectability`` = koi_depth * koi_model_snr / (koi_duration + 1)
    * ``depth_normalized`` = koi_depth / depth_median

    Args:
        X: DataFrame of input features.
        depth_median: Reference median used for ``depth_normalized``. When
            None (the default, identical to the previous behaviour) the median
            of ``X['koi_depth']`` itself is used. That makes the feature depend
            on which rows are in ``X``: every split is scaled by a different
            constant and a single-row frame always yields 1.0. Pass the
            training-set median to get one consistent scale.
            NOTE(review): only pass a value if training was done the same way;
            the training code is not visible here.

    Returns:
        A new DataFrame with the original columns followed by the derived ones.
    """
    X_enhanced = X.copy()

    available = set(X.columns)
    has_depth = 'koi_depth' in available
    has_duration = 'koi_duration' in available
    has_snr = 'koi_model_snr' in available

    # 1. Depth-Duration Interaction (epsilon guards against zero duration)
    if has_depth and has_duration:
        X_enhanced['transit_depth_duration_ratio'] = (
            X['koi_depth'] / (X['koi_duration'] + 1e-6)
        )

    # 2. Signal Quality Score
    if has_snr:
        X_enhanced['snr_log'] = np.log1p(X['koi_model_snr'])
        X_enhanced['snr_squared'] = X['koi_model_snr'] ** 2

    # 3. Transit Detectability Index
    if has_depth and has_duration and has_snr:
        X_enhanced['transit_detectability'] = (
            X['koi_depth'] * X['koi_model_snr'] / (X['koi_duration'] + 1)
        )

    # 4. Normalized transit depth
    if has_depth:
        if depth_median is None:
            # Backward-compatible default: median of the frame being transformed.
            depth_median = X['koi_depth'].median()
        X_enhanced['depth_normalized'] = X['koi_depth'] / depth_median

    return X_enhanced

print("Applying feature engineering...")
# The same transform is applied to each split, in train / validation / test order.
X_train_enhanced, X_validate_enhanced, X_test_enhanced = map(
    engineer_transit_features, (X_train, X_validate, X_test)
)

# Report the resulting (rows, columns) of every split.
print(f"   Training:   {X_train_enhanced.shape}")
print(f"   Validation: {X_validate_enhanced.shape}")
print(f"   Test:       {X_test_enhanced.shape}")

NameError: name 'koi_depth' is not defined

### INDIVIDUAL MODEL PREDICTIONS

In [None]:
def get_predictions(model, X, model_type='lightgbm'):
    """Get class predictions and probabilities from one base model.

    Args:
        model: Trained model object, or None when the model failed to load.
        X: Feature matrix to score.
        model_type: One of 'lightgbm', 'xgboost' or 'catboost'; selects which
            API call is used to obtain probabilities.

    Returns:
        ``(pred, proba)`` where ``proba`` is the per-class probability array
        and ``pred`` is its row-wise argmax, or ``(None, None)`` when
        ``model`` is None.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names
            (previously this fell through and surfaced as an
            UnboundLocalError on the return statement).
    """
    if model is None:
        return None, None

    if model_type == 'lightgbm':
        # Assumes a model whose predict() returns an (n_samples, n_classes)
        # probability matrix and which exposes best_iteration - TODO confirm
        # against how the LightGBM model was trained/saved.
        proba = model.predict(X, num_iteration=model.best_iteration)
    elif model_type == 'xgboost':
        # XGBoost: predict_proba for probabilities
        proba = model.predict_proba(X)
    elif model_type == 'catboost':
        # CatBoost: predict with prediction_type='Probability'
        proba = model.predict(X, prediction_type='Probability')
    else:
        raise ValueError(
            f"Unsupported model_type {model_type!r}; "
            "expected 'lightgbm', 'xgboost' or 'catboost'"
        )

    proba = np.asarray(proba)
    pred = proba.argmax(axis=1)
    return pred, proba

# Validation-set predictions for each base model; a model that failed to load
# yields (None, None) from get_predictions.
print("Generating predictions on VALIDATION set...\n")

(
    (lgb_val_pred, lgb_val_proba),
    (xgb_val_pred, xgb_val_proba),
    (cat_val_pred, cat_val_proba),
) = [
    get_predictions(base_model, X_validate_enhanced, kind)
    for base_model, kind in (
        (lgb_model, 'lightgbm'),
        (xgb_model, 'xgboost'),
        (cat_model, 'catboost'),
    )
]

# Same again for the held-out test set.
print("Generating predictions on TEST set...\n")

(
    (lgb_test_pred, lgb_test_proba),
    (xgb_test_pred, xgb_test_proba),
    (cat_test_pred, cat_test_proba),
) = [
    get_predictions(base_model, X_test_enhanced, kind)
    for base_model, kind in (
        (lgb_model, 'lightgbm'),
        (xgb_model, 'xgboost'),
        (cat_model, 'catboost'),
    )
]

# Header for the per-model validation scores printed next.
print("📊 Individual Model Performance (Validation Set):\n")

# Display names for the class ids, in index order.
class_names = ['False Positive', 'Candidate', 'Confirmed']

def evaluate_model(y_true, y_pred, model_name):
    """Score one set of predictions and print a short report.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, or None when the model is unavailable.
        model_name: Heading printed above the metrics.

    Returns:
        Dict with keys 'accuracy', 'precision', 'recall', 'f1_macro' and
        'f1_weighted' (precision and recall are weighted averages), or None
        when ``y_pred`` is None.
    """
    # Nothing to score for a model that produced no predictions.
    if y_pred is None:
        return None

    scores = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred, average='weighted'),
        'recall': recall_score(y_true, y_pred, average='weighted'),
        'f1_macro': f1_score(y_true, y_pred, average='macro'),
        'f1_weighted': f1_score(y_true, y_pred, average='weighted'),
    }
    accuracy = scores['accuracy']

    print(f"{model_name}:")
    print(f"   Accuracy:    {accuracy:.4f} ({accuracy*100:.2f}%)")
    print(f"   Precision:   {scores['precision']:.4f}")
    print(f"   Recall:      {scores['recall']:.4f}")
    print(f"   F1 (Macro):  {scores['f1_macro']:.4f}")
    print(f"   F1 (Weight): {scores['f1_weighted']:.4f}\n")

    return scores

# Validation metrics per base model, evaluated (and printed) in
# LightGBM / XGBoost / CatBoost order; an entry is None for a missing model.
lgb_metrics, xgb_metrics, cat_metrics = [
    evaluate_model(y_validate, val_pred, display_name)
    for val_pred, display_name in (
        (lgb_val_pred, "LightGBM"),
        (xgb_val_pred, "XGBoost"),
        (cat_val_pred, "CatBoost"),
    )
]

### ENSEMBLE METHOD 1 - SIMPLE AVERAGING

In [None]:
# Banner describing the strategy used in this section.
print("Strategy: Average probability predictions from all models\n")

def simple_average_ensemble(proba_list):
    """Combine models by taking the unweighted mean of their probabilities.

    Args:
        proba_list: Probability arrays, one per model; None entries (missing
            models) are skipped.

    Returns:
        ``(predictions, avg_proba)``: the element-wise mean of the available
        arrays and its row-wise argmax, or ``(None, None)`` when every entry
        is None.
    """
    available = list(filter(lambda proba: proba is not None, proba_list))
    if len(available) == 0:
        return None, None

    mean_proba = np.mean(available, axis=0)
    return mean_proba.argmax(axis=1), mean_proba

# Average the base-model probabilities on the validation split ...
ensemble_avg_val_pred, ensemble_avg_val_proba = simple_average_ensemble(
    [lgb_val_proba, xgb_val_proba, cat_val_proba]
)

# ... and on the test split.
ensemble_avg_test_pred, ensemble_avg_test_proba = simple_average_ensemble(
    [lgb_test_proba, xgb_test_proba, cat_test_proba]
)

# Only the validation split is scored here.
print("📊 Simple Average Ensemble Performance:\n")
print("VALIDATION SET:")
avg_val_metrics = evaluate_model(
    y_validate,
    ensemble_avg_val_pred,
    "Ensemble (Avg)",
)

### ENSEMBLE METHOD 2 - WEIGHTED AVERAGING

In [None]:
print("Strategy: Weight each model by its validation F1-score\n")

# Raw weight of every available model is its weighted F1 on the validation
# set; models without metrics (None) are left out entirely.
weights_dict = {
    display_name: metrics['f1_weighted']
    for display_name, metrics in (
        ('LightGBM', lgb_metrics),
        ('XGBoost', xgb_metrics),
        ('CatBoost', cat_metrics),
    )
    if metrics
}
total_f1 = sum(weights_dict.values())

# Rescale so the weights of the available models sum to 1.
normalized_weights = {
    display_name: raw_weight / total_f1
    for display_name, raw_weight in weights_dict.items()
}

print("Computed Model Weights (based on validation F1-score):")
for display_name, share in normalized_weights.items():
    print(f"   {display_name:12s}: {share:.4f} ({share*100:.1f}%)")

def weighted_average_ensemble(proba_list, weights):
    """Weighted average of per-model class probabilities.

    Args:
        proba_list: Probability arrays, one per model; None marks a missing
            model and is skipped together with its weight.
        weights: One weight per entry of ``proba_list``. They need not sum to
            1; the result is divided by the total weight of the models used.

    Returns:
        ``(predictions, avg_proba)``: the weighted mean probabilities and
        their row-wise argmax. ``(None, None)`` when no model is available or
        the usable weights sum to zero, mirroring simple_average_ensemble.
    """
    # Drop missing models first. The accumulator used to be shaped from
    # proba_list[0], which failed whenever the first model was the missing one.
    usable = [
        (np.asarray(proba, dtype=float), weight)
        for proba, weight in zip(proba_list, weights)
        if proba is not None
    ]
    total_weight = sum(weight for _, weight in usable)
    if not usable or total_weight == 0:
        # Nothing to average (or a 0/0 division); report "no prediction".
        return None, None

    weighted_sum = np.zeros_like(usable[0][0])
    for proba, weight in usable:
        weighted_sum += proba * weight

    avg_proba = weighted_sum / total_weight
    predictions = avg_proba.argmax(axis=1)
    return predictions, avg_proba

# Weights in the same LightGBM / XGBoost / CatBoost order as the probability
# lists below; a model without a weight gets 0.
weight_values = [
    normalized_weights.get(display_name, 0)
    for display_name in ('LightGBM', 'XGBoost', 'CatBoost')
]

# Weighted combination on the validation split ...
ensemble_weighted_val_pred, ensemble_weighted_val_proba = weighted_average_ensemble(
    [lgb_val_proba, xgb_val_proba, cat_val_proba], weight_values
)

# ... and on the test split.
ensemble_weighted_test_pred, ensemble_weighted_test_proba = weighted_average_ensemble(
    [lgb_test_proba, xgb_test_proba, cat_test_proba], weight_values
)

# Only the validation split is scored here.
print("\n📊 Weighted Average Ensemble Performance:\n")
print("VALIDATION SET:")
weighted_val_metrics = evaluate_model(
    y_validate,
    ensemble_weighted_val_pred,
    "Ensemble (Weighted)",
)


### ENSEMBLE METHOD 3 - MAJORITY VOTING

In [None]:
# Banner describing the strategy used in this section.
print("Strategy: Each model votes, majority wins (ties broken by confidence)\n")

def majority_voting_ensemble(pred_list, proba_list):
    """Majority voting with confidence-based tie-breaking.

    Each available model casts one vote per sample and the class with the
    most votes wins. When several classes share the top vote count, the one
    with the highest probability summed over the available models is chosen.
    Previously the probabilities were ignored and a tie silently went to the
    lowest class id.

    Args:
        pred_list: Predicted class ids (non-negative integers), one array per
            model; None entries are skipped.
        proba_list: Probability arrays matching ``pred_list``; None entries
            are skipped. When none are available, or their shape does not
            match the vote table, ties fall back to the lowest class id.

    Returns:
        1-D integer array with the winning class per sample, or None when no
        model produced predictions.
    """
    valid_preds = [np.asarray(p) for p in pred_list if p is not None]
    valid_probas = [np.asarray(p, dtype=float) for p in proba_list if p is not None]

    if not valid_preds:
        return None

    # One column of votes per model: shape (n_samples, n_models).
    votes = np.column_stack(valid_preds).astype(int)
    n_samples = votes.shape[0]
    if n_samples == 0:
        return np.array([], dtype=int)

    n_classes = int(votes.max()) + 1
    if valid_probas:
        n_classes = max(n_classes, valid_probas[0].shape[1])

    # Tally votes per class. Within one model every row index occurs once,
    # so plain fancy-index addition is safe.
    counts = np.zeros((n_samples, n_classes), dtype=int)
    rows = np.arange(n_samples)
    for model_votes in votes.T:
        counts[rows, model_votes] += 1

    is_top = counts == counts.max(axis=1, keepdims=True)

    # Default tie-break: argmax returns the first (lowest) tied class id.
    scores = is_top.astype(float)
    if valid_probas:
        confidence = np.sum(valid_probas, axis=0)
        if confidence.shape == counts.shape:
            # Only top-voted classes compete; rank them by summed probability.
            scores = np.where(is_top, confidence, -np.inf)

    return scores.argmax(axis=1)

# Vote on the validation split ...
ensemble_vote_val_pred = majority_voting_ensemble(
    pred_list=[lgb_val_pred, xgb_val_pred, cat_val_pred],
    proba_list=[lgb_val_proba, xgb_val_proba, cat_val_proba],
)

# ... and on the test split.
ensemble_vote_test_pred = majority_voting_ensemble(
    pred_list=[lgb_test_pred, xgb_test_pred, cat_test_pred],
    proba_list=[lgb_test_proba, xgb_test_proba, cat_test_proba],
)

# Only the validation split is scored here.
print("📊 Majority Voting Ensemble Performance:\n")
print("VALIDATION SET:")
vote_val_metrics = evaluate_model(
    y_validate,
    ensemble_vote_val_pred,
    "Ensemble (Vote)",
)

### ENSEMBLE METHOD 4 - STACKING META-LEARNER (ADVANCED)

In [None]:
print("Strategy: Train a meta-model to learn optimal combination\n")

# Create meta-features (predictions from base models).
# A missing base model contributes an all-zero block so the column layout
# (one group of per-class probabilities per base model) stays fixed.
n_classes = len(class_names)

meta_train_features = np.column_stack([
    lgb_val_proba if lgb_val_proba is not None else np.zeros((len(y_validate), n_classes)),
    xgb_val_proba if xgb_val_proba is not None else np.zeros((len(y_validate), n_classes)),
    cat_val_proba if cat_val_proba is not None else np.zeros((len(y_validate), n_classes))
])

meta_test_features = np.column_stack([
    lgb_test_proba if lgb_test_proba is not None else np.zeros((len(y_test), n_classes)),
    xgb_test_proba if xgb_test_proba is not None else np.zeros((len(y_test), n_classes)),
    cat_test_proba if cat_test_proba is not None else np.zeros((len(y_test), n_classes))
])

print(f"Meta-features shape: {meta_train_features.shape}")
print("Training Logistic Regression meta-learner...\n")

# Train meta-model (Logistic Regression).
# `multi_class` is intentionally not passed: the argument is deprecated in
# recent scikit-learn releases, and the lbfgs solver already fits a
# multinomial model for a multi-class target by default.
meta_model = LogisticRegression(
    max_iter=1000,
    random_state=42,
    solver='lbfgs'
)

# Validation predictions are produced out-of-fold. Scoring the meta-learner on
# the very rows it was fitted on (as before) inflates its validation metrics
# and makes the comparison with the other ensemble methods unfair.
ensemble_stack_val_pred = cross_val_predict(
    meta_model, meta_train_features, y_validate, cv=5
)

# The final meta-model is fitted on the whole validation set and used for the
# test-set predictions.
meta_model.fit(meta_train_features, y_validate)

# Predictions
ensemble_stack_test_pred = meta_model.predict(meta_test_features)
ensemble_stack_test_proba = meta_model.predict_proba(meta_test_features)

print("📊 Stacking Meta-Learner Performance:\n")
print("VALIDATION SET:")
stack_val_metrics = evaluate_model(y_validate, ensemble_stack_val_pred, "Ensemble (Stack)")

### COMPARE ALL METHODS

In [None]:
# Validation metrics of every method, in display order. An entry is None when
# the method produced no predictions (e.g. its model failed to load).
method_metrics = [
    ('LightGBM (Solo)', lgb_metrics),
    ('XGBoost (Solo)', xgb_metrics),
    ('CatBoost (Solo)', cat_metrics),
    ('Simple Average', avg_val_metrics),
    ('Weighted Average', weighted_val_metrics),
    ('Majority Voting', vote_val_metrics),
    ('Stacking Meta', stack_val_metrics),
]

# Unavailable methods are recorded as NaN rather than 0: a zero reads like a
# real (terrible) score in the printed/saved table, whereas NaN is clearly
# "no result" and is skipped by idxmax below. It also avoids the TypeError
# that indexing a None metrics entry used to raise.
comparison_df = pd.DataFrame({
    'Method': [method for method, _ in method_metrics],
    **{
        column: [metrics[key] if metrics else np.nan for _, metrics in method_metrics]
        for column, key in (
            ('Accuracy', 'accuracy'),
            ('F1 (Macro)', 'f1_macro'),
            ('F1 (Weighted)', 'f1_weighted'),
        )
    }
})

print(comparison_df.to_string(index=False))

# Find best method (highest weighted F1; NaN rows are ignored)
best_idx = comparison_df['F1 (Weighted)'].idxmax()
best_method = comparison_df.loc[best_idx, 'Method']
best_f1 = comparison_df.loc[best_idx, 'F1 (Weighted)']

print(f"\n🥇 Best Ensemble Method: {best_method}")
print(f"   F1-Score (Weighted): {best_f1:.4f}\n")

### FINAL TEST SET EVALUATION

In [None]:
# The weighted-average ensemble is the one evaluated on the test set
# (fixed choice; it is not derived from the comparison table).
print("Using WEIGHTED AVERAGE ensemble for final evaluation\n")

final_test_pred, final_test_proba = (
    ensemble_weighted_test_pred,
    ensemble_weighted_test_proba,
)

# Headline metrics on the held-out test split.
test_accuracy = accuracy_score(y_test, final_test_pred)
test_precision = precision_score(y_test, final_test_pred, average='weighted')
test_recall = recall_score(y_test, final_test_pred, average='weighted')
test_f1_macro = f1_score(y_test, final_test_pred, average='macro')
test_f1_weighted = f1_score(y_test, final_test_pred, average='weighted')

print("🏆 FINAL ENSEMBLE PERFORMANCE:\n")
print(f"   Accuracy:           {test_accuracy:.4f} ({test_accuracy*100:.2f}%)")
print(f"   Precision (Avg):    {test_precision:.4f}")
print(f"   Recall (Avg):       {test_recall:.4f}")
print(f"   F1-Score (Macro):   {test_f1_macro:.4f}")
print(f"   F1-Score (Weighted):{test_f1_weighted:.4f}")

# Per-class precision / recall / F1 breakdown.
print("\n📋 Detailed Classification Report:")
detailed_report = classification_report(
    y_test, final_test_pred, target_names=class_names, digits=4
)
print(detailed_report)

# Confusion Matrix
# Passing labels explicitly keeps the matrix at len(class_names) x
# len(class_names) even if a class is absent from both y_test and the
# predictions; otherwise the tick labels below would no longer line up.
# Class ids are 0..len(class_names)-1 because predictions come from argmax
# over the probability columns.
class_labels = list(range(len(class_names)))
cm = confusion_matrix(y_test, final_test_pred, labels=class_labels)

# Row-normalise; a class with no true samples gets an all-zero row instead of
# a 0/0 division (NaN plus a runtime warning).
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = np.divide(
    cm, row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals != 0
)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Raw counts
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax1,
            xticklabels=class_names,
            yticklabels=class_names,
            cbar_kws={'label': 'Count'})
ax1.set_title('Final Ensemble - Test Confusion Matrix (Counts)', 
              fontsize=14, fontweight='bold')
ax1.set_ylabel('True Label', fontsize=12)
ax1.set_xlabel('Predicted Label', fontsize=12)

# Normalized
sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='Greens', ax=ax2,
            xticklabels=class_names,
            yticklabels=class_names,
            cbar_kws={'label': 'Proportion'})
ax2.set_title('Final Ensemble - Test Confusion Matrix (Normalized)', 
              fontsize=14, fontweight='bold')
ax2.set_ylabel('True Label', fontsize=12)
ax2.set_xlabel('Predicted Label', fontsize=12)

plt.tight_layout()
# savefig does not create missing directories, so make sure the target exists.
confusion_output_dir = Path(MODEL_PATH) / 'ensemble_model'
confusion_output_dir.mkdir(parents=True, exist_ok=True)
plt.savefig(confusion_output_dir / 'confusion_matrix_final.png',
            dpi=300, bbox_inches='tight')
plt.show()

# ROC Curves
# One-vs-rest: binarise the labels so each class gets its own 0/1 column that
# is compared against that class's probability column.
y_test_bin = label_binarize(y_test, classes=[0, 1, 2])

fig, ax = plt.subplots(figsize=(10, 8))
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1']

for i, (color, name) in enumerate(zip(colors, class_names)):
    fpr, tpr, _ = roc_curve(y_test_bin[:, i], final_test_proba[:, i])
    roc_auc = auc(fpr, tpr)
    ax.plot(fpr, tpr, color=color, lw=2,
            label=f'{name} (AUC = {roc_auc:.3f})')

# Diagonal reference line for a classifier with no skill.
ax.plot([0, 1], [0, 1], 'k--', lw=2, label='Random Classifier')
ax.set_xlabel('False Positive Rate', fontsize=12, fontweight='bold')
ax.set_ylabel('True Positive Rate', fontsize=12, fontweight='bold')
ax.set_title('Final Ensemble ROC Curves - Test Set', fontsize=14, fontweight='bold')
ax.legend(loc='lower right', fontsize=10)
ax.grid(alpha=0.3)
plt.tight_layout()
# savefig does not create missing directories, so make sure the target exists.
roc_output_dir = Path(MODEL_PATH) / 'ensemble_model'
roc_output_dir.mkdir(parents=True, exist_ok=True)
plt.savefig(roc_output_dir / 'roc_curves_final.png',
            dpi=300, bbox_inches='tight')
plt.show()

### SAVE ENSEMBLE COMPONENTS

In [None]:
# Create ensemble model package
# NOTE(review): pickle stores a plain function by reference (module name +
# function name), not its code. 'feature_engineering' therefore only unpickles
# in a process where `engineer_transit_features` can be found under the same
# module name it has here; a consumer such as the web app has to define or
# import the function itself before loading this file.
ensemble_package = {
    'models': {
        'lightgbm': lgb_model,
        'xgboost': xgb_model,
        'catboost': cat_model
    },
    'weights': normalized_weights,
    'meta_model': meta_model,
    'method': 'weighted_average',  # Method used for the final test evaluation
    'feature_engineering': engineer_transit_features  # Include function
}

# Save ensemble
ensemble_path = f'{MODEL_PATH}/ensemble_model/ensemble_v1.pkl'
# open(..., 'wb') does not create missing directories; create the target
# folder first so a fresh checkout does not fail here.
Path(ensemble_path).parent.mkdir(parents=True, exist_ok=True)
with open(ensemble_path, 'wb') as f:
    pickle.dump(ensemble_package, f)
print(f"✅ Ensemble saved: {ensemble_path}")

# Save metadata
ensemble_metadata = {
    'version': '1.0',
    'ensemble_method': 'weighted_average',
    # Only the models that were actually loaded take part in the ensemble;
    # listing every key of models_loaded would also name the missing ones.
    'base_models': [name for name, loaded in models_loaded.items() if loaded],
    'weights': normalized_weights,
    'performance': {
        'validation': {
            'accuracy': float(weighted_val_metrics['accuracy']),
            'f1_macro': float(weighted_val_metrics['f1_macro']),
            'f1_weighted': float(weighted_val_metrics['f1_weighted'])
        },
        'test': {
            'accuracy': float(test_accuracy),
            'f1_macro': float(test_f1_macro),
            'f1_weighted': float(test_f1_weighted),
            'precision': float(test_precision),
            'recall': float(test_recall)
        }
    },
    # Validation metrics per base model; an empty dict marks a missing model.
    'individual_models': {
        'lightgbm': lgb_metrics if lgb_metrics else {},
        'xgboost': xgb_metrics if xgb_metrics else {},
        'catboost': cat_metrics if cat_metrics else {}
    }
}

metadata_path = f'{MODEL_PATH}/ensemble_model/ensemble_metadata.json'
# Neither open(..., 'w') nor DataFrame.to_csv creates missing directories.
Path(metadata_path).parent.mkdir(parents=True, exist_ok=True)
with open(metadata_path, 'w') as f:
    json.dump(ensemble_metadata, f, indent=4)
print(f"✅ Metadata saved: {metadata_path}")

# Save comparison results
comparison_df.to_csv(f'{MODEL_PATH}/ensemble_model/ensemble_comparison.csv', index=False)
print(f"✅ Comparison table saved\n")


# Closing recap of the test-set results and where the ensemble was written.
models_combined = sum(models_loaded.values())  # True counts as 1

print("📊 Summary:")
print("   Best Method:        Weighted Average")
print(f"   Test Accuracy:      {test_accuracy*100:.2f}%")
print(f"   F1-Score (Macro):   {test_f1_macro:.4f}")
print(f"   Models Combined:    {models_combined}/3")
print("\n   🚀 Ready for deployment in web app!")
print(f"   📍 Location: {ensemble_path}")