# 基于深度学习算法的信用卡风险客户分析

## 1. 初始化

In [1]:
import itertools

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix

## 2. 工具函数

### (1) IV 计算 `calc_iv(df, feature, target, pr=False)`

In [None]:
def calc_iv(df, feature, target, pr=False):
    """Compute the Information Value (IV) of one categorical feature.

    Missing values in ``df[feature]`` are replaced by the string ``"NULL"``
    (written back into ``df``) so that they form a category of their own.
    For every distinct value the function counts all rows, rows whose target
    is 0 ("Good") and rows whose target is 1 ("Bad"), derives the Weight of
    Evidence (WoE) and sums the per-value IV contributions.

    Args:
        df: DataFrame containing ``feature`` and ``target``. The ``feature``
            column is overwritten with its NULL-filled version.
        feature: Name of the column to evaluate.
        target: Name of the label column (0 = good, 1 = bad).
        pr: When true, additionally print the per-value table and the IV.

    Returns:
        A tuple ``(iv, table)`` where ``iv`` is the summed IV and ``table`` is
        the per-value DataFrame with the columns Variable, Value, All, Good,
        Bad, Share, Bad Rate, Distribution Good, Distribution Bad, WoE, IV.
    """
    df[feature] = df[feature].fillna("NULL")

    # Hoist the column and the label masks: they are identical for every value.
    values = df[feature]
    is_good = df[target] == 0
    is_bad = df[target] == 1

    rows = []
    for val in values.unique():
        selected = values == val
        rows.append([feature,
                     val,
                     selected.sum(),
                     (selected & is_good).sum(),
                     (selected & is_bad).sum()])

    iv_calc_data = pd.DataFrame(rows, columns=['Variable', 'Value', 'All', 'Good', 'Bad'])
    total_all = iv_calc_data['All'].sum()
    total_bad = iv_calc_data['Bad'].sum()

    iv_calc_data['Share'] = iv_calc_data['All'] / total_all
    iv_calc_data['Bad Rate'] = iv_calc_data['Bad'] / iv_calc_data['All']
    # "Good" is taken as everything that is not bad, mirroring the totals used below.
    iv_calc_data['Distribution Good'] = (iv_calc_data['All'] - iv_calc_data['Bad']) / (total_all - total_bad)
    iv_calc_data['Distribution Bad'] = iv_calc_data['Bad'] / total_bad
    iv_calc_data['WoE'] = np.log(iv_calc_data['Distribution Good'] / iv_calc_data['Distribution Bad'])

    # A value with no good or no bad rows yields +/-inf; treat it as carrying no evidence.
    iv_calc_data = iv_calc_data.replace({'WoE': {np.inf: 0, -np.inf: 0}})

    iv_calc_data['IV'] = iv_calc_data['WoE'] * (iv_calc_data['Distribution Good'] - iv_calc_data['Distribution Bad'])

    iv_calc_data = iv_calc_data.sort_values(by=['Variable', 'Value'], ascending=[True, True])
    iv_calc_data.index = range(len(iv_calc_data.index))

    if pr:
        print(iv_calc_data)
        print('IV = ', iv_calc_data['IV'].sum())

    ivv = iv_calc_data['IV'].sum()
    print('IV =', ivv)
    print(df[feature].value_counts())
    return ivv, iv_calc_data

### (2) 转换哑变量 `convert_dummy(df, feature, rank=0)`

In [None]:
def convert_dummy(df, feature, rank=0):
    """Replace a categorical column by dummy columns, dropping one level.

    The level that is dropped is the ``rank``-th most frequent value of the
    column (``rank=0`` is the mode), so the remaining dummy columns are
    measured against it.

    The input frame is left untouched; a new frame is returned.

    Args:
        df: DataFrame containing ``feature``.
        feature: Name of the categorical column to encode.
        rank: Position, in descending frequency order, of the level to drop.

    Returns:
        A new DataFrame without ``feature`` and with one
        ``"<feature>_<value>"`` column per remaining level.
    """
    dummies = pd.get_dummies(df[feature], prefix=feature)
    reference = df[feature].value_counts().index[rank]
    dummies = dummies.drop(columns=[feature + '_' + str(reference)])
    # Build the result from copies so the caller's frame keeps its column.
    return df.drop(columns=[feature]).join(dummies)

### (3) 非二值数据分类 `get_category(df, col, bins_num, labels, q_cut = False)`

In [None]:
def get_category(df, col, bins_num, labels, q_cut=False):
    """Bin a numeric column and append the result as ``cat_<col>``.

    Args:
        df: DataFrame containing ``col``.
        col: Name of the numeric column to bin.
        bins_num: Number of bins (or quantiles when ``q_cut`` is true).
        labels: Labels assigned to the bins, in ascending order.
        q_cut: Use quantile-based bins (``pd.qcut``) instead of
            equal-width bins (``pd.cut``).

    Returns:
        A new DataFrame equal to ``df`` plus an object-typed ``cat_<col>``
        column holding the bin label of each row.
    """
    source = df[col]
    binned = (pd.qcut(source, q=bins_num, labels=labels)
              if q_cut
              else pd.cut(source, bins=bins_num, labels=labels))

    category_col = 'cat' + '_' + col
    result = df.join(binned.rename(category_col))
    # Store plain Python objects rather than a pandas Categorical.
    result[category_col] = result[category_col].astype(object)
    return result

### (4) 绘制混淆矩阵 `plot_confusion_matrix(cm, classes, m_type, normalize=True, c_map=plt.cm.Blues)`

In [None]:
def plot_confusion_matrix(cm, classes, m_type, normalize=True, c_map=plt.cm.Blues):
    """Draw a confusion matrix as a heat map and save it as a PNG.

    Args:
        cm: Confusion matrix array (rows are true labels, columns predicted).
        classes: Tick labels for both axes.
        m_type: Model name; used in the title and, lower-cased, in the
            output file name ``./confusion_matrix_<m_type>.png``.
        normalize: Divide every row by its sum before plotting.
        c_map: Matplotlib colour map for the heat map.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        cm = cm.astype('float') / row_totals

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=c_map)
    plt.title(f'Confusion matrix - {m_type}')
    plt.colorbar()
    positions = np.arange(len(classes))
    plt.xticks(positions, classes)
    plt.yticks(positions, classes)

    cell_format = '.2f' if normalize else 'd'
    # Cells darker than half of the maximum get white text for contrast.
    half_max = cm.max() / 2.
    for row in range(cm.shape[0]):
        for col in range(cm.shape[1]):
            text_color = "white" if cm[row, col] > half_max else "black"
            plt.text(col, row, format(cm[row, col], cell_format),
                     horizontalalignment="center",
                     color=text_color)

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    plt.savefig(f'./confusion_matrix_{str.lower(m_type)}.png', dpi=72)

### (5) 绘制训练图 `plot_train_history(his, m_type)`

In [None]:
def plot_train_history(his, m_type):
    """Plot and save the accuracy and loss curves of a training run.

    Two charts are produced, one for ``accuracy``/``val_accuracy`` and one
    for ``loss``/``val_loss``. Each chart is drawn on its own figure and is
    written to disk *before* it is shown, because showing a figure may
    release it and a later ``savefig`` would then store an empty image.

    Args:
        his: Object exposing a ``history`` mapping with the keys
            ``accuracy``, ``val_accuracy``, ``loss`` and ``val_loss``.
        m_type: Model name; used in the titles and, lower-cased, in the
            output file names ``./model_accuracy_<m_type>`` and
            ``./model_loss_<m_type>``.
    """
    for metric, title in (('accuracy', 'Accuracy'), ('loss', 'Loss')):
        # Start from a clean figure so the two charts never overlap.
        plt.figure()
        plt.plot(his.history[metric])
        plt.plot(his.history[f'val_{metric}'])
        plt.title(f'Model {title} - {m_type}')
        plt.ylabel(metric)
        plt.xlabel('epoch')
        plt.legend(['train', 'test'], loc='upper left')

        plt.savefig(f'./model_{metric}_{str.lower(m_type)}', dpi=72)
        plt.show()

## 3. 整理原始数据集

### (1) 数据导入

In [None]:
# Load the datasets: the application records and the credit records.

data = pd.read_csv('./data/application_record.csv', encoding='utf-8')
record = pd.read_csv('./data/credit_record.csv', encoding='utf-8')

### (2) 获得 Y 数据标定

In [None]:
# Compute each customer's account-opening month and merge it into the main dataset.
# The opening month is the smallest MONTHS_BALANCE recorded for an ID.
# The aggregation is named by string: passing the Python builtin `min`
# to `agg` is deprecated in recent pandas releases.

open_month = pd.DataFrame(record.groupby(['ID'])['MONTHS_BALANCE'].agg('min'))
open_month = open_month.rename(columns={'MONTHS_BALANCE': 'open_month'})
calc_data = pd.merge(data, open_month, how='left', on='ID')

In [None]:
# Customers who were ever more than 60 days past due are treated as risky.

# Flag every monthly record whose STATUS is 2, 3, 4 or 5.
record['is_risky'] = None
record.loc[record['STATUS'].isin(['2', '3', '4', '5']), 'is_risky'] = 'Y'

# Aggregate per ID: count() only counts non-null cells, so the 'is_risky'
# column becomes the number of flagged months for that customer.
g = record.groupby('ID').count()

# Turn the count into a Y/N label in a single assignment (no chained
# indexing, and no strings written into an integer column).
g['is_risky'] = np.where(g['is_risky'] > 0, 'Y', 'N')
g = g[['is_risky']]

# Keep only applicants that have credit records, and add a 0/1 target.
calc_data = pd.merge(calc_data, g, how='inner', on='ID')
calc_data['risk'] = calc_data['is_risky'].map({'Y': 1, 'N': 0})

In [None]:
# Inspect how many customers are risky: absolute counts, then proportions.

print(g['is_risky'].value_counts(sort=False))
g['is_risky'].value_counts(normalize=True, sort=False)

### (3) 整理列名称和去除空数据

In [None]:
# Rename the original columns to shorter names to make later processing easier.
# Columns not listed here (e.g. DAYS_BIRTH, DAYS_EMPLOYED) keep their names.

calc_data.rename(columns={
    'CODE_GENDER': 'gender',
    'FLAG_OWN_CAR': 'car',
    'FLAG_OWN_REALTY': 'realty',
    'CNT_CHILDREN': 'children',
    'AMT_INCOME_TOTAL': 'income',
    'NAME_INCOME_TYPE': 'income_type',
    'NAME_EDUCATION_TYPE': 'edu',
    'NAME_FAMILY_STATUS': 'family',
    'NAME_HOUSING_TYPE': 'housing',
    'FLAG_MOBIL': 'mobile',
    'FLAG_WORK_PHONE': 'wk_phone',
    'FLAG_PHONE': 'phone',
    'FLAG_EMAIL': 'email',
    'OCCUPATION_TYPE': 'occupation',
    'CNT_FAM_MEMBERS': 'family_size'
}, inplace=True)

In [None]:
# Remove incomplete rows: cells holding the literal string 'NULL' are turned
# into missing values, then every row with a missing value is dropped.
# dropna() returns a new frame, so its result has to be assigned.

calc_data = calc_data.mask(calc_data == 'NULL').dropna()

In [None]:
# Build the IV summary table: one row per column of calc_data, IV still unset.
iv_data = pd.DataFrame(calc_data.columns, columns=['vars'])
iv_data['iv'] = None

# Columns that are identifiers, labels or helper fields rather than features.
# 'mobile' is the renamed FLAG_MOBIL column.
namelist = ['mobile', 'open_month', 'is_risky', 'risk', 'ID']

# Remove those rows by matching on the column *name*.
iv_data.drop(iv_data[iv_data['vars'].isin(namelist)].index, inplace=True)

## 4. 二值数据处理

### (1) 性别

In [None]:
# Gender: encode 'F' as 0 and 'M' as 1, then record the feature's IV.

param = 'gender'

calc_data[param] = calc_data[param].replace(['F', 'M'], [0, 1])
iv, ivd = calc_iv(calc_data, param, 'risk')
iv_data.loc[iv_data['vars'] == param, 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

### (2) 是否拥有汽车

In [None]:
# Car ownership: encode 'N' as 0 and 'Y' as 1, then record the feature's IV.

param = 'car'

calc_data[param] = calc_data[param].replace(['N', 'Y'], [0, 1])
iv, ivd = calc_iv(calc_data, param, 'risk')
iv_data.loc[iv_data['vars'] == param, 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

### (3) 是否拥有不动产

In [None]:
# Real-estate ownership: encode 'N' as 0 and 'Y' as 1, then record the feature's IV.

param = 'realty'

calc_data[param] = calc_data[param].replace(['N', 'Y'], [0, 1])
iv, ivd = calc_iv(calc_data, param, 'risk')
iv_data.loc[iv_data['vars'] == param, 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

### (4) 是否拥有电话

In [None]:
# Phone ownership: cast to string, drop rows that became 'nan', record the IV.

param = 'phone'

calc_data[param] = calc_data[param].astype(str)
# After the string cast a missing value reads as the text 'nan'.
calc_data.drop(calc_data[calc_data[param] == 'nan'].index, inplace=True)
iv, ivd = calc_iv(calc_data, param, 'risk')
iv_data.loc[iv_data['vars'] == param, 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

### (5) 是否拥有工作电话

In [None]:
# Work-phone ownership: cast to string, drop rows that became 'nan', record the IV.

param = 'wk_phone'

calc_data[param] = calc_data[param].astype(str)
# After the string cast a missing value reads as the text 'nan'.
calc_data.drop(calc_data[calc_data[param] == 'nan'].index, inplace=True)
iv, ivd = calc_iv(calc_data, param, 'risk')
iv_data.loc[iv_data['vars'] == param, 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

### (6) 是否拥有电子邮箱

In [None]:
# E-mail ownership: cast to string, drop rows that became 'nan', record the IV.

param = 'email'

calc_data[param] = calc_data[param].astype(str)
# After the string cast a missing value reads as the text 'nan'.
calc_data.drop(calc_data[calc_data[param] == 'nan'].index, inplace=True)
iv, ivd = calc_iv(calc_data, param, 'risk')
iv_data.loc[iv_data['vars'] == param, 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

## 5. 连续数值数据处理

### (1) 拥有孩子数量

In [None]:
# Number of children: collapse 2 or more into the single bucket '2+'.
# Cast to object first so the text label can be stored next to the numbers
# (same approach as the family-size feature).
calc_data['children'] = calc_data['children'].astype(object)
calc_data.loc[calc_data['children'] >= 2, 'children'] = '2+'
iv, ivd = calc_iv(calc_data, 'children', 'risk')
iv_data.loc[iv_data['vars'] == 'children', 'iv'] = iv
calc_data = convert_dummy(calc_data, 'children')
ivd.head()  # notebook display of the per-value IV table

### (2) 年收入

In [None]:
# Annual income: inspect the distribution in units of 10,000, then bin into
# three quantile groups and record the IV of the binned feature.
# NOTE(review): the cast to `object` before the division looks unnecessary
# for a numeric column — confirm whether a float cast was intended.
ann_income = calc_data['income'].astype(object)
ann_income = ann_income / 10000
print(ann_income.value_counts(bins=10, sort=False))
ann_income.plot(kind='hist', bins=50, density=True)

print('')

# Binning uses the original 'income' column, not the rescaled copy above.
calc_data = get_category(calc_data, 'income', 3, ['low', 'medium', 'high'], q_cut=True)
iv, ivd = calc_iv(calc_data, 'cat_income', 'risk')
iv_data.loc[iv_data['vars'] == 'income', 'iv'] = iv
calc_data = convert_dummy(calc_data, 'cat_income')
ivd.head()  # notebook display of the per-value IV table

### (3) 年龄

In [None]:
# Age in whole years, derived from DAYS_BIRTH (negated, then divided by 365).
calc_data['age'] = - (calc_data['DAYS_BIRTH']) // 365
print(calc_data['age'].value_counts(bins=10, normalize=True, sort=False))
calc_data['age'].plot(kind='hist', bins=20, density=True)

print('')

# Bin into five quantile groups and record the IV of the binned feature.
calc_data = get_category(calc_data, 'age', 5, ['lowest', "low", 'medium', 'high', 'highest'], q_cut=True)
iv, ivd = calc_iv(calc_data, 'cat_age', 'risk')
# iv_data was built before 'age' existed, so the IV is filed under the
# source column 'DAYS_BIRTH' (the work-years feature does the same with
# 'DAYS_EMPLOYED').
iv_data.loc[iv_data['vars'] == 'DAYS_BIRTH', 'iv'] = iv
calc_data = convert_dummy(calc_data, 'cat_age')
ivd.head()  # notebook display of the per-value IV table

### (4) 工龄

In [None]:
# Years of employment, derived from DAYS_EMPLOYED (negated, then divided by 365).
work_years = - (calc_data['DAYS_EMPLOYED']) // 365
# Negative results are invalid: blank out only the 'work' value (never the
# whole row) and replace it by the mean of the valid values.
work_years = work_years.mask(work_years < 0)
calc_data['work'] = work_years.fillna(work_years.mean())
calc_data['work'].plot(kind='hist', bins=20, density=True)

# Bin into five equal-width groups and record the IV of the binned feature.
calc_data = get_category(calc_data, 'work', 5, ["lowest", "low", "medium", "high", "highest"])
iv, ivd = calc_iv(calc_data, 'cat_work', 'risk')
iv_data.loc[iv_data['vars'] == 'DAYS_EMPLOYED', 'iv'] = iv
calc_data = convert_dummy(calc_data, 'cat_work')
ivd.head()  # notebook display of the per-value IV table

### (5) 家庭成员数量

In [None]:
# Family size: keep the integer column and derive a grouped copy in which
# 3 or more members are collapsed into the bucket '3+'.
calc_data['family_size'] = calc_data['family_size'].astype(int)
calc_data['family_size_group'] = calc_data['family_size']
# Cast to object so the text label '3+' can be stored next to the integers.
calc_data['family_size_group'] = calc_data['family_size_group'].astype(object)
calc_data.loc[calc_data['family_size_group'] >= 3, 'family_size_group'] = '3+'
iv, ivd = calc_iv(calc_data, 'family_size_group', 'risk')
# The IV of the grouped column is filed under the source column name.
iv_data.loc[iv_data['vars'] == 'family_size', 'iv'] = iv
calc_data = convert_dummy(calc_data, 'family_size_group')
ivd.head()  # notebook display of the per-value IV table

## 6. 枚举数据

### (1) 收入类型

In [None]:
# Show the distribution of income types: absolute counts, then proportions.
print(calc_data['income_type'].value_counts(sort=False))
print("")
print(calc_data['income_type'].value_counts(normalize=True, sort=False))

In [None]:
# Fold the 'Pensioner' and 'Student' income types into 'State servant',
# then record the IV of the merged feature.
merged_income_types = ['Pensioner', 'Student']
calc_data.loc[calc_data['income_type'].isin(merged_income_types), 'income_type'] = 'State servant'
iv, ivd = calc_iv(calc_data, 'income_type', 'risk')
iv_data.loc[iv_data['vars'] == 'income_type', 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

In [None]:
# Replace 'income_type' by dummy columns (the most frequent level is dropped).
calc_data = convert_dummy(calc_data, 'income_type')

### (2) 职业类型

In [None]:
# Show how many customers fall into each occupation.
print(calc_data['occupation'].value_counts(sort=False))

In [None]:
# Collapse the detailed occupations into three tiers: low, medium and high.
occupation_tiers = {
    'low': [
        'Security staff',
        'Laborers',
        'Drivers',
        'Cleaning staff',
        'Cooking staff',
        'Low-skill Laborers',
        'Waiters/barmen staff',
    ],
    'medium': [
        'Sales staff',
        'Accountants',
        'Core staff',
        'Private service staff',
        'Medicine staff',
        'Secretaries',
        'HR staff',
        'Realty agents',
    ],
    'high': [
        'Managers',
        'High skill tech staff',
        'IT staff',
    ],
}

for tier, titles in occupation_tiers.items():
    calc_data.loc[calc_data['occupation'].isin(titles), 'occupation'] = tier

In [None]:
# Record the IV of the occupation feature.
iv, ivd = calc_iv(calc_data, 'occupation', 'risk')
iv_data.loc[iv_data['vars'] == 'occupation', 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

In [None]:
# Replace 'occupation' by dummy columns (the most frequent level is dropped).
calc_data = convert_dummy(calc_data, 'occupation')

### (3) 住房类型

In [None]:
# Show how many customers fall into each housing type.
print(calc_data['housing'].value_counts(sort=False))

In [None]:
# Record the IV of the housing-type feature.
iv, ivd = calc_iv(calc_data, 'housing', 'risk')
iv_data.loc[iv_data['vars'] == 'housing', 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

In [None]:
# Replace 'housing' by dummy columns (the most frequent level is dropped).
calc_data = convert_dummy(calc_data, 'housing')

### (4) 教育水平

In [None]:
# Show how many customers fall into each education level.
print(calc_data['edu'].value_counts(sort=False))

In [None]:
# Fold 'Academic degree' into 'Higher education', then record the IV.
calc_data.loc[calc_data['edu'] == 'Academic degree', 'edu'] = 'Higher education'
iv, ivd = calc_iv(calc_data, 'edu', 'risk')
iv_data.loc[iv_data['vars'] == 'edu', 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

In [None]:
# Replace 'edu' by dummy columns (the most frequent level is dropped).
calc_data = convert_dummy(calc_data, 'edu')

### (5) 婚姻状况

In [None]:
# Show how many customers fall into each marital status.
print(calc_data['family'].value_counts(sort=False))

In [None]:
# Record the IV of the marital-status feature.
iv, ivd = calc_iv(calc_data, 'family', 'risk')
iv_data.loc[iv_data['vars'] == 'family', 'iv'] = iv
ivd.head()  # notebook display of the per-value IV table

In [None]:
# Replace 'family' by dummy columns (the most frequent level is dropped).
calc_data = convert_dummy(calc_data, 'family')

## 7. IV 变量值

In [None]:
# Rank the features by IV (highest first) and relabel the two rows that are
# still named after their raw source columns.
iv_data = iv_data.sort_values(by='iv', ascending=False)
iv_data.loc[iv_data['vars'] == 'DAYS_BIRTH', 'vars'] = 'age_group'
iv_data.loc[iv_data['vars'] == 'DAYS_EMPLOYED', 'vars'] = 'work_age_group'
iv_data  # notebook display of the IV summary table

## 8. 创建训练与测试数据集

In [None]:
# List the columns available after feature engineering (notebook display).
calc_data.columns

In [None]:
# Target vector and feature matrix used by both models.
# The column names below must match the dummy columns produced earlier;
# a missing name raises a KeyError here.
Y = calc_data['risk']
X = calc_data[[
    'gender', 'realty', 'wk_phone', 'children_1', 'children_2+',
    'cat_age_low', 'cat_age_medium', 'cat_age_high', 'cat_age_highest',
    'cat_work_low', 'cat_work_medium', 'cat_work_high', 'cat_work_highest',
    'occupation_medium', 'occupation_high',
    'family_size_group_1', 'family_size_group_3+',
    'housing_Co-op apartment', 'housing_Rented apartment', 'housing_Municipal apartment', 'housing_Office apartment',
    'housing_With parents',
    'edu_Higher education', 'edu_Incomplete higher', 'edu_Lower secondary',
    'family_Separated', 'family_Single / not married', 'family_Civil marriage', 'family_Widow'
]]

In [None]:
# Oversample with SMOTE, then restore the feature column names on the result.
# NOTE(review): resampling happens before the train/test split, so the test
# set drawn from X_b/Y_b also contains SMOTE-generated rows — confirm this is
# intended; the usual approach is to resample the training split only.
# NOTE(review): SMOTE() is created without a random_state, so results vary
# between runs — confirm whether reproducibility is required.
Y = Y.astype(int)
X_b, Y_b = SMOTE().fit_resample(X, Y)
X_b = pd.DataFrame(X_b, columns=X.columns)

In [None]:
# Stratified 70/30 split of the resampled data with a fixed seed, followed by
# conversion to NumPy arrays (float32 features, integer labels).
X_train, X_test, Y_train, Y_test = train_test_split(X_b, Y_b, stratify=Y_b, test_size=0.3, random_state=10000)
X_train = np.array(X_train).astype('float32')
X_test = np.array(X_test).astype('float32')
Y_train = np.array(Y_train).astype('int')
Y_test = np.array(Y_test).astype('int')

In [None]:
# Summarise the dataset dimensions: feature count and train/test row counts.
print(f'Input Columns: {X_train.shape[1]}')
print('Output Columns: 1')
print(f'Trains: {X_train.shape[0]}')
print(f'Tests: {X_test.shape[0]}')

## 8. 随机森林模型

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
# Random forest: 300 trees, depth capped at 20, at least 16 samples per leaf.
# NOTE(review): no random_state is set, so results vary between runs — confirm.
rf_model = RandomForestClassifier(n_estimators=300, max_depth=20, min_samples_leaf=16)

In [None]:
# Train the random forest on the training split.
rf_model.fit(X_train, Y_train)

In [None]:
# Predict labels for the test split.
rf_predict_result = rf_model.predict(X_test)

In [None]:
# Report accuracy, print the raw confusion matrix, then plot and save it.
print('Accuracy Score is {:.5}'.format(accuracy_score(Y_test, rf_predict_result)))
rf_confusion = confusion_matrix(Y_test, rf_predict_result)
print(pd.DataFrame(rf_confusion))
plot_confusion_matrix(rf_confusion, ['0', '1'], "Random Forest")

## 9. 训练 BP 网络

In [None]:
# Import Keras through the public `tensorflow.keras` namespace;
# `tensorflow.python.*` is a private path that is not part of the
# supported API and is not available in every TensorFlow release.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

关于隐藏层节点数

设 `l` 为节点数，`n` 为输入层节点数，`m` 为输出层节点数，`a` 为 0-10 的常数，有：

$$
l < n-1
$$
$$
l < \sqrt{(m+n)} + a
$$
$$
l < \log_2 n
$$

In [None]:
# Evaluate the three rule-of-thumb upper bounds for the hidden-layer size.
# NOTE(review): 29 appears to mirror the number of feature columns selected
# for X — keep it in sync if the feature list changes.
bp_input_nodes = 29
bp_output_nodes = 1
bp_a = 5  # the constant `a` of the second rule

bp_hide_layer_lim1 = bp_input_nodes - 1
bp_hide_layer_lim2 = np.sqrt(bp_input_nodes + bp_output_nodes) + bp_a
bp_hide_layer_lim3 = np.log2(bp_input_nodes)

print(f'l < {bp_hide_layer_lim1}')
print(f'l < {bp_hide_layer_lim2}')
print(f'l < {bp_hide_layer_lim3}')

In [None]:
# Build the network: 29 inputs -> Dense(29, relu) -> Dense(4, softmax) -> Dense(1).
bp_model = Sequential()

bp_model.add(Dense(units=29, activation='relu', input_shape=(29,)))
# NOTE(review): softmax on a hidden layer and a linear single-unit output
# trained with 'mse' are unusual for a binary target; a sigmoid output with
# binary cross-entropy is the common choice — confirm the intent.
bp_model.add(Dense(units=4, activation='softmax'))
bp_model.add(Dense(units=1))

bp_model.compile(optimizer='adam', loss='mse', metrics=['accuracy'])
bp_model.summary()

In [None]:
# Train for 500 epochs in batches of 128, holding out 30% of the training
# data for validation; the returned history feeds the training plots.
bp_history = bp_model.fit(X_train, Y_train, epochs=500, batch_size=128, validation_split=0.3)

In [None]:
# Plot and save the accuracy and loss curves of the training run.
plot_train_history(bp_history, 'BP')

In [None]:
# Persist the trained network under ./model.
bp_model.save('./model')

In [None]:
# Predict on the test split. The network output is unbounded, so it is
# clamped to the interval [0, 1] and rounded to obtain a 0/1 label.
bp_raw_output = pd.DataFrame(bp_model.predict(X_test))
bp_predict_result = bp_raw_output.clip(lower=0, upper=1).round()

In [None]:
# Report accuracy, print the raw confusion matrix, then plot and save it.
print('Accuracy Score is {:.5}'.format(accuracy_score(Y_test, bp_predict_result)))
bp_confusion = confusion_matrix(Y_test, bp_predict_result)
print(pd.DataFrame(bp_confusion))
plot_confusion_matrix(bp_confusion, ['0', '1'], "BP")