## Setup and configuration

In [None]:
# Standard library imports
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count

# Hide CUDA devices, set TensorFlow's C++ log level, and cap OpenMP threads.
# Applied here so the values are already in place when TensorFlow is imported below.
os.environ.update({
    'CUDA_VISIBLE_DEVICES': '-1',
    'TF_CPP_MIN_LOG_LEVEL': '2',
    'OMP_NUM_THREADS': '2',
})

# Third party imports
import joblib
import json
import pandas as pd
import tensorflow as tf

# Import ensemble modules
from functions import ensemble_database
from functions.ensemble_initialization import create_data_splits, create_base_preprocessor, train_founder_model
from functions.ensemble_parallel import train_single_candidate, prepare_training_batch
from functions.ensemble_evaluation import evaluate_candidate_ensemble
from functions.ensemble_stage2_training import train_or_expand_stage2_model, save_ensemble_bundle
from functions.ensemble_hill_climbing import (
    adaptive_simulated_annealing_acceptance,
    update_temperature,
    log_iteration
)
from functions.ensemble_stage2_model import save_checkpoint

# Restrict TensorFlow's Python logger to errors only
tf.get_logger().setLevel('ERROR')

# Count the CPUs available to this process and report the runtime environment
n_cpus = cpu_count()
print(
    f"TensorFlow version: {tf.__version__}",
    f"Available CPUs: {n_cpus}",
    "GPU disabled: CUDA drivers not available in dev container",
    sep="\n",
)

### Configuration parameters

In [None]:
# Random seed for reproducibility
RANDOM_STATE = 315

# Parallel candidate training: candidates per batch and worker process count
BATCH_SIZE = 10
N_WORKERS = min(10, n_cpus)

# Hill climbing: iteration cap, plateau cut-off, and annealing temperature schedule
MAX_ITERATIONS = 500
PLATEAU_ITERATIONS = 100
BASE_TEMPERATURE = 0.01
TEMPERATURE_DECAY = 0.995

# Stage 2 DNN: retrain cadence (in accepted models) and fit settings
STAGE2_BATCH_SIZE_MODELS = 10
STAGE2_EPOCHS = 100
STAGE2_BATCH_SIZE = 128
STAGE2_PATIENCE = 10

# Output locations; every run writes into its own timestamped directory
DATA_DIR = Path('../data')
MODELS_BASE_DIR = Path('../models')
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
MODELS_DIR = MODELS_BASE_DIR / f'run_{timestamp}'
ENSEMBLE_DIR = MODELS_DIR / 'ensemble_stage1_models'
CHECKPOINT_PATH = MODELS_DIR / 'ensemble_checkpoint.pkl'

# Make sure every directory exists before anything is written
for required_dir in (DATA_DIR, MODELS_DIR, ENSEMBLE_DIR):
    required_dir.mkdir(parents=True, exist_ok=True)

# Start from a freshly reset and initialized database
ensemble_database.reset_database()
ensemble_database.init_database()

print(
    "\nConfiguration:",
    f"  Random state: {RANDOM_STATE}",
    f"  Parallel: {BATCH_SIZE} candidates, {N_WORKERS} workers",
    f"  Max iterations: {MAX_ITERATIONS}",
    f"  Training run directory: {MODELS_DIR}",
    sep="\n",
)

## Data loading and preparation

In [None]:
# Target column
label = 'diagnosed_diabetes'

# Read the training CSV and drop duplicate rows
train_df_path = 'https://gperdrizet.github.io/FSA_devops/assets/data/unit3/diabetes_prediction_train.csv'
train_df = pd.read_csv(train_df_path).drop_duplicates()

print(f"Training data shape: {train_df.shape}")
print("Class distribution:")
print(train_df[label].value_counts(normalize=True))

# Feature groups handed to the preprocessor
numerical_features = [
    'age',
    'alcohol_consumption_per_week',
    'physical_activity_minutes_per_week',
    'diet_score',
    'sleep_hours_per_day',
    'screen_time_hours_per_day',
    'bmi',
    'waist_to_hip_ratio',
    'systolic_bp',
    'diastolic_bp',
    'heart_rate',
    'cholesterol_total',
    'hdl_cholesterol',
    'ldl_cholesterol',
    'triglycerides',
    'family_history_diabetes',
    'hypertension_history',
    'cardiovascular_history',
]
nominal_features = ['gender', 'ethnicity', 'smoking_status', 'employment_status']

# Ordinal features with their category orderings (one list of levels per feature)
ordinal_features = ['education_level', 'income_level']
education_categories = [['No formal', 'Highschool', 'Graduate', 'Postgraduate']]
income_categories = [['Low', 'Lower-Middle', 'Middle', 'Upper-Middle', 'High']]

In [None]:
# Split the data once into a training pool plus two validation sets (s1, s2)
(
    X_train_pool,
    X_val_s1,
    X_val_s2,
    y_train_pool,
    y_val_s1,
    y_val_s2,
) = create_data_splits(train_df, label, RANDOM_STATE)

In [None]:
# Build the base preprocessor from the feature groups and ordinal category orderings
base_preprocessor = create_base_preprocessor(
    numerical_features,
    ordinal_features,
    nominal_features,
    education_categories,
    income_categories,
)

## Initialize ensemble with founder model

In [None]:
# Fit the founder model; it returns the fitted pipeline and its AUC
fitted_pipeline, founder_auc = train_founder_model(
    X_train_pool,
    X_val_s1,
    X_val_s2,
    y_train_pool,
    y_val_s1,
    y_val_s2,
    base_preprocessor,
    RANDOM_STATE,
    BASE_TEMPERATURE,
    ENSEMBLE_DIR,
)

# Hill climbing starts at iteration 1 with the base temperature
start_iteration, temperature = 1, BASE_TEMPERATURE

# The ensemble begins as the founder alone, with no stage 2 model yet;
# the founder's AUC is the score to beat
stage2_model = None
ensemble_models = [fitted_pipeline]
best_ensemble_score = founder_auc

print(
    f"\nFounder model AUC: {founder_auc:.6f}",
    f"Stage 2 DNN will be trained after {STAGE2_BATCH_SIZE_MODELS} accepted models",
    sep="\n",
)

## Parallel hill climbing loop

Iteratively trains batches of candidate models in parallel, evaluates with hybrid scoring,
and accepts/rejects using simulated annealing.

In [None]:
# Parallel hill climbing: each pass trains one batch of candidates in worker
# processes, then walks the successful candidates in iteration order and
# accepts or rejects each one with simulated annealing. The loop stops at
# MAX_ITERATIONS or after PLATEAU_ITERATIONS consecutive non-improving
# candidates.
print(f"\n{'=' * 80}")
print("STARTING PARALLEL HILL CLIMBING LOOP")
print(f"{'=' * 80}")
print(f"Batch size: {BATCH_SIZE} candidates trained in parallel")
print(f"Workers: {N_WORKERS} parallel processes")

iterations_since_improvement = 0
iteration = start_iteration

while iteration < MAX_ITERATIONS and iterations_since_improvement < PLATEAU_ITERATIONS:
    print(f"\n{'=' * 80}")
    print(f"BATCH Starting at iteration {iteration + 1}")
    print(f"{'=' * 80}")
    print(f"Ensemble size: {len(ensemble_models)} | Best score: {best_ensemble_score:.6f} | "
          f"Temperature: {temperature:.6f} | No improvement: {iterations_since_improvement}/{PLATEAU_ITERATIONS}")

    # Prepare batch of training jobs
    batch_jobs = prepare_training_batch(
        iteration, BATCH_SIZE, MAX_ITERATIONS, X_train_pool, y_train_pool,
        X_val_s1, y_val_s1, base_preprocessor, RANDOM_STATE
    )

    # An empty batch would leave `iteration` unchanged and spin forever
    if not batch_jobs:
        print("\nNo training jobs were prepared; stopping.")
        break

    print(f"\nTraining {len(batch_jobs)} candidates in parallel...")
    batch_start_time = time.time()

    # Train candidates in parallel; a failed job is reported and skipped
    trained_candidates = []
    with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:
        futures = {executor.submit(train_single_candidate, job): job for job in batch_jobs}

        for future in as_completed(futures):
            try:
                result = future.result()
                trained_candidates.append(result)
                print(f"  ✓ Iteration {result['iteration'] + 1}: {result['metadata']['classifier_type']} "
                      f"AUC={result['val_auc_s1']:.6f} ({result['training_time']:.1f}s)")
            except Exception as e:
                # NOTE(review): assumes the job's first element is its iteration number - confirm
                job = futures[future]
                print(f"  ✗ Iteration {job[0] + 1} failed: {e}")

    batch_time = time.time() - batch_start_time
    if trained_candidates:
        print(f"\nBatch complete ({batch_time:.1f}s, {batch_time/len(trained_candidates):.1f}s per model)")
    else:
        # Every job failed: avoid dividing by zero for the per-model time
        print(f"\nBatch complete ({batch_time:.1f}s, no candidates trained successfully)")

    # Futures complete in arbitrary order; restore iteration order
    trained_candidates.sort(key=lambda x: x['iteration'])

    # Process each trained candidate for acceptance/rejection
    for result in trained_candidates:
        current_iter = result['iteration']
        fitted_pipeline = result['fitted_pipeline']
        metadata = result['metadata']
        val_auc_s1 = result['val_auc_s1']
        pipeline_hash = result['pipeline_hash']
        training_memory_mb = result.get('memory_mb', None)
        # The progress line above reads 'training_time'; fall back to it so the
        # logged training time is not None when only that key is present
        training_time_sec = result.get('training_time_sec', result.get('training_time'))

        print(f"\n{'-' * 80}")
        print(f"Iteration {current_iter + 1}: {metadata['classifier_type']} | Stage 1 AUC: {val_auc_s1:.6f}")

        # Evaluate ensemble with candidate
        candidate_ensemble = ensemble_models + [fitted_pipeline]
        candidate_score, diversity_score, aggregation_method = evaluate_candidate_ensemble(
            candidate_ensemble, ensemble_models, stage2_model,
            X_val_s1, X_val_s2, y_val_s1, y_val_s2
        )

        print(f"  Ensemble AUC ({aggregation_method}): {candidate_score:.6f} | Diversity: {diversity_score:.6f}")

        # Simulated annealing acceptance (seeded per iteration)
        accept, reason = adaptive_simulated_annealing_acceptance(
            current_score=best_ensemble_score,
            candidate_score=candidate_score,
            temperature=temperature,
            random_state=RANDOM_STATE + current_iter
        )

        print(f"  Decision: {'✓ ACCEPT' if accept else '✗ REJECT'} ({reason})")

        # Stage 2 memory and time for this iteration; stay None unless the
        # stage 2 model is (re)trained below
        stage2_memory_mb = None
        stage2_time_sec = None

        # Update ensemble if accepted
        if accept:
            ensemble_models.append(fitted_pipeline)

            # Save model
            model_path = ENSEMBLE_DIR / f'model_{current_iter}.joblib'
            joblib.dump(fitted_pipeline, model_path)

            # Train/retrain the stage 2 DNN every STAGE2_BATCH_SIZE_MODELS models
            if len(ensemble_models) % STAGE2_BATCH_SIZE_MODELS == 0:
                stage2_model, final_score, stage2_memory_mb, stage2_time_sec = train_or_expand_stage2_model(
                    ensemble_models, stage2_model, X_val_s1, y_val_s1, X_val_s2, y_val_s2,
                    STAGE2_EPOCHS, STAGE2_BATCH_SIZE, STAGE2_PATIENCE, current_iter
                )

                # Save ensemble bundle checkpoint
                save_ensemble_bundle(
                    ensemble_models, stage2_model, best_ensemble_score, current_iter,
                    MODELS_DIR, RANDOM_STATE, BATCH_SIZE, N_WORKERS, base_preprocessor,
                    numerical_features, ordinal_features, nominal_features,
                    education_categories, income_categories
                )
                print(f"{'=' * 80}\n")

            # Check if this is the best score
            if candidate_score > best_ensemble_score:
                print(f"  🎉 New best score: {candidate_score:.6f} (Δ={candidate_score - best_ensemble_score:.6f})")
                best_ensemble_score = candidate_score
                iterations_since_improvement = 0
            else:
                iterations_since_improvement += 1
        else:
            iterations_since_improvement += 1

        # Log the iteration only after any stage 2 training, so the stage 2
        # memory/time measured above are recorded instead of always being None.
        # ensemble_models already includes the candidate when it was accepted.
        log_iteration(
            iteration=current_iter,
            accepted=accept,
            rejection_reason=reason,
            pipeline_hash=pipeline_hash,
            stage1_val_auc=val_auc_s1,
            stage2_val_auc=candidate_score,
            ensemble_size=len(ensemble_models),
            diversity_score=diversity_score,
            temperature=temperature,
            metadata=metadata,
            ensemble_id=f"iter_{current_iter}",
            training_memory_mb=training_memory_mb,
            stage2_memory_mb=stage2_memory_mb,
            training_time_sec=training_time_sec,
            stage2_time_sec=stage2_time_sec
        )

        # Update temperature
        temperature = update_temperature(
            iteration=current_iter,
            acceptance_history=[accept],
            current_temperature=temperature,
            base_temperature=BASE_TEMPERATURE,
            decay_rate=TEMPERATURE_DECAY
        )

    # Advance past every scheduled job, including failed ones. Counting only
    # successes would make the next batch reuse iteration numbers (and never
    # advance at all when a whole batch fails).
    iteration += len(batch_jobs)

    # Check termination
    if iterations_since_improvement >= PLATEAU_ITERATIONS:
        print(f"\n{'=' * 80}")
        print(f"TERMINATING: No improvement for {PLATEAU_ITERATIONS} iterations")
        print(f"{'=' * 80}")
        break

# Calculate the final acceptance rate from the ensemble_log table
conn = ensemble_database.sqlite3.connect(ensemble_database.DB_PATH)
try:
    acceptance_stats = conn.execute(
        "SELECT COUNT(*) as total, SUM(accepted) as accepted FROM ensemble_log"
    ).fetchone()
finally:
    # Close the connection even if the query raises
    conn.close()

total_logged = acceptance_stats[0]
# SUM() is NULL (None) when there are no non-NULL values to add up
accepted_logged = acceptance_stats[1] or 0
acceptance_rate = accepted_logged / total_logged if total_logged > 0 else 0.0

print(f"\n{'=' * 80}")
print("HILL CLIMBING COMPLETE")
print(f"{'=' * 80}")
print(f"Final ensemble size: {len(ensemble_models)}")
print(f"Best ensemble AUC: {best_ensemble_score:.6f}")
print(f"Total iterations: {iteration}")
print(f"Acceptance rate: {acceptance_rate:.1%}")

## Save final checkpoint and bundle

In [None]:
# Summary stored inside the checkpoint
checkpoint_metadata = {
    'total_iterations': iteration,
    'final_ensemble_size': len(ensemble_models),
    'acceptance_rate': acceptance_rate,
    'best_score': best_ensemble_score,
    'parallel_batch_size': BATCH_SIZE,
    'n_workers': N_WORKERS,
}

# Write the final checkpoint
save_checkpoint(
    checkpoint_path=CHECKPOINT_PATH,
    ensemble_models=ensemble_models,
    stage2_model=stage2_model,
    iteration=iteration - 1,
    temperature=temperature,
    best_score=best_ensemble_score,
    acceptance_history=[],
    metadata=checkpoint_metadata,
)

# Write a JSON summary of the run next to the checkpoint
metadata_path = MODELS_DIR / 'ensemble_metadata.json'
run_metadata = {
    'ensemble_size': len(ensemble_models),
    'total_iterations': iteration,
    'best_score': best_ensemble_score,
    'acceptance_rate': acceptance_rate,
    'training_completed': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'random_state': RANDOM_STATE,
    'parallel_batch_size': BATCH_SIZE,
    'n_workers': N_WORKERS,
}
with open(metadata_path, 'w') as f:
    f.write(json.dumps(run_metadata, indent=2))

print(
    f"\nCheckpoint saved: {CHECKPOINT_PATH}",
    f"Metadata saved: {metadata_path}",
    sep="\n",
)

In [None]:
# Pack the models, stage 2 model, preprocessor, and feature lists into one file for Kaggle
ensemble_bundle_path = MODELS_DIR / 'ensemble_bundle.joblib'

bundle_metadata = {
    'ensemble_size': len(ensemble_models),
    'total_iterations': iteration,
    'best_score': best_ensemble_score,
    'acceptance_rate': acceptance_rate,
    'training_completed': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'random_state': RANDOM_STATE,
    'parallel_batch_size': BATCH_SIZE,
    'n_workers': N_WORKERS,
}
bundle_feature_info = {
    'numerical_features': numerical_features,
    'ordinal_features': ordinal_features,
    'nominal_features': nominal_features,
    'education_categories': education_categories,
    'income_categories': income_categories,
}
ensemble_bundle = {
    'ensemble_models': ensemble_models,
    'stage2_model': stage2_model,
    'metadata': bundle_metadata,
    'base_preprocessor': base_preprocessor,
    'feature_info': bundle_feature_info,
}

joblib.dump(ensemble_bundle, ensemble_bundle_path, compress=3)

# Report where the bundle went, its size on disk, and how to load it
bundle_size_mb = ensemble_bundle_path.stat().st_size / (1024**2)
print(
    f"\nFinal ensemble bundle saved: {ensemble_bundle_path}",
    f"File size: {bundle_size_mb:.1f} MB",
    "\nTo load on Kaggle:",
    "  ensemble_bundle = joblib.load('ensemble_bundle.joblib')",
    "  ensemble_models = ensemble_bundle['ensemble_models']",
    "  stage2_model = ensemble_bundle['stage2_model']",
    sep="\n",
)

## Summary

In [None]:
# Print the closing report: final statistics followed by every artifact location
summary_rule = '=' * 80
print(
    f"\n{summary_rule}",
    "ENSEMBLE TRAINING SUMMARY",
    f"{summary_rule}",
    "\nFinal Statistics:",
    f"  Ensemble size: {len(ensemble_models)}",
    f"  Best validation AUC: {best_ensemble_score:.6f}",
    f"  Total iterations: {iteration}",
    f"  Acceptance rate: {acceptance_rate:.1%}",
    f"  Parallel configuration: {BATCH_SIZE} candidates, {N_WORKERS} workers",
    "\nFiles created:",
    f"  Database: {ensemble_database.DB_PATH}",
    f"  Models: {ENSEMBLE_DIR}",
    f"  Checkpoint: {CHECKPOINT_PATH}",
    f"  Metadata: {metadata_path}",
    f"  Bundle: {ensemble_bundle_path}",
    f"\n{summary_rule}",
    sep="\n",
)