In [1]:
import torch
from torch import nn
from sklearn.model_selection import train_test_split
from prepare_datasets import *

X, y , feature_names , categorical_features, continuous_features, actionable_features= get_and_prepare_german_dataset()

X = torch.from_numpy(X).type(torch.float)
y = torch.from_numpy(y).type(torch.float)
X_train, X_test, y_train, y_test = train_test_split(X,
                                                    y,
                                                    test_size=0.2,
                                                    random_state=42)

In [2]:
# Make device agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [3]:
# Taken from https://github.com/dylan-slack/manipulating-cfes/blob/main/train_models.py
# wanted to use this the same model as in the paper
class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.tanh1 = nn.Tanh()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.tanh2 = nn.Tanh()
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.tanh3 = nn.Tanh()
        self.fc4 = nn.Linear(hidden_size, num_classes)

    def forward(self, x, return_logit=False):
        out = self.fc1(x)
        out = self.tanh1(out)
        out = self.fc2(out)
        out = self.tanh2(out)
        out = self.fc3(out)
        out = self.tanh3(out)
        out = self.fc4(out)
        return out


In [4]:
model = NeuralNetwork(X.shape[1], 200, 1)

In [5]:
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [6]:
from sklearn.metrics import f1_score

epochs = 1000
X_train, y_train = X_train.to(device), y_train.to(device)
X_test, y_test = X_test.to(device), y_test.to(device)
model.to(device)

for epoch in range(epochs):
    model.train()
    y_logits = model(X_train).squeeze()
    y_pred = torch.round(torch.sigmoid(y_logits))
    loss = loss_fn(y_logits, y_train)

    f1_score_training = f1_score(
        y_true=y_train.cpu().detach().numpy(),
        y_pred=y_pred.cpu().detach().numpy()
    )

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    ### Testing
    model.eval()
    with torch.inference_mode():
        test_logits = model(X_test).squeeze()
        test_pred = torch.round(torch.sigmoid(test_logits))
        test_loss = loss_fn(test_logits, y_test)

        f1_score_test = f1_score(
            y_true=y_test.cpu().detach().numpy(),
            y_pred=test_pred.cpu().detach().numpy()
        )

    if epoch % 100 == 0:
        print(f"Epoch: {epoch} | Loss: {loss:.5f}, F1-Training: {f1_score_training:.2f}% | Test loss: {test_loss:.5f}, F1-Testing: {f1_score_test:.2f}%")

Epoch: 0 | Loss: 0.69896, F1-Training: 0.03% | Test loss: 0.67704, F1-Testing: 0.83%
Epoch: 100 | Loss: 0.59433, F1-Training: 0.82% | Test loss: 0.60478, F1-Testing: 0.83%
Epoch: 200 | Loss: 0.57253, F1-Training: 0.83% | Test loss: 0.60203, F1-Testing: 0.84%
Epoch: 300 | Loss: 0.55851, F1-Training: 0.82% | Test loss: 0.59466, F1-Testing: 0.83%
Epoch: 400 | Loss: 0.54787, F1-Training: 0.82% | Test loss: 0.59394, F1-Testing: 0.83%
Epoch: 500 | Loss: 0.53770, F1-Training: 0.83% | Test loss: 0.59148, F1-Testing: 0.82%
Epoch: 600 | Loss: 0.52748, F1-Training: 0.83% | Test loss: 0.58763, F1-Testing: 0.83%
Epoch: 700 | Loss: 0.51451, F1-Training: 0.84% | Test loss: 0.58835, F1-Testing: 0.83%
Epoch: 800 | Loss: 0.50463, F1-Training: 0.84% | Test loss: 0.58872, F1-Testing: 0.83%
Epoch: 900 | Loss: 0.49531, F1-Training: 0.85% | Test loss: 0.59118, F1-Testing: 0.82%


In [7]:
import dice_ml

df = pd.DataFrame(X, columns=feature_names)
df['target'] = y

dice_data = dice_ml.Data(
    dataframe=df,
    continuous_features=continuous_features,
    categorical_features=categorical_features,
    outcome_name='target'
)

In [8]:
class PyTorchModelWrapper:
    def __init__(self, model):
        self.model = model

    def predict_proba(self, x):
        self.model.eval()
        with torch.no_grad():
            x = x.values
            x = x.astype(np.float64)
            x_tensor = torch.FloatTensor(x)
            outputs = self.model(x_tensor)
            probabilities = torch.sigmoid(outputs).squeeze()

            if probabilities.ndim == 0:

                probabilities = probabilities.unsqueeze(0)

            prob_numpy = probabilities.cpu().numpy()
            return np.column_stack((1 - prob_numpy, prob_numpy))


dice_model = dice_ml.Model(
    model=PyTorchModelWrapper(model),
    backend="sklearn",
    model_type='classifier'
)

exp = dice_ml.Dice(
    dice_data,
    dice_model,
    method="genetic"
)


In [9]:

query_instance = X[11:12]
query_df = pd.DataFrame(query_instance, columns=feature_names)


#  generate counterfactuals
dice_exp = exp.generate_counterfactuals(
    query_df,
    total_CFs=1,
    desired_class="opposite",
    features_to_vary = actionable_features
)

print("\nCounterfactuals:")
dice_exp.visualize_as_dataframe(show_only_changes=True)


100%|██████████| 1/1 [00:00<00:00,  2.87it/s]


Counterfactuals:
Query instance (original outcome : 0)





Unnamed: 0,Gender,ForeignWorker,Single,Age,LoanDuration,LoanAmount,LoanRateAsPercentOfIncome,YearsAtCurrentHome,NumberOfOtherLoansAtBank,NumberOfLiableIndividuals,...,OtherLoansAtBank,HasCoapplicant,HasGuarantor,OwnsHouse,RentsHouse,Unemployed,YearsAtCurrentJob_lt_1,YearsAtCurrentJob_geq_4,JobClassIsSkilled,target
0,1.0,0.0,0.0,24.0,48.0,4308.0,3.0,4.0,1.0,1.0,...,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,0



Diverse Counterfactual set (new outcome: 1.0)


Unnamed: 0,Gender,ForeignWorker,Single,Age,LoanDuration,LoanAmount,LoanRateAsPercentOfIncome,YearsAtCurrentHome,NumberOfOtherLoansAtBank,NumberOfLiableIndividuals,...,OtherLoansAtBank,HasCoapplicant,HasGuarantor,OwnsHouse,RentsHouse,Unemployed,YearsAtCurrentJob_lt_1,YearsAtCurrentJob_geq_4,JobClassIsSkilled,target
0,-,-,-,-,18.0,-,-,-,-,-,...,-,-,-,1.0,0.0,-,0.0,-,-,1.0


In [10]:
# Before creating DiCE explainer
constant_features = [col for col in df.columns
                    if df[col].nunique() == 1]
if constant_features:
    print(f"Constant features found: {constant_features}")
    # Either remove them or handle specially

In [11]:
# get counterfactuals as list
cf_list = dice_exp.cf_examples_list[0].final_cfs_df.values.tolist()

# Convert query_instance to tensor
if isinstance(query_instance, np.ndarray):
    query_instance = torch.from_numpy(query_instance).float()

for cf in cf_list:
    # Convert cf to tensor
    if isinstance(cf, list):
        cf = np.array(cf)
    if isinstance(cf, np.ndarray):
        cf = torch.from_numpy(cf).float()

    cf_features = cf[:-1]  # all features except the last one which is the target
    query_features = query_instance.squeeze()

    with torch.no_grad():
        # Calculate L1 distance between feature vectors
        distance = torch.norm(query_features - cf_features, p=1)
        print(f"L1 distance between query and counterfactual: {distance.item()}")

L1 distance between query and counterfactual: 106.0


In [13]:
from Helper_functions import *

X_positive, X_negative, pos_indices, neg_indices = split_by_classification(model, X_test)


In [14]:
query_df = pd.DataFrame(X_negative.cpu().numpy(), columns=feature_names)

dice_exp = exp.generate_counterfactuals(
    query_df,
    total_CFs=1,
    desired_class="opposite",
    features_to_vary=actionable_features
)

100%|██████████| 32/32 [00:13<00:00,  2.34it/s]


In [15]:
l1_distances = []
for i in range(len(X_negative)):
    original = X_negative[i]
    cf_df = dice_exp.cf_examples_list[i].final_cfs_df
    cf_values = cf_df[feature_names].values  # exclude target column
    cf_tensor = torch.from_numpy(cf_values).float().to(device)
    distance = torch.norm(original - cf_tensor, p=1).item()
    l1_distances.append(distance)

# Compute mean L1 distance
mean_l1 = np.mean(l1_distances)
print(f"Mean L1 distance for negatively classified data using DiCE: {mean_l1:.2f}")

Mean L1 distance for negatively classified data using DiCE: 1214.53


In [35]:
%%latex
the  cost function has this formula

$$  BinaryCrossEntropy(D {\scriptstyle train}) -  \frac{\lambda}{|D {\scriptstyle negative}|} \cdot \sum_{x \in D {\scriptstyle negative} } C( x )  $$


with C(x) is an approximation of the Cost of Recourse set to be
$$  C(x) = |G{\scriptstyle class 1} - G{\scriptstyle class 2} |     $$

G being the logit of that specific class and the approximation being the difference of the logitxs

<IPython.core.display.Latex object>