In [None]:
import inspect
import logging
from pathlib import Path
from time import time
from typing import Dict, List, Optional, Tuple, Union

import joblib
import numpy as np
import pandas as pd
import yaml
from sklearn.base import BaseEstimator
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.metrics import make_scorer, mean_squared_error
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeRegressor
from xgboost import XGBRegressor

In [None]:
# Logging setup: INFO-level records formatted with timestamp, logger name and
# level, plus the module-level logger used by the rest of the notebook.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:

class ModelTrainer:
    """
    End-to-end model training pipeline for car price prediction.

    Workflow (see ``run_training_pipeline``):
    1. Read the CSV named by ``config['input_data']`` and split off the
       column named by ``config['target_column']``.
    2. Instantiate every supported model listed under ``config['models']``.
    3. Fit each model on the full data and score it with k-fold
       cross-validated RMSE.
    4. Persist the fitted models (joblib) and a results table (CSV) under
       ``config['output_dir']``.
    """

    # Built-in scikit-learn scorer that yields *negated* RMSE per fold.
    # Used instead of make_scorer(mean_squared_error, squared=False): the
    # `squared` argument is not accepted by newer scikit-learn releases.
    _RMSE_SCORING = 'neg_root_mean_squared_error'

    def __init__(self, config_path: str = 'config/training_config.yaml'):
        """
        Initialize trainer with configuration

        Args:
            config_path: Path to YAML configuration file

        Raises:
            ValueError: If the loaded configuration fails validation.
            Exception: Any error raised while reading/parsing the file is
                logged and re-raised unchanged.
        """
        self.config = self._load_config(config_path)
        # Model name -> estimator instance, filled by initialize_models().
        self.models: Dict[str, BaseEstimator] = {}
        # Model name -> metrics dict (or {'error': message} on failure).
        self.results: Dict[str, Dict] = {}
        self._validate_config()

    @staticmethod
    def _merge_with_defaults(defaults: Dict, overrides: Dict) -> Dict:
        """
        Recursively overlay ``overrides`` on ``defaults``.

        Nested dictionaries are merged key by key, so a partial section
        (e.g. only ``cross_validation.scoring``) keeps the remaining default
        entries. A ``None`` override for a section whose default is a dict
        (an empty YAML section) keeps the defaults for that section.

        Args:
            defaults: Baseline values.
            overrides: User-supplied values that take precedence.

        Returns:
            A new dictionary; neither input is modified.
        """
        merged = dict(defaults)
        for key, value in overrides.items():
            default = defaults.get(key)
            if isinstance(default, dict):
                if value is None:
                    continue
                if isinstance(value, dict):
                    merged[key] = ModelTrainer._merge_with_defaults(default, value)
                    continue
            merged[key] = value
        return merged

    @staticmethod
    def _filter_supported_params(factory, params: Dict) -> Dict:
        """
        Keep only the entries of ``params`` that ``factory`` can accept.

        Args:
            factory: Callable (typically an estimator class) to be invoked
                with keyword arguments.
            params: Candidate keyword arguments.

        Returns:
            A new dict with the accepted subset. If the callable takes
            ``**kwargs`` every entry is kept, because acceptance cannot be
            decided from the signature alone.
        """
        accepted = inspect.signature(factory).parameters
        takes_any_keyword = any(
            parameter.kind is inspect.Parameter.VAR_KEYWORD
            for parameter in accepted.values()
        )
        if takes_any_keyword:
            return dict(params)
        return {name: value for name, value in params.items() if name in accepted}

    @staticmethod
    def _summarize_cv_scores(cv_scores) -> Dict:
        """
        Convert negated-RMSE fold scores into positive RMSE statistics.

        Args:
            cv_scores: Array-like of per-fold scores as returned by
                ``cross_val_score`` with a negated-error scorer.

        Returns:
            Dict with ``cv_mean_rmse``, ``cv_std_rmse`` and ``cv_scores``
            (a plain list of positive per-fold RMSE values).
        """
        # Negate the array *before* converting to a list; unary minus is not
        # defined for Python lists.
        rmse_per_fold = -np.asarray(cv_scores, dtype=float)
        return {
            'cv_mean_rmse': float(np.mean(rmse_per_fold)),
            'cv_std_rmse': float(np.std(rmse_per_fold)),
            'cv_scores': rmse_per_fold.tolist(),
        }

    def _load_config(self, config_path: str) -> Dict:
        """
        Load training configuration from YAML file.

        Args:
            config_path: Path to the YAML file.

        Returns:
            The parsed configuration overlaid on the built-in defaults
            (``cross_validation``, ``random_state``, ``n_jobs``).

        Raises:
            TypeError: If the YAML document is not a mapping.
            Exception: Any I/O or parse error, logged and re-raised.
        """
        try:
            with open(config_path) as f:
                config = yaml.safe_load(f)

            # An empty YAML document parses to None; treat it as "no overrides"
            # so validation can report the missing keys instead of crashing here.
            if config is None:
                config = {}
            if not isinstance(config, dict):
                raise TypeError(
                    f"Config root must be a mapping, got {type(config).__name__}"
                )

            # Set default values if not specified
            defaults = {
                'cross_validation': {
                    'cv_folds': 5,
                    'scoring': 'neg_root_mean_squared_error'
                },
                'random_state': 42,
                'n_jobs': -1
            }

            return self._merge_with_defaults(defaults, config)

        except Exception as e:
            logger.error(f"Failed to load config: {e}")
            raise

    def _validate_config(self) -> None:
        """
        Validate training configuration.

        Raises:
            ValueError: If a required top-level key is absent or ``models``
                is not a dictionary.
        """
        required_keys = ['input_data', 'models', 'output_dir']
        missing_keys = [key for key in required_keys if key not in self.config]
        if missing_keys:
            # Name only the absent keys so the message is actionable.
            raise ValueError(f"Config missing required keys: {missing_keys}")

        if not isinstance(self.config['models'], dict):
            raise ValueError("Models config must be a dictionary")

    def load_data(self) -> Tuple[pd.DataFrame, pd.Series]:
        """
        Load and validate training data.

        Returns:
            ``(X, y)`` where ``y`` is the ``config['target_column']`` column
            and ``X`` is every other column of the CSV.

        Raises:
            KeyError: If ``target_column`` is missing from the config or data.
            ValueError: If any of ``config['required_columns']`` is absent.
            Exception: Any read error, logged and re-raised.
        """
        try:
            logger.info(f"Loading data from {self.config['input_data']}")
            # Resolve the target name before reading so a missing config key
            # fails before the CSV is parsed.
            target_column = self.config['target_column']
            data = pd.read_csv(self.config['input_data'])

            # Validate required columns
            required_cols = self.config.get('required_columns') or []
            missing = set(required_cols) - set(data.columns)
            if missing:
                raise ValueError(f"Missing required columns: {missing}")

            X = data.drop(columns=[target_column])
            y = data[target_column]

            return X, y

        except Exception as e:
            logger.error(f"Data loading failed: {e}")
            raise

    def initialize_models(self) -> None:
        """
        Create model instances from configuration.

        Unknown model names are skipped with a warning. A model whose
        construction fails is logged and skipped; other models still load.
        """
        model_factories = {
            'random_forest': RandomForestRegressor,
            'xgboost': XGBRegressor,
            'ridge': Ridge,
            'decision_tree': DecisionTreeRegressor
        }

        for model_name, params in self.config['models'].items():
            if model_name not in model_factories:
                logger.warning(f"Unknown model type: {model_name}. Skipping.")
                continue

            factory = model_factories[model_name]
            try:
                # Global settings are only forwarded when the estimator takes
                # them: Ridge and DecisionTreeRegressor have no `n_jobs`, and
                # passing it unconditionally made their construction fail.
                shared_params = self._filter_supported_params(
                    factory,
                    {
                        'random_state': self.config['random_state'],
                        'n_jobs': self.config['n_jobs'],
                    },
                )
                # Model-specific params win and are passed through unfiltered,
                # so a typo in the config still surfaces as an error. A YAML
                # entry with no value parses to None and means "no overrides".
                model_params = {**shared_params, **(params or {})}

                self.models[model_name] = factory(**model_params)
                logger.info(f"Initialized {model_name} with params: {model_params}")

            except Exception as e:
                logger.error(f"Failed to initialize {model_name}: {e}")

    def train_models(self, X: pd.DataFrame, y: pd.Series) -> None:
        """
        Train all configured models with cross-validation.

        Each model is fitted on the full data and then scored with k-fold
        cross-validation. Metrics are stored in ``self.results``; a model
        that fails gets ``{'error': message}`` instead and does not stop
        the others.

        Args:
            X: Feature frame.
            y: Target series.

        Raises:
            ValueError: If no models have been initialized.
        """
        if not self.models:
            raise ValueError("No models initialized. Call initialize_models() first.")

        cv_config = self.config['cross_validation']
        configured_scoring = cv_config.get('scoring', self._RMSE_SCORING)
        if configured_scoring != self._RMSE_SCORING:
            # The stored metric names are RMSE-specific, so another scorer
            # would be mislabeled; keep RMSE and tell the user.
            logger.warning(
                f"Configured scoring '{configured_scoring}' is ignored; "
                f"results are always reported as RMSE."
            )

        for model_name, model in self.models.items():
            logger.info(f"Training {model_name}...")
            start_time = time()

            try:
                # Train model
                model.fit(X, y)

                # Cross-validation
                cv_scores = cross_val_score(
                    model, X, y,
                    cv=cv_config['cv_folds'],
                    scoring=self._RMSE_SCORING,
                    n_jobs=self.config['n_jobs']
                )

                # Store results (train_time covers the fit and the CV run)
                self.results[model_name] = {
                    'train_time': time() - start_time,
                    **self._summarize_cv_scores(cv_scores),
                    'model_params': model.get_params()
                }

                logger.info(
                    f"{model_name} trained in {self.results[model_name]['train_time']:.2f}s. "
                    f"CV RMSE: {self.results[model_name]['cv_mean_rmse']:.2f} ± "
                    f"{self.results[model_name]['cv_std_rmse']:.2f}"
                )

            except Exception as e:
                logger.error(f"Failed to train {model_name}: {e}")
                self.results[model_name] = {'error': str(e)}

    def save_models_and_results(self) -> None:
        """
        Persist trained models and results to disk.

        Writes ``<model_name>_model.joblib`` for every model without a
        recorded error, and ``training_results.csv`` for all results, into
        ``config['output_dir']`` (created if needed). Individual save
        failures are logged and do not abort the remaining writes.
        """
        output_dir = Path(self.config['output_dir'])
        output_dir.mkdir(parents=True, exist_ok=True)

        # Save models
        for model_name, model in self.models.items():
            if 'error' not in self.results.get(model_name, {}):
                try:
                    joblib.dump(
                        model,
                        output_dir / f"{model_name}_model.joblib",
                        compress=3
                    )
                    logger.info(f"Saved {model_name} model to {output_dir}")
                except Exception as e:
                    logger.error(f"Failed to save {model_name}: {e}")

        # Save training results
        try:
            results_df = pd.DataFrame.from_dict(self.results, orient='index')
            results_df.to_csv(output_dir / 'training_results.csv')
            logger.info(f"Saved training results to {output_dir}")
        except Exception as e:
            logger.error(f"Failed to save results: {e}")

    def run_training_pipeline(self) -> None:
        """
        Execute complete training workflow.

        Runs load -> initialize -> train -> save, then logs a one-line
        summary per successfully trained model.

        Raises:
            Exception: Any error from the stages is logged and re-raised.
        """
        try:
            X, y = self.load_data()
            self.initialize_models()
            self.train_models(X, y)
            self.save_models_and_results()

            # Print summary
            logger.info("\n=== Training Summary ===")
            for model_name, result in self.results.items():
                if 'error' not in result:
                    logger.info(
                        f"{model_name}: "
                        f"RMSE {result['cv_mean_rmse']:.2f} ± {result['cv_std_rmse']:.2f} "
                        f"(trained in {result['train_time']:.2f}s)"
                    )

        except Exception as e:
            logger.error(f"Training pipeline failed: {e}")
            raise

if __name__ == "__main__":
    # Entry point: build the trainer from the default config file and run
    # the whole pipeline; any failure is logged and mapped to exit status 1.
    try:
        trainer = ModelTrainer(config_path="config/training_config.yaml")
        trainer.run_training_pipeline()

    except Exception as e:
        logger.error(f"Training failed: {e}")
        # Raise SystemExit directly (this is what sys.exit(1) does) so the
        # failure path does not depend on a `sys` import.
        raise SystemExit(1)

Hyperparameter Tuning

In [None]:
from sklearn.model_selection import RandomizedSearchCV

# Seed shared by the estimator and the search so a re-run samples the same
# candidates and grows the same trees.
RANDOM_STATE = 42

# Load features/target explicitly so this cell works on a fresh kernel
# instead of relying on X and y left over from an earlier session.
X, y = trainer.load_data()

# Candidate hyperparameter values sampled by the randomized search.
param_dist = {
    'n_estimators': [100, 200, 300],
    'max_depth': [5, 10, 15],
    'min_samples_split': [2, 5, 10]
}

# 10 sampled configurations, each scored by 5-fold CV on negated RMSE.
search = RandomizedSearchCV(
    RandomForestRegressor(random_state=RANDOM_STATE),
    param_distributions=param_dist,
    n_iter=10,
    cv=5,
    scoring='neg_root_mean_squared_error',
    random_state=RANDOM_STATE
)
search.fit(X, y)

Model Interpretability

In [None]:
import shap

# `model` is not assigned anywhere at notebook level, so bind it explicitly.
# NOTE(review): assumes the tuned random forest from the search is the model
# meant to be explained — confirm.
model = search.best_estimator_

# Per-feature SHAP contributions for every row of X, shown as a summary plot.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X)
shap.summary_plot(shap_values, X)