In [23]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
import unittest

def preprocess_data(df, target_column=None):
    """
    Applies a comprehensive set of data preprocessing steps to the input DataFrame.
    Includes rigorous type checking and handles potential errors.

    Args:
        df (pd.DataFrame): The input DataFrame to preprocess.
        target_column (str, optional): The name of the target variable column.
                                       If provided, it will be separated before preprocessing
                                       and rejoined at the end. Defaults to None.

    Returns:
        tuple: A tuple containing the preprocessed features (pd.DataFrame) and
               the target variable (pd.Series) if target_column is provided,
               otherwise just the preprocessed features (pd.DataFrame).

    Raises:
        TypeError: If the input 'df' is not a pandas DataFrame.
        ValueError: If 'target_column' is not found in the DataFrame.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    X = df.copy()
    y = None
    if target_column is not None:
        if target_column not in X.columns:
            raise ValueError(f"Target column '{target_column}' not found in DataFrame.")
        y = X[target_column]
        X = X.drop(columns=[target_column])

    numerical_cols = X.select_dtypes(include=np.number).columns.tolist()
    categorical_cols = X.select_dtypes(include='object').columns.tolist()

    numerical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, numerical_cols),
            ('cat', categorical_transformer, categorical_cols)])

    try:
        X_processed = preprocessor.fit_transform(X)
        feature_names = preprocessor.get_feature_names_out(input_features=X.columns)
        X_processed_df = pd.DataFrame(X_processed, columns=feature_names, index=X.index)
    except Exception as e:
        raise RuntimeError(f"Error during preprocessing: {e}")

    if y is not None:
        return X_processed_df, y
    else:
        return X_processed_df

def handle_outliers_iqr(df, columns, threshold=1.5):
    """
    Handles outliers in specified numerical columns of a DataFrame using the IQR method.
    Includes type and value checks, and handles cases where columns might not exist.

    Args:
        df (pd.DataFrame): The input DataFrame.
        columns (list): A list of column names to handle outliers in.
        threshold (float): The multiplier for the IQR to define outlier boundaries.

    Returns:
        pd.DataFrame: The DataFrame with outliers capped at the IQR boundaries.

    Raises:
        TypeError: If 'df' is not a pandas DataFrame or 'columns' is not a list.
        ValueError: If 'threshold' is not a positive number.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    if not isinstance(columns, list):
        raise TypeError("Input 'columns' must be a list.")
    if not isinstance(threshold, (int, float)) or threshold <= 0:
        raise ValueError("Input 'threshold' must be a positive number.")

    df_cleaned = df.copy()
    for col in columns:
        if col in df_cleaned.columns:
            if pd.api.types.is_numeric_dtype(df_cleaned[col]):
                Q1 = df_cleaned[col].quantile(0.25)
                Q3 = df_cleaned[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR
                df_cleaned[col] = np.where(df_cleaned[col] < lower_bound, lower_bound, df_cleaned[col])
                df_cleaned[col] = np.where(df_cleaned[col] > upper_bound, upper_bound, df_cleaned[col])
            else:
                print(f"Warning: Column '{col}' is not numerical and outlier handling was skipped.")
        else:
            print(f"Warning: Column '{col}' not found in DataFrame.")
    return df_cleaned

def remove_duplicate_rows(df):
    """
    Removes duplicate rows from a DataFrame.
    Includes type checking.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: The DataFrame with duplicate rows removed.

    Raises:
        TypeError: If the input 'df' is not a pandas DataFrame.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    df_no_duplicates = df.drop_duplicates().reset_index(drop=True)
    return df_no_duplicates

def handle_imbalanced_data(X, y, method='oversampling', random_state=None):
    """
    Handles imbalanced datasets using either oversampling or undersampling.
    Includes type and value checks, and handles cases where the method is not recognized.

    Args:
        X (pd.DataFrame): The feature matrix.
        y (pd.Series): The target variable.
        method (str): 'oversampling' or 'undersampling'. Defaults to 'oversampling'.
        random_state (int, optional): Random seed for reproducibility. Defaults to None.

    Returns:
        tuple: The resampled feature matrix (pd.DataFrame) and target variable (pd.Series).

    Raises:
        TypeError: If 'X' is not a pandas DataFrame or 'y' is not a pandas Series.
        ValueError: If 'method' is not 'oversampling' or 'undersampling'.
    """
    if not isinstance(X, pd.DataFrame):
        raise TypeError("Input 'X' must be a pandas DataFrame.")
    if not isinstance(y, pd.Series):
        raise TypeError("Input 'y' must be a pandas Series.")
    if method not in ['oversampling', 'undersampling']:
        raise ValueError("Input 'method' must be either 'oversampling' or 'undersampling'.")

    if method == 'oversampling':
        smote = SMOTE(random_state=random_state)
        try:
            X_resampled, y_resampled = smote.fit_resample(X, y)
            return pd.DataFrame(X_resampled, columns=X.columns), pd.Series(y_resampled)
        except Exception as e:
            raise RuntimeError(f"Error during oversampling: {e}")
    elif method == 'undersampling':
        rus = RandomUnderSampler(random_state=random_state)
        try:
            X_resampled, y_resampled = rus.fit_resample(X, y)
            return pd.DataFrame(X_resampled, columns=X.columns), pd.Series(y_resampled)
        except Exception as e:
            raise RuntimeError(f"Error during undersampling: {e}")

class TestDataPreprocessing(unittest.TestCase):
    def setUp(self):
        self.sample_data = pd.DataFrame({
            'numerical_col1': [1, 5, 2, np.nan, 7, 100, 3],
            'numerical_col2': [0.1, 0.5, 0.2, 0.8, np.nan, 0.3, 0.6],
            'categorical_col1': ['A', 'B', 'A', 'C', 'B', 'A', np.nan],
            'categorical_col2': ['X', 'Y', 'X', 'Z', 'Y', 'X', 'Y'],
            'target_variable': [0, 1, 0, 1, 0, 1, 0]
        })

    def test_preprocess_data_type_error(self):
        with self.assertRaises(TypeError):
            preprocess_data("not a dataframe")

    def test_preprocess_data_target_not_found(self):
        with self.assertRaises(ValueError):
            preprocess_data(self.sample_data, target_column='non_existent_column')

    def test_preprocess_data_output_shape(self):
        processed_X, y = preprocess_data(self.sample_data, target_column='target_variable')
        self.assertEqual(processed_X.shape[0], self.sample_data.shape[0])
        self.assertGreater(processed_X.shape[1], self.sample_data.drop(columns=['target_variable']).shape[1]) # Due to one-hot encoding
        self.assertEqual(y.shape[0], self.sample_data.shape[0])

    def test_handle_outliers_iqr_type_error(self):
        with self.assertRaises(TypeError):
            handle_outliers_iqr(self.sample_data, "not a list")
        with self.assertRaises(TypeError):
            handle_outliers_iqr("not a dataframe", ['numerical_col1'])
        with self.assertRaises(ValueError):
            handle_outliers_iqr(self.sample_data, ['numerical_col1'], 0)

    def test_handle_outliers_iqr_column_not_found(self):
        df_modified = handle_outliers_iqr(self.sample_data.copy(), ['non_existent_column'])
        self.assertTrue(df_modified.equals(self.sample_data)) # Should return original if column not found

    def test_handle_outliers_iqr_non_numerical_column(self):
        df_modified = handle_outliers_iqr(self.sample_data.copy(), ['categorical_col1'])
        self.assertTrue(df_modified.equals(self.sample_data)) # Should skip non-numerical

    def test_remove_duplicate_rows_type_error(self):
        with self.assertRaises(TypeError):
            remove_duplicate_rows("not a dataframe")

    def test_remove_duplicate_rows_functionality(self):
        df_with_duplicates = pd.concat([self.sample_data, self.sample_data.iloc[[0]]]).reset_index(drop=True)
        df_unique = remove_duplicate_rows(df_with_duplicates)
        self.assertEqual(df_unique.shape[0], self.sample_data.shape[0])

    def test_handle_imbalanced_data_type_error(self):
        X = self.sample_data.drop(columns=['target_variable'])
        y = self.sample_data['target_variable']
        with self.assertRaises(TypeError):
            handle_imbalanced_data("not a dataframe", y)
        with self.assertRaises(TypeError):
            handle_imbalanced_data(X, "not a series")
        with self.assertRaises(ValueError):
            handle_imbalanced_data(X, y, method='invalid_method')

    def test_handle_imbalanced_data_oversampling(self):
        X = self.sample_data.drop(columns=['target_variable'])
        y = self.sample_data['target_variable']
        X_resampled, y_resampled = handle_imbalanced_data(X, y, method='oversampling', random_state=42)
        self.assertGreaterEqual(y_resampled.value_counts().min(), y.value_counts().min())

    def test_handle_imbalanced_data_undersampling(self):
        X = self.sample_data.drop(columns=['target_variable'])
        y = self.sample_data['target_variable']
        X_resampled, y_resampled = handle_imbalanced_data(X, y, method='undersampling', random_state=42)
        self.assertLessEqual(y_resampled.value_counts().max(), y.value_counts().max())

if _name_ == "_main_":
    # Sample DataFrame (replace with your actual data loading)
    data = {
        'numerical_col1': [1, 5, 2, np.nan, 7, 100, 3],
        'numerical_col2': [0.1, 0.5, 0.2, 0.8, np.nan, 0.3, 0.6],
        'categorical_col1': ['A', 'B', 'A', 'C', 'B', 'A', np.nan],
        'categorical_col2': ['X', 'Y', 'X', 'Z', 'Y', 'X', 'Y'],
        'target_variable': [0, 1, 0, 1, 0, 1, 0]
    }
    df = pd.DataFrame(data)

    print("Original DataFrame:")
    print(df)
    print("\n" + "="*50 + "\n")

    # Separate target variable if it exists
    TARGET_COLUMN = 'target_variable'
    try:
        if TARGET_COLUMN in df.columns:
            X, y = df.drop(columns=[TARGET_COLUMN]), df[TARGET_COLUMN]
        else:
            X, y = df, None

        # Handle missing values and scale/encode
        X_processed = preprocess_data(X)
        print("DataFrame after missing value handling, scaling, and encoding:")
        print(X_processed)
        print("\n" + "="*50 + "\n")

        # Handle outliers in numerical columns
        numerical_cols = X.select_dtypes(include=np.number).columns.tolist()
        X_processed_no_outliers = handle_outliers_iqr(X_processed, numerical_cols)
        print("DataFrame after outlier handling (IQR method):")
        print(X_processed_no_outliers)
        print("\n" + "="*50 + "\n")

        # Remove duplicate rows
        X_processed_unique = remove_duplicate_rows(X_processed_no_outliers)
        print("DataFrame after removing duplicate rows:")
        print(X_processed_unique)
        print("\n" + "="*50 + "\n")

        # Handle imbalanced data if a target variable exists
        if y is not None:
            X_balanced, y_balanced = handle_imbalanced_data(X_processed_unique, y, method='oversampling', random_state=42)
            print("DataFrame after handling imbalanced data (oversampling):")
            print("Resampled Features:")
            print(X_balanced)
            print("\nResampled Target:")
            print(y_balanced.value_counts())
            print("\n" + "="*50 + "\n")

        # Run unit tests
        print("\n" + "="*50 + "\n")
        print("Running Unit Tests:")
        suite = unittest.TestLoader().loadTestsFromTestCase(TestDataPreprocessing)
        unittest.TextTestRunner(verbosity=2).run(suite)

    except (TypeError, ValueError, RuntimeError) as e:
        print(f"An error occurred during processing: {e}")

NameError: name '_name_' is not defined