In [1]:
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.mixture import GaussianMixture
from sklearn.ensemble import IsolationForest
from scipy.stats import gaussian_kde
from scipy.signal import find_peaks
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.utils.validation import check_is_fitted, check_array
from typing import List, Union
import warnings
from sklearn.utils.estimator_checks import check_estimator 
from sklearn.utils.estimator_checks import _yield_all_checks

        
def check_my_estimator(estimator):
    # skip test for nan and inf checks because your transformer allows them
    skipped = ['check_estimators_nan_inf']
    for check in _yield_all_checks(estimator):
        if check.func.__name__ in skipped:
            continue
        check(estimator)  

class SmartOutlierHandler(TransformerMixin, BaseEstimator):
    def __init__(
        self,
        method: str = 'iqr',
        replace_strategy: str = 'median',
        border_quantile: float = 0.01,
        contamination: float = 0.05,
        threshold: Optional[float] = None,
        min_std: float = 1e-3,
        default_value: float = 0,
        verbose: bool = False,
        columns: Optional[List[Union[str, int]]] = None,
        max_components: int = 5,
        min_peak_height_ratio: float = 0.05,
        random_state: Optional[int] = None
    ):

        self.method = method
        self.replace_strategy = replace_strategy
        self.border_quantile = border_quantile
        self.contamination = contamination
        self.threshold = threshold
        self.min_std = min_std
        self.default_value = default_value
        self.verbose = verbose
        self.columns = columns
        self.max_components = max_components
        self.min_peak_height_ratio = min_peak_height_ratio
        self.random_state = random_state
            
    def fit(self, X: Union[pd.DataFrame, np.ndarray], y=None):
        X = check_array(X, ensure_2d=True, ensure_all_finite='allow-nan') 
        #X = check_array(X, ensure_2d=True)
        if np.iscomplexobj(X):
            raise ValueError("Complex data not supported")
        """Fit the outlier handler to the data."""
        # Convert to DataFrame if not already
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
            generated_cols = True
        else:
            generated_cols = False
            
        # Store feature information
        self.n_features_in_ = X.shape[1]
        self.feature_names_in_ = np.asarray(X.columns, dtype=object) if not generated_cols else None
        self.dtypes_ = X.dtypes.to_dict()
        
        # If columns not specified and we have generated columns, use all numeric columns
        if self.columns is None:
            if generated_cols:
                self.columns_ = list(range(X.shape[1]))  # Use all columns for array input
            else:
                self.columns_ = [i for i, col in enumerate(X.columns) 
                               if pd.api.types.is_numeric_dtype(X.iloc[:, i])]
        else:
            self.columns_ = []
            for col in self.columns:
                if isinstance(col, int) and col < X.shape[1]:
                    self.columns_.append(col)
                elif isinstance(col, str) and col in X.columns:
                    self.columns_.append(X.columns.get_loc(col))
        
        if not self.columns_:
            raise ValueError("No valid numeric columns found to process")
        
        # Rest of your original fit logic...
        self.clip_params_ = {} 
        
        for col_idx in self.columns_:
            col_name = X.columns[col_idx]
            series = X.iloc[:, col_idx].dropna()
            
            if len(series) < 2 or series.nunique() < 2:
                if self.verbose:
                    print(f"\n📌 [Column {col_name}] Skipped - insufficient data (n={len(series)}, unique={series.nunique()})")
                continue

            try:
                if self.verbose:
                    print("\n" + "-"*40)
                    print(f"🔧 Processing column: {col_name}")
                    print(f"📈 Data stats: mean={series.mean():.2f}, std={series.std():.2f}")
                    print(f"📊 Min={series.min():.2f}, 25%={series.quantile(0.25):.2f}, "
                          f"50%={series.median():.2f}, 75%={series.quantile(0.75):.2f}, "
                          f"Max={series.max():.2f}")
                    print(f"🔢 Non-null values: {len(series)}")
                
                if self.method == 'gmm':
                    if self.verbose:
                        print("\n🔮 Fitting Gaussian Mixture Model...")
                    model, clip_info = self._fit_gmm(series)
                    self.clip_params_[col_idx] = {'model': model, 'clip_info': clip_info}
                    if self.verbose:
                        print(f"✅ GMM fitted with {model.n_components} components")
                        for cluster, bounds in clip_info.items():
                            print(f"   Cluster {cluster}: bounds=[{bounds['low']:.2f}, {bounds['high']:.2f}]")
                elif self.method == 'iqr':
                    Q1, Q3 = series.quantile([0.25, 0.75])
                    IQR = Q3 - Q1
                    self.clip_params_[col_idx] = {
                        'low': Q1 - 1.5 * IQR,
                        'high': Q3 + 1.5 * IQR
                    }
                    if self.verbose:
                        print("\n📏 Using IQR method:")
                        print(f"   Q1: {Q1:.2f}, Q3: {Q3:.2f}, IQR: {IQR:.2f}")
                        print(f"   Lower bound: {Q1 - 1.5*IQR:.2f}")
                        print(f"   Upper bound: {Q3 + 1.5*IQR:.2f}")
                elif self.method == 'clip':
                    low = series.quantile(self.border_quantile)
                    high = series.quantile(1 - self.border_quantile)
                    self.clip_params_[col_idx] = {'low': low, 'high': high}
                    if self.verbose:
                        print("\n✂️ Using quantile clipping:")
                        print(f"   Lower bound ({self.border_quantile*100}%): {low:.2f}")
                        print(f"   Upper bound ({(1-self.border_quantile)*100}%): {high:.2f}")
                elif self.method == 'isolation_forest':
                    if self.verbose:
                        print("\n🌲 Fitting Isolation Forest...")
                    iso = IsolationForest(
                        contamination=self.contamination, 
                        random_state=42
                    )
                    iso.fit(series.values.reshape(-1, 1))
                    self.clip_params_[col_idx] = {
                        'model': iso,
                        'low': series.quantile(self.border_quantile),
                        'high': series.quantile(1 - self.border_quantile)
                    }
                    if self.verbose:
                        print(f"✅ Isolation Forest fitted with contamination={self.contamination}")
                        print(f"   Fallback bounds: [{self.clip_params_[col_idx]['low']:.2f}, "
                              f"{self.clip_params_[col_idx]['high']:.2f}]")
                
                if self.verbose and self.method in ['iqr', 'clip', 'isolation_forest']:
                    bounds = self.clip_params_[col_idx]
                    n_outliers_low = (series < bounds['low']).sum()
                    n_outliers_high = (series > bounds['high']).sum()
                    total_outliers = n_outliers_low + n_outliers_high
                    outlier_pct = total_outliers / len(series) * 100
                    print(f"🔍 Outliers detected: {total_outliers} ({outlier_pct:.1f}%)")
                    print(f"   - Below lower bound: {n_outliers_low}")
                    print(f"   - Above upper bound: {n_outliers_high}")
                
            except Exception as e:
                if self.verbose:
                    print(f"\n❌ [Column {col_name}] Error during fitting: {str(e)}")
                continue

        if self.verbose:
            print("\n" + "="*50)
            print("✅ SmartOutlierHandler fitting completed successfully!")
            print(f"📋 Processed {len(self.clip_params_)} columns")
            print("="*50 + "\n")
        
        return self

    def transform(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """Transform the data by handling outliers with proper dtype preservation."""
        check_is_fitted(self)

        X = check_array(X, ensure_2d=True, ensure_all_finite='allow-nan')
        #X = check_array(X, ensure_2d=True)
        if np.iscomplexobj(X):
            raise ValueError("Complex data not supported")
        # Convert to DataFrame if not already
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
            
        # Validate feature dimensions
        if X.shape[1] != self.n_features_in_:
            raise ValueError(
                f"X has {X.shape[1]} features, but SmartOutlierHandler is "
                f"expecting {self.n_features_in_} features as input."
            )
            
        # Create a copy to avoid modifying original data
        X = X.copy() 
        
        numeric_cols = [X.columns[col_idx] for col_idx in self.columns_ if col_idx < X.shape[1]]
        
        if self.verbose:
            print(f"🔢 Processing {len(numeric_cols)} numeric columns")
            print(f"📋 Columns: {numeric_cols}")
        
        # Convert numeric columns to float for processing while keeping original values
        original_values = {col: X[col].copy() for col in numeric_cols}
        X[numeric_cols] = X[numeric_cols].astype(np.float64)
        
        # Process outliers
        total_outliers = 0
        for col_idx in self.clip_params_:
            if col_idx >= X.shape[1]:
                continue
                
            col_name = X.columns[col_idx]
            col_values = X[col_name]
            mask_notna = col_values.notna()
            valid_vals = col_values[mask_notna]
            
            if len(valid_vals) == 0:
                if self.verbose:
                    print(f"\n📌 [Column {col_name}] No non-null values to process")
                continue
                
            if self.verbose:
                print(f"\n🔧 Processing column: {col_name}")
                print(f"📊 Non-null values: {len(valid_vals)}")
                
            col_outliers = 0
            
            if self.method == 'gmm':
                if self.verbose:
                    print("🔮 Using GMM method for outlier detection")
                clusters = self.clip_params_[col_idx]['model'].predict(
                    valid_vals.values.reshape(-1, 1))
                for idx, cluster_id in zip(valid_vals.index, clusters):
                    val = valid_vals.loc[idx]
                    clip_info = self.clip_params_[col_idx]['clip_info'][cluster_id]
                    if val < clip_info['low'] or val > clip_info['high']:
                        new_val = self._replace(val, valid_vals, 
                                             clip_info['low'], 
                                             clip_info['high'])
                        X.loc[idx, col_name] = new_val
                        col_outliers += 1
            elif self.method == 'isolation_forest':
                if self.verbose:
                    print("🌲 Using Isolation Forest for outlier detection")
                preds = self.clip_params_[col_idx]['model'].predict(
                    valid_vals.values.reshape(-1, 1))
                outliers_idx = valid_vals.index[preds == -1]
                col_outliers = len(outliers_idx)
                for idx in outliers_idx:
                    val = valid_vals.loc[idx]
                    bounds = self.clip_params_[col_idx]
                    new_val = self._replace(val, valid_vals,
                                         bounds['low'],
                                         bounds['high'])
                    X.loc[idx, col_name] = new_val
            else:
                bounds = self.clip_params_[col_idx]
                if self.verbose:
                    print(f"📏 Using {'IQR' if self.method == 'iqr' else 'quantile'} bounds:")
                    print(f"   Lower: {bounds['low']:.2f}, Upper: {bounds['high']:.2f}")
                
                for idx in valid_vals.index:
                    val = valid_vals.loc[idx]
                    if val < bounds['low'] or val > bounds['high']:
                        new_val = self._replace(val, valid_vals,
                                             bounds['low'],
                                             bounds['high'])
                        X.loc[idx, col_name] = new_val
                        col_outliers += 1
            
            total_outliers += col_outliers
            if self.verbose:
                outlier_pct = col_outliers / len(valid_vals) * 100
                print(f"🔍 Outliers handled: {col_outliers} ({outlier_pct:.1f}%)")
                print(f"🔄 Replacement strategy: {self.replace_strategy}")
        
        # Convert back to original dtypes carefully
        for col in numeric_cols:
            original_dtype = original_values[col].dtype
            if pd.api.types.is_integer_dtype(original_dtype):
                if X[col].isna().any():
                    # Use pandas' nullable integer type if there are NaNs
                    new_dtype = f'Int{original_dtype.itemsize*8}'
                    if self.verbose:
                        print(f"\n🔄 Converting {col} to nullable integer type: {new_dtype}")
                    X[col] = X[col].round().astype(new_dtype)
                else:
                    # Round and convert back to original integer type
                    if self.verbose:
                        print(f"\n🔄 Converting {col} back to original integer type: {original_dtype}")
                    X[col] = X[col].round().astype(original_dtype)
            else:
                # For non-integer types, convert directly
                if self.verbose:
                    print(f"\n🔄 Converting {col} back to original dtype: {original_dtype}")
                X[col] = X[col].astype(original_dtype)
        
        if self.verbose:
            print("\n" + "="*50)
            print("✅ Transformation completed successfully!")
            print(f"🔍 Total outliers handled: {total_outliers}")
            print("="*50 + "\n")
        
        return X.values

    def fit_transform(self, X, y=None):
        return self.fit(X, y).transform(X)

    def _fit_gmm(self, series: pd.Series):
        """Fit Gaussian Mixture Model to detect clusters with robust enhancements."""
        data = series.values.reshape(-1, 1)
        
        if self.verbose:
            print("\n🔍 Starting GMM fitting process...")
            print(f"📊 Data range: [{data.min():.2f}, {data.max():.2f}]")
        
        # **🌟 1. Improved KDE peak detection**
        if self.verbose:
            print("📈 Computing Kernel Density Estimation...")
        kde = gaussian_kde(data.ravel())
        x = np.linspace(data.min(), data.max(), 1000)
        y = kde(x)
        
        # **🌟 Better peak detection with prominence filtering**
        if self.verbose:
            print("🏔️ Detecting peaks in density estimation...")
        peaks, properties = find_peaks(y, 
                                     height=self.min_peak_height_ratio*y.max(),
                                     prominence=0.1*y.max())  # **🌟 Min prominence
        
        if self.verbose:
            print(f"🔍 Found {len(peaks)} potential peaks in density")
        
        # **🌟 2. Bayesian Information Criterion for component selection**
        bic_scores = []
        max_possible_components = min(len(peaks), self.max_components) or 1
        n_components_range = range(1, max_possible_components + 1)
        
        if self.verbose:
            print("\n🔮 Testing different numbers of components:")
            print(f"🔢 Testing components: {list(n_components_range)}")
        
        for n in n_components_range:
            gmm = GaussianMixture(n_components=n, 
                                 covariance_type='full',  # **🌟 Flexible shapes
                                 random_state=42,
                                 reg_covar=1e-6)  # **🌟 Numerical stability
            gmm.fit(data)
            bic_scores.append(gmm.bic(data))
            if self.verbose:
                print(f"   - Components={n}: BIC={gmm.bic(data):.2f}")
        
        # **🌟 Select model with lowest BIC**
        best_n = n_components_range[np.argmin(bic_scores)]
        best_gmm = GaussianMixture(n_components=best_n,
                                  covariance_type='full',
                                  random_state=42,
                                  reg_covar=1e-6).fit(data)
        
        if self.verbose:
            print(f"\n🏆 Selected model with {best_n} components (lowest BIC)")
            print(f"📊 Component weights: {best_gmm.weights_}")
        
        # **🌟 3. Cluster validation and filtering**
        clusters = best_gmm.predict(data)
        cluster_clip = {}
        valid_clusters = 0
        
        if self.verbose:
            print("\n🔍 Validating clusters:")
        
        for c in range(best_n):
            cluster_data = data[clusters == c]
            cluster_size = len(cluster_data)
            
            # **🌟 Minimum cluster size requirement (5% of data)**
            if cluster_size > max(5, 0.05 * len(data)):
                low = np.quantile(cluster_data, self.border_quantile)
                high = np.quantile(cluster_data, 1 - self.border_quantile)
                cluster_clip[c] = {'low': low, 'high': high}
                valid_clusters += 1
                
                if self.verbose:
                    print(f"   ✅ Cluster {c}: size={cluster_size} ({cluster_size/len(data)*100:.1f}%)")
                    print(f"      Bounds: [{low:.2f}, {high:.2f}]")
            else:
                if self.verbose:
                    print(f"   ❌ Cluster {c}: too small (size={cluster_size}), skipping")
        
        # **🌟 Fallback to global quantiles if no valid clusters**
        if valid_clusters == 0:
            if self.verbose:
                print("⚠️ No valid clusters found - using global quantiles")
            low = np.quantile(data, self.border_quantile)
            high = np.quantile(data, 1 - self.border_quantile)
            cluster_clip[0] = {'low': low, 'high': high}
            
            if self.verbose:
                print(f"   Global bounds: [{low:.2f}, {high:.2f}]")
        
        return best_gmm, cluster_clip

    def _replace(self, value: float, series: pd.Series, 
                low: float, high: float) -> float:
        """Replace outlier value according to strategy."""
        #if self.verbose: print(f"   🔄 Replacing outlier value {value:.2f} (bounds: [{low:.2f}, {high:.2f}])")
        
        if self.replace_strategy == 'clip':
            new_val = np.clip(value, low, high)
            #if self.verbose: print(f"      ✂️ Clipped to: {new_val:.2f}")
            return new_val
        elif self.replace_strategy == 'null':
            #if self.verbose: print("      🚫 Replaced with NaN")
            return np.nan
        elif self.replace_strategy == 'mean':
            new_val = series.mean()
            #if self.verbose: print(f"      📊 Replaced with mean: {new_val:.2f}")
            return new_val
        elif self.replace_strategy == 'median':
            new_val = series.median()
            #if self.verbose: print(f"      📊 Replaced with median: {new_val:.2f}")
            return new_val
        else:
            raise ValueError(f"Unknown replace strategy: {self.replace_strategy}")

    def get_feature_names_out(self, input_features=None):
        """Get output feature names."""
        check_is_fitted(self)
        if input_features is None:
            if hasattr(self, 'feature_names_in_'):
                return self.feature_names_in_
            return np.array([f"feature_{i}" for i in range(self.n_features_in_)])
        return np.asarray(input_features, dtype=object)

    @classmethod
    def build_full_pipeline(cls, data: pd.DataFrame, 
                           numeric_cols: List[str] = None,
                           method: str = 'iqr',
                           replace_strategy: str = 'clip',
                           **kwargs) -> Pipeline:
        """
        Build complete pipeline with imputation and outlier handling.
        """
        if numeric_cols is None:
            numeric_cols = data.select_dtypes(include=np.number).columns.tolist()
        
        return Pipeline([
            ('preprocessor', ColumnTransformer([
                ('numeric', Pipeline([
                    ('imputer', SimpleImputer(strategy='median')),
                    ('outlier', cls(
                        columns=numeric_cols,
                        method=method,
                        replace_strategy=replace_strategy,
                        **kwargs
                    ))
                ]), numeric_cols),
                ('passthrough', 'passthrough', 
                 list(set(data.columns) - set(numeric_cols)))
            ], remainder='passthrough'))
        ])
 

NameError: name 'Optional' is not defined