In [1]:
import numpy as np
from matplotlib import pyplot as plt
from tqdm import tqdm
import pandas as pd

from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV

from sklearn.base import BaseEstimator, TransformerMixin, clone
import pyswarms as ps
from sklearn.linear_model import LinearDiscriminantAnalysis

import xgboost as xgb
from xgboost import XGBClassifier

ImportError: cannot import name 'LinearDiscriminantAnalysis' from 'sklearn.linear_model' (/home/ari/mambaforge/envs/ccd/lib/python3.10/site-packages/sklearn/linear_model/__init__.py)

In [None]:
class PSOTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, model):
        self.model = model
    
    def fit(self, X, y=None):
        
        # Define objective function
        def f_per_particle(m, alpha):
            """Computes for the objective function per particle

            Inputs
            ------
            m : numpy.ndarray
                Binary mask that can be obtained from BinaryPSO, will
                be used to mask features.
            alpha: float (default is 0.5)
                Constant weight for trading-off classifier performance
                and number of features

            Returns
            -------
            numpy.ndarray
                Computed objective function
            """
            total_features = X.shape[1]
            # Get the subset of the features from the binary mask
            if np.count_nonzero(m) == 0:
                X_subset = X
            else:
                X_subset = X[:,m==1]
            # Perform classification and store performance in P
            self.model.fit(X_subset, y)
            P = (self.model.predict(X_subset) == y).mean()
            # Compute for the objective function
            j = (alpha * (1.0 - P)
                + (1.0 - alpha) * (1 - (X_subset.shape[1] / total_features)))

            return j
        
        def f(x, alpha=0.88):
            """Higher-level method to do classification in the
            whole swarm.

            Inputs
            ------
            x: numpy.ndarray of shape (n_particles, dimensions)
                The swarm that will perform the search

            Returns
            -------
            numpy.ndarray of shape (n_particles, )
                The computed loss for each particle
            """
            n_particles = x.shape[0]
            j = [f_per_particle(x[i], alpha) for i in range(n_particles)]
            return np.array(j)
        
        # Initialize swarm, arbitrary
        options = {'c1': 0.5, 'c2': 0.5, 'w':0.9, 'k': 30, 'p':2}

        # Call instance of PSO
        dimensions = X.shape[1] # dimensions should be the number of features
        optimizer = ps.discrete.BinaryPSO(n_particles=30, 
                                          dimensions=dimensions, 
                                          options=options)

        # Perform optimization
        cost, pos = optimizer.optimize(f, 
                                       iters=1000, 
                                       verbose=2)
        self.positions = pos
        
        return self
    
    def transform(self, X, y=None):
        X_selected_features = X[:, pos==1]
        return X_selected_features
    
    def get_params(self, deep=True):
        return {"model": self.model}
    
    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

In [None]:
df = pd.read_csv("data/merged_and_edited_dataset.csv",
                dtype={
                        "src_ip": "string",
                        "dst_ip": "string",
                        "client_fingerprint": "string",
                        "application_name": "string",
                        "application_category_name": "string",
                        "requested_server_name": "string",
                        "atk_type": "string",
                        "traffic_type": "string"
                    }).drop([
                        "Unnamed: 0",
                        "server_fingerprint",
                        "user_agent",
                        "content_type", 
                        "src_ip", 
                        "dst_ip", 
                        "splt_direction", 
                        "splt_ps", 
                        "splt_piat_ms", 
                        "application_name", 
                        "application_category_name", 
                        "requested_server_name", 
                        "client_fingerprint"
                    ], axis=1)
X = df.drop(['id', 'traffic_type', 'atk_type'], axis=1)
y = LabelEncoder().fit_transform(df['atk_type'])

In [None]:
classifier = XGBClassifier(n_estimators = 500,
                           max_depth = 7,
                           learning_rate = 0.1,
                           verbose=None,
                           eval_metric='logloss', 
                           tree_method="gpu_hist")
scaler = StandardScaler()
pipe = Pipeline(
    [
        ("scaling", scaler),
        ("fs", "passthrough"),
        ("classify", classifier)
    ],
    memory = "cache/"
)

param_grid = [
    {
        "fs": [PSOTransformer(LinearDiscriminantAnalysis())]
    }
    #{
    #    "fs": [PCA()],
    #    "fs__n_components": np.arange(5, len(X.columns), 10)
    #}
]

search = GridSearchCV(pipe, param_grid, cv=5, verbose=4)
search.fit(X, y)
print("Best parameter (CV score=%0.3f):" % search.best_score_)
print(search.best_params_)