In [24]:
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE, ADASYN
from imblearn.under_sampling import RandomUnderSampler, NearMiss
from imblearn.combine import SMOTETomek, SMOTEENN
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
import joblib
import os
from typing import Tuple, Dict, Any
from collections import Counter

import shap
from lime.lime_tabular import LimeTabularExplainer


In [None]:
class DataPreprocessor(ABC):
    """Interface for data preprocessing"""
    @abstractmethod
    def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
        pass

class FeatureEngineer(ABC):
    """Interface for feature engineering"""
    @abstractmethod
    def engineer_features(self, data: pd.DataFrame) -> pd.DataFrame:
        pass

class DataResampler(ABC):
    """Interface for data resampling"""
    @abstractmethod
    def resample(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
        pass

class ModelTrainer(ABC):
    """Interface for model training"""
    @abstractmethod
    def train(self, X_train: pd.DataFrame, y_train: pd.Series):
        pass
    
    @abstractmethod
    def predict(self, X_test: pd.DataFrame):
        pass

class ModelSaver(ABC):
    """Interface for model persistence"""
    @abstractmethod
    def save(self, model, filepath: str):
        pass
    
    @abstractmethod
    def load(self, filepath: str):
        pass

class CreditRiskPreprocessor(DataPreprocessor):
    def __init__(self, feature_columns=None):
        self.feature_columns = feature_columns
        self.scaler = StandardScaler()
        self.label_encoder = LabelEncoder()
    
    def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
        processed_data = data.copy()
        
        # Handle missing values
        for col in processed_data.columns:
            if processed_data[col].dtype in ['object']:
                processed_data[col].fillna(processed_data[col].mode()[0], inplace=True)
            else:
                processed_data[col].fillna(processed_data[col].median(), inplace=True)
        
        # Encode categorical variables
        categorical_cols = processed_data.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            processed_data[col] = self.label_encoder.fit_transform(processed_data[col].astype(str))
            
        #Handle dependents column value 3+
        # Replace '3+' with 3 and convert to integer
        processed_data['Dependents'] = processed_data['Dependents'].replace('3+', 3).astype(int)
        
        return processed_data

class CreditRiskFeatureEngineer(FeatureEngineer):
    def engineer_features(self, data: pd.DataFrame) -> pd.DataFrame:
        engineered_data = data.copy()
        
        # Create new features
        engineered_data['Total_Income'] = engineered_data['ApplicantIncome'] + engineered_data['CoapplicantIncome']
        engineered_data['Loan_Income_Ratio'] = engineered_data['LoanAmount'] / (engineered_data['Total_Income'] + 1)
        
        # Log transformations
        log_cols = ['ApplicantIncome', 'LoanAmount', 'Total_Income']
        for col in log_cols:
            if col in engineered_data.columns:
                engineered_data[f'{col}_Log'] = np.log1p(engineered_data[col])
        
        # Polynomial features
        engineered_data['Income_Squared'] = engineered_data['Total_Income'] ** 2
        engineered_data['Loan_Amount_Squared'] = engineered_data['LoanAmount'] ** 2
        
        return engineered_data

class SMOTETomekResampler(DataResampler):
    def __init__(self, random_state: int = 42):
        self.resampler = SMOTETomek(random_state=random_state)
    
    def resample(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
        return self.resampler.fit_resample(X, y)

class SMOTEResampler(DataResampler):
    def __init__(self, random_state: int = 42):
        self.resampler = SMOTE(random_state=random_state)
    
    def resample(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
        return self.resampler.fit_resample(X, y)

class ADASYNResampler(DataResampler):
    def __init__(self, random_state: int = 42):
        self.resampler = ADASYN(random_state=random_state)
    
    def resample(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
        return self.resampler.fit_resample(X, y)

class RandomUndersampler(DataResampler):
    def __init__(self, random_state: int = 42):
        self.resampler = RandomUnderSampler(random_state=random_state)
    
    def resample(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
        return self.resampler.fit_resample(X, y)

class BaseModelTrainer:
    """Base class for model trainers"""
    def __init__(self):
        self.model = None
        self.best_params = None
        self.cv_results = None
    
    def get_best_params(self):
        return self.best_params
    
    def get_cv_results(self):
        return self.cv_results
    
class XGBoostModelTrainer(BaseModelTrainer):
    def __init__(self):
        super().__init__()
        self.param_grid = {
            'max_depth': [3, 4, 5],
            'learning_rate': [0.01, 0.05, 0.1],
            'n_estimators': [100, 200],
            'min_child_weight': [1, 3],
            'gamma': [0, 0.1],
            'subsample': [0.8, 1.0],
            'colsample_bytree': [0.8, 1.0],
            'max_delta_step': [1, 2, 4]
        }
    
    def train(self, X_train: pd.DataFrame, y_train: pd.Series):
        # Calculate scale_pos_weight
        pos_weight = sum(y_train == 0) / sum(y_train == 1)
        
        # Update param_grid with scale_pos_weight
        full_param_grid = self.param_grid.copy()
        full_param_grid['scale_pos_weight'] = [1, pos_weight]
        
        print("Starting XGBoost hyperparameter tuning...")
        print(f"Calculated scale_pos_weight: {pos_weight}")
        
        # Create a validation split from training data
        X_train_split, X_valid_split, y_train_split, y_valid_split = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
        )
        
        # Define fit parameters including eval set
        fit_params = {
            'eval_set': [(X_valid_split, y_valid_split)],
            'early_stopping_rounds': 10,
            'verbose': 0  # Reduce verbosity since we're doing multiple fits
        }
        
        grid_search = GridSearchCV(
            estimator=xgb.XGBClassifier(
                random_state=42,
                objective='binary:logistic',
                eval_metric='auc'
            ),
            param_grid=full_param_grid,
            cv=5,
            scoring='f1',
            n_jobs=-1,
            verbose=1,
            error_score='raise'
        )
        
        grid_search.fit(X_train_split, y_train_split)
        
        self.model = grid_search.best_estimator_
        self.best_params = grid_search.best_params_
        self.cv_results = grid_search.cv_results_
        
        # Final fit on the entire training data with best parameters
        print("\nFitting final model with best parameters...")
        self.model = xgb.XGBClassifier(
            **self.best_params,
            random_state=42,
            objective='binary:logistic',
            eval_metric='auc'
        )
        
        # Create validation set for final model
        X_train_final, X_valid_final, y_train_final, y_valid_final = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
        )
        
        self.model.fit(
            X_train_final,
            y_train_final,
            eval_set=[(X_valid_final, y_valid_final)],
#             early_stopping_rounds=10,
            verbose=1
        )
        
        print("\nBest parameters found:")
        for param, value in self.best_params.items():
            print(f"{param}: {value}")
        
        return self
    
    def predict(self, X_test: pd.DataFrame):
        if self.model is None:
            raise ValueError("Model not trained yet")
        return self.model.predict(X_test)
    
    def predict_proba(self, X_test: pd.DataFrame):
        if self.model is None:
            raise ValueError("Model not trained yet")
        return self.model.predict_proba(X_test)

class RandomForestModelTrainer(BaseModelTrainer):
    def __init__(self):
        super().__init__()
        self.param_grid = {
            'n_estimators': [100, 200, 300],
            'max_depth': [10, 20, 30, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'max_features': ['sqrt', 'log2'],
            'class_weight': ['balanced', 'balanced_subsample']  # Added to handle class imbalance
        }
    
    def train(self, X_train: pd.DataFrame, y_train: pd.Series):
        # Create validation split
        X_train_split, X_valid_split, y_train_split, y_valid_split = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
        )
        
        print("Starting Random Forest hyperparameter tuning...")
        
        grid_search = GridSearchCV(
            estimator=RandomForestClassifier(random_state=42),
            param_grid=self.param_grid,
            cv=5,
            scoring='f1',
            n_jobs=-1,
            verbose=1
        )
        
        grid_search.fit(X_train_split, y_train_split)
        self.model = grid_search.best_estimator_
        self.best_params = grid_search.best_params_
        self.cv_results = grid_search.cv_results_
        
        # Final fit on entire training data with best parameters
        print("\nFitting final Random Forest model with best parameters...")
        self.model = RandomForestClassifier(
            **self.best_params,
            random_state=42
        )
        
        self.model.fit(X_train, y_train)
        
        print("\nBest parameters found:")
        for param, value in self.best_params.items():
            print(f"{param}: {value}")
        
        return self

class LogisticRegressionModelTrainer(BaseModelTrainer):
    def __init__(self):
        super().__init__()
        self.param_grid = {
            'C': [0.001, 0.01, 0.1, 1, 10, 100],
            'penalty': ['l1', 'l2'],
            'solver': ['liblinear', 'saga'],
            'class_weight': [None, 'balanced']
        }
    
    def train(self, X_train: pd.DataFrame, y_train: pd.Series):
        grid_search = GridSearchCV(
            estimator=LogisticRegression(random_state=42),
            param_grid=self.param_grid,
            cv=5,
            scoring='f1',
            n_jobs=-1,
            verbose=1
        )
        
        grid_search.fit(X_train, y_train)
        self.model = grid_search.best_estimator_
        self.best_params = grid_search.best_params_
        self.cv_results = grid_search.cv_results_
        return self

class JobLibModelSaver(ModelSaver):
    def save(self, model, filepath: str):
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        joblib.dump(model, filepath)
    
    def load(self, filepath: str):
        return joblib.load(filepath)

class CreditRiskAnalyzer:
    """Main class orchestrating the credit risk analysis process"""
    def __init__(self, 
                 preprocessor: DataPreprocessor,
                 feature_engineer: FeatureEngineer,
                 model_trainer: ModelTrainer,
                 model_saver: ModelSaver,
                 resampler: DataResampler = None):
        self.preprocessor = preprocessor
        self.feature_engineer = feature_engineer
        self.model_trainer = model_trainer
        self.model_saver = model_saver
        self.resampler = resampler
        self.feature_columns = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        
    def prepare_data(self, data: pd.DataFrame, target_column: str='Loan_Status', test_size: float=0.2):
        """Prepare data with enhanced feature selection and validation"""
        # Preprocess
        processed_data = self.preprocessor.preprocess(data)

        # Engineer features
        engineered_data = self.feature_engineer.engineer_features(processed_data)

        # Validate features exist
        required_features = [
            'Loan_Amount_Term', 'Credit_History', 'Loan_Income_Ratio',
            'LoanAmount_Log', 'Total_Income_Log',
            'Income_Squared', 'Loan_Amount_Squared', 'Married', 'Dependents'
        ]

        missing_features = [col for col in required_features if col not in engineered_data.columns]
        if missing_features:
            raise ValueError(f"Missing required features: {missing_features}")

        # Define feature columns
        self.feature_columns = [col for col in required_features if col in engineered_data.columns]

        # Prepare features and target
        X = engineered_data[self.feature_columns]
        y = engineered_data[target_column]

        # Print class distribution before split
        print(f"\nClass distribution before split:\n{Counter(y)}")

        # Split data
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )

        # Print class distribution after split
        print(f"\nTraining set class distribution:\n{Counter(self.y_train)}")
        print(f"Test set class distribution:\n{Counter(self.y_test)}")

        # Apply resampling if resampler is provided
        if self.resampler:
            self.X_train, self.y_train = self.resampler.resample(self.X_train, self.y_train)
            print(f"\nClass distribution after resampling:\n{Counter(self.y_train)}")

        return self
    
    
    def compare_resampling_techniques(self) -> pd.DataFrame:
        """Compare different resampling techniques with various models and log class distributions"""
        if self.X_train is None or self.y_train is None:
            raise ValueError("Data not prepared yet. Call prepare_data first.")

        # Log original class distribution
        original_dist = Counter(self.y_train)
        print(f"Original class distribution:\n{original_dist}")

        resamplers = {
            'No Resampling': None,
            'SMOTE': SMOTEResampler(),
            'ADASYN': ADASYNResampler(),
            'RandomUndersampler': RandomUndersampler(),
            'SMOTETomek': SMOTETomekResampler()
        }

        models = {
            'XGBoost': xgb.XGBClassifier(random_state=42),
            'RandomForest': RandomForestClassifier(random_state=42),
            'LogisticRegression': LogisticRegression(random_state=42)
        }

        results = []

        for resampler_name, resampler in resamplers.items():
            X_train, y_train = self.X_train.copy(), self.y_train.copy()

            if resampler:
                X_train, y_train = resampler.resample(X_train, y_train)
                resampled_dist = Counter(y_train)
                print(f"\n{resampler_name} class distribution:\n{resampled_dist}")

            for model_name, model in models.items():
                model.fit(X_train, y_train)
                y_pred = model.predict(self.X_test)

                # Handle zero division in metrics
                results.append({
                    'Resampling': resampler_name,
                    'Model': model_name,
                    'Accuracy': model.score(self.X_test, self.y_test),
                    'F1 Score': f1_score(self.y_test, y_pred, zero_division=0),
                    'Precision': precision_score(self.y_test, y_pred, zero_division=0),
                    'Recall': recall_score(self.y_test, y_pred, zero_division=0),
                    'Samples After Resampling': len(y_train)
                })

        results_df = pd.DataFrame(results)
        self._plot_resampling_comparison(results_df)
        return results_df
    
    def _plot_resampling_comparison(self, results_df: pd.DataFrame):
        """Plot resampling comparison results"""
        plt.figure(figsize=(15, 10))
        
        metrics = ['F1 Score', 'Precision', 'Recall', 'Accuracy']
        for i, metric in enumerate(metrics, 1):
            plt.subplot(2, 2, i)
            sns.barplot(x='Resampling', y=metric, hue='Model', data=results_df)
            plt.title(f'{metric} Comparison')
            plt.xticks(rotation=45, ha='right')
        
        plt.tight_layout()
        plt.show()
    
    def train_model(self):
        """Train the model on resampled data with hyperparameter tuning"""
        if self.X_train is None or self.y_train is None:
            raise ValueError("Data not prepared yet. Call prepare_data first.")
        
        # Apply resampling first
        print("Applying SMOTETomek resampling...")
        X_resampled, y_resampled = self.resampler.resample(self.X_train, self.y_train)
        
        # Print class distribution after resampling
        print("\nClass distribution after resampling:")
        print(Counter(y_resampled))
        
        # Train model with hyperparameter tuning on resampled data
        print("\nPerforming hyperparameter tuning on resampled data...")
        self.model_trainer.train(X_resampled, y_resampled)
        
        # Print best parameters
        print("\nBest parameters found:")
        print(self.model_trainer.get_best_params())
        
        # Evaluate on test set
        y_pred = self.model_trainer.predict(self.X_test)
        metrics = {
            'Accuracy': accuracy_score(self.y_test, y_pred),
            'F1 Score': f1_score(self.y_test, y_pred),
            'Precision': precision_score(self.y_test, y_pred),
            'Recall': recall_score(self.y_test, y_pred)
        }
        print("\nTest set performance metrics:")
        for metric, value in metrics.items():
            print(f"{metric}: {value:.4f}")
        
        return self
    
    def evaluate_model(self):
        """Evaluate the trained model"""
        if not hasattr(self.model_trainer, 'model') or self.model_trainer.model is None:
            raise ValueError("Model not trained yet")

        y_pred = self.model_trainer.predict(self.X_test)
        y_pred_proba = self.model_trainer.model.predict_proba(self.X_test)[:, 1]  # Probabilities for ROC-AUC

        # Calculate metrics
        metrics = {
            'accuracy': self.model_trainer.model.score(self.X_test, self.y_test),
            'f1': f1_score(self.y_test, y_pred),
            'precision': precision_score(self.y_test, y_pred),
            'recall': recall_score(self.y_test, y_pred)
        }

        # Confusion Matrix
        cm = confusion_matrix(self.y_test, y_pred)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=self.model_trainer.model.classes_)
        disp.plot(cmap='Blues')
        plt.title('Confusion Matrix')
        plt.show()

        # ROC Curve and AUC
        fpr, tpr, _ = roc_curve(self.y_test, y_pred_proba)
        roc_auc = auc(fpr, tpr)
        plt.figure()
        plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC Curve (AUC = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='gray', linestyle='--', lw=1)
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc='lower right')
        plt.grid()
        plt.show()

        # Add ROC-AUC to metrics
        metrics['roc_auc'] = roc_auc

        # Feature Importance using SHAP
        import shap
        explainer = shap.TreeExplainer(self.model_trainer.model)
        shap_values = explainer.shap_values(self.X_test)

        # Summary plot
        shap.summary_plot(shap_values, self.X_test, plot_type="bar")
        plt.title('Feature Importance (SHAP)')
        plt.show()

        # Feature Interpretation using LIME
        from lime.lime_tabular import LimeTabularExplainer
        lime_explainer = LimeTabularExplainer(
            training_data=self.X_test.values,
            feature_names=self.X_test.columns,
            class_names=self.model_trainer.model.classes_,
            mode='classification'
        )
        # Explain a single prediction (example: first instance in test set)
        i = 0 
        lime_exp = lime_explainer.explain_instance(
            data_row=self.X_test.iloc[i].values,
            predict_fn=self.model_trainer.model.predict_proba
        )
        lime_exp.show_in_notebook(show_table=True)
        lime_exp.save_to_file('lime_explanation.html')

        return metrics
    
    
    def save_model(self, filepath: str):
        """Save the trained model"""
        if not hasattr(self.model_trainer, 'model') or self.model_trainer.model is None:
            raise ValueError("Model not trained yet. Call train_model() first.")
        
        print(f"Saving model to {filepath}...")
        self.model_saver.save(self.model_trainer.model, filepath)
        print("Model saved successfully!")
        return self
    
    def load_model(self, filepath: str):
        """Load a trained model"""
        loaded_model = self.model_saver.load(filepath)
        self.model_trainer.model = loaded_model
        return self
    
    def predict_new_data(self, new_data: pd.DataFrame) -> pd.DataFrame:
        """Make predictions on new data"""
        # Preprocess new data
        processed_data = self.preprocessor.preprocess(new_data)
        engineered_data = self.feature_engineer.engineer_features(processed_data)
        
        # Select features
        X_new = engineered_data[self.feature_columns]
        
        # Make predictions
        predictions = self.model_trainer.predict(X_new)
        probabilities = self.model_trainer.predict_proba(X_new)[:, 1]
        
        # Prepare results
        results = new_data.copy()
        results['Predicted_Loan_Status'] = predictions
        results['Loan_Approval_Probability'] = probabilities
        
        return results
    
def main():
    # Initialize components
    preprocessor = CreditRiskPreprocessor()
    feature_engineer = CreditRiskFeatureEngineer()
    resampler = SMOTETomekResampler()
    xgb_trainer = XGBoostModelTrainer()
    rf_trainer = RandomForestModelTrainer()
    model_saver = JobLibModelSaver()

    # Create two analyzers, one for each model
    xgb_analyzer = CreditRiskAnalyzer(
        preprocessor=preprocessor,
        feature_engineer=feature_engineer,
        model_trainer=xgb_trainer,
        model_saver=model_saver,
        resampler=resampler
    )

    rf_analyzer = CreditRiskAnalyzer(
        preprocessor=preprocessor,
        feature_engineer=feature_engineer,
        model_trainer=rf_trainer,
        model_saver=model_saver,
        resampler=resampler
    )

    # Load and prepare training data
    train_data = pd.read_csv('credit-worthiness-prediction/train.csv')

    # Train and evaluate XGBoost
    print("\n=== Training and Evaluating XGBoost Model ===")
    xgb_analyzer.prepare_data(train_data)
    xgb_analyzer.train_model()
    xgb_metrics = xgb_analyzer.evaluate_model()
    print("\nXGBoost Model Metrics:")
    for metric, value in xgb_metrics.items():
        print(f"{metric}: {value}")

    # Train and evaluate Random Forest
    print("\n=== Training and Evaluating Random Forest Model ===")
    rf_analyzer.prepare_data(train_data)
    rf_analyzer.train_model()
    rf_metrics = rf_analyzer.evaluate_model()
    print("\nRandom Forest Model Metrics:")
    for metric, value in rf_metrics.items():
        print(f"{metric}: {value}")

    # Save both models
    xgb_analyzer.save_model('models/xgboost_model.joblib')
    rf_analyzer.save_model('models/random_forest_model.joblib')

    # Make predictions on test data
    test_data = pd.read_csv('credit-worthiness-prediction/test.csv')
    
    # Get predictions from both models
    xgb_predictions = xgb_analyzer.predict_new_data(test_data)
    rf_predictions = rf_analyzer.predict_new_data(test_data)

    # Combine predictions
    combined_predictions = pd.DataFrame({
        'xgboost_predictions': xgb_predictions['loan_status'],
        'random_forest_predictions': rf_predictions['loan_status']
    })
    
    combined_predictions.to_csv('predicted_loan_status.csv', index=False)
    print("\nPredictions saved to 'predicted_loan_status.csv'")

if __name__ == "__main__":
    main()


Class distribution before split:
Counter({1: 4913, 0: 985})

Training set class distribution:
Counter({1: 3930, 0: 788})
Test set class distribution:
Counter({1: 983, 0: 197})

Class distribution after resampling:
Counter({0: 3129, 1: 3129})
Training model on resampled data with hyperparameter tuning...
Applying SMOTETomek resampling...

Class distribution after resampling:
Counter({0: 3037, 1: 3037})

Performing hyperparameter tuning on resampled data...
Starting XGBoost hyperparameter tuning...
Calculated scale_pos_weight: 1.0
Fitting 5 folds for each of 1728 candidates, totalling 8640 fits
