In [None]:
import copy
import tensorflow as tf
import numpy as np
import pandas as pd
import torch
import keras
import torch.nn as nn
import torch.optim as optim
import tqdm
from sklearn.model_selection import train_test_split

In [None]:
# ********** Load and clean dataset ************
# Download the dataset here: https://www.kaggle.com/competitions/home-credit-default-risk/data?select=application_train.csv
raw_data = pd.read_csv("application_train.csv")
print(raw_data.shape)

# Columns kept for the model: the TARGET label plus eight predictors
SELECTED_COLUMNS = ["TARGET","CODE_GENDER","FLAG_OWN_CAR","FLAG_OWN_REALTY","AMT_INCOME_TOTAL","AMT_CREDIT","NAME_EDUCATION_TYPE","REGION_RATING_CLIENT","CNT_FAM_MEMBERS"]
# Columns that are integer-encoded below
CATEGORICAL_COLUMNS = ["CODE_GENDER","FLAG_OWN_CAR","FLAG_OWN_REALTY","NAME_EDUCATION_TYPE"]

reduced_data = raw_data[SELECTED_COLUMNS]
# dropna() returns a new frame, so its result has to be assigned to take effect.
# The explicit copy makes `data` independent of `raw_data`, so the column
# assignments below are not chained assignments on a slice.
data = reduced_data.dropna().copy()
print(data.shape)
# Snapshot of the cleaned (not yet encoded) data
data.to_csv("credit_risk.csv", index=False)

for cat in CATEGORICAL_COLUMNS:
    # Encode the categorical data using sequential numbers (0, 1, 2, ...) in
    # order of first appearance; assigning the column back avoids in-place
    # replace on a derived frame.
    codes, feature = pd.factorize(data[cat])
    print(list(feature))
    data[cat] = codes

In [None]:
# ********** Split dataset and prepare for training ************
target_df = data[['TARGET']]
features_df = data.drop('TARGET', axis = 1)

# convert the pandas DataFrames (features and target) into PyTorch tensors;
# y keeps a trailing dimension of size 1 because it comes from a one-column frame
X = torch.tensor(features_df.values, dtype=torch.float32)
y = torch.tensor(target_df.values, dtype=torch.long)

# split: a fixed seed makes the partition reproducible across kernel restarts,
# and stratifying on the label keeps the class ratio equal in both partitions
SPLIT_SEED = 42
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    train_size=0.7,
    shuffle=True,
    random_state=SPLIT_SEED,
    stratify=y.flatten().numpy(),
)

In [None]:
# Dataset dimensions: first axis is samples, second axis is features
feature_shape = tuple(X.shape)
n_samples, n_features = feature_shape
print(n_samples, n_features)
# Number of distinct label values found in y
distinct_labels = torch.unique(y)
n_classes = distinct_labels.shape[0]
print(n_classes)

In [None]:
# ********** Define and instantiate the neural network model ************
class NeuralNetwork(nn.Module):
    """Feed-forward network with one hidden layer that returns raw logits.

    Args:
        input_size: number of input features per sample.
        hidden_size: number of units in the hidden layer.
        output_size: number of values produced per sample.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Attribute names are also the state_dict key prefixes
        # ("hidden.*", "output.*"), so they are kept as-is.
        self.hidden = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.act = nn.ReLU()
        self.output = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Run x through the hidden layer, the ReLU, and the output layer."""
        hidden_activation = self.act(self.hidden(x))
        return self.output(hidden_activation)

# compute weights to account for unbalanced labels
# CrossEntropyLoss scales each sample's loss by the weight of its target class,
# so the weight must be INVERSELY proportional to the class frequency: the rarer
# class gets the larger weight. (Using the raw frequency would do the opposite
# and down-weight the minority class even further.)
tot_y = int((data["TARGET"] == 1).sum())
tot_n = int((data["TARGET"] == 0).sum())
if tot_y == 0 or tot_n == 0:
    raise ValueError("Both TARGET classes (0 and 1) must be present to compute class weights")
# "balanced" heuristic: n_samples / (n_classes * class_count)
w_y = data.shape[0] / (2 * tot_y)
w_n = data.shape[0] / (2 * tot_n)

# loss metric and optimizer
input_size, hidden_size, output_size = (n_features, 10, n_classes)
model = NeuralNetwork(input_size, hidden_size, output_size)
# weight[i] applies to class index i, hence the (class 0, class 1) order
loss_fn = nn.CrossEntropyLoss(weight=torch.tensor([w_n, w_y], dtype=torch.float32))
params = list(model.parameters())
optimizer = optim.Adam(params, lr=0.001)

In [None]:
# ********** Actual Training ************
n_epochs = 4
batch_size = 100
# Ceil division: the final, shorter batch is trained on as well, and a training
# set smaller than batch_size still yields one batch instead of zero.
batches_per_epoch = (len(X_train) + batch_size - 1) // batch_size

best_acc = - np.inf   # init to negative infinity
best_weights = None
train_loss_hist = []
train_acc_hist = []
test_loss_hist = []
test_acc_hist = []

# training loop
for epoch in range(n_epochs):
    epoch_loss = []
    epoch_acc = []
    # set model in training mode and run through each batch
    model.train()
    with tqdm.trange(batches_per_epoch, unit="batch", mininterval=0) as bar:
        bar.set_description(f"Epoch {epoch}")
        for i in bar:
            # take a batch (slicing past the end just returns the shorter tail)
            start = i * batch_size
            X_batch = X_train[start:start+batch_size]
            y_batch = y_train[start:start+batch_size]
            # labels are stored as a column; the loss expects a 1-D class-index tensor
            targets = y_batch.flatten()
            # forward pass
            y_pred = model(X_batch)
            loss = loss_fn(y_pred, targets)
            # backward pass
            optimizer.zero_grad()
            loss.backward()
            # update weights
            optimizer.step()
            # compute and store metrics
            acc = (torch.argmax(y_pred, 1) == targets).float().mean()
            epoch_loss.append(float(loss))
            epoch_acc.append(float(acc))
            bar.set_postfix(
                loss=float(loss),
                acc=float(acc)
            )
    # set model in evaluation mode and run through the test set
    model.eval()
    # no_grad: evaluation needs no gradients, so skip building the autograd
    # graph for the whole test set
    with torch.no_grad():
        y_pred = model(X_test)
        ce = loss_fn(y_pred, y_test.flatten())
        acc = (torch.argmax(y_pred, 1) == y_test.flatten()).float().mean()
    ce = float(ce)
    acc = float(acc)
    train_loss_hist.append(np.mean(epoch_loss))
    train_acc_hist.append(np.mean(epoch_acc))
    test_loss_hist.append(ce)
    test_acc_hist.append(acc)
    # keep a copy of the weights from the epoch with the best accuracy so far
    # NOTE(review): the checkpoint is selected on the test split, so the reported
    # accuracy is optimistic — consider a separate validation split.
    if acc > best_acc:
        best_acc = acc
        best_weights = copy.deepcopy(model.state_dict())
    print(f"Epoch {epoch} validation: Cross-entropy={ce:.2f}, Accuracy={acc*100:.1f}%")

In [None]:
# Put the best weights recorded during training back into the model,
# then write the resulting state dict to disk
model.load_state_dict(best_weights)
torch.save(
    obj=model.state_dict(),
    f="pytorch_weights.pth",
)

In [None]:
# Convert Torch Model to Keras
# The layer sizes mirror the PyTorch model (input_size -> hidden_size -> output_size)
# instead of being hard-coded, so both architectures always match.
keras_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(input_size,)),
    tf.keras.layers.Dense(hidden_size, activation='relu'),
    tf.keras.layers.Dense(output_size)
])

# Load the PyTorch model weights
pytorch_weights = torch.load('pytorch_weights.pth')

# Build one [kernel, bias] pair per layer, looked up by state_dict key rather
# than by iteration order. nn.Linear stores its weight as (out_features, in_features)
# while Dense expects (in_features, out_features), hence the transpose.
weights = [
    [
        pytorch_weights[f"{layer_name}.weight"].detach().cpu().numpy().T,
        pytorch_weights[f"{layer_name}.bias"].detach().cpu().numpy(),
    ]
    for layer_name in ("hidden", "output")
]

# Iterate over the Dense layers in the Keras model and the weights in the PyTorch model
dense_layers = [
    layer for layer in keras_model.layers
    if isinstance(layer, tf.keras.layers.Dense)
]
for keras_layer, layer_weights in zip(dense_layers, weights):
    # Set the weights of the corresponding layers in the Keras model
    keras_layer.set_weights(layer_weights)

# Compile with Keras-native objects only: a torch optimizer or torch loss module
# is not a valid Keras optimizer/loss. The model outputs one raw logit per class
# and the labels are integer class ids, so the matching loss is sparse
# categorical cross-entropy on logits.
keras_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

# Show models are the same
t_model = NeuralNetwork(input_size, hidden_size, output_size)
t_model.load_state_dict(pytorch_weights)
t_model.eval()

# Sample under separate names so the global X / y tensors created by the
# split cell are not overwritten.
randomData = data.sample(100)
y_sample = randomData[['TARGET']]
X_sample = randomData.drop('TARGET', axis=1)
X_tensor = torch.tensor(X_sample.values, dtype=torch.float32)
y_tensor = torch.tensor(y_sample.values, dtype=torch.long)

# Inference only: no autograd graph needed
with torch.no_grad():
    t_pred = t_model(X_tensor)
t_acc = (torch.argmax(t_pred, 1) == y_tensor.flatten()).float().mean()
print(f"Accuracy of torch model is: {t_acc}")

# Feed Keras the same float32 values that the torch model received
k_pred = keras_model.predict(X_tensor.numpy())
k_acc = (np.mean(np.argmax(k_pred, 1) == y_sample.values.flatten()))
print(f"Accuracy of keras model is: {k_acc}")

# Equal accuracies do not prove equal models, so compare the raw outputs too
same_classes = bool(np.array_equal(np.argmax(k_pred, 1), torch.argmax(t_pred, 1).numpy()))
max_logit_diff = float(np.max(np.abs(k_pred - t_pred.numpy())))
print(f"Predicted classes identical: {same_classes}, max abs logit difference: {max_logit_diff}")

# Save Keras model (already compiled above with a Keras loss)
keras_model.save("model.keras")