In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import cross_val_score
from sklearn.impute import SimpleImputer
import joblib

class LoanStatusModel:
    """Train and evaluate two loan-status classifiers on a spreadsheet dataset.

    Intended call order: ``load()`` -> ``preprocess()`` -> ``train()`` ->
    ``test()``; ``predict()`` can then score unseen rows. A logistic
    regression and a random forest are fitted on the same processed features.
    """

    # Hold-out split settings shared by train() and test(). Both methods must
    # use identical values so test() scores exactly the rows train() held out.
    TEST_SIZE = 0.3
    SPLIT_RANDOM_STATE = 42

    def __init__(self, data_path):
        """Store the dataset path and create the (unfitted) estimators.

        Args:
            data_path: Path of the Excel file read by ``load()``.
        """
        self.data_path = data_path
        self.data = None
        self.X = None
        self.y = None
        self.model_log_reg = LogisticRegression(random_state=42)
        self.model_rf = RandomForestClassifier(random_state=42)
        self.preprocessor = None
        self.X_processed = None

    def load(self):
        """Load the dataset."""
        self.data = pd.read_excel(self.data_path)

        print(f"Data Loaded with shape: {self.data.shape}")

    @staticmethod
    def _engineer_features(frame):
        """Return a copy of *frame* with date and term features made numeric.

        * ``transaction_date`` (if present) is replaced by
          ``transaction_year`` / ``transaction_month`` / ``transaction_day``.
        * ``term`` (if present and not already numeric) is replaced by the
          first run of digits in each value, e.g. ``" 36 months"`` -> 36.
          Values without digits (including missing values) become NaN so the
          numerical imputer can fill them instead of the conversion crashing.

        The function is idempotent, so frames that were already engineered by
        a caller pass through unchanged. The input frame is never mutated.
        """
        X = frame.copy()

        if 'transaction_date' in X.columns:
            # Parse the column once instead of once per derived feature.
            dates = pd.to_datetime(X['transaction_date'])
            X['transaction_year'] = dates.dt.year
            X['transaction_month'] = dates.dt.month
            X['transaction_day'] = dates.dt.day
            X = X.drop(columns=['transaction_date'])  # Drop original column

        if 'term' in X.columns and not pd.api.types.is_numeric_dtype(X['term']):
            digits = X['term'].astype(str).str.extract(r'(\d+)', expand=False)
            X['term'] = pd.to_numeric(digits)

        return X

    def preprocess(self):
        """Build the feature matrix and fit the preprocessing pipeline.

        Sets ``self.preprocessor`` (fitted ``ColumnTransformer``),
        ``self.X`` / ``self.X_processed`` (transformed features) and
        ``self.y`` (the ``loan_status`` column).

        Raises:
            RuntimeError: If ``load()`` has not been called yet.
        """
        if self.data is None:
            raise RuntimeError("No data loaded; call load() before preprocess().")

        # Separate features and target
        X = self.data.drop(columns=['loan_status', 'customer_id'])
        y = self.data['loan_status']

        X = self._engineer_features(X)

        # Select every numeric dtype, not just int64/float64: the datetime
        # accessors may yield int32 columns (pandas >= 2.0), which a strict
        # int64/float64 filter would silently leave out of the model.
        numerical_features = list(X.select_dtypes(include='number').columns)
        # 'term' is numeric after feature engineering, so it can no longer
        # appear among the object-typed columns.
        categorical_features = list(X.select_dtypes(include=['object']).columns)

        # Numerical transformer: handle missing values and scale numerical features
        numerical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='mean')),
            ('scaler', StandardScaler())
        ])

        # Categorical transformer: handle missing values and apply One-Hot Encoding
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
        ])

        self.preprocessor = ColumnTransformer(
            transformers=[
                ('num', numerical_transformer, numerical_features),
                ('cat', categorical_transformer, categorical_features)
            ]
        )

        # NOTE(review): the preprocessor is fitted on all rows, i.e. before
        # the train/test split, so the hold-out scores reported by test() use
        # imputer/scaler statistics that have seen the hold-out rows.
        self.X_processed = self.preprocessor.fit_transform(X)
        self.X = self.X_processed
        self.y = y
        print(f"Preprocessing complete. Processed data shape: {self.X.shape}")

    def _split_data(self):
        """Return the deterministic (X_train, X_test, y_train, y_test) split."""
        if self.X is None or self.y is None:
            raise RuntimeError(
                "No processed data; call preprocess() before train()/test()."
            )
        return train_test_split(
            self.X,
            self.y,
            test_size=self.TEST_SIZE,
            random_state=self.SPLIT_RANDOM_STATE,
        )

    def train(self):
        """Fit both models on the training split and save them to disk.

        Writes ``log_reg_model.pkl`` and ``rf_model.pkl`` to the current
        working directory.

        Raises:
            RuntimeError: If ``preprocess()`` has not been called yet.
        """
        X_train, _, y_train, _ = self._split_data()

        # Train Logistic Regression model
        self.model_log_reg.fit(X_train, y_train)
        print("Logistic Regression Model Trained.")

        # Train Random Forest model
        self.model_rf.fit(X_train, y_train)
        print("Random Forest Model Trained.")

        # Save models to disk
        joblib.dump(self.model_log_reg, 'log_reg_model.pkl')
        joblib.dump(self.model_rf, 'rf_model.pkl')

    def test(self):
        """Score both models on the hold-out split and print the accuracies.

        Returns:
            dict: ``{'log_reg': accuracy, 'rf': accuracy}`` for the hold-out
            rows, i.e. the same values that are printed.

        Raises:
            RuntimeError: If ``preprocess()`` has not been called yet.
        """
        # Same split parameters as train(), so these rows were never fitted on.
        _, X_test, _, y_test = self._split_data()

        # Predict with Logistic Regression
        y_pred_log_reg = self.model_log_reg.predict(X_test)
        log_reg_accuracy = accuracy_score(y_test, y_pred_log_reg)
        print(f"Logistic Regression Accuracy: {log_reg_accuracy}")

        # Predict with Random Forest
        y_pred_rf = self.model_rf.predict(X_test)
        rf_accuracy = accuracy_score(y_test, y_pred_rf)
        print(f"Random Forest Accuracy: {rf_accuracy}")

        return {'log_reg': log_reg_accuracy, 'rf': rf_accuracy}

    def predict(self, new_data):
        """Predict the loan status for new data.

        Args:
            new_data: DataFrame with the training feature columns. Raw
                ``transaction_date`` / textual ``term`` columns are converted
                exactly as in ``preprocess()``; already-converted frames are
                accepted unchanged.

        Returns:
            tuple: ``(logistic_regression_predictions, random_forest_predictions)``.

        Raises:
            RuntimeError: If ``preprocess()`` has not been called yet.
        """
        if self.preprocessor is None:
            raise RuntimeError(
                "Preprocessor is not fitted; call preprocess() before predict()."
            )

        # Apply the same feature engineering the preprocessor was fitted on.
        features = self._engineer_features(new_data)
        X_new_processed = self.preprocessor.transform(features)

        # Predict with both models
        pred_log_reg = self.model_log_reg.predict(X_new_processed)
        pred_rf = self.model_rf.predict(X_new_processed)

        return pred_log_reg, pred_rf

if __name__ == "__main__":
    # End-to-end run: fit on the training workbook, report hold-out accuracy,
    # then score the rows of a separate test workbook with both models.
    model = LoanStatusModel('train_data.xlsx')

    # Load data
    model.load()

    # Preprocess data
    model.preprocess()

    # Train models
    model.train()

    # Test models
    model.test()

    # Load test data
    test_data_path = "./test_data.xlsx"
    test_data = pd.read_excel(test_data_path)

    # Preprocess the test data to align with training data preprocessing
    if 'transaction_date' in test_data.columns:
        # Parse the column once instead of once per derived feature.
        transaction_dates = pd.to_datetime(test_data['transaction_date'])
        test_data['transaction_year'] = transaction_dates.dt.year
        test_data['transaction_month'] = transaction_dates.dt.month
        test_data['transaction_day'] = transaction_dates.dt.day
        test_data = test_data.drop(columns=['transaction_date'])  # Drop original column

    # Ensure 'term' is converted to numerical. Take the first run of digits
    # (" 36 months" -> 36); missing or digit-less cells become NaN, which the
    # fitted numerical imputer fills, instead of raising on a non-string cell.
    if 'term' in test_data.columns and not pd.api.types.is_numeric_dtype(test_data['term']):
        term_digits = test_data['term'].astype(str).str.extract(r'(\d+)', expand=False)
        test_data['term'] = pd.to_numeric(term_digits)

    # Drop other unused columns
    test_data = test_data.drop(columns=['customer_id'], errors='ignore')

    # Preprocess the test data
    X_test_processed = model.preprocessor.transform(test_data)

    # Predict using both models
    preds_log_reg = model.model_log_reg.predict(X_test_processed)
    preds_rf = model.model_rf.predict(X_test_processed)

    # Display predictions
    print(f"Predictions from Logistic Regression: {preds_log_reg}")
    print(f"Predictions from Random Forest: {preds_rf}")


Data Loaded with shape: (113705, 17)
Preprocessing complete. Processed data shape: (113705, 57)
Logistic Regression Model Trained.
Random Forest Model Trained.
Logistic Regression Accuracy: 0.7646282833020638
Random Forest Accuracy: 0.7602310037523452
Predictions from Logistic Regression: [1 1 0 ... 1 1 1]
Predictions from Random Forest: [1 1 0 ... 0 1 1]
