In [None]:
# Record which interpreter and operating system produced this run.
import sys

print(sys.version, sys.platform, sep="\n")

In [None]:
import sys, sklearn, numpy, joblib, imblearn

# Print the interpreter version followed by the version of each core library.
print("Python:", sys.version)
for display_name, library in (
    ("scikit-learn", sklearn),
    ("numpy", numpy),
    ("joblib", joblib),
    ("imbalanced-learn", imblearn),
):
    print(f"{display_name}:", library.__version__)

In [None]:
!python -m pip install seaborn optuna

In [None]:
import glob
import os

import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from sklearn.decomposition import PCA
from sklearn.feature_selection import mutual_info_classif, SelectKBest
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import GridSearchCV, cross_validate, train_test_split
from sklearn.preprocessing import RobustScaler, StandardScaler
from sklearn.tree import DecisionTreeClassifier, plot_tree

In [None]:
# Load the dataset.
# The CSV location can be overridden with the NAVBOT25_CSV environment variable so the
# notebook is not tied to one machine; the original absolute path stays as the fallback.
DEFAULT_DATASET_PATH = "C:\\Users\\B760M-ITX D4 WIFI\\Documents\\GitHub\\ros-security\\dataset\\existing\\NavBot25.csv"
file_path = os.environ.get("NAVBOT25_CSV", DEFAULT_DATASET_PATH)

# Fail early with an actionable message instead of a bare read error.
if not os.path.isfile(file_path):
    raise FileNotFoundError(
        f"Dataset not found at {file_path!r}. "
        "Set the NAVBOT25_CSV environment variable to the location of NavBot25.csv."
    )

data = pd.read_csv(file_path)

In [None]:
# Display dataset info.
# DataFrame.info() writes its report itself and returns None, so wrapping it in
# print() would add a stray "None" line to the output.
data.info()
print(data.head())

In [None]:
# Number of rows per raw label value
print(data.loc[:, 'Label'].value_counts())

In [None]:
# Define attack type mapping (label string found in the CSV -> integer class id)
attack_mapping = {
    "Normal": 0,
    "DoS Attack": 1,
    "UnauthSub Attack": 2,
    "SSH Bruteforce": 3,
    "Pubflood": 4,
    "Subflood": 5,
    "Reverse Shell": 6,
    "Port Scanning Attack": 7
}

# Encode only once: if this cell is re-run after 'Label' is already integer-coded,
# mapping again would turn every value into NaN and the dropna would empty the frame.
if pd.api.types.is_integer_dtype(data["Label"]):
    print("[INFO] 'Label' is already integer-encoded; skipping the mapping step.")
else:
    # Convert attack type names to numeric labels (unknown names become NaN)
    encoded_labels = data["Label"].map(attack_mapping)

    # Report what is about to be discarded instead of dropping it silently
    unmatched_mask = encoded_labels.isna()
    if unmatched_mask.any():
        unmatched_values = data.loc[unmatched_mask, "Label"].astype(str).unique().tolist()
        print(f"[WARN] Dropping {int(unmatched_mask.sum())} rows with unmapped labels: {unmatched_values}")

    # Build a new frame: replace the label column, drop unmatched rows, store labels as integers
    data = (
        data.assign(Label=encoded_labels)
        .dropna(subset=["Label"])
        .astype({"Label": int})
    )

In [None]:
# Drop columns that are not used as model features (absent columns are ignored)
columns_to_drop = ['Flow ID', 'Src IP', 'Dst IP', 'Protocol', 'Timestamp']
data = data.drop(columns=columns_to_drop, errors='ignore')

# Check if any column is non-numeric
non_numeric_columns = data.select_dtypes(exclude=['number']).columns
print("Non-numeric columns:", non_numeric_columns)

# Missing values are only counted here, not filled. Filling with the mean of the whole
# dataset before the train/test split would let test rows influence the values written
# into training rows; the imputation cell after the split fills every remaining NaN.
numeric_columns = data.select_dtypes(include=['number']).columns
print("Missing values in numeric columns:", int(data[numeric_columns].isna().sum().sum()))

# Check the dataset again (info() prints its own report and returns None)
data.info()

In [None]:
# Separate the target column from the feature matrix
y = data['Label']                   # Target
X = data.drop(columns=['Label'])    # Features

In [None]:
# Split into training and testing sets (80% train, 20% test).
# The split is stratified so every class keeps its share in both parts. Stratification
# needs at least two samples per class, so fall back to a plain random split otherwise.
stratify_labels = y if y.value_counts().min() >= 2 else None
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=stratify_labels
)

print("Training set size:", X_train.shape)
print("Testing set size:", X_test.shape)

In [None]:
# Feature splits to inspect, keyed by the name shown in the report
splits_to_check = {"X_train": X_train, "X_test": X_test}

# Check for NaN values
for split_name, split_frame in splits_to_check.items():
    print(f"NaN values in {split_name}:", np.isnan(split_frame).sum().sum())

# Check for infinity values
for split_name, split_frame in splits_to_check.items():
    print(f"Infinity values in {split_name}:", np.isinf(split_frame).sum().sum())

In [None]:
# Treat +/- infinity as missing so it is imputed together with NaN
X_train = X_train.replace([np.inf, -np.inf], np.nan)
X_test = X_test.replace([np.inf, -np.inf], np.nan)

# Fill NaN with the column means of the TRAINING split only. The test split is filled
# with the same training means, so no statistic is ever estimated from test data and
# both splits receive identical fill values.
train_column_means = X_train.mean()
X_train = X_train.fillna(train_column_means)
X_test = X_test.fillna(train_column_means)

In [None]:
# Show how many training samples each class has before any resampling
print("Original class distribution:", y_train.value_counts(), sep="\n")

In [None]:
# Plot class distribution before SMOTE
# Display names indexed by numeric class id (position i names class i)
class_display_names = ['Normal', 'DoS Attack', 'UnauthSub Attack', 'SSH Bruteforce', 'UnauthPub Attack', 'Subflood', 'Reverse Shell', 'Port Scanning Attack']
class_counts = y_train.value_counts().sort_index()

plt.figure(figsize=(8, 6))
class_counts.plot(kind='bar', color='skyblue')
plt.xlabel("Class Label")
plt.ylabel("Number of Samples")
plt.title("Class Distribution Before SMOTE")

# Name every bar after the class id it actually represents. A fixed 0..7 tick list
# would shift the names onto the wrong bars whenever a class is missing from the split.
bar_labels = [
    class_display_names[int(class_id)] if 0 <= int(class_id) < len(class_display_names) else str(class_id)
    for class_id in class_counts.index
]
plt.xticks(range(len(class_counts)), bar_labels)
plt.show()

In [None]:
# Oversample the training split with SMOTE (the test split is left untouched)
smote = SMOTE(random_state=42)
X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

# Check the class distribution after SMOTE
print("Class distribution after SMOTE:", y_train_balanced.value_counts(), sep="\n")

In [None]:
# Plot class distribution after SMOTE
# Display names indexed by numeric class id (position i names class i)
class_display_names = ['Normal', 'DoS Attack', 'UnauthSub Attack', 'SSH Bruteforce', 'UnauthPub Attack', 'Subflood', 'Reverse Shell', 'Port Scanning Attack']
balanced_class_counts = y_train_balanced.value_counts().sort_index()

plt.figure(figsize=(8, 6))
balanced_class_counts.plot(kind='bar', color='skyblue')
plt.xlabel("Class Label")
plt.ylabel("Number of Samples")
plt.title("Class Distribution After SMOTE")

# Name every bar after the class id it actually represents. A fixed 0..7 tick list
# would shift the names onto the wrong bars whenever a class is missing.
bar_labels = [
    class_display_names[int(class_id)] if 0 <= int(class_id) < len(class_display_names) else str(class_id)
    for class_id in balanced_class_counts.index
]
plt.xticks(range(len(balanced_class_counts)), bar_labels)
plt.show()

In [None]:
def scale_data(X_train, X_test, scale_data=True):
    """
    Standardize the train and test feature frames with a StandardScaler.

    The scaler is fitted on X_train only and then applied to both frames, so no
    statistic is estimated from the test data.

    Parameters
    ----------
    X_train, X_test : pandas.DataFrame
        Feature frames; column labels and row index are preserved in the output.
    scale_data : bool, default True
        When False, both frames are returned unchanged and no scaler is fitted.

    Returns
    -------
    tuple
        (X_train_scaled, X_test_scaled, scaler). `scaler` is the fitted
        StandardScaler, or None when scaling was skipped.
    """
    if not scale_data:
        # Nothing was fitted; return None explicitly rather than referencing an
        # unassigned local (which raised UnboundLocalError before).
        return X_train, X_test, None

    scaler = StandardScaler()

    # Keep the original row index so the scaled frames still line up with their targets
    X_train_scaled = pd.DataFrame(
        scaler.fit_transform(X_train), columns=X_train.columns, index=X_train.index
    )
    X_test_scaled = pd.DataFrame(
        scaler.transform(X_test), columns=X_test.columns, index=X_test.index
    )

    return X_train_scaled, X_test_scaled, scaler

In [None]:
def compute_mutual_info(X_train_scaled, y_train):
    """
    Rank the columns of X_train_scaled by mutual information with y_train.

    Returns a DataFrame with the columns 'Feature', 'MI_Score' and 'Rank',
    sorted by descending score; 'Rank' starts at 1 for the highest score.
    """
    scores = mutual_info_classif(X_train_scaled, y_train, random_state=42)

    ranking = pd.DataFrame({'Feature': X_train_scaled.columns, 'MI_Score': scores})
    ranking = ranking.sort_values('MI_Score', ascending=False).reset_index(drop=True)

    # After the index reset, row position + 1 is the rank
    return ranking.assign(Rank=lambda frame: frame.index + 1)

In [None]:
def plot_top_features(mi_df, top_n=20):
    """
    Draw a horizontal bar chart of the first top_n rows of mi_df.

    mi_df must provide the columns 'Feature' and 'MI_Score'; rows are plotted
    in the order they appear in the frame.
    """
    leading_rows = mi_df.head(top_n)

    plt.figure(figsize=(10, 8))
    ax = sns.barplot(data=leading_rows, x='MI_Score', y='Feature', palette='viridis')
    ax.set_title(f'Top {top_n} Features by Mutual Information Score', fontsize=16)
    ax.set_xlabel('Mutual Information Score', fontsize=12)
    ax.set_ylabel('Feature', fontsize=12)
    plt.tight_layout()
    plt.show()

# Minimum mutual-information score a feature needs in order to be kept
MI_SCORE_THRESHOLD = 0.01

# First, scale all features and get the scaler
X_train_scaled, X_test_scaled, scaler = scale_data(X_train_balanced, X_test, scale_data=True)

# Then, compute mutual information on the scaled, balanced training data
mi_df = compute_mutual_info(X_train_scaled, y_train_balanced)

# Print Top 20 Features based on MI Scores
print("[INFO] Top 20 Features by Mutual Information Scores:")
print(mi_df.head(20))

# Plot the top 20 features
plot_top_features(mi_df, top_n=20)

# Select features based on the mutual information threshold (kept in rank order)
selected_features = mi_df[mi_df['MI_Score'] >= MI_SCORE_THRESHOLD]['Feature'].tolist()
if not selected_features:
    # Fail here with a clear message rather than inside the scaler on an empty matrix
    raise ValueError(
        f"No feature reached the mutual information threshold of {MI_SCORE_THRESHOLD}."
    )

# Take the selected columns from the unscaled data; both splits use the same list,
# so their columns are identical and in the same order
X_train_selected = X_train_balanced[selected_features]
X_test_selected = X_test[selected_features]

# Re-fit the scaler on the selected columns only. The first fit above saw every feature,
# so it cannot transform the reduced matrix. From here on `scaler` expects exactly
# `selected_features`, in this order, and the scaled data are NumPy arrays.
X_train_scaled = scaler.fit_transform(X_train_selected)  # Fit and transform the train data
X_test_scaled = scaler.transform(X_test_selected)  # Transform the test data

# Print total number of selected features and their names
print(f"[INFO] Selected {len(selected_features)} Features:")
print(selected_features)

In [None]:
# Perform PCA on the scaled training features
pca = PCA()
pca.fit(X_train_scaled)

# Get explained variance ratio and cumulative variance ratio
explained_variance_ratio = pca.explained_variance_ratio_
cumulative_variance_ratio = explained_variance_ratio.cumsum()

# Set a threshold (e.g. 95% variance explained)
threshold = 0.95

# 0-based index of the first component at which the cumulative ratio reaches the
# threshold; falls back to the last component if the threshold is never reached
component_count = next((i for i, val in enumerate(cumulative_variance_ratio) if val >= threshold), len(cumulative_variance_ratio)-1)
n_components_needed = component_count + 1

# Plot against a 1-based component count so the axis matches its "Number of Components"
# label and the marker line sits at the same value its legend entry reports
component_numbers = np.arange(1, len(cumulative_variance_ratio) + 1)
plt.plot(component_numbers, cumulative_variance_ratio, label='Cumulative Explained Variance')

# Plot the vertical line at the number of components needed
plt.axvline(x=n_components_needed, color='red', linestyle='--', label=f'{n_components_needed} Components')

# Label the plot
plt.xlabel('Number of Components')
plt.ylabel('Cumulative Explained Variance')
plt.legend(loc='best')
plt.title(f'Cumulative Explained Variance (Threshold: {threshold*100:.0f}%)')

# Show plot
plt.show()

In [None]:
# Metrics calculation function
def calculate_metrics(y_true, y_pred, conf_matrix):
    """
    Compute accuracy, precision, recall, specificity and F1 for a set of predictions.

    Parameters
    ----------
    y_true, y_pred : array-like
        True and predicted labels.
    conf_matrix : array-like
        Confusion matrix for the same labels; rows are true classes, columns are
        predicted classes (the layout produced by sklearn's confusion_matrix).

    Returns
    -------
    dict
        Keys "Accuracy", "Precision", "Recall", "Specificity", "F1-score".

    Notes
    -----
    A 4-element confusion matrix is handled exactly as before (binary metrics,
    specificity = tn / (tn + fp)). Any other size is treated as multi-class:
    precision, recall and F1 are macro-averaged and specificity is the unweighted
    mean of the per-class one-vs-rest specificities. Previously such input failed on
    the 4-way unpack and on the default average='binary'.
    """
    conf_matrix = np.asarray(conf_matrix)

    if conf_matrix.size == 4:
        # Binary case: unpack in the tn, fp, fn, tp order of a raveled 2x2 matrix
        tn, fp, fn, tp = conf_matrix.ravel()
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
        average = 'binary'
    else:
        # One-vs-rest counts for every class, taken from the matrix margins
        true_pos = np.diag(conf_matrix)
        false_pos = conf_matrix.sum(axis=0) - true_pos
        false_neg = conf_matrix.sum(axis=1) - true_pos
        true_neg = conf_matrix.sum() - true_pos - false_pos - false_neg

        # Classes without any negative sample contribute 0, mirroring the binary guard
        negatives = true_neg + false_pos
        per_class_specificity = np.divide(
            true_neg, negatives,
            out=np.zeros(len(true_pos), dtype=float),
            where=negatives > 0,
        )
        specificity = per_class_specificity.mean() if per_class_specificity.size else 0
        average = 'macro'

    metrics = {
        "Accuracy": accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, average=average),
        "Recall": recall_score(y_true, y_pred, average=average),
        "Specificity": specificity,
        "F1-score": f1_score(y_true, y_pred, average=average)
    }

    return metrics

def display_metrics(metrics):
    """Print every metric as a percentage with two decimals, one per line.

    Metric names are left-aligned in a 12-character column.
    """
    for label in metrics:
        percentage = metrics[label] * 100
        padded_label = label.ljust(12)
        print(f"{padded_label}: {percentage:.2f}%")

In [None]:
# Fit an untuned decision tree as the baseline for the tuned model
print("Training base Decision Tree model...")
dt_base = DecisionTreeClassifier(random_state=42).fit(X_train_scaled, y_train_balanced)

# Baseline predictions on the held-out test features
dt_base_preds = dt_base.predict(X_test_scaled)

In [None]:
# Per-class report and overall accuracy of the baseline tree on the test split
print("\nBase Decision Tree Performance:")
base_report = classification_report(y_test, dt_base_preds, digits=4)
print(base_report)

base_accuracy = accuracy_score(y_test, dt_base_preds)
print(f"Accuracy: {base_accuracy:.4f}")

# Kept for the heatmap cell that follows
conf_matrix_base = confusion_matrix(y_test, dt_base_preds)

In [None]:
# Display the base model's confusion matrix as counts and as row percentages
class_display_names = ['Normal', 'DoS Attack', 'UnauthSub Attack', 'SSH Bruteforce', 'UnauthPub Attack', 'Subflood', 'Reverse Shell', 'Port Scanning Attack']

# Use the class names only when they line up one-to-one with the matrix; the matrix was
# built without an explicit label list, so its size follows the classes actually present
tick_labels = class_display_names if conf_matrix_base.shape[0] == len(class_display_names) else 'auto'

# Share of each true class per prediction; rows without samples stay at 0 instead of
# producing a division by zero
row_totals = conf_matrix_base.sum(axis=1, keepdims=True)
conf_matrix_base_percent = np.divide(
    conf_matrix_base.astype(float), row_totals,
    out=np.zeros(conf_matrix_base.shape, dtype=float),
    where=row_totals > 0,
) * 100

# Both panels of the 1x2 grid are drawn (the second half used to stay blank)
fig, (ax_count, ax_percent) = plt.subplots(1, 2, figsize=(15, 6))

sns.heatmap(conf_matrix_base, annot=True, fmt='d', cmap='Blues',
            xticklabels=tick_labels, yticklabels=tick_labels, ax=ax_count)
ax_count.set_xlabel('Predicted')
ax_count.set_ylabel('Actual')
ax_count.set_title('Confusion Matrix - Count (Base DT)')

sns.heatmap(conf_matrix_base_percent, annot=True, fmt='.1f', cmap='Blues',
            xticklabels=tick_labels, yticklabels=tick_labels, ax=ax_percent)
ax_percent.set_xlabel('Predicted')
ax_percent.set_ylabel('Actual')
ax_percent.set_title('Confusion Matrix - Percentage (Base DT)')

plt.tight_layout()
plt.show()

In [None]:
# Hold out part of the balanced training data for tuning, so hyperparameters are never
# chosen by looking at the test set. Scoring trials on X_test_scaled / y_test would make
# the test metrics reported afterwards optimistic.
# NOTE(review): the hold-out is cut from the SMOTE-resampled data, so it can contain
# synthetic samples; confirm whether a split taken before resampling is preferred.
X_tune_train, X_tune_val, y_tune_train, y_tune_val = train_test_split(
    X_train_scaled, y_train_balanced,
    test_size=0.2, stratify=y_train_balanced, random_state=42,
)


# Define the Optuna objective function
def objective(trial):
    """
    Score one decision-tree configuration proposed by Optuna.

    Samples max_depth, min_samples_split, min_samples_leaf and criterion from the
    trial, fits a tree on the tuning-train part and returns its macro-averaged F1
    on the tuning-validation part. Returns 0.0 if fitting or scoring raises.
    """
    max_depth = trial.suggest_int('max_depth', 5, 50)
    min_samples_split = trial.suggest_int('min_samples_split', 2, 20)
    min_samples_leaf = trial.suggest_int('min_samples_leaf', 1, 10)
    criterion = trial.suggest_categorical('criterion', ['gini', 'entropy'])

    model = DecisionTreeClassifier(
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        min_samples_leaf=min_samples_leaf,
        criterion=criterion,
        random_state=42
    )

    try:
        model.fit(X_tune_train, y_tune_train)
        y_pred = model.predict(X_tune_val)
        # Macro average: every class counts equally, whatever its support
        score = f1_score(y_tune_val, y_pred, average='macro')
    except Exception as e:
        # Best effort: report the failure and give the trial the worst possible score
        print(f"Exception in Optuna trial: {e}")
        return 0.0

    return score

# Number of Optuna trials (used by both the log message and the optimizer call)
N_TRIALS = 15

# Run Optuna optimization. The sampler is seeded so a Restart & Run All proposes the
# same sequence of hyperparameters and therefore selects the same best configuration.
print(f"\nStarting Optuna hyperparameter optimization with {N_TRIALS} trials...")
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=N_TRIALS)

# Best hyperparameters
best_params = study.best_params
print(f"\nBest hyperparameters found: {best_params}")

# Train final model on the full balanced training set
print("\nTraining Decision Tree with optimized hyperparameters...")
dt_tuned = DecisionTreeClassifier(
    max_depth=best_params['max_depth'],
    min_samples_split=best_params['min_samples_split'],
    min_samples_leaf=best_params['min_samples_leaf'],
    criterion=best_params['criterion'],
    random_state=42
)
dt_tuned.fit(X_train_scaled, y_train_balanced)

# Predictions of the tuned model on the held-out test features
dt_tuned_preds = dt_tuned.predict(X_test_scaled)

In [None]:
# Fit the tuned tree from the best hyperparameters and predict on the test features.
# This repeats the training at the end of the previous cell with the same inputs and seed.
print("\nTraining Decision Tree with optimized hyperparameters...")
tuned_settings = {
    name: best_params[name]
    for name in ('max_depth', 'min_samples_split', 'min_samples_leaf', 'criterion')
}
dt_tuned = DecisionTreeClassifier(random_state=42, **tuned_settings)
dt_tuned.fit(X_train_scaled, y_train_balanced)
dt_tuned_preds = dt_tuned.predict(X_test_scaled)

In [None]:
# Everything used here is already imported in the notebook's import cell, so the
# per-cell re-imports were dropped.

# Get the confusion matrix of the tuned model on the test split
cm = confusion_matrix(y_test, dt_tuned_preds)

# Use the class names only when they line up one-to-one with the matrix; the matrix is
# built without an explicit label list, so its size follows the classes actually present
class_display_names = ['Normal', 'DoS Attack', 'UnauthSub Attack', 'SSH Bruteforce', 'UnauthPub Attack', 'Subflood', 'Reverse Shell', 'Port Scanning Attack']
tick_labels = class_display_names if cm.shape[0] == len(class_display_names) else 'auto'

# Row-normalised percentages; rows without samples stay at 0 instead of dividing by zero
row_totals = cm.sum(axis=1, keepdims=True)
cm_percent = np.divide(
    cm.astype('float'), row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals > 0,
) * 100

# Create a figure with two subplots
fig, (ax_count, ax_percent) = plt.subplots(1, 2, figsize=(16, 7))

# Plot 1: Confusion Matrix (Counts)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False,
            xticklabels=tick_labels, yticklabels=tick_labels, ax=ax_count)
ax_count.set_title('Confusion Matrix (Counts)', fontsize=15)
ax_count.set_xlabel('Predicted Labels', fontsize=12)
ax_count.set_ylabel('True Labels', fontsize=12)

# Plot 2: Confusion Matrix (Percentages)
sns.heatmap(cm_percent, annot=True, fmt='.1f', cmap='Blues', cbar=False,
            xticklabels=tick_labels, yticklabels=tick_labels, ax=ax_percent)
ax_percent.set_title('Confusion Matrix (Percentages)', fontsize=15)
ax_percent.set_xlabel('Predicted Labels', fontsize=12)
ax_percent.set_ylabel('True Labels', fontsize=12)

plt.tight_layout()
plt.show()

# Print classification report for additional metrics
print('\nClassification Report:')
print(classification_report(y_test, dt_tuned_preds))

# Print overall accuracy
print(f'\nAccuracy: {accuracy_score(y_test, dt_tuned_preds):.4f}')

In [None]:
# Number of cross-validation folds (drives the CV call, the messages and the plot axis)
N_FOLDS = 5

# Step 1: Perform k-fold cross-validation and keep the training score of every fold.
# cross_validate fits a clone of dt_tuned per fold, so dt_tuned itself is not refitted.
# NOTE(review): the folds are cut from the SMOTE-resampled data, so validation folds can
# contain synthetic samples; confirm whether that is acceptable for this comparison.
print(f"\nPerforming {N_FOLDS}-Fold Cross Validation...")
cv_results = cross_validate(
    dt_tuned, X_train_scaled, y_train_balanced,
    cv=N_FOLDS, return_train_score=True,
)
cv_scores = cv_results['test_score']
train_scores = cv_results['train_score']

# Step 2: Print cross-validation results
print(f"Cross-validation scores for each fold: {cv_scores}")
print(f"Mean accuracy: {np.mean(cv_scores)}")
print(f"Standard deviation: {np.std(cv_scores)}")

# Step 3: Visualize the comparison between the folds
fold_numbers = range(1, N_FOLDS + 1)
plt.figure(figsize=(10, 6))

# Validation accuracy of each fold
plt.plot(fold_numbers, cv_scores, marker='o', label='Validation Accuracy', color='blue', linestyle='-', linewidth=2)

# Training accuracy measured on each fold's own training part. This replaces the earlier
# placeholder, which refitted on the full data and drew one identical value five times.
plt.plot(fold_numbers, train_scores, marker='x', label='Training Accuracy', color='red', linestyle='--', linewidth=2)

# Labels and title
plt.title(f'Comparison of Training and Validation Accuracy Across {N_FOLDS}-Folds (Decision Tree)')
plt.xlabel('Fold Number')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

# Show plot
plt.show()

In [None]:
from joblib import dump

# Persist the fitted baseline tree to disk.
# NOTE(review): this writes dt_base, not dt_tuned; confirm that the untuned model is
# the one meant to be exported.
model_artifact_path = "model.joblib"
dump(dt_base, model_artifact_path)
print(f"Model saved as {model_artifact_path}")

In [None]:
# Persist the scaler; its most recent fit was on the MI-selected training columns.
scaler_artifact_path = "scaler.joblib"
dump(scaler, scaler_artifact_path)
print(f"Scaler saved as {scaler_artifact_path}")

In [None]:
# Export the feature names the saved scaler and model actually expect.
# Both were fitted on the columns in `selected_features` (in that order), not on every
# column of X, so writing X.columns would describe an input layout the saved artifacts
# cannot accept.
features = list(selected_features)
with open("features.txt", "w", encoding="utf-8") as f:
    for feat in features:
        f.write(feat + "\n")

print("Features saved as features.txt")