In [12]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.metrics import accuracy_score
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import StandardScaler
from matplotlib import pyplot as plt
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split

In [3]:
# --- Data loading and preprocessing -----------------------------------------
# Reads the loan CSV, keeps a fixed set of columns, binarises `purpose` and
# `loan_status`, standardises the numeric features and one-hot encodes the
# categorical ones. Produces `X_encoded` (features) and `y` (binary target).

# Path to the input CSV; edit this for the machine the notebook runs on.
DATA_PATH = 'C:/Users/cptas/accepted_2007_to_2018Q4.csv'

# Feature columns, in the order they appear in the model input.
FEATURE_COLS = ['loan_amnt', 'term', 'int_rate', 'home_ownership',
                'annual_inc', 'verification_status', 'purpose', 'delinq_2yrs',
                'inq_last_6mths', 'open_acc', 'pub_rec',
                'revol_bal', 'total_acc', 'initial_list_status',
                'application_type']
TARGET_COL = 'loan_status'

# Columns that get standardised (zero mean, unit variance).
NUMERIC_COLS = ['loan_amnt', 'int_rate', 'annual_inc', 'delinq_2yrs',
                'inq_last_6mths', 'open_acc', 'pub_rec', 'revol_bal',
                'total_acc']

# Columns that get one-hot encoded.
CATEGORICAL_COLS = ['term', 'home_ownership', 'verification_status',
                    'initial_list_status', 'application_type']

# `purpose` values mapped to 1; every other purpose becomes 0.
PURPOSE_POSITIVE = ['debt_consolidation', 'credit_card', 'house',
                    'home_improvement']

# `loan_status` values mapped to 1; every other status becomes 0.
STATUS_POSITIVE = ['Current', 'Fully Paid', 'Issued',
                   'Does not meet the credit policy. Status:Fully Paid']

df = pd.read_csv(DATA_PATH)
n_df = df[FEATURE_COLS + [TARGET_COL]]
n_df = n_df.dropna()

# Rows with missing values were dropped above, so `isin` is equivalent to the
# chained equality tests it replaces.
n_df['purpose'] = np.where(n_df['purpose'].isin(PURPOSE_POSITIVE), 1, 0)
n_df['loan_status'] = np.where(n_df['loan_status'].isin(STATUS_POSITIVE), 1, 0)

# Scale data. StandardScaler standardises each column independently, so one
# call over all numeric columns gives the same values as one call per column.
# NOTE(review): the scaler is fit on the full data set, i.e. before the
# train/test split performed later in the notebook, so test-set statistics
# leak into the features. Consider fitting on the training rows only.
scaler = StandardScaler()
n_df[NUMERIC_COLS] = scaler.fit_transform(n_df[NUMERIC_COLS])

# Select variables
X = n_df[FEATURE_COLS]

# Encode categorical variables. `dtype=float` is required: recent pandas
# versions emit bool dummy columns by default, which turns `X_encoded.values`
# into an object array that cannot be converted to a torch tensor.
X_encoded = pd.get_dummies(X, columns=CATEGORICAL_COLS, dtype=float)

y = n_df[TARGET_COL]

  df = pd.read_csv('C:/Users/cptas/accepted_2007_to_2018Q4.csv')


In [15]:
class Discriminator(nn.Module):
    """Two-layer network that emits a single sigmoid-activated value per row.

    Args:
        input_size: number of features in each input row.
        hidden_size: width of the hidden layer.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Apply linear -> ReLU -> linear -> sigmoid and return the result."""
        hidden = nn.functional.relu(self.fc1(x))
        logit = self.fc2(hidden)
        return nn.functional.sigmoid(logit)

class Classifier(nn.Module):
    """Two-layer network whose outputs are squashed through a sigmoid.

    Args:
        input_size: number of features in each input row.
        hidden_size: width of the hidden layer.
        output_size: number of values produced per row.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Apply linear -> ReLU -> linear -> sigmoid and return the result."""
        hidden = nn.functional.relu(self.fc1(x))
        return self.sigmoid(self.fc2(hidden))
    
def evaluate(model, dataloader):
    """Return the accuracy of `model` over `dataloader`, as a percentage.

    The model is switched to eval mode (and left there) and gradients are
    disabled while iterating.

    Args:
        model: module called as `model(inputs)`.
        dataloader: iterable yielding `(inputs, labels, extra)` triples; the
            third element is ignored.

    Returns:
        Accuracy in the range 0-100. Returns 0.0 when the dataloader yields
        no samples instead of dividing by zero.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels, _ in dataloader:
            outputs = model(inputs)
            if outputs.dim() == 1 or outputs.size(1) == 1:
                # Single-output model: the value is treated as a probability
                # and thresholded at 0.5. Taking an argmax over a single
                # column would always predict class 0.
                predicted = (outputs.reshape(-1) >= 0.5).to(labels.dtype)
                targets = labels.reshape(-1)
            else:
                # Multi-output model: predicted class is the arg-max column.
                _, predicted = torch.max(outputs, 1)
                targets = labels
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total
    
# --- Adversarial training setup ----------------------------------------------
# Both networks end in a sigmoid, so binary cross-entropy on probabilities is
# used for the label loss and for the adversary's loss.
criterion_cls = nn.BCELoss()
criterion_adv = nn.BCELoss()

# Seed torch so weight initialisation and batch shuffling are repeatable.
SEED = 42
torch.manual_seed(SEED)

X_train, X_test, y_train, y_test = train_test_split(X_encoded, y, test_size=0.2, random_state=SEED)

# Define sensitive attribute
sensitive_attr = 'purpose'


def _as_float_tensor(data):
    """Convert a pandas object to a float32 tensor.

    The explicit cast keeps the conversion working when the frame mixes
    float, int and bool columns (a plain `.values` would be an object array).
    """
    return torch.tensor(data.to_numpy(dtype=np.float32))


# Create train and test dataloaders. Each item is (features, label, sensitive).
train_dataset = TensorDataset(_as_float_tensor(X_train), _as_float_tensor(y_train), _as_float_tensor(X_train[sensitive_attr]))
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

test_dataset = TensorDataset(_as_float_tensor(X_test), _as_float_tensor(y_test), _as_float_tensor(X_test[sensitive_attr]))
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Define model, optimizer, and loss function.
# The discriminator takes the classifier's prediction (one value per row) as
# its input. Feeding it the raw features instead would disconnect it from the
# classifier, so the adversarial term would contribute no gradient at all.
classifier = Classifier(input_size=X_train.shape[1], hidden_size=64, output_size=1)
discriminator = Discriminator(input_size=1, hidden_size=64)
optimizer_cls = optim.Adam(classifier.parameters(), lr=0.001)
optimizer_adv = optim.Adam(discriminator.parameters(), lr=0.001)

# Set hyperparameters
num_epochs = 10
lambda_adv = 0.1   # weight of the adversarial term in the classifier loss
lambda_fair = 1.0  # weight of the group-rate gap penalty

for epoch in range(num_epochs):
    epoch_cls, epoch_adv, epoch_fair, n_batches = 0.0, 0.0, 0.0, 0
    for inputs, labels, sensitive in train_dataloader:
        targets = labels.unsqueeze(1)
        sensitive_col = sensitive.unsqueeze(1)

        # Train discriminator: predict the sensitive attribute from the
        # classifier's current output. no_grad keeps this step from touching
        # the classifier's parameters.
        optimizer_adv.zero_grad()
        with torch.no_grad():
            frozen_preds = classifier(inputs)
        pred_sensitive = discriminator(frozen_preds)
        loss_adv = criterion_adv(pred_sensitive, sensitive_col)
        loss_adv.backward()
        optimizer_adv.step()

        # Train classifier: fit the labels while making the discriminator's
        # job harder (its loss is subtracted) and shrinking the gap between
        # per-group mean predictions.
        optimizer_cls.zero_grad()
        pred_labels = classifier(inputs)
        loss_cls = criterion_cls(pred_labels, targets)
        pred_sensitive = discriminator(pred_labels)
        loss_adv = criterion_adv(pred_sensitive, sensitive_col)
        # Iterate over the group values actually present in the batch; a
        # positional range would select an empty slice (NaN mean) whenever a
        # batch contains a single group whose value is not 0.
        pos_rates = [torch.mean(pred_labels[sensitive == group])
                     for group in torch.unique(sensitive)]
        fair_loss = torch.sum(torch.abs(torch.stack(pos_rates) - torch.mean(pred_labels)))
        loss_total = loss_cls - (lambda_adv * loss_adv) + (lambda_fair * fair_loss)
        # This backward pass also deposits gradients in the discriminator;
        # they are cleared by optimizer_adv.zero_grad() on the next batch.
        loss_total.backward()
        optimizer_cls.step()

        epoch_cls += loss_cls.item()
        epoch_adv += loss_adv.item()
        epoch_fair += fair_loss.item()
        n_batches += 1

    denom = max(n_batches, 1)
    print(f"Epoch {epoch + 1}/{num_epochs} - "
          f"cls: {epoch_cls / denom:.4f}, adv: {epoch_adv / denom:.4f}, "
          f"fair: {epoch_fair / denom:.4f}")

# Evaluation: threshold the classifier's probabilities at 0.5 on the test
# split and report accuracy, precision, recall and F1.
classifier.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for inputs, labels, sensitive in test_dataloader:
        outputs = classifier(inputs)
        # reshape(-1) always yields a 1-D tensor. A bare squeeze() would turn
        # a final batch of size 1 into a 0-d tensor, whose tolist() is a plain
        # int that list.extend() cannot consume.
        predicted = (outputs >= 0.5).reshape(-1).long()
        y_true.extend(labels.tolist())
        y_pred.extend(predicted.tolist())

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-score:", f1)

# ROC-AUC on the test split, computed from the classifier's raw probabilities
# rather than thresholded predictions. `roc_auc_score` is already imported at
# the top of the notebook.
classifier.eval()
y_scores, y_true = [], []
with torch.no_grad():
    for inputs, labels, sensitive in test_dataloader:
        outputs = classifier(inputs)
        y_scores += outputs.tolist()
        y_true += labels.tolist()

roc_auc = roc_auc_score(y_true, y_scores)

print("ROC-AUC score:", roc_auc)





KeyboardInterrupt: 