In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from typing import List, Union

warnings.filterwarnings('ignore')

In [5]:
"""
Credit Score Data Preprocessing Pipeline
======================================
This script handles comprehensive data cleaning, imputation, and feature engineering 
for credit scoring dataset with temporal customer data.
"""


class CreditDataPreprocessor:
    """
    A comprehensive data preprocessing pipeline for credit scoring data.

    This class handles:
    - Data cleaning and corruption removal
    - Missing value imputation with customer-aware logic
    - Feature engineering and encoding
    - Outlier detection and treatment
    """

    def __init__(self, file_path: str):
        """Initialize the preprocessor with data file."""
        self.file_path = file_path
        self.df = None
        self.setup_display_options()

    def setup_display_options(self):
        """Configure pandas display options for better data inspection."""
        pd.set_option('display.max_columns', 1000)
        pd.set_option('display.max_rows', 1000)
        pd.set_option('display.float_format', lambda x: '%.5f' % x)

    def load_data(self) -> pd.DataFrame:
        """Load and perform initial data preparation."""
        print("Loading data...")
        self.df = pd.read_csv(self.file_path)
        self.df.columns = self.df.columns.str.lower()
        print(f"Data loaded: {self.df.shape}")
        return self.df

    def remove_irrelevant_columns(self) -> pd.DataFrame:
        """Remove columns that don't contribute to credit scoring."""
        columns_to_drop = ['id', 'name', 'ssn']
        self.df = self.df.drop(columns_to_drop, axis=1)
        print(f"Dropped irrelevant columns: {columns_to_drop}")
        return self.df

    def clean_corrupted_data(self) -> pd.DataFrame:
        """
        Remove corrupted and invalid data patterns found in the dataset.

        Handles:
        - Extreme values wrapped in underscores
        - Random character patterns
        - Empty strings and various null representations
        """
        print("Cleaning corrupted data...")

        # Handle specific extreme corrupted values
        extreme_value_mapping = {
            '__-333333333333333333333333333__': np.nan,
            '__10000__': np.nan
        }
        self.df.replace(extreme_value_mapping, inplace=True)

        # Define invalid patterns found in the data
        invalid_patterns = ['', 'nan', '!@9#%8', '#F%$D@*&8', 'NM', 'nm']

        # Strip underscores and replace invalid patterns
        self.df = self.df.applymap(
            lambda x: x if x is np.nan or not isinstance(x, str)
            else str(x).strip('_')
        ).replace(invalid_patterns, np.nan)

        print("Corrupted data patterns cleaned")
        return self.df

    def convert_data_types(self) -> pd.DataFrame:
        """Convert columns to appropriate data types after cleaning."""
        print("Converting data types...")

        # Numeric columns that should be converted
        numeric_conversions = {
            'age': int,
            'annual_income': float,
            'num_of_loan': int,
            'num_of_delayed_payment': float,
            'changed_credit_limit': float,
            'outstanding_debt': float,
            'amount_invested_monthly': float,
            'monthly_balance': float
        }

        for col, dtype in numeric_conversions.items():
            try:
                self.df[col] = self.df[col].astype(dtype)
            except ValueError as e:
                print(f"Warning: Could not convert {col} to {dtype}: {e}")

        return self.df

    def standardize_string_columns(self) -> pd.DataFrame:
        """Standardize string columns to lowercase with underscores."""
        string_columns = list(self.df.dtypes[self.df.dtypes == 'object'].index)

        for col in string_columns:
            self.df[col] = self.df[col].str.lower().str.replace(' ', '_')

        print(f"Standardized {len(string_columns)} string columns")
        return self.df

    def handle_customer_stable_features(self) -> pd.DataFrame:
        """
        Handle features that should be stable within each customer's timeline.
        Uses forward/backward fill within customer groups.
        """
        print("Handling customer-stable features...")

        stable_features = ['occupation']

        for feature in stable_features:
            # Forward fill then backward fill within customer groups
            self.df[feature] = self.df.groupby(
                'customer_id')[feature].fillna(method='ffill')
            self.df[feature] = self.df.groupby(
                'customer_id')[feature].fillna(method='bfill')

        return self.df

    def clean_age_column(self) -> pd.DataFrame:
        """Clean age column with business logic validation."""
        print("Cleaning age column...")

        # Set unrealistic ages to NaN (based on data analysis: 14-60 range)
        self.df.loc[(self.df['age'] < 14) | (
            self.df['age'] > 60), 'age'] = np.nan

        # Fill missing ages within customer groups
        self.df['age'] = self.df.groupby('customer_id')['age'].fillna(
            method='ffill').fillna(method='bfill')

        return self.df

    def parse_credit_history_age(self, x) -> Union[float, int]:
        """Parse credit history age from text format to months."""
        if pd.isna(x) or str(x).lower() == 'na':
            return np.nan

        parts = str(x).replace('_', ' ').split(' and ')
        years = int(parts[0].split(' ')[0]) * 12
        months = int(parts[1].split(' ')[0])

        return years + months

    def fill_credit_history_sequential(self, group) -> pd.Series:
        """
        Fill missing credit history values considering monthly progression.
        Credit history should increment by 1 each month.
        """
        group = group.copy()

        # Forward fill with increment
        for i in range(1, len(group)):
            if pd.isna(group.iloc[i]) and not pd.isna(group.iloc[i-1]):
                group.iloc[i] = group.iloc[i-1] + 1

        # Backward fill with decrement
        for i in range(len(group)-2, -1, -1):
            if pd.isna(group.iloc[i]) and not pd.isna(group.iloc[i+1]):
                group.iloc[i] = group.iloc[i+1] - 1

        return group

    def handle_credit_history_age(self) -> pd.DataFrame:
        """Transform and impute credit history age column."""
        print("Processing credit history age...")

        # Replace 'na' strings with proper NaN
        self.df['credit_history_age'] = self.df['credit_history_age'].replace({
                                                                              'na': np.nan})

        # Parse text format to numeric (months)
        self.df['credit_history_age'] = self.df['credit_history_age'].apply(
            self.parse_credit_history_age)

        # Apply sequential filling within customer groups
        self.df['credit_history_age'] = self.df.groupby('customer_id')['credit_history_age'].apply(
            self.fill_credit_history_sequential
        ).reset_index(level=0, drop=True)

        return self.df

    def impute_customer_grouped_features(self) -> pd.DataFrame:
        """Impute missing values using customer-grouped statistics."""
        print("Imputing customer-grouped features...")

        # Features that should be filled within customer groups
        customer_grouped_features = {
            'monthly_inhand_salary': 'ffill_bfill',
            'credit_mix': 'ffill_bfill',
            'payment_of_min_amount': 'mode',
            'payment_behaviour': 'mode_safe',
            'num_of_delayed_payment': 'median',
            'changed_credit_limit': 'median'
        }

        for feature, method in customer_grouped_features.items():
            if method == 'ffill_bfill':
                self.df[feature] = self.df.groupby(
                    'customer_id')[feature].fillna(method='ffill')
                self.df[feature] = self.df.groupby(
                    'customer_id')[feature].fillna(method='bfill')

            elif method == 'mode':
                self.df[feature] = self.df.groupby('customer_id')[feature].transform(
                    lambda x: x.mode()[0] if not x.mode().empty else np.nan
                )

            elif method == 'mode_safe':
                self.df[feature] = self.df.groupby('customer_id')[feature].transform(
                    lambda x: x.fillna(
                        x.mode()[0] if not x.mode().empty else 'unknown')
                )

            elif method == 'median':
                self.df[feature] = self.df.groupby('customer_id')[feature].transform(
                    lambda x: x.median() if not x.isnull().all() else np.nan
                )

        return self.df

    def handle_remaining_missing_values(self) -> pd.DataFrame:
        """Handle remaining missing values with appropriate strategies."""
        print("Handling remaining missing values...")

        # Mean imputation for balance-related features
        mean_imputation_cols = ['monthly_balance', 'amount_invested_monthly']
        for col in mean_imputation_cols:
            self.df[col] = self.df.groupby('customer_id')[col].transform(
                lambda x: x.fillna(x.mean())
            )

        # Median imputation for count-based features with zero handling
        median_imputation_cols = ['num_of_loan', 'num_credit_inquiries',
                                  'num_bank_accounts', 'total_emi_per_month']

        for col in median_imputation_cols:
            # Customer-level median first
            self.df[col] = self.df.groupby('customer_id')[col].transform(
                lambda x: x.median() if not x.isnull().all() else np.nan
            )

            # Replace invalid zeros with NaN, then global median imputation
            self.df[col] = self.df[col].replace(0, np.nan)
            self.df[col].fillna(self.df[col].median(), inplace=True)

            # Convert to integer for count-based features
            if col in ['num_of_loan', 'num_credit_inquiries', 'num_bank_accounts']:
                self.df[col] = self.df[col].astype(int)

        return self.df

    def engineer_loan_features(self) -> pd.DataFrame:
        """Create binary features for different loan types."""
        print("Engineering loan type features...")

        # Parse loan types and create binary features
        loan_type_split = self.df['type_of_loan'].str.split(
            r', and |, | and |,'
        ).dropna()

        # Extract all unique loan types
        loan_types_list = [
            item.removeprefix('_and_').strip('_')
            for sublist in loan_type_split.tolist()
            for item in sublist
        ]
        unique_loan_types = set(loan_types_list)

        # Create binary features for each loan type
        for loan_type in unique_loan_types:
            if pd.notna(loan_type):
                feature_name = f'has_{loan_type}'

                # Check if customer has this loan type
                self.df[feature_name] = self.df['type_of_loan'].apply(
                    lambda x: int(loan_type in x) if pd.notna(x) else np.nan
                )

                # Fill missing values with mode
                mode_value = self.df[feature_name].mode().iloc[0]
                self.df[feature_name].fillna(mode_value, inplace=True)

        return self.df

    def create_dummy_variables(self) -> pd.DataFrame:
        """Create dummy variables for categorical features."""
        print("Creating dummy variables...")

        categorical_features = [
            ('month', 'month'),
            ('occupation', 'occupation'),
            ('credit_mix', 'credit_mix'),
            ('payment_of_min_amount', 'payment_of_min_amount'),
            ('payment_behaviour', None)  # No prefix for payment_behaviour
        ]

        for feature, prefix in categorical_features:
            if feature in self.df.columns:
                if prefix:
                    dummies = pd.get_dummies(
                        self.df[feature],
                        prefix=prefix,
                        drop_first=True,
                        dtype=int
                    )
                else:
                    dummies = pd.get_dummies(
                        self.df[feature],
                        drop_first=True,
                        dtype=int
                    )

                self.df = pd.concat([self.df, dummies], axis=1)

        return self.df

    def encode_target_variable(self) -> pd.DataFrame:
        """Encode the target variable (credit_score) to numeric values."""
        print("Encoding target variable...")

        # Map credit scores to numeric values (ordinal encoding)
        credit_score_mapping = {"poor": 0, "good": 1, "standard": 2}
        self.df['credit_score'] = self.df['credit_score'].replace(
            credit_score_mapping)

        print("Credit score distribution:")
        print(self.df['credit_score'].value_counts().sort_index())

        return self.df

    def drop_original_categorical_columns(self) -> pd.DataFrame:
        """Drop original categorical columns after creating dummy variables."""
        columns_to_drop = [
            'type_of_loan', 'month', 'occupation', 'credit_mix',
            'payment_of_min_amount', 'payment_behaviour', 'customer_id'
        ]

        existing_columns_to_drop = [
            col for col in columns_to_drop if col in self.df.columns]
        self.df = self.df.drop(existing_columns_to_drop, axis=1)

        print(
            f"Dropped original categorical columns: {existing_columns_to_drop}")
        return self.df

    def detect_and_treat_outliers(self, columns: List[str], method: str = 'iqr') -> pd.DataFrame:
        """
        Detect and treat outliers in specified columns.

        Args:
            columns: List of column names to check for outliers
            method: Method for outlier detection ('iqr' for IQR method)
        """
        print(f"Treating outliers in {len(columns)} columns...")

        for col in columns:
            if col not in self.df.columns:
                continue

            # Calculate IQR bounds
            q1 = self.df[col].quantile(0.25)
            q3 = self.df[col].quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            # Count outliers
            outliers_count = len(self.df[
                (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
            ])

            if outliers_count > 0:
                print(f"{col}: {outliers_count} outliers detected")

                # Treat outliers based on column characteristics
                if col == 'total_emi_per_month':
                    # Use median of non-outliers for EMI
                    non_outlier_median = self.df[
                        (self.df[col] >= lower_bound) & (
                            self.df[col] <= upper_bound)
                    ][col].median()

                    self.df.loc[self.df[col] > upper_bound,
                                col] = non_outlier_median

                else:
                    # Use customer-grouped mode for other features
                    outlier_mask = (self.df[col] > upper_bound) | (
                        self.df[col] < lower_bound)
                    self.df.loc[outlier_mask, col] = self.df.groupby('customer_id')[col].transform(
                        lambda x: x.mode()[0] if not x.mode(
                        ).empty else x.median()
                    )[outlier_mask]

        return self.df

    def generate_data_summary(self) -> pd.DataFrame:
        """Generate final data summary and statistics."""
        print("\n" + "="*50)
        print("DATA PREPROCESSING SUMMARY")
        print("="*50)
        print(f"Final dataset shape: {self.df.shape}")
        print(f"Missing values per column:")
        missing_values = self.df.isnull().sum()
        if missing_values.sum() > 0:
            print(missing_values[missing_values > 0])
        else:
            print("No missing values remaining!")

        print(f"\nData types:")
        print(self.df.dtypes.value_counts())

        return self.df.describe()

    def run_full_pipeline(self) -> pd.DataFrame:
        """Execute the complete preprocessing pipeline."""
        print("Starting Credit Score Data Preprocessing Pipeline...")
        print("="*60)

        # Load and initial cleaning
        self.load_data()
        self.remove_irrelevant_columns()
        self.clean_corrupted_data()
        self.convert_data_types()
        self.standardize_string_columns()

        # Handle missing values with domain knowledge
        self.handle_customer_stable_features()
        self.clean_age_column()
        self.handle_credit_history_age()
        self.impute_customer_grouped_features()
        self.handle_remaining_missing_values()

        # Feature engineering
        self.engineer_loan_features()
        self.create_dummy_variables()
        self.encode_target_variable()

        # Outlier treatment for key numerical columns
        numerical_columns = [
            'num_credit_card', 'interest_rate', 'num_credit_inquiries',
            'annual_income', 'total_emi_per_month'
        ]
        self.detect_and_treat_outliers(numerical_columns)
        
        # Final cleanup
        self.drop_original_categorical_columns()
        
        # Generate summary
        summary = self.generate_data_summary()

        print("\nPreprocessing pipeline completed successfully!")
        return self.df

In [6]:
# Initialize and run preprocessing pipeline
preprocessor = CreditDataPreprocessor("./train.csv")
processed_df = preprocessor.run_full_pipeline()
# Optional: Save processed data
processed_df.to_csv("processed_credit_data.csv", index=False)
print("Processed data saved to 'processed_credit_data.csv'")

Starting Credit Score Data Preprocessing Pipeline...
Loading data...
Data loaded: (100000, 28)
Dropped irrelevant columns: ['id', 'name', 'ssn']
Cleaning corrupted data...
Corrupted data patterns cleaned
Converting data types...
Standardized 9 string columns
Handling customer-stable features...
Cleaning age column...
Processing credit history age...
Imputing customer-grouped features...
Handling remaining missing values...
Engineering loan type features...
Creating dummy variables...
Encoding target variable...
Credit score distribution:
credit_score
0    28998
1    17828
2    53174
Name: count, dtype: int64
Treating outliers in 5 columns...
num_credit_card: 2271 outliers detected
interest_rate: 2034 outliers detected
num_credit_inquiries: 696 outliers detected
annual_income: 2783 outliers detected
total_emi_per_month: 6200 outliers detected
Dropped original categorical columns: ['type_of_loan', 'month', 'occupation', 'credit_mix', 'payment_of_min_amount', 'payment_behaviour', 'custome

In [None]:
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.preprocessing import StandardScaler

from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score,make_scorer, f1_score

In [12]:
X = processed_df.drop('credit_score', axis=1)
y = processed_df['credit_score']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [13]:
integer_columns = ['age',
                   'num_bank_accounts',
                   'num_credit_card',
                   'interest_rate',
                   'num_of_loan',
                   'delay_from_due_date',
                   'num_of_delayed_payment',
                   'num_credit_inquiries',
                   'credit_history_age']

float_columns = ['credit_utilization_ratio',
                 'annual_income',
                 'monthly_inhand_salary',
                 'changed_credit_limit',
                 'outstanding_debt',
                 'total_emi_per_month',
                 'amount_invested_monthly',
                 'monthly_balance']

numerical_columns = integer_columns + float_columns

scaler = StandardScaler()
X_train[numerical_columns] = scaler.fit_transform(X_train[numerical_columns])
X_test[numerical_columns] = scaler.transform(X_test[numerical_columns])

In [15]:
models = {
    "Random Forest": RandomForestClassifier(),
    "XGBoost": XGBClassifier()
}

params = {
    "Random Forest": {
        'n_estimators': [16, 32, 64, 128],
        'max_depth': [None, 10, 20],
       # 'min_samples_split': [5, 10],
        #'min_samples_leaf': [2, 5],
        #'bootstrap': [True, False],
    },
    "XGBoost": {
        'learning_rate': [0.1, 0.05, 0.001],
        'n_estimators': [16, 32, 64, 128],
        #'max_depth': [3, 4, 5],
        'subsample': [0.7, 0.8]
    }
}

f1_scorer = make_scorer(f1_score, average='weighted')

grid_searches = {}
for model_name, model in models.items():
    grid_search = GridSearchCV(
        model,
        params[model_name],
        cv=3,  # Use the number of desired cross-validation folds
        scoring=f1_scorer,
        n_jobs=-1,  # Use all available CPU cores
        verbose=2,
    )
    grid_searches[model_name] = grid_search

In [16]:
grid_searches

{'Random Forest': GridSearchCV(cv=3, estimator=RandomForestClassifier(), n_jobs=-1,
              param_grid={'max_depth': [None, 10, 20],
                          'n_estimators': [16, 32, 64, 128]},
              scoring=make_scorer(f1_score, response_method='predict', average=weighted),
              verbose=2),
 'XGBoost': GridSearchCV(cv=3,
              estimator=XGBClassifier(base_score=None, booster=None,
                                      callbacks=None, colsample_bylevel=None,
                                      colsample_bynode=None,
                                      colsample_bytree=None, device=None,
                                      early_stopping_rounds=None,
                                      enable_categorical=False, eval_metric=None,
                                      feature_types=None, feature_weights=None,
                                      gamma=None, grow_policy=None,
                                      importance_type=None,
              

In [17]:
best_models = {}
for model_name, grid_search in grid_searches.items():
    grid_search.fit(X_train, y_train)  # X_train and y_train are your training data
    best_models[model_name] = grid_search.best_estimator_

Fitting 3 folds for each of 12 candidates, totalling 36 fits


KeyboardInterrupt: 

In [None]:
best_models

In [None]:
best_f1_score = -1  # Initialize with a low value
best_model = None

for model_name, grid_search in grid_searches.items():
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")
    print(f"Best F1-score for {model_name}: {grid_search.best_score_}")
    print("=="*25,"\n")

    if grid_search.best_score_ > best_f1_score:
        best_f1_score = grid_search.best_score_
        best_model = grid_search.best_estimator_

if best_model is not None:
    print("Best model based on F1-score:")
    print(best_model)
    print(f"Best F1-score: {best_f1_score}")

In [None]:
print(best_model)

In [None]:
y_pred = best_model.predict(X_test)
confusion = confusion_matrix(y_test, y_pred)
report = classification_report(y_test, y_pred)

# Plot a heatmap for the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(confusion, annot=True, fmt='d', cmap='Blues', cbar=False)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title(f'Confusion Matrix for {best_model}')
plt.show()

# Print the classification report
print(f"Classification Report for {best_model}:\n")
print(report)