In [1]:
import numpy as np
import pandas as pd
from imblearn.over_sampling import RandomOverSampler
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.feature_selection import SelectFromModel
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_auc_score, log_loss, precision_score, recall_score, f1_score, \
    brier_score_loss, roc_curve, average_precision_score
from scipy.stats import ks_2samp
from lightgbm import LGBMClassifier
from sklearn.model_selection import ParameterGrid
import os
import pickle

# VisionPermutator: multi-view scaling followed by random outer-product encoding
class VisionPermutator(BaseEstimator, TransformerMixin):
    """Expand tabular features into a flattened n x n "height/width" encoding.

    ``fit`` fits three scalers (standard, min-max, robust) and an
    L1-logistic-regression ``SelectFromModel`` on the input frame.
    ``transform`` builds four views of ``X`` (the three scaled versions plus the
    selected features zero-padded back to full width), projects every row of
    every view onto a freshly drawn random vector via an outer product, sums
    the four projections, and blends the "height" (axis=0) and "width" (axis=1)
    encodings with a random weight.

    Notes
    -----
    * ``X`` must be a pandas ``DataFrame`` (``X.columns`` / ``X.index`` are used).
    * All random numbers come from the global ``np.random`` state, and a new
      projection vector is drawn for every row on every call.  ``transform`` is
      therefore not deterministic unless the caller seeds ``np.random``.
    * ``feature_mapping`` is rewritten on every ``transform`` call.
    """

    def __init__(self):
        self.scaler = StandardScaler()
        self.minmax_scaler = MinMaxScaler()
        self.robust_scaler = RobustScaler()
        self.selector = SelectFromModel(LogisticRegression(penalty="l1", solver="liblinear"))
        # Maps generated feature names to the original column they derive from.
        self.feature_mapping = {}

    def fit(self, X, y=None):
        """Fit the three scalers and the feature selector on ``X`` (and ``y``)."""
        self.scaler.fit(X)
        self.minmax_scaler.fit(X)
        self.robust_scaler.fit(X)
        self.selector.fit(X, y)
        return self

    def transform(self, X):
        """Return the blended height/width encoding of ``X`` as a DataFrame.

        The result has ``X.shape[1] ** 2`` columns and carries ``X.index`` so
        that it stays aligned with label Series indexed like ``X``.
        """
        X_scaled = self.scaler.transform(X)
        X_minmax = self.minmax_scaler.transform(X)
        X_robust = self.robust_scaler.transform(X)
        X_selected = self.selector.transform(X)
        selected_indices = np.where(self.selector.get_support())[0]

        # Record which original column each intermediate feature comes from.
        self.record_feature_mapping(X, X_scaled, 'scaled')
        self.record_feature_mapping(X, X_minmax, 'minmax')
        self.record_feature_mapping(X, X_robust, 'robust')
        # Selected columns are a subset, so their position must be looked up
        # in the selector's support mask rather than assumed to be 0..k-1.
        self.record_feature_mapping(X, X_selected, 'selected', source_indices=selected_indices)

        X_selected_expanded = self.expand_selected_features(X_selected, X.shape[1])

        # Keep the caller's index: a fresh RangeIndex would silently misalign
        # the output with any Series that is still indexed like X.
        X_transformed = self.permute_mlp(
            pd.DataFrame(X_scaled, columns=X.columns, index=X.index),
            pd.DataFrame(X_minmax, columns=X.columns, index=X.index),
            pd.DataFrame(X_robust, columns=X.columns, index=X.index),
            pd.DataFrame(X_selected_expanded, columns=X.columns, index=X.index))
        self.update_feature_mapping(X_transformed, X.columns)
        return X_transformed

    def record_feature_mapping(self, original_X, transformed_X, method, source_indices=None):
        """Store ``'{method}_feature_{i}' -> original column name`` entries.

        ``source_indices`` optionally gives, for each transformed column, the
        position of its source column in ``original_X``.  When omitted the
        i-th transformed column is mapped to column ``i % n_original``.
        """
        n_original = original_X.shape[1]
        for i in range(transformed_X.shape[1]):
            source = i % n_original if source_indices is None else source_indices[i]
            self.feature_mapping[f'{method}_feature_{i}'] = original_X.columns[source]

    def expand_selected_features(self, X_selected, n_features):
        """Scatter the selected columns back to full width, zero elsewhere."""
        expanded_features = np.zeros((X_selected.shape[0], n_features))
        selected_indices = np.where(self.selector.get_support())[0]
        expanded_features[:, selected_indices] = X_selected
        return expanded_features

    def permute_mlp(self, X_scaled, X_minmax, X_robust, X_selected):
        """Blend the height and width encodings of the four views."""
        X_height = self.encode_height(X_scaled, X_minmax, X_robust, X_selected)
        X_width = self.encode_width(X_scaled, X_minmax, X_robust, X_selected)
        X_weighted = self.weighted_permute_mlp(X_height, X_width)
        return X_weighted

    def _encode(self, X_scaled, X_minmax, X_robust, X_selected, axis):
        """Sum the four row-wise projections for the given ``axis`` (0 or 1)."""
        encoded = X_scaled.apply(lambda row: self.linear_projection(row, axis=axis), axis=1)
        encoded += X_minmax.apply(lambda row: self.polynomial_projection(row, axis=axis), axis=1)
        encoded += X_robust.apply(lambda row: self.nonlinear_projection(row, axis=axis), axis=1)
        encoded += X_selected.apply(lambda row: self.selected_projection(row, axis=axis), axis=1)
        return pd.DataFrame(encoded.tolist(), index=X_scaled.index)

    def encode_height(self, X_scaled, X_minmax, X_robust, X_selected):
        """Encoding with the row values on the left of the outer product."""
        return self._encode(X_scaled, X_minmax, X_robust, X_selected, axis=0)

    def encode_width(self, X_scaled, X_minmax, X_robust, X_selected):
        """Encoding with the row values on the right of the outer product."""
        return self._encode(X_scaled, X_minmax, X_robust, X_selected, axis=1)

    @staticmethod
    def _check_axis(axis):
        """Reject anything other than axis 0 or 1 with a clear error."""
        if axis not in (0, 1):
            raise ValueError(f"axis must be 0 or 1, got {axis!r}")

    @staticmethod
    def _normalized_outer(left, right):
        """Flattened outer product of two vectors, scaled to unit Frobenius norm.

        An all-zero product is returned unchanged instead of being divided by
        zero, which would otherwise fill the whole encoded row with NaN.
        """
        projection = np.outer(left, right)
        norm = np.linalg.norm(projection)
        if norm == 0:
            return projection.flatten()
        return (projection / norm).flatten()

    def linear_projection(self, row, axis):
        """Outer product of the row with a random vector (order set by ``axis``)."""
        projection_vector = np.random.rand(row.shape[0])
        self._check_axis(axis)
        if axis == 0:
            return self._normalized_outer(row.values, projection_vector)
        return self._normalized_outer(projection_vector, row.values)

    def polynomial_projection(self, row, axis, degree=2):
        """Like ``linear_projection`` but the left-hand vector is raised to ``degree``.

        NOTE(review): for axis=1 the power is applied to the random vector, not
        to the row values -- confirm this asymmetry is intended.
        """
        projection_vector = np.random.rand(row.shape[0])
        self._check_axis(axis)
        if axis == 0:
            return self._normalized_outer(row.values ** degree, projection_vector)
        return self._normalized_outer(projection_vector ** degree, row.values)

    def nonlinear_projection(self, row, axis):
        """Like ``linear_projection`` but applied to ``sin`` of the row values."""
        projection_vector = np.random.rand(row.shape[0])
        self._check_axis(axis)
        if axis == 0:
            return self._normalized_outer(np.sin(row.values), projection_vector)
        return self._normalized_outer(projection_vector, np.sin(row.values))

    def selected_projection(self, row, axis):
        """Projection used for the zero-padded selected-feature view.

        Performs the same computation as ``linear_projection``.
        """
        projection_vector = np.random.rand(row.shape[0])
        self._check_axis(axis)
        if axis == 0:
            return self._normalized_outer(row.values, projection_vector)
        return self._normalized_outer(projection_vector, row.values)

    def weighted_permute_mlp(self, X_height, X_width):
        """Convex combination of the two encodings with a random weight in [0, 1)."""
        weight_height = np.random.rand()
        weight_width = 1 - weight_height
        X_weighted = weight_height * X_height + weight_width * X_width
        return X_weighted

    def update_feature_mapping(self, X_transformed, original_columns):
        """Store ``'feature_{i}' -> original_columns[i % n]`` for every output column."""
        for i in range(X_transformed.shape[1]):
            original_feature = original_columns[i % len(original_columns)]
            self.feature_mapping[f'feature_{i}'] = original_feature

def h_mean(precision, recall):
    """Harmonic mean of precision and recall; 0 when both sum to zero."""
    total = precision + recall
    return 0 if total == 0 else 2 * (precision * recall) / total


def h_mean_score(y_true, y_pred):
    """Harmonic mean of the precision and recall of ``y_pred`` against ``y_true``."""
    return h_mean(precision_score(y_true, y_pred), recall_score(y_true, y_pred))


# Type I / Type II error-rate helpers
def type1error(y_proba, y_true, threshold=0.5):
    """Type I error rate: share of true negatives (label 0) predicted positive.

    Parameters
    ----------
    y_proba : array-like of scores; a sample is predicted positive when its
        score is ``>= threshold``.
    y_true : array-like of 0/1 labels, in the same order as ``y_proba``.
    threshold : decision threshold, default 0.5.

    Returns
    -------
    float
        ``FP / (number of label-0 samples)``, or ``nan`` when there are no
        label-0 samples.

    Both inputs are converted to NumPy arrays first, so plain lists work and
    pandas objects are compared by position rather than by index label.
    """
    predicted_positive = np.asarray(y_proba) >= threshold
    is_negative = np.asarray(y_true) == 0
    n_negative = is_negative.sum()
    if n_negative == 0:
        # The rate is undefined without any negatives.
        return float('nan')
    return float((predicted_positive & is_negative).sum() / n_negative)


def type2error(y_proba, y_true, threshold=0.5):
    """Type II error rate: share of true positives (label 1) predicted negative.

    Parameters
    ----------
    y_proba : array-like of scores; a sample is predicted negative when its
        score is ``< threshold``.
    y_true : array-like of 0/1 labels, in the same order as ``y_proba``.
    threshold : decision threshold, default 0.5.

    Returns
    -------
    float
        ``FN / (number of label-1 samples)``, or ``nan`` when there are no
        label-1 samples.

    Both inputs are converted to NumPy arrays first, so plain lists work and
    pandas objects are compared by position rather than by index label.
    """
    predicted_negative = np.asarray(y_proba) < threshold
    is_positive = np.asarray(y_true) == 1
    n_positive = is_positive.sum()
    if n_positive == 0:
        # The rate is undefined without any positives.
        return float('nan')
    return float((predicted_negative & is_positive).sum() / n_positive)

In [2]:
# --- Input / output locations (machine-specific absolute paths) ---
root_path = 'D:/study/Credit(1)/Credit/'
params_path = r'D:\study\Credit(1)\Credit\params/'
dataset_path = r'D:\study\credit_scoring_datasets/'
shuffle_path = r'D:\study\Credit(1)\Credit\shuffle_index/'
save_path = r'D:\study\second\outcome/'
os.makedirs(save_path, exist_ok=True)

# --- Load the dataset and separate features from the target ---
# NOTE(review): this literal spells the directory 'credit_scroing_datasets',
# while dataset_path above spells it 'credit_scoring_datasets' -- confirm which
# one actually exists on disk.
data = pd.read_csv(r'D:\study\credit_scroing_datasets\shandong.csv', low_memory=True)
labels = data['label']
# Infinite and missing feature values are both replaced with 0.
features = data.drop(columns='label').replace([-np.inf, np.inf], 0).fillna(0)

# --- Split sizes: 80% train, 10% validation, test the same size as validation ---
n_rows = features.shape[0]
train_size = int(n_rows * 0.8)
valid_size = int(n_rows * 0.1)
test_size = valid_size  # the test set is assumed to be as large as the validation set
valid_end = train_size + valid_size
test_end = valid_end + test_size

# The row order comes from a stored permutation so that splits are repeatable.
# pickle can execute arbitrary code while loading: only open trusted files.
with open(shuffle_path + 'shandong/shuffle_index.pickle', 'rb') as f:
    shuffle_index = pickle.load(f)

train_index = shuffle_index[:train_size]
valid_index = shuffle_index[train_size:valid_end]
test_index = shuffle_index[valid_end:test_end]

train_x = features.iloc[train_index, :]
train_y = labels.iloc[train_index]
valid_x = features.iloc[valid_index, :]
valid_y = labels.iloc[valid_index]
test_x = features.iloc[test_index, :]
test_y = labels.iloc[test_index]

# Train and validation parts are merged into one training set.
full_train_x = pd.concat([train_x, valid_x], axis=0)
full_train_y = pd.concat([train_y, valid_y], axis=0)

In [3]:
# VisionPermutator draws its projection vectors and blend weight from the
# global NumPy RNG, so seed it to make the transformed features repeatable
# across kernel restarts.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Fit on the combined train+validation data, then encode both splits.
# NOTE(review): every transform() call draws new random projections, so the
# train and test encodings do not share projection vectors even when seeded.
vp = VisionPermutator()
vp.fit(full_train_x, full_train_y)
full_train_x_transformed = vp.transform(full_train_x).astype(float)
test_x_transformed = vp.transform(test_x).astype(float)

In [23]:
# Hyper-parameter grid for LightGBM (currently a single combination).
lgb_param_grid = {'n_estimators': [500], 'max_depth': [4], 'learning_rate': [0.05], 'reg_lambda': [5]}
param_combinations = [combination for combination in ParameterGrid(lgb_param_grid)]

# One metrics dict is collected here per parameter combination.
results = []

# Class balance of the training labels, expressed as negatives per positive.
positive_count = (full_train_y == 1).sum()
negative_count = (full_train_y == 0).sum()
scale_pos_weight = negative_count / positive_count

In [24]:
import lightgbm as lgb

# Per-sample cost coefficients derived from the mean of each transformed row
# (example definition).
gamma_i = np.mean(full_train_x_transformed, axis=1)
alpha1 = np.exp(gamma_i + 1)  # alpha1 = exp(gamma_i + 1): weight of positive samples
alpha2 = 2 - gamma_i          # alpha2 = 2 - gamma_i: weight of negative samples


def custom_eval_metric(y_pred, dataset):
    """Cost-weighted binary cross-entropy for use as a LightGBM ``feval``.

    Parameters
    ----------
    y_pred : predictions handed over by LightGBM.  With the built-in
        ``binary`` objective these are already probabilities, so no sigmoid is
        applied here.  If a custom objective is introduced later, raw scores
        will be passed instead and must be converted first.
    dataset : the ``lgb.Dataset`` being evaluated; must be the training set
        because the per-sample coefficients ``alpha1`` / ``alpha2`` are
        computed from ``full_train_x_transformed``.

    Returns
    -------
    tuple
        ``('custom_cost', mean cost, False)`` -- lower is better.
    """
    y_true = dataset.get_label()
    # Keep probabilities strictly inside (0, 1) so the logarithms stay finite.
    p = np.clip(y_pred, 1e-15, 1 - 1e-15)
    # Plain arrays: combine with the labels by position, not by index label.
    weight_pos = np.asarray(alpha1)
    weight_neg = np.asarray(alpha2)
    cost = -(weight_pos * y_true * np.log(p) + weight_neg * (1 - y_true) * np.log(1 - p))
    return 'custom_cost', np.mean(cost), False

# Per-sample training weights: alpha1 for positives, alpha2 for negatives.
# Everything is converted to plain arrays first.  alpha1/alpha2 and
# full_train_y can carry different pandas indexes; multiplying them as Series
# aligns on labels and yields a longer, NaN-padded result, which LightGBM
# rejects with "Length of weights differs from the length of #data".
train_labels = np.asarray(full_train_y)
sample_weight = np.asarray(alpha1) * train_labels + np.asarray(alpha2) * (1 - train_labels)

for params_set in param_combinations:
    params = {
        'n_estimators': params_set['n_estimators'],
        'max_depth': params_set['max_depth'],
        'learning_rate': params_set['learning_rate'],
        'reg_lambda': params_set['reg_lambda'],
        'objective': 'binary',
        'metric': 'custom'
    }

    lgb_train = lgb.Dataset(full_train_x_transformed, full_train_y, weight=sample_weight)

    # Train the model; the custom metric is evaluated on the training set.
    lgb_model = lgb.train(params, lgb_train, valid_sets=[lgb_train], feval=custom_eval_metric)

    # Predict on the held-out test set and threshold at 0.5.
    preds_proba = lgb_model.predict(test_x_transformed)
    preds = (preds_proba >= 0.5).astype(int)

    # Standard classification metrics.
    auc_score = roc_auc_score(test_y, preds_proba)
    logloss = log_loss(test_y, preds_proba)
    ks = ks_2samp(preds_proba[test_y == 1], preds_proba[test_y != 1]).statistic
    accuracy = accuracy_score(test_y, preds)
    precision = precision_score(test_y, preds)
    recall = recall_score(test_y, preds)
    f1 = f1_score(test_y, preds)
    brier_score = brier_score_loss(test_y, preds_proba)
    average_precision = average_precision_score(test_y, preds_proba)
    fprs, tprs, thresholds = roc_curve(test_y, preds_proba)
    true_positive_rate = tprs
    true_negative_rate = 1 - fprs
    # NOTE(review): this is an array with one G-mean per ROC threshold, not a
    # single score -- confirm that is what should be stored in the results.
    gmean = np.sqrt(true_positive_rate * true_negative_rate)

    # H-mean of precision and recall.
    hm = h_mean(precision, recall)

    # Type I and Type II error rates at the default threshold.
    type1_error = type1error(preds_proba, test_y)
    type2_error = type2error(preds_proba, test_y)

    # Average of accuracy, AUC, precision and recall.
    average_score = (accuracy + auc_score + precision + recall) / 4

    # Store the metrics of this parameter combination.
    results.append({
        'params': params_set,
        'accuracy': accuracy,
        'auc_score': auc_score,
        'logloss': logloss,
        'ks_stat': ks,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'brier_score': brier_score,
        'average_precision': average_precision,
        'hm': hm,
        'gmean': gmean,
        'type1_error': type1_error,
        'type2_error': type2_error,
        'average_score': average_score
    })

    # Report the headline numbers for this parameter combination.
    print(f"Params: {params_set}, Accuracy: {accuracy}, AUC: {auc_score}, Average Score: {average_score}")

# Print every collected result.
print("所有结果：")
for result in results:
    print(result)