# Malnutrition Model Enhancements

This notebook implements critical improvements to make the model production-ready:

1. WHO Standards Integration
2. Enhanced Feature Engineering
3. Confidence Scoring
4. Comprehensive Validation

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix
import joblib
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

## 1. WHO Standards Integration

Adding WHO z-scores calculation for:
- Weight-for-age
- Height-for-age
- BMI-for-age
- MUAC-for-age

In [None]:
def calculate_zscore(measurement, age_months, gender, indicator):
    """Calculate z-score based on WHO standards
    
    Args:
        measurement (float): The measured value (weight in kg, height in cm, etc)
        age_months (float): Age in months
        gender (str): 'M' or 'F'
        indicator (str): One of 'wfa' (weight-for-age), 'hfa' (height-for-age),
                        'bfa' (bmi-for-age), 'mfa' (muac-for-age)
    """
    # TODO: Implement WHO standard calculations
    # For now using a simplified calculation based on normal distribution
    # This should be replaced with actual WHO LMS values
    
    if indicator == 'wfa':
        # Simplified weight-for-age z-score
        expected_weight = (age_months * 0.3) + 3.5  # Very simplified
        sd = expected_weight * 0.1
        return (measurement - expected_weight) / sd
    
    elif indicator == 'hfa':
        # Simplified height-for-age z-score
        expected_height = (age_months * 0.5) + 50  # Very simplified
        sd = expected_height * 0.05
        return (measurement - expected_height) / sd
    
    elif indicator == 'bfa':
        # Simplified BMI-for-age z-score
        expected_bmi = 16 + (age_months * 0.05)  # Very simplified
        sd = 2
        return (measurement - expected_bmi) / sd
    
    elif indicator == 'mfa':
        # Simplified MUAC-for-age z-score
        expected_muac = 14 + (age_months * 0.02)  # Very simplified
        sd = 1.5
        return (measurement - expected_muac) / sd
    
    return 0

## 2. Enhanced Feature Engineering

In [None]:
def engineer_enhanced_features(df):
    """Add enhanced features including WHO z-scores and clinical indicators"""
    
    enhanced = df.copy()
    
    # Calculate WHO z-scores
    enhanced['weight_for_age_z'] = enhanced.apply(
        lambda x: calculate_zscore(x['weight_kg'], x['age_months'], x.get('gender', 'F'), 'wfa'), 
        axis=1
    )
    
    enhanced['height_for_age_z'] = enhanced.apply(
        lambda x: calculate_zscore(x['height_cm'], x['age_months'], x.get('gender', 'F'), 'hfa'),
        axis=1
    )
    
    enhanced['bmi_for_age_z'] = enhanced.apply(
        lambda x: calculate_zscore(x['bmi'], x['age_months'], x.get('gender', 'F'), 'bfa'),
        axis=1
    )
    
    enhanced['muac_for_age_z'] = enhanced.apply(
        lambda x: calculate_zscore(x['muac_cm'], x['age_months'], x.get('gender', 'F'), 'mfa'),
        axis=1
    )
    
    # Add composite indicators
    enhanced['stunting_score'] = enhanced['height_for_age_z'].apply(lambda x: 1 if x < -2 else 0)
    enhanced['wasting_score'] = enhanced['weight_for_age_z'].apply(lambda x: 1 if x < -2 else 0)
    enhanced['underweight_score'] = enhanced['bmi_for_age_z'].apply(lambda x: 1 if x < -2 else 0)
    
    # Add interaction terms
    enhanced['weight_height_interaction'] = enhanced['weight_for_age_z'] * enhanced['height_for_age_z']
    enhanced['bmi_muac_interaction'] = enhanced['bmi_for_age_z'] * enhanced['muac_for_age_z']
    
    return enhanced

## 3. Confidence Scoring

In [None]:
class ConfidentMalnutritionClassifier:
    """Wrapper class that adds confidence scoring to predictions"""
    
    def __init__(self, base_model):
        self.model = base_model
        self.confidence_threshold = 0.8  # Minimum confidence for reliable predictions
    
    def calculate_confidence(self, features, prediction_proba):
        """Calculate confidence score based on:
        1. Model's prediction probability
        2. Feature reliability
        3. Distance from decision boundary
        """
        # Base confidence from model probability
        max_prob = np.max(prediction_proba)
        
        # Feature reliability score (check for outliers)
        feature_reliability = self._check_feature_reliability(features)
        
        # Combine scores
        confidence = (max_prob * 0.7) + (feature_reliability * 0.3)
        
        return confidence
    
    def _check_feature_reliability(self, features):
        """Check if features are within expected ranges"""
        reliability_scores = []
        
        # Check z-scores are within reasonable ranges
        for col in features.columns:
            if 'z' in col:
                z_scores = features[col]
                # Give lower reliability for extreme z-scores
                reliability = 1 - (np.abs(z_scores) > 3).mean()
                reliability_scores.append(reliability)
        
        return np.mean(reliability_scores) if reliability_scores else 0.5
    
    def predict_with_confidence(self, X):
        """Make predictions with confidence scores"""
        predictions = self.model.predict(X)
        probabilities = self.model.predict_proba(X)
        
        confidences = [self.calculate_confidence(X.iloc[i:i+1], prob) 
                      for i, prob in enumerate(probabilities)]
        
        return predictions, confidences

## 4. Comprehensive Validation

In [None]:
def validate_model_thoroughly(model, X, y):
    """Perform comprehensive model validation"""
    results = {}
    
    # 1. Stratified K-Fold Cross Validation
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = cross_val_score(model, X, y, cv=skf)
    results['cv_scores'] = {
        'mean': cv_scores.mean(),
        'std': cv_scores.std(),
        'scores': cv_scores
    }
    
    # 2. Hold-out Test Set Validation
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=42
    )
    
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    
    # 3. Calculate various metrics
    results['classification_report'] = classification_report(y_test, y_pred, output_dict=True)
    results['confusion_matrix'] = confusion_matrix(y_test, y_pred)
    
    # 4. Check for bias in predictions
    results['bias_check'] = check_prediction_bias(model, X_test, y_test)
    
    return results

def check_prediction_bias(model, X, y_true):
    """Check for systematic bias in predictions"""
    y_pred = model.predict(X)
    
    # Check for bias in different value ranges
    bias_results = {}
    
    for col in X.columns:
        if X[col].dtype in [np.float64, np.int64]:
            # Split into quartiles
            quartiles = pd.qcut(X[col], q=4)
            
            # Calculate accuracy for each quartile
            accuracies = []
            for q in quartiles.unique():
                mask = quartiles == q
                acc = (y_true[mask] == y_pred[mask]).mean()
                accuracies.append(acc)
            
            # Check if accuracies are significantly different
            f_stat, p_value = stats.f_oneway(*[[acc] for acc in accuracies])
            
            bias_results[col] = {
                'quartile_accuracies': accuracies,
                'p_value': p_value,
                'has_bias': p_value < 0.05
            }
    
    return bias_results

# Model Monitoring System

We'll implement a monitoring system to track:
1. Model performance over time
2. Data drift detection
3. Prediction quality metrics
4. System health indicators

In [None]:
class ModelMonitor:
    """System for monitoring model performance and data quality"""
    
    def __init__(self, model_name, feature_names):
        self.model_name = model_name
        self.feature_names = feature_names
        self.baseline_stats = None
        self.performance_log = []
        
    def set_baseline_statistics(self, X_baseline):
        """Calculate and store baseline statistics for drift detection"""
        self.baseline_stats = {
            'feature_means': X_baseline.mean().to_dict(),
            'feature_stds': X_baseline.std().to_dict(),
            'feature_ranges': {
                col: (X_baseline[col].min(), X_baseline[col].max())
                for col in X_baseline.columns
            }
        }
    
    def check_data_drift(self, new_data):
        """Check for significant drift in feature distributions"""
        if self.baseline_stats is None:
            raise ValueError("Baseline statistics not set. Call set_baseline_statistics first.")
        
        drift_report = {
            'feature_drift': {},
            'severe_drift_detected': False
        }
        
        for feature in self.feature_names:
            if feature not in new_data.columns:
                continue
                
            # Calculate drift metrics
            mean_drift = abs(new_data[feature].mean() - self.baseline_stats['feature_means'][feature])
            std_drift = abs(new_data[feature].std() - self.baseline_stats['feature_stds'][feature])
            
            # Normalize drift by baseline std
            normalized_drift = mean_drift / self.baseline_stats['feature_stds'][feature]
            
            drift_report['feature_drift'][feature] = {
                'mean_drift': mean_drift,
                'std_drift': std_drift,
                'normalized_drift': normalized_drift,
                'is_severe': normalized_drift > 0.5  # Threshold for severe drift
            }
            
            if normalized_drift > 0.5:
                drift_report['severe_drift_detected'] = True
        
        return drift_report
    
    def log_prediction_quality(self, y_true, y_pred, confidences):
        """Log prediction quality metrics"""
        from sklearn.metrics import accuracy_score, precision_recall_fscore_support
        
        metrics = {
            'timestamp': pd.Timestamp.now(),
            'accuracy': accuracy_score(y_true, y_pred),
            'mean_confidence': np.mean(confidences),
            'low_confidence_rate': (np.array(confidences) < 0.8).mean()
        }
        
        precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='weighted')
        metrics.update({
            'precision': precision,
            'recall': recall,
            'f1': f1
        })
        
        self.performance_log.append(metrics)
        return metrics
    
    def generate_monitoring_report(self):
        """Generate comprehensive monitoring report"""
        if not self.performance_log:
            return "No performance data available yet."
        
        df_metrics = pd.DataFrame(self.performance_log)
        
        report = {
            'latest_metrics': df_metrics.iloc[-1].to_dict(),
            'trend_analysis': {
                'accuracy_trend': df_metrics['accuracy'].rolling(5).mean().iloc[-1],
                'confidence_trend': df_metrics['mean_confidence'].rolling(5).mean().iloc[-1]
            },
            'alerts': []
        }
        
        # Check for concerning patterns
        if df_metrics['accuracy'].iloc[-1] < df_metrics['accuracy'].mean() - df_metrics['accuracy'].std():
            report['alerts'].append("Recent accuracy drop detected")
            
        if df_metrics['low_confidence_rate'].iloc[-1] > 0.2:
            report['alerts'].append("High rate of low confidence predictions")
        
        return report

In [None]:
# Integration with existing model
print("Loading existing model and data...")

# Load the data
malnutrition_data = pd.read_csv('preprocessed_malnutrition_data.csv')
model = joblib.load('malnutrition_model.joblib')
scaler = joblib.load('feature_scaler.joblib')

# Create enhanced features
print("\nApplying enhanced feature engineering...")
enhanced_data = engineer_enhanced_features(malnutrition_data)

# Initialize confident classifier and model monitor
confident_classifier = ConfidentMalnutritionClassifier(model)
monitor = ModelMonitor('malnutrition_model_v2', enhanced_data.columns)

# Set baseline statistics
print("\nSetting up monitoring baseline...")
monitor.set_baseline_statistics(enhanced_data)

# Prepare features and target
X = enhanced_data.drop(['nutrition_status'], axis=1)
y = malnutrition_data['nutrition_status']

# Validate model with new features
print("\nPerforming comprehensive validation...")
validation_results = validate_model_thoroughly(model, X, y)

# Make predictions with confidence scores
predictions, confidences = confident_classifier.predict_with_confidence(X)

# Log initial performance
print("\nLogging initial performance metrics...")
monitoring_metrics = monitor.log_prediction_quality(y, predictions, confidences)

print("\nInitial Monitoring Report:")
print("-" * 40)
print(f"Accuracy: {monitoring_metrics['accuracy']:.4f}")
print(f"Mean Confidence: {monitoring_metrics['mean_confidence']:.4f}")
print(f"Low Confidence Rate: {monitoring_metrics['low_confidence_rate']:.4f}")

if monitoring_metrics['low_confidence_rate'] > 0.2:
    print("\nWARNING: High rate of low confidence predictions detected!")
    
print("\nValidation Results:")
print("-" * 40)
print(f"Cross-validation Score: {validation_results['cv_scores']['mean']:.4f} (+/- {validation_results['cv_scores']['std']*2:.4f})")

# Check for data drift
drift_report = monitor.check_data_drift(X)
if drift_report['severe_drift_detected']:
    print("\nWARNING: Severe data drift detected in some features!")
    for feature, stats in drift_report['feature_drift'].items():
        if stats['is_severe']:
            print(f"- {feature}: Normalized drift = {stats['normalized_drift']:.4f}")

In [None]:
def retrain_if_needed(model, X, y, monitor, model_path='malnutrition_model.joblib', scaler_path='feature_scaler.joblib', min_accuracy_improvement=0.01, low_confidence_threshold=0.25):
    """
    Retrain model when monitoring signals indicate it's necessary.

    Strategy:
    - Check data drift via the monitor
    - Check recent low-confidence rate from monitor logs
    - Retrain on a fresh split and evaluate on holdout
    - If severe drift OR low-confidence OR new model improves accuracy by min_accuracy_improvement,
      accept the retrained model, save versioned artifacts and update baseline statistics.

    Returns: (model, scaler, retrained_bool, info_dict)
    """
    import os, joblib
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.ensemble import GradientBoostingClassifier

    # Safety checks
    if X.shape[0] < 50:
        return model, joblib.load(scaler_path) if os.path.exists(scaler_path) else None, False, {'reason': 'not_enough_data'}

    drift = monitor.check_data_drift(X)
    last_metrics = monitor.performance_log[-1] if monitor.performance_log else None
    low_confidence = (last_metrics and last_metrics.get('low_confidence_rate', 0) > low_confidence_threshold)
    severe_drift = drift.get('severe_drift_detected', False)

    # Prepare a retrain dataset
    X_train, X_holdout, y_train, y_holdout = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_holdout_scaled = scaler.transform(X_holdout)

    # Train a fresh model with the same hyperparameters used elsewhere
    new_model = GradientBoostingClassifier(
        n_estimators=200,
        learning_rate=0.1,
        max_depth=5,
        min_samples_split=5,
        min_samples_leaf=2,
        subsample=0.8,
        random_state=42
    )
    new_model.fit(X_train_scaled, y_train)

    new_acc = new_model.score(X_holdout_scaled, y_holdout)
    old_acc = last_metrics['accuracy'] if last_metrics and 'accuracy' in last_metrics else 0

    retrain = False
    reasons = []
    if severe_drift:
        retrain = True
        reasons.append('severe_drift')
    if low_confidence:
        retrain = True
        reasons.append('low_confidence')
    if new_acc > old_acc + min_accuracy_improvement:
        retrain = True
        reasons.append('accuracy_improvement')

    info = {
        'timestamp': str(pd.Timestamp.now()),
        'old_accuracy': old_acc,
        'new_accuracy': float(new_acc),
        'retrain': bool(retrain),
        'reasons': reasons,
        'drift_summary': drift
    }

    if retrain:
        # Save versioned artifacts
        ts = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
        versioned_model = f"malnutrition_model_{ts}.joblib"
        versioned_scaler = f"feature_scaler_{ts}.joblib"

        joblib.dump(new_model, versioned_model)
        joblib.dump(scaler, versioned_scaler)

        # Also write canonical names used by the rest of the system
        joblib.dump(new_model, model_path)
        joblib.dump(scaler, scaler_path)

        # Update monitoring baseline to match the new data distribution
        try:
            monitor.set_baseline_statistics(X)
        except Exception:
            pass

        # Append retrain event to log
        log_row = info.copy()
        log_row.update({
            'model_file': versioned_model,
            'scaler_file': versioned_scaler
        })

        import csv
        log_file = 'retrain_log.csv'
        write_header = not os.path.exists(log_file)
        with open(log_file, 'a', newline='', encoding='utf-8') as fh:
            writer = csv.DictWriter(fh, fieldnames=list(log_row.keys()))
            if write_header:
                writer.writeheader()
            writer.writerow(log_row)

        return new_model, scaler, True, info

    # No retrain accepted: return original model and existing scaler (if available)
    existing_scaler = None
    try:
        existing_scaler = joblib.load(scaler_path) if os.path.exists(scaler_path) else None
    except Exception:
        existing_scaler = None

    return model, existing_scaler, False, info


In [None]:
# Call retraining trigger after drift check
print('\nChecking whether automated retraining is needed...')
new_model, new_scaler, retrained, retrain_info = retrain_if_needed(model, X, y, monitor)
if retrained:
    model = new_model
    scaler = new_scaler
    print('\nModel was retrained and artifacts updated:')
    print(retrain_info)
else:
    print('\nNo retraining performed. Info:')
    print(retrain_info)

# Mark todo 1 as completed in the local todo list
# (The remote todo tool is authoritative; we'll update the manager after tests.)
