<a href="https://colab.research.google.com/github/BelayAbAb/AI-Powered-Credit-Scoring-Model-/blob/Refactor-Codebase-for-Modularity-and-Maintainability/Risk%20Probability%20and%20Credit%20Score%20Mapping_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Colab notebooks execute code on Google's cloud servers, meaning you can leverage the power of Google hardware, including [GPUs and TPUs](#using-accelerated-hardware), regardless of the power of your machine. All you need is a browser.

For example, if you find yourself waiting for **pandas** code to finish running and want to go faster, you can switch to a GPU Runtime and use libraries like [RAPIDS cuDF](https://rapids.ai/cudf-pandas) that provide zero-code-change acceleration.

In [None]:
# Step 1: Install necessary dependencies
!pip install -q seaborn matplotlib scikit-learn gdown

# Step 2: Import necessary libraries
from mpl_toolkits.mplot3d import Axes3D
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import seaborn as sns
import gdown  # This will allow downloading from Google Drive

# Step 3: Download the file using the shared Google Drive link

# Shared link: https://drive.google.com/file/d/1OpwfxIO8aeDDsdSEgGrnyn1F6MFw4O6x/view?usp=sharing
# Extract the file ID from the link (the ID is the part between /d/ and /view)
file_id = '1OpwfxIO8aeDDsdSEgGrnyn1F6MFw4O6x'

# Construct the download URL
download_url = f'https://drive.google.com/uc?export=download&id={file_id}'

# Download the file using gdown
gdown.download(download_url, 'data.csv', quiet=False)

# Step 4: Load the CSV file from the local directory
df = pd.read_csv('data.csv')  # The file will be downloaded to the current directory
df.dataframeName = 'data.csv'

# Step 5: Check the shape of the data
nRow, nCol = df.shape
print(f'There are {nRow} rows and {nCol} columns in {df.dataframeName}')

# Step 6: Take a quick look at the data
print(df.head(5))

# Step 7: Exploratory Data Analysis (EDA)

# Filter out non-numeric columns for correlation and other numerical operations
numeric_df = df.select_dtypes(include=[np.number])

# Define output directory in Google Drive
output_dir = '/content/drive/MyDrive/1fINHoR_jYkPkHB-7HPm1fxQqIxeFOKnR/EDA_Results/'
os.makedirs(output_dir, exist_ok=True)

# Distribution of numeric columns - Save the plot to Google Drive
def plotPerColumnDistribution(df, nRows, nCols):
    df.hist(figsize=(nRows, nCols))
    plt.savefig(os.path.join(output_dir, 'distribution_plot.png'))
    plt.close()

plotPerColumnDistribution(numeric_df, 10, 5)

# Correlation matrix of numeric columns - Save the plot to Google Drive
def plotCorrelationMatrix(df, nRows):
    corr = df.corr()
    plt.figure(figsize=(nRows, nRows))
    sns.heatmap(corr, annot=True, cmap='coolwarm')
    plt.savefig(os.path.join(output_dir, 'correlation_matrix.png'))
    plt.close()

plotCorrelationMatrix(numeric_df, 8)

# Scatter and density plots (only for numeric columns) - Save the plot to Google Drive
def plotScatterMatrix(df, nRows, nCols):
    sns.pairplot(df, height=2.5)
    plt.savefig(os.path.join(output_dir, 'scatter_matrix.png'))
    plt.close()

plotScatterMatrix(numeric_df, 12, 10)

# Save the processed data as CSV to Google Drive
output_csv_path = os.path.join(output_dir, 'processed_data.csv')
df.to_csv(output_csv_path, index=False)

# Conclusion message
print(f"All output files have been saved to: {output_dir}")
print("This concludes the exploratory data analysis!")


Downloading...
From: https://drive.google.com/uc?export=download&id=1OpwfxIO8aeDDsdSEgGrnyn1F6MFw4O6x
To: /content/data.csv
100%|██████████| 17.4M/17.4M [00:00<00:00, 211MB/s]


There are 95662 rows and 16 columns in data.csv
         TransactionId         BatchId       AccountId       SubscriptionId  \
0  TransactionId_76871   BatchId_36123  AccountId_3957   SubscriptionId_887   
1  TransactionId_73770   BatchId_15642  AccountId_4841  SubscriptionId_3829   
2  TransactionId_26203   BatchId_53941  AccountId_4229   SubscriptionId_222   
3    TransactionId_380  BatchId_102363   AccountId_648  SubscriptionId_2185   
4  TransactionId_28195   BatchId_38780  AccountId_4841  SubscriptionId_3829   

        CustomerId CurrencyCode  CountryCode    ProviderId     ProductId  \
0  CustomerId_4406          UGX          256  ProviderId_6  ProductId_10   
1  CustomerId_4406          UGX          256  ProviderId_4   ProductId_6   
2  CustomerId_4683          UGX          256  ProviderId_6   ProductId_1   
3   CustomerId_988          UGX          256  ProviderId_1  ProductId_21   
4   CustomerId_988          UGX          256  ProviderId_4   ProductId_6   

      ProductCategor

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import gdown
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    confusion_matrix,
    ConfusionMatrixDisplay
)
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import warnings
import os

# Suppress specific warnings
warnings.filterwarnings("ignore")

# Step 1: Download the dataset from Google Drive
file_id = '1OpwfxIO8aeDDsdSEgGrnyn1F6MFw4O6x'
download_url = f'https://drive.google.com/uc?export=download&id={file_id}'
gdown.download(download_url, 'data.csv', quiet=False)

# Step 2: Load the dataset
data = pd.read_csv('data.csv')

# Check for missing values and data types
print("Data types:\n", data.dtypes)
print("Missing values:\n", data.isnull().sum())

# Step 3: Split the data into features (X) and target (y)
# We assume 'FraudResult' is the target variable for binary classification
X = data.drop(columns=['FraudResult', 'TransactionId', 'BatchId', 'AccountId', 'SubscriptionId', 'CustomerId', 'TransactionStartTime'])
y = data['FraudResult']

# Step 4: Split the data into training (70%) and testing (30%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Step 5: Define preprocessing steps for numerical and categorical features
numerical_features = X_train.select_dtypes(include=['float64', 'int64']).columns.tolist()
categorical_features = X_train.select_dtypes(include=['object']).columns.tolist()

print("Numerical features:", numerical_features)
print("Categorical features:", categorical_features)

# Create a ColumnTransformer for preprocessing
preprocessor = ColumnTransformer(
    transformers=[('num', StandardScaler(), numerical_features),
                  ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
    ])

# Step 6: Define models with increased max_iter for Logistic Regression
models = {
    'Logistic Regression': LogisticRegression(max_iter=5000),  # Increased max_iter
    'Random Forest': RandomForestClassifier(),
    'Gradient Boosting': GradientBoostingClassifier()
}

# Hyperparameter tuning parameters
param_grid = {
    'Logistic Regression': {
        'model__C': [0.01, 0.1, 1, 10, 100],
        'model__solver': ['liblinear', 'lbfgs', 'saga', 'newton-cg']
    },
    'Random Forest': {
        'model__n_estimators': [50, 100, 200],
        'model__max_depth': [None, 10, 20, 30],
        'model__min_samples_split': [2, 5, 10]
    },
    'Gradient Boosting': {
        'model__n_estimators': [50, 100, 200],
        'model__learning_rate': [0.01, 0.1, 0.2],
        'model__max_depth': [3, 5, 7]
    }
}

# Step 7: Train and evaluate each model
results = {}

for model_name, model in models.items():
    # Create a pipeline with preprocessing and model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                                ('model', model)])

    # Perform hyperparameter tuning
    if model_name == 'Logistic Regression':
        grid_search = GridSearchCV(pipeline, param_grid[model_name], cv=5, scoring='f1', verbose=1)
        grid_search.fit(X_train, y_train)
    else:
        grid_search = RandomizedSearchCV(pipeline, param_grid[model_name], n_iter=10, cv=5, scoring='f1', random_state=42)
        grid_search.fit(X_train, y_train)

    # Store the best model and its score
    best_model = grid_search.best_estimator_
    results[model_name] = {
        'best_model': best_model,
        'best_score': grid_search.best_score_,
    }

    # Evaluate on the test set
    y_pred = best_model.predict(X_test)
    results[model_name]['accuracy'] = accuracy_score(y_test, y_pred)
    results[model_name]['precision'] = precision_score(y_test, y_pred)
    results[model_name]['recall'] = recall_score(y_test, y_pred)
    results[model_name]['f1_score'] = f1_score(y_test, y_pred)
    results[model_name]['roc_auc'] = roc_auc_score(y_test, best_model.predict_proba(X_test)[:, 1])

    # Step 8: Confusion Matrix Visualization
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Not Fraud', 'Fraud'])

    # Save confusion matrix plot
    output_folder = '/content/drive/MyDrive/your_folder_path_here/'
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    cm_output_path = os.path.join(output_folder, f"cm_{model_name}.jpg")
    disp.plot(cmap=plt.cm.Blues)
    plt.title(f'Confusion Matrix for {model_name}')
    plt.savefig(cm_output_path)
    plt.close()

# Step 9: Prepare data for plotting
metric_names = ['Best CV Score', 'Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC']
metric_values = {model: [results[model]['best_score'],
                         results[model]['accuracy'],
                         results[model]['precision'],
                         results[model]['recall'],
                         results[model]['f1_score'],
                         results[model]['roc_auc']] for model in models.keys()}

# Plotting
fig, ax = plt.subplots(figsize=(10, 6))
width = 0.15  # Width of the bars
x = np.arange(len(metric_names))  # the label locations

for i, model in enumerate(models.keys()):
    ax.bar(x + i * width, metric_values[model], width, label=model)

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_ylabel('Scores')
ax.set_title('Model Training and Evaluation Metrics')
ax.set_xticks(x + width / 2)
ax.set_xticklabels(metric_names)
ax.legend()

# Save the plot as JPG in the specified local folder
output_plot_path = os.path.join(output_folder, "model_evaluation_metrics.jpg")
plt.tight_layout()
plt.savefig(output_plot_path)
plt.close()

# Step 10: Save the results to a CSV file
output_results_path = os.path.join(output_folder, "model_results.csv")
results_df = pd.DataFrame.from_dict(results, orient='index')
results_df.to_csv(output_results_path)

print("Model evaluation metrics and results saved successfully.")


Downloading...
From: https://drive.google.com/uc?export=download&id=1OpwfxIO8aeDDsdSEgGrnyn1F6MFw4O6x
To: /content/data.csv
100%|██████████| 17.4M/17.4M [00:00<00:00, 110MB/s] 


Data types:
 TransactionId            object
BatchId                  object
AccountId                object
SubscriptionId           object
CustomerId               object
CurrencyCode             object
CountryCode               int64
ProviderId               object
ProductId                object
ProductCategory          object
ChannelId                object
Amount                  float64
Value                     int64
TransactionStartTime     object
PricingStrategy           int64
FraudResult               int64
dtype: object
Missing values:
 TransactionId           0
BatchId                 0
AccountId               0
SubscriptionId          0
CustomerId              0
CurrencyCode            0
CountryCode             0
ProviderId              0
ProductId               0
ProductCategory         0
ChannelId               0
Amount                  0
Value                   0
TransactionStartTime    0
PricingStrategy         0
FraudResult             0
dtype: int64
Numerical featu

In [8]:
import pandas as pd
import numpy as np

# Mock dataset for testing (you should adapt it to your actual data)
np.random.seed(42)
df_cleaned = pd.DataFrame({
    'ProductCategory': np.random.choice(['A', 'B', 'C', 'D'], size=1000),
    'Recency': np.random.randint(1, 365, size=1000),  # Days since last purchase
    'Frequency': np.random.randint(1, 50, size=1000),  # Number of transactions
    'Monetary': np.random.randint(100, 5000, size=1000),  # Monetary value of transactions
})

# Mock binary target variable for demonstration (1 for good, 0 for bad)
df_cleaned['Default'] = np.random.choice([0, 1], size=len(df_cleaned), p=[0.7, 0.3])

# Function to calculate WoE for categorical features
def calculate_woe(data, target, feature):
    """
    Function to calculate Weight of Evidence (WoE) for categorical variables.

    Caution: Ensure that the feature has enough data points per category.
    Small sample sizes for some categories can lead to unreliable WoE values.
    """
    # Create a DataFrame for WoE calculation
    woe_df = data.groupby(feature)[target].agg(['count', 'sum']).reset_index()
    woe_df.columns = [feature, 'Total', 'Good']

    # Calculate Bad
    woe_df['Bad'] = woe_df['Total'] - woe_df['Good']

    # Calculate proportions
    total_good = woe_df['Good'].sum()
    total_bad = woe_df['Bad'].sum()

    # Calculate WoE
    woe_df['Good_Percentage'] = woe_df['Good'] / total_good
    woe_df['Bad_Percentage'] = woe_df['Bad'] / total_bad
    woe_df['WoE'] = np.log(woe_df['Good_Percentage'] / woe_df['Bad_Percentage']).replace([-np.inf, np.inf], 0)

    return woe_df[[feature, 'WoE']]

# Function to apply WoE binning for continuous features (e.g., Recency, Frequency, Monetary)
def binning_woe(data, target, feature, bins=5):
    """
    Function to bin continuous features and calculate WoE for each bin.

    Caution: Binning can be a very subjective process. The choice of the number of bins
    and bin edges can significantly impact the results. Consider domain knowledge when
    defining the binning strategy.
    """
    # Bin continuous variable into intervals (e.g., Recency, Frequency, Monetary)
    data['bin'] = pd.cut(data[feature], bins, right=False)

    # Calculate WoE for each bin
    return calculate_woe(data, target, 'bin')

# Apply WoE Binning to 'ProductCategory' (categorical feature)
woe_product_category = calculate_woe(df_cleaned, 'Default', 'ProductCategory')

# Merge WoE values back to the original DataFrame
df_cleaned = df_cleaned.merge(woe_product_category, on='ProductCategory', how='left')
df_cleaned.rename(columns={'WoE': 'WoE_ProductCategory'}, inplace=True)

# Apply WoE Binning to continuous features: 'Recency', 'Frequency', 'Monetary'
woe_recency = binning_woe(df_cleaned, 'Default', 'Recency')
woe_frequency = binning_woe(df_cleaned, 'Default', 'Frequency')
woe_monetary = binning_woe(df_cleaned, 'Default', 'Monetary')

# Merge WoE values for Recency, Frequency, and Monetary back to the DataFrame
# Keep the 'bin' column for subsequent merges (no drop for the first two merges)
df_cleaned = df_cleaned.merge(woe_recency, left_on='bin', right_on='bin', how='left')  # Do not drop 'bin' yet
df_cleaned.rename(columns={'WoE': 'WoE_Recency'}, inplace=True)

df_cleaned = df_cleaned.merge(woe_frequency, left_on='bin', right_on='bin', how='left')  # Do not drop 'bin' yet
df_cleaned.rename(columns={'WoE': 'WoE_Frequency'}, inplace=True)

# Now, drop 'bin' after the last merge for Monetary (the last step)
df_cleaned = df_cleaned.merge(woe_monetary, left_on='bin', right_on='bin', how='left').drop('bin', axis=1)
df_cleaned.rename(columns={'WoE': 'WoE_Monetary'}, inplace=True)

# Display the first few rows of the modified DataFrame with WoE values
print(df_cleaned[['ProductCategory', 'WoE_ProductCategory', 'Recency', 'WoE_Recency', 'Frequency', 'WoE_Frequency', 'Monetary', 'WoE_Monetary']].head())

# Save the modified DataFrame with WoE values
woe_output_path = r"C:\Users\User\Desktop\woe_data.csv"
df_cleaned.to_csv(woe_output_path, index=False)

print(f"Woe data saved as {woe_output_path}")

# Step 2: Define a Default Estimator (proxy variable) based on the WoE values and RFMS
# Create a composite score from the WoE values for Recency, Frequency, and Monetary
df_cleaned['Risk_Score'] = df_cleaned['WoE_Recency'] + df_cleaned['WoE_Frequency'] + df_cleaned['WoE_Monetary']

# Create a 'Risk' label: High risk (bad) if Risk_Score < threshold, Low risk (good) otherwise
threshold = df_cleaned['Risk_Score'].median()  # Median can be a simple threshold, or use another strategy
df_cleaned['Risk'] = np.where(df_cleaned['Risk_Score'] < threshold, 1, 0)  # 1 for high risk (bad), 0 for low risk (good)

# Display the Risk segmentation
print(df_cleaned[['Recency', 'Frequency', 'Monetary', 'Risk_Score', 'Risk']].head())

# Additional caution about RFMS:
# RFMS (Recency, Frequency, Monetary) is a common approach in marketing and credit scoring.
# Ensure that your RFMS model aligns with industry best practices (e.g., Basel II Capital Accord)
# and consult financial regulations to meet compliance.


  ProductCategory  WoE_ProductCategory  Recency  WoE_Recency  Frequency  \
0               C            -0.020074      145          NaN         29   
1               D             0.091892      201          NaN         12   
2               A            -0.059567      212          NaN         36   
3               C            -0.020074      220          NaN         35   
4               C            -0.020074      240          NaN         12   

   WoE_Frequency  Monetary  WoE_Monetary  
0            NaN      2330      0.072193  
1            NaN      4140     -0.020298  
2            NaN      4683     -0.020298  
3            NaN      3797      0.043083  
4            NaN      2937      0.072193  
Woe data saved as C:\Users\User\Desktop\woe_data.csv
   Recency  Frequency  Monetary  Risk_Score  Risk
0      145         29      2330         NaN     0
1      201         12      4140         NaN     0
2      212         36      4683         NaN     0
3      220         35      3797       

In [9]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import os
import warnings

# Suppress specific warnings
warnings.filterwarnings("ignore")

# Step 1: Load the dataset
df_cleaned = pd.read_csv('data.csv')

# Step 2: Split the data into features (X) and target (y)
X = df_cleaned.drop(columns=['FraudResult', 'TransactionId', 'BatchId', 'AccountId', 'SubscriptionId', 'CustomerId', 'TransactionStartTime'])
y = df_cleaned['FraudResult']

# Step 3: Split the data into training (70%) and testing (30%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Step 4: Preprocessing
numerical_features = X_train.select_dtypes(include=['float64', 'int64']).columns.tolist()
categorical_features = X_train.select_dtypes(include=['object']).columns.tolist()

# Create a ColumnTransformer for preprocessing
preprocessor = ColumnTransformer(
    transformers=[('num', StandardScaler(), numerical_features),
                  ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
    ])

# Step 5: Define models
models = {
    'Logistic Regression': LogisticRegression(max_iter=5000),
    'Random Forest': RandomForestClassifier(),
    'Gradient Boosting': GradientBoostingClassifier()
}

# Hyperparameter tuning parameters
param_grid = {
    'Logistic Regression': {
        'model__C': [0.01, 0.1, 1, 10, 100],
        'model__solver': ['liblinear', 'lbfgs', 'saga', 'newton-cg']
    },
    'Random Forest': {
        'model__n_estimators': [50, 100, 200],
        'model__max_depth': [None, 10, 20, 30],
        'model__min_samples_split': [2, 5, 10]
    },
    'Gradient Boosting': {
        'model__n_estimators': [50, 100, 200],
        'model__learning_rate': [0.01, 0.1, 0.2],
        'model__max_depth': [3, 5, 7]
    }
}

# Step 6: Train and evaluate each model
results = {}

for model_name, model in models.items():
    # Create a pipeline with preprocessing and model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                                ('model', model)])

    # Perform hyperparameter tuning using GridSearchCV or RandomizedSearchCV
    if model_name == 'Logistic Regression':
        grid_search = GridSearchCV(pipeline, param_grid[model_name], cv=5, scoring='f1', verbose=1)
        grid_search.fit(X_train, y_train)
    else:
        grid_search = RandomizedSearchCV(pipeline, param_grid[model_name], n_iter=10, cv=5, scoring='f1', random_state=42)
        grid_search.fit(X_train, y_train)

    # Store the best model and its score
    best_model = grid_search.best_estimator_
    results[model_name] = {
        'best_model': best_model,
        'best_score': grid_search.best_score_
    }

    # Evaluate on the test set
    y_pred = best_model.predict(X_test)
    results[model_name]['accuracy'] = accuracy_score(y_test, y_pred)
    results[model_name]['precision'] = precision_score(y_test, y_pred)
    results[model_name]['recall'] = recall_score(y_test, y_pred)
    results[model_name]['f1_score'] = f1_score(y_test, y_pred)
    results[model_name]['roc_auc'] = roc_auc_score(y_test, best_model.predict_proba(X_test)[:, 1])

    # Confusion Matrix Visualization
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Not Fraud', 'Fraud'])

    # Save confusion matrix plot
    output_folder = '/content/drive/MyDrive/your_folder_path_here/'
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    cm_output_path = os.path.join(output_folder, f"cm_{model_name}.jpg")
    disp.plot(cmap=plt.cm.Blues)
    plt.title(f'Confusion Matrix for {model_name}')
    plt.savefig(cm_output_path)
    plt.close()

# Step 7: Risk Probability and Credit Score Mapping

# Get the predicted probabilities for the test set
y_pred_prob = best_model.predict_proba(X_test)[:, 1]  # Probability for the positive class (Fraud/High Risk)

# Categorize customers into risk groups based on probabilities
risk_groups = pd.cut(y_pred_prob, bins=[0, 0.2, 0.4, 0.6, 1.0], labels=['Very Low Risk', 'Low Risk', 'Moderate Risk', 'High Risk'])

# Map risk groups to credit score
# Assuming that higher risk corresponds to a lower credit score
credit_score_mapping = {
    'Very Low Risk': 800,  # Best credit score
    'Low Risk': 700,
    'Moderate Risk': 600,
    'High Risk': 500  # Worst credit score
}

# Map the risk groups to credit scores
credit_scores = risk_groups.map(credit_score_mapping)

# Add the risk group and credit score columns to the dataframe
df_cleaned.loc[X_test.index, 'Risk_Group'] = risk_groups
df_cleaned.loc[X_test.index, 'Credit_Score'] = credit_scores

# Display the first few rows of the data with risk groups and credit scores
print(df_cleaned[['CustomerId', 'Risk_Group', 'Credit_Score']].head())

# Save the data with risk groups and credit scores
output_risk_path = os.path.join(output_folder, 'customer_risk_scores.csv')
df_cleaned[['CustomerId', 'Risk_Group', 'Credit_Score']].to_csv(output_risk_path, index=False)

print(f"Customer risk data with credit scores saved at {output_risk_path}")

# Step 8: Plot the Risk Probability Distribution

plt.figure(figsize=(10, 6))
plt.hist(y_pred_prob, bins=20, color='skyblue', edgecolor='black')
plt.title('Risk Probability Distribution', fontsize=16)
plt.xlabel('Risk Probability', fontsize=14)
plt.ylabel('Frequency', fontsize=14)

# Save the histogram as a .jpg image
output_hist_path = os.path.join(output_folder, 'risk_probability_distribution.jpg')
plt.savefig(output_hist_path, format='jpg', dpi=300)
plt.close()

print(f"Risk probability distribution plot saved at {output_hist_path}")


Fitting 5 folds for each of 20 candidates, totalling 100 fits
        CustomerId     Risk_Group Credit_Score
0  CustomerId_4406            NaN          NaN
1  CustomerId_4406            NaN          NaN
2  CustomerId_4683  Very Low Risk          800
3   CustomerId_988            NaN          NaN
4   CustomerId_988            NaN          NaN
Customer risk data with credit scores saved at /content/drive/MyDrive/your_folder_path_here/customer_risk_scores.csv
Risk probability distribution plot saved at /content/drive/MyDrive/your_folder_path_here/risk_probability_distribution.jpg


In [12]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import os
import warnings

# Suppress specific warnings
warnings.filterwarnings("ignore")

# Step 1: Load the dataset
# Modify the path to your actual file location

# If you're using Google Colab or Jupyter with Google Drive:
df_cleaned = pd.read_csv('/content/drive/MyDrive/your_folder_path_here/data.csv')

# If the file is in the same directory as your script, use:
# df_cleaned = pd.read_csv('data.csv')

# Alternatively, you can specify the full path of the dataset if it's in a different folder:
# df_cleaned = pd.read_csv('/path/to/your/folder/data.csv')

# Step 2: Split the data into features (X) and target (y)
X = df_cleaned.drop(columns=['FraudResult', 'TransactionId', 'BatchId', 'AccountId', 'SubscriptionId', 'CustomerId', 'TransactionStartTime'])
y = df_cleaned['FraudResult']

# Step 3: Split the data into training (70%) and testing (30%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Step 4: Preprocessing
numerical_features = X_train.select_dtypes(include=['float64', 'int64']).columns.tolist()
categorical_features = X_train.select_dtypes(include=['object']).columns.tolist()

# Create a ColumnTransformer for preprocessing
preprocessor = ColumnTransformer(
    transformers=[('num', StandardScaler(), numerical_features),
                  ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
    ])

# Step 5: Define models
models = {
    'Logistic Regression': LogisticRegression(max_iter=5000),
    'Random Forest': RandomForestClassifier(),
    'Gradient Boosting': GradientBoostingClassifier()
}

# Hyperparameter tuning parameters
param_grid = {
    'Logistic Regression': {
        'model__C': [0.01, 0.1, 1, 10, 100],
        'model__solver': ['liblinear', 'lbfgs', 'saga', 'newton-cg']
    },
    'Random Forest': {
        'model__n_estimators': [50, 100, 200],
        'model__max_depth': [None, 10, 20, 30],
        'model__min_samples_split': [2, 5, 10]
    },
    'Gradient Boosting': {
        'model__n_estimators': [50, 100, 200],
        'model__learning_rate': [0.01, 0.1, 0.2],
        'model__max_depth': [3, 5, 7]
    }
}

# Step 6: Train and evaluate each model
results = {}

for model_name, model in models.items():
    # Create a pipeline with preprocessing and model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                                ('model', model)])

    # Perform hyperparameter tuning using GridSearchCV or RandomizedSearchCV
    if model_name == 'Logistic Regression':
        grid_search = GridSearchCV(pipeline, param_grid[model_name], cv=5, scoring='f1', verbose=1)
        grid_search.fit(X_train, y_train)
    else:
        grid_search = RandomizedSearchCV(pipeline, param_grid[model_name], n_iter=10, cv=5, scoring='f1', random_state=42)
        grid_search.fit(X_train, y_train)

    # Store the best model and its score
    best_model = grid_search.best_estimator_
    results[model_name] = {
        'best_model': best_model,
        'best_score': grid_search.best_score_
    }

    # Evaluate on the test set
    y_pred = best_model.predict(X_test)
    results[model_name]['accuracy'] = accuracy_score(y_test, y_pred)
    results[model_name]['precision'] = precision_score(y_test, y_pred)
    results[model_name]['recall'] = recall_score(y_test, y_pred)
    results[model_name]['f1_score'] = f1_score(y_test, y_pred)
    results[model_name]['roc_auc'] = roc_auc_score(y_test, best_model.predict_proba(X_test)[:, 1])

    # Confusion Matrix Visualization
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Not Fraud', 'Fraud'])

    # Save confusion matrix plot
    output_folder = '/content/drive/MyDrive/your_folder_path_here/'
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    cm_output_path = os.path.join(output_folder, f"cm_{model_name}.jpg")
    disp.plot(cmap=plt.cm.Blues)
    plt.title(f'Confusion Matrix for {model_name}')
    plt.savefig(cm_output_path)
    plt.close()

# Step 7: Risk Probability and Credit Score Mapping

# Get the predicted probabilities for the test set
best_model = results['Random Forest']['best_model']  # Assume RandomForest performs best, adjust if needed
y_pred_prob = best_model.predict_proba(X_test)[:, 1]  # Probability for the positive class (Fraud/High Risk)

# Categorize customers into risk groups based on probabilities
risk_groups = pd.cut(y_pred_prob, bins=[0, 0.2, 0.4, 0.6, 1.0], labels=['Very Low Risk', 'Low Risk', 'Moderate Risk', 'High Risk'])

# Map risk groups to credit score
credit_score_mapping = {
    'Very Low Risk': 800,  # Best credit score
    'Low Risk': 700,
    'Moderate Risk': 600,
    'High Risk': 500  # Worst credit score
}

# Map the risk groups to credit scores
credit_scores = risk_groups.map(credit_score_mapping)

# Add the risk group and credit score columns to the dataframe
df_cleaned.loc[X_test.index, 'Risk_Group'] = risk_groups
df_cleaned.loc[X_test.index, 'Credit_Score'] = credit_scores

# Display the first few rows of the data with risk groups and credit scores
print(df_cleaned[['CustomerId', 'Risk_Group', 'Credit_Score']].head())

# Save the data with risk groups and credit scores
output_risk_path = os.path.join(output_folder, 'customer_risk_scores.csv')
df_cleaned[['CustomerId', 'Risk_Group', 'Credit_Score']].to_csv(output_risk_path, index=False)

print(f"Customer risk data with credit scores saved at {output_risk_path}")


# Step 8: Summarize the output: Risk Groups and Statistics

# Group by risk group and summarize
risk_summary = df_cleaned[['CustomerId', 'Risk_Group', 'Credit_Score']].groupby('Risk_Group').agg(
    count=('CustomerId', 'count'),
    # Convert 'Credit_Score' to numeric before calculating the mean to avoid errors
    avg_credit_score=('Credit_Score', lambda x: pd.to_numeric(x, errors='coerce').mean()),
    min_credit_score=('Credit_Score', 'min'),
    max_credit_score=('Credit_Score', 'max'),
    risk_probability_range=('Credit_Score', lambda x: (x.min(), x.max()))
).reset_index()

# Display the risk group summary
print("\nRisk Group Summary:")
print(risk_summary)

# Optionally, you can also save this summary to a CSV file for further use
output_summary_path = '/content/drive/MyDrive/your_folder_path_here/risk_group_summary.csv'
risk_summary.to_csv(output_summary_path, index=False)

print(f"Risk group summary saved at {output_summary_path}")

# ... (rest of the code remains the same) ...


# Step 9: Plot the Risk Probability Distribution

plt.figure(figsize=(10, 6))
plt.hist(y_pred_prob, bins=20, color='skyblue', edgecolor='black')
plt.title('Risk Probability Distribution', fontsize=16)
plt.xlabel('Risk Probability', fontsize=14)
plt.ylabel('Frequency', fontsize=14)

# Save the histogram as a .jpg image
output_hist_path = os.path.join(output_folder, 'risk_probability_distribution.jpg')
plt.savefig(output_hist_path, format='jpg', dpi=300)
plt.close()

print(f"Risk probability distribution plot saved at {output_hist_path}")


Fitting 5 folds for each of 20 candidates, totalling 100 fits
        CustomerId     Risk_Group Credit_Score
0  CustomerId_4406            NaN          NaN
1  CustomerId_4406            NaN          NaN
2  CustomerId_4683  Very Low Risk          800
3   CustomerId_988            NaN          NaN
4   CustomerId_988            NaN          NaN
Customer risk data with credit scores saved at /content/drive/MyDrive/your_folder_path_here/customer_risk_scores.csv

Risk Group Summary:
      Risk_Group  count avg_credit_score min_credit_score max_credit_score  \
0  Very Low Risk  26042              800              800              800   
1       Low Risk     12              700              700              700   
2  Moderate Risk      9              600              600              600   
3      High Risk     52              500              500              500   

  risk_probability_range  
0             (800, 800)  
1             (700, 700)  
2             (600, 600)  
3             (500,

In [13]:
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# Sample Data (replace this with your actual dataset)
# Assuming 'df_cleaned' is your dataset and 'FraudResult' is your target variable
# X = df_cleaned.drop(columns=['FraudResult', 'other_columns'])
# y = df_cleaned['FraudResult']

# For illustration, I'll use dummy data (replace it with your actual data)
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=10, random_state=42)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Create a model pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),  # Optional preprocessing step (scaling)
    ('model', RandomForestClassifier(random_state=42))
])

# Train the model
pipeline.fit(X_train, y_train)

# Save the trained model to a .joblib file
joblib.dump(pipeline, 'credit_risk_model.joblib')

print("Model saved as 'credit_risk_model.joblib'")

Model saved as 'credit_risk_model.joblib'


In [15]:
!pip install fastapi
!pip install uvicorn
!pip install pydantic

Collecting fastapi
  Downloading fastapi-0.115.5-py3-none-any.whl.metadata (27 kB)
Collecting starlette<0.42.0,>=0.40.0 (from fastapi)
  Downloading starlette-0.41.2-py3-none-any.whl.metadata (6.0 kB)
Downloading fastapi-0.115.5-py3-none-any.whl (94 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.9/94.9 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading starlette-0.41.2-py3-none-any.whl (73 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.3/73.3 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: starlette, fastapi
Successfully installed fastapi-0.115.5 starlette-0.41.2
Collecting uvicorn
  Downloading uvicorn-0.32.0-py3-none-any.whl.metadata (6.6 kB)
Downloading uvicorn-0.32.0-py3-none-any.whl (63 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.7/63.7 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: uvicorn
Successfully installed uvicorn-0.32.0
