# Data Preparation

In [1]:
import pandas as pd


credit = pd.read_csv('../datasets/Credit.csv', index_col=0)

In [2]:
credit.head()

Unnamed: 0,Default,checkingstatus1,duration,history,purpose,amount,savings,employ,installment,sex,residence,age,housing,cards,liable,tele,foreign
1,0,<DM0,6,terrible,goods/repair,1169,DM0-100,7+years,4,male,4,67,own,2,1,yes,foreign
2,1,DM0-200,48,poor,goods/repair,5951,DM0-100,1-7years,2,female,2,22,own,1,1,no,foreign
3,0,NoAccount,12,terrible,edu,2096,DM0-100,1-7years,2,male,3,49,own,1,2,no,foreign
4,0,<DM0,42,poor,goods/repair,7882,DM0-100,1-7years,2,male,4,45,forfree,1,2,no,foreign
5,1,<DM0,24,poor,newcar,4870,DM0-100,1-7years,3,male,4,53,forfree,2,2,no,foreign


In [3]:
from sklearn.preprocessing import LabelEncoder


credit_processed = credit.copy()


label_encoders = {}  
for column in credit_processed.select_dtypes(include=['object']).columns:
    le = LabelEncoder()
    credit_processed[column] = le.fit_transform(credit_processed[column])
    label_encoders[column] = le


credit_processed = credit_processed.rename(columns={'Default': 'target'})

credit_processed.head()


Unnamed: 0,target,checkingstatus1,duration,history,purpose,amount,savings,employ,installment,sex,residence,age,housing,cards,liable,tele,foreign
1,0,0,6,2,2,1169,0,2,4,1,4,67,1,2,1,1,0
2,1,1,48,1,2,5951,0,1,2,0,2,22,1,1,1,0,0
3,0,3,12,2,1,2096,0,1,2,1,3,49,1,1,2,0,0
4,0,0,42,1,2,7882,0,1,2,1,4,45,0,1,2,0,0
5,1,0,24,1,3,4870,0,1,3,1,4,53,0,2,2,0,0


In [65]:
# from sklearn.preprocessing import OneHotEncoder

# credit_processed = credit.copy()

# # 初始化OneHotEncoder
# encoder = OneHotEncoder(sparse=False, drop='first')  # drop='first'可以避免多重共线性

# # 选择要进行one-hot编码的列
# categorical_columns = credit_processed.select_dtypes(include=['object']).columns

# # 进行one-hot编码
# encoded_features = encoder.fit_transform(credit_processed[categorical_columns])
# encoded_df = pd.DataFrame(encoded_features, columns=encoder.get_feature_names_out(categorical_columns))

# # 删除原始的分类列
# credit_processed.drop(categorical_columns, axis=1, inplace=True)

# # 将one-hot编码的列添加到数据框中
# credit_processed = pd.concat([credit_processed, encoded_df], axis=1)

# credit_processed = credit_processed.rename(columns={'Default': 'target'})

# credit_processed.head()


`sparse` was renamed to `sparse_output` in version 1.2 and will be removed in 1.4. `sparse_output` is ignored unless you leave `sparse` to its default value.


Unnamed: 0,target,duration,amount,installment,residence,age,cards,liable,checkingstatus1_DM0-200,checkingstatus1_DM200+,...,savings_DM100-1000,savings_DM1000+,employ_1-7years,employ_7+years,employ_unemployed,sex_male,housing_own,housing_rent,tele_yes,foreign_german
1,0.0,6.0,1169.0,4.0,4.0,67.0,2.0,1.0,1.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
2,1.0,48.0,5951.0,2.0,2.0,22.0,1.0,1.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0
3,0.0,12.0,2096.0,2.0,3.0,49.0,1.0,2.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
4,0.0,42.0,7882.0,2.0,4.0,45.0,1.0,2.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
5,1.0,24.0,4870.0,3.0,4.0,53.0,2.0,2.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0


In [4]:
encoder_mappings = {}

for column, encoder in label_encoders.items():
    encoder_mappings[column] = list(zip(encoder.classes_, range(len(encoder.classes_))))

encoder_mappings


{'checkingstatus1': [('<DM0', 0),
  ('DM0-200', 1),
  ('DM200+', 2),
  ('NoAccount', 3)],
 'history': [('good', 0), ('poor', 1), ('terrible', 2)],
 'purpose': [('biz', 0),
  ('edu', 1),
  ('goods/repair', 2),
  ('newcar', 3),
  ('usedcar', 4)],
 'savings': [('DM0-100', 0), ('DM100-1000', 1), ('DM1000+', 2)],
 'employ': [('0-1year', 0),
  ('1-7years', 1),
  ('7+years', 2),
  ('unemployed', 3)],
 'sex': [('female', 0), ('male', 1)],
 'housing': [('forfree', 0), ('own', 1), ('rent', 2)],
 'tele': [('no', 0), ('yes', 1)],
 'foreign': [('foreign', 0), ('german', 1)]}

## xgboost classification

In [5]:
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report
from sklearn.preprocessing import MinMaxScaler

credit_processed = credit_processed.dropna(subset=['target'])

X = credit_processed.drop(columns=['target'])
y = credit_processed['target']
print(y)


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=26)

scaler = MinMaxScaler(feature_range=(-1, 1))
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

clf_xgb = xgb.XGBClassifier(objective='binary:logistic', random_state=26)
clf_xgb.fit(X_train_scaled, y_train)


y_pred = clf_xgb.predict(X_test_scaled)


print(f"F1 Score: {f1_score(y_test, y_pred)}")

print(classification_report(y_test, y_pred))


1       0
2       1
3       0
4       0
5       1
       ..
996     0
997     0
998     0
999     1
1000    0
Name: target, Length: 1000, dtype: int64
F1 Score: 0.5283018867924528
              precision    recall  f1-score   support

           0       0.83      0.83      0.83       147
           1       0.53      0.53      0.53        53

    accuracy                           0.75       200
   macro avg       0.68      0.68      0.68       200
weighted avg       0.75      0.75      0.75       200



In [6]:
continuous_features = ['duration', 'amount', 'installment', 'residence', 'age', 'cards', 'liable']
categorical_features = ['checkingstatus1', 'history', 'purpose', 'savings', 'employ', 'sex', 'housing', 'tele', 'foreign']
immutable_features = ['sex', 'liable', 'foreign', 'purpose']
non_decreasing_features = ['age', 'employ']
correlated_features = []

# ALIBI

In [19]:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

import tensorflow as tf
import tensorflow.keras as keras
from alibi.explainers import Counterfactual
from alibi.explainers import CounterfactualRLTabular, CounterfactualRL
from alibi.explainers import CounterfactualProto
#from alibi.datasets import fetch_adult
from alibi.models.tensorflow import HeAE
from alibi.models.tensorflow import Actor, Critic
#from alibi.models.tensorflow import ADULTEncoder, ADULTDecoder
from alibi.explainers.cfrl_base import Callback
from alibi.explainers.backends.cfrl_tabular import get_he_preprocessor, get_statistics, \
    get_conditional_vector, apply_category_mapping


In [71]:
def predict_fn(X):
    # The predict_proba method of the pipeline returns an array of shape (n_samples, 2)
    # Return both columns as the CounterfactualProto explainer expects a probability for each class
    pred_proba = loaded_model.predict_proba(X)
    return np.hstack([1 - pred_proba[:, 1].reshape(-1, 1), pred_proba[:, 1].reshape(-1, 1)])


In [72]:
# Assuming X_train is a pandas DataFrame
X_train = X_train.drop(columns=['target'])

# Calculate the min and max values for the entire training dataset
feature_min = np.min(X_train, axis=0)
feature_max = np.max(X_train, axis=0)


tf.compat.v1.disable_eager_execution()

# Initialize CounterFactualProto
shape = (1,) + X_train.shape[1:]
# Initialize the explainer
cf = CounterfactualProto(predict_fn, shape, use_kdtree=True, theta=10., max_iterations=1000,
                         feature_range=(feature_min, feature_max), 
                         c_init=1., c_steps=10)
cf.fit(X_train.values)


KeyError: "['target'] not found in axis"

In [None]:
instances = X_test[y_pred == 1].values

# List to store counterfactuals
counterfactuals = []

start_time = time.time()
# Loop through each instance and generate counterfactual


for instance in instances:
    explanation = cf.explain(instance.reshape(1, -1))
    
    # Check if a counterfactual was found
    if explanation.cf is not None:
        counterfactuals.append(explanation.cf['X'])
    else:
        # You can append a placeholder or simply skip
        # Here, I'm appending None to indicate no counterfactual was found for this instance
        counterfactuals.append(None)

# Convert the list of counterfactuals to a numpy array for further processing
# Note: If you appended None for missing counterfactuals, you might want to handle them before converting to an array
# counterfactuals_array = np.array(counterfactuals).squeeze()
# Strategy 1: Replace with a specific value
counterfactuals_replaced = [cf if cf is not None else -1 for cf in counterfactuals]

counterfactuals_mean = [cf if cf is not None else mean_value for cf in counterfactuals]

# Convert the list of counterfactuals to a numpy array
counterfactuals_array_replaced = np.array(counterfactuals_replaced).squeeze()


# Stop the timer
end_time = time.time()

# Calculate the elapsed time
elapsed_time = end_time - start_time

No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!
No counterfactual found!


# Alibi rl

In [7]:
import tensorflow as tf
from tensorflow import keras

class CreditEncoder(keras.Model):
    def __init__(self, hidden_dim, latent_dim):
        super(CreditEncoder, self).__init__()
        self.dense1 = keras.layers.Dense(hidden_dim, activation='relu')
        self.dense2 = keras.layers.Dense(latent_dim, activation='relu')

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        return x


In [8]:
class CreditDecoder(keras.Model):
    def __init__(self, hidden_dim, output_dims):
        super(CreditDecoder, self).__init__()
        self.dense1 = keras.layers.Dense(hidden_dim, activation='relu')
        self.outputs_list = [keras.layers.Dense(dim, activation='sigmoid') for dim in output_dims]

    def call(self, inputs):
        x = self.dense1(inputs)
        return [output_layer(x) for output_layer in self.outputs_list]


In [9]:
class HeAE(keras.Model):
    def __init__(self, encoder: keras.Model, decoder: keras.Model):
        super(HeAE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def call(self, x):
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat


In [10]:
import numpy as np

# 定义特征类型
feature_types = {
    "checkingstatus1": str,
    "history": str,
    "purpose": str,
    "savings": str,
    "employ": str,
    "sex": str,
    "housing": str,
    "tele": str,
    "foreign": str,
    "duration": int,
    "amount": int,
    "installment": int,
    "residence": int,
    "age": int,
    "cards": int,
    "liable": int
}

# 定义预处理函数
def credit_preprocessor(data):
    processed_data = data.copy()
    for column, dtype in feature_types.items():
        if dtype == str:
            processed_data[column] = label_encoders[column].transform(processed_data[column])
    return processed_data.astype(np.float32)


In [11]:
def credit_inv_preprocessor(data):
    inv_processed_data = data.copy()
    for column, dtype in feature_types.items():
        if dtype == str:
            inv_processed_data[column] = label_encoders[column].inverse_transform(inv_processed_data[column].astype(int))
    return inv_processed_data


In [12]:
def get_credit_preprocessor(X, feature_names, category_map, feature_types):
    # 在这里定义您的预处理逻辑
    def preprocessor(data):
        # 对数据进行预处理
        return processed_data

    def inv_preprocessor(data):
        # 对数据进行反向预处理
        return inv_processed_data

    return preprocessor, inv_preprocessor


In [13]:
# 定义数值特征和分类特征
numerical_features = ['duration', 'amount', 'installment', 'residence', 'age', 'cards', 'liable']
categorical_features = ['checkingstatus1', 'history', 'purpose', 'savings', 'employ', 'sex', 'housing', 'tele', 'foreign']

# 获取这些特征的索引，同时排除第一列（目标列）
numerical_ids = [credit.columns.get_loc(feature) - 1 for feature in numerical_features]
categorical_ids = [credit.columns.get_loc(feature) - 1 for feature in categorical_features]


In [14]:
# 定义常数
EPOCHS = 50
HIDDEN_DIM = 128
LATENT_DIM = 15

# 根据您的数据定义输出维度
OUTPUT_DIMS = [len(numerical_ids)]
OUTPUT_DIMS += [len(encoder_mappings[cat_id]) for cat_id in categorical_features]

# 定义异构自编码器
heae = HeAE(encoder=CreditEncoder(hidden_dim=HIDDEN_DIM, latent_dim=LATENT_DIM),
            decoder=CreditDecoder(hidden_dim=HIDDEN_DIM, output_dims=OUTPUT_DIMS))

# ... 其他代码（如损失函数、优化器、训练等） ...


In [15]:
# 定义损失函数
he_loss = [keras.losses.MeanSquaredError()]
he_loss_weights = [1.]

# 添加分类损失
for i in range(len(categorical_features)):
    he_loss.append(keras.losses.SparseCategoricalCrossentropy(from_logits=True))
    he_loss_weights.append(1./len(categorical_features))

# 定义指标
metrics = {}
for i, cat_name in enumerate(categorical_features):
    metrics.update({f"output_{i+2}": keras.metrics.SparseCategoricalAccuracy()})

# 定义优化器
optimizer = keras.optimizers.Adam(learning_rate=1e-3)


In [16]:
from sklearn.preprocessing import StandardScaler

# 初始化标准化器
scaler = StandardScaler()

def heae_preprocessor(data):
    processed_data = data.copy()
    
    # 对数值特征进行标准化
    processed_data[numerical_features] = scaler.fit_transform(processed_data[numerical_features])
    
    # 将分类特征转换为整数编码
    for column in categorical_features:
        processed_data[column] = label_encoders[column].transform(processed_data[column])
    
    return processed_data.astype(np.float32)


In [17]:
# 编译模型
heae.compile(optimizer=optimizer,
             loss=he_loss,
             loss_weights=he_loss_weights,
             metrics=metrics)

# 定义训练数据集
# 注意：您需要确保您的数据已经经过了适当的预处理
# 使用整个数据集重新训练LabelEncoder
for column in categorical_features:
    label_encoders[column].fit(X_train[column])


# 然后，您可以继续使用heae_preprocessor函数进行预处理

trainset_input = heae_preprocessor(X_train).astype(np.float32)
trainset_outputs = {
    "output_1": trainset_input.iloc[:, :len(numerical_features)]

}

for i, cat_id in enumerate(categorical_features):
    trainset_outputs.update({
        f"output_{i+2}": X_train.loc[:, cat_id]
    })

trainset = tf.data.Dataset.from_tensor_slices((trainset_input, trainset_outputs))
trainset = trainset.shuffle(1024).batch(128, drop_remainder=True)

# 训练模型
heae.fit(trainset, epochs=EPOCHS)


Epoch 1/50


  output, from_logits = _get_logits(


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50


Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50


Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50


Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50


Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x12d0aa00910>

In [23]:
encoder = CreditEncoder(hidden_dim=HIDDEN_DIM, latent_dim=LATENT_DIM)
decoder = CreditDecoder(hidden_dim=HIDDEN_DIM, output_dims=OUTPUT_DIMS)

category_map = {feature: list(X[feature].unique()) for feature in categorical_features}

class XGBoostWrapper:
    def __init__(self, model):
        self.model = model

    def predict(self, instances):
        return self.model.predict_proba(instances)

# Wrap the XGBoost model
wrapped_model = XGBoostWrapper(clf_xgb)

# 3. Initialize the Counterfactual Generator
cf = CounterfactualRLTabular(
    predictor=wrapped_model.predict,
    encoder=encoder,
    decoder=decoder,
    encoder_preprocessor=heae_preprocessor,
    decoder_inv_preprocessor=credit_inv_preprocessor,
    coeff_sparsity=0.5,
    coeff_consistency=0.5,
    feature_names=list(X.columns),
    category_map=category_map,
    immutable_features=immutable_features,
    ranges=None,  # You can define the ranges as described in the documentation if needed
    weight_num=1.0,
    weight_cat=1.0,
    latent_dim=LATENT_DIM,
    backend='tensorflow',
    seed=0
)

# 4. Generate Counterfactuals
# For demonstration, let's take the first instance from the test set
instance = X_test_scaled[0].reshape(1, -1)
# 假设你的目标类别是0
target_class = np.array([[0]])

# 生成反事实
explanation = cf.explain(instance, Y_t=target_class)

# Print the counterfactual
print("Original instance:", instance)
print("Counterfactual instance:", explanation.cf['X'])

# encoder = CreditEncoder(hidden_dim=HIDDEN_DIM, latent_dim=LATENT_DIM)
# decoder = CreditDecoder(hidden_dim=HIDDEN_DIM, output_dims=OUTPUT_DIMS)

# category_map = {feature: list(X[feature].unique()) for feature in categorical_features}

# class XGBoostWrapper:
#     def __init__(self, model):
#         self.model = model

#     def predict(self, instances):
#         return self.model.predict_proba(instances)

# # Wrap the XGBoost model
# wrapped_model = XGBoostWrapper(clf_xgb)

# # 3. Initialize the Counterfactual Generator
# cf = CounterfactualRLTabular(
#     ohe = False,
#     predictor=wrapped_model.predict,
#     encoder=encoder,
#     decoder=decoder,
#     encoder_preprocessor=heae_preprocessor,
#     decoder_inv_preprocessor=credit_inv_preprocessor,
#     coeff_sparsity=0.5,
#     coeff_consistency=0.5,
#     category_map=category_map,
#     data=X_train_scaled,
#     categorical_features=categorical_features,
#     continuous_features=continuous_features,
#     immutable_features=immutable_features,
#     non_decreasing_features=non_decreasing_features,
#     correlated_features=correlated_features,
#     feature_names=list(X.columns),
#     outcome_name='target',
#     task='classification',
#     max_counterfactual_length=5,
#     counterfactual_feature_range=None,
#     counterfactual_target_range=None,
#     reward_func=None,
#     reward_type='l2',
#     lam=0.1,
#     learning_rate=1e-3,
#     latent_dim = LATENT_DIM,
#     max_iterations=5000,
#     batch_size=100,
#     verbose=True
# )

# # 4. Generate Counterfactuals
# # For demonstration, let's take the first instance from the test set
# instance = X_test_scaled[0].reshape(1, -1)
# # 假设你的目标类别是1
# target_class = np.array([[0]])

# # 生成反事实
# explanation = cf.explain(instance, Y_t=target_class)


# # Print the counterfactual
# print("Original instance:", instance)
# print("Counterfactual instance:", explanation.cf['X'])

ValueError: need at least one array to concatenate