# In [None]:
# data_preprocessor.py

import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, PowerTransformer, OrdinalEncoder
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import BorderlineSMOTE, ADASYN, SMOTE, SMOTENC
from imblearn.combine import SMOTEENN, SMOTETomek
from sklearn.ensemble import IsolationForest
from scipy.stats import shapiro, anderson
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import logging
from typing import List, Optional, Dict, Tuple
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import pairwise_distances
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm

class DataPreprocessor:
    def __init__(
        self, 
        model_type: str, 
        column_assets: Dict[str, List[str]], 
        perform_split: bool = True, 
        debug: bool = False
    ):
        """
        Initialize the DataPreprocessor.

        Args:
            model_type (str): Type of the machine learning model.
            column_assets (dict): Dictionary containing lists of ordinal, nominal, and numerical columns.
            perform_split (bool): Whether to perform train-test split and apply SMOTE.
            debug (bool): Flag to enable detailed debugging information.
        """
        self.model_type = model_type
        self.ordinal_categoricals = column_assets.get('ordinal_categoricals', [])
        self.nominal_categoricals = column_assets.get('nominal_categoricals', [])
        self.numericals = column_assets.get('numericals', [])
        self.y_variable = column_assets.get('y_variable', '')
        self.perform_split = perform_split
        self.debug = debug

        # Containers for transformers
        self.preprocessor = None
        self.pipeline = None
        self.smote = None

        # Initialize transformer to None
        self.transformer = None

        # To keep track of preprocessing steps and reasons
        self.preprocessing_steps = []
        self.feature_reasons = {col: '' for col in self.ordinal_categoricals + self.nominal_categoricals + self.numericals}

        # Configure logging
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
        handler.setFormatter(formatter)
        if not self.logger.handlers:
            self.logger.addHandler(handler)

        # Store encoders for inverse transformation
        self.ordinal_encoder = None
        self.nominal_encoder = None

    def analyze_smote_criteria(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        debug: bool = False,
        imbalance_threshold: float = 0.2,
        noise_threshold: float = 0.5,
        overlap_threshold: float = 0.3,
        boundary_threshold: float = 0.4,
        extreme_imbalance_threshold: float = 0.05
    ) -> Dict:
        """
        Analyzes the training dataset to recommend the best SMOTE variant.

        Four heuristics are evaluated between the least and the most frequent
        class: minority share (imbalance), distances from minority samples to
        their 5 nearest majority neighbours (noise and boundary concentration)
        and the fraction of minority/majority pairs closer than 1.0 (overlap).
        A failure in one heuristic is logged and treated as "not detected".

        Parameters:
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target labels.
            debug (bool): Whether to log debug information.
            imbalance_threshold (float): Threshold for severe imbalance.
            noise_threshold (float): Threshold for noise detection.
            overlap_threshold (float): Threshold for class overlap detection.
            boundary_threshold (float): Threshold for boundary concentration detection.
            extreme_imbalance_threshold (float): Threshold for extreme imbalance.

        Returns:
            dict: ``{"recommendations": [...], "details": {...}}``. The
            recommendations are ordered by priority, contain no duplicates and
            fall back to ``["SMOTE"]`` when no heuristic fires.
        """
        logger = self.logger

        # Step 1: Class Distribution
        class_distribution = y_train.value_counts(normalize=True)
        majority_class = class_distribution.idxmax()
        minority_class = class_distribution.idxmin()
        minority_share = class_distribution[minority_class]

        severe_imbalance = minority_share < imbalance_threshold
        extreme_imbalance = minority_share < extreme_imbalance_threshold

        if debug:
            logger.debug(f"X_train Shape: {X_train.shape}")
            logger.debug(f"Class Distribution: {class_distribution.to_dict()}")
            if extreme_imbalance:
                logger.warning(f"Extreme imbalance detected: {minority_share:.2%}")

        # Step 2: Noise Analysis
        minority_samples = X_train[y_train == minority_class]
        majority_samples = X_train[y_train == majority_class]

        # Stays None when the neighbour search fails, so Step 4 can tell that
        # its input is missing instead of tripping over an undefined name.
        distances = None
        noisy_data = False
        try:
            knn = NearestNeighbors(n_neighbors=5).fit(majority_samples)
            distances, _ = knn.kneighbors(minority_samples)
            median_distance = np.median(distances)
            noise_ratio = np.mean(distances < median_distance)
            noisy_data = noise_ratio > noise_threshold

            if debug:
                logger.debug(f"Median Distance to Nearest Neighbors: {median_distance}")
                logger.debug(f"Noise Ratio: {noise_ratio:.2%}")
        except ValueError as e:
            logger.error(f"Noise analysis error: {e}")
            noisy_data = False

        # Step 3: Overlap Analysis
        overlapping_classes = False
        try:
            pdistances = pairwise_distances(minority_samples, majority_samples)
            overlap_metric = np.mean(pdistances < 1.0)  # Threshold can be adjusted
            overlapping_classes = overlap_metric > overlap_threshold

            if debug:
                logger.debug(f"Overlap Metric: {overlap_metric:.2%}")
        except ValueError as e:
            logger.error(f"Overlap analysis error: {e}")
            overlapping_classes = False

        # Step 4: Boundary Concentration (reuses the Step 2 distances)
        boundary_concentration = False
        if distances is None:
            logger.warning("Boundary concentration skipped: neighbor distances are unavailable.")
        else:
            try:
                nearest = np.min(distances, axis=1)
                boundary_ratio = np.mean(nearest < np.percentile(distances, 25))
                boundary_concentration = boundary_ratio > boundary_threshold

                if debug:
                    logger.debug(f"Boundary Concentration Ratio: {boundary_ratio:.2%}")
            except Exception as e:
                # Best-effort heuristic: report the problem and carry on.
                logger.error(f"Boundary concentration error: {e}")
                boundary_concentration = False

        # Step 5: Recommendations
        candidates = []
        if severe_imbalance:
            candidates.append("SMOTEENN" if noisy_data else "ADASYN")
        if noisy_data:
            candidates.append("SMOTEENN")
        if overlapping_classes:
            candidates.append("SMOTETomek")
        if boundary_concentration:
            candidates.append("BorderlineSMOTE")
        # dict.fromkeys keeps first-seen order while removing the duplicate
        # "SMOTEENN" produced when data is both severely imbalanced and noisy.
        recommendations = list(dict.fromkeys(candidates)) or ["SMOTE"]

        if debug:
            logger.debug("SMOTE Analysis Complete.")
            logger.debug(f"Recommendations: {recommendations}")

        return {
            "recommendations": recommendations,
            "details": {
                "severe_imbalance": bool(severe_imbalance),
                "noisy_data": bool(noisy_data),
                "overlapping_classes": bool(overlapping_classes),
                "boundary_concentration": bool(boundary_concentration)
            }
        }

    def implement_smote(self, X_train: pd.DataFrame, y_train: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
        """
        Implement the most suitable SMOTE variant based on the dataset characteristics.

        Args:
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target.

        Returns:
            tuple: Resampled X_train and y_train.
        """
        step_name = "Implementing SMOTE for Class Imbalance"
        self.logger.info(f"Step: {step_name}")

        # Calculate class distribution
        class_counts = y_train.value_counts()
        if class_counts.empty:
            self.logger.warning("No classes found in y_train. Skipping SMOTE.")
            return X_train, y_train

        majority_class = class_counts.idxmax()
        minority_class = class_counts.idxmin()
        majority_count = class_counts.max()
        minority_count = class_counts.min()
        imbalance_ratio = minority_count / majority_count
        self.logger.info(f"Class Distribution before SMOTE: {class_counts.to_dict()}")
        self.logger.info(f"Imbalance Ratio (Minority/Majority): {imbalance_ratio:.4f}")

        # If all classes have the same number of samples, skip SMOTE
        if class_counts.nunique() == 1:
            self.logger.info("All classes have the same number of samples. Skipping SMOTE.")
            return X_train, y_train

        # Identify categorical feature indices for SMOTENC
        categorical_features = []

        # For ordinal_categoricals (already encoded as ordinal integers)
        for col in self.ordinal_categoricals:
            try:
                idx = X_train.columns.get_loc(col)
                categorical_features.append(idx)
            except KeyError:
                self.logger.error(f"Categorical feature '{col}' not found in X_train columns.")
                raise

        # For nominal_categoricals (encoded as 'gender_encoded', 'city_encoded')
        for col in getattr(self, 'nominal_categorical_encoded', []):
            try:
                idx = X_train.columns.get_loc(col)
                categorical_features.append(idx)
            except KeyError:
                self.logger.error(f"Nominal categorical feature '{col}' not found in X_train columns.")
                raise

        # Determine if dataset is numerical-only
        is_numerical_only = len(categorical_features) == 0

        if is_numerical_only:
            # Analyze dataset to recommend SMOTE variant
            smote_analysis = self.analyze_smote_criteria(X_train, y_train, debug=self.debug)
            recommendations = smote_analysis['recommendations']

            # Select the first recommendation as the primary SMOTE variant
            smote_variant = recommendations[0] if recommendations else "SMOTE"

            # Initialize the chosen SMOTE variant
            smote = None
            if smote_variant == "SMOTE":
                smote = SMOTE(random_state=42)
            elif smote_variant == "BorderlineSMOTE":
                smote = BorderlineSMOTE(random_state=42)
            elif smote_variant == "SVMSMOTE":
                smote = SVMSMOTE(random_state=42)
            elif smote_variant == "ADASYN":
                smote = ADASYN(random_state=42)
            elif smote_variant == "SMOTEENN":
                smote = SMOTEENN(random_state=42)
            elif smote_variant == "SMOTETomek":
                smote = SMOTETomek(random_state=42)
            else:
                self.logger.warning(f"Unsupported SMOTE variant '{smote_variant}'. Falling back to SMOTE.")
                smote = SMOTE(random_state=42)

            reason = f"Selected {smote_variant} based on dataset analysis."

        else:
            # Initialize SMOTENC for mixed data
            smote = SMOTENC(categorical_features=categorical_features, random_state=42)
            smote_variant = "SMOTENC"
            reason = "Mixed numerical and categorical features."

        # Apply SMOTE
        try:
            X_res, y_res = smote.fit_resample(X_train, y_train)
            self.smote = smote
            self.preprocessing_steps.append(step_name)
            self.logger.info(f"Selected SMOTE Variant: {smote_variant}")
            self.logger.info(f"Reason for Selection: {reason}")
            self.logger.info(f"SMOTE Completed. Resampled X_train shape: {X_res.shape}, y_train shape: {y_res.shape}")
            if self.debug:
                self.logger.debug(f"Class distribution after SMOTE:\n{pd.Series(y_res).value_counts()}")
            return X_res, y_res
        except Exception as e:
            self.logger.error(f"SMOTE implementation failed: {e}")
            raise

    def handle_missing_values(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Handle missing values for numerical and categorical features with visualization.

        Numerical columns are imputed with the mean (linear/logistic
        regression), by linear interpolation followed by backward and forward
        fill (time series models) or with the median (everything else).
        Categorical columns are filled with the constant 'Missing' for
        tree-based models and with the most frequent value otherwise.

        Side effects: the columns of ``X`` are overwritten in place, the fitted
        imputers are stored on ``self.numerical_imputer`` /
        ``self.categorical_imputer``, and PNG plots are written to the current
        working directory (null-mask heatmaps before and after imputation plus
        one distribution figure per numerical column after imputation).

        Args:
            X (pd.DataFrame): Input features.

        Returns:
            pd.DataFrame: Features with imputed missing values (the same object as ``X``).
        """
        step_name = "Handling Missing Values"
        self.logger.info(f"Step: {step_name}")

        def save_missing_heatmap(title: str, filename: str) -> None:
            # Draw the current null mask of X and write it to `filename`.
            plt.figure(figsize=(10, 6))
            sns.heatmap(X.isnull(), cbar=False, cmap='viridis')
            plt.title(title)
            plt.savefig(filename)
            plt.close()
            self.logger.debug(f"Saved plot: {filename}")

        # Visualize missing data before imputation
        save_missing_heatmap('Missing Data Before Imputation', 'missing_data_before_imputation.png')

        # Numerical Imputation
        if self.numericals:
            if self.model_type in ['Linear Regression', 'Logistic Regression']:
                strategy = 'mean'
            elif self.model_type in ['Time Series Models']:
                strategy = 'interpolate'  # Interpolation handled differently
            else:
                strategy = 'median'
            self.logger.debug(f"Numerical Imputation Strategy: {strategy.capitalize()}")

            if strategy == 'interpolate':
                # bfill()/ffill() replace the deprecated fillna(method=...) form;
                # they cover leading/trailing gaps that interpolation leaves.
                X[self.numericals] = X[self.numericals].interpolate(method='linear').bfill().ffill()
                numerical_reason = 'Numerical: Interpolation Imputation | '
            else:
                numerical_imputer = SimpleImputer(strategy=strategy)
                X[self.numericals] = numerical_imputer.fit_transform(X[self.numericals])
                self.numerical_imputer = numerical_imputer
                numerical_reason = f'Numerical: {strategy.capitalize()} Imputation | '

            for col in self.numericals:
                self.feature_reasons[col] += numerical_reason

        # Categorical Imputation
        all_categoricals = self.ordinal_categoricals + self.nominal_categoricals
        if all_categoricals:
            if self.model_type in ['Tree-Based Models']:
                strategy = 'constant'
                fill_value = 'Missing'
                self.logger.debug(f"Categorical Imputation Strategy: {strategy.capitalize()} with fill value '{fill_value}'")
                categorical_imputer = SimpleImputer(strategy=strategy, fill_value=fill_value)
                categorical_reason = 'Categorical: Constant Imputation | '
            else:
                strategy = 'most_frequent'
                self.logger.debug(f"Categorical Imputation Strategy: {strategy.capitalize()}")
                categorical_imputer = SimpleImputer(strategy=strategy)
                categorical_reason = 'Categorical: Mode Imputation | '

            X[all_categoricals] = categorical_imputer.fit_transform(X[all_categoricals])
            self.categorical_imputer = categorical_imputer
            for col in all_categoricals:
                self.feature_reasons[col] += categorical_reason

        # Visualize missing data after imputation
        save_missing_heatmap('Missing Data After Imputation', 'missing_data_after_imputation.png')

        # Visualize the post-imputation distribution of each numerical feature
        for col in self.numericals:
            plt.figure(figsize=(12, 5))

            plt.subplot(1, 2, 1)
            sns.histplot(X[col].dropna(), kde=True, color='blue')
            plt.title(f'Distribution of {col} After Imputation')

            plt.subplot(1, 2, 2)
            sns.boxplot(x=X[col], color='orange')
            plt.title(f'Boxplot of {col} After Imputation')

            plt.tight_layout()
            plt.savefig(f'{col}_distribution_after_imputation.png')
            plt.close()
            self.logger.debug(f"Saved plots: {col}_distribution_after_imputation.png")

        self.preprocessing_steps.append(step_name)
        self.logger.info(f"Completed: {step_name}. Dataset shape: {X.shape}")

        if self.debug:
            self.logger.debug(f"DataFrame shape after {step_name}: {X.shape}")
            self.logger.debug(f"Columns after {step_name}: {all_categoricals + self.numericals}")
            for col in all_categoricals + self.numericals:
                self.logger.debug(f"Column '{col}' - Data Type: {X[col].dtype}, Sample Values: {X[col].dropna().unique()[:5]}")

        return X

    def test_normality(self, X: pd.DataFrame) -> Dict[str, Dict]:
        """
        Test normality for numerical features with visualization.

        Args:
            X (pd.DataFrame): Input features.

        Returns:
            dict: Normality test results per feature.
        """
        step_name = "Testing for Normality"
        self.logger.info(f"Step: {step_name}")
        normality_results = {}
        
        for col in self.numericals:
            data = X[col].dropna()
            skewness = data.skew()
            kurtosis = data.kurtosis()
            n = data.shape[0]

            # Choose test based on dataset size
            if n <= 5000:
                stat, p_value = shapiro(data)
                test_used = 'Shapiro-Wilk'
            else:
                result = anderson(data)
                stat = result.statistic
                p_value = 0.0  # Default if not normal
                for cv, sig in zip(result.critical_values, result.significance_level):
                    if stat < cv:
                        p_value = sig / 100
                        break
                test_used = 'Anderson-Darling'

            is_normal = p_value > 0.05
            normality_results[col] = {
                'skewness': skewness,
                'kurtosis': kurtosis,
                'p_value': p_value,
                'test_used': test_used,
                'is_normal': is_normal
            }
            self.logger.info(f"Normality Test for '{col}': p-value={p_value:.4f}, is_normal={is_normal}")
            self.logger.debug(f"Skewness: {skewness:.4f}, Kurtosis: {kurtosis:.4f}")

            # Visualization before transformation
            plt.figure(figsize=(12, 5))

            plt.subplot(1, 2, 1)
            sns.histplot(data, kde=True, color='blue')
            plt.title(f'Histogram of {col} Before Transformation')

            plt.subplot(1, 2, 2)
            sm.qqplot(data, line='s', ax=plt.gca())
            plt.title(f'QQ-Plot of {col} Before Transformation')

            plt.tight_layout()
            plt.savefig(f'{col}_normality_before.png')
            plt.close()
            self.logger.debug(f"Saved plots: {col}_normality_before.png")

            # Decision to transform
            if not is_normal:
                # Decide on transformation based on skewness
                if skewness > 0.75:
                    transformation = 'log'
                elif skewness < -0.75:
                    transformation = 'inverse'
                else:
                    transformation = 'yeo-johnson'
                normality_results[col]['transformation'] = transformation
                self.logger.info(f"Feature '{col}' is not normal. Recommended Transformation: {transformation}")
            else:
                normality_results[col]['transformation'] = None
                self.logger.info(f"Feature '{col}' is normal. No Transformation Needed.")

        self.normality_results = normality_results
        self.preprocessing_steps.append(step_name)
        self.logger.info(f"Completed: {step_name}. Dataset shape: {X.shape}")

        if self.debug:
            for col, stats in normality_results.items():
                self.logger.debug(
                    f"Feature '{col}': Skewness={stats['skewness']:.4f}, Kurtosis={stats['kurtosis']:.4f}, "
                    f"P-Value={stats['p_value']:.4f}, Test Used={stats['test_used']}, Is Normal={stats['is_normal']}"
                )

        return normality_results


    def apply_transformations(self, X: pd.DataFrame, normality_results: Dict[str, Dict]) -> pd.DataFrame:
        """
        Apply transformations to numerical features based on normality test results.

        Args:
            X (pd.DataFrame): Input features.
            normality_results (dict): Results from normality tests.

        Returns:
            pd.DataFrame: Transformed features.
        """
        step_name = "Applying Transformations Based on Normality Tests"
        self.logger.info(f"Step: {step_name}")

        # Initialize transformer storage
        self.transformers = {}

        for col, stats in normality_results.items():
            transformation = stats.get('transformation')
            if transformation:
                if transformation == 'log':
                    # Apply log transformation; add a small constant to handle zeros
                    X[col] = np.log1p(X[col])
                    self.transformers[col] = 'log1p'
                    self.feature_reasons[col] += 'Applied Log1p Transformation | '
                    self.logger.debug(f"Applied Log1p Transformation to '{col}'")
                elif transformation == 'inverse':
                    # Apply inverse transformation
                    X[col] = 1 / (X[col] + 1e-6)  # Add small constant to avoid division by zero
                    self.transformers[col] = 'inverse'
                    self.feature_reasons[col] += 'Applied Inverse Transformation | '
                    self.logger.debug(f"Applied Inverse Transformation to '{col}'")
                elif transformation == 'yeo-johnson':
                    # Apply Yeo-Johnson transformation
                    transformer = PowerTransformer(method='yeo-johnson')
                    X[col] = transformer.fit_transform(X[[col]])
                    self.transformers[col] = transformer
                    self.feature_reasons[col] += 'Applied Yeo-Johnson Transformation | '
                    self.logger.debug(f"Applied Yeo-Johnson Transformation to '{col}'")
        
        # Visualize distributions after transformation
        for col, transformer in self.transformers.items():
            plt.figure(figsize=(12, 5))

            plt.subplot(1, 2, 1)
            sns.histplot(X[col].dropna(), kde=True, color='green')
            plt.title(f'Histogram of {col} After Transformation')

            plt.subplot(1, 2, 2)
            sm.qqplot(X[col], line='s', ax=plt.gca())
            plt.title(f'QQ-Plot of {col} After Transformation')

            plt.tight_layout()
            plt.savefig(f'{col}_normality_after.png')
            plt.close()
            self.logger.debug(f"Saved plots: {col}_normality_after.png")

        self.preprocessing_steps.append(step_name)
        self.logger.info(f"Completed: {step_name}. Dataset shape: {X.shape}")

        if self.debug:
            for col, transformer in self.transformers.items():
                self.logger.debug(f"Feature '{col}' transformed using {transformer}")

        return X
    
    def handle_outliers(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Handle outliers based on the model's sensitivity.

        Args:
            X (pd.DataFrame): Input features.

        Returns:
            pd.DataFrame: Features with outliers handled.
        """
        step_name = "Handling Outliers"
        self.logger.info(f"Step: {step_name}")
        for col in self.numericals:
            initial_shape = X.shape[0]
            if self.model_type in ['Logistic Regression', 'Linear Regression']:
                # Z-Score Method
                z_scores = np.abs((X[col] - X[col].mean()) / X[col].std())
                X = X[z_scores < 3]
                self.feature_reasons[col] += 'Outliers handled with Z-Score | '
                self.logger.debug(f"Removed {initial_shape - X.shape[0]} outliers from '{col}' using Z-Score")
                initial_shape = X.shape[0]

                # Tukey's IQR Method
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                before_iqr_shape = X.shape[0]
                X = X[(X[col] >= lower_bound) & (X[col] <= upper_bound)]
                after_iqr_shape = X.shape[0]
                self.feature_reasons[col] += 'Outliers handled with IQR | '
                self.logger.debug(f"Removed {before_iqr_shape - after_iqr_shape} outliers from '{col}' using IQR")

            elif self.model_type in ['SVM', 'k-NN']:
                # Tukey's IQR Method
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                before_iqr_shape = X.shape[0]
                X = X[(X[col] >= lower_bound) & (X[col] <= upper_bound)]
                after_iqr_shape = X.shape[0]
                self.feature_reasons[col] += 'Outliers handled with IQR | '
                self.logger.debug(f"Removed {before_iqr_shape - after_iqr_shape} outliers from '{col}' using IQR")

                # Winsorization
                X[col] = X[col].clip(lower_bound, upper_bound)
                self.feature_reasons[col] += 'Outliers handled with Winsorization | '
                self.logger.debug(f"Winsorized '{col}' to bounds ({lower_bound}, {upper_bound})")

            elif self.model_type == 'Neural Networks':
                # Winsorization
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                X[col] = X[col].clip(lower_bound, upper_bound)
                self.feature_reasons[col] += 'Outliers handled with Winsorization | '
                self.logger.debug(f"Winsorized '{col}' to bounds ({lower_bound}, {upper_bound})")

            elif self.model_type == 'Clustering':
                # Isolation Forest
                iso = IsolationForest(contamination=0.05, random_state=42)
                preds = iso.fit_predict(X[[col]])
                before_iso_shape = X.shape[0]
                X = X[preds == 1]
                after_iso_shape = X.shape[0]
                self.feature_reasons[col] += 'Outliers handled with Isolation Forest | '
                self.logger.debug(f"Removed {before_iso_shape - after_iso_shape} outliers from '{col}' using Isolation Forest")

            elif self.model_type == 'Tree-Based Models':
                # Tree-Based Models are robust to outliers; optional handling
                self.logger.debug(f"No outlier handling for '{col}' as Tree-Based Models are robust to outliers.")

            elif self.model_type == 'Time Series Models':
                # Rolling Statistics (Smoothing)
                X[col] = X[col].rolling(window=3, min_periods=1).mean()
                self.feature_reasons[col] += 'Applied Rolling Statistics (Smoothing) | '
                self.logger.debug(f"Applied Rolling Statistics to '{col}'")

                # Winsorization
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                X[col] = X[col].clip(lower_bound, upper_bound)
                self.feature_reasons[col] += 'Outliers handled with Winsorization | '
                self.logger.debug(f"Winsorized '{col}' to bounds ({lower_bound}, {upper_bound})")

            # Update key feature statistics after outlier handling
            if self.debug:
                self.logger.debug(f"Post Outlier Handling '{col}':\n{X[col].describe()}")

        self.preprocessing_steps.append(step_name)
        self.logger.info(f"Completed: {step_name}. Dataset shape: {X.shape}")
        if self.debug:
            self.logger.debug(f"Columns after {step_name}: {X.columns.tolist()}")

        return X

    def choose_transformation(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Choose and apply transformations based on skewness.

        Args:
            X (pd.DataFrame): Input features.

        Returns:
            pd.DataFrame: Transformed features.
        """
        step_name = "Choosing and Applying Transformations"
        self.logger.info(f"Step: {step_name}")
        # Apply PowerTransformer to all numerical features together
        if self.numericals:
            skewed_features = [col for col in self.numericals if abs(X[col].skew()) > 0.75]
            if skewed_features:
                self.transformer = PowerTransformer(method='yeo-johnson')  # Yeo-Johnson handles zero and negative values
                X[self.numericals] = self.transformer.fit_transform(X[self.numericals])
                for col in self.numericals:
                    self.feature_reasons[col] += 'Applied PowerTransformer (Yeo-Johnson) | '
                self.preprocessing_steps.append(step_name)
                self.logger.info(f"Applied PowerTransformer to {len(self.numericals)} numerical features.")
                if self.debug:
                    self.logger.debug(f"DataFrame shape after {step_name}: {X.shape}")
                    self.logger.debug(f"Columns after {step_name}: {X.columns.tolist()}")
            else:
                self.logger.info("No significant skewness detected. No transformations applied.")
        else:
            self.logger.info("No numerical features to transform.")

        return X

    def encode_categorical(self, X_train: pd.DataFrame, X_test: Optional[pd.DataFrame]) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
        """
        Encode categorical variables using OrdinalEncoder for both ordinal and nominal features.

        Args:
            X_train (pd.DataFrame): Training features.
            X_test (pd.DataFrame): Testing features.

        Returns:
            tuple: Encoded X_train and X_test.
        """
        step_name = "Encoding Categorical Variables"
        self.logger.info(f"Step: {step_name}")

        # Define transformers for ordinal and nominal categorical features
        transformers = []
        if self.ordinal_categoricals:
            transformers.append(
                ('ordinal', OrdinalEncoder(), self.ordinal_categoricals)
            )
        if self.nominal_categoricals:
            # Use separate OrdinalEncoder for nominal features
            nominal_transformer = Pipeline(steps=[
                ('ordinal_encoder', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1))
            ])
            transformers.append(
                ('nominal', nominal_transformer, self.nominal_categoricals)
            )

        if not transformers:
            self.logger.info("No categorical variables to encode.")
            return X_train, X_test

        # Create ColumnTransformer for encoding
        self.preprocessor = ColumnTransformer(transformers=transformers, remainder='passthrough')

        # Fit and transform training data
        X_train_encoded = self.preprocessor.fit_transform(X_train)
        self.logger.debug("Fitted and transformed X_train with ColumnTransformer.")

        # Transform testing data
        if X_test is not None:
            X_test_encoded = self.preprocessor.transform(X_test)
            self.logger.debug("Transformed X_test with fitted ColumnTransformer.")
        else:
            X_test_encoded = None

        # Retrieve feature names after encoding
        encoded_feature_names = []
        if self.ordinal_categoricals:
            encoded_feature_names += self.ordinal_categoricals
        if self.nominal_categoricals:
            # Generate encoded names for nominal features
            self.nominal_categorical_encoded = [f"{col}_encoded" for col in self.nominal_categoricals]
            encoded_feature_names += self.nominal_categorical_encoded

        # Include passthrough (numericals)
        passthrough_features = [col for col in X_train.columns if col not in self.ordinal_categoricals + self.nominal_categoricals]
        encoded_feature_names += passthrough_features

        # Convert numpy arrays back to DataFrames
        X_train_encoded_df = pd.DataFrame(X_train_encoded, columns=encoded_feature_names, index=X_train.index)
        if X_test_encoded is not None:
            X_test_encoded_df = pd.DataFrame(X_test_encoded, columns=encoded_feature_names, index=X_test.index)
        else:
            X_test_encoded_df = None

        # Store encoders for inverse transformation
        self.ordinal_encoder = self.preprocessor.named_transformers_['ordinal'] if 'ordinal' in self.preprocessor.named_transformers_ else None
        self.nominal_encoder = self.preprocessor.named_transformers_['nominal'].named_steps['ordinal_encoder'] if 'nominal' in self.preprocessor.named_transformers_ else None

        # Since we're using OrdinalEncoder, maintain a list of nominal categorical encoded indices
        self.nominal_categorical_indices = [X_train_encoded_df.columns.get_loc(col) for col in self.nominal_categorical_encoded]

        self.preprocessing_steps.append(step_name)
        self.logger.info(f"Completed: Encoding Categorical Variables. X_train_encoded shape: {X_train_encoded_df.shape}")

        if self.debug:
            self.logger.debug(f"DataFrame shape after Encoding Categorical Variables: {X_train_encoded_df.shape}")
            self.logger.debug(f"Columns after Encoding Categorical Variables: {X_train_encoded_df.columns.tolist()}")
            for col in self.ordinal_categoricals:
                self.logger.debug(f"Encoded '{col}' - Sample Values: {X_train_encoded_df[col].dropna().unique()[:5]}")
            for col in self.nominal_categoricals:
                self.logger.debug(f"Encoded '{col}_encoded' - Encoded Values: {X_train_encoded_df[f'{col}_encoded'].dropna().unique()[:5]}")
            # Print encoder categories
            if self.ordinal_encoder:
                self.logger.debug(f"Ordinal Encoder Categories: {self.ordinal_encoder.categories_}")
            if self.nominal_encoder:
                self.logger.debug(f"Nominal Encoder Categories: {self.nominal_encoder.categories_}")

        return X_train_encoded_df, X_test_encoded_df

    def apply_scaling(self, X_train: pd.DataFrame, X_test: Optional[pd.DataFrame]) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
        """
        Apply scaling based on the model type.

        Args:
            X_train (pd.DataFrame): Training features.
            X_test (pd.DataFrame): Testing features.

        Returns:
            tuple: Scaled X_train and X_test
        """
        step_name = "Applying Scaling"
        self.logger.info(f"Step: {step_name}")

        scaler = None
        scaling_type = 'None'

        if self.model_type in ['Logistic Regression', 'Neural Networks']:
            scaler = StandardScaler()
            scaling_type = 'StandardScaler'
        elif self.model_type in ['SVM', 'k-NN', 'Clustering']:
            scaler = MinMaxScaler()
            scaling_type = 'MinMaxScaler'

        if scaler:
            self.scaler = scaler
            X_train[self.numericals] = scaler.fit_transform(X_train[self.numericals])
            if X_test is not None:
                X_test[self.numericals] = scaler.transform(X_test[self.numericals])
            for col in self.numericals:
                self.feature_reasons[col] += f'Scaling Applied: {scaling_type} | '
            self.preprocessing_steps.append(step_name)
            self.logger.info(f"Applied {scaling_type} to numerical features.")

            if self.debug:
                self.logger.debug(f"DataFrame shape after {step_name}: {X_train.shape}")
                self.logger.debug(f"Columns after {step_name}: {X_train.columns.tolist()}")
                self.logger.debug(f"Scaler Parameters: mean={scaler.mean_}, scale={scaler.scale_}")
                for col in self.numericals:
                    self.logger.debug(f"Scaled '{col}' - Data Type: {X_train[col].dtype}, Sample Values: {X_train[col].head().values}")
        else:
            self.logger.info("No Scaling Applied as per Model Type.")

        return X_train, X_test
    
    def choose_transformation(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Choose and apply transformations based on skewness.

        Args:
            X (pd.DataFrame): Input features.

        Returns:
            pd.DataFrame: Transformed features.
        """
        step_name = "Choosing and Applying Transformations"
        self.logger.info(f"Step: {step_name}")
        # Apply PowerTransformer to all numerical features together
        if self.numericals:
            skewed_features = [col for col in self.numericals if abs(X[col].skew()) > 0.75]
            if skewed_features:
                self.transformer = PowerTransformer(method='yeo-johnson')  # Yeo-Johnson handles zero and negative values
                X[self.numericals] = self.transformer.fit_transform(X[self.numericals])
                for col in self.numericals:
                    self.feature_reasons[col] += 'Applied PowerTransformer (Yeo-Johnson) | '
                self.preprocessing_steps.append(step_name)
                self.logger.info(f"Applied PowerTransformer to {len(self.numericals)} numerical features.")
                if self.debug:
                    self.logger.debug(f"DataFrame shape after {step_name}: {X.shape}")
                    self.logger.debug(f"Columns after {step_name}: {X.columns.tolist()}")
            else:
                self.logger.info("No significant skewness detected. No transformations applied.")
        else:
            self.logger.info("No numerical features to transform.")

        return X

    def split_dataset(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, Optional[pd.DataFrame], pd.Series, Optional[pd.Series]]:
        """
        Split the dataset into training and testing sets.

        Args:
            X (pd.DataFrame): Features.
            y (pd.Series): Target variable.

        Returns:
            tuple: X_train, X_test, y_train, y_test
        """
        step_name = "Splitting Dataset into Train and Test Sets"
        self.logger.info(f"Step: {step_name}")
        if self.perform_split:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, stratify=y, random_state=42
            )
            self.preprocessing_steps.append(step_name)
            self.logger.info(f"Completed: {step_name}. X_train shape: {X_train.shape}, X_test shape: {X_test.shape}")
            if self.debug:
                self.logger.debug(f"Columns in X_train: {X_train.columns.tolist()}")
                self.logger.debug(f"Sample of y_train distribution:\n{y_train.value_counts()}")
            return X_train, X_test, y_train, y_test
        else:
            self.logger.info("Train-Test Split Skipped as perform_split=False")
            return X, None, y, None
        

    def preprocessor_recommendations(self) -> pd.DataFrame:
        """
        Generate a table of preprocessing recommendations.

        Returns:
            pd.DataFrame: Recommendations table.
        """
        step_name = "Generating Preprocessor Recommendations"
        self.logger.info(f"Step: {step_name}")
        recommendations_table = pd.DataFrame.from_dict(
            self.feature_reasons, 
            orient='index', 
            columns=['Preprocessing Reason']
        )
        self.logger.debug(f"Preprocessing Recommendations:\n{recommendations_table}")
        self.logger.info(f"Completed: {step_name}")
        self.preprocessing_steps.append(step_name)
        return recommendations_table

    def final_preprocessing(
        self,
        X: pd.DataFrame,
        y: pd.Series
    ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame], pd.Series, Optional[pd.Series]]:
        """
        Run the complete preprocessing pipeline, stage by stage.

        Stages, in order: handle_missing_values, test_normality,
        handle_outliers, apply_transformations, split_dataset and, only when
        ``self.perform_split`` is True, encode_categorical, apply_scaling and
        implement_smote. SMOTE is applied to the training data only, and only
        when the class counts of ``y_train`` are not all equal. When
        ``self.perform_split`` is False, encoding, scaling and SMOTE are skipped.

        Args:
            X (pd.DataFrame): Features.
            y (pd.Series): Target variable.

        Returns:
            tuple: X_train, X_test, y_train, y_test
        """
        step_name = "Final Preprocessing Pipeline"
        self.logger.info(f"Starting: {step_name}")

        # Stages 1-4 run on the full feature frame, before any split
        features = self.handle_missing_values(X)
        normality_results = self.test_normality(features)
        features = self.handle_outliers(features)
        features = self.apply_transformations(features, normality_results)

        # Stage 5: train/test split
        X_train, X_test, y_train, y_test = self.split_dataset(features, y)

        if self.perform_split:
            # Stage 6: encode categorical variables
            encoded_train, encoded_test = self.encode_categorical(X_train, X_test)

            # Stage 7: scale training and testing data
            X_train, X_test = self.apply_scaling(encoded_train, encoded_test)

            # Stage 8: oversample the training data when classes are imbalanced
            class_counts = y_train.value_counts()
            if class_counts.min() < class_counts.max():
                X_train, y_train = self.implement_smote(X_train, y_train)

        self.preprocessing_steps.append(step_name)
        self.logger.info(f"Completed: {step_name}")
        return X_train, X_test, y_train, y_test

    def final_inversetransform(self, X_preprocessed: pd.DataFrame, X_original: pd.DataFrame) -> pd.DataFrame:
        """
        Perform inverse transformations to revert preprocessed data back to its original form.
        """
        step_name = "Inverse Transformation of Preprocessed Data"
        self.logger.info(f"Starting: {step_name}")

        # Debug: Print columns in X_original
        self.logger.debug(f"Columns in X_original during inverse_transform: {X_original.columns.tolist()}")

        # Initialize DataFrame for inverse transformed data
        X_inverse = pd.DataFrame(index=X_preprocessed.index)

        # Inverse Scaling
        if self.scaler:
            try:
                X_inverse[self.numericals] = self.scaler.inverse_transform(X_preprocessed[self.numericals])
                self.logger.debug("Inverse Scaling Completed")
                for col in self.numericals:
                    self.feature_reasons[col] += 'Inverse Scaling Applied | '
            except Exception as e:
                self.logger.error(f"Error during inverse Scaling: {e}")
                raise
        else:
            X_inverse[self.numericals] = X_preprocessed[self.numericals]

        # Inverse Transformation (PowerTransformer) per column
        if self.transformers:
            for col in self.numericals:
                transformer = self.transformers.get(col)
                if transformer:
                    try:
                        X_inverse[col] = transformer.inverse_transform(X_inverse[[col]])
                        self.feature_reasons[col] += 'Inverse Transformation Applied | '
                        self.logger.debug(f"Inverse Transformation Completed for '{col}'")
                    except Exception as e:
                        self.logger.error(f"Error during inverse Transformation for '{col}': {e}")
                        raise
                else:
                    self.logger.warning(f"No transformer found for column '{col}'. Skipping inverse transformation.")
        else:
            self.logger.info("No transformations to inverse.")

        # Inverse Encoding for Ordinal Categorical Features
        if self.ordinal_categoricals and self.ordinal_encoder:
            try:
                X_inverse[self.ordinal_categoricals] = self.ordinal_encoder.inverse_transform(X_preprocessed[self.ordinal_categoricals])
                self.logger.debug("Inverse Ordinal Encoding for Ordinal Categorical Features Completed")
                for col in self.ordinal_categoricals:
                    self.feature_reasons[col] += 'Inverse Ordinal Encoding Applied | '
            except Exception as e:
                self.logger.error(f"Error during inverse Ordinal Encoding for Ordinal Categorical Features: {e}")
                raise

        # Inverse Encoding for Nominal Categorical Features
        if self.nominal_categoricals and self.nominal_encoder:
            try:
                # Inverse transform all nominal categorical features together
                X_inverse[self.nominal_categoricals] = self.nominal_encoder.inverse_transform(X_preprocessed[self.nominal_categorical_encoded])
                self.logger.debug("Inverse Ordinal Encoding for Nominal Categorical Features Completed")
                for col in self.nominal_categoricals:
                    self.feature_reasons[col] += 'Inverse Ordinal Encoding Applied | '
            except Exception as e:
                self.logger.error(f"Error during inverse Ordinal Encoding for Nominal Categorical Features: {e}")
                raise

        # Combine all features
        try:
            # Identify columns that were transformed
            transformed_cols = self.numericals + self.ordinal_categoricals + self.nominal_categoricals

            # Include passthrough (non-transformed) features
            # Since all features are transformed, no passthrough columns should exist
            non_transformed_cols = [col for col in X_original.columns if col not in transformed_cols]

            if non_transformed_cols:
                X_inverse = pd.concat([X_inverse, X_preprocessed[non_transformed_cols]], axis=1)

            # Reorder columns to match the original DataFrame
            X_final_inverse = X_inverse[X_original.columns]
        except Exception as e:
            self.logger.error(f"Error during combining inverse transformed data: {e}")
            raise

        self.preprocessing_steps.append(step_name)
        self.logger.info(f"Completed: {step_name}")

        return X_final_inverse



# main_preprocessing.py

import pandas as pd
import numpy as np
import pickle
from typing import List, Dict, Optional, Tuple
import logging
import os


def validate_inverse(original_df: pd.DataFrame, inverse_df: pd.DataFrame, numericals: list, categorical_features: list, tolerance: float = 1e-4) -> Dict[str, Dict[str, float]]:
    """
    Validate the inverse transformation by comparing original and inverse-transformed data.

    Categorical columns are compared after casting both sides to ``str``;
    numerical columns count as different when the absolute difference exceeds
    ``tolerance``. A per-column summary, the mismatching values and an index
    alignment check are printed.

    Args:
        original_df (pd.DataFrame): Original DataFrame before preprocessing.
        inverse_df (pd.DataFrame): Inverse-transformed DataFrame.
        numericals (list): List of numerical feature names.
        categorical_features (list): List of categorical feature names.
        tolerance (float): Tolerance for numerical differences.

    Returns:
        dict: ``{column: {'total_differences': int, 'percentage_differences': float}}``
        for every checked column, so callers can act on the result
        programmatically instead of parsing the printed report.
    """
    # One boolean mismatch mask per column, computed once and reused for both
    # the summary statistics and the detailed listing.
    masks = {}
    for col in categorical_features:
        masks[col] = original_df[col].astype(str) != inverse_df[col].astype(str)
    for col in numericals:
        masks[col] = np.abs(original_df[col] - inverse_df[col]) > tolerance

    differences = {}
    for col, mask in masks.items():
        total = int(mask.sum())
        row_count = len(mask)
        differences[col] = {
            'total_differences': total,
            # Guard against 0/0 when the frames are empty
            'percentage_differences': (total / row_count) * 100 if row_count else 0.0
        }

    # Display the differences
    for col, stats in differences.items():
        print(f"Column: {col}")
        print(f" - Total Differences: {stats['total_differences']}")
        print(f" - Percentage Differences: {stats['percentage_differences']:.2f}%\n")

    # Detailed differences
    for col, mask in masks.items():
        if differences[col]['total_differences'] > 0:
            print(f"Differences found in column '{col}':")
            comparison = pd.concat([
                original_df.loc[mask, col].reset_index(drop=True).rename('Original'),
                inverse_df.loc[mask, col].reset_index(drop=True).rename('Inverse Transformed')
            ], axis=1)
            print(comparison)
            print("\n")

    # Check if indices are aligned
    if not original_df.index.equals(inverse_df.index):
        print("Warning: Indices of original and inverse transformed data do not match.")
    else:
        print("Success: Indices of original and inverse transformed data are aligned.")

    return differences


def main():
    """
    Demo driver: load feature metadata, preprocess the dataset and, in debug
    mode, round-trip the test set through the inverse transformation.

    Returns early (after logging an error) when the metadata or the selected
    feature data cannot be loaded.
    """
    # Path to the single pickle file containing all metadata
    save_path = '../../data/model/pipeline/features_metadata.pkl'  # Adjust as needed
    # Not referenced below; the dataset location is taken from the loaded metadata
    dataset_csv_path = '../../ml-preprocessing-utils/data/dataset/test/test_ml_dataset.csv'  # Ensure this path exists

    # True for verbose output and the inverse-transform check, False for minimal output
    debug_flag = True

    # Configure root logger
    logging.basicConfig(
        level=logging.DEBUG if debug_flag else logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s'
    )
    logger = logging.getLogger('main_preprocessing')

    # Load features and metadata using manage_features
    loaded = manage_features(
        mode='load',
        save_path=save_path
    )
    if not loaded:
        logger.error("Failed to load features and metadata.")
        return  # Nothing to work with

    # Pull the individual entries out of the loaded metadata
    features = loaded.get('features')
    ordinals = loaded.get('ordinal_categoricals')
    nominals = loaded.get('nominal_categoricals')
    nums = loaded.get('numericals')
    y_var = loaded.get('y_variable')
    loaded_dataset_path = loaded.get('dataset_csv_path')

    print("\n📥 Loaded Data:")
    loaded_summary = (
        ("Features", features),
        ("Ordinal Categoricals", ordinals),
        ("Nominal Categoricals", nominals),
        ("Numericals", nums),
        ("Y Variable", y_var),
        ("Dataset Path", loaded_dataset_path),
    )
    for label, value in loaded_summary:
        print(f"{label}:", value)

    # Load the selected features data using the loaded dataset path and metadata
    try:
        final_ml_df_selected_features, column_assets = load_selected_features_data(
            loaded_data=loaded,  # Pass the entire loaded data dictionary
            debug=debug_flag
        )
    except Exception as e:
        logger.error(f"Failed to load selected features data: {e}")
        return  # Exit if data loading fails

    # Initialize the DataPreprocessor
    preprocessor = DataPreprocessor(
        model_type='Logistic Regression',
        column_assets=column_assets,
        perform_split=True,
        debug=debug_flag
    )

    # Generate and display preprocessing recommendations
    recommendations = preprocessor.preprocessor_recommendations()
    print("\nPreprocessing Recommendations:")
    print(recommendations)

    # Separate features from the target and run the full pipeline
    X = final_ml_df_selected_features.drop(y_var, axis=1)
    y = final_ml_df_selected_features[y_var]
    X_train, X_test, y_train, y_test = preprocessor.final_preprocessing(X, y)

    # Display the shapes of the preprocessed datasets
    print("\nPreprocessed Datasets Shapes:")
    for part_name, part in (
        ("X_train", X_train),
        ("X_test", X_test),
        ("y_train", y_train),
        ("y_test", y_test),
    ):
        print(f"{part_name}: {part.shape}")

    if not debug_flag:
        return

    # Debug only: invert the preprocessing on the test set and compare it
    # against the untouched original rows.
    print("\nPerforming Inverse Transformation on Test Set Samples...")
    try:
        inverse_transformed_test = preprocessor.final_inversetransform(X_test, X.loc[X_test.index])

        print("\nInverse Transformed X_test:")
        print(inverse_transformed_test.head())

        original_test_data = X.loc[X_test.index].copy()
        inverse_transformed_subset = inverse_transformed_test.copy()

        tolerance = 1e-4  # Tolerance for numerical differences

        validate_inverse(
            original_df=original_test_data,
            inverse_df=inverse_transformed_subset,
            numericals=preprocessor.numericals,
            categorical_features=preprocessor.ordinal_categoricals + preprocessor.nominal_categoricals,
            tolerance=tolerance
        )
    except AttributeError as ae:
        print(f"\nInverse Transformation Failed: {ae}")
    except Exception as e:
        print(f"\nAn unexpected error occurred during inverse transformation: {e}")

# Entry point: run the preprocessing demo when this module is executed
# directly (``__name__`` is "__main__"), but not when it is imported.
if __name__ == "__main__":
    main()

