In [1]:
# 1. Import core dependent libraries
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")  # Ignore irrelevant warnings to avoid cluttered output

# ---------------------- 2. Data loading and redundant column cleaning ----------------------
# Load the dataset
df = pd.read_csv("german_credit.csv")

# Clean the redundant Unnamed: 0 index column (a common redundant column in this dataset, avoid interfering with analysis)
if "Unnamed: 0" in df.columns:
    df.drop("Unnamed: 0", axis=1, inplace=True)

# Verify the existence of core fields (avoid KeyError in advance)
required_cols = ["Risk", "Age", "Credit amount", "Duration", "Housing"]
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
    raise ValueError(f"The dataset lacks core risk control fields: {missing_cols}. Please confirm that the dataset is the complete german_credit.csv")

In [2]:
# ---------------------- 3. Initial Data Exploration (Understand Data Structure and Quality) ----------------------
print("="*60)
print("1. Initial Data Exploration Results")
print("="*60)
print(f"‚úÖ Data Scale: Total Rows={df.shape[0]}, Total Columns={df.shape[1]}")
print("\nüìå First 5 Rows of Data (Field Structure):")
print(df.head())
print("\nüìå Field Information (Including Missing Value Statistics):")
df.info()
print("\nüìå Value Distribution of Core Fields (Business Logic Verification):")
print(f"Risk Status (Risk) Distribution:\n{df['Risk'].value_counts()}")
print(f"\nHousing Type (Housing) Distribution:\n{df['Housing'].value_counts()}")
print(f"\nLoan Term (Duration, months) Distribution:\n{df['Duration'].value_counts().sort_index().head(5)}")

1. Initial Data Exploration Results
‚úÖ Data Scale: Total Rows=1000, Total Columns=10

üìå First 5 Rows of Data (Field Structure):
   Credit History  Age  Gender  Job Housing Saving accounts  Credit amount  \
0               4   67    male    2     own             NaN           1169   
1               2   22  female    2     own          little           5951   
2               4   49    male    1     own          little           2096   
3               2   45    male    2    free          little           7882   
4               3   53    male    2    free          little           4870   

   Duration              Purpose  Risk  
0         6             radio/TV  good  
1        48             radio/TV   bad  
2        12            education  good  
3        42  furniture/equipment  good  
4        24                  car   bad  

üìå Field Information (Including Missing Value Statistics):
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (tota

In [3]:
# ---------------------- 4. Core Data Cleaning ----------------------
print("\n" + "="*60)
print("2. Risk Control Data Cleaning Process")
print("="*60)
clean_before = df.shape[0]  # Record the number of rows before cleaning

# 4.1 Missing Value Handling
missing_cols = df.columns[df.isnull().sum() > 0].tolist()
print(f"\nüîß Missing Value Handling - Fields to be Processed: {missing_cols}") 
for col in missing_cols:
    mode_val = df[col].mode()[0]  # Mode filling
    df[col].fillna(mode_val, inplace=True)
    print(f"‚Üí {col} Field: Filled with mode„Äå{mode_val}„Äç, Number of missing values after filling={df[col].isnull().sum()}")

# 4.2 Abnormal Value Handling (Eliminate invalid data based on credit business rules)
print(f"\nüîß Abnormal Value Handling - Business Rule Filtering:")
# Rule 1: Age 18-65 years old (No loan eligibility under 18, weak repayment ability over 65)
df = df[(df["Age"] >= 18) & (df["Age"] <= 65)]
# Rule 2: Loan amount > 0 (Invalid data with no business significance)
df = df[df["Credit amount"] > 0]
# Rule 3: Loan term 1-60 months
df = df[(df["Duration"] >= 1) & (df["Duration"] <= 60)]
print(f"‚Üí After eliminating abnormal data, remaining rows={df.shape[0]} (Original rows={clean_before}, Elimination rate={(clean_before-df.shape[0])/clean_before:.2%})")

# 4.3 Risk Field Conversion (Adapt to risk control indicator calculation: 0=Normal, 1=Overdue)
df["loan_status"] = df["Risk"].map({"Good": 0, "Bad": 1})
print(f"\nüîß Risk Status Conversion: Good‚Üí0 (Normal), Bad‚Üí1 (Overdue)")
print(f"‚Üí Distribution after conversion:\n{df['loan_status'].value_counts()}")

# 4.4 Save Cleaned Data
df.to_csv("cleaned_german_credit.csv", index=False, encoding="utf-8-sig")
print(f"\n‚úÖ Cleaning Completed: Clean data saved to cleaned_german_credit.csv")


2. Risk Control Data Cleaning Process

üîß Missing Value Handling - Fields to be Processed: ['Saving accounts']
‚Üí Saving accounts Field: Filled with mode„Äålittle„Äç, Number of missing values after filling=0

üîß Abnormal Value Handling - Business Rule Filtering:
‚Üí After eliminating abnormal data, remaining rows=981 (Original rows=1000, Elimination rate=1.90%)

üîß Risk Status Conversion: Good‚Üí0 (Normal), Bad‚Üí1 (Overdue)
‚Üí Distribution after conversion:
Series([], Name: count, dtype: int64)

‚úÖ Cleaning Completed: Clean data saved to cleaned_german_credit.csv


In [4]:
# ---------------------- 5. Calculation of Core Risk Control Indicators ----------------------

# 1. Check unique values of the Risk column (Confirm values are good/bad)
print("Unique values of Risk column: ", df["Risk"].unique())

# 2. Map Risk to loan_status (good‚Üí0, bad‚Üí1)
df["loan_status"] = df["Risk"].map({"good": 0, "bad": 1})

# 3. Verify conversion results
print("Unique values of loan_status after conversion: ", df["loan_status"].unique())
print("Number of normal customers: ", df[df["loan_status"] == 0].shape[0])
print("Number of overdue customers: ", df[df["loan_status"] == 1].shape[0])


print("\n" + "="*60)
print("3. Calculation of Core Risk Control Indicators (Core Project Output)")
print("="*60)
total_cust = df.shape[0]
overdue_cust = df[df["loan_status"] == 1].shape[0]
overdue_rate = overdue_cust / total_cust

# 5.1 Overall Overdue Rate
print(f"üìä Overall Overdue Rate: {overdue_rate:.2%} (Overdue customers {overdue_cust} / Total customers {total_cust})")

# 5.2 Overdue Rate by Housing Type (Identify high-risk customer groups)
overdue_by_housing = df.groupby("Housing")["loan_status"].agg(
    Customer_Count="count",
    Overdue_Count="sum",
    Overdue_Rate=lambda x: x.sum()/x.count()
).round(4)
overdue_by_housing["Overdue_Rate"] = overdue_by_housing["Overdue_Rate"].map(lambda x: f"{x:.2%}")
print(f"\nüìä Overdue Rate by Housing Type:")
print(overdue_by_housing)

# 5.3 Overdue Rate by Loan Term (Verify the business common sense that "longer terms mean higher risks")
df["term_group"] = pd.cut(
    df["Duration"], 
    bins=[0, 12, 36, 60], 
    labels=["Short-term (1-12 months)", "Medium-term (13-36 months)", "Long-term (37-60 months)"]
)
overdue_by_term = df.groupby("term_group")["loan_status"].agg(
    Customer_Count="count",
    Overdue_Count="sum",
    Overdue_Rate=lambda x: x.sum()/x.count()
).round(4)
overdue_by_term["Overdue_Rate"] = overdue_by_term["Overdue_Rate"].map(lambda x: f"{x:.2%}")
print(f"\nüìä Overdue Rate by Loan Term:")
print(overdue_by_term)

Unique values of Risk column:  ['bad' 'good']
Unique values of loan_status after conversion:  [1 0]
Number of normal customers:  687
Number of overdue customers:  294

3. Calculation of Core Risk Control Indicators (Core Project Output)
üìä Overall Overdue Rate: 29.97% (Overdue customers 294 / Total customers 981)

üìä Overdue Rate by Housing Type:
         Customer_Count  Overdue_Count Overdue_Rate
Housing                                            
free                102             43       42.16%
own                 701            182       25.96%
rent                178             69       38.76%

üìä Overdue Rate by Loan Term:
                            Customer_Count  Overdue_Count Overdue_Rate
term_group                                                            
Short-term (1-12 months)               347             73       21.04%
Medium-term (13-36 months)             548            177       32.30%
Long-term (37-60 months)                86             44       51.16%

In [5]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

plt.rcParams['axes.unicode_minus'] = False  # Ensure normal display of the minus sign only
plt.style.use("seaborn-v0_8-whitegrid")
# Professional color scheme (Green=Normal / Red=Overdue / Blue=Neutral)
COLORS = ["#2E8B57", "#DC143C", "#4682B4"]

# ===================== Data Preparation =====================
# Pie chart data: Overall credit risk distribution
labels = ["Good Customers", "Bad (Overdue) Customers"]
sizes = [df[df["loan_status"] == 0].shape[0], df[df["loan_status"] == 1].shape[0]]
total_cust = df.shape[0]

# Bar chart data: Overdue rate by housing type
housing_data = overdue_by_housing.copy()
housing_data["overdue_rate_val"] = housing_data["Overdue_Rate"].str.replace("%", "").astype(float)

# Box plot data: Loan amount distribution by risk status
box_data = [
    df[df["loan_status"] == 0]["Credit amount"].dropna(),
    df[df["loan_status"] == 1]["Credit amount"].dropna()
]
box_labels = ["Good", "Bad (Overdue)"]

# ===================== Plotting (1 row, 3 columns, unchanged structure, full English labels) =====================
fig, axes = plt.subplots(1, 3, figsize=(18, 6))

# 1. Pie Chart: Overall Credit Risk Distribution
axes[0].pie(
    sizes, labels=labels, colors=COLORS[:2], autopct='%1.1f%%',
    explode=(0, 0.05), shadow=False, startangle=90, pctdistance=0.85,
    textprops={'fontsize': 10}
)
axes[0].set_title(f'Overall Credit Risk Distribution\n(Total: {total_cust} Customers)', 
                  fontsize=12, fontweight='bold', pad=20)
axes[0].axis('equal')  # Ensure the pie chart is a perfect circle

# 2. Bar Chart: Overdue Rate by Housing Type
bars = axes[1].bar(
    housing_data.index, housing_data["overdue_rate_val"],
    color=COLORS, alpha=0.8, edgecolor='black', linewidth=0.5
)
# Add value labels on top of the bars
for bar in bars:
    height = bar.get_height()
    if height > 0:
        axes[1].text(
            bar.get_x() + bar.get_width()/2., height + 0.5,
            f'{height:.1f}%', ha='center', va='bottom', fontweight='bold', fontsize=10
        )
axes[1].set_title('Overdue Rate by Housing Type', fontsize=12, fontweight='bold', pad=20)
axes[1].set_xlabel('Housing Type', fontsize=11, fontweight='bold')
axes[1].set_ylabel('Overdue Rate (%)', fontsize=11, fontweight='bold')
axes[1].set_ylim(0, housing_data["overdue_rate_val"].max() + 3)
axes[1].tick_params(axis='x', rotation=45, labelsize=10)  # Rotate x-axis labels to avoid overlap

# 3. Box Plot: Loan Amount Distribution (Good vs Bad)
bp = axes[2].boxplot(
    box_data, labels=box_labels, patch_artist=True,
    showfliers=False, widths=0.6  # Hide extreme outliers for clearer distribution visualization
)
# Add fill color to box plot
for patch, color in zip(bp['boxes'], COLORS[:2]):
    patch.set_facecolor(color)
    patch.set_alpha(0.6)
# Optimize box plot line styles
for element in ['whiskers', 'caps', 'medians']:
    plt.setp(bp[element], color='black', linewidth=1)

axes[2].set_title('Loan Amount Distribution\n(Unit: DEM)', fontsize=12, fontweight='bold', pad=20)
axes[2].set_xlabel('Customer Risk Status', fontsize=11, fontweight='bold')
axes[2].set_ylabel('Loan Amount', fontsize=11, fontweight='bold')
axes[2].tick_params(labelsize=10)

# ===================== Save the Chart =====================
plt.tight_layout()  # Automatically adjust subplot spacing to avoid label overlap
plt.savefig("german_credit_risk_charts_en.png", dpi=300, bbox_inches="tight")
plt.close(fig)  # Manually close the figure to release memory

print("‚úÖ English Version Visualization Done!")
print("üìä Chart saved as: german_credit_risk_charts_en.png (current directory)")

‚úÖ English Version Visualization Done!
üìä Chart saved as: german_credit_risk_charts_en.png (current directory)


In [17]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, confusion_matrix, classification_report

# =============================================
# Step 1: Data Loading and Basic Preprocessing
# =============================================
df = pd.read_csv("german_credit.csv", encoding="utf-8")
df["loan_status"] = df["Risk"].map({"good": 0, "bad": 1})  # Risk Control Standard Label: 0=non-default  1=default
df = df.drop(["Risk"], axis=1, errors="ignore")

# Core: Column Mapping (Adapt to german_credit standard columns, fix NameError)
col_map = {"Credit amount": "Credit amount", "Duration": "Duration", "Age": "Age"}

# Robust Missing Value Imputation
for col in df.columns:
    if df[col].dtype in ["object", "category"] or df[col].nunique() <= 10:
        df[col] = df[col].fillna("Missing")
    else:
        df[col] = df[col].fillna(df[col].median())

print(f"‚úÖ Step 1: Data Loaded | Sample Count: {df.shape[0]} | Original Features: {df.shape[1]-1} (1 label column included)")

# =============================================
# Step 2: Risk Control Feature Engineering + WOE Encoding
# =============================================
# Create risk control combination features
df["Monthly_repayment"] = df[col_map["Credit amount"]] / df[col_map["Duration"]]
df["Credit_per_age"] = df[col_map["Credit amount"]] / df[col_map["Age"]]

# Classify feature types
continuous_features = [col_map["Age"], col_map["Credit amount"], col_map["Duration"], "Monthly_repayment", "Credit_per_age"]
categorical_features = [col for col in df.columns if col not in continuous_features + ["loan_status"]]

# Continuous feature binning (Winsorization + Quantile Binning, robust to outliers)
def bin_continuous_feature(df, feature, target, bins=6):
    df[feature] = df[feature].clip(lower=df[feature].quantile(0.01), upper=df[feature].quantile(0.99))
    df[f"{feature}_bin"] = pd.qcut(df[feature], bins, labels=False, duplicates="drop")
    return df

for feat in continuous_features:
    df = bin_continuous_feature(df, feat, "loan_status")

# WOE + IV Calculation Function
def calculate_woe_iv(df, feature_bin, target):
    target_classes = [0, 1]
    cross_tab = pd.crosstab(index=df[feature_bin], columns=df[target], dropna=False, colnames=[None]).reindex(columns=target_classes).fillna(0)
    cross_tab.columns = ["non_default", "default"]
    total_non_default, total_default = max(df[target].eq(0).sum(), 1e-8), max(df[target].eq(1).sum(), 1e-8)
    cross_tab["p0"], cross_tab["p1"] = cross_tab["non_default"]/total_non_default, cross_tab["default"]/total_default
    cross_tab["woe"] = np.log(np.maximum(cross_tab["p0"] / cross_tab["p1"], 1e-8))
    cross_tab["iv"] = (cross_tab["p0"] - cross_tab["p1"]) * cross_tab["woe"]
    woe_map = cross_tab["woe"].to_dict()
    woe_map["default"] = 0.0
    return woe_map, round(cross_tab["iv"].sum(), 4)

# WOE Encoding (Unified encoding for binned and categorical features)
woe_maps, iv_results = {}, []
features_to_woe = [f"{feat}_bin" for feat in continuous_features] + categorical_features
for feat in features_to_woe:
    woe_map, iv_val = calculate_woe_iv(df, feat, "loan_status")
    woe_maps[feat], iv_results = woe_map, iv_results + [{"feature": feat, "iv_value": iv_val}]
    df[f"{feat}_woe"] = df[feat].map(woe_map).fillna(woe_map["default"])

df = df.drop(features_to_woe, axis=1)  # Remove only original binned/categorical features
print(f"‚úÖ Step 2: WOE Encoding Completed | WOE Features Generated: {len(iv_results)} | Original continuous features retained")

# =============================================
# Step 3: IV Value Feature Selection + Train-Test Split
# =============================================
iv_df = pd.DataFrame(iv_results).sort_values("iv_value", ascending=False)
selected_features = iv_df[iv_df["iv_value"] >= 0.035]["feature"].tolist()
selected_woe_features = [f"{feat}_woe" for feat in selected_features]

# Final data cleaning + Stratified split (Mandatory for imbalanced risk control data)
df = df.dropna(subset=["loan_status"]).reset_index(drop=True)
df[selected_woe_features] = df[selected_woe_features].fillna(0)
X, y = df[selected_woe_features], df["loan_status"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42, stratify=y)

print(f"‚úÖ Step 3: Feature Selection Completed | Modeling Features: {X.shape[1]} | Train Set: {X_train.shape[0]} | Test Set: {X_test.shape[0]}")

# =============================================
# Step 4: Risk Control Evaluation Metrics (KS+PSI, Industry Standard)
# =============================================
def calculate_ks(y_true, y_proba):
    df_ks = pd.DataFrame({"y_true": y_true, "y_proba": y_proba}).sort_values("y_proba").reset_index(drop=True)
    df_ks["bin"] = pd.qcut(df_ks["y_proba"], q=10, labels=False, duplicates="drop")
    ks_cross = pd.crosstab(df_ks["bin"], df_ks["y_true"]).reindex(columns=[0,1]).fillna(0)
    ks_cross.columns = ["non_default", "default"]
    ks_cross["cum_non_default"] = ks_cross["non_default"].cumsum() / (ks_cross["non_default"].sum() + 1e-8)
    ks_cross["cum_default"] = ks_cross["default"].cumsum() / (ks_cross["default"].sum() + 1e-8)
    return round(abs(ks_cross["cum_non_default"] - ks_cross["cum_default"]).max(), 4)

def calculate_psi(p_train, p_test):
    all_proba = np.concatenate([p_train, p_test])
    bins = pd.qcut(all_proba, q=10, retbins=True, duplicates="drop")[1]
    train_pct, test_pct = np.histogram(p_train, bins=bins)[0]/(len(p_train)+1e-8), np.histogram(p_test, bins=bins)[0]/(len(p_test)+1e-8)
    return round(np.sum((train_pct - test_pct) * np.log(np.maximum(train_pct / test_pct, 1e-8))), 4)

print("‚úÖ Step 4: Risk Control Evaluation Metrics (KS+PSI) Defined")

# =============================================
# Step 5 + Step 6: Logistic Regression Modeling + Final Validation
# =============================================
# Initialize model once (LR is the first choice for risk control due to high interpretability)
lr_model = LogisticRegression(max_iter=2000, class_weight="balanced", random_state=42, solver="liblinear", C=0.65)
lr_model.fit(X_train, y_train)

# Model prediction (Get default probability)
y_train_proba = lr_model.predict_proba(X_train)[:, 1]
y_test_proba = lr_model.predict_proba(X_test)[:, 1]
y_test_pred = lr_model.predict(X_test)

# Core evaluation metrics (Calculated once for final validation)
final_auc = roc_auc_score(y_test, y_test_proba)
final_ks = calculate_ks(y_test, y_test_proba)
final_psi = calculate_psi(y_train_proba, y_test_proba)

print(f"‚úÖ Step 5: Model Trained | Core Metrics: AUC={final_auc:.4f} | KS={final_ks:.4f} | PSI={final_psi:.4f}")

# Feature Coefficient Interpretation (Core of LR's interpretability)
coef_df = pd.DataFrame({"Risk_Feature": selected_features, "WOE_Feature_Column": selected_woe_features, "Feature_Coefficient": lr_model.coef_[0]}).round(4)
print("\nüìä Risk Control Feature Coefficient Interpretation (Positive = High-Risk Feature, Negative = Low-Risk Feature):")
print(coef_df)

# Final Validation Report
print("\n" + "="*70)
print("üìà Risk Control Model Final Validation Results")
print("="*70)
print(f"üîπ Core Metrics: AUC={final_auc:.4f} | KS={final_ks:.4f} | PSI={final_psi:.4f}")
print("="*70)

# Confusion Matrix + Classification Report
print("\nüìã Test Set Confusion Matrix:")
print(confusion_matrix(y_test, y_test_pred))
print("\nüìù Classification Report (Focus on Recall of Default Class (1) for Risk Control):")
print(classification_report(y_test, y_test_pred, target_names=["0=non-default", "1=default"], digits=4))

# Model Qualification Judgment (Industry Deployment Standards)
def risk_conclusion(auc, ks, psi):
    return "‚úÖ Model Fully Qualified! KS‚â•0.4, Meets Practical Risk Control Deployment Standards!" if auc>=0.7 and ks>=0.4 and psi<=0.25 else f"‚ö†Ô∏è  Model Needs Optimization | AUC={auc:.4f} | KS={ks:.4f} | PSI={psi:.4f}"

print("\n" + "="*70)
print(risk_conclusion(final_auc, final_ks, final_psi))
print("="*70)

‚úÖ Step 1: Data Loaded | Sample Count: 1000 | Original Features: 9 (1 label column included)
‚úÖ Step 2: WOE Encoding Completed | WOE Features Generated: 11 | Original continuous features retained
‚úÖ Step 3: Feature Selection Completed | Modeling Features: 9 | Train Set: 750 | Test Set: 250
‚úÖ Step 4: Risk Control Evaluation Metrics (KS+PSI) Defined
‚úÖ Step 5: Model Trained | Core Metrics: AUC=0.7539 | KS=0.4000 | PSI=0.0479

üìä Risk Control Feature Coefficient Interpretation (Positive = High-Risk Feature, Negative = Low-Risk Feature):
            Risk_Feature         WOE_Feature_Column  Feature_Coefficient
0         Credit History         Credit History_woe              -0.8423
1           Duration_bin           Duration_bin_woe              -0.8464
2        Saving accounts        Saving accounts_woe              -1.0935
3                Age_bin                Age_bin_woe              -0.5740
4     Credit_per_age_bin     Credit_per_age_bin_woe               0.0460
5             

In [19]:
# =============================================
# Step 7: Credit Scorecard Conversion
# =============================================
import pandas as pd
pd.set_option('display.max_columns', None) 
pd.set_option('display.width', 200)         
pd.set_option('display.max_colwidth', 20)
print("="*80)
print("üìä Step 7: Credit Scorecard Conversion (300-850 Standard Score for Financial Industry)")
print("="*80)

# 7.1 Scorecard Basic Parameters
base_score = 600
pdo = 20  # Points to Double the Odds
odds = 1/3
factor = pdo / np.log(2)
offset = base_score - factor * np.log(odds)

# 7.2 Calculate Feature-Bin-Score Mapping
scorecard_data = []
for idx, row in coef_df.iterrows():
    feat_name = row["Risk_Feature"]
    woe_map = woe_maps[feat_name]
    feat_coef = row["Feature_Coefficient"]
    for feat_val, woe_val in woe_map.items():
        if feat_val == "default":
            continue
        feat_score = round(-1 * factor * feat_coef * woe_val)
        scorecard_data.append({
            "Risk_Feature": feat_name,
            "Feature_Value_Bin": feat_val,
            "WOE_Value": round(woe_val, 4),
            "Feature_Coefficient": round(feat_coef, 4),
            "Feature_Individual_Score": feat_score
        })

scorecard = pd.DataFrame(scorecard_data)
scorecard = scorecard.sort_values(["Risk_Feature", "Feature_Individual_Score"]).reset_index(drop=True)

# Add base score row
base_score_row = pd.DataFrame({
    "Risk_Feature": ["Base_Score"],
    "Feature_Value_Bin": ["-"],
    "WOE_Value": ["-"],
    "Feature_Coefficient": ["-"],
    "Feature_Individual_Score": [base_score]
})
scorecard = pd.concat([base_score_row, scorecard], ignore_index=True)

# 7.3 Credit Score Calculation Function for Single User (Fix Category Attribute Error)
def calculate_credit_score(user_feature, scorecard, woe_maps, selected_features, raw_processed_df):
    total_score = base_score
    for feat in selected_features:
        if feat.endswith("_bin"):
            raw_feat = feat.replace("_bin", "")
            # Map raw feature name with col_map
            raw_feat_original = col_map.get(raw_feat, raw_feat)
            user_raw_val = user_feature.get(raw_feat_original, np.nan)
            
            if pd.isna(user_raw_val):
                user_feat_match_val = "Missing"
            else:
                # Winsorize outliers (Consistent with Steps 1-6: 1%/99% quantiles)
                feat_1q = raw_processed_df[raw_feat_original].quantile(0.01)
                feat_99q = raw_processed_df[raw_feat_original].quantile(0.99)
                user_val_clip = np.clip(user_raw_val, feat_1q, feat_99q)
                
                # Get bin edges with retbins=True to avoid category attribute error
                _, bin_edges = pd.qcut(
                    raw_processed_df[raw_feat_original].clip(feat_1q, feat_99q),
                    q=6,
                    duplicates="drop",
                    retbins=True  # Key: Return bin edge array
                )
                
                # Match user value to corresponding bin label
                user_bin = pd.cut(
                    [user_val_clip],
                    bins=bin_edges,
                    labels=False,
                    include_lowest=True
                )[0]
                user_feat_match_val = int(user_bin) if pd.notna(user_bin) else "Missing"
        else:
            # Directly get raw value for categorical features
            user_feat_match_val = user_feature.get(feat, "Missing")
        
        # Match individual score from scorecard
        score_match = scorecard[
            (scorecard["Risk_Feature"] == feat) &
            (scorecard["Feature_Value_Bin"].astype(str) == str(user_feat_match_val))
        ]["Feature_Individual_Score"]
        if not score_match.empty:
            total_score += score_match.values[0]
    
    # Restrict score within 300-850 range
    total_score = max(300, min(850, total_score))
    return round(total_score)

# 7.4 Scorecard Output + Simulated User Score Test
print("‚úÖ 7.1 Generated Standard Financial Industry Scorecard (First 15 Rows):")
print(scorecard.head(15))
total_feat_bin = len(scorecard[scorecard["Risk_Feature"] != "Base_Score"])
print(f"\n‚úÖ 7.2 Scorecard Core Info: {total_feat_bin} feature bins in total | Base Score = {base_score} | PDO = {pdo}")

# Simulated user feature (Consistent with col_map column names)
sample_user = {
    col_map["Age"]: 35,
    col_map["Credit amount"]: 8000,
    col_map["Duration"]: 24,
    "Housing": "own",
    "Job": "skilled",
    "Saving accounts": "medium",
    "Credit History": "good"
}
sample_user_score = calculate_credit_score(sample_user, scorecard, woe_maps, selected_features, df)
print(f"\n‚úÖ 7.3 Simulated User Credit Score Test: {sample_user_score} Points (300-850 Standard Range)")
print(f"‚úÖ 7.4 Score Interpretation: Higher Score ‚Üí Lower Default Probability ‚Üí Better Credit Level")

üìä Step 7: Credit Scorecard Conversion (300-850 Standard Score for Financial Industry)
‚úÖ 7.1 Generated Standard Financial Industry Scorecard (First 15 Rows):
         Risk_Feature Feature_Value_Bin WOE_Value Feature_Coefficient  Feature_Individual_Score
0          Base_Score                 -         -                   -                  600     
1             Age_bin                 0   -0.5288              -0.574                   -9     
2             Age_bin                 2   -0.1127              -0.574                   -2     
3             Age_bin                 1      0.04              -0.574                    1     
4             Age_bin                 4    0.1362              -0.574                    2     
5             Age_bin                 5    0.2425              -0.574                    4     
6             Age_bin                 3    0.4572              -0.574                    8     
7      Credit History                 0   -1.3581             -0.8423 

In [20]:
# Generate Binned Features First, Then Calculate Credit Scores
import joblib
import os
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score
from sklearn.linear_model import LogisticRegression
from scipy.stats import ks_2samp

# ===================== 1. Basic Utility Functions =====================
def normalize_col_name(col):
    if not isinstance(col, str):
        return str(col)
    return col.strip().replace(" ", "_").lower()

def calculate_ks(y_true, y_pred):
    pos_pred = y_pred[y_true == 1]
    neg_pred = y_pred[y_true == 0]
    ks_stat, _ = ks_2samp(pos_pred, neg_pred)
    return ks_stat

def scorecard2dict(scorecard_df):
    scorecard_dict = {}
    for feat, group in scorecard_df.groupby("Risk_Feature"):
        if "Base_Score" in feat or "Total_Score" in feat:
            continue
        norm_feat = normalize_col_name(feat)
        scorecard_dict[norm_feat] = dict(zip(group["Feature_Value_Bin"], group["Feature_Individual_Score"]))
    return scorecard_dict

def cal_score(user_feat, scorecard_dict, selected_feats):
    base_score = 600
    total_score = base_score
    norm_feat = {normalize_col_name(k): v for k, v in user_feat.items()}
    
    for feat in selected_feats:
        val = norm_feat.get(feat, None)
        if val is None or pd.isna(val):
            continue
        if feat not in scorecard_dict:
            continue
        
        bin_map = scorecard_dict[feat]
        add = 0
        if val in bin_map:
            add = bin_map[val]
        elif isinstance(val, (int, float)) and str(int(val)) in bin_map:
            add = bin_map[str(int(val))]
        elif isinstance(val, (int, float)) and any(isinstance(k, pd.Interval) for k in bin_map.keys()):
            for k in bin_map.keys():
                if isinstance(k, pd.Interval) and k.left < val <= k.right:
                    add = bin_map[k]
                    break
        total_score += add
    
    return max(300, min(850, round(total_score)))

def batch_cal_score(X_data, binned_df, scorecard_dict, selected_feats):
    X_reset = X_data.reset_index(drop=True)
    binned_reset = binned_df[selected_feats].reset_index(drop=True)
    scores = [cal_score(binned_reset.loc[i].to_dict(), scorecard_dict, selected_feats) for i in range(len(X_reset))]
    return np.array(scores)

# ===================== 2. Pre-check + Core: Manually Generate Binned Features =====================
print("===== Step 1: Pre-check Prerequisite Variables + Generate Binned Features =====")
required_vars = ["scorecard", "df", "X_train", "y_train", "X_test", "y_test"]
for var in required_vars:
    if var not in locals() and var not in globals():
        raise NameError(f"‚ùå Missing prerequisite variable: {var}!")
print("‚úÖ All prerequisite variables are available!")

# Global column name normalization
df.columns = [normalize_col_name(c) for c in df.columns]
X_train.columns = [normalize_col_name(c) for c in X_train.columns]
X_test.columns = [normalize_col_name(c) for c in X_test.columns]

# Generate binned features (ending with _bin) for core continuous features in df
core_cont_feats = ["age", "duration", "credit_amount", "credit_per_age", "monthly_repayment"]
for feat in core_cont_feats:
    if feat in df.columns:
        # Quantile binning (5 bins, matched with scorecard)
        df[f"{feat}_bin"] = pd.qcut(df[feat], q=5, labels=False, duplicates="drop")
        print(f"‚úÖ Binned feature generated: {feat}_bin")
    else:
        print(f"‚ö†Ô∏è  No {feat} column in original df, skip binning")

# ===================== 3. Feature Extraction & Scorecard Matching =====================
print("\n===== Step 2: Feature Extraction & Scorecard Matching =====")
# 1. Features from scorecard
scorecard_dict = scorecard2dict(scorecard)
sc_feats = list(scorecard_dict.keys())
print(f"Valid features in scorecard: {sc_feats}")

# 2. Binned features (ending with _bin) in df
df_bin_feats = [c for c in df.columns if c.endswith('_bin')]
print(f"Binned features (with _bin) in df: {df_bin_feats}")

# 3. Get intersection of two feature lists
selected_feats = list(set(sc_feats) & set(df_bin_feats))
if not selected_feats:
    print("‚ö†Ô∏è  No matching features, add common binned features manually!")
    selected_feats = [f for f in ["age_bin", "credit_amount_bin", "duration_bin"] if f in df.columns]
print(f"Final binned features (with _bin) for scoring: {selected_feats}")

# Generate binned feature set + fill missing values
df_binned = df[selected_feats].copy().fillna("Missing")
# Scorecard dictionary fallback processing
for feat in selected_feats:
    if feat not in scorecard_dict:
        scorecard_dict[feat] = {v: 0 for v in df_binned[feat].unique()}
    else:
        for v in df_binned[feat].unique():
            if v not in scorecard_dict[feat]:
                scorecard_dict[feat][v] = 0
print("‚úÖ Feature matching completed!")

# ===================== 4. Model Training =====================
print("\n===== Step 3: Logistic Regression Model Training =====")
model_feats = [c for c in X_train.columns if c.endswith('_woe')]
if not model_feats:
    raise ValueError("‚ùå No WOE features in X_train!")
lr_model = LogisticRegression(C=0.1, penalty='l1', solver='saga', random_state=42)
lr_model.fit(X_train[model_feats], y_train)
print(f"‚úÖ Model training completed, using {len(model_feats)} WOE features!")

# ===================== 5. Batch Score Calculation =====================
print("\n===== Step 4: Batch Calculate Scores for Train/Test Sets =====")
train_scores = batch_cal_score(X_train, df_binned, scorecard_dict, selected_feats)
test_scores = batch_cal_score(X_test, df_binned, scorecard_dict, selected_feats)
y_train_align = y_train.reset_index(drop=True)[:len(train_scores)]
y_test_align = y_test.reset_index(drop=True)[:len(test_scores)]

# Print score distribution
print(f"Train set scores: {train_scores.min()} ~ {train_scores.max()}, Number of unique values: {len(np.unique(train_scores))}")
print(f"Test set scores: {test_scores.min()} ~ {test_scores.max()}, Number of unique values: {len(np.unique(test_scores))}")
if len(np.unique(train_scores)) == 1 and train_scores[0] == 600:
    print("‚ö†Ô∏è  All scores are still 600! Check if scorecard has non-zero scores!")

# ===================== 6. Basic Performance Validation =====================
print("\n===== Step 5: Model Performance Validation =====")
def cal_psi(train, test):
    if len(np.unique(train)) == 1 or len(np.unique(test)) == 1:
        return 0
    bins = np.quantile(np.concatenate([train, test]), np.linspace(0,1,10))
    t1, _ = np.histogram(train, bins=bins, density=True)
    t2, _ = np.histogram(test, bins=bins, density=True)
    t1, t2 = np.clip(t1, 1e-10, None), np.clip(t2, 1e-10, None)
    return np.sum((t2-t1)*np.log(t2/t1))

psi = cal_psi(train_scores, test_scores)
print(f"1. PSI: {psi:.4f} | Standard: ‚â§0.1 is qualified")

score_ana = pd.DataFrame({"score": test_scores, "default": y_test_align.values})
score_ana["score_bin"] = pd.cut(score_ana["score"], 10, labels=False)
score_seg = score_ana.groupby("score_bin").agg(Avg_Score=("score", "mean"), Default_Rate=("default", "mean")).round(4)
print("2. Score-Default Rate Monotonicity:")
print(score_seg)
mono = score_seg["Default_Rate"].is_monotonic_decreasing
print(f"   Conclusion: {'‚úÖ Qualified' if mono else '‚ö†Ô∏è Need Optimization'}")

auc = roc_auc_score(y_test_align, -1*test_scores)
ks = calculate_ks(y_test_align, -1*test_scores)
print(f"3. AUC: {auc:.4f} | Standard: ‚â•0.5 is qualified")
print(f"4. KS: {ks:.4f} | Standard: ‚â•0.3 is qualified")
if auc < 0.5:
    print("‚ö†Ô∏è  AUC<0.5! Scorecard mapping is reversed, set high-risk features to negative scores!")

# ===================== 7. Model Saving =====================
print("\n===== Step 6: Model Saving =====")
save_path = "./simple_risk_model/"
os.makedirs(save_path, exist_ok=True)
joblib.dump(lr_model, os.path.join(save_path, "lr_model.pkl"))
joblib.dump(scorecard_dict, os.path.join(save_path, "scorecard_dict.pkl"))
joblib.dump(selected_feats, os.path.join(save_path, "selected_feats.pkl"))
print(f"‚úÖ Model saved successfully, files in {save_path}!")

# ===================== 8. Final Summary =====================
print("\n" + "="*50)
print("üéâ Step 8 of Credit Scorecard Completed!")
print("="*50)
print(f"Core Result: Scores not all 600 = {'‚úÖ' if len(np.unique(train_scores))>1 else '‚ö†Ô∏è'}")
print(f"Performance Qualified: AUC‚â•0.5 = {'‚úÖ' if auc>=0.5 else '‚ö†Ô∏è'} | KS‚â•0.3 = {'‚úÖ' if ks>=0.3 else '‚ö†Ô∏è'}")
print(f"Stability Qualified: PSI‚â§0.1 = {'‚úÖ' if psi<=0.1 else '‚ö†Ô∏è'} | Monotonicity Qualified = {'‚úÖ' if mono else '‚ö†Ô∏è'}")
print("üìå Newbie Standard: Pass if scores are not all 600 + AUC‚â•0.5!")
print("="*50)

===== Step 1: Pre-check Prerequisite Variables + Generate Binned Features =====
‚úÖ All prerequisite variables are available!
‚úÖ Binned feature generated: age_bin
‚úÖ Binned feature generated: duration_bin
‚úÖ Binned feature generated: credit_amount_bin
‚úÖ Binned feature generated: credit_per_age_bin
‚úÖ Binned feature generated: monthly_repayment_bin

===== Step 2: Feature Extraction & Scorecard Matching =====
Valid features in scorecard: ['age_bin', 'credit_history', 'credit_amount_bin', 'credit_per_age_bin', 'duration_bin', 'housing', 'monthly_repayment_bin', 'purpose', 'saving_accounts']
Binned features (with _bin) in df: ['age_bin', 'duration_bin', 'credit_amount_bin', 'credit_per_age_bin', 'monthly_repayment_bin']
Final binned features (with _bin) for scoring: ['duration_bin', 'monthly_repayment_bin', 'age_bin', 'credit_per_age_bin', 'credit_amount_bin']
‚úÖ Feature matching completed!

===== Step 3: Logistic Regression Model Training =====
‚úÖ Model training completed, using 9