# Detecting network intrusion attacks with quantum kernel embedding and SVM application: a hardware implementation

## 0. Install dependencies & hardware backend

0.0. Dependencies

In [1]:
import os
from quask.core_implementation.qiskit_kernel import QiskitKernel
from quask.core import KernelType
from qiskit_ibm_runtime import QiskitRuntimeService
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
import numpy as np
from quask.core import Ansatz, KernelFactory, KernelType
from quask.core_implementation import QiskitKernel
from sklearn.preprocessing import MinMaxScaler

  import pkg_resources


0.1. Establish creds if running on IBM hardware (assumes IBM Quantum Cloud account exists and credentials saved with save_account())

In [2]:
service = QiskitRuntimeService(instance="crn:v1:bluemix:public:quantum-computing:us-east:a/b8ff6077c08a4ea9871560ccb827d457:d3452110-b228-4c79-8959-15ea8cfd435d::") # assuming creds saved with save_account()
backend = service.backend("ibm_rensselaer")

## 1. Create Kernels

### 1.0. Configure for either quantum simulator or hardware backend

Simulator (noiseless) backend

In [3]:
def create_qiskit_noiseless(ansatz, measurement: str, type: KernelType):
    # Here platform="infty_shots" forces Estimator/Aer path inside QiskitKernel
    return QiskitKernel(
        ansatz,
        measurement,
        type,
        platform="infty_shots",
        n_shots=None  # None means no sampling, use statevector/estimator
    )

# Register the noiseless factory (not needed if already registered)
KernelFactory.add_implementation('qiskit_noiseless', create_qiskit_noiseless)

# # Select implementation for create_kernel calls (names much match; e.g., 'qiskit_noiseless'):
KernelFactory.set_current_implementation('qiskit_noiseless')

Hardware (NISQ) backend

Note: only one implementation can be selected (e.g., qiskit_noiseless OR qiskit_ibm)

In [4]:
# Specify options for kernel
options = {
    "dynamical_decoupling": {"sequence_type": "XX", "enable": False},
    "twirling": {"enable_gates": False, "enable_measure": False, "num_randomizations": "auto", "shots_per_randomization": "auto"}
}

def create_qiskit_ibm(ansatz, measurement: str, type: KernelType):
    return QiskitKernel(
        ansatz,
        measurement,
        type,
        platform="ibm_quantum",  
        n_shots=2048,
        options=options,
        backend=backend,
    )

KernelFactory.add_implementation('qiskit_ibm', create_qiskit_ibm)

KernelFactory.set_current_implementation('qiskit_ibm') # comment out if only running on simulator

### 1.1. Ansatz and kernel creation

Config modified from QuASK iris dataset anomaly detection example; see [QuASK: How to optimize a quantum kernel](https://quask.readthedocs.io/en/latest/tutorials_quask/quask_2_optimizers.html) for alternate optimization techniques

In [5]:
ansatz = Ansatz(n_features=8, n_qubits=4, n_operations=8) # 8 features for 8 columns of NIDS data
ansatz.initialize_to_identity()
ansatz.change_operation(0, new_feature=0, new_wires=[0, 1], new_generator="XX", new_bandwidth=3)
ansatz.change_operation(1, new_feature=1, new_wires=[1, 2], new_generator="XY", new_bandwidth=3)
ansatz.change_operation(2, new_feature=2, new_wires=[2, 3], new_generator="XZ", new_bandwidth=3)
ansatz.change_operation(3, new_feature=3, new_wires=[3, 0], new_generator="YX", new_bandwidth=3)
ansatz.change_operation(4, new_feature=4, new_wires=[0, 1], new_generator="YY", new_bandwidth=3)
ansatz.change_operation(5, new_feature=5, new_wires=[1, 2], new_generator="YZ", new_bandwidth=3)
ansatz.change_operation(6, new_feature=6, new_wires=[2, 3], new_generator="ZX", new_bandwidth=3)
ansatz.change_operation(7, new_feature=7, new_wires=[3, 0], new_generator="ZY", new_bandwidth=3)

kernel = KernelFactory.create_kernel(ansatz, "ZZZZ", KernelType.FIDELITY) # create kernel with Ansatz

### 1.2. Instantiate machine learning model

In [6]:
model = SVC(kernel='precomputed') 

## 2. Fit quantum kernels to SVM model and test on BCCC-CIC-CSE-IDS2018

2.0. Load modified datasets (see KERNELSCRIPT.py for dataset cleaning and reduction)

In [7]:
benign_path = "TEST-DATA/500bfftpbenign.npy"
attack_path = "TEST-DATA/500bfftpattack.npy"

qX1 = np.load(benign_path)
qX2 = np.load(attack_path)

attack_name = attack_path[13:].replace("attack", "").replace(".npy", "").upper()

2.1. Create testing and training sets

In [8]:
# Select first N=30 samples
qX1 = qX1[:5]
qX2 = qX2[:5]

# First half of new list contains anomaly data (-1); second half is benign (1)
qX = np.vstack([qX1, qX2])
qy = np.array([-1] * len(qX1) + [1] * len(qX2)) 

# Use 0.2-0.3 test size (train on 70% of data, test on 30)
qX_train, qX_test, qy_train, qy_test = train_test_split(qX, qy, test_size=0.3, random_state=42)

2.2. Normalize data

In [9]:
samples = np.append(qX_train, qX_test, axis=0)
minmax_scale = MinMaxScaler((-1, 1)).fit(samples)
qX_train = minmax_scale.transform(qX_train)
qX_test = minmax_scale.transform(qX_test)

### 2.3. Build training matrix using quantum kernel

In [10]:
K_train = kernel.build_kernel(qX_train, qX_train, matrix="train")

model.fit(K_train, qy_train)

0 circuit-174
1 circuit-178_dg
2 barrier
3 measure
4 measure
5 measure
6 measure
     ┌──────────────────────┐                                                »
q_0: ┤0                     ├────────────────────────────────────────────────»
     │  exp(-it XX)(1.0808) │┌───────────────────┐                           »
q_1: ┤1                     ├┤0                  ├───────────────────────────»
     └──────────────────────┘│  exp(-it XY)(1.5) │┌─────────────────────────┐»
q_2: ────────────────────────┤1                  ├┤0                        ├»
                             └───────────────────┘│  exp(-it XZ)(-0.049315) │»
q_3: ─────────────────────────────────────────────┤1                        ├»
                                                  └─────────────────────────┘»
«     ┌───────────────────────┐┌────────────────────┐                      »
«q_0: ┤1                      ├┤0                   ├──────────────────────»
«     │                       ││  exp(-it YY)(-1.5) │┌

KeyboardInterrupt: 

### 2.4. Predict the labels for the test data

In [23]:
# Predict the labels for the test data
K_test = kernel.build_kernel(qX_test, qX_train, matrix="test")
y_pred = model.predict(K_test)

### 2.5. Calculate and output QML model accuracy

In [24]:
accuracy = np.sum(qy_test == y_pred) / len(qy_test)
print(f"Accuracy for {attack_name} is {accuracy}")

# Optional additional metrics
# from sklearn.metrics import classification_report
# cr = classification_report(qy_test, y_pred)
# print(cr) 

Accuracy for BFFTP is 0.3333333333333333


## 3. Further notes

The above demo handles a singular network attack, split for improved readability. Below is our testing across all attacks.

In [None]:
import time
data_dir = 'TEST-DATA-TEMP'
timing = 0

for fname in os.listdir(data_dir):
    if fname.endswith('.npy') and 'benign' in fname:
        benign_path = os.path.join(data_dir, fname)
        
        # Construct corresponding attack file name
        attack_fname = fname.replace('benign', 'attack')
        attack_path = os.path.join(data_dir, attack_fname)

        attack_label = attack_fname.replace("attack", "").replace("500", "").replace(".npy", "").upper()

        if os.path.exists(attack_path):
            # Load both arrays
            qX1 = np.load(benign_path)
            qX2 = np.load(attack_path)

            # select first 30 samples
            qX1 = qX1[:500]
            qX2 = qX2[:500]

            # Create testing/training sets
            qX = np.vstack([qX1, qX2])
            qy = np.array([-1] * len(qX1) + [1] * len(qX2))

            qX_train, qX_test, qy_train, qy_test = train_test_split(qX, qy, test_size=0.3, random_state=42)
            
            # normalize data
            samples = np.append(qX_train, qX_test, axis=0)
            minmax_scale = MinMaxScaler((-1, 1)).fit(samples)
            qX_train = minmax_scale.transform(qX_train)
            qX_test = minmax_scale.transform(qX_test)

            # Train
            start = time.process_time()
            K_train = kernel.build_kernel(qX_train, qX_train, matrix="train")

            # Fit the ML model
            model.fit(K_train, qy_train)
        
            # Test
            K_test = kernel.build_kernel(qX_test, qX_train, matrix="test")
            y_pred = model.predict(K_test)
            end = time.process_time()

            timing += (end - start)

            # Calculate accuracy
            accuracy = np.sum(qy_test == y_pred) / len(qy_test)
            print(f"Accuracy for {attack_label} is {accuracy}")
            print(f"CPU time: {end - start:.6f} seconds")
print("average time: ", timing/3)


Accuracy for BFFTP is 0.8866666666666667
Accuracy for BFSSH is 0.9966666666666667
