In [None]:
import pandas as pd
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# Seed every random number generator this notebook relies on so that a
# Restart & Run All is reproducible. TensorFlow drives weight initialisation
# and the stochastic forward passes; NumPy's global generator is used later
# when the SHAP background rows are sampled with np.random.choice.
SEED = 39
tf.random.set_seed(SEED)
np.random.seed(SEED)

# Load the feature table. 'Flow ID' and 'NNP' are dropped; every remaining
# column except 'Label' is used as a model input, and 'Label' is the target.
data = pd.read_csv('datasets/features_train.csv')
data = data.drop(columns=['Flow ID', 'NNP'])
X = data.drop(columns=['Label'])
y = data['Label']

# Split BEFORE scaling. The scaler's mean and variance must be estimated from
# the training rows only; fitting it on the full table lets test-set
# statistics leak into preprocessing and biases the reported test metrics.
# The row partition is unchanged: it depends only on the number of rows and
# on random_state.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=37
)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)

# Kept so that any cell still reading `X_scaled` keeps working; it is now
# produced by the train-fitted scaler rather than one fit on all rows.
X_scaled = scaler.transform(X)

def mc_dropout_predict(f_model: tf.keras.Model, X_input: np.ndarray, abs: bool=False):
    """Run a single forward pass of ``f_model`` in training mode.

    The model is invoked with ``training=True``, so each call is an
    independent pass and repeated calls on the same input may differ.

    Args:
        f_model: Model to evaluate. Its ``trainable`` attribute is set to
            ``False`` as a side effect and is not restored afterwards.
        X_input: Batch of inputs passed straight to the model.
        abs: When true, return the flattened raw model outputs. When false
            (the default), return hard labels obtained by thresholding the
            outputs at 0.5. (The name shadows the ``abs`` builtin inside
            this function; it is kept because callers pass it positionally
            or by keyword.)

    Returns:
        A 1-D array: the raw outputs if ``abs`` is true, otherwise ``int8``
        labels where an output ``>= 0.5`` maps to 1 and anything else to 0.
    """
    f_model.trainable = False

    # One forward pass, flattened to a vector of per-sample scores.
    scores = f_model(X_input, training=True).numpy().flatten()

    if abs:
        return scores

    return (scores >= 0.5).astype(np.int8)

def build_bnn(input_dim) -> tf.keras.Model:
    """Assemble the (uncompiled) binary classifier used in this notebook.

    Layer stack, in order:
        Dense(16) -> Dense(32)
        -> Dense(64) -> Dropout(0.3)
        -> Dense(64) -> Dropout(0.4)
        -> Dense(32) -> Dropout(0.5)
        -> Dense(16) -> DenseFlipout(8) -> Dense(1, sigmoid)

    All hidden layers use ReLU. The single ``tfp.layers.DenseFlipout`` layer
    is the only TensorFlow Probability layer in the stack; everything else is
    a plain Keras layer.

    An earlier variant of this function (previously kept here as commented-out
    code) stacked three DenseFlipout layers, each followed by
    BatchNormalization and Dropout, ahead of a small Dense head.

    NOTE(review): no explicit weighting is passed for DenseFlipout's
    divergence term, so the layer's default is in effect - confirm this is
    the intended regularisation strength for the dataset size.

    Args:
        input_dim: Number of input features; used as the first layer's
            ``input_shape``.

    Returns:
        The assembled ``tf.keras.Sequential`` model. The caller is
        responsible for compiling it.
    """
    layers = tf.keras.layers
    network = tf.keras.Sequential()

    # Widening front end; the first layer fixes the input width.
    network.add(layers.Dense(16, activation="relu", input_shape=(input_dim,)))
    network.add(layers.Dense(32, activation="relu"))

    # Three Dense + Dropout pairs with increasing dropout rates.
    for width, drop_rate in ((64, 0.3), (64, 0.4), (32, 0.5)):
        network.add(layers.Dense(width, activation="relu"))
        network.add(layers.Dropout(drop_rate))

    # Narrowing head ending in a single sigmoid unit.
    network.add(layers.Dense(16, activation="relu"))
    network.add(tfp.layers.DenseFlipout(8, activation="relu"))
    network.add(layers.Dense(1, activation="sigmoid"))

    return network

# Build and compile the classifier for the number of feature columns in the
# training matrix.
input_dim = X_train.shape[1]
model = build_bnn(input_dim)

model.compile(
    optimizer='adam',
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=['accuracy']
)

model.fit(X_train, y_train, epochs=50, batch_size=32)

# mc_dropout_predict runs the model with training=True, so every call is a
# separate stochastic forward pass. Draw the test-set probabilities ONCE and
# derive the hard labels from that same draw; otherwise the threshold metrics
# and the AUC would describe two different sets of predictions.
probabilities = mc_dropout_predict(model, X_test, True)
predictions = (probabilities >= 0.5).astype(np.int8)  # same rule mc_dropout_predict applies

accuracy = accuracy_score(y_test, predictions)
precision = precision_score(y_test, predictions)
recall = recall_score(y_test, predictions)
f1 = f1_score(y_test, predictions)
aucroc = roc_auc_score(y_test, probabilities)

In [None]:
# Report the test-set metrics computed above, one per line, to four decimals.
metric_lines = [
    f"Test Accuracy: {accuracy:.4f}",
    f"Test Precision: {precision:.4f}",
    f"Test Recall: {recall:.4f}",
    f"Test F1: {f1:.4f}",
    f"Test AUCROC: {aucroc:.4f}",
]
print("\n".join(metric_lines))

In [33]:
import os


def predict_with_uncertainty(f_model, X, n_iter):
    """Summarise ``n_iter`` stochastic forward passes over ``X``.

    Each pass calls ``mc_dropout_predict`` with its default arguments, i.e.
    it yields hard 0/1 labels rather than probabilities. Consequently the
    returned mean is the fraction of passes that predicted class 1 for each
    sample, and the returned standard deviation measures how much those
    votes disagree.

    Args:
        f_model: Model forwarded to ``mc_dropout_predict``.
        X: Inputs forwarded to ``mc_dropout_predict``.
        n_iter: Number of forward passes to run; must be at least 1.

    Returns:
        Tuple ``(mean_pred, uncertainty)`` of per-sample arrays: the mean and
        the standard deviation of the labels across the passes.

    Raises:
        ValueError: If ``n_iter`` is smaller than 1 (there would be nothing
            to average).
    """
    if n_iter < 1:
        raise ValueError("n_iter must be at least 1")

    # Shape (n_iter, n_samples): one row of labels per forward pass.
    predictions = np.array([mc_dropout_predict(f_model, X) for _ in range(n_iter)])

    return predictions.mean(axis=0), predictions.std(axis=0)


os.makedirs("logs", exist_ok=True)

# The context manager guarantees the log file is closed even if the sampling
# below raises part-way through.
with open("logs/log.log", mode="w", encoding="utf-8") as log_file:
    log_file.write(f"Test Accuracy: {accuracy:.4f}\n")
    log_file.write(f"Test Precision: {precision:.4f}\n")
    log_file.write(f"Test Recall: {recall:.4f}\n")
    log_file.write(f"Test F1: {f1:.4f}\n")
    log_file.write(f"Test AUCROC: {aucroc:.4f}\n")
    log_file.write("\n")
    # Push the headline metrics to disk before the long sampling step so they
    # survive a failure or interruption during it.
    log_file.flush()

    mean_pred, uncertainty = predict_with_uncertainty(model, X_test, 1000)

    # One record per test sample, numbered from 1.
    for sample_number, (sample_mean, sample_std) in enumerate(zip(mean_pred, uncertainty), start=1):
        log_file.write(f"Sample {sample_number}:\n")
        log_file.write(f"  Mean Prediction: {sample_mean:.4f}\n")
        log_file.write(f"  Uncertainty (Std Dev): {sample_std:.4f}\n")
        log_file.write("\n")

    # Average per-sample uncertainty across the whole test set.
    log_file.write(f"Uncertainty Mean: {uncertainty.mean():.4f}\n")

In [None]:
import shap
import numpy as np
import matplotlib.pyplot as plt

def create_background_dataset(X_train, num_background=100):
    """Pick a random subset of training rows to use as a SHAP background set.

    Rows are drawn without replacement using NumPy's global random generator,
    so seed it beforehand if a reproducible sample is needed.

    Args:
        X_train: Array of training rows, indexable by an integer index array
            along its first axis.
        num_background: Desired number of rows. If the array has fewer rows
            than this, every row is used instead of raising an error.

    Returns:
        The selected rows of ``X_train`` (in sampled order).
    """
    n_rows = X_train.shape[0]
    # Sampling without replacement cannot return more rows than exist.
    sample_size = min(num_background, n_rows)
    indices = np.random.choice(n_rows, sample_size, replace=False)

    return X_train[indices]

def perform_shap_analysis(model, X_train, X_test, feature_names, num_samples=100):
    """Explain the model with Kernel SHAP and report per-feature importance.

    Steps: sample a background set from ``X_train``, compute SHAP values for
    the first ``num_samples`` rows of ``X_test``, show the SHAP summary plot,
    show a bar chart of the mean absolute SHAP value per feature, and print
    the same importances as text.

    Args:
        model: Callable handed to ``shap.KernelExplainer`` as the prediction
            function. It is expected to produce a single output per sample.
        X_train: Training rows from which the background set is sampled.
        X_test: Test rows; only the first ``num_samples`` are explained.
        feature_names: One name per feature column, in column order.
        num_samples: How many leading rows of ``X_test`` to explain
            (default 100, the value that was previously hard-coded).

    Raises:
        ValueError: If the explainer returns values for more than one model
            output, since they cannot be collapsed to one value per feature.
    """
    background = create_background_dataset(X_train)
    explainer = shap.KernelExplainer(model, background)

    # Slice once so the SHAP values and the summary plot are guaranteed to
    # refer to the same rows.
    explained_rows = X_test[:num_samples]
    raw_shap_values = explainer.shap_values(explained_rows)

    # NOTE(review): the return layout depends on the installed shap version -
    # older releases are understood to return a list with one
    # (samples, features) array per model output, newer ones a single
    # (samples, features, outputs) array. Normalise the list form to the
    # array form so both are handled identically below.
    if isinstance(raw_shap_values, list):
        raw_shap_values = np.stack(raw_shap_values, axis=-1)
    raw_shap_values = np.asarray(raw_shap_values)

    # Collapse to (samples, features). This only succeeds when there is a
    # single model output; otherwise reshape raises ValueError.
    shap_values = raw_shap_values.reshape(
        raw_shap_values.shape[0], raw_shap_values.shape[1]
    )
    del raw_shap_values

    shap.summary_plot(shap_values, explained_rows, feature_names=feature_names)

    mean_shap_values = np.abs(shap_values).mean(axis=0)

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(feature_names, mean_shap_values)
    ax.set_title("Feature Importance based on Mean Absolute SHAP Values")
    ax.set_xlabel("Features")
    ax.set_ylabel("Mean Absolute SHAP Value")
    ax.tick_params(axis="x", rotation=90)
    plt.show()
    plt.close(fig)

    print("Feature Importance based on Mean Absolute SHAP Values:")

    for feature, importance in zip(feature_names, mean_shap_values):
        print(f"{feature}: {importance:.4f}")

# Column labels of the feature frame, in column order, used to label the
# SHAP plots and the printed importances.
feature_names = X.columns.tolist()

perform_shap_analysis(
    model=model,
    X_train=X_train,
    X_test=X_test,
    feature_names=feature_names,
)