<a href="https://colab.research.google.com/github/11PRIMUS/intrusion-detection-system/blob/master/unsw.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import time
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')


In [None]:
class EGA_PSO:
    """Hybrid GA/PSO wrapper feature selector scored by a random forest.

    Every particle is a real-valued vector with one coordinate per feature,
    kept inside [0, 1]. A feature counts as "selected" when its coordinate is
    strictly greater than ``threshold``. Fitness is the accuracy of a
    ``RandomForestClassifier`` trained on the selected columns.

    Parameters
    ----------
    n_particles : int
        Swarm size.
    n_iterations : int
        Number of optimisation iterations run by ``fit``.
    rf_estimators : int
        Number of trees in the random forest used for fitness evaluation.
    cv_folds : int
        Folds for cross-validated accuracy. With a value of 1 or less the
        forest is fitted and scored on the same data (training accuracy).
    threshold : float
        Cut-off that turns a continuous particle into a binary feature mask.
    """

    def __init__(self, n_particles=10, n_iterations=20, rf_estimators=30, cv_folds=2,
                 threshold=0.6):
        self.n_particles = n_particles
        self.n_iterations = n_iterations
        self.rf_estimators = rf_estimators
        self.cv_folds = cv_folds
        self.threshold = threshold
        self.best_features = None
        self.best_fitness = -1
        self.fitness_history = []
        self.current_pso_output = None
        # Defined up front so pso_rejected() is usable before fit() has run.
        self.current_iteration = 0

    def random_forest(self, particle, X, y):
        """Score one particle with a random forest.

        Returns ``(accuracy, binary_mask)`` where ``binary_mask`` is an int
        array of 0/1 flags, or ``(-1, None)`` when the particle selects no
        feature at all.
        """
        # Convert the continuous particle to a binary feature mask.
        binary_particle = (particle > self.threshold).astype(int)

        # Penalise empty selections instead of training on zero columns.
        if np.sum(binary_particle) == 0:
            return -1, None

        clf = RandomForestClassifier(n_estimators=self.rf_estimators,
                                     random_state=42, n_jobs=-1)

        selected_features = binary_particle.astype(bool)
        X_subset = X[:, selected_features]
        if self.cv_folds > 1:
            cv_scores = cross_val_score(clf, X_subset, y,
                                        cv=self.cv_folds, scoring='accuracy')
            accuracy = np.mean(cv_scores)
        else:
            # No cross-validation: this is accuracy on the training data itself.
            clf.fit(X_subset, y)
            accuracy = clf.score(X_subset, y)

        return accuracy, binary_particle

    def selection(self, particles, fitness_values):
        """Pick the elite particles (GA selection step).

        Returns ``(elite_particles, best_fitness, elite_indices)``. The elite
        is the top 20% of the swarm by fitness, but never fewer than two
        particles (crossover needs two distinct parents) and never more than
        the number of particles available.
        """
        sorted_indices = np.argsort(fitness_values)[::-1]  # descending fitness
        elite_size = min(len(sorted_indices), max(2, int(self.n_particles * 0.2)))
        elite_indices = sorted_indices[:elite_size]

        new_selected_value = particles[elite_indices]
        best_fit = fitness_values[sorted_indices[0]]

        return new_selected_value, best_fit, elite_indices

    def crossover_generation(self, parent_a, parent_b):
        """Create one offspring by taking each coordinate from a random parent."""
        n_features = len(parent_a)
        crossover_mask = np.random.randint(0, 2, size=n_features).astype(bool)
        offspring = np.where(crossover_mask, parent_a, parent_b)
        return offspring

    def enhanced_mutation(self, parent):
        """Return a copy of ``parent`` with about 10% of coordinates re-drawn from U[0, 1)."""
        n_features = len(parent)
        mutation_mask = np.random.rand(n_features) < 0.1

        offspring = parent.copy()
        offspring[mutation_mask] = np.random.rand(np.sum(mutation_mask))
        return offspring

    def pso_rejected(self, offspring_parent, particles, velocities, pbest, gbest, particle_idx):
        """Apply one PSO velocity/position update to an offspring.

        ``velocities[particle_idx]`` is updated in place. The returned
        position is clipped to [0, 1]. ``particles`` is accepted for
        interface compatibility but is not read here.
        """
        n_features = len(offspring_parent)

        # Inertia weight decays linearly from 0.9 over the run; max(1, ...)
        # avoids a division by zero when n_iterations is 0.
        w = 0.9 - (0.5 * self.current_iteration / max(1, self.n_iterations))
        c1, c2 = 0.5, 0.3  # cognitive and social factors

        r1, r2 = np.random.rand(n_features), np.random.rand(n_features)

        velocities[particle_idx] = (
            w * velocities[particle_idx] +
            c1 * r1 * (pbest[particle_idx] - offspring_parent) +
            c2 * r2 * (gbest - offspring_parent)
        )

        new_output_pso = offspring_parent + velocities[particle_idx]
        new_output_pso = np.clip(new_output_pso, 0, 1)  # stay within [0, 1]

        return new_output_pso

    def enhanced_selection(self):
        """Placeholder hook for additional selection strategies; currently a no-op."""
        pass

    def fit(self, X, y):
        """Run the EGA-PSO optimisation loop.

        Parameters
        ----------
        X : array-like, two-dimensional
            Feature matrix; converted with ``np.asarray`` so column masks work.
        y : array-like
            Target labels.

        Returns
        -------
        numpy.ndarray of bool, or None
            Mask of the selected features, or ``None`` if the best particle
            selects no feature.

        Raises
        ------
        ValueError
            If ``n_particles`` is smaller than 1.
        """
        if self.n_particles < 1:
            raise ValueError("n_particles must be at least 1")

        X = np.asarray(X)
        y = np.asarray(y)
        n_features = X.shape[1]

        # Start every run from a clean state so repeated fit() calls do not
        # append to an old history or blend with a stale PSO position.
        self.fitness_history = []
        self.current_pso_output = None
        self.current_iteration = 0

        # Initialise particles and velocities.
        particles = np.random.uniform(0, 1, (self.n_particles, n_features))
        velocities = np.zeros((self.n_particles, n_features))

        # Personal and global bests.
        pbest = particles.copy()
        pbest_fitness = np.full(self.n_particles, -10.0)
        gbest = None
        gbest_fitness = -10

        print(f"Starting EGA-PSO optimization with {self.n_particles} particles for {self.n_iterations} iterations...")

        # 1. RF to determine the initial fitness of every particle.
        print("Determining initial fitness values...")
        initial_fitness = np.empty(self.n_particles)
        for i in range(self.n_particles):
            fitness_val, _ = self.random_forest(particles[i], X, y)
            initial_fitness[i] = fitness_val
            pbest_fitness[i] = fitness_val

            if fitness_val > gbest_fitness:
                gbest_fitness = fitness_val
                gbest = particles[i].copy()

        # 2. Enhanced selection process of GA.
        for iteration in range(self.n_iterations):
            self.current_iteration = iteration

            if iteration == 0:
                # The swarm has not moved since the initial evaluation and the
                # forest uses a fixed random_state, so re-scoring would only
                # repeat the same values; reuse them.
                current_fitness = initial_fitness
            else:
                current_fitness = np.empty(self.n_particles)
                for i in range(self.n_particles):
                    fitness_val, _ = self.random_forest(particles[i], X, y)
                    current_fitness[i] = fitness_val

                    if fitness_val > pbest_fitness[i]:
                        pbest_fitness[i] = fitness_val
                        pbest[i] = particles[i].copy()

                    if fitness_val > gbest_fitness:
                        gbest_fitness = fitness_val
                        gbest = particles[i].copy()

            new_selected_value, best_fit, elite_indices = self.selection(particles, current_fitness)

            self.fitness_history.append(gbest_fitness)

            if iteration % 10 == 0:
                avg_features = np.mean([np.sum(p > self.threshold) for p in particles])
                print(f"Iteration {iteration}: Best fitness = {gbest_fitness:.4f}, "
                      f"Avg features selected = {avg_features:.1f}")

            # 3. Breed one offspring for every non-elite slot.
            offspring_list = []
            for _ in range(self.n_particles - len(elite_indices)):
                # Crossover (80% of the time) needs two distinct parents;
                # otherwise fall back to mutating a single parent.
                if len(new_selected_value) >= 2 and np.random.rand() < 0.8:
                    parent_indices = np.random.choice(len(new_selected_value), 2, replace=False)
                    i_pa = new_selected_value[parent_indices[0]]
                    i_pb = new_selected_value[parent_indices[1]]
                    i_pc = self.crossover_generation(i_pa, i_pb)
                else:
                    parent_idx = np.random.choice(len(new_selected_value))
                    i_pc = self.enhanced_mutation(new_selected_value[parent_idx])

                offspring_list.append(i_pc)

            # 4. Replace the worst particles with the new offspring.
            if len(offspring_list) > 0:
                offspring_array = np.array(offspring_list)

                worst_indices = np.argsort(current_fitness)[:len(offspring_list)]

                for i, worst_idx in enumerate(worst_indices):
                    # 5. PSO step applied to the offspring.
                    new_output_pso = self.pso_rejected(offspring_array[i], particles, velocities,
                                                       pbest, gbest, worst_idx)

                    # 6. Keep the PSO result only if it scores within 5% of the best fit.
                    pso_fitness, _ = self.random_forest(new_output_pso, X, y)

                    if pso_fitness >= best_fit * 0.95:
                        # Blend with the previously accepted PSO position, if any.
                        if self.current_pso_output is not None:
                            new_output_pso = 0.7 * new_output_pso + 0.3 * self.current_pso_output
                        self.current_pso_output = new_output_pso
                        particles[worst_idx] = new_output_pso
                    else:
                        self.enhanced_selection()
                        particles[worst_idx] = offspring_array[i]

        # Final solution: re-score the global best to obtain its binary mask.
        final_fitness, self.best_features = self.random_forest(gbest, X, y)
        self.best_fitness = final_fitness

        print("Optimization completed!")
        print(f"Best fitness achieved: {self.best_fitness:.4f}")
        print(f"Number of selected features: {np.sum(self.best_features) if self.best_features is not None else 0}")

        return self.best_features.astype(bool) if self.best_features is not None else None

    def get_selected_features(self):
        """Return the indices of the selected features, or None when nothing was selected."""
        if self.best_features is not None:
            return np.where(self.best_features)[0]
        return None

    def plot_fitness_history(self):
        """Plot the global-best fitness recorded at each iteration of the last fit."""
        plt.figure(figsize=(12, 6))
        plt.plot(self.fitness_history, 'b-', linewidth=2, marker='o')
        plt.title('EGA-PSO Fitness Evolution', fontsize=14)
        plt.xlabel('Iteration', fontsize=12)
        plt.ylabel('Best Fitness (Accuracy)', fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

# Cell-completion marker: printed once the statements of this cell have run without raising.
print("EGA-PSO class defined successfully!")

EGA-PSO class defined successfully!


In [None]:
def load_unsw_nb15(train_path='/content/unsw_train.csv', test_path='/content/unsw_test.csv'):
    """Load and preprocess the UNSW-NB15 train/test CSV files.

    Steps: drop the ``id`` column when present, split off the binary
    ``label`` target (``attack_cat`` is discarded), label-encode categorical
    columns, mean-impute missing values and standardise the features. The
    imputer and scaler are fitted on the training split only; the label
    encoders are fitted on train and test together so that categories seen
    only in the test split can still be transformed.

    Parameters
    ----------
    train_path, test_path : str
        Locations of the training and test CSV files.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)`` as numpy arrays, or four
        ``None`` values when loading or preprocessing fails (the error is
        printed, not raised).
    """
    try:
        print("Loading UNSW-NB15 training data...")
        train_data = pd.read_csv(train_path)
        print(f"Training data shape: {train_data.shape}")

        print("Loading UNSW-NB15 test data...")
        test_data = pd.read_csv(test_path)
        print(f"Test data shape: {test_data.shape}")

        print("Preprocessing UNSW-NB15 data...")

        # Drop bookkeeping columns; errors='ignore' tolerates a file that
        # lacks the column instead of raising on just one of the two frames.
        columns_to_remove = ['id']
        train_data = train_data.drop(columns=columns_to_remove, errors='ignore')
        test_data = test_data.drop(columns=columns_to_remove, errors='ignore')

        # 'label' is the binary target (0=normal, 1=attack);
        # 'attack_cat' is the multi-class target and is not used here.
        X_train = train_data.drop(['label', 'attack_cat'], axis=1)
        y_train = train_data['label']
        X_test = test_data.drop(['label', 'attack_cat'], axis=1)
        y_test = test_data['label']

        def encode_column(col):
            # Fit on train+test so every category has a code in both splits.
            le = LabelEncoder()
            le.fit(pd.concat([X_train[col], X_test[col]], axis=0))
            X_train[col] = le.transform(X_train[col])
            X_test[col] = le.transform(X_test[col])

        categorical_columns = ['proto', 'service', 'state']
        for col in categorical_columns:
            if col in X_train.columns:
                encode_column(col)

        # Any remaining non-numeric column would break the float conversion below.
        for col in X_train.columns:
            if not pd.api.types.is_numeric_dtype(X_train[col]):
                print(f"Converting categorical column: {col}")
                encode_column(col)

        # Convert to numpy arrays.
        X_train = X_train.astype(float).values
        X_test = X_test.astype(float).values
        y_train = y_train.astype(int).values
        y_test = y_test.astype(int).values

        # Replace NaN values with the training-set column mean.
        from sklearn.impute import SimpleImputer
        imputer = SimpleImputer(strategy='mean')
        X_train = imputer.fit_transform(X_train)
        X_test = imputer.transform(X_test)

        # Feature scaling, fitted on the training split only.
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        print("Data preprocessing completed successfully!")
        print(f"Final training set shape: {X_train.shape}")
        print(f"Final test set shape: {X_test.shape}")
        print(f"Training class distribution: {np.bincount(y_train)}")
        print(f"Test class distribution: {np.bincount(y_test)}")

        return X_train, X_test, y_train, y_test

    except FileNotFoundError as e:
        print(f"Error: Dataset files not found. {e}")
        # Report the paths that were actually requested.
        print(f"Please ensure {train_path} and {test_path} exist.")
        return None, None, None, None
    except Exception as e:
        # Best-effort loader: callers check for None instead of catching.
        print(f"Error loading dataset: {e}")
        return None, None, None, None

In [None]:
# Run the loader; it hands back four None values when loading fails.
X_train, X_test, y_train, y_test = load_unsw_nb15()

if X_train is None:
    print("Failed to load dataset. Please check file paths.")
else:
    # Summary banner followed by sample/feature counts and class balance.
    print("\n" + "="*50)
    print("UNSW-NB15 Dataset loaded successfully!")
    print("="*50)
    print(f"Training set: {X_train.shape[0]} samples, {X_train.shape[1]} features")
    print(f"Test set: {X_test.shape[0]} samples, {X_test.shape[1]} features")
    print("Training class distribution: "
          f"Normal={np.bincount(y_train)[0]}, Attack={np.bincount(y_train)[1]}")
    print("Test class distribution: "
          f"Normal={np.bincount(y_test)[0]}, Attack={np.bincount(y_test)[1]}")

Loading UNSW-NB15 training data...
Training data shape: (82332, 45)
Loading UNSW-NB15 test data...
Test data shape: (175341, 45)
Preprocessing UNSW-NB15 data...
Data preprocessing completed successfully!
Final training set shape: (82332, 42)
Final test set shape: (175341, 42)
Training class distribution: [37000 45332]
Test class distribution: [ 56000 119341]

UNSW-NB15 Dataset loaded successfully!
Training set: 82332 samples, 42 features
Test set: 175341 samples, 42 features
Training class distribution: Normal=37000, Attack=45332
Test class distribution: Normal=56000, Attack=119341


In [None]:
# Quick-test settings: n_particles=10, n_iterations=20, rf_estimators=30 and cv_folds of 1 or 2.
# For better results, increase the number of iterations, trees and CV folds.
if X_train is not None:
    print("Initializing EGA-PSO parameters...")

    # Swarm size, iteration count, trees per forest, cross-validation folds.
    n_particles, n_iterations, rf_estimators, cv_folds = 10, 20, 30, 2

    print(
        "Parameters:",
        f"- Particles: {n_particles}",
        f"- Iterations: {n_iterations}",
        f"- RF Estimators: {rf_estimators}",
        f"- CV Folds: {cv_folds}",
        sep="\n",
    )

    feature_selector = EGA_PSO(n_particles=n_particles, n_iterations=n_iterations,
                               rf_estimators=rf_estimators, cv_folds=cv_folds)

    print("EGA-PSO initialized successfully!")


Initializing EGA-PSO parameters...
Parameters:
- Particles: 10
- Iterations: 20
- RF Estimators: 30
- CV Folds: 2
EGA-PSO initialized successfully!


In [None]:
if X_train is not None:
    print("Starting EGA-PSO Feature Selection...", "="*50, sep="\n")

    # Wall-clock timing around the whole optimisation run.
    start_time = time.time()
    selected_mask = feature_selector.fit(X_train, y_train)
    end_time = time.time()
    execution_time = end_time - start_time

    print(f"\nFeature selection completed in {execution_time:.2f} seconds")
    # Total time divided by the configured iteration count.
    print(f"Average time per iteration: {execution_time / n_iterations:.2f} seconds")

Starting EGA-PSO Feature Selection...
Starting EGA-PSO optimization with 10 particles for 20 iterations...
Determining initial fitness values...
Iteration 0: Best fitness = 0.6747, Avg features selected = 16.3
Iteration 10: Best fitness = 0.7389, Avg features selected = 12.7
Optimization completed!
Best fitness achieved: 0.7389
Number of selected features: 12

Feature selection completed in 846.12 seconds
Average time per iteration: 42.31 seconds


In [None]:
if not (X_train is None or selected_mask is None):
    print("FEATURE SELECTION RESULTS")
    print("="*50)

    selected_features = feature_selector.get_selected_features()

    print(f" Selected {len(selected_features)} features out of {X_train.shape[1]} total features")
    print(f" Feature reduction: {(1 - len(selected_features)/X_train.shape[1])*100:.2f}%")
    print(f" Best fitness achieved: {feature_selector.best_fitness:.4f}")

    print(f"\nSelected feature indices: {selected_features}")

    # UNSW-NB15 feature names, used to label the selected column indices.
    # NOTE(review): assumes this order matches the loaded CSV columns - confirm.
    unsw_feature_names = [
        'dur', 'proto', 'service', 'state', 'spkts', 'dpkts', 'sbytes', 'dbytes',
        'rate', 'sttl', 'dttl', 'sload', 'dload', 'sloss', 'dloss', 'sinpkt',
        'dinpkt', 'sjit', 'djit', 'swin', 'stcpb', 'dtcpb', 'dwin', 'tcprtt',
        'synack', 'ackdat', 'smean', 'dmean', 'trans_depth', 'response_body_len',
        'ct_srv_src', 'ct_state_ttl', 'ct_dst_ltm', 'ct_src_dport_ltm',
        'ct_dst_sport_ltm', 'ct_dst_src_ltm', 'is_ftp_login', 'ct_ftp_cmd',
        'ct_flw_http_mthd', 'ct_src_ltm', 'ct_srv_dst', 'is_sm_ips_ports'
    ]

    print("\nSelected features by name:")
    for rank, idx in enumerate(selected_features, start=1):
        # Indices beyond the name table fall back to a generic placeholder.
        shown = unsw_feature_names[idx] if idx < len(unsw_feature_names) else f"Feature_{idx}"
        print(f"{rank:2d}. {shown} (index: {idx})")

FEATURE SELECTION RESULTS
 Selected 12 features out of 42 total features
 Feature reduction: 71.43%
 Best fitness achieved: 0.7389

Selected feature indices: [ 0  1  4  6 10 17 18 19 22 29 32 40]

Selected features by name:
 1. dur (index: 0)
 2. proto (index: 1)
 3. spkts (index: 4)
 4. sbytes (index: 6)
 5. dttl (index: 10)
 6. sjit (index: 17)
 7. djit (index: 18)
 8. swin (index: 19)
 9. dwin (index: 22)
10. response_body_len (index: 29)
11. ct_dst_ltm (index: 32)
12. ct_srv_dst (index: 40)


In [None]:
def print_metrics(y_true, y_pred, title):
    """Print binary-classification metrics for labels 0 (Normal) and 1 (Attack).

    All values are derived from a confusion matrix pinned to ``labels=[0, 1]``,
    so the matrix is always 2x2 even when only one class occurs in the inputs.
    Ratios whose denominator is zero are reported as 0.

    Parameters
    ----------
    y_true : array-like
        Ground-truth labels.
    y_pred : array-like
        Predicted labels.
    title : str
        Heading printed above the table.
    """
    print(f"\n{title}")
    print("="*30)

    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    TN, FP, FN, TP = cm.ravel()

    def ratio(numerator, denominator):
        # Guarded division: undefined ratios are shown as 0.
        return numerator / denominator if denominator > 0 else 0

    accuracy = ratio(TP + TN, TN + FP + FN + TP)
    precision = ratio(TP, TP + FP)
    recall = ratio(TP, TP + FN)  # identical to the true-positive rate
    f1 = ratio(2 * precision * recall, precision + recall)

    TNR = ratio(TN, TN + FP)
    FPR = ratio(FP, TN + FP)
    FNR = ratio(FN, TP + FN)

    print(f"{'Metric':<20} {'Score':<10}")
    print("-" * 30)
    print(f"{'Accuracy':<20} {accuracy:<10.4f}")
    print(f"{'Precision (Attack)':<20} {precision:<10.4f}")
    print(f"{'Recall (Attack)/TPR':<20} {recall:<10.4f}")
    print(f"{'F1-Score (Attack)':<20} {f1:<10.4f}")
    print(f"{'TNR (Specificity)':<20} {TNR:<10.4f}")
    print(f"{'FPR (False Alarm)':<20} {FPR:<10.4f}")
    print(f"{'FNR (Miss Rate)':<20} {FNR:<10.4f}")

if X_test is not None and selected_mask is not None:
    # NOTE(review): no cell in this notebook defines clf_selected / clf_all,
    # so they are trained here to keep the notebook runnable top to bottom.
    # The hyperparameters below are an assumption - confirm against the run
    # that produced the recorded metrics.
    clf_selected = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    clf_selected.fit(X_train[:, selected_mask], y_train)

    clf_all = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    clf_all.fit(X_train, y_train)

    # Compare the reduced feature set against the full feature set on the test split.
    y_pred_selected = clf_selected.predict(X_test[:, selected_mask])
    y_pred_all = clf_all.predict(X_test)

    print_metrics(y_test, y_pred_selected, "Metrics - Selected Features")
    print_metrics(y_test, y_pred_all, "Metrics - All Features")


Metrics - Selected Features
Metric               Score     
------------------------------
Accuracy             0.9088    
Precision (Attack)   0.9766    
Recall (Attack)/TPR  0.8873    
F1-Score (Attack)    0.9298    
TNR (Specificity)    0.9547    
FPR (False Alarm)    0.0453    
FNR (Miss Rate)      0.1127    

Metrics - All Features
Metric               Score     
------------------------------
Accuracy             0.9008    
Precision (Attack)   0.9891    
Recall (Attack)/TPR  0.8638    
F1-Score (Attack)    0.9222    
TNR (Specificity)    0.9797    
FPR (False Alarm)    0.0203    
FNR (Miss Rate)      0.1362    
