In [None]:
# importing all necessary libraries...
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve

In [None]:
# Load the raw transactions and cast every column to float32.
Data = pd.read_csv("./archive/creditcard.csv").astype('float32')

# Preprocess: drop the 'Time' column and standardise 'Amount'.
# NOTE(review): the scaler is fitted on all rows, including those that later land in the
# test set — confirm this is acceptable for the intended evaluation.
Data = Data.drop(['Time'], axis=1)
Data['Amount'] = StandardScaler().fit_transform(Data['Amount'].values.reshape(-1,1))

fraud = Data[Data['Class'] == 1]
nonFraud = Data[Data['Class'] == 0]

# Separate the non-fraud rows: 90% for training, the remainder held out for testing.
# A fixed random_state makes the split, and therefore the files written below,
# identical on every Restart & Run All.
trainData, testData = train_test_split(nonFraud, train_size=0.90, random_state=42)

# Add every fraud row to the held-out set; the training set contains non-fraud rows only.
testData = pd.concat([fraud, testData])

# The training set does not keep the label column.
trainData = trainData.drop(['Class'], axis=1)

# Save both testData and trainData for future use
trainData.to_csv('./Dataset/trainingData', index=False)
testData.to_csv('./Dataset/testingData', index=False)

In [None]:
# Read the saved test split back in as float32 and shuffle all of its rows with a fixed seed.
testData = (
    pd.read_csv('./Dataset/testingData')
    .astype('float32')
    .sample(frac=1, random_state=42)
)

In [None]:
# Preview the first five shuffled rows.
testData.head(n=5)

In [None]:
# Number of rows labelled as fraud (Class == 1) in the test split.
int((testData['Class'] == 1).sum())

In [None]:
# Build the evaluation sample: every fraud row plus 650 randomly drawn non-fraud rows.
# random_state pins the draw so the plots and metrics below use the same rows on every run.
fraudSample = testData[testData['Class'] == 1]
nfraudSample = testData[testData['Class'] == 0].sample(650, random_state=42)

sampleData = pd.concat([fraudSample, nfraudSample])

# Labels and features as plain NumPy arrays.
dataY = sampleData["Class"].values
dataX = sampleData.drop(["Class"], axis=1).values

In [None]:
# helper function for TSNE visualization...
def tsne_plot(x1, y1, random_state=None):
    """Embed ``x1`` in two dimensions with t-SNE and scatter-plot it by label.

    Args:
        x1: 2-D array of features, one row per sample.
        y1: 1-D array of labels aligned with the rows of ``x1``. Rows equal to 0
            are drawn in green as 'Non Fraud', rows equal to 1 in red as 'Fraud'.
        random_state: Optional seed forwarded to ``TSNE``. The default ``None``
            keeps the embedding unseeded, as it was before this parameter existed.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    embedding = TSNE(random_state=random_state).fit_transform(x1)
    labels = np.asarray(y1)

    # One scatter call per class, selecting rows with a boolean mask.
    for value, colour, name in ((0, 'g', 'Non Fraud'), (1, 'r', 'Fraud')):
        mask = labels == value
        plt.scatter(embedding[mask, 0], embedding[mask, 1], marker='o', color=colour, linewidth=1, alpha=0.8, label=name)
    plt.legend(loc='lower center')
    plt.show()


# Seed the embedding so the figure is reproducible across runs.
tsne_plot(dataX, dataY, random_state=42)

In [None]:
# (rows, features) of the evaluation sample.
np.shape(dataX)

In [None]:
class VariationalAutoencoder(tf.keras.Model):
    """Keras model that chains a probabilistic encoder with a dense decoder.

    The encoder is a stack of ``Dense`` layers ending in a
    ``tfp.layers.MultivariateNormalTriL`` layer that carries a
    ``KLDivergenceRegularizer`` against ``self.prior``. The decoder is a stack of
    ``Dense`` layers mapping ``latent_dim`` values back to ``inputShape[0]`` values.
    """
    def __init__(self, inputShape, latent_dim):
        """Store the dimensions, create the prior and build both sub-networks.

        Args:
            inputShape: Shape of one input sample; passed to the encoder's
                ``InputLayer``, and its first entry sizes the decoder output.
            latent_dim: Size of the latent vector.
        """
        super(VariationalAutoencoder, self).__init__()
        self.inputShape = inputShape
        self.latent_dim = latent_dim
        # Prior with a zero location vector; no scale argument is given, so the
        # library default applies.
        self.prior = tfp.distributions.MultivariateNormalDiag(loc=tf.zeros(latent_dim))
        # The prior must exist before build_encoder() runs, because the encoder's
        # KL regularizer is constructed from it.
        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()

    def build_encoder(self):
        """Build and return the encoder ``Sequential`` (20 -> 10 -> 8 -> 4 units, then the distribution head)."""
        encoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=self.inputShape, name='encoder_input'),
            tf.keras.layers.Dense(20, activation=tf.nn.leaky_relu),
            tf.keras.layers.Dense(10, activation=tf.nn.leaky_relu),
            tf.keras.layers.Dense(8, activation=tf.nn.leaky_relu),
            tf.keras.layers.Dense(4, activation=tf.nn.leaky_relu),
            # Linear layer sized to the number of parameters MultivariateNormalTriL asks for.
            tf.keras.layers.Dense(tfp.layers.MultivariateNormalTriL.params_size(self.latent_dim), activation=None),
            # Distribution layer; its activity regularizer is a KL term against self.prior.
            tfp.layers.MultivariateNormalTriL(self.latent_dim, 
                                               activity_regularizer=tfp.layers.KLDivergenceRegularizer(self.prior)),
        ], name='encoder')
        return encoder

    def build_decoder(self):
        """Build and return the decoder ``Sequential`` (4 -> 8 -> 10 -> 20 units, then a linear output)."""
        decoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=[self.latent_dim]),
            tf.keras.layers.Dense(4, activation=tf.nn.leaky_relu),
            tf.keras.layers.Dense(8, activation=tf.nn.leaky_relu),
            tf.keras.layers.Dense(10, activation=tf.nn.leaky_relu),
            tf.keras.layers.Dense(20, activation=tf.nn.leaky_relu),
            # No activation: the decoder emits one unbounded value per input feature.
            tf.keras.layers.Dense(self.inputShape[0], activation=None), 
        ], name='decoder')
        return decoder
    
    def encode(self, x):
        """Run the encoder on ``x`` and split its output into two equal parts along axis 1.

        Returns:
            Tuple ``(mean, logvar)``: the first and second half of the split.

        NOTE(review): the encoder's last layer is a distribution layer, so what is
        split here is presumably a tensor of width ``latent_dim`` obtained from that
        distribution rather than concatenated mean / log-variance parameters — confirm
        this is intended.
        """
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        """Return ``eps * exp(0.5 * logvar) + mean`` with ``eps`` drawn from a standard normal.

        ``eps`` has shape ``(batch, latent_dim)``, where ``batch`` is the first
        dimension of ``mean``; ``mean`` and ``logvar`` must broadcast against it.
        """
        batch_size = tf.shape(mean)[0]
        eps = tf.random.normal(shape=(batch_size, self.latent_dim))
        return eps * tf.exp(logvar * .5) + mean

    def decode(self, z, apply_sigmoid=False):
        """Run the decoder on ``z``.

        Args:
            z: Latent batch fed to the decoder.
            apply_sigmoid: When true, return ``tf.sigmoid`` of the decoder output
                instead of the raw output.
        """
        logits = self.decoder(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits

    def call(self, x):
        """Forward pass: encode ``x``, draw a latent sample, and return its decoding (raw decoder output)."""
        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        decoded = self.decode(z)
        return decoded

def loss_function(y_true, y_pred):
    """Return the batch-mean of squared reconstruction error plus a KL term.

    Uses the module-level ``model`` to encode ``y_true``.

    Args:
        y_true: Batch of original inputs; assumed to be (batch, features) — the
            reductions below run over the last axis.
        y_pred: Batch of reconstructions with the same shape as ``y_true``.

    Returns:
        Scalar tensor: mean over the batch of the per-sample sum of squared
        errors plus the per-sample KL term.
    """
    # encode() returns (mean, logvar) in that order. Unpacking them the other way
    # round would feed the mean into exp() and the log-variance into square() below.
    z_mean, z_logvar = model.encode(y_true)
    kl_divergence_loss = -0.5 * tf.reduce_sum(1 + z_logvar - tf.square(z_mean) - tf.exp(z_logvar), axis=-1)
    reconstruction_loss = tf.reduce_sum(tf.square(y_true - y_pred), axis=-1)
    loss = tf.reduce_mean(reconstruction_loss + kl_divergence_loss)
    return loss

# Model dimensions: one input per feature column of dataX, two latent dimensions.
latent_dim = 2
inputShape = dataX.shape[1:]

# Instantiate the VAE.
model = VariationalAutoencoder(inputShape, latent_dim)

# Compile with Adam (learning rate 1e-4) and the custom VAE loss.
adam = tf.keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=adam, loss=loss_function)

In [None]:
# Push a single all-zero sample through the model so its layers are built.
dummy_batch = tf.zeros((1, *inputShape))
model(dummy_batch)

In [None]:
# Restore the saved parameters from the weights file in the working directory.
weights_path = 'weight.keras'
model.load_weights(weights_path)

In [None]:
# An autoencoder's target is its own input, so dataX is passed as y as well.
# With no targets there is no y_true for the compiled loss_function to compare
# the reconstruction against.
loss = model.evaluate(dataX, dataX)

In [None]:
# Reconstruct the evaluation sample. The model instance is called rather than its
# .call() method, so the forward pass goes through Keras' regular __call__ wrapper.
rData = model(dataX)

In [None]:
# Per-sample score: negative sigmoid cross-entropy between the inputs and the
# reconstruction logits, summed over the feature axis.
per_feature_ce = tf.nn.sigmoid_cross_entropy_with_logits(labels=dataX, logits=rData)
log_prob = -tf.reduce_sum(per_feature_ce, axis=-1)

# Histogram of the scores of the non-fraud rows (label 0).
non_fraud_scores = log_prob[dataY==0]
plt.hist(non_fraud_scores, bins=80)
plt.title('Log Probability Distribution')
plt.xlabel('Log Probability')
plt.ylabel('Frequency')

plt.show()

In [None]:
# Recompute the per-sample score: negative sigmoid cross-entropy between the inputs
# and the reconstruction logits, summed over the feature axis.
feature_ce = tf.nn.sigmoid_cross_entropy_with_logits(labels=dataX, logits=rData)
log_prob = -tf.reduce_sum(feature_ce, axis=-1)

# Histogram of the scores of the fraud rows (label 1).
fraud_scores = log_prob[dataY==1]
plt.hist(fraud_scores, bins=80)
plt.title('Log Probability Distribution')
plt.xlabel('Log Probability')
plt.ylabel('Frequency')

plt.show()

In [None]:
# Choosing a threshold: look at the first six scores to get a feel for their scale.
log_prob[:6]

In [None]:
# Precision and recall at every candidate cut-off of the score.
pr_curve = precision_recall_curve(dataY, log_prob)
precision, recall, thresholds = pr_curve

In [None]:
# How many candidate thresholds the curve produced.
thresholds.size

In [None]:
import statistics

# Arithmetic mean of the candidate thresholds, shown as the cell output.
mean_threshold = statistics.mean(thresholds)
mean_threshold

In [None]:
# `auc` is not among the names imported at the top of the notebook, so it is
# imported here; without it this cell raises NameError on a fresh kernel.
from sklearn.metrics import auc

# Area under the precision-recall curve (recall on the x axis, precision on the y axis).
pr_auc = auc(recall, precision)
pr_auc

In [None]:
# Precision-recall curve, with the area under it shown in the legend.
pr_fig, pr_ax = plt.subplots()
pr_ax.plot(recall, precision, color='blue', label=f'PR AUC = {pr_auc:.2f}')
pr_ax.set_title('Precision-Recall Curve')
pr_ax.set_xlabel('Recall')
pr_ax.set_ylabel('Precision')
pr_ax.legend(loc='lower left')
plt.show()

In [None]:
# Evaluating our model based on a single threshold value

from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

threshold = 327.00

# Rows whose score exceeds the threshold are predicted as fraud.
predictions = (log_prob > threshold)

# Calculate confusion matrix
conf_matrix = confusion_matrix(dataY, predictions)

# Extract TN, FP, FN, TP (row-major order of the 2x2 matrix)
TN, FP, FN, TP = conf_matrix.ravel()

# Calculate evaluation metrics.
# The scalar precision/recall get their own names: assigning them to `precision` and
# `recall` would overwrite the arrays returned by precision_recall_curve, so re-running
# the PR-AUC or PR-curve cells afterwards would operate on scalars instead of the curve.
accuracy = accuracy_score(dataY, predictions)
precision_at_threshold = precision_score(dataY, predictions)
recall_at_threshold = recall_score(dataY, predictions)
f1 = f1_score(dataY, predictions)

print("True Positives (TP) ->", TP)
print("True Negatives (TN) ->", TN)
print("False Positives (FP) ->", FP)
print("False Negatives (FN) ->", FN)
print("Accuracy ->", accuracy)
print("Precision ->", precision_at_threshold)
print("Recall ->", recall_at_threshold)
print("F1 Score ->", f1)

In [None]:
# Draw the confusion matrix as an annotated heatmap.
import seaborn as sns

# Cell captions laid out like the matrix itself: true class by row, predicted class by column.
labels = np.asarray([
    [f"True Negative \n\n {TN}", f"False Positive \n\n {FP}"],
    [f"False Negative \n\n {FN}", f"True Positive \n\n {TP}"],
])

cm_fig, cm_ax = plt.subplots(figsize=(6, 6))
sns.heatmap(conf_matrix, annot=labels, fmt="", cmap="Blues", xticklabels=False, yticklabels=False, ax=cm_ax)

cm_ax.set_title('Confusion Matrix')
cm_ax.set_xlabel('Predicted values')
cm_ax.set_ylabel('True values')
plt.show()