# Recall Optimization for Exoplanet Classifier

> Goal: Increase recall to 98% while maintaining reasonable precision

## Strategies:
1. **Threshold Optimization**: Find threshold that gives 98% recall
2. **Class Weighting**: Balance classes to favor recall
3. **Cost-Sensitive Learning**: Penalize false negatives more
4. **Feature Engineering**: Add recall-focused features
5. **Ensemble Methods**: Combine multiple models optimized for recall


In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.metrics import (roc_auc_score, average_precision_score, balanced_accuracy_score,
                             precision_recall_curve, confusion_matrix, brier_score_loss,
                             classification_report, recall_score, precision_score, f1_score)
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import HistGradientBoostingClassifier, RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.calibration import calibration_curve
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import VotingClassifier
import joblib, json, math
import warnings
warnings.filterwarnings('ignore')

sns.set_theme(context='notebook', style='dark')


In [None]:
# Load the data and preprocessing (same as before)
df = pd.read_csv('data/processed/exoplanets_unified_derived_imputed.csv')

# Target encoding
POSITIVE_SET = {'CONFIRMED', 'CANDIDATE'}
df['disposition_upper'] = df.get('disposition', 'UNKNOWN').fillna('UNKNOWN').str.upper()
y = df['disposition_upper'].apply(lambda v: 1 if v in POSITIVE_SET else 0)

print(f'Positive Rate: {y.mean():.3f}')
print(f'Total samples: {len(df)}')
print(f'Positive samples: {y.sum()}')


In [None]:
# Load existing models and get baseline predictions
hgb = joblib.load('./models/hgb_baseline.pkl')
rf = joblib.load('./models/rf_model.pkl')
imp = joblib.load('./models/imputer.pkl')

# Feature selection (same as before)
drop_prefixes = ['orbital_period_err', 'planet_radius_re_err', 'transit_depth_err',
                 'stellar_teff_err', 'stellar_radius_err', 'stellar_mass_err']

raw_duplicate = {'planet_eq_temp_k', 'planet_insolation_earth'}
id_like = {'host_name', 'planet_name', 'source_catalog', 'disposition', 'disposition_upper'}

features = []
for c in df.columns:
    if c in id_like or c in raw_duplicate:
        continue
    if any(c.startswith(p) for p in drop_prefixes):
        continue
    features.append(c)

# Separate numeric vs categorical
numeric_cols = [c for c in features if pd.api.types.is_numeric_dtype(df[c])]
categorical_cols = [c for c in features if c not in numeric_cols]

print(f'Features: {len(features)} (Numeric: {len(numeric_cols)}, Categorical: {len(categorical_cols)})')


In [None]:
# One-hot encode categorical features
low_card_threshold = 12
encoded_parts = []

for c in categorical_cols:
    card = df[c].nunique(dropna=True)
    if card <= low_card_threshold:
        dummies = pd.get_dummies(df[c], prefix=c, dummy_na=True)
        encoded_parts.append(dummies)

if encoded_parts:
    X_cat = pd.concat(encoded_parts, axis=1)
else:
    X_cat = pd.DataFrame(index=df.index)

X_num = df[numeric_cols].copy()
X = pd.concat([X_num, X_cat], axis=1)

# Remove constant columns
const_cols = [c for c in X.columns if X[c].nunique(dropna=True) <= 1]
X = X.drop(columns=const_cols)

print(f'Final feature matrix shape: {X.shape}')


In [None]:
# Train-test split (same as original)
train_idx, test_idx = train_test_split(X.index, test_size=0.20, stratify=y, random_state=32)
X_train, X_test = X.loc[train_idx], X.loc[test_idx]
y_train, y_test = y.loc[train_idx], y.loc[test_idx]

# Impute missing values
X_train_imp = pd.DataFrame(imp.fit_transform(X_train), columns=X_train.columns, index=X_train.index)
X_test_imp = pd.DataFrame(imp.transform(X_test), columns=X_test.columns, index=X_test.index)

print(f'Train: {X_train_imp.shape}, Test: {X_test_imp.shape}')

# Get baseline predictions
prob_hgb = hgb.predict_proba(X_test_imp)[:, 1]
prob_rf = rf.predict_proba(X_test_imp)[:, 1]
prob_ensemble = (prob_hgb + prob_rf) / 2

print("Baseline predictions obtained")


## Strategy 1: Threshold Optimization for 98% Recall


In [None]:
# Find threshold for 98% recall
def find_threshold_for_recall(y_true, y_proba, target_recall=0.98, min_precision=0.1):
    """Find threshold that achieves target recall with minimum precision constraint"""
    prec, rec, thr = precision_recall_curve(y_true, y_proba)
    
    # Find points with recall >= target_recall and precision >= min_precision
    valid_indices = (rec >= target_recall) & (prec >= min_precision)
    
    if not valid_indices.any():
        # If no valid point, find highest recall with any precision
        best_idx = np.argmax(rec)
        return {
            'threshold': thr[best_idx] if best_idx < len(thr) else 0.0,
            'recall': rec[best_idx],
            'precision': prec[best_idx],
            'f1': 2 * (prec[best_idx] * rec[best_idx]) / (prec[best_idx] + rec[best_idx]) if (prec[best_idx] + rec[best_idx]) > 0 else 0
        }
    
    # Among valid points, choose the one with highest precision
    valid_indices = np.where(valid_indices)[0]
    best_idx = valid_indices[np.argmax(prec[valid_indices])]
    
    return {
        'threshold': thr[best_idx] if best_idx < len(thr) else 0.0,
        'recall': rec[best_idx],
        'precision': prec[best_idx],
        'f1': 2 * (prec[best_idx] * rec[best_idx]) / (prec[best_idx] + rec[best_idx]) if (prec[best_idx] + rec[best_idx]) > 0 else 0
    }

# Test different models
models = {
    'HGB': prob_hgb,
    'Random Forest': prob_rf,
    'Ensemble': prob_ensemble
}

threshold_results = {}
for name, prob in models.items():
    result = find_threshold_for_recall(y_test, prob, target_recall=0.98, min_precision=0.1)
    threshold_results[name] = result
    print(f"{name}: Threshold={result['threshold']:.4f}, Recall={result['recall']:.3f}, Precision={result['precision']:.3f}")


## Strategy 2: Class Weighting for Recall Optimization


In [None]:
# Calculate class weights for recall optimization
from sklearn.utils.class_weight import compute_class_weight

# Standard class weights (balanced)
class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
class_weight_dict = {0: class_weights[0], 1: class_weights[1]}

# Recall-focused weights (heavily favor positive class)
recall_weights = {0: 1.0, 1: 10.0}  # 10x weight for positive class
extreme_recall_weights = {0: 1.0, 1: 20.0}  # 20x weight for positive class

print(f"Balanced weights: {class_weight_dict}")
print(f"Recall-focused weights: {recall_weights}")
print(f"Extreme recall weights: {extreme_recall_weights}")

# Train models with different class weights
weight_strategies = {
    'Balanced': class_weight_dict,
    'Recall-Focused': recall_weights,
    'Extreme-Recall': extreme_recall_weights
}

weighted_models = {}
for strategy, weights in weight_strategies.items():
    # HGB with class weights
    hgb_weighted = HistGradientBoostingClassifier(
        random_state=32,
        class_weight=weights
    )
    hgb_weighted.fit(X_train_imp, y_train)
    
    # RF with class weights
    rf_weighted = RandomForestClassifier(
        n_estimators=600,
        random_state=32,
        class_weight=weights,
        n_jobs=-1
    )
    rf_weighted.fit(X_train_imp, y_train)
    
    # Get predictions
    prob_hgb_w = hgb_weighted.predict_proba(X_test_imp)[:, 1]
    prob_rf_w = rf_weighted.predict_proba(X_test_imp)[:, 1]
    prob_ensemble_w = (prob_hgb_w + prob_rf_w) / 2
    
    weighted_models[strategy] = {
        'hgb': hgb_weighted,
        'rf': rf_weighted,
        'prob_hgb': prob_hgb_w,
        'prob_rf': prob_rf_w,
        'prob_ensemble': prob_ensemble_w
    }
    
    # Find threshold for 98% recall
    result = find_threshold_for_recall(y_test, prob_ensemble_w, target_recall=0.98, min_precision=0.1)
    print(f"{strategy}: Threshold={result['threshold']:.4f}, Recall={result['recall']:.3f}, Precision={result['precision']:.3f}")


## Strategy 3: Cost-Sensitive Learning with Custom Loss


In [None]:
# Custom cost matrix: heavily penalize false negatives
cost_matrix = {
    'tn': 0,    # True Negative (correctly predicted negative)
    'fp': 1,    # False Positive (predicted positive, actual negative) - minor cost
    'fn': 50,   # False Negative (predicted negative, actual positive) - HIGH COST
    'tp': 0     # True Positive (correctly predicted positive)
}

print(f"Cost matrix: {cost_matrix}")
print(f"FN cost is {cost_matrix['fn']}x higher than FP cost")

# Train models with cost-sensitive approach
# For HGB, we'll use extreme class weights to simulate cost-sensitive learning
cost_sensitive_weights = {
    0: 1.0,   # Normal weight for negative class
    1: cost_matrix['fn']  # High weight for positive class (matches FN cost)
}

hgb_cost = HistGradientBoostingClassifier(
    random_state=32,
    class_weight=cost_sensitive_weights
)
hgb_cost.fit(X_train_imp, y_train)

rf_cost = RandomForestClassifier(
    n_estimators=600,
    random_state=32,
    class_weight=cost_sensitive_weights,
    n_jobs=-1
)
rf_cost.fit(X_train_imp, y_train)

# Get predictions
prob_hgb_cost = hgb_cost.predict_proba(X_test_imp)[:, 1]
prob_rf_cost = rf_cost.predict_proba(X_test_imp)[:, 1]
prob_ensemble_cost = (prob_hgb_cost + prob_rf_cost) / 2

# Find threshold for 98% recall
result_cost = find_threshold_for_recall(y_test, prob_ensemble_cost, target_recall=0.98, min_precision=0.1)
print(f"Cost-Sensitive: Threshold={result_cost['threshold']:.4f}, Recall={result_cost['recall']:.3f}, Precision={result_cost['precision']:.3f}")


## Strategy 4: Feature Engineering for Recall


In [None]:
# Add recall-focused features
def add_recall_features(df):
    """Add features that are highly correlated with positive class"""
    df_enhanced = df.copy()
    
    # High-confidence positive indicators
    if 'transit_depth_ppm' in df.columns and 'planet_radius_re' in df.columns:
        # Strong transit signal
        df_enhanced['strong_transit_signal'] = (
            (df['transit_depth_ppm'] > df['transit_depth_ppm'].quantile(0.7)) &
            (df['planet_radius_re'] > 0.5) & (df['planet_radius_re'] < 5.0)
        ).astype(int)
    
    # Habitable zone indicators
    if 'planet_insolation_earth' in df.columns:
        df_enhanced['in_habitable_zone'] = (
            (df['planet_insolation_earth'] > 0.3) & (df['planet_insolation_earth'] < 3.0)
        ).astype(int)
    
    # Stellar characteristics favorable for planet detection
    if 'stellar_teff_k' in df.columns:
        df_enhanced['favorable_stellar_type'] = (
            (df['stellar_teff_k'] > 4000) & (df['stellar_teff_k'] < 7000)
        ).astype(int)
    
    # Orbital period in detection-friendly range
    if 'orbital_period_days' in df.columns:
        df_enhanced['detection_friendly_period'] = (
            (df['orbital_period_days'] > 10) & (df['orbital_period_days'] < 500)
        ).astype(int)
    
    # Combined confidence score
    recall_features = ['strong_transit_signal', 'in_habitable_zone', 'favorable_stellar_type', 'detection_friendly_period']
    available_features = [f for f in recall_features if f in df_enhanced.columns]
    
    if available_features:
        df_enhanced['recall_confidence_score'] = df_enhanced[available_features].sum(axis=1)
    
    return df_enhanced

# Apply feature engineering
df_enhanced = add_recall_features(df)
new_features = [col for col in df_enhanced.columns if col not in df.columns]
print(f"Added {len(new_features)} recall-focused features: {new_features}")

# Update feature matrix with enhanced features
X_enhanced = X.copy()
for feature in new_features:
    if feature in df_enhanced.columns:
        X_enhanced[feature] = df_enhanced[feature]

# Update train-test split
X_train_enhanced, X_test_enhanced = X_enhanced.loc[train_idx], X_enhanced.loc[test_idx]

# Impute enhanced features
X_train_enhanced_imp = pd.DataFrame(
    imp.fit_transform(X_train_enhanced), 
    columns=X_train_enhanced.columns, 
    index=X_train_enhanced.index
)
X_test_enhanced_imp = pd.DataFrame(
    imp.transform(X_test_enhanced), 
    columns=X_test_enhanced.columns, 
    index=X_test_enhanced.index
)

print(f"Enhanced feature matrix: {X_enhanced.shape}")
print(f"New features added: {list(new_features)}")


In [None]:
# Train models with enhanced features and recall-focused weights
hgb_enhanced = HistGradientBoostingClassifier(
    random_state=32,
    class_weight=cost_sensitive_weights
)
hgb_enhanced.fit(X_train_enhanced_imp, y_train)

rf_enhanced = RandomForestClassifier(
    n_estimators=600,
    random_state=32,
    class_weight=cost_sensitive_weights,
    n_jobs=-1
)
rf_enhanced.fit(X_train_enhanced_imp, y_train)

# Get predictions
prob_hgb_enhanced = hgb_enhanced.predict_proba(X_test_enhanced_imp)[:, 1]
prob_rf_enhanced = rf_enhanced.predict_proba(X_test_enhanced_imp)[:, 1]
prob_ensemble_enhanced = (prob_hgb_enhanced + prob_rf_enhanced) / 2

# Find threshold for 98% recall
result_enhanced = find_threshold_for_recall(y_test, prob_ensemble_enhanced, target_recall=0.98, min_precision=0.1)
print(f"Enhanced Features: Threshold={result_enhanced['threshold']:.4f}, Recall={result_enhanced['recall']:.3f}, Precision={result_enhanced['precision']:.3f}")


## Strategy 5: Advanced Ensemble with Recall Focus


In [None]:
# Create advanced ensemble with multiple models optimized for recall
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression

# Individual models with different strategies
models_recall = [
    ('hgb_recall', HistGradientBoostingClassifier(random_state=32, class_weight=cost_sensitive_weights)),
    ('rf_recall', RandomForestClassifier(n_estimators=600, random_state=32, class_weight=cost_sensitive_weights, n_jobs=-1)),
    ('logit_recall', LogisticRegression(random_state=32, class_weight=cost_sensitive_weights, max_iter=1000))
]

# Voting ensemble
ensemble_recall = VotingClassifier(models_recall, voting='soft')
ensemble_recall.fit(X_train_enhanced_imp, y_train)

# Get predictions
prob_ensemble_advanced = ensemble_recall.predict_proba(X_test_enhanced_imp)[:, 1]

# Find threshold for 98% recall
result_advanced = find_threshold_for_recall(y_test, prob_ensemble_advanced, target_recall=0.98, min_precision=0.1)
print(f"Advanced Ensemble: Threshold={result_advanced['threshold']:.4f}, Recall={result_advanced['recall']:.3f}, Precision={result_advanced['precision']:.3f}")


## Results Comparison and Analysis


In [None]:
# Compare all strategies
results_comparison = {
    'Original Ensemble': threshold_results['Ensemble'],
    'Cost-Sensitive': result_cost,
    'Enhanced Features': result_enhanced,
    'Advanced Ensemble': result_advanced
}

print("\n=== RECALL OPTIMIZATION RESULTS ===")
print("Strategy\t\t\tThreshold\tRecall\t\tPrecision\tF1-Score")
print("-" * 80)

for strategy, result in results_comparison.items():
    print(f"{strategy:<20}\t{result['threshold']:.4f}\t\t{result['recall']:.3f}\t\t{result['precision']:.3f}\t\t{result['f1']:.3f}")

# Find best strategy for 98% recall
target_recall = 0.98
best_strategy = None
best_precision = 0

for strategy, result in results_comparison.items():
    if result['recall'] >= target_recall and result['precision'] > best_precision:
        best_strategy = strategy
        best_precision = result['precision']

print(f"\n=== BEST STRATEGY FOR 98% RECALL ===")
if best_strategy:
    best_result = results_comparison[best_strategy]
    print(f"Strategy: {best_strategy}")
    print(f"Threshold: {best_result['threshold']:.4f}")
    print(f"Recall: {best_result['recall']:.3f}")
    print(f"Precision: {best_result['precision']:.3f}")
    print(f"F1-Score: {best_result['f1']:.3f}")
else:
    print("No strategy achieved exactly 98% recall. Showing closest:")
    closest_strategy = max(results_comparison.items(), key=lambda x: x[1]['recall'])
    print(f"Strategy: {closest_strategy[0]}")
    print(f"Recall: {closest_strategy[1]['recall']:.3f}")
