In [None]:
import pandas as pd
import numpy as np
import joblib
import seaborn as sns
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# --- 1. Load All Artifacts and the Hold-Out Test Set ---
# Reads the hold-out test set and every saved artifact used by the sections
# below (autoencoder, random-forest classifier, scaler, label encoder,
# anomaly threshold, PCA and the training column list) from one Drive folder.
print("--- 1. Loading all models, preprocessors, and test data ---")
drive_path = '/content/drive/MyDrive/ANOMALY_DETECTION/'
try:
    # SECURITY NOTE: joblib.load unpickles its input, and unpickling can run
    # arbitrary code. Only load artifacts that you produced yourself.

    # Load the unseen test data
    test_df = joblib.load(drive_path + 'hold_out_test_set.pkl')

    # Load the model artifacts
    autoencoder = load_model(drive_path + 'autoencoder_model.h5')
    rf_classifier = joblib.load(drive_path + 'rf_classifier.pkl')
    scaler = joblib.load(drive_path + 'scaler.pkl')
    label_encoder = joblib.load(drive_path + 'label_encoder.pkl')
    threshold = joblib.load(drive_path + 'threshold.pkl')
    pca = joblib.load(drive_path + 'pca_unified.pkl')
    training_columns = joblib.load(drive_path + 'training_columns.pkl')
    print("✅ All artifacts loaded successfully.")
except OSError as e:
    # OSError is the parent of FileNotFoundError, so a missing file is still
    # handled here, along with other I/O failures such as permission errors.
    print(f"Error loading files: {e}. Please ensure all artifacts are in the correct path.")
    # Stop with a non-zero status. `exit()` is an interactive helper injected
    # by the `site` module; it is not guaranteed to exist and would report
    # success (status 0) to the caller.
    raise SystemExit(1) from e

# --- 2. Preprocess the Test Data ---
# Turns the raw hold-out frame into the PCA feature matrix:
# one-hot encode -> align columns -> scale -> project with PCA.
print("\n--- 2. Preprocessing the test data using the saved pipeline ---")
# Separate labels from features
true_labels = test_df['Label']
features = test_df.drop(columns=['Label'])

# One-hot encode WITHOUT drop_first. With drop_first=True pandas drops the
# first category present in *this* frame, which is only the same column that
# was dropped during training if the test set contains every category. If the
# reference category were absent here, a different dummy would be dropped and
# then re-created as all zeros by the reindex below, silently mis-encoding
# those rows.
features_encoded = pd.get_dummies(features, columns=['Destination_port_group'])

# Align to the saved column list: columns not in training_columns (including
# any reference dummy omitted at training time) are discarded, and columns
# missing from the test data are added and filled with 0.
# NOTE(review): training_columns is presumably the exact column order the
# scaler was fitted on -- confirm against the training code.
features_harmonized = features_encoded.reindex(columns=training_columns, fill_value=0)
features_scaled = scaler.transform(features_harmonized)
features_pca = pca.transform(features_scaled)
print(f"Test data preprocessed. Final shape: {features_pca.shape}")

# --- 3. Run the Full Two-Phase Prediction Pipeline ---
# Produces `final_predictions`: one label per test row, in row order.
#   Phase 1: a row is an anomaly when its autoencoder reconstruction error
#            (mean squared error over its features) exceeds `threshold`.
#   Phase 2: anomalous rows are labelled by the random-forest classifier;
#            every other row is labelled 'BENIGN'.
print("\n--- 3. Running predictions on the test set ---")

# Phase 1: Anomaly Detection with Autoencoder.
# Reconstruct the whole test set in one batched call rather than calling
# predict() once per row, which pays the per-call overhead for every sample.
reconstructions = autoencoder.predict(features_pca, verbose=0)
# Row-wise MSE: axis=1 averages over the features of each sample.
# assumes the autoencoder output has the same (n_samples, n_features) shape
# as its input -- TODO confirm
reconstruction_errors = np.mean(np.square(reconstructions - features_pca), axis=1)
is_anomaly = reconstruction_errors > threshold

# Start from "everything is normal" and overwrite only the flagged rows.
# dtype=object stops numpy truncating labels longer than 'BENIGN'.
predicted_labels = np.full(len(features_pca), 'BENIGN', dtype=object)

if is_anomaly.any():
    # Anomaly Detected -> Phase 2: Classify Attack (all flagged rows at once)
    attack_pred_encoded = rf_classifier.predict(features_pca[is_anomaly])
    predicted_labels[is_anomaly] = label_encoder.inverse_transform(attack_pred_encoded)

# Downstream code treats this as a plain list of labels.
final_predictions = predicted_labels.tolist()

# --- 4. Evaluate Performance ---
print("\n--- 4. Generating Performance Reports ---")

# --- Phase 1 Evaluation: Anomaly Detection Performance ---
# Scores the pipeline as a two-class detector: any label other than
# 'BENIGN' counts as 'ATTACK', for both the ground truth and the predictions.
print(
    "=====================================================",
    "          Phase 1: Anomaly Detection Report          ",
    "=====================================================",
    sep="\n",
)

# Fixed class order, shared by the confusion matrix and its tick labels so
# rows/columns and their captions cannot drift apart.
binary_classes = ['ATTACK', 'BENIGN']

# Collapse the multi-class labels into BENIGN vs. ATTACK
true_binary = np.where(true_labels == 'BENIGN', 'BENIGN', 'ATTACK')
pred_binary = np.where(np.asarray(final_predictions) == 'BENIGN', 'BENIGN', 'ATTACK')

# Per-class precision / recall / F1 for the detection task
print(classification_report(true_binary, pred_binary))

# Confusion matrix (rows = true class, columns = predicted class) as a heatmap
cm = confusion_matrix(true_binary, pred_binary, labels=binary_classes)
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=binary_classes,
    yticklabels=binary_classes,
    ax=ax,
)
ax.set_title('Phase 1: Anomaly Detection Confusion Matrix')
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
plt.show()


# --- Phase 2 Evaluation: Attack Classification Performance ---
# Measures how well the attack *type* is predicted, restricted to rows that
# are real attacks and were also flagged as attacks by the pipeline.
print(
    "\n=====================================================",
    "         Phase 2: Attack Classification Report         ",
    "  (Evaluated only on samples correctly detected as attacks)  ",
    "=====================================================",
    sep="\n",
)

# True positives of the detection stage: attack in truth AND in prediction
attack_mask = np.logical_and(true_binary == 'ATTACK', pred_binary == 'ATTACK')
true_attack_labels = true_labels[attack_mask]
predicted_attack_labels = pd.Series(final_predictions)[attack_mask]

if true_attack_labels.empty:
    # Nothing to score: no real attack was flagged
    print("No attacks were correctly detected, so the Phase 2 report cannot be generated.")
else:
    # Headline accuracy first, then the per-attack-type breakdown
    detected_accuracy = accuracy_score(true_attack_labels, predicted_attack_labels)
    print(f"Overall Accuracy on correctly detected attacks: {detected_accuracy:.2%}\n")
    print(classification_report(true_attack_labels, predicted_attack_labels))