<a href="https://colab.research.google.com/github/Feegex/Colette-Threat-Detection-Model/blob/main/Test_TM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras import layers, models, optimizers, callbacks

# Dataset handling: load a real dataset from CSV or generate synthetic data
def load_dataset(real_data_path=None, n_samples=1000, n_features=20):
    """Load a labelled dataset from CSV, or generate a synthetic one.

    Args:
        real_data_path: Path to a CSV file that contains a ``label`` column;
            every other column is treated as a feature. When falsy, a
            synthetic two-class dataset is generated instead.
        n_samples: Number of synthetic samples (unused when loading a CSV).
        n_features: Number of synthetic features (unused when loading a CSV).

    Returns:
        A ``(features, labels)`` pair: a DataFrame and a Series for CSV input,
        or the two values returned by ``make_classification`` otherwise.

    Raises:
        Exception: Any error raised while reading the CSV, selecting the
            ``label`` column, or generating data is printed and re-raised
            unchanged.
    """
    try:
        if real_data_path:
            data = pd.read_csv(real_data_path)
            features = data.drop(columns=['label'])
            labels = data['label']
        else:
            from sklearn.datasets import make_classification

            # make_classification rejects n_informative + n_redundant >
            # n_features, so clamp both to keep small feature counts usable.
            # For n_features >= 17 this is the usual 15 informative / 2
            # redundant split.
            n_informative = min(15, n_features)
            n_redundant = min(2, n_features - n_informative)
            features, labels = make_classification(
                n_samples=n_samples,
                n_features=n_features,
                n_informative=n_informative,
                n_redundant=n_redundant,
                n_classes=2,
            )
        return features, labels
    except Exception as e:
        print(f"Error loading dataset: {e}")
        raise

# Preprocess data
def preprocess_data(features, labels, test_size=0.3):
    """Split the data into train/test sets and standardize the features.

    The split happens first and the scaler is fitted on the training portion
    only, so the test set's mean and variance do not influence the
    transformation applied to the training data.

    Args:
        features: Feature matrix (array-like or DataFrame), one row per sample.
        labels: Target values aligned with ``features``.
        test_size: Fraction of samples to hold out for testing.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` where the two ``X`` values are
        the standardized feature splits and the two ``y`` values are the
        corresponding label splits as returned by ``train_test_split``.
    """
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size)

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    # Reuse the training statistics; never re-fit on held-out data.
    X_test = scaler.transform(X_test)
    return X_train, X_test, y_train, y_test

# Custom Transformer block
class TransformerBlock(layers.Layer):
    """Self-attention block followed by a two-layer feed-forward network.

    Each sub-layer's output goes through dropout, is added back to its input
    (residual connection) and is then layer-normalized.

    Args:
        embedding_dim: Size of the last input axis; also used as the attention
            ``key_dim`` and as the feed-forward output width so the residual
            additions line up.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout_rate: Dropout rate applied after attention and after the
            feed-forward network.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, embedding_dim, num_heads, ff_dim, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config() can report them.
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate

        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embedding_dim)
        self.ffn = tf.keras.Sequential([layers.Dense(ff_dim, activation="relu"), layers.Dense(embedding_dim)])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout_rate)
        self.dropout2 = layers.Dropout(dropout_rate)

    def call(self, inputs, training=None):
        """Apply the block to ``inputs``.

        ``training`` is optional because the layer is also invoked as
        ``layer(x)`` without it (as when a functional model is built); it is
        passed on to the dropout layers.
        """
        attn_output = self.attention(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "embedding_dim": self.embedding_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "dropout_rate": self.dropout_rate,
            }
        )
        return config

# Build Transformer Model
def build_transformer_model(input_shape, num_heads=2, key_dim=64, ff_dim=128, learning_rate=1e-4):
    """Assemble and compile the binary classifier.

    Args:
        input_shape: Number of input features per sample (an integer).
        num_heads: Attention heads used by the transformer block.
        key_dim: Width of the initial projection and of the transformer block.
        ff_dim: Hidden width of the transformer block's feed-forward network.
        learning_rate: Learning rate for the Adam optimizer.

    Returns:
        A compiled Keras model with a single sigmoid output, trained with
        binary cross-entropy and reporting accuracy.
    """
    feature_input = layers.Input(shape=(input_shape,))

    # Project the flat feature vector to key_dim and present it to the
    # transformer block as a sequence of length one.
    hidden = layers.Dense(key_dim)(feature_input)
    hidden = layers.Reshape((1, key_dim))(hidden)
    encoder = TransformerBlock(embedding_dim=key_dim, num_heads=num_heads, ff_dim=ff_dim)
    hidden = encoder(hidden)
    hidden = layers.GlobalAveragePooling1D()(hidden)

    # Classification head: dropout before each dense layer.
    for drop_rate, units in ((0.4, 64), (0.4, 32)):
        hidden = layers.Dropout(drop_rate)(hidden)
        hidden = layers.Dense(units, activation='relu')(hidden)
    hidden = layers.Dropout(0.3)(hidden)
    probability = layers.Dense(1, activation='sigmoid')(hidden)

    classifier = models.Model(feature_input, probability)
    classifier.compile(
        optimizer=optimizers.Adam(learning_rate=learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

# Train model
def train_model(model, X_train, y_train, epochs=30, batch_size=32, validation_split=0.2):
    """Fit ``model`` and return the object produced by ``model.fit``.

    Two callbacks watch ``val_loss``: one halves the learning rate after 2
    epochs without improvement, the other stops training after 5 such epochs
    and restores the best weights.

    Args:
        model: Compiled model exposing ``fit``.
        X_train: Training features.
        y_train: Training labels.
        epochs: Maximum number of epochs.
        batch_size: Mini-batch size.
        validation_split: Fraction of the training data held out for validation.
    """
    training_callbacks = [
        callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, verbose=1),
        callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True),
    ]
    history = model.fit(
        X_train,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=validation_split,
        callbacks=training_callbacks,
    )
    return history

# Evaluate model
def evaluate_model(model, X_test, y_test):
    """Print and return test accuracy and ROC AUC.

    Accuracy uses predictions thresholded at 0.5. AUC is computed from the
    raw predicted probabilities, because ranking-based AUC loses information
    when it is fed hard 0/1 predictions.

    Args:
        model: Trained model whose ``predict`` returns one probability per sample.
        X_test: Test features.
        y_test: True binary labels (array-like or Series).

    Returns:
        ``(accuracy, auc)``.
    """
    # Flatten both sides so a (n, 1) prediction array and a Series/array of
    # labels compare element-wise by position.
    y_true = np.asarray(y_test).ravel()
    y_prob = np.asarray(model.predict(X_test)).ravel()

    y_pred = (y_prob > 0.5).astype("int32")
    accuracy = np.mean(y_pred == y_true)
    auc = roc_auc_score(y_true, y_prob)
    print(f"Test Accuracy: {accuracy:.2f}, AUC: {auc:.2f}")
    return accuracy, auc

# Real-Time Monitoring Simulation
def real_time_monitoring(model, incoming_data_stream, delay_seconds=2):
    """Simulate live scoring by predicting one sample at a time.

    Args:
        model: Object exposing ``predict(batch)`` that returns a 2-D result
            whose ``[0][0]`` element is the score for the single sample.
        incoming_data_stream: Iterable of samples; each one is reshaped to a
            batch of shape ``(1, n_features)`` before prediction.
        delay_seconds: Pause after each prediction to mimic arrival spacing.
            Values ``<= 0`` disable the pause.

    Returns:
        None. Each score is printed as it is produced.
    """
    print("Starting real-time threat monitoring...")
    for data in incoming_data_stream:
        # np.asarray lets plain lists/tuples be streamed as well as ndarrays.
        sample = np.asarray(data).reshape(1, -1)
        prediction = model.predict(sample)
        print(f"Real-Time Prediction: {prediction[0][0]:.2f}")
        if delay_seconds > 0:
            time.sleep(delay_seconds)

# Explain model predictions using SHAP
def explain_model_shap(model, X_train, X_test):
    """Show a SHAP summary plot of the model's predictions on ``X_test``.

    Args:
        model: Trained model handed to ``shap.DeepExplainer``.
        X_train: Training features; ``shap.sample(X_train, 100)`` supplies the
            explainer's background data.
        X_test: Samples whose predictions are explained and plotted.
    """
    # Background/reference data for the explainer.
    reference_set = shap.sample(X_train, 100)
    deep_explainer = shap.DeepExplainer(model, reference_set)

    attributions = deep_explainer.shap_values(X_test)
    shap.summary_plot(attributions, X_test)

# Plot training history
def plot_training_history(history):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        history: Object whose ``history`` attribute maps ``'accuracy'``,
            ``'val_accuracy'``, ``'loss'`` and ``'val_loss'`` to per-epoch
            sequences (as returned by ``model.fit``).
    """
    # One row per subplot:
    # (position, train key, val key, train label, val label, y label, legend loc)
    panels = (
        (1, 'accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy', 'Accuracy', 'lower right'),
        (2, 'loss', 'val_loss', 'Training Loss', 'Validation Loss', 'Loss', 'upper right'),
    )

    plt.figure(figsize=(12, 6))
    for position, train_key, val_key, train_label, val_label, y_label, legend_loc in panels:
        plt.subplot(1, 2, position)
        plt.plot(history.history[train_key], label=train_label)
        plt.plot(history.history[val_key], label=val_label)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend(loc=legend_loc)
    plt.tight_layout()
    plt.show()

# End-to-end demo: build data, train, evaluate, plot, simulate streaming, explain.
if __name__ == "__main__":
    # No path is given, so load_dataset() generates synthetic data.
    features, labels = load_dataset()
    X_train, X_test, y_train, y_test = preprocess_data(features, labels)
    # The model's input width is the number of feature columns.
    model = build_transformer_model(X_train.shape[1])
    history = train_model(model, X_train, y_train)
    evaluate_model(model, X_test, y_test)
    plot_training_history(history)
    # Five random standard-normal samples stand in for a live data feed.
    incoming_data_stream = np.random.randn(5, X_train.shape[1])
    real_time_monitoring(model, incoming_data_stream)
    explain_model_shap(model, X_train, X_test)
