### Libraries needed for data preprocessing 

In [23]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns

### Function to read the csv uploaded. 

In [22]:
def get_dataset():
    """Prompt for a CSV file name and load it into a DataFrame.

    Returns:
        The loaded DataFrame, or None when the file is missing or any other
        error occurs while prompting or reading (a message is printed in
        both failure cases).
    """
    import pandas as pd

    try:
        csv_name = input("Enter the name of the dataset: ")
        frame = pd.read_csv(csv_name)
    except FileNotFoundError:
        print("File not found.")
        return None
    except Exception as exc:
        print(f"An error occurred: {exc}")
        return None
    return frame


### Function to check the sanity of the dataset

In [24]:
def sanity_check(df):
    """Print a quick quality report for ``df``.

    The report covers the shape, ``df.info()``, the percentage of missing
    values per column (rounded to 2 decimals) and the number of duplicated
    rows. Nothing is returned; any error raised while building the report is
    caught and printed instead of propagating.
    """
    if df is None:
        print("Data doesn't exist.")
        return

    try:
        dims = df.shape
        print("Shape of the dataset:", dims)
        print("Dataset Info:")
        df.info()
        print("Missing values percentage per column:")
        missing_pct = df.isnull().sum() / df.shape[0] * 100
        print(missing_pct.round(2))

        n_duplicates = df.duplicated().sum()
        print("Duplicate rows count:", n_duplicates)
    except Exception as exc:
        print(f"Error during sanity check: {exc}")


### Checking categorical distributions

In [None]:
def check_categorical_distributions(df):
    """Print the value distribution of every object-dtype column in ``df``.

    For each object column the number of distinct values is printed; the full
    ``value_counts`` (including NaN) is shown only when there are at most 10
    distinct values, otherwise a short notice is printed instead.

    Args:
        df: DataFrame to inspect, or None.

    Returns:
        None. Output is printed.
    """
    # Same guard as sanity_check()/eda(): a failed load yields None.
    if df is None:
        print("Data doesn't exist.")
        return

    obj_cols = df.select_dtypes(include="object").columns
    if len(obj_cols) == 0:
        print("No categorical (object) columns found.")
        return

    for col in obj_cols:
        unique_vals = df[col].nunique()
        print(f"\n'{col}' — {unique_vals} unique value(s)")

        if unique_vals <= 10:
            print(df[col].value_counts(dropna=False))
        else:
            print("Too many unique values to display.")


### Exploratory Data Analysis: 

In [28]:
def eda(df):
    """Print summary statistics for ``df``.

    Prints ``df.describe()`` followed by a description of the object-dtype
    columns. When the frame has no object columns a short notice is printed
    instead of calling ``describe(include="object")``, which would raise and
    be reported as an exception.

    Args:
        df: DataFrame to summarise, or None.

    Returns:
        None. Output is printed; unexpected errors are caught and printed.
    """
    if df is None:
        print("Data doesn't exist.")
        return
    try:
        print("Description of the dataframe: ")
        print(df.describe())
        print("Decription of the categorical data present: ")
        # describe(include="object") raises when nothing matches, so only
        # call it when at least one object column exists.
        if df.select_dtypes(include="object").shape[1] > 0:
            print(df.describe(include="object"))
        else:
            print("No categorical (object) columns found.")
    except Exception as e:
        print(f"Exception {e} occured.")


In [29]:
from sklearn.impute import SimpleImputer

def choose_imputer(df):
    """Pick a single ``SimpleImputer`` that can be fitted on the whole of ``df``.

    The "mean" strategy only works on numeric data, so it is returned only
    when ``df`` has no object/category/bool columns. As soon as one such
    column is present, "most_frequent" is returned because it is applicable
    to both numeric and non-numeric columns.

    Args:
        df: DataFrame that the returned imputer will be fitted on.

    Returns:
        An unfitted ``SimpleImputer``.
    """
    cat_cols = df.select_dtypes(include=["object", "category", "bool"]).columns

    # A majority vote between numeric and categorical columns would select
    # "mean" for mostly-numeric mixed frames, which then fails at fit time.
    if len(cat_cols) > 0:
        return SimpleImputer(strategy="most_frequent")
    return SimpleImputer(strategy="mean")


In [None]:
# NOTE(review): `df` is not assigned at module level anywhere in the visible
# cells; presumably it comes from an earlier `df = get_dataset()` — confirm.
imputer = choose_imputer(df)
# Keep the original row labels: without index=df.index the result gets a fresh
# RangeIndex and no longer aligns with `df` when its index is not 0..n-1.
df_imputed = pd.DataFrame(imputer.fit_transform(df), columns=df.columns, index=df.index)


In [None]:
def detect_outlier_columns(df, threshold=0.01, iqr_multiplier=1.5):
    """Return the numeric columns whose share of IQR outliers exceeds ``threshold``.

    A value counts as an outlier when it lies below ``Q1 - iqr_multiplier * IQR``
    or above ``Q3 + iqr_multiplier * IQR`` for its column. The share is
    measured against the total number of rows (missing values included in
    the denominator).

    Args:
        df: DataFrame to inspect.
        threshold: Fraction of rows (default 0.01, i.e. 1%) that must be
            exceeded for a column to be reported.
        iqr_multiplier: Width of the IQR fence (default 1.5).

    Returns:
        List of column names, in column order. Empty for an empty frame.
    """
    n_rows = df.shape[0]
    if n_rows == 0:
        # Nothing to measure; also avoids dividing by zero below.
        return []

    outlier_cols = []
    for col in df.select_dtypes(include="number").columns:
        values = df[col]
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr

        # Count with a boolean mask instead of materialising a filtered frame.
        outlier_count = int(((values < lower_bound) | (values > upper_bound)).sum())
        if outlier_count / n_rows > threshold:
            outlier_cols.append(col)

    return outlier_cols


In [None]:
# NOTE(review): relies on `df` from an earlier cell; no visible cell assigns
# it at module level — confirm.
columns_with_outliers = detect_outlier_columns(df)

# The status icons were mis-encoded (UTF-8 bytes shown as Mac Roman); restored.
if columns_with_outliers:
    print(f"⚠️ Consider treating outliers in: {columns_with_outliers}")
else:
    print("✅ No significant outliers found.")


In [None]:
from sklearn.preprocessing import RobustScaler

scaler = RobustScaler()
# `numerical_cols` was never defined at module level (it only exists as a
# local inside choose_scaler), so derive it here the same way.
# NOTE(review): `df` is expected from an earlier cell — confirm.
numerical_cols = df.select_dtypes(include="number").columns
df_scaled = scaler.fit_transform(df[numerical_cols])


In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler

def choose_scaler(df):
    """Return an unfitted scaler chosen from simple heuristics on ``df``.

    The checks run in this order over the numeric columns:

    1. More than half of the columns contain at least one value outside the
       1.5 * IQR fences -> ``RobustScaler``.
    2. Any negative value anywhere -> ``MaxAbsScaler``.
    3. More than half of the columns have ``|skew| > 1`` -> ``MinMaxScaler``.
    4. Otherwise -> ``StandardScaler``.
    """
    names = df.select_dtypes(include="number").columns
    majority = len(names) / 2

    def _beyond_fences(values):
        # True when any value falls outside the 1.5 * IQR fences.
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        spread = q3 - q1
        too_low = values < q1 - 1.5 * spread
        too_high = values > q3 + 1.5 * spread
        return (too_low | too_high).any()

    with_outliers = [name for name in names if _beyond_fences(df[name])]
    if len(with_outliers) > majority:
        return RobustScaler()

    has_negative = (df[names] < 0).any().any()
    if has_negative:
        return MaxAbsScaler()

    strongly_skewed = [name for name in names if abs(df[name].skew()) > 1]
    if len(strongly_skewed) > majority:
        return MinMaxScaler()

    return StandardScaler()


In [None]:
import pandas as pd
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder, MultiLabelBinarizer

def auto_encode(df, target_cols=None, multilabel_cols=None, ordinal_mappings=None, max_onehot=15):
    """
    Auto encodes columns in df based on their type and provided info.

    Columns are handled in this priority order:
    1. listed in ``target_cols``        -> LabelEncoder
    2. listed in ``multilabel_cols``    -> MultiLabelBinarizer (one column per class)
    3. present in ``ordinal_mappings``  -> OrdinalEncoder with the given order
    4. object/category/bool dtype       -> OneHotEncoder when the column has at
       most ``max_onehot`` distinct values, otherwise a plain OrdinalEncoder
    5. anything else is left untouched.

    Params:
    - df: input DataFrame (not modified)
    - target_cols: list of columns to label encode (usually target variables)
    - multilabel_cols: list of columns containing multi-label data (iterables per cell)
    - ordinal_mappings: dict {col_name: ordered list of categories}
    - max_onehot: max unique categories to use OneHot encoding

    Returns:
    - df_encoded: transformed DataFrame
    - encoders: dict {col_name: fitted encoder object}
    """
    target_cols = target_cols or []
    multilabel_cols = multilabel_cols or []
    ordinal_mappings = ordinal_mappings or {}

    df_encoded = df.copy()
    encoders = {}

    for col in df.columns:
        if col in target_cols:
            le = LabelEncoder()
            df_encoded[col] = le.fit_transform(df[col])
            encoders[col] = le
            continue

        if col in multilabel_cols:
            mlb = MultiLabelBinarizer()
            encoded = mlb.fit_transform(df[col])
            mlb_df = pd.DataFrame(encoded, columns=[f"{col}_{cls}" for cls in mlb.classes_], index=df.index)
            df_encoded = pd.concat([df_encoded.drop(columns=[col]), mlb_df], axis=1)
            encoders[col] = mlb
            continue

        if col in ordinal_mappings:
            oe = OrdinalEncoder(categories=[ordinal_mappings[col]])
            # fit_transform returns shape (n, 1); flatten to a 1-D column.
            df_encoded[col] = oe.fit_transform(df[[col]]).ravel()
            encoders[col] = oe
            continue

        if df[col].dtype.name in ['object', 'category', 'bool']:
            if df[col].nunique() <= max_onehot:
                # `sparse_output` is the keyword the rest of this notebook uses;
                # the older `sparse` keyword is no longer accepted by recent
                # scikit-learn releases.
                ohe = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
                encoded = ohe.fit_transform(df[[col]])
                ohe_df = pd.DataFrame(encoded, columns=[f"{col}_{cat}" for cat in ohe.categories_[0]], index=df.index)
                df_encoded = pd.concat([df_encoded.drop(columns=[col]), ohe_df], axis=1)
                encoders[col] = ohe
            else:
                # fallback: ordinal encoding for high-cardinality nominal
                oe = OrdinalEncoder()
                df_encoded[col] = oe.fit_transform(df[[col]]).ravel()
                encoders[col] = oe
        # numeric (and any other) columns are left as-is

    return df_encoded, encoders


def transform_new(df_new, encoders):
    """
    Use fitted encoders to transform new dataframe

    Encoders whose column is not present in ``df_new`` are skipped, so data
    without the (label-encoded) target column can be transformed with the
    same ``encoders`` dict.

    Params:
    - df_new: new DataFrame (not modified)
    - encoders: dict of fitted encoders from auto_encode()

    Returns:
    - transformed DataFrame
    """
    df_transformed = df_new.copy()

    for col, encoder in encoders.items():
        if col not in df_new.columns:
            # e.g. the target column at inference time; nothing to encode.
            continue

        if isinstance(encoder, LabelEncoder):
            df_transformed[col] = encoder.transform(df_new[col])
        elif isinstance(encoder, MultiLabelBinarizer):
            encoded = encoder.transform(df_new[col])
            mlb_df = pd.DataFrame(encoded, columns=[f"{col}_{cls}" for cls in encoder.classes_], index=df_new.index)
            df_transformed = pd.concat([df_transformed.drop(columns=[col]), mlb_df], axis=1)
        elif isinstance(encoder, OneHotEncoder):
            encoded = encoder.transform(df_new[[col]])
            ohe_df = pd.DataFrame(encoded, columns=[f"{col}_{cat}" for cat in encoder.categories_[0]], index=df_new.index)
            df_transformed = pd.concat([df_transformed.drop(columns=[col]), ohe_df], axis=1)
        elif isinstance(encoder, OrdinalEncoder):
            # transform returns shape (n, 1); flatten to a 1-D column.
            df_transformed[col] = encoder.transform(df_new[[col]]).ravel()
        # any other encoder type is ignored

    return df_transformed


In [30]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder

class DataPreprocessor:
    """Interactive, print-driven preprocessing pipeline for a CSV dataset.

    ``preprocess_pipeline`` chains the individual steps: load, sanity check,
    categorical distributions, EDA summary, duplicate removal, imputation,
    outlier report, scaling and categorical encoding. Each step is also
    usable on its own. Progress is reported with ``print``.

    Attributes set while running:
        imputer: None until ``handle_missing_values`` imputes something, then
            a dict with the fitted imputers under "numerical"/"categorical".
        scaler: the fitted scaler chosen by ``choose_and_apply_scaler``.
        encoders: {column name: fitted encoder} from ``auto_encode_categorical``.
        numerical_cols / categorical_cols: column lists recorded by
            ``handle_missing_values``.
    """

    def __init__(self):
        self.imputer = None
        self.scaler = None
        self.encoders = {}
        self.numerical_cols = []
        self.categorical_cols = []

    @staticmethod
    def _iqr_outlier_mask(series, multiplier=1.5):
        """Return a boolean Series marking values outside the IQR fences."""
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        return (series < q1 - multiplier * iqr) | (series > q3 + multiplier * iqr)

    def get_dataset(self):
        """Prompt for a CSV file name and load it.

        Returns:
            The loaded DataFrame, or None if the file is missing or reading
            fails (a message is printed in both cases).
        """
        try:
            dataset = input("Enter the name of the dataset: ")
            df = pd.read_csv(dataset)
            print(f"✅ Successfully loaded dataset: {dataset}")
            return df
        except FileNotFoundError:
            print("❌ File not found. Please check the filename.")
        except Exception as e:
            print(f"❌ An error occurred: {e}")
        return None

    def sanity_check(self, df):
        """Print shape, memory usage, info, missing values and duplicate count.

        Returns:
            True when the report was produced, False when ``df`` is None or
            an error occurred while building the report.
        """
        if df is None:
            print("❌ Data doesn't exist.")
            return False

        try:
            print("\n" + "="*60)
            print("📊 DATA SANITY CHECK REPORT")
            print("="*60)

            # Basic info
            print(f"Shape of the dataset: {df.shape}")
            print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

            print("\n📋 Dataset Info:")
            df.info()

            # Missing values: only columns with at least one gap are listed
            print("\n🔍 Missing Values Analysis:")
            missing_count = df.isnull().sum()
            missing_percent = (missing_count / df.shape[0] * 100).round(2)
            missing_data = pd.DataFrame({
                'Column': missing_percent.index,
                'Missing Count': missing_count,
                'Missing %': missing_percent
            })
            missing_data = missing_data[missing_data['Missing Count'] > 0]

            if len(missing_data) > 0:
                print(missing_data.to_string(index=False))
            else:
                print("✅ No missing values found!")

            # Duplicates
            duplicate_count = df.duplicated().sum()
            print(f"\n🔄 Duplicate rows: {duplicate_count}")
            if duplicate_count > 0:
                print(f"   ({duplicate_count/len(df)*100:.2f}% of total data)")

            return True

        except Exception as e:
            print(f"❌ Error during sanity check: {e}")
            return False

    def check_categorical_distributions(self, df):
        """Print value counts for every object/category column.

        Columns with at most 15 distinct values are shown in full (NaN
        included); columns with more show only their 5 most frequent values.
        """
        if df is None:
            return

        obj_cols = df.select_dtypes(include=["object", "category"]).columns
        if len(obj_cols) == 0:
            print("ℹ️ No categorical columns found.")
            return

        print("\n" + "="*60)
        print("📈 CATEGORICAL DISTRIBUTIONS")
        print("="*60)

        for col in obj_cols:
            unique_vals = df[col].nunique()
            print(f"\n'{col}' — {unique_vals} unique value(s)")

            if unique_vals <= 15:
                # Show the whole distribution; truncating to 10 rows here would
                # silently hide categories the threshold says are displayable.
                print(df[col].value_counts(dropna=False))
            else:
                print("⚠️ High cardinality - showing top 5 values:")
                print(df[col].value_counts().head(5))
            print("-" * 40)

    def eda_summary(self, df):
        """Print ``describe()`` for the numeric and for the categorical columns."""
        if df is None:
            print("❌ Data doesn't exist.")
            return

        try:
            print("\n" + "="*60)
            print("📊 EXPLORATORY DATA ANALYSIS")
            print("="*60)

            # Numerical summary
            numerical_cols = df.select_dtypes(include=[np.number]).columns
            if len(numerical_cols) > 0:
                print("\n🔢 Numerical Columns Summary:")
                print(df[numerical_cols].describe().round(2))

            # Categorical summary
            categorical_cols = df.select_dtypes(include=["object", "category"]).columns
            if len(categorical_cols) > 0:
                print("\n📝 Categorical Columns Summary:")
                print(df[categorical_cols].describe())

        except Exception as e:
            print(f"❌ Exception occurred: {e}")

    def handle_missing_values(self, df):
        """Impute missing values: median for numeric, mode for categorical.

        The numeric/categorical column lists are always recorded on the
        instance. When imputation happens, the fitted imputers are kept in
        ``self.imputer`` so the same fill values can be reused on new data.

        Returns:
            ``df`` itself when nothing is missing, otherwise an imputed copy.
        """
        # Separate numerical and categorical columns
        self.numerical_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()

        if df.isnull().sum().sum() == 0:
            print("✅ No missing values to handle!")
            return df

        print("\n🔧 Handling Missing Values...")

        df_processed = df.copy()
        fitted_imputers = {}

        # Handle numerical columns
        if self.numerical_cols:
            num_imputer = SimpleImputer(strategy='median')  # More robust than mean
            df_processed[self.numerical_cols] = num_imputer.fit_transform(df[self.numerical_cols])
            fitted_imputers["numerical"] = num_imputer
            print(f"   ✅ Imputed {len(self.numerical_cols)} numerical columns with median")

        # Handle categorical columns
        if self.categorical_cols:
            cat_imputer = SimpleImputer(strategy='most_frequent')
            df_processed[self.categorical_cols] = cat_imputer.fit_transform(df[self.categorical_cols])
            fitted_imputers["categorical"] = cat_imputer
            print(f"   ✅ Imputed {len(self.categorical_cols)} categorical columns with mode")

        self.imputer = fitted_imputers
        return df_processed

    def detect_and_report_outliers(self, df):
        """Report numeric columns where more than 1% of rows are IQR outliers.

        Returns:
            List of the reported column names (empty when none qualify or
            when ``df`` has no rows).
        """
        n_rows = len(df)
        if n_rows == 0:
            # Nothing to measure; also avoids dividing by zero below.
            print("✅ No significant outliers found.")
            return []

        outlier_summary = []

        for col in df.select_dtypes(include=[np.number]).columns:
            outlier_count = int(self._iqr_outlier_mask(df[col]).sum())
            outlier_percentage = outlier_count / n_rows * 100

            if outlier_percentage > 1:  # More than 1% outliers
                outlier_summary.append({
                    'Column': col,
                    'Outlier Count': outlier_count,
                    'Outlier %': round(outlier_percentage, 2)
                })

        if outlier_summary:
            print("\n⚠️ Outlier Detection Results:")
            outlier_df = pd.DataFrame(outlier_summary)
            print(outlier_df.to_string(index=False))
            return [item['Column'] for item in outlier_summary]
        else:
            print("✅ No significant outliers found.")
            return []

    def choose_and_apply_scaler(self, df, target_col=None):
        """Pick a scaler from the data's characteristics and apply it.

        A column is "sparse" with >50% zeros, "outlier-heavy" with >10% IQR
        outliers and "skewed" with |skew| > 2. Whichever property holds for
        more than half of the scaled columns selects the scaler, checked in
        the order sparse -> outliers -> skewed; otherwise StandardScaler.

        Args:
            df: DataFrame to scale (not modified).
            target_col: optional column to leave unscaled, so a numeric
                target/label keeps its original values.

        Returns:
            ``df`` itself when there is nothing to scale, otherwise a scaled copy.
        """
        numerical_cols = df.select_dtypes(include=[np.number]).columns
        if target_col is not None and target_col in numerical_cols:
            numerical_cols = numerical_cols.drop(target_col)

        if len(numerical_cols) == 0:
            print("ℹ️ No numerical columns to scale.")
            return df

        # Analyze data characteristics
        outlier_cols = []
        sparse_cols = []
        skewed_cols = []

        for col in numerical_cols:
            # Check for outliers
            outliers = self._iqr_outlier_mask(df[col]).sum()
            if outliers / len(df) > 0.1:  # >10% outliers
                outlier_cols.append(col)

            # Check for sparsity (zeros)
            if (df[col] == 0).sum() / len(df) > 0.5:  # >50% zeros
                sparse_cols.append(col)

            # Check for skewness
            if abs(df[col].skew()) > 2:
                skewed_cols.append(col)

        # Choose scaler based on analysis
        if len(sparse_cols) > len(numerical_cols) / 2:
            self.scaler = MaxAbsScaler()
            scaler_name = "MaxAbsScaler (sparse data detected)"
        elif len(outlier_cols) > len(numerical_cols) / 2:
            self.scaler = RobustScaler()
            scaler_name = "RobustScaler (outliers detected)"
        elif len(skewed_cols) > len(numerical_cols) / 2:
            self.scaler = MinMaxScaler()
            scaler_name = "MinMaxScaler (skewed data detected)"
        else:
            self.scaler = StandardScaler()
            scaler_name = "StandardScaler (normal distribution assumed)"

        print(f"\n🎯 Chosen scaler: {scaler_name}")

        # Apply scaling
        df_scaled = df.copy()
        df_scaled[numerical_cols] = self.scaler.fit_transform(df[numerical_cols])
        print(f"   ✅ Scaled {len(numerical_cols)} numerical columns")

        return df_scaled

    def auto_encode_categorical(self, df, target_col=None, max_categories=10):
        """Encode object/category feature columns and, if given, the target.

        Features with at most ``max_categories`` distinct values are one-hot
        encoded (first category dropped); the others are ordinal encoded with
        unknown values mapped to -1. A target column of object or category
        dtype is label encoded. Fitted encoders are stored in ``self.encoders``.

        Returns:
            ``df`` itself when there are no categorical feature columns,
            otherwise an encoded copy.
        """
        categorical_cols = df.select_dtypes(include=["object", "category"]).columns

        if target_col and target_col in categorical_cols:
            categorical_cols = categorical_cols.drop(target_col)

        if len(categorical_cols) == 0:
            print("ℹ️ No categorical columns to encode.")
            return df

        print(f"\n🔤 Encoding {len(categorical_cols)} categorical columns...")
        df_encoded = df.copy()

        for col in categorical_cols:
            unique_count = df[col].nunique()

            if unique_count <= max_categories:
                # Use One-Hot Encoding for low cardinality
                encoder = OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore')
                encoded_data = encoder.fit_transform(df[[col]])

                # Create column names
                feature_names = [f"{col}_{cat}" for cat in encoder.categories_[0][1:]]  # Skip first due to drop='first'
                encoded_df = pd.DataFrame(encoded_data, columns=feature_names, index=df.index)

                # Add to main dataframe
                df_encoded = pd.concat([df_encoded.drop(columns=[col]), encoded_df], axis=1)
                self.encoders[col] = encoder
                print(f"   ✅ One-hot encoded '{col}' ({unique_count} categories)")

            else:
                # Use Ordinal Encoding for high cardinality
                encoder = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
                df_encoded[col] = encoder.fit_transform(df[[col]]).ravel()
                self.encoders[col] = encoder
                print(f"   ✅ Ordinal encoded '{col}' ({unique_count} categories)")

        # Handle target column separately if specified. "category" is accepted
        # too because such a target is excluded from the feature loop above
        # and would otherwise stay unencoded.
        if target_col and target_col in df.columns:
            if df[target_col].dtype.name in ("object", "category"):
                le = LabelEncoder()
                df_encoded[target_col] = le.fit_transform(df[target_col])
                self.encoders[target_col] = le
                print(f"   ✅ Label encoded target column '{target_col}'")

        return df_encoded

    def remove_duplicates(self, df):
        """Return ``df`` without duplicated rows and print how many were dropped."""
        initial_shape = df.shape[0]
        df_cleaned = df.drop_duplicates()
        removed_count = initial_shape - df_cleaned.shape[0]

        if removed_count > 0:
            print(f"🗑️ Removed {removed_count} duplicate rows")
        else:
            print("✅ No duplicates found")

        return df_cleaned

    def preprocess_pipeline(self, target_col=None):
        """Run every preprocessing step in order on a dataset prompted from the user.

        Args:
            target_col: optional name of the target column; it is excluded
                from scaling and label encoded instead of one-hot encoded.

        Returns:
            The processed DataFrame, or None when loading or the sanity
            check fails.
        """
        print("🚀 Starting Data Preprocessing Pipeline...")
        print("="*60)

        # Step 1: Load data
        df = self.get_dataset()
        if df is None:
            return None

        # Step 2: Sanity check
        if not self.sanity_check(df):
            return None

        # Step 3: Check categorical distributions
        self.check_categorical_distributions(df)

        # Step 4: EDA summary
        self.eda_summary(df)

        # Step 5: Remove duplicates
        df = self.remove_duplicates(df)

        # Step 6: Handle missing values
        df = self.handle_missing_values(df)

        # Step 7: Detect outliers (report only; the data is not modified)
        self.detect_and_report_outliers(df)

        # Step 8: Scale numerical features, leaving the target untouched
        df = self.choose_and_apply_scaler(df, target_col=target_col)

        # Step 9: Encode categorical features
        df = self.auto_encode_categorical(df, target_col=target_col)

        print("\n" + "="*60)
        print("✅ PREPROCESSING COMPLETED!")
        print("="*60)
        print(f"Final dataset shape: {df.shape}")
        print(f"Final columns: {list(df.columns)}")

        return df

# Usage Example
if __name__ == "__main__":
    # NOTE: inside a notebook __name__ is "__main__", so this cell runs the
    # pipeline (and prompts for a file name) every time it is executed.
    # Initialize preprocessor
    preprocessor = DataPreprocessor()

    # Run complete pipeline
    # If you have a target column, specify it: target_col='your_target_column'
    processed_df = preprocessor.preprocess_pipeline(target_col=None)

    # The status icons were mis-encoded (UTF-8 bytes shown as Mac Roman); restored.
    if processed_df is not None:
        print("\n🎉 Data preprocessing successful!")
        print("Your data is now ready for machine learning!")
    else:
        print("\n❌ Preprocessing failed. Please check your data and try again.")

🚀 Starting Data Preprocessing Pipeline...


Enter the name of the dataset:  iris.csv


✅ Successfully loaded dataset: iris.csv

📊 DATA SANITY CHECK REPORT
Shape of the dataset: (150, 5)
Memory usage: 0.01 MB

📋 Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   sepal_length  150 non-null    float64
 1   sepal_width   150 non-null    float64
 2   petal_length  150 non-null    float64
 3   petal_width   150 non-null    float64
 4   species       150 non-null    object 
dtypes: float64(4), object(1)
memory usage: 6.0+ KB

🔍 Missing Values Analysis:
✅ No missing values found!

🔄 Duplicate rows: 3
   (2.00% of total data)

📈 CATEGORICAL DISTRIBUTIONS

'species' — 3 unique value(s)
species
setosa        50
versicolor    50
virginica     50
Name: count, dtype: int64
----------------------------------------

📊 EXPLORATORY DATA ANALYSIS

🔢 Numerical Columns Summary:
       sepal_length  sepal_width

In [None]:
def recommend_methods(n_features, n_samples, has_target):
    """
    Suggest feature-selection and modeling techniques for a dataset.

    Three cases are distinguished:
    - no target (unsupervised),
    - supervised and "large" (more than 100 features or more than 5000 samples),
    - supervised and smaller than that.

    Args:
        n_features (int): Number of features
        n_samples (int): Number of samples
        has_target (bool): Whether target variable exists (supervised)

    Returns:
        dict: ``{"feature_selection": [...], "modeling": [...]}`` with the
        recommended method names as strings
    """
    if not has_target:
        # Unsupervised: filter-style selection and unsupervised models
        selectors = [
            "Variance Threshold",
            "Unsupervised clustering-based selection",
        ]
        models = ["KMeans", "PCA", "Autoencoders"]
    elif n_features > 100 or n_samples > 5000:
        # Large supervised dataset: prefer fast filter methods
        selectors = [
            "Variance Threshold",
            "Generic Univariate Feature Selection (e.g., SelectKBest with chi2, ANOVA)",
            "Mutual Information",
        ]
        models = [
            "Linear / Logistic Regression",
            "Ridge Regression",
            "Random Forest (for embedded feature importance)",
            "XGBoost / LightGBM",
        ]
    else:
        # Smaller supervised dataset: wrapper and embedded methods are affordable
        selectors = [
            "Recursive Feature Elimination (RFE) with SVM or Random Forest",
            "Sequential Feature Selector (SFS)",
            "Generic Univariate Feature Selection",
        ]
        models = [
            "SVM (SVC / SVR)",
            "K-Nearest Neighbors (KNN)",
            "Linear / Logistic Regression",
            "Ridge Regression",
            "Random Forest",
            "Gradient Boosting Machines (XGBoost, LightGBM)",
        ]

    return {"feature_selection": selectors, "modeling": models}


In [32]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder

class DataPreprocessor:
    """Interactive helper that loads a CSV, reports on it and prepares it for modeling.

    The class bundles the notebook's loading, sanity-check, EDA, imputation,
    scaling and encoding steps. Transformers fitted along the way are kept on
    the instance (``imputer``, ``scaler``, ``encoders``).
    """

    # Categorical columns with at most this many distinct values get their
    # complete distribution printed; larger ones only show their top values.
    LOW_CARDINALITY_MAX = 15

    def __init__(self):
        # Dict of fitted SimpleImputer objects keyed by "numerical" /
        # "categorical"; stays None until handle_missing_values() imputes.
        self.imputer = None
        # Scaler instance chosen and fitted by choose_and_apply_scaler().
        self.scaler = None
        # Fitted LabelEncoder / OneHotEncoder per encoded source column.
        self.encoders = {}
        self.numerical_cols = []
        self.categorical_cols = []
        # Maps each one-hot encoded source column to the columns that replaced
        # it, so a target that was expanded can still be recognised later.
        self.encoded_columns = {}

    def recommend_methods(self, n_features, n_samples, has_target):
        """
        Recommend feature selection/modeling approaches based on dataset characteristics.

        Args:
            n_features (int): Number of features
            n_samples (int): Number of samples
            has_target (bool): Whether target variable exists (supervised)

        Returns:
            dict: Recommended methods grouped by type, with the keys
            "feature_selection" and "modeling" (each a list of strings).
        """
        if not has_target:
            # Unsupervised setting: only filter-style selection applies.
            return {
                "feature_selection": [
                    "Variance Threshold",
                    "Unsupervised clustering-based selection",
                ],
                "modeling": ["KMeans", "PCA", "Autoencoders"],
            }

        if n_features > 100 or n_samples > 5000:
            # Large dataset: prefer fast filter methods.
            return {
                "feature_selection": [
                    "Variance Threshold",
                    "Generic Univariate Feature Selection (e.g., SelectKBest with chi2, ANOVA)",
                    "Mutual Information",
                ],
                "modeling": [
                    "Linear / Logistic Regression",
                    "Ridge Regression",
                    "Random Forest (for embedded feature importance)",
                    "XGBoost / LightGBM",
                ],
            }

        # Smaller dataset: wrapper and embedded methods are affordable.
        return {
            "feature_selection": [
                "Recursive Feature Elimination (RFE) with SVM or Random Forest",
                "Sequential Feature Selector (SFS)",
                "Generic Univariate Feature Selection",
            ],
            "modeling": [
                "SVM (SVC / SVR)",
                "K-Nearest Neighbors (KNN)",
                "Linear / Logistic Regression",
                "Ridge Regression",
                "Random Forest",
                "Gradient Boosting Machines (XGBoost, LightGBM)",
            ],
        }

    def get_dataset(self):
        """Prompt for a CSV file name and load it.

        Returns:
            pandas.DataFrame | None: The loaded frame, or None when the file
            is missing or cannot be read (the reason is printed).
        """
        try:
            dataset = input("Enter the name of the dataset: ")
            df = pd.read_csv(dataset)
            print(f"‚úÖ Successfully loaded dataset: {dataset}")
            return df
        except FileNotFoundError:
            print("‚ùå File not found. Please check the filename.")
        except Exception as e:
            print(f"‚ùå An error occurred: {e}")
        return None

    def sanity_check(self, df):
        """Print shape, memory usage, dtype info, missing values and duplicate counts.

        Returns:
            bool: True when the report was produced, False when ``df`` is None
            or any step of the report raised.
        """
        if df is None:
            print("‚ùå Data doesn't exist.")
            return False

        try:
            print("\n" + "="*60)
            print("üìä DATA SANITY CHECK REPORT")
            print("="*60)

            # Basic info
            print(f"Shape of the dataset: {df.shape}")
            print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

            print("\nüìã Dataset Info:")
            df.info()

            # Missing values (null counts are computed once and reused)
            print("\nüîç Missing Values Analysis:")
            missing_count = df.isnull().sum()
            missing_percent = (missing_count / df.shape[0] * 100).round(2)
            missing_data = pd.DataFrame({
                'Column': missing_percent.index,
                'Missing Count': missing_count,
                'Missing %': missing_percent
            })
            missing_data = missing_data[missing_data['Missing Count'] > 0]

            if len(missing_data) > 0:
                print(missing_data.to_string(index=False))
            else:
                print("‚úÖ No missing values found!")

            # Duplicates
            duplicate_count = df.duplicated().sum()
            print(f"\nüîÑ Duplicate rows: {duplicate_count}")
            if duplicate_count > 0:
                print(f"   ({duplicate_count/len(df)*100:.2f}% of total data)")

            return True

        except Exception as e:
            print(f"‚ùå Error during sanity check: {e}")
            return False

    def check_categorical_distributions(self, df):
        """Print value counts for every object/category column.

        Columns with at most ``LOW_CARDINALITY_MAX`` distinct values are shown
        in full (missing values included); larger ones show their top 5 values.
        """
        if df is None:
            return

        obj_cols = df.select_dtypes(include=["object", "category"]).columns
        if len(obj_cols) == 0:
            print("‚ÑπÔ∏è No categorical columns found.")
            return

        print("\n" + "="*60)
        print("üìà CATEGORICAL DISTRIBUTIONS")
        print("="*60)

        for col in obj_cols:
            unique_vals = df[col].nunique()
            print(f"\n'{col}' ‚Äî {unique_vals} unique value(s)")

            if unique_vals <= self.LOW_CARDINALITY_MAX:
                # Print the whole distribution: truncating it here would hide
                # categories of columns that were classified as low cardinality.
                print(df[col].value_counts(dropna=False))
            else:
                print("‚ö†Ô∏è High cardinality - showing top 5 values:")
                print(df[col].value_counts().head(5))
            print("-" * 40)

    def eda_summary(self, df):
        """Print ``describe()`` summaries for the numerical and categorical columns."""
        if df is None:
            print("‚ùå Data doesn't exist.")
            return

        try:
            print("\n" + "="*60)
            print("üìä EXPLORATORY DATA ANALYSIS")
            print("="*60)

            # Numerical summary
            numerical_cols = df.select_dtypes(include=[np.number]).columns
            if len(numerical_cols) > 0:
                print("\nüî¢ Numerical Columns Summary:")
                print(df[numerical_cols].describe().round(2))

            # Categorical summary
            categorical_cols = df.select_dtypes(include=["object", "category"]).columns
            if len(categorical_cols) > 0:
                print("\nüìù Categorical Columns Summary:")
                print(df[categorical_cols].describe())

        except Exception as e:
            print(f"‚ùå Exception occurred: {e}")

    def handle_missing_values(self, df):
        """Impute missing values: median for numerical, mode for categorical columns.

        Columns without a single observed value are left untouched, because
        there is nothing to derive a fill value from. The fitted imputers are
        stored in ``self.imputer``.

        Returns:
            pandas.DataFrame: ``df`` itself when it has no missing values,
            otherwise an imputed copy.
        """
        if df.isnull().sum().sum() == 0:
            print("‚úÖ No missing values to handle!")
            return df

        print("\nüîß Handling Missing Values...")

        # Separate numerical and categorical columns
        self.numerical_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()

        # SimpleImputer discards columns that are entirely missing, which would
        # make the column-wise assignment below fail on a shape mismatch.
        empty_cols = [col for col in df.columns if df[col].isnull().all()]
        if empty_cols:
            print(f"   - Skipped {len(empty_cols)} column(s) with no observed values: {empty_cols}")
        num_cols = [col for col in self.numerical_cols if col not in empty_cols]
        cat_cols = [col for col in self.categorical_cols if col not in empty_cols]

        df_processed = df.copy()
        self.imputer = {}

        # Handle numerical columns
        if num_cols:
            num_imputer = SimpleImputer(strategy='median')  # More robust than mean
            df_processed[num_cols] = num_imputer.fit_transform(df[num_cols])
            self.imputer["numerical"] = num_imputer
            print(f"   ‚úÖ Imputed {len(num_cols)} numerical columns with median")

        # Handle categorical columns
        if cat_cols:
            cat_imputer = SimpleImputer(strategy='most_frequent')
            df_processed[cat_cols] = cat_imputer.fit_transform(df[cat_cols])
            self.imputer["categorical"] = cat_imputer
            print(f"   ‚úÖ Imputed {len(cat_cols)} categorical columns with mode")

        return df_processed

    @staticmethod
    def _iqr_outlier_mask(series):
        """Return a boolean mask of the values lying more than 1.5 * IQR outside the quartiles."""
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        return (series < (q1 - 1.5 * iqr)) | (series > (q3 + 1.5 * iqr))

    def detect_and_report_outliers(self, df):
        """Detect outliers using the IQR method and print a summary.

        Returns:
            list: Names of the numerical columns in which more than 1% of the
            rows are outliers (empty when there are none or ``df`` has no rows).
        """
        n_rows = len(df)
        if n_rows == 0:
            # Without rows the outlier percentage is undefined (division by zero).
            print("‚úÖ No significant outliers found.")
            return []

        outlier_summary = []
        for col in df.select_dtypes(include=[np.number]).columns:
            outlier_count = int(self._iqr_outlier_mask(df[col]).sum())
            outlier_percentage = outlier_count / n_rows * 100

            if outlier_percentage > 1:  # More than 1% outliers
                outlier_summary.append({
                    'Column': col,
                    'Outlier Count': outlier_count,
                    'Outlier %': round(outlier_percentage, 2)
                })

        if outlier_summary:
            print("\n‚ö†Ô∏è Outlier Detection Results:")
            outlier_df = pd.DataFrame(outlier_summary)
            print(outlier_df.to_string(index=False))
            return [item['Column'] for item in outlier_summary]

        print("‚úÖ No significant outliers found.")
        return []

    def choose_and_apply_scaler(self, df):
        """Pick a scaler from the data's characteristics and apply it to the numerical columns.

        RobustScaler is used when any numerical column has more than 1% IQR
        outliers, MaxAbsScaler when any column is more than half zeros, and
        StandardScaler otherwise. The fitted scaler is kept in ``self.scaler``.

        Returns:
            pandas.DataFrame: A scaled copy, or ``df`` itself when it has no
            numerical columns or no rows.
        """
        numerical_cols = df.select_dtypes(include=[np.number]).columns

        if len(numerical_cols) == 0:
            print("‚ÑπÔ∏è No numerical columns to scale.")
            return df

        n_rows = len(df)
        if n_rows == 0:
            # A scaler cannot be fitted on zero samples.
            print("No rows to scale.")
            return df

        # Outliers take precedence over sparsity when choosing the scaler.
        has_outliers = any(
            self._iqr_outlier_mask(df[col]).sum() > n_rows * 0.01
            for col in numerical_cols
        )
        if has_outliers:
            scaler_choice = "RobustScaler"
            self.scaler = RobustScaler()
        elif any((df[col] == 0).sum() / n_rows > 0.5 for col in numerical_cols):
            scaler_choice = "MaxAbsScaler"
            self.scaler = MaxAbsScaler()
        else:
            scaler_choice = "StandardScaler"
            self.scaler = StandardScaler()

        print(f"\n‚öôÔ∏è Scaling data using {scaler_choice}")

        df_scaled = df.copy()
        df_scaled[numerical_cols] = self.scaler.fit_transform(df[numerical_cols])

        return df_scaled

    def encode_categorical(self, df):
        """Encode object/category columns.

        Columns with at most two distinct values are label encoded in place;
        all others are one-hot encoded (first category dropped) and their new
        columns appended at the end of the frame. Fitted encoders are stored in
        ``self.encoders`` and the one-hot column names in ``self.encoded_columns``.

        Returns:
            pandas.DataFrame: An encoded copy of ``df``.
        """
        df_encoded = df.copy()
        self.categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()

        if len(self.categorical_cols) == 0:
            print("‚ÑπÔ∏è No categorical columns to encode.")
            return df_encoded

        print("\nüî¢ Encoding Categorical Columns...")

        one_hot_sources = []
        one_hot_frames = []
        for col in self.categorical_cols:
            unique_vals = df[col].nunique()
            if unique_vals <= 2:
                # Binary encoding
                le = LabelEncoder()
                df_encoded[col] = le.fit_transform(df[col])
                self.encoders[col] = le
                print(f"   - Label encoded '{col}' (binary)")
            else:
                # OneHot encoding (can be changed to ordinal if needed)
                ohe = OneHotEncoder(sparse_output=False, drop='first')
                transformed = ohe.fit_transform(df[[col]])
                # drop='first' removes the first category, hence the [1:] slice.
                new_cols = [f"{col}_{cat}" for cat in ohe.categories_[0][1:]]
                one_hot_sources.append(col)
                one_hot_frames.append(pd.DataFrame(transformed, columns=new_cols, index=df.index))
                self.encoders[col] = ohe
                self.encoded_columns[col] = new_cols
                print(f"   - One-hot encoded '{col}' with {len(new_cols)} new columns")

        if one_hot_frames:
            # Single concat: remaining columns first, then each one-hot block
            # in the order the source columns were processed.
            df_encoded = pd.concat(
                [df_encoded.drop(columns=one_hot_sources)] + one_hot_frames, axis=1
            )

        return df_encoded

    def preprocess(self, df):
        """Run the full pipeline: sanity check, imputation, outlier report, scaling, encoding.

        Returns:
            pandas.DataFrame | None: The processed frame, or None when ``df``
            is None or the sanity check fails.
        """
        if df is None:
            print("‚ùå No data to preprocess.")
            return None

        print("üöÄ Starting preprocessing pipeline...")

        # Sanity check
        if not self.sanity_check(df):
            return None

        # Handle missing values
        df = self.handle_missing_values(df)

        # Detect outliers (report only)
        self.detect_and_report_outliers(df)

        # Scale numerical
        df = self.choose_and_apply_scaler(df)

        # Encode categorical
        df = self.encode_categorical(df)

        print("‚úÖ Preprocessing complete!")

        return df

    def recommend_after_preprocessing(self, df, target_col=None):
        """Recommend methods based on preprocessed data shape and presence of target.

        The target counts as present when ``target_col`` is a column of ``df``
        or when it was one-hot encoded by ``encode_categorical`` and its
        replacement columns are in ``df``. Target columns are excluded from the
        feature count.

        Returns:
            dict | None: The recommendations, or None when ``df`` is None.
        """
        if df is None:
            print("No data available for recommendations.")
            return None

        n_samples, n_features = df.shape

        has_target = False
        if target_col:
            if target_col in df.columns:
                has_target = True
                # Exclude target from feature count
                n_features -= 1
            else:
                # The target may have been replaced by its one-hot columns.
                expanded = [
                    col for col in self.encoded_columns.get(target_col, [])
                    if col in df.columns
                ]
                if expanded:
                    has_target = True
                    n_features -= len(expanded)
                else:
                    print(f"Target column '{target_col}' not found; treating data as unsupervised.")

        recommendations = self.recommend_methods(n_features, n_samples, has_target)

        print("\n" + "="*60)
        print("üí° RECOMMENDED METHODS")
        print("="*60)

        print("\nüîπ Feature Selection Methods:")
        for method in recommendations['feature_selection']:
            print(f" - {method}")

        print("\nüîπ Modeling Methods:")
        for method in recommendations['modeling']:
            print(f" - {method}")

        return recommendations


# Example usage: load a CSV interactively, preprocess it and print recommendations.
if __name__ == "__main__":
    dp = DataPreprocessor()

    df = dp.get_dataset()
    if df is not None:
        df_processed = dp.preprocess(df)

        # preprocess() returns None when the sanity check fails; asking for
        # recommendations on None would raise, so stop here in that case.
        if df_processed is not None:
            # Optionally specify target column for recommendations
            target_column = input("Enter target column name (or press Enter if none): ").strip()
            target_column = target_column if target_column else None

            dp.recommend_after_preprocessing(df_processed, target_column)


Enter the name of the dataset:  iris.csv


‚úÖ Successfully loaded dataset: iris.csv
üöÄ Starting preprocessing pipeline...

üìä DATA SANITY CHECK REPORT
Shape of the dataset: (150, 5)
Memory usage: 0.01 MB

üìã Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   sepal_length  150 non-null    float64
 1   sepal_width   150 non-null    float64
 2   petal_length  150 non-null    float64
 3   petal_width   150 non-null    float64
 4   species       150 non-null    object 
dtypes: float64(4), object(1)
memory usage: 6.0+ KB

üîç Missing Values Analysis:
‚úÖ No missing values found!

üîÑ Duplicate rows: 3
   (2.00% of total data)
‚úÖ No missing values to handle!

‚ö†Ô∏è Outlier Detection Results:
     Column  Outlier Count  Outlier %
sepal_width              4       2.67

‚öôÔ∏è Scaling data using RobustScaler

üî¢ Encoding Categorical Columns...
   - One-hot encoded 'spec

Enter target column name (or press Enter if none):  



üí° RECOMMENDED METHODS

üîπ Feature Selection Methods:
 - Variance Threshold
 - Unsupervised clustering-based selection

üîπ Modeling Methods:
 - KMeans
 - PCA
 - Autoencoders


In [33]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder

class DataPreprocessor:
    """Interactive helper that loads a CSV, auto-detects a target and preprocesses the features.

    Transformers fitted along the way are kept on the instance (``imputer``,
    ``scaler``, ``encoders``).
    """

    # Categorical columns with at most this many distinct values get their
    # complete distribution printed; larger ones only show their top values.
    LOW_CARDINALITY_MAX = 15

    def __init__(self):
        # Dict of fitted SimpleImputer objects keyed by "numerical" /
        # "categorical"; stays None until handle_missing_values() imputes.
        self.imputer = None
        # Scaler instance chosen and fitted by choose_and_apply_scaler().
        self.scaler = None
        # Fitted LabelEncoder / OneHotEncoder per encoded source column.
        self.encoders = {}
        self.numerical_cols = []
        self.categorical_cols = []

    def recommend_methods(self, n_features, n_samples, has_target):
        """Recommend feature selection and modeling approaches.

        Args:
            n_features (int): Number of features.
            n_samples (int): Number of samples.
            has_target (bool): Whether a target variable exists (supervised).

        Returns:
            dict: Lists of method names under the keys "feature_selection"
            and "modeling".
        """
        if not has_target:
            return {
                "feature_selection": [
                    "Variance Threshold",
                    "Unsupervised clustering-based selection",
                ],
                "modeling": ["KMeans", "PCA", "Autoencoders"],
            }

        if n_features > 100 or n_samples > 5000:
            # Large dataset: prefer fast filter methods.
            return {
                "feature_selection": [
                    "Variance Threshold",
                    "Generic Univariate Feature Selection (e.g., SelectKBest with chi2, ANOVA)",
                    "Mutual Information",
                ],
                "modeling": [
                    "Linear / Logistic Regression",
                    "Ridge Regression",
                    "Random Forest (for embedded feature importance)",
                    "XGBoost / LightGBM",
                ],
            }

        # Smaller dataset: wrapper and embedded methods are affordable.
        return {
            "feature_selection": [
                "Recursive Feature Elimination (RFE) with SVM or Random Forest",
                "Sequential Feature Selector (SFS)",
                "Generic Univariate Feature Selection",
            ],
            "modeling": [
                "SVM (SVC / SVR)",
                "K-Nearest Neighbors (KNN)",
                "Linear / Logistic Regression",
                "Ridge Regression",
                "Random Forest",
                "Gradient Boosting Machines (XGBoost, LightGBM)",
            ],
        }

    def get_dataset(self):
        """Prompt for a CSV file name and load it.

        Returns:
            pandas.DataFrame | None: The loaded frame, or None when the file
            is missing or cannot be read (the reason is printed).
        """
        try:
            dataset = input("Enter the name of the dataset: ")
            df = pd.read_csv(dataset)
            print(f"‚úÖ Successfully loaded dataset: {dataset}")
            return df
        except FileNotFoundError:
            print("‚ùå File not found. Please check the filename.")
        except Exception as e:
            print(f"‚ùå An error occurred: {e}")
        return None

    def detect_target_column(self, df):
        """Auto-detect target column based on heuristics.

        Order of preference: an exact match with a common target name, the
        same names compared case-insensitively, the only categorical column,
        the categorical column with the fewest distinct values, and finally
        the last column.

        Returns:
            The name of the chosen column, or None when ``df`` has no columns.
        """
        if len(df.columns) == 0:
            print("No columns available to pick a target from.")
            return None

        common_targets = ['target', 'label', 'class', 'species', 'outcome', 'y']
        for col in common_targets:
            if col in df.columns:
                print(f"‚ÑπÔ∏è Auto-detected target column: '{col}'")
                return col

        # Second pass: same names ignoring case and surrounding whitespace,
        # so headers such as "Target" or "Species" are recognised as well.
        by_lowered_name = {}
        for col in df.columns:
            by_lowered_name.setdefault(str(col).strip().lower(), col)
        for name in common_targets:
            if name in by_lowered_name:
                col = by_lowered_name[name]
                print(f"‚ÑπÔ∏è Auto-detected target column: '{col}'")
                return col

        # Find categorical columns
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
        if len(categorical_cols) == 1:
            print(f"‚ÑπÔ∏è Auto-detected target column: '{categorical_cols[0]}' (only categorical column)")
            return categorical_cols[0]

        if len(categorical_cols) > 1:
            unique_counts = df[categorical_cols].nunique()
            target_candidate = unique_counts.idxmin()
            print(f"‚ÑπÔ∏è Auto-detected target column: '{target_candidate}' (categorical with least unique values)")
            return target_candidate

        # Default to last column if no categorical found
        print(f"‚ÑπÔ∏è Defaulting to last column as target: '{df.columns[-1]}'")
        return df.columns[-1]

    def sanity_check(self, df):
        """Print shape, memory usage, dtype info, missing values and duplicate counts.

        Returns:
            bool: True when the report was produced, False when ``df`` is None
            or any step of the report raised.
        """
        if df is None:
            print("‚ùå Data doesn't exist.")
            return False

        try:
            print("\n" + "="*60)
            print("üìä DATA SANITY CHECK REPORT")
            print("="*60)
            print(f"Shape of the dataset: {df.shape}")
            print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
            print("\nüìã Dataset Info:")
            df.info()
            print("\nüîç Missing Values Analysis:")
            # Null counts are computed once and reused for the percentage.
            missing_count = df.isnull().sum()
            missing_percent = (missing_count / df.shape[0] * 100).round(2)
            missing_data = pd.DataFrame({
                'Column': missing_percent.index,
                'Missing Count': missing_count,
                'Missing %': missing_percent
            })
            missing_data = missing_data[missing_data['Missing Count'] > 0]
            if len(missing_data) > 0:
                print(missing_data.to_string(index=False))
            else:
                print("‚úÖ No missing values found!")
            duplicate_count = df.duplicated().sum()
            print(f"\nüîÑ Duplicate rows: {duplicate_count}")
            if duplicate_count > 0:
                print(f"   ({duplicate_count/len(df)*100:.2f}% of total data)")
            return True
        except Exception as e:
            print(f"‚ùå Error during sanity check: {e}")
            return False

    def check_categorical_distributions(self, df):
        """Print value counts for every object/category column.

        Columns with at most ``LOW_CARDINALITY_MAX`` distinct values are shown
        in full (missing values included); larger ones show their top 5 values.
        """
        if df is None:
            return
        obj_cols = df.select_dtypes(include=["object", "category"]).columns
        if len(obj_cols) == 0:
            print("‚ÑπÔ∏è No categorical columns found.")
            return
        print("\n" + "="*60)
        print("üìà CATEGORICAL DISTRIBUTIONS")
        print("="*60)
        for col in obj_cols:
            unique_vals = df[col].nunique()
            print(f"\n'{col}' ‚Äî {unique_vals} unique value(s)")
            if unique_vals <= self.LOW_CARDINALITY_MAX:
                # Print the whole distribution: truncating it here would hide
                # categories of columns that were classified as low cardinality.
                print(df[col].value_counts(dropna=False))
            else:
                print("‚ö†Ô∏è High cardinality - showing top 5 values:")
                print(df[col].value_counts().head(5))
            print("-" * 40)

    def eda_summary(self, df):
        """Print ``describe()`` summaries for the numerical and categorical columns."""
        if df is None:
            print("‚ùå Data doesn't exist.")
            return
        try:
            print("\n" + "="*60)
            print("üìä EXPLORATORY DATA ANALYSIS")
            print("="*60)
            numerical_cols = df.select_dtypes(include=[np.number]).columns
            if len(numerical_cols) > 0:
                print("\nüî¢ Numerical Columns Summary:")
                print(df[numerical_cols].describe().round(2))
            categorical_cols = df.select_dtypes(include=["object", "category"]).columns
            if len(categorical_cols) > 0:
                print("\nüìù Categorical Columns Summary:")
                print(df[categorical_cols].describe())
        except Exception as e:
            print(f"‚ùå Exception occurred: {e}")

    def handle_missing_values(self, df):
        """Impute missing values: median for numerical, mode for categorical columns.

        Columns without a single observed value are left untouched, because
        there is nothing to derive a fill value from. The fitted imputers are
        stored in ``self.imputer``.

        Returns:
            pandas.DataFrame: ``df`` itself when it has no missing values,
            otherwise an imputed copy.
        """
        if df.isnull().sum().sum() == 0:
            print("‚úÖ No missing values to handle!")
            return df
        print("\nüîß Handling Missing Values...")
        self.numerical_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()

        # SimpleImputer discards columns that are entirely missing, which would
        # make the column-wise assignment below fail on a shape mismatch.
        empty_cols = [col for col in df.columns if df[col].isnull().all()]
        if empty_cols:
            print(f"   - Skipped {len(empty_cols)} column(s) with no observed values: {empty_cols}")
        num_cols = [col for col in self.numerical_cols if col not in empty_cols]
        cat_cols = [col for col in self.categorical_cols if col not in empty_cols]

        df_processed = df.copy()
        self.imputer = {}
        if num_cols:
            num_imputer = SimpleImputer(strategy='median')
            df_processed[num_cols] = num_imputer.fit_transform(df[num_cols])
            self.imputer["numerical"] = num_imputer
            print(f"   ‚úÖ Imputed {len(num_cols)} numerical columns with median")
        if cat_cols:
            cat_imputer = SimpleImputer(strategy='most_frequent')
            df_processed[cat_cols] = cat_imputer.fit_transform(df[cat_cols])
            self.imputer["categorical"] = cat_imputer
            print(f"   ‚úÖ Imputed {len(cat_cols)} categorical columns with mode")
        return df_processed

    @staticmethod
    def _iqr_outlier_mask(series):
        """Return a boolean mask of the values lying more than 1.5 * IQR outside the quartiles."""
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        return (series < (q1 - 1.5 * iqr)) | (series > (q3 + 1.5 * iqr))

    def detect_and_report_outliers(self, df):
        """Detect outliers using the IQR method and print a summary.

        Returns:
            list: Names of the numerical columns in which more than 1% of the
            rows are outliers (empty when there are none or ``df`` has no rows).
        """
        n_rows = len(df)
        if n_rows == 0:
            # Without rows the outlier percentage is undefined (division by zero).
            print("‚úÖ No significant outliers found.")
            return []

        outlier_summary = []
        for col in df.select_dtypes(include=[np.number]).columns:
            outlier_count = int(self._iqr_outlier_mask(df[col]).sum())
            outlier_percentage = outlier_count / n_rows * 100
            if outlier_percentage > 1:
                outlier_summary.append({
                    'Column': col,
                    'Outlier Count': outlier_count,
                    'Outlier %': round(outlier_percentage, 2)
                })
        if outlier_summary:
            print("\n‚ö†Ô∏è Outlier Detection Results:")
            outlier_df = pd.DataFrame(outlier_summary)
            print(outlier_df.to_string(index=False))
            return [item['Column'] for item in outlier_summary]

        print("‚úÖ No significant outliers found.")
        return []

    def choose_and_apply_scaler(self, df):
        """Pick a scaler from the data's characteristics and apply it to the numerical columns.

        RobustScaler is used when any numerical column has more than 1% IQR
        outliers, MaxAbsScaler when any column is more than half zeros, and
        StandardScaler otherwise. The fitted scaler is kept in ``self.scaler``.

        Returns:
            pandas.DataFrame: A scaled copy, or ``df`` itself when it has no
            numerical columns or no rows.
        """
        numerical_cols = df.select_dtypes(include=[np.number]).columns
        if len(numerical_cols) == 0:
            print("‚ÑπÔ∏è No numerical columns to scale.")
            return df

        n_rows = len(df)
        if n_rows == 0:
            # A scaler cannot be fitted on zero samples.
            print("No rows to scale.")
            return df

        # Outliers take precedence over sparsity when choosing the scaler.
        has_outliers = any(
            self._iqr_outlier_mask(df[col]).sum() > n_rows * 0.01
            for col in numerical_cols
        )
        if has_outliers:
            scaler_choice = "RobustScaler"
            self.scaler = RobustScaler()
        elif any((df[col] == 0).sum() / n_rows > 0.5 for col in numerical_cols):
            scaler_choice = "MaxAbsScaler"
            self.scaler = MaxAbsScaler()
        else:
            scaler_choice = "StandardScaler"
            self.scaler = StandardScaler()
        print(f"\n‚öôÔ∏è Scaling data using {scaler_choice}")
        df_scaled = df.copy()
        df_scaled[numerical_cols] = self.scaler.fit_transform(df[numerical_cols])
        return df_scaled

    def encode_categorical(self, df):
        """Encode object/category columns.

        Columns with at most two distinct values are label encoded in place;
        all others are one-hot encoded (first category dropped) and their new
        columns appended at the end of the frame. Fitted encoders are stored in
        ``self.encoders``.

        Returns:
            pandas.DataFrame: An encoded copy of ``df``.
        """
        df_encoded = df.copy()
        self.categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
        if len(self.categorical_cols) == 0:
            print("‚ÑπÔ∏è No categorical columns to encode.")
            return df_encoded
        print("\nüî¢ Encoding Categorical Columns...")

        one_hot_sources = []
        one_hot_frames = []
        for col in self.categorical_cols:
            unique_vals = df[col].nunique()
            if unique_vals <= 2:
                le = LabelEncoder()
                df_encoded[col] = le.fit_transform(df[col])
                self.encoders[col] = le
                print(f"   - Label encoded '{col}' (binary)")
            else:
                ohe = OneHotEncoder(sparse_output=False, drop='first')
                transformed = ohe.fit_transform(df[[col]])
                # drop='first' removes the first category, hence the [1:] slice.
                new_cols = [f"{col}_{cat}" for cat in ohe.categories_[0][1:]]
                one_hot_sources.append(col)
                one_hot_frames.append(pd.DataFrame(transformed, columns=new_cols, index=df.index))
                self.encoders[col] = ohe
                print(f"   - One-hot encoded '{col}' with {unique_vals} unique values")

        if one_hot_frames:
            # Single concat: remaining columns first, then each one-hot block
            # in the order the source columns were processed.
            df_encoded = pd.concat(
                [df_encoded.drop(columns=one_hot_sources)] + one_hot_frames, axis=1
            )
        return df_encoded

    def preprocess(self, df):
        """Run the full pipeline and print method recommendations.

        The target column is auto-detected. Missing values are imputed on the
        whole frame; scaling and encoding are applied to the feature columns
        only, and the target is re-attached unchanged as the last column so
        that the returned name always refers to a column of the returned frame.

        Returns:
            tuple: ``(processed_frame, target_col)``. Both elements are None
            when ``df`` is None or the sanity check fails, so callers can
            always unpack the result.
        """
        if df is None:
            return None, None

        # Auto-detect target
        target_col = self.detect_target_column(df)
        has_target = target_col is not None and target_col in df.columns
        n_features = df.shape[1] - 1 if has_target else df.shape[1]

        print(f"\nTarget column detected: '{target_col}'")
        print(f"Number of features: {n_features}, Number of samples: {df.shape[0]}")

        # Sanity check and EDA
        if not self.sanity_check(df):
            return None, None
        self.check_categorical_distributions(df)
        self.eda_summary(df)

        # Handle missing values
        df_imputed = self.handle_missing_values(df)

        # Detect outliers (report only)
        self.detect_and_report_outliers(df_imputed)

        # Keep the target out of scaling/encoding: transforming it would
        # distort the labels or replace the column by one-hot columns.
        features = df_imputed.drop(columns=[target_col]) if has_target else df_imputed

        # Scale numerical features
        features_scaled = self.choose_and_apply_scaler(features)

        # Encode categorical features
        features_encoded = self.encode_categorical(features_scaled)

        if has_target:
            df_encoded = pd.concat([features_encoded, df_imputed[[target_col]]], axis=1)
        else:
            df_encoded = features_encoded

        # Summary recommendations
        recs = self.recommend_methods(
            n_features=n_features,
            n_samples=df.shape[0],
            has_target=has_target
        )
        print("\n" + "="*60)
        print("üí° RECOMMENDATIONS")
        print("="*60)
        print(f"Feature Selection: {recs['feature_selection']}")
        print(f"Modeling Techniques: {recs['modeling']}")
        print("="*60)

        return df_encoded, target_col

# Usage example: load a CSV interactively and run the preprocessing pipeline.
if __name__ == "__main__":
    dp = DataPreprocessor()
    df = dp.get_dataset()
    if df is not None:
        # preprocess() can return None (e.g. when the sanity check fails);
        # unpacking None directly would raise a TypeError.
        result = dp.preprocess(df)
        processed_df, target_col = result if result is not None else (None, None)


Enter the name of the dataset:  iris.csv


‚úÖ Successfully loaded dataset: iris.csv
‚ÑπÔ∏è Auto-detected target column: 'species'

Target column detected: 'species'
Number of features: 4, Number of samples: 150

üìä DATA SANITY CHECK REPORT
Shape of the dataset: (150, 5)
Memory usage: 0.01 MB

üìã Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   sepal_length  150 non-null    float64
 1   sepal_width   150 non-null    float64
 2   petal_length  150 non-null    float64
 3   petal_width   150 non-null    float64
 4   species       150 non-null    object 
dtypes: float64(4), object(1)
memory usage: 6.0+ KB

üîç Missing Values Analysis:
‚úÖ No missing values found!

üîÑ Duplicate rows: 3
   (2.00% of total data)

üìà CATEGORICAL DISTRIBUTIONS

'species' ‚Äî 3 unique value(s)
species
setosa        50
versicolor    50
virginica     50
Name: count, dtype: int64
-----------

In [34]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder

class DataPreprocessor:
    """Interactive helper that inspects a tabular dataset and prepares it for modeling.

    The pipeline (see ``preprocess``) prints a sanity report and summary
    statistics, then imputes, scales and encodes the *feature* columns while
    leaving the target column untouched.

    Fitted transformers are kept on the instance after a run:
        imputer  -- dict mapping "numerical"/"categorical" to the fitted
                    SimpleImputer (only the kinds that were actually fitted),
                    or None if imputation has not run / was not needed.
        scaler   -- the fitted scaler chosen by ``choose_and_apply_scaler``.
        encoders -- dict mapping column name to its fitted encoder.
    """

    # Multiplier applied to the IQR to build the outlier fences.
    IQR_MULTIPLIER = 1.5
    # A column is reported as outlier-heavy above this percentage of rows.
    OUTLIER_PCT_THRESHOLD = 1
    # A column counts as sparse when more than this share of values is zero.
    SPARSE_ZERO_RATIO = 0.5
    # Up to this many distinct values, the full distribution is printed.
    MAX_FULL_DISPLAY_CATEGORIES = 15

    def __init__(self):
        self.imputer = None
        self.scaler = None
        self.encoders = {}
        self.numerical_cols = []
        self.categorical_cols = []

    def recommend_methods(self, n_features, n_samples, has_target):
        """Suggest feature-selection and modeling techniques for a dataset.

        Args:
            n_features: number of feature columns.
            n_samples: number of rows.
            has_target: whether a target column is available.

        Returns:
            dict with the keys "feature_selection" and "modeling", each a
            list of technique names.
        """
        if not has_target:
            return {
                "feature_selection": [
                    "Variance Threshold",
                    "Unsupervised clustering-based selection",
                ],
                "modeling": ["KMeans", "PCA", "Autoencoders"],
            }

        if n_features > 100 or n_samples > 5000:
            return {
                "feature_selection": [
                    "Variance Threshold",
                    "Generic Univariate Feature Selection (e.g., SelectKBest with chi2, ANOVA)",
                    "Mutual Information",
                ],
                "modeling": [
                    "Linear / Logistic Regression",
                    "Ridge Regression",
                    "Random Forest (for embedded feature importance)",
                    "XGBoost / LightGBM",
                ],
            }

        return {
            "feature_selection": [
                "Recursive Feature Elimination (RFE) with SVM or Random Forest",
                "Sequential Feature Selector (SFS)",
                "Generic Univariate Feature Selection",
            ],
            "modeling": [
                "SVM (SVC / SVR)",
                "K-Nearest Neighbors (KNN)",
                "Linear / Logistic Regression",
                "Ridge Regression",
                "Random Forest",
                "Gradient Boosting Machines (XGBoost, LightGBM)",
            ],
        }

    def get_dataset(self):
        """Prompt for a CSV filename and load it.

        Returns:
            The loaded DataFrame, or None if the file is missing or cannot be
            read (the reason is printed).
        """
        try:
            # Strip stray whitespace so "  iris.csv " still resolves.
            dataset = input("Enter the name of the dataset file (CSV): ").strip()
            df = pd.read_csv(dataset)
            print(f"Successfully loaded dataset: {dataset}")
            return df
        except FileNotFoundError:
            print("File not found. Please check the filename.")
        except Exception as e:
            print(f"An error occurred: {e}")
        return None

    def detect_target_column(self, df):
        """Auto-detect the target column based on heuristics.

        Order of preference: a column with a conventional target name, the
        only categorical column, the categorical column with the fewest
        distinct values, and finally the last column.

        Returns:
            The chosen column label, or None when ``df`` has no columns.
        """
        if len(df.columns) == 0:
            print("No columns available for target detection.")
            return None

        common_targets = ['target', 'label', 'class', 'species', 'outcome', 'y']
        for col in common_targets:
            if col in df.columns:
                print(f"Auto-detected target column: '{col}'")
                return col

        categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
        if len(categorical_cols) == 1:
            print(f"Auto-detected target column: '{categorical_cols[0]}' (only categorical column)")
            return categorical_cols[0]

        if len(categorical_cols) > 1:
            target_candidate = df[categorical_cols].nunique().idxmin()
            print(f"Auto-detected target column: '{target_candidate}' (categorical with least unique values)")
            return target_candidate

        print(f"Defaulting to last column as target: '{df.columns[-1]}'")
        return df.columns[-1]

    def sanity_check(self, df):
        """Print shape, memory, dtypes, missing-value and duplicate statistics.

        Returns:
            True when the report was produced, False when ``df`` is None or
            an error occurred while building the report.
        """
        if df is None:
            print("Data doesn't exist.")
            return False

        try:
            print("\nDATA SANITY CHECK REPORT")
            print(f"Shape of the dataset: {df.shape}")
            print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
            print("\nDataset Info:")
            df.info()
            print("\nMissing Values Analysis:")
            # Count nulls once and derive the percentage from the same Series.
            missing_count = df.isnull().sum()
            missing_percent = (missing_count / df.shape[0] * 100).round(2)
            missing_data = pd.DataFrame({
                'Column': missing_percent.index,
                'Missing Count': missing_count,
                'Missing %': missing_percent
            })
            missing_data = missing_data[missing_data['Missing Count'] > 0]
            if len(missing_data) > 0:
                print(missing_data.to_string(index=False))
            else:
                print("No missing values found.")
            duplicate_count = df.duplicated().sum()
            print(f"\nDuplicate rows: {duplicate_count}")
            if duplicate_count > 0:
                print(f"({duplicate_count/len(df)*100:.2f}% of total data)")
            return True
        except Exception as e:
            print(f"Error during sanity check: {e}")
            return False

    def check_categorical_distributions(self, df):
        """Print value counts for every object/category column.

        Columns with at most ``MAX_FULL_DISPLAY_CATEGORIES`` distinct values
        are shown in full (including missing values); others show the top 5.
        """
        if df is None:
            return
        obj_cols = df.select_dtypes(include=["object", "category"]).columns
        if len(obj_cols) == 0:
            print("No categorical columns found.")
            return
        print("\nCATEGORICAL DISTRIBUTIONS")
        for col in obj_cols:
            unique_vals = df[col].nunique()
            # \u2014 is an em dash; written as an escape so the separator
            # survives re-encoding of the notebook file.
            print(f"\n'{col}' \u2014 {unique_vals} unique value(s)")
            if unique_vals <= self.MAX_FULL_DISPLAY_CATEGORIES:
                # Show every category: truncating here would silently hide
                # values for columns that passed the low-cardinality check.
                print(df[col].value_counts(dropna=False))
            else:
                print("High cardinality - showing top 5 values:")
                print(df[col].value_counts().head(5))

    def eda_summary(self, df):
        """Print ``describe()`` summaries for numerical and categorical columns."""
        if df is None:
            print("Data doesn't exist.")
            return
        try:
            print("\nEXPLORATORY DATA ANALYSIS")
            numerical_cols = df.select_dtypes(include=[np.number]).columns
            if len(numerical_cols) > 0:
                print("\nNumerical Columns Summary:")
                print(df[numerical_cols].describe().round(2))
            categorical_cols = df.select_dtypes(include=["object", "category"]).columns
            if len(categorical_cols) > 0:
                print("\nCategorical Columns Summary:")
                print(df[categorical_cols].describe())
        except Exception as e:
            print(f"Exception occurred: {e}")

    def handle_missing_values(self, df):
        """Impute missing values: median for numerical, mode for categorical.

        Columns that contain no observed value at all are left as they are
        (and reported), because there is nothing to derive a fill value from.
        The fitted imputers are stored in ``self.imputer``.

        Returns:
            ``df`` itself when nothing is missing, otherwise an imputed copy.
        """
        if df.isnull().sum().sum() == 0:
            print("No missing values to handle.")
            return df
        print("\nHandling Missing Values...")
        self.numerical_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
        df_processed = df.copy()

        fitted_imputers = {}
        plan = (
            ("numerical", self.numerical_cols, "median", "median"),
            ("categorical", self.categorical_cols, "most_frequent", "mode"),
        )
        for kind, cols, strategy, label in plan:
            # SimpleImputer drops all-missing columns by default, which would
            # make the transformed array narrower than the column list and
            # break the assignment below, so those columns are excluded.
            usable = [c for c in cols if df[c].notna().any()]
            skipped = [c for c in cols if c not in usable]
            if skipped:
                print(f"Skipped {len(skipped)} {kind} column(s) with no observed values: {skipped}")
            if not usable:
                continue
            imputer = SimpleImputer(strategy=strategy)
            df_processed[usable] = imputer.fit_transform(df[usable])
            fitted_imputers[kind] = imputer
            print(f"Imputed {len(usable)} {kind} columns with {label}")

        self.imputer = fitted_imputers
        return df_processed

    def _outlier_stats(self, series):
        """Return (count, percentage) of values outside the IQR fences.

        The caller must pass a non-empty series.
        """
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - self.IQR_MULTIPLIER * iqr
        upper_bound = q3 + self.IQR_MULTIPLIER * iqr
        count = int(((series < lower_bound) | (series > upper_bound)).sum())
        return count, count / len(series) * 100

    def detect_and_report_outliers(self, df):
        """Report numerical columns where more than 1% of rows are IQR outliers.

        Returns:
            List of the reported column names (empty if none, or if ``df``
            has no rows).
        """
        outlier_summary = []
        # An empty frame has no outliers; skipping also avoids dividing by 0.
        if len(df) > 0:
            for col in df.select_dtypes(include=[np.number]).columns:
                count, percentage = self._outlier_stats(df[col])
                if percentage > self.OUTLIER_PCT_THRESHOLD:
                    outlier_summary.append({
                        'Column': col,
                        'Outlier Count': count,
                        'Outlier %': round(percentage, 2)
                    })
        if outlier_summary:
            print("\nOutlier Detection Results:")
            outlier_df = pd.DataFrame(outlier_summary)
            print(outlier_df.to_string(index=False))
            return [item['Column'] for item in outlier_summary]
        print("No significant outliers found.")
        return []

    def choose_and_apply_scaler(self, df):
        """Pick a scaler from the data's shape and apply it to numerical columns.

        RobustScaler is used when any column is outlier-heavy, MaxAbsScaler
        when any column is mostly zeros, StandardScaler otherwise. The fitted
        scaler is stored in ``self.scaler``.

        Returns:
            A scaled copy of ``df``; ``df`` itself when there is nothing to
            scale.
        """
        numerical_cols = df.select_dtypes(include=[np.number]).columns
        if len(numerical_cols) == 0:
            print("No numerical columns to scale.")
            return df
        if len(df) == 0:
            # Scalers cannot be fitted on zero samples.
            print("No rows to scale.")
            return df

        has_outliers = any(
            self._outlier_stats(df[col])[1] > self.OUTLIER_PCT_THRESHOLD
            for col in numerical_cols
        )
        if has_outliers:
            scaler_choice = "RobustScaler"
            self.scaler = RobustScaler()
        elif any((df[col] == 0).mean() > self.SPARSE_ZERO_RATIO for col in numerical_cols):
            scaler_choice = "MaxAbsScaler"
            self.scaler = MaxAbsScaler()
        else:
            scaler_choice = "StandardScaler"
            self.scaler = StandardScaler()

        print(f"\nScaling data using {scaler_choice}")
        df_scaled = df.copy()
        df_scaled[numerical_cols] = self.scaler.fit_transform(df[numerical_cols])
        return df_scaled

    def encode_categorical(self, df):
        """Encode object/category columns and return the encoded copy.

        Columns with at most two distinct values are label encoded in place;
        the rest are one-hot encoded (first category dropped) and the new
        columns are appended after the remaining ones. Fitted encoders are
        stored in ``self.encoders`` keyed by the original column name.
        """
        df_encoded = df.copy()
        self.categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
        if len(self.categorical_cols) == 0:
            print("No categorical columns to encode.")
            return df_encoded
        print("\nEncoding Categorical Columns...")

        onehot_frames = []
        for col in self.categorical_cols:
            unique_vals = df[col].nunique()
            if unique_vals <= 2:
                le = LabelEncoder()
                df_encoded[col] = le.fit_transform(df[col])
                self.encoders[col] = le
                print(f"Label encoded '{col}' (binary)")
            else:
                ohe = OneHotEncoder(sparse_output=False, drop='first')
                transformed = ohe.fit_transform(df[[col]])
                # drop='first' removes the first category, so names start at 1.
                new_cols = [f"{col}_{cat}" for cat in ohe.categories_[0][1:]]
                onehot_frames.append(pd.DataFrame(transformed, columns=new_cols, index=df.index))
                df_encoded = df_encoded.drop(columns=[col])
                self.encoders[col] = ohe
                print(f"One-hot encoded '{col}' with {unique_vals} unique values")

        # Concatenate once instead of re-building the frame per column.
        if onehot_frames:
            df_encoded = pd.concat([df_encoded, *onehot_frames], axis=1)
        return df_encoded

    def preprocess(self, df, target_col=None):
        """Run the full report + preprocessing pipeline.

        Only the feature columns are imputed, scaled and encoded. The target
        column is re-attached unchanged as the last column, so the returned
        ``target_col`` is always a column of the returned frame. Missing
        values in the target are therefore not imputed.

        Args:
            df: input DataFrame (None is tolerated).
            target_col: optional target column label; auto-detected via
                ``detect_target_column`` when omitted.

        Returns:
            ``(processed_df, target_col)``, or None when ``df`` is None, has
            no columns, ``target_col`` is not a column of ``df``, or the
            sanity check fails.
        """
        if df is None:
            return None
        if df.shape[1] == 0:
            print("Dataset has no columns; nothing to preprocess.")
            return None

        if target_col is None:
            target_col = self.detect_target_column(df)
        elif target_col not in df.columns:
            print(f"Target column '{target_col}' not found in the dataset.")
            return None

        features = df.drop(columns=[target_col])
        target = df[target_col]

        print(f"\nTarget column detected: '{target_col}'")
        print(f"Number of features: {features.shape[1]}, Number of samples: {df.shape[0]}")

        if not self.sanity_check(df):
            return None
        self.check_categorical_distributions(df)
        self.eda_summary(df)

        # Transform features only: scaling or one-hot encoding the target
        # would change its values or remove the column under its own name.
        features_imputed = self.handle_missing_values(features)
        self.detect_and_report_outliers(features_imputed)
        features_scaled = self.choose_and_apply_scaler(features_imputed)
        features_encoded = self.encode_categorical(features_scaled)
        df_encoded = pd.concat([features_encoded, target], axis=1)

        has_target = target_col is not None and target_col in df.columns
        recs = self.recommend_methods(
            n_features=features.shape[1],
            n_samples=df.shape[0],
            has_target=has_target
        )
        print("\nRECOMMENDATIONS")
        print(f"Feature Selection Methods: {recs['feature_selection']}")
        print(f"Modeling Techniques: {recs['modeling']}\n")

        return df_encoded, target_col

# Usage example: load a CSV chosen interactively and run the full pipeline.
if __name__ == "__main__":
    dp = DataPreprocessor()
    df = dp.get_dataset()
    if df is not None:
        # preprocess() returns None (not a tuple) when the sanity check fails,
        # so unpack only after confirming a result came back.
        result = dp.preprocess(df)
        if result is not None:
            processed_df, target_col = result


Enter the name of the dataset file (CSV):  iris.csv


Successfully loaded dataset: iris.csv
Auto-detected target column: 'species'

Target column detected: 'species'
Number of features: 4, Number of samples: 150

DATA SANITY CHECK REPORT
Shape of the dataset: (150, 5)
Memory usage: 0.01 MB

Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   sepal_length  150 non-null    float64
 1   sepal_width   150 non-null    float64
 2   petal_length  150 non-null    float64
 3   petal_width   150 non-null    float64
 4   species       150 non-null    object 
dtypes: float64(4), object(1)
memory usage: 6.0+ KB

Missing Values Analysis:
No missing values found.

Duplicate rows: 3
(2.00% of total data)

CATEGORICAL DISTRIBUTIONS

'species' — 3 unique value(s)
species
setosa        50
versicolor    50
virginica     50
Name: count, dtype: int64

EXPLORATORY DATA ANALYSIS

Numerical Columns Summary: