In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from scipy.stats import pointbiserialr
from sklearn.model_selection import train_test_split

In [None]:
def OutlierDetector(data):
    '''Detects outliers in a given pandas Series using the IQR method.

    A value counts as an outlier when it lies strictly below
    ``Q1 - 1.5 * IQR`` or strictly above ``Q3 + 1.5 * IQR``.

    Args:
        data (pd.Series): The input data series to check for outliers.
    Returns:
        pd.Series: The outlying values, keeping their original index labels.
    Raises:
        ValueError: If the input data is not a pandas Series.
    '''
    # Enforce the documented contract instead of failing later with an
    # unrelated AttributeError on `.quantile`.
    if not isinstance(data, pd.Series):
        raise ValueError(
            f"OutlierDetector expects a pandas Series, got {type(data).__name__}."
        )

    q1 = data.quantile(0.25)
    q3 = data.quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    return data[(data < lower) | (data > upper)]


In [None]:
def DefaultFlagGenerator(df):
    """Generates a default flag based on multiple financial stress indicators.

    A customer is flagged when at least 3 of these 5 conditions hold:
    ``payment_delinquency_count >= 3``, ``over_indebtedness_flag == 1``,
    ``financial_stress_score >= 9``, ``bnpl_debt_ratio >= 1.8`` and
    ``credit_limit_utilisation >= 95``.

    Args:
        df (pd.DataFrame): DataFrame containing financial indicators.
            Must include the following columns:
            - 'payment_delinquency_count'
            - 'over_indebtedness_flag'
            - 'financial_stress_score'
            - 'bnpl_debt_ratio'
            - 'credit_limit_utilisation'
    Returns:
        pd.DataFrame: A new DataFrame holding the input columns plus
            'default_flag' (1 if the customer is likely to default, 0
            otherwise). The frame passed in is left unmodified.
    Raises:
        KeyError: If any of the required columns is missing.
    """
    required_columns = [
        'payment_delinquency_count',
        'over_indebtedness_flag',
        'financial_stress_score',
        'bnpl_debt_ratio',
        'credit_limit_utilisation',
    ]
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        raise KeyError(f"DefaultFlagGenerator is missing required columns: {missing}")

    # Each comparison contributes 0 or 1, so the sum is the number of
    # stress conditions a row satisfies.
    conditions_met = (
        (df['payment_delinquency_count'] >= 3).astype(int) +
        (df['over_indebtedness_flag'] == 1).astype(int) +
        (df['financial_stress_score'] >= 9).astype(int) +
        (df['bnpl_debt_ratio'] >= 1.8).astype(int) +
        (df['credit_limit_utilisation'] >= 95).astype(int)
    )
    default_flag = (conditions_met >= 3).astype(int)  # Must meet at least 3 of the 5 conditions

    # assign() builds a new frame, so re-running earlier cells on the caller's
    # frame never sees the derived column.
    return df.assign(default_flag=default_flag)

In [None]:
def binary_col(data):
    """Identifies binary columns in a DataFrame.

    Args:
        data (pd.DataFrame): The input DataFrame to check for binary columns.
    Returns:
        list: A list of column names that are binary (i.e., have exactly two
            unique values as counted by ``Series.nunique()``).
    Raises:
        ValueError: If the input data is not a pandas DataFrame.
    """
    # Enforce the documented contract rather than failing on `.columns`.
    if not isinstance(data, pd.DataFrame):
        raise ValueError(
            f"binary_col expects a pandas DataFrame, got {type(data).__name__}."
        )
    return [col for col in data.columns if data[col].nunique() == 2]

In [None]:
def continuous_col(data):
    """Identifies continuous columns in a DataFrame.

    Args:
        data (pd.DataFrame): The input DataFrame to check for continuous columns.
    Returns:
        list: A list of column names that are continuous (i.e., have more than
            two unique values as counted by ``Series.nunique()``).
    Raises:
        ValueError: If the input data is not a pandas DataFrame.
    """
    # Enforce the documented contract rather than failing on `.columns`.
    if not isinstance(data, pd.DataFrame):
        raise ValueError(
            f"continuous_col expects a pandas DataFrame, got {type(data).__name__}."
        )
    return [col for col in data.columns if data[col].nunique() > 2]
    

In [None]:
def Scaler(data):
    """Scales continuous columns in a DataFrame using StandardScaler.

    Columns are classified with ``binary_col`` (exactly two unique values) and
    ``continuous_col`` (more than two unique values). A column matching
    neither rule, such as a constant column, is not carried into the result.

    Args:
        data (pd.DataFrame): The input DataFrame to scale.
    Returns:
        pd.DataFrame: Binary columns (unchanged) followed by the scaled
            continuous columns, on a fresh 0..n-1 index.
    Raises:
        ValueError: If the input data is not a pandas DataFrame.
    """
    if not isinstance(data, pd.DataFrame):
        raise ValueError(
            f"Scaler expects a pandas DataFrame, got {type(data).__name__}."
        )

    binary_cols = binary_col(data)
    continuous_cols = continuous_col(data)
    binary_part = data[binary_cols].reset_index(drop=True)

    # StandardScaler needs at least one feature column; with nothing to scale
    # the binary columns are the whole result.
    if not continuous_cols:
        return binary_part

    scaler = StandardScaler()
    # Scale the continuous columns
    scaled_continuous = scaler.fit_transform(data[continuous_cols])
    # Built from a bare array, so this frame already has a 0..n-1 index that
    # lines up with `binary_part`.
    scaled_continuous_df = pd.DataFrame(scaled_continuous, columns=continuous_cols)
    # Combine the scaled continuous columns with the binary columns
    return pd.concat([binary_part, scaled_continuous_df], axis=1)

In [None]:
def read_csv(file_path: str) -> pd.DataFrame:
    """Read a CSV file into a DataFrame and print a summary of it.

    Args:
        file_path (str): Location of the CSV file to load.
    Returns:
        pd.DataFrame: The loaded table; its ``.info()`` summary is printed
            before the table is returned.
    """
    loaded = pd.read_csv(file_path)
    loaded.info()
    return loaded

In [None]:
# Load the BNPL table and keep only its numeric columns
data = pd.read_csv('Tables/BNPL.csv').select_dtypes(include=['number'])

In [None]:
# Report the IQR outliers of every float64 / int64 column of the loaded table
numeric_columns = data.select_dtypes(include=['float64', 'int64']).columns
for column in numeric_columns:
    outliers = OutlierDetector(data[column])
    print(f"Outliers in {column}:\n", outliers)


In [None]:
# Generate proxy default flag: 'default_flag' is 1 when a row meets at least
# 3 of the 5 stress conditions checked by DefaultFlagGenerator, else 0.
data_clean = DefaultFlagGenerator(data)
# Display the flagged table as the cell output
data_clean

In [1]:
# Scale the data: binary columns are kept as they are, columns with more than
# two unique values are standardised (see Scaler).
# NOTE(review): the NameError stored below this cell shows it was last run
# before the cell defining `Scaler` — presumably on a fresh kernel. Re-run the
# notebook top to bottom so the saved output matches the code.
data_scaled = Scaler(data_clean)
# Persist the scaled table without the row index
data_scaled.to_csv('Tables/bnpl_scaled.csv', index=False)

NameError: name 'Scaler' is not defined

In [None]:
# Classify the scaled table's columns: more than two unique values -> continuous,
# exactly two -> binary
continuous_cols, binary_cols = continuous_col(data_scaled), binary_col(data_scaled)

In [None]:
# Check every float64 / int64 column of the scaled table for IQR outliers
scaled_numeric_columns = data_scaled.select_dtypes(include=['float64', 'int64']).columns
for column in scaled_numeric_columns:
    outliers = OutlierDetector(data_scaled[column])
    if outliers.empty:
        print(f"No outliers detected in {column}.")
    else:
        print(f"Outliers in {column}:\n", outliers)

In [None]:
# Correlation between BNPL usage frequency and the over-indebtedness flag,
# read out of the full pairwise correlation matrix of the scaled table
correlation_matrix = data_scaled.corr()
correlation_value = correlation_matrix.at['bnpl_usage_frequency', 'over_indebtedness_flag']
print(f"Correlation between BNPL usage frequency and over-indebtedness: {correlation_value}")

In [None]:
# Check for outliers in the scaled data by splitting the data into two parts:
# one with default_flag = 0 and one with default_flag = 1, so each class is
# judged against its own IQR fences.

outlier_indices = []
data_default_0 = data_scaled[data_scaled['default_flag'] == 0]
data_default_1 = data_scaled[data_scaled['default_flag'] == 1]

for column in continuous_cols:
    outliers_0 = OutlierDetector(data_default_0[column])
    outliers_1 = OutlierDetector(data_default_1[column])

    # Add indices of outliers to the list
    outlier_indices.extend(outliers_0.index.tolist())
    outlier_indices.extend(outliers_1.index.tolist())

    if not outliers_0.empty:
        print(f"Outliers in {column} for default_flag = 0:\n", outliers_0)
    else:
        print(f"No outliers detected in {column} for default_flag = 0.")

    if not outliers_1.empty:
        print(f"Outliers in {column} for default_flag = 1:\n", outliers_1)
    else:
        print(f"No outliers detected in {column} for default_flag = 1.")

# A row that is extreme in several columns was appended once per column, so
# the raw list over-counts rows. Keep the per-value count for reporting and
# reduce the list to unique, sorted row labels.
total_outlier_values = len(outlier_indices)
outlier_indices = sorted(set(outlier_indices))

# Print the list of outlier indices
print("Outlier indices:", outlier_indices)
print("Total number of outliers detected:", total_outlier_values)
print("Unique rows flagged as outliers:", len(outlier_indices))

In [None]:
# Drop the outlier rows, then the unwanted 'over_indebtedness_flag' feature,
# from the scaled table; save the result and summarise it
data_scaled_cleaned = (
    data_scaled
    .drop(index=outlier_indices)
    .drop(columns=['over_indebtedness_flag'])
)
data_scaled_cleaned.to_csv('Tables/bnpl_scaled_cleaned.csv', index=False)
data_scaled_cleaned.info()

In [None]:
# Recompute the binary column list on the cleaned table: rows and the
# 'over_indebtedness_flag' column were dropped since it was last computed.
binary_cols = binary_col(data_scaled_cleaned)

Feature Engineering


In [None]:
# Display the cleaned, scaled table as the cell output before engineering new features
data_scaled_cleaned

In [None]:
# Engineered feature: stress_usage_interaction = financial_stress_score × bnpl_usage_frequency
# The column is written onto data_scaled_cleaned itself; data_engineered is a
# copy taken afterwards, so both frames carry the new column.
data_scaled_cleaned['stress_usage_interaction'] = data_scaled_cleaned['financial_stress_score'].mul(
    data_scaled_cleaned['bnpl_usage_frequency']
)
data_engineered = data_scaled_cleaned.copy()
data_engineered.to_csv('Tables/bnpl_engineered.csv', index=False)


In [None]:
# Report the mean and standard deviation of the engineered feature(s) to see
# how far they are from a standardised (mean 0, std 1) scale
new_columns = ['stress_usage_interaction']
print("Mean and Standard Deviation of New Columns:")
mean_std = data_engineered[new_columns].agg(['mean', 'std'])
print(mean_std)


In [None]:
# Collect the row labels of IQR outliers in each engineered feature
new_outlier_indices = []
for column in new_columns:
    outliers = OutlierDetector(data_engineered[column])
    new_outlier_indices.extend(outliers.index.tolist())
    if outliers.empty:
        print(f"No outliers detected in {column}.")
    else:
        print(f"Outliers in {column}:\n", outliers)

# Show every collected row label
print("New outlier indices:", new_outlier_indices)

In [None]:
# Collapse repeated row labels and put them in ascending order
new_outlier_indices = list(set(new_outlier_indices))
new_outlier_indices.sort()
print("Unique sorted new outlier indices:", new_outlier_indices)
# Cell output: number of distinct rows flagged
len(new_outlier_indices)


In [None]:
# Calculate point-biserial correlation for new features with default_flag.
# Read from data_engineered (the frame the neighbouring cells analyse) rather
# than relying on data_scaled_cleaned having been extended in place, and pass
# the binary variable first, as scipy documents for pointbiserialr.
results = []
for feat in new_columns:
    r, p = pointbiserialr(data_engineered['default_flag'], data_engineered[feat])
    results.append({'feature': feat, 'r': r, 'p_value': p})

# Strongest association (largest |r|) first
corr_df = pd.DataFrame(results).sort_values('r', key=abs, ascending=False)
print(corr_df)

Day 6: Train / Validation / Test Split

In [None]:
# 'Tables/bnpl_engineered.csv' was written with index=False, so the file has
# no index column. Reading it with index_col=0 would turn the first data
# column into the row index and remove it from the table.
final_data = pd.read_csv('Tables/bnpl_engineered.csv')

# 1) split off test set (20% of all rows, stratified on default_flag)
train_val, test = train_test_split(
    final_data,
    test_size=0.20,
    stratify=final_data['default_flag'],
    random_state=42
)


In [None]:
# 2) carve the validation set out of the remaining 80%:
#    0.25 of that 80% is 20% of all rows, giving a 60 / 20 / 20 split
train, val = train_test_split(
    train_val,
    stratify=train_val['default_flag'],
    test_size=0.25,
    random_state=42,
)


In [None]:

# Share of default_flag == 1 in train, validation and test (in that order);
# both splits were stratified on default_flag
default_rates = [split['default_flag'].mean() for split in (train, val, test)]
print("Default rates:", *default_rates)


In [None]:
# Row labels belonging to each split
train_idx, val_idx, test_idx = (part.index.tolist() for part in (train, val, test))
# Show the first ten labels of each split together with the split size
print(f"Train indices: {train_idx[:10]}... ({len(train_idx)} total)")
print(f"Validation indices: {val_idx[:10]}... ({len(val_idx)} total)")
print(f"Test indices: {test_idx[:10]}... ({len(test_idx)} total)")


In [None]:
# Write one CSV per split (train, validation, test), without the row index
split_outputs = (
    (train, 'Tables/bnpl_train.csv'),
    (val, 'Tables/bnpl_val.csv'),
    (test, 'Tables/bnpl_test.csv'),
)
for split_frame, csv_path in split_outputs:
    split_frame.to_csv(csv_path, index=False)

# Report (rows, columns) of each split
print(f"Train set shape: {train.shape}")
print(f"Validation set shape: {val.shape}")
print(f"Test set shape: {test.shape}")