data pre-processing

In [2]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, PowerTransformer, RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import ADASYN, SVMSMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.feature_selection import SelectFromModel, mutual_info_classif  # Corrected import
from sklearn.ensemble import RandomForestClassifier


class AdvancedPreprocessor:
    """
    Comprehensive preprocessing including feature engineering, selection, and data balancing.

    The fitted ``pipeline`` contains only row-preserving transformers
    (imputation, outlier clipping, power transform, feature engineering and,
    optionally, feature selection) so that ``transform`` can be applied to
    unseen data. The over-sampler is kept separately in ``sampler`` and is only
    applied by ``fit_resample``: a sampler has no ``transform`` method, so
    placing it as the last pipeline step makes ``pipeline.transform``
    unavailable, and resampling must never be applied to validation/test rows.

    Parameters
    ----------
    balance_strategy : str or None
        'SVMSMOTE' selects ``SVMSMOTE``; any other truthy value selects
        ``ADASYN``; a falsy value disables balancing.
    feature_selection : bool
        Whether to append the hybrid feature selector to the pipeline.
    """
    def __init__(self, balance_strategy='SVMSMOTE', feature_selection=True):
        self.balance_strategy = balance_strategy
        self.feature_selection = feature_selection
        self.pipeline = None
        self.feature_selector = None
        self.sampler = None

    @staticmethod
    def _as_frame(X, columns=None):
        """
        Return a DataFrame copy of X.

        DataFrames are copied as-is (``columns`` is ignored); anything else is
        wrapped in a new DataFrame labelled with ``columns`` (integer labels
        when ``columns`` is None). scikit-learn transformers return plain
        arrays, so this is how column names are restored between steps.
        """
        if isinstance(X, pd.DataFrame):
            return X.copy()
        return pd.DataFrame(np.asarray(X), columns=columns)

    def create_feature_engineer(self, columns=None):
        """
        Build the transformer that adds the engineered features.

        Parameters
        ----------
        columns : list of str, optional
            Column names used to label array input. Required when the step
            receives a NumPy array (e.g. after a scikit-learn scaler).
        """
        class FeatureEngineer:
            def __init__(self, columns=None):
                self.columns = columns

            def fit(self, X, y=None):
                # Stateless: nothing is learned from the data.
                return self

            def transform(self, X):
                df = AdvancedPreprocessor._as_frame(X, self.columns)
                # Basic engineering
                df['temperature_differential'] = df['Process temperature'] - df['Air temperature']
                df['power_speed_ratio'] = df['Power'] / (df['Rotational speed'] + 1)

                # Advanced engineering
                df['thermal_stress'] = df['temperature_differential'] * df['Power']
                df['operational_stress'] = df['Torque'] * df['Tool wear']
                df['efficiency_index'] = df['Power'] / (df['Torque'] * df['Rotational speed'] + 1)
                df['wear_rate'] = df['Tool wear'] / (df['operational_stress'] + 1)
                # A plain rolling(3).std() leaves NaN in the first two rows,
                # which breaks the samplers and the output validation. Use
                # whatever history exists and define the first row as 0.
                df['temperature_stability'] = (
                    df['temperature_differential']
                    .rolling(window=3, min_periods=2)
                    .std()
                    .fillna(0.0)
                )

                # Interaction features
                df['power_wear_interaction'] = df['Power'] * df['Tool wear']
                df['speed_torque_efficiency'] = df['Rotational speed'] / (df['Torque'] + 1)

                return df

        return FeatureEngineer(columns=columns)

    def create_outlier_handler(self, columns=None):
        """
        Build the transformer that clips each column to IQR-based bounds.

        Columns whose name contains "temperature" use a 2.0 * IQR fence, all
        other columns use 1.5 * IQR.

        Parameters
        ----------
        columns : list of str, optional
            Column names used to label array input, so the name-based
            threshold still applies after a step that returns an array.
        """
        class OutlierHandler:
            def __init__(self, columns=None):
                self.columns = columns
                self.thresholds = {}

            def fit(self, X, y=None):
                frame = AdvancedPreprocessor._as_frame(X, self.columns)
                self.thresholds = {}
                for col in frame.columns:
                    # The original test `col in ['temperature_cols']` could
                    # never match a real column; match on the name instead.
                    if 'temperature' in str(col).lower():
                        threshold = 2.0
                    else:
                        threshold = 1.5

                    q1 = frame[col].quantile(0.25)
                    q3 = frame[col].quantile(0.75)
                    iqr = q3 - q1

                    self.thresholds[col] = {
                        'lower': q1 - threshold * iqr,
                        'upper': q3 + threshold * iqr
                    }
                return self

            def transform(self, X):
                # _as_frame copies, so the caller's data is never modified.
                X_clean = AdvancedPreprocessor._as_frame(X, self.columns)
                for col, bounds in self.thresholds.items():
                    if col in X_clean.columns:
                        X_clean[col] = X_clean[col].clip(
                            lower=bounds['lower'],
                            upper=bounds['upper']
                        )
                return X_clean

        return OutlierHandler(columns=columns)

    def create_feature_selector(self):
        """
        Build the selector that keeps the top features ranked by the mean of
        normalised mutual information and random-forest importance.
        """
        class HybridFeatureSelector:
            def __init__(self, n_features=None):
                self.n_features = n_features
                self.n_features_ = None
                self.feature_names_ = None
                self.selected_features_ = None
                self.importance_scores_ = None

            @staticmethod
            def _normalise(scores):
                # Scale scores to sum to 1; fall back to uniform weights when
                # every score is zero to avoid a 0/0 division.
                scores = np.asarray(scores, dtype=float)
                total = scores.sum()
                if total > 0:
                    return scores / total
                return np.full(scores.shape, 1.0 / scores.size)

            def fit(self, X, y):
                frame = AdvancedPreprocessor._as_frame(X)

                # Multiple importance methods (both seeded for reproducibility)
                mi_scores = mutual_info_classif(frame, y, random_state=42)

                rf = RandomForestClassifier(n_estimators=100, random_state=42)
                rf.fit(frame, y)

                # Normalize and combine scores
                self.importance_scores_ = (
                    self._normalise(mi_scores)
                    + self._normalise(rf.feature_importances_)
                ) / 2
                self.feature_names_ = frame.columns

                # Select features: default to half of them, but always keep
                # at least one and never more than are available.
                n_total = len(frame.columns)
                n_keep = self.n_features if self.n_features is not None else n_total // 2
                self.n_features_ = min(max(int(n_keep), 1), n_total)

                feature_indices = np.argsort(self.importance_scores_)[-self.n_features_:]
                self.selected_features_ = frame.columns[feature_indices]

                return self

            def transform(self, X):
                return AdvancedPreprocessor._as_frame(X)[self.selected_features_]

            def get_feature_importance(self):
                """Return the combined scores of the selected features, highest first."""
                all_scores = pd.Series(self.importance_scores_, index=self.feature_names_)
                return all_scores.loc[self.selected_features_].sort_values(ascending=False)

        return HybridFeatureSelector()

    def _create_sampler(self):
        """Instantiate the over-sampler for ``balance_strategy`` (None when disabled)."""
        if not self.balance_strategy:
            return None
        if self.balance_strategy == 'SVMSMOTE':
            return SVMSMOTE(random_state=42)
        return ADASYN(random_state=42)

    def fit(self, X, y):
        """
        Fit the transformation pipeline on training data.

        Parameters
        ----------
        X : pandas.DataFrame
            Training features; must contain the required sensor columns.
        y : array-like
            Training target.

        Returns
        -------
        self
        """
        # Validate input
        self._validate_input(X)
        columns = list(X.columns)

        # Create preprocessing steps. SimpleImputer returns an array, so the
        # outlier handler is told the column names to keep its name-based
        # thresholds working.
        numeric_transformer = Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('outlier_handler', self.create_outlier_handler(columns=columns)),
            ('scaler', PowerTransformer(standardize=True))
        ])

        # Create main pipeline. The scaler also returns an array, so the
        # feature engineer re-labels it with the original column names.
        # NOTE(review): the ratio features are computed on standardised values,
        # so denominators such as (x + 1) can be close to zero - confirm that
        # engineering after scaling is intended.
        pipeline_steps = [
            ('numeric', numeric_transformer),
            ('feature_engineer', self.create_feature_engineer(columns=columns))
        ]

        self.feature_selector = None
        if self.feature_selection:
            self.feature_selector = self.create_feature_selector()
            pipeline_steps.append(('feature_selector', self.feature_selector))

        # The sampler is deliberately NOT a pipeline step (see class docstring).
        self.sampler = self._create_sampler()

        self.pipeline = ImbPipeline(pipeline_steps)
        self.pipeline.fit(X, y)

        return self

    def fit_transform(self, X, y):
        """
        Fit on (X, y) and return the transformed X.

        No resampling is applied, so the returned rows stay aligned with ``y``.
        """
        return self.fit(X, y).transform(X)

    def fit_resample(self, X, y):
        """
        Fit on (X, y), transform X and apply the configured over-sampler.

        Returns
        -------
        (X_resampled, y_resampled)
            The balanced training set. When balancing is disabled the
            transformed X and the original y are returned.
        """
        X_transformed = self.fit_transform(X, y)
        if self.sampler is None:
            return X_transformed, y
        return self.sampler.fit_resample(X_transformed, y)

    def transform(self, X):
        """Apply the fitted pipeline to X and validate the result."""
        # Transform data
        X_transformed = self.pipeline.transform(X)

        # Validate output
        self._validate_output(X_transformed)

        return X_transformed

    def _validate_input(self, X):
        """Check that the required sensor columns are present and contain no nulls."""
        required_columns = ['Air temperature', 'Process temperature', 'Rotational speed',
                          'Torque', 'Tool wear', 'Power']
        assert all(col in X.columns for col in required_columns), "Missing required columns"
        assert not X.isnull().any().any(), "Input contains null values"

    def _validate_output(self, X):
        """Check that the transformed data contains no NaN or infinite values."""
        assert not np.any(np.isnan(X)), "Output contains NaN values"
        assert not np.any(np.isinf(X)), "Output contains infinite values"

    def get_feature_importance(self):
        """Return the selector's feature importances, or None without feature selection."""
        if self.feature_selector is not None:
            return self.feature_selector.get_feature_importance()
        return None

# Usage example:
def prepare_data(df, test_size=0.3, random_state=42):
    """
    Complete data preparation pipeline.

    Splits ``df`` into stratified train/test sets, fits an
    ``AdvancedPreprocessor`` on the training split only and transforms both
    splits with it.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the 'Machine failure' target column. The cluster label
        columns ('kmeans_cluster', 'hierarchical_cluster', 'dbscan_cluster')
        are removed from the features when present.
    test_size : float
        Fraction of rows held out for testing.
    random_state : int
        Seed for the train/test split.

    Returns
    -------
    tuple
        (X_train_processed, X_test_processed, y_train, y_test, preprocessor,
        feature_importance)
    """
    from sklearn.model_selection import train_test_split

    target_column = 'Machine failure'
    cluster_columns = ['kmeans_cluster', 'hierarchical_cluster', 'dbscan_cluster']

    # Split features and target. The cluster columns only exist when the
    # clustering step has been run, so their absence is not an error.
    X = df.drop(columns=[target_column]).drop(columns=cluster_columns, errors='ignore')
    y = df[target_column]

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )

    # Create and fit preprocessor
    preprocessor = AdvancedPreprocessor(
        balance_strategy='SVMSMOTE',
        feature_selection=True
    )

    # Process data: fit on the training split only, then apply the same fitted
    # transformation to both splits so each stays row-aligned with its target.
    preprocessor.fit(X_train, y_train)
    X_train_processed = preprocessor.transform(X_train)
    X_test_processed = preprocessor.transform(X_test)

    # Get feature importance
    feature_importance = preprocessor.get_feature_importance()

    return X_train_processed, X_test_processed, y_train, y_test, preprocessor, feature_importance

In [3]:
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.cluster.hierarchy import dendrogram, linkage

class AdvancedClusterAnalyzer:
    """
    Comprehensive clustering analysis with multiple algorithms and advanced validation.

    After ``fit`` the following attributes are populated, keyed by method name
    ('kmeans', 'dbscan', 'hierarchical'):

    - ``results``: per-method evaluation metrics for every candidate setting
    - ``best_models``: the selected fitted model
    - ``cluster_labels``: labels assigned by the selected model
    - ``feature_importance``: random-forest importances for predicting the labels
    """
    def __init__(self, random_state=42):
        self.random_state = random_state
        self.results = {}
        self.best_models = {}
        self.cluster_labels = {}
        self.feature_importance = {}

    def fit(self, X, y=None):
        """
        Fit multiple clustering algorithms and store results.

        Parameters
        ----------
        X : pandas.DataFrame or array-like
            Numeric feature matrix. Array input is wrapped in a DataFrame with
            integer column labels so feature importances can be indexed.
        y : ignored

        Returns
        -------
        self
        """
        self.X_original = X if isinstance(X, pd.DataFrame) else pd.DataFrame(np.asarray(X))
        self.X = StandardScaler().fit_transform(self.X_original)

        # Perform clustering with different algorithms
        self._perform_kmeans()
        self._perform_dbscan()
        self._perform_hierarchical()
        self._analyze_feature_importance()

        return self

    def _find_optimal_clusters(self, results, min_clusters=2):
        """
        Return the cluster count with the highest silhouette score.

        ``results['silhouette_scores'][i]`` is taken to be the score for
        ``min_clusters + i`` clusters, matching how the candidate loops fill it.
        This is the same criterion used for hierarchical clustering.
        """
        scores = results['silhouette_scores']
        if len(scores) == 0:
            raise ValueError("No silhouette scores available to choose a cluster count")
        return int(np.argmax(scores)) + min_clusters

    def _perform_kmeans(self, max_clusters=10):
        """
        Perform KMeans clustering with comprehensive evaluation
        """
        kmeans_results = {
            'silhouette_scores': [],
            'calinski_scores': [],
            'davies_scores': [],
            'inertia': [],
            'models': []
        }

        for n_clusters in range(2, max_clusters + 1):
            kmeans = KMeans(
                n_clusters=n_clusters,
                random_state=self.random_state,
                n_init=10
            )
            labels = kmeans.fit_predict(self.X)

            # Calculate clustering metrics
            kmeans_results['silhouette_scores'].append(silhouette_score(self.X, labels))
            kmeans_results['calinski_scores'].append(calinski_harabasz_score(self.X, labels))
            kmeans_results['davies_scores'].append(davies_bouldin_score(self.X, labels))
            kmeans_results['inertia'].append(kmeans.inertia_)
            kmeans_results['models'].append(kmeans)

        # Find optimal number of clusters (candidates start at 2, hence the offset)
        optimal_n_clusters = self._find_optimal_clusters(kmeans_results)
        self.best_models['kmeans'] = kmeans_results['models'][optimal_n_clusters - 2]
        self.cluster_labels['kmeans'] = self.best_models['kmeans'].labels_
        self.results['kmeans'] = kmeans_results

    def _perform_dbscan(self):
        """
        Perform DBSCAN clustering with parameter optimization.

        A starting eps is taken from the knee of the sorted nearest-neighbour
        distances; 20 values between 0.5x and 1.5x of it are then scored by
        silhouette and the best one is used for the final model.

        Raises
        ------
        ValueError
            If all points are identical, so no positive eps can be derived.
        """
        from sklearn.neighbors import NearestNeighbors

        # Determine optimal eps using k-distance graph
        neigh = NearestNeighbors(n_neighbors=2)
        nbrs = neigh.fit(self.X)
        distances, indices = nbrs.kneighbors(self.X)
        # Column 0 is each point's distance to itself; column 1 is the nearest neighbour.
        distances = np.sort(distances[:, 1])

        # Find knee point for optimal eps
        from kneed import KneeLocator
        knee_locator = KneeLocator(
            range(len(distances)), distances,
            curve='convex', direction='increasing'
        )
        knee = knee_locator.knee
        if knee is not None and distances[knee] > 0:
            optimal_eps = distances[knee]
        else:
            # No knee was detected (or it sits on duplicate points, giving an
            # invalid eps of 0). Heuristic fallback: 90th percentile of the
            # positive nearest-neighbour distances.
            positive_distances = distances[distances > 0]
            if positive_distances.size == 0:
                raise ValueError("DBSCAN requires at least two distinct points")
            optimal_eps = np.percentile(positive_distances, 90)

        dbscan_results = {
            'eps_values': [],
            'n_clusters': [],
            'silhouette_scores': [],
            'noise_points': []
        }

        eps_range = np.linspace(optimal_eps * 0.5, optimal_eps * 1.5, 20)

        for eps in eps_range:
            dbscan = DBSCAN(eps=eps, min_samples=5)
            labels = dbscan.fit_predict(self.X)

            # Silhouette needs at least two distinct labels. The noise label
            # (-1) counts as one of them here, as it is passed to the score.
            if len(np.unique(labels)) > 1:
                dbscan_results['eps_values'].append(eps)
                dbscan_results['n_clusters'].append(len(np.unique(labels[labels != -1])))
                dbscan_results['silhouette_scores'].append(silhouette_score(self.X, labels))
                dbscan_results['noise_points'].append(np.sum(labels == -1))

        # Select optimal eps based on silhouette score. When no candidate was
        # scorable, keep the knee-based eps instead of failing on an empty list.
        if dbscan_results['silhouette_scores']:
            optimal_idx = int(np.argmax(dbscan_results['silhouette_scores']))
            optimal_eps = dbscan_results['eps_values'][optimal_idx]

        # Fit final DBSCAN model
        self.best_models['dbscan'] = DBSCAN(eps=optimal_eps, min_samples=5)
        self.cluster_labels['dbscan'] = self.best_models['dbscan'].fit_predict(self.X)
        self.results['dbscan'] = dbscan_results

    def _perform_hierarchical(self, max_clusters=10):
        """
        Perform Hierarchical clustering with different linkage methods
        """
        linkage_methods = ['ward', 'complete', 'average']
        hierarchical_results = {method: {} for method in linkage_methods}

        for method in linkage_methods:
            scores = {
                'silhouette_scores': [],
                'calinski_scores': [],
                'davies_scores': [],
                'models': []
            }

            for n_clusters in range(2, max_clusters + 1):
                hierarchical = AgglomerativeClustering(
                    n_clusters=n_clusters,
                    linkage=method
                )
                labels = hierarchical.fit_predict(self.X)

                scores['silhouette_scores'].append(silhouette_score(self.X, labels))
                scores['calinski_scores'].append(calinski_harabasz_score(self.X, labels))
                scores['davies_scores'].append(davies_bouldin_score(self.X, labels))
                scores['models'].append(hierarchical)

            hierarchical_results[method] = scores

        # Find best method and number of clusters
        best_method = max(linkage_methods,
                         key=lambda m: max(hierarchical_results[m]['silhouette_scores']))
        optimal_n_clusters = self._find_optimal_clusters(hierarchical_results[best_method])

        # Reuse the model already fitted for this setting instead of refitting.
        self.best_models['hierarchical'] = (
            hierarchical_results[best_method]['models'][optimal_n_clusters - 2]
        )
        self.cluster_labels['hierarchical'] = self.best_models['hierarchical'].labels_
        self.results['hierarchical'] = hierarchical_results

    def _analyze_feature_importance(self):
        """
        Analyze feature importance for clustering results.

        For each method a random forest is trained to predict the cluster
        labels from the unscaled features; its importances are stored sorted
        from most to least important.
        """
        from sklearn.ensemble import RandomForestClassifier

        for method in self.cluster_labels:
            rf = RandomForestClassifier(n_estimators=100, random_state=self.random_state)
            rf.fit(self.X_original, self.cluster_labels[method])

            self.feature_importance[method] = pd.Series(
                rf.feature_importances_,
                index=self.X_original.columns
            ).sort_values(ascending=False)

    def plot_results(self, figsize=(20, 15)):
        """
        Comprehensive visualization of clustering results.

        Draws, on a 3x3 grid, the KMeans silhouette and elbow curves, the
        DBSCAN silhouette and noise curves, the hierarchical silhouette curves
        per linkage, and the KMeans feature importances - each only when the
        corresponding results exist.
        """
        fig = plt.figure(figsize=figsize)

        # Plot KMeans results
        if 'kmeans' in self.results:
            plt.subplot(331)
            plt.plot(range(2, len(self.results['kmeans']['silhouette_scores']) + 2),
                    self.results['kmeans']['silhouette_scores'])
            plt.title('KMeans - Silhouette Scores')

            plt.subplot(332)
            plt.plot(range(2, len(self.results['kmeans']['inertia']) + 2),
                    self.results['kmeans']['inertia'])
            plt.title('KMeans - Elbow Curve')

        # Plot DBSCAN results
        if 'dbscan' in self.results:
            plt.subplot(334)
            plt.plot(self.results['dbscan']['eps_values'],
                    self.results['dbscan']['silhouette_scores'])
            plt.title('DBSCAN - Silhouette Scores vs Eps')

            plt.subplot(335)
            plt.plot(self.results['dbscan']['eps_values'],
                    self.results['dbscan']['noise_points'])
            plt.title('DBSCAN - Noise Points vs Eps')

        # Plot Hierarchical results
        if 'hierarchical' in self.results:
            plt.subplot(337)
            for method in self.results['hierarchical']:
                plt.plot(range(2, len(self.results['hierarchical'][method]['silhouette_scores']) + 2),
                        self.results['hierarchical'][method]['silhouette_scores'],
                        label=method)
            plt.title('Hierarchical - Silhouette Scores')
            plt.legend()

        # Plot feature importance (only when the KMeans importances exist, so
        # a missing key neither raises nor leaves an empty panel)
        if 'kmeans' in self.feature_importance:
            plt.subplot(333)
            self.feature_importance['kmeans'].plot(kind='bar')
            plt.title('Feature Importance - KMeans')
            plt.xticks(rotation=45)

        plt.tight_layout()
        plt.show()

    def get_cluster_summary(self):
        """
        Generate summary statistics for each clustering method.

        Returns
        -------
        pandas.DataFrame
            One row per method with the number of clusters (noise excluded),
            the three validation scores (None when fewer than two clusters
            were found) and a dict of label -> size.
        """
        summary = {}

        for method in self.cluster_labels:
            labels = self.cluster_labels[method]
            n_clusters = len(np.unique(labels[labels != -1]))
            scorable = n_clusters > 1

            summary[method] = {
                'n_clusters': n_clusters,
                'silhouette_score': silhouette_score(self.X, labels) if scorable else None,
                'calinski_score': calinski_harabasz_score(self.X, labels) if scorable else None,
                'davies_score': davies_bouldin_score(self.X, labels) if scorable else None,
                'cluster_sizes': pd.Series(labels).value_counts().to_dict()
            }

        return pd.DataFrame(summary).T

# Usage example:
def perform_clustering_analysis(X):
    """
    Run the full clustering workflow on X.

    Fits an ``AdvancedClusterAnalyzer``, shows its diagnostic plots, prints the
    per-method summary table and returns
    ``(best_models, cluster_labels, summary)``.
    """
    cluster_analyzer = AdvancedClusterAnalyzer()
    cluster_analyzer.fit(X)

    # Diagnostic plots first, then the tabular summary
    cluster_analyzer.plot_results()

    summary_table = cluster_analyzer.get_cluster_summary()
    print("\nClustering Summary:")
    print(summary_table)

    return (
        cluster_analyzer.best_models,
        cluster_analyzer.cluster_labels,
        summary_table,
    )

In [4]:
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.metrics import (make_scorer, accuracy_score, precision_score, 
                           recall_score, f1_score, roc_auc_score, confusion_matrix)
import time
from typing import Dict, List, Optional, Tuple

class AdvancedModelManager:
    """
    Comprehensive model management system for predictive maintenance.

    Typical flow: register models (``add_model`` / ``setup_default_models``),
    tune them (``optimize_hyperparameters``), compare them with
    cross-validation (``evaluate_models``), then inspect ``results``,
    ``best_model`` and ``feature_importance``.
    """
    def __init__(self, random_state: int = 42):
        self.random_state = random_state
        self.models = {}
        self.results = {}
        self.best_model = None
        self.feature_importance = {}

    def add_model(self, name: str, model: "BaseEstimator", params: Optional[Dict] = None) -> None:
        """
        Add a model to the manager with optional hyperparameters.

        Parameters
        ----------
        name: key under which the model is stored
        model: unfitted estimator
        params: hyperparameter search space; an empty space is used when omitted
        """
        self.models[name] = {
            'model': model,
            'params': params or {},
            'best_params': None,
            'cv_results': None,
            'trained_model': None
        }

    def setup_default_models(self) -> None:
        """
        Setup default models with optimized configurations for predictive maintenance
        """
        from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
        from sklearn.neural_network import MLPClassifier
        from sklearn.svm import SVC
        from xgboost import XGBClassifier
        from lightgbm import LGBMClassifier

        # Random Forest
        rf_params = {
            'n_estimators': [100, 200, 300],
            'max_depth': [None, 10, 20, 30],
            'min_samples_split': [2, 5, 10],
            'class_weight': ['balanced', 'balanced_subsample']
        }
        self.add_model('RandomForest',
                      RandomForestClassifier(random_state=self.random_state),
                      rf_params)

        # Gradient Boosting
        gb_params = {
            'n_estimators': [100, 200, 300],
            'learning_rate': [0.01, 0.1, 0.3],
            'max_depth': [3, 5, 7],
            'subsample': [0.8, 0.9, 1.0]
        }
        self.add_model('GradientBoosting',
                      GradientBoostingClassifier(random_state=self.random_state),
                      gb_params)

        # Neural Network
        nn_params = {
            'hidden_layer_sizes': [(50,), (100,), (50, 50)],
            'activation': ['relu', 'tanh'],
            'learning_rate': ['adaptive'],
            'max_iter': [300, 500]
        }
        self.add_model('NeuralNetwork',
                      MLPClassifier(random_state=self.random_state),
                      nn_params)

        # XGBoost
        xgb_params = {
            'n_estimators': [100, 200, 300],
            'max_depth': [3, 5, 7],
            'learning_rate': [0.01, 0.1, 0.3],
            'subsample': [0.8, 0.9, 1.0],
            'scale_pos_weight': [1, 3, 5]
        }
        self.add_model('XGBoost',
                      XGBClassifier(random_state=self.random_state),
                      xgb_params)

        # LightGBM
        lgb_params = {
            'n_estimators': [100, 200, 300],
            'max_depth': [3, 5, 7],
            'learning_rate': [0.01, 0.1, 0.3],
            'subsample': [0.8, 0.9, 1.0],
            'class_weight': ['balanced']
        }
        self.add_model('LightGBM',
                      LGBMClassifier(random_state=self.random_state),
                      lgb_params)

    def _create_custom_scorer(self) -> Dict[str, str]:
        """
        Create the scoring metrics used for model evaluation.

        scikit-learn's predefined scorer names are used instead of
        ``make_scorer(..., needs_proba=True)``: that argument was deprecated
        and later removed, while the named scorers work across versions and
        obtain continuous scores for ROC AUC themselves.
        """
        return {
            'accuracy': 'accuracy',
            'precision': 'precision_weighted',
            'recall': 'recall_weighted',
            'f1': 'f1_weighted',
            # Binary target assumed, as with the previous roc_auc_score scorer.
            'roc_auc': 'roc_auc'
        }

    def optimize_hyperparameters(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Perform hyperparameter optimization for all models.

        Each registered model is tuned with a 20-iteration randomized search
        over its parameter space (5-fold stratified CV, refit on weighted F1);
        the best parameters, CV results and refitted estimator are stored.
        """
        from sklearn.model_selection import RandomizedSearchCV

        for name, model_info in self.models.items():
            print(f"\nOptimizing {name}...")

            # Create RandomizedSearchCV
            random_search = RandomizedSearchCV(
                estimator=model_info['model'],
                param_distributions=model_info['params'],
                n_iter=20,
                cv=StratifiedKFold(n_splits=5, shuffle=True,
                                 random_state=self.random_state),
                scoring=self._create_custom_scorer(),
                refit='f1',
                random_state=self.random_state,
                n_jobs=-1
            )

            # Fit and store results
            random_search.fit(X, y)
            self.models[name]['best_params'] = random_search.best_params_
            self.models[name]['cv_results'] = random_search.cv_results_
            self.models[name]['trained_model'] = random_search.best_estimator_

    def evaluate_models(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Evaluate all trained models using cross-validation.

        Models without a trained estimator are skipped. The mean metrics are
        stored in ``results`` and the best model is selected.

        Raises
        ------
        ValueError
            If no model has been trained yet.
        """
        results = []

        for name, model_info in self.models.items():
            if model_info['trained_model'] is None:
                continue

            # Perform cross-validation
            cv_results = cross_validate(
                model_info['trained_model'],
                X, y,
                cv=StratifiedKFold(n_splits=5, shuffle=True,
                                 random_state=self.random_state),
                scoring=self._create_custom_scorer(),
                return_train_score=True
            )

            # Calculate and store results
            mean_results = {
                'Model': name,
                'Test_Accuracy': cv_results['test_accuracy'].mean(),
                'Test_Precision': cv_results['test_precision'].mean(),
                'Test_Recall': cv_results['test_recall'].mean(),
                'Test_F1': cv_results['test_f1'].mean(),
                'Test_ROC_AUC': cv_results['test_roc_auc'].mean(),
                'Train_Accuracy': cv_results['train_accuracy'].mean(),
                'Fit_Time': cv_results['fit_time'].mean(),
                'Score_Time': cv_results['score_time'].mean()
            }

            results.append(mean_results)

        self.results = pd.DataFrame(results)
        if self.results.empty:
            # An empty frame has no 'Test_F1' column; fail with a clear message
            # instead of a KeyError from the selection step.
            raise ValueError("No trained models to evaluate. Run optimize_hyperparameters first.")
        self._select_best_model()

    def _select_best_model(self) -> None:
        """
        Select the best model based on F1 score
        """
        best_idx = self.results['Test_F1'].idxmax()
        self.best_model = self.models[self.results.loc[best_idx, 'Model']]['trained_model']

    def analyze_feature_importance(self, X: pd.DataFrame) -> None:
        """
        Analyze feature importance for supported models.

        Uses ``feature_importances_`` when the trained model exposes it,
        otherwise the absolute values of the first row of ``coef_``. Models
        exposing neither (or not trained yet) are skipped.
        """
        for name, model_info in self.models.items():
            model = model_info['trained_model']

            if hasattr(model, 'feature_importances_'):
                self.feature_importance[name] = pd.Series(
                    model.feature_importances_,
                    index=X.columns
                ).sort_values(ascending=False)
            elif hasattr(model, 'coef_'):
                self.feature_importance[name] = pd.Series(
                    np.abs(model.coef_[0]),
                    index=X.columns
                ).sort_values(ascending=False)

    def plot_results(self, figsize=(15, 10)) -> None:
        """
        Visualize model comparison results.

        Draws grouped bars of the test metrics per model, the mean fit time
        per model and, when available, the feature importances of the model
        with the best test F1. Panels without content are hidden.
        """
        import matplotlib.pyplot as plt

        fig, axes = plt.subplots(2, 2, figsize=figsize)

        # Plot model metrics comparison. DataFrame.plot has no `hue` argument;
        # indexing by model and plotting the metric columns yields the
        # intended grouped bars.
        metrics = ['Test_Accuracy', 'Test_Precision', 'Test_Recall', 'Test_F1']
        self.results.set_index('Model')[metrics].plot(kind='bar', ax=axes[0, 0])
        axes[0, 0].set_title('Model Performance Comparison')

        # Plot training times
        self.results[['Model', 'Fit_Time']].plot(
            kind='bar', x='Model', y='Fit_Time',
            ax=axes[0, 1])
        axes[0, 1].set_title('Training Time Comparison')

        # Plot feature importance for best model. The best model may expose no
        # importances (e.g. a neural network), so check for its entry first.
        best_model_name = self.results.loc[
            self.results['Test_F1'].idxmax(), 'Model'
        ]
        if best_model_name in self.feature_importance:
            self.feature_importance[best_model_name].plot(
                kind='bar', ax=axes[1, 0])
            axes[1, 0].set_title(f'Feature Importance ({best_model_name})')
        else:
            axes[1, 0].axis('off')

        # The fourth panel is not used.
        axes[1, 1].axis('off')

        plt.tight_layout()
        plt.show()

    def get_prediction_proba(self, X: np.ndarray) -> np.ndarray:
        """
        Get probability predictions from best model.

        Raises
        ------
        ValueError
            If no best model has been selected yet.
        """
        if self.best_model is None:
            raise ValueError("No best model selected. Run evaluate_models first.")
        return self.best_model.predict_proba(X)

    def get_feature_importance(self) -> Dict[str, pd.Series]:
        """
        Get feature importance for all supported models
        """
        return self.feature_importance

# Usage example:
def train_and_evaluate_models(X_train, X_test, y_train, y_test):
    """
    Train and evaluate all models.

    Tunes and cross-validates the default models on the training data, then
    scores the selected best model once on the held-out test data so the
    reported performance is not based solely on data used for tuning.

    Parameters
    ----------
    X_train, y_train: training features and target
    X_test, y_test: held-out features and target used for the final report

    Returns
    -------
    The fitted AdvancedModelManager
    """
    # Initialize model manager
    model_manager = AdvancedModelManager()

    # Setup default models
    model_manager.setup_default_models()

    # Optimize hyperparameters
    model_manager.optimize_hyperparameters(X_train, y_train)

    # Evaluate models
    model_manager.evaluate_models(X_train, y_train)

    # Analyze feature importance
    if isinstance(X_train, pd.DataFrame):
        model_manager.analyze_feature_importance(X_train)

    # Plot results
    model_manager.plot_results()

    # Print detailed results
    print("\nModel Evaluation Results:")
    print(model_manager.results)

    # Score the best model on the hold-out set, which was previously accepted
    # but never used. Weighted averages match the cross-validation metrics.
    best_model = model_manager.best_model
    if best_model is not None and X_test is not None and y_test is not None:
        y_pred = best_model.predict(X_test)
        holdout_scores = {
            'Accuracy': accuracy_score(y_test, y_pred),
            'Precision': precision_score(y_test, y_pred, average='weighted', zero_division=0),
            'Recall': recall_score(y_test, y_pred, average='weighted', zero_division=0),
            'F1': f1_score(y_test, y_pred, average='weighted', zero_division=0),
        }
        print("\nHold-out Test Results (best model):")
        for metric_name, score in holdout_scores.items():
            print(f"{metric_name}: {score:.4f}")

    return model_manager

In [5]:
import shap
import lime
import lime.lime_tabular
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Union, Optional
from sklearn.base import BaseEstimator
import plotly.express as px
import plotly.graph_objects as go

class AdvancedModelInterpreter:
    """
    Comprehensive model interpretation using multiple techniques and visualizations

    SHAP, LIME and permutation importance are combined here. Per-class SHAP
    output is always reduced to the class at index 1 (the second entry of
    ``class_names``) before it is returned, summarised or plotted.
    """

    # Index of the class whose SHAP contributions are reported.
    _POSITIVE_CLASS = 1

    def __init__(self,
                 model: "BaseEstimator",
                 X_train: np.ndarray,
                 feature_names: List[str],
                 class_names: Optional[List[str]] = None):
        """
        Initialize the interpreter

        Parameters:
        -----------
        model: trained model
        X_train: training data
        feature_names: list of feature names
        class_names: list of class names; defaults to
            ['No Failure', 'Failure'] when omitted
        """
        self.model = model
        self.X_train = X_train
        self.feature_names = feature_names
        # A fresh list per instance: a list literal used as the default in
        # the signature would be shared (and mutable) across all instances.
        self.class_names = (
            class_names if class_names is not None
            else ['No Failure', 'Failure']
        )
        self.shap_explainer = None
        self.lime_explainer = None
        self.feature_importance = None

    @classmethod
    def _positive_class_values(cls, shap_values):
        """
        Reduce raw SHAP output to the contributions for the reported class.

        Handles the two per-class layouts SHAP can produce: a list with one
        (n_samples, n_features) array per class, and a single array with a
        trailing class axis (n_samples, n_features, n_classes). Anything
        else is returned unchanged.
        """
        if isinstance(shap_values, list):
            pick = cls._POSITIVE_CLASS if len(shap_values) > 1 else 0
            return shap_values[pick]
        values = np.asarray(shap_values)
        if values.ndim == 3:
            pick = cls._POSITIVE_CLASS if values.shape[2] > 1 else 0
            return values[:, :, pick]
        return values

    @classmethod
    def _positive_class_base_value(cls, expected_value) -> float:
        """
        Reduce an explainer's expected value to one float for the reported class.

        ``expected_value`` may be a scalar, a list or an ndarray with one
        entry per class; checking only for ``list`` would let ndarrays
        through un-indexed.
        """
        flat = np.ravel(np.asarray(expected_value, dtype=float))
        pick = cls._POSITIVE_CLASS if flat.size > 1 else 0
        return float(flat[pick])

    def setup_explainers(self):
        """
        Initialize SHAP and LIME explainers
        """
        # Initialize SHAP explainer based on model type
        if hasattr(self.model, 'apply'):  # Tree-based models
            self.shap_explainer = shap.TreeExplainer(self.model)
        else:  # Other models
            background = shap.sample(self.X_train, 100)  # Sample background data
            self.shap_explainer = shap.KernelExplainer(
                self.model.predict_proba, background
            )

        # Initialize LIME explainer
        self.lime_explainer = lime.lime_tabular.LimeTabularExplainer(
            self.X_train,
            feature_names=self.feature_names,
            class_names=self.class_names,
            mode='classification'
        )

    def explain_prediction(self,
                         X: np.ndarray,
                         idx: int = 0,
                         method: str = 'both') -> Dict:
        """
        Generate explanation for a single prediction

        Parameters:
        -----------
        X: input data
        idx: index of instance to explain
        method: 'shap', 'lime', or 'both'

        Returns:
        --------
        Dictionary with a 'shap' entry ({'values', 'base_value'}) and/or a
        'lime' entry (the LIME explanation object), depending on ``method``
        """
        if self.shap_explainer is None or self.lime_explainer is None:
            self.setup_explainers()

        explanation = {}

        if method in ('shap', 'both'):
            # Keep the row as a 2-D slice so the explainer sees a batch of one.
            raw_values = self.shap_explainer.shap_values(X[idx:idx + 1])
            explanation['shap'] = {
                'values': self._positive_class_values(raw_values),
                'base_value': self._positive_class_base_value(
                    self.shap_explainer.expected_value
                ),
            }

        if method in ('lime', 'both'):
            # Generate LIME explanation
            lime_exp = self.lime_explainer.explain_instance(
                X[idx],
                self.model.predict_proba,
                num_features=len(self.feature_names)
            )
            explanation['lime'] = lime_exp

        return explanation

    def plot_feature_importance(self,
                              plot_type: str = 'shap',
                              top_n: Optional[int] = None):
        """
        Plot global feature importance

        Parameters:
        -----------
        plot_type: 'shap' or 'permutation' (any value other than 'shap'
            selects permutation importance)
        top_n: number of top features to show
        """
        plt.figure(figsize=(12, 8))

        if plot_type == 'shap':
            if self.shap_explainer is None:
                self.setup_explainers()

            # Calculate SHAP values for all instances
            shap_values = self._positive_class_values(
                self.shap_explainer.shap_values(self.X_train)
            )

            # show=False keeps the figure open so the tight_layout()/show()
            # calls below act on this plot instead of an empty figure.
            shap.summary_plot(
                shap_values,
                self.X_train,
                feature_names=self.feature_names,
                plot_type='bar',
                max_display=top_n,
                show=False
            )
        else:
            # Calculate permutation importance
            from sklearn.inspection import permutation_importance
            # NOTE(review): the model's own predictions are used as targets
            # (no labels are stored on the interpreter), so this measures
            # how strongly predictions depend on each feature rather than
            # the effect on true-label accuracy.
            perm_importance = permutation_importance(
                self.model, self.X_train,
                self.model.predict(self.X_train),
                n_repeats=10
            )

            # Create importance DataFrame
            importance_df = pd.DataFrame({
                'feature': self.feature_names,
                'importance': perm_importance.importances_mean
            }).sort_values('importance', ascending=False)

            if top_n:
                importance_df = importance_df.head(top_n)

            # Plot permutation importance
            sns.barplot(
                data=importance_df,
                x='importance',
                y='feature'
            )
            plt.title('Permutation Feature Importance')

        plt.tight_layout()
        plt.show()

    def plot_feature_interactions(self,
                                X: np.ndarray,
                                feature1: str,
                                feature2: str):
        """
        Plot interaction effects between two features

        Parameters:
        -----------
        X: input data
        feature1: feature shown on the x axis
        feature2: feature used to colour the points

        Raises:
        -------
        ValueError: if either name is not in ``feature_names``
        """
        if self.shap_explainer is None:
            self.setup_explainers()

        shap_values = self._positive_class_values(
            self.shap_explainer.shap_values(X)
        )

        # Get feature indices
        feat1_idx = self.feature_names.index(feature1)
        feat2_idx = self.feature_names.index(feature2)

        # Draw on an explicit axes and defer showing, so the title lands on
        # the dependence plot rather than on a new empty figure.
        _, ax = plt.subplots(figsize=(10, 8))
        shap.dependence_plot(
            feat1_idx,
            shap_values,
            X,
            interaction_index=feat2_idx,
            feature_names=self.feature_names,
            ax=ax,
            show=False
        )
        ax.set_title(f'Interaction between {feature1} and {feature2}')
        plt.show()

    def plot_decision_boundary(self,
                             X: np.ndarray,
                             feature1: str,
                             feature2: str):
        """
        Plot decision boundary for two features

        All features other than the two selected ones are held at zero when
        the model is evaluated on the grid.

        Parameters:
        -----------
        X: input data (2-D array indexed as X[:, column])
        feature1: feature on the x axis
        feature2: feature on the y axis
        """
        # Get feature indices
        feat1_idx = self.feature_names.index(feature1)
        feat2_idx = self.feature_names.index(feature2)

        # Create mesh grid; linspace gives exactly 100 points per axis,
        # whereas arange with a float step can yield 100 or 101.
        x1_min, x1_max = X[:, feat1_idx].min() - 1, X[:, feat1_idx].max() + 1
        x2_min, x2_max = X[:, feat2_idx].min() - 1, X[:, feat2_idx].max() + 1
        xx1, xx2 = np.meshgrid(
            np.linspace(x1_min, x1_max, 100),
            np.linspace(x2_min, x2_max, 100)
        )

        # Make predictions
        X_mesh = np.zeros((xx1.size, X.shape[1]))
        X_mesh[:, feat1_idx] = xx1.ravel()
        X_mesh[:, feat2_idx] = xx2.ravel()
        Z = self.model.predict(X_mesh).reshape(xx1.shape)

        # Plot
        plt.figure(figsize=(10, 8))
        plt.contourf(xx1, xx2, Z, alpha=0.4)
        plt.scatter(X[:, feat1_idx], X[:, feat2_idx],
                   c=self.model.predict(X), alpha=0.8)
        plt.xlabel(feature1)
        plt.ylabel(feature2)
        plt.title('Decision Boundary')
        plt.colorbar()
        plt.show()

    def plot_prediction_explanation(self,
                                  X: np.ndarray,
                                  idx: int = 0):
        """
        Plot detailed explanation for a single prediction

        The SHAP waterfall and the LIME bar chart are drawn on separate
        figures: neither plotting function accepts a target axes, each one
        manages its own figure.

        Parameters:
        -----------
        X: input data
        idx: index of instance to explain
        """
        explanation = self.explain_prediction(X, idx)

        # Plot SHAP explanation
        if 'shap' in explanation:
            shap_part = explanation['shap']
            # waterfall() expects a single-row Explanation with a scalar
            # base value.
            single_row = shap.Explanation(
                values=np.asarray(shap_part['values'])[0],
                base_values=shap_part['base_value'],
                data=np.asarray(X[idx]),
                feature_names=self.feature_names
            )
            shap_fig = plt.figure()
            shap.plots.waterfall(single_row, show=False)
            shap_fig.suptitle('SHAP Feature Contributions')

        # Plot LIME explanation
        if 'lime' in explanation:
            lime_fig = explanation['lime'].as_pyplot_figure()
            lime_fig.gca().set_title('LIME Feature Contributions')

        plt.tight_layout()
        plt.show()

    def get_critical_features(self,
                            X: np.ndarray,
                            threshold: float = 0.1) -> Dict[str, List[str]]:
        """
        Identify critical features for predictions

        A feature is critical when its mean absolute SHAP value over ``X``
        exceeds ``threshold``; it is listed as positive when its mean signed
        SHAP value is above zero, otherwise as negative.

        Parameters:
        -----------
        X: input data
        threshold: importance threshold

        Returns:
        --------
        Dictionary of positive and negative influential features
        """
        if self.shap_explainer is None:
            self.setup_explainers()

        shap_values = self._positive_class_values(
            self.shap_explainer.shap_values(X)
        )

        # Calculate mean absolute SHAP values
        mean_abs_shap = np.abs(shap_values).mean(axis=0)

        # Identify critical features
        critical_features = {
            'positive': [],
            'negative': []
        }

        for idx, importance in enumerate(mean_abs_shap):
            if importance > threshold:
                if np.mean(shap_values[:, idx]) > 0:
                    critical_features['positive'].append(self.feature_names[idx])
                else:
                    critical_features['negative'].append(self.feature_names[idx])

        return critical_features

# Usage example:
def interpret_model(model, X_train, X_test, feature_names):
    """
    Run the interpretation workflow for ``model`` and return the interpreter.

    Draws the SHAP and permutation feature-importance plots, explains the
    first sample of ``X_test``, prints the critical features found on
    ``X_test`` and, when at least two positive critical features exist,
    plots the interaction between the first two.
    """
    interp = AdvancedModelInterpreter(
        model=model, X_train=X_train, feature_names=feature_names
    )

    # Global view: SHAP first, then permutation importance.
    print("Generating feature importance plots...")
    for importance_kind in ('shap', 'permutation'):
        interp.plot_feature_importance(plot_type=importance_kind)

    # Local view of the first test sample.
    print("\nGenerating local explanation for a sample prediction...")
    interp.plot_prediction_explanation(X_test, idx=0)

    print("\nIdentifying critical features...")
    influential = interp.get_critical_features(X_test)
    positive = influential['positive']
    print("Positive influential features:", positive)
    print("Negative influential features:", influential['negative'])

    # An interaction plot needs two features; stop here otherwise.
    if len(positive) < 2:
        return interp

    print("\nGenerating feature interaction plot...")
    interp.plot_feature_interactions(X_test, positive[0], positive[1])

    return interp

ModuleNotFoundError: No module named 'shap'