In [None]:

from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
import pennylane as qml
from pennylane.templates import AngleEmbedding, StronglyEntanglingLayers
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from collections import namedtuple
import numpy as np


In [None]:

# Configuration
data_config = {
    "data_path": "C:/Users/Admin/Desktop/Quantum/Training and Testing Sets/UNSW_NB15_testing-set.csv",
    "split": 0.2,
    "num_pca_components": 7,
    "batch_size": 16,
    "num_workers": 4,

}

Samples = namedtuple("samples", ["x_train", "x_test", "y_train", "y_test"])

# Preprocessing pipelines
pipeline_ang = Pipeline([
    ("scaler", MinMaxScaler((0, 2 * np.pi))),  # Scale features for PCA
    ("pca", PCA(data_config["num_pca_components"])),
])

pipeline_amp = Pipeline([
    ("scaler", MinMaxScaler((0, 1))),          # Scale features for normalization
    #("normalizer", Normalizer(norm="l2")),     # Normalize for amplitude encoding
])

def get_raw_samples(cfg, state):
    """Load raw samples and preprocess them."""
    # Load data
    data = pd.read_csv(cfg["data_path"])
    
    # Encode non-numeric columns
    non_numeric_cols = data.select_dtypes(include=["object"]).columns
    print("Non-numeric columns:", non_numeric_cols)
    for col in non_numeric_cols:
        le = LabelEncoder()
        data[col] = le.fit_transform(data[col])
    
    # Define features and target
    target = "label"
    X = data.drop(columns=["attack_cat", "id", target]).to_numpy()
    y = data[target].to_numpy()
    
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=100, test_size=20, random_state=state, stratify=y
    )
    
    return Samples(X_train, X_test, y_train, y_test)

def preprocess_samples(samples, pipeline):
    """Apply preprocessing pipeline to samples."""
    X_train = pipeline.fit_transform(samples.x_train)
    X_test = pipeline.transform(samples.x_test)
    return Samples(X_train, X_test, samples.y_train, samples.y_test)

# Main processing
samples_raw = get_raw_samples(data_config,42)
samples_preprocessed = preprocess_samples(samples_raw, pipeline_ang)

# Unpack final outputs
X_train, X_test, y_train, y_test = (
    samples_preprocessed.x_train,
    samples_preprocessed.x_test,
    samples_preprocessed.y_train,
    samples_preprocessed.y_test,
)

# Summary of splits
print(f"Training set: X_train shape = {X_train.shape}, y_train shape = {y_train.shape}")
print(f"Test set: X_test shape = {X_test.shape}, y_test shape = {y_test.shape}")

# Count occurrences in y
y_train_counts = np.bincount(y_train)
y_test_counts = np.bincount(y_test)
print(f"Training set -> y=0: {y_train_counts[0]}, y=1: {y_train_counts[1]}")
print(f"Test set -> y=0: {y_test_counts[0]}, y=1: {y_test_counts[1]}")


In [None]:
n_qubits = len(X_train[0])
n_qubits

In [None]:

def custom_ansatz(param,wires):
    """Custom ansatz with two layers of parameterized gates."""
    for i in wires:
        qml.RY(param[i], wires=i)  # Example parameterized rotation
    for i in range(len(wires) - 1):
        qml.CNOT(wires=[wires[i], wires[i + 1]])  # Entangling gates
    for i in wires:
        qml.RZ(param[i], wires=i)  # Additional parameterized rotation

def ansatz5(params, wires):
    """ Args:
        params (array-like): Parameters for the gates.
        wires (list): List of wires on which the ansatz is applied.
    """
    # First layer of RX and RZ gates
    for i in range(len(wires)):
        qml.RX(params[i], wires=wires[i])
        qml.RZ(params[i], wires=wires[i])

    # CRX gates between all pairs of qubits
    
    for x in range(len(wires) - 1, -1, -1):
        for y in range(len(wires) - 1, -1, -1):
            if x != y:
                qml.CRZ(params[x], wires=[wires[x], wires[y]])

def ansatz6(params, wires):
    """ Args:
        params (array-like): Parameters for the gates.
        wires (list): List of wires on which the ansatz is applied.
    """
    # First layer of RX and RZ gates
    for i in range(len(wires)):
        qml.RX(params[i], wires=wires[i])
        qml.RZ(params[i], wires=wires[i])

    # CRX gates between all pairs of qubits
    
    for x in range(len(wires) - 1, -1, -1):
        for y in range(len(wires) - 1, -1, -1):
            if x != y:
                qml.CRX(params[x], wires=[wires[x], wires[y]])

def ansatz13(params, wires):
    """ Args:
        params (array-like): Parameters for the gates.
        wires (list): List of wires on which the ansatz is applied.
    """
    # First layer of RY gates
    for i in range(len(wires)):
        qml.RY(params[i], wires=wires[i])

    # First layer of CRX gates
    for x in range(len(wires) - 1, -1, -1):
        qml.CRZ(params[x], wires=[wires[x], wires[(x + 1) % len(wires)]])

    # Second layer of RY gates
    for i in range(len(wires)):
        qml.RY(params[i], wires=wires[i])

    # Second layer of CRX gates
    for x in range(len(wires)):
        qml.CRZ(
            params[x],
            wires=[wires[(x + 6) % len(wires)], wires[(x + 5) % len(wires)]],
        )


def ansatz14(params, wires):
    """ Args:
        params (array-like): Parameters for the gates.
        wires (list): List of wires on which the ansatz is applied.
    """
    # First layer of RY gates
    for i in range(len(wires)):
        qml.RY(params[i], wires=wires[i])

    # First layer of CRX gates
    for x in range(len(wires) - 1, -1, -1):
        qml.CRX(params[x], wires=[wires[x], wires[(x + 1) % len(wires)]])

    # Second layer of RY gates
    for i in range(len(wires)):
        qml.RY(params[i], wires=wires[i])

    # Second layer of CRX gates
    for x in range(len(wires)):
        qml.CRX(
            params[x],
            wires=[wires[(x + 6) % len(wires)], wires[(x + 5) % len(wires)]],
        )

def ansatz19(params, wires):
    """ Args:
        params (array-like): Parameters for the gates.
        wires (list): List of wires on which the ansatz is applied.
    """
    # First layer of RX and RZ gates
    for i in range(len(wires)):
        qml.RX(params[i], wires=wires[i])
        qml.RZ(params[i], wires=wires[i])

    # Layer of CRX gates
    for x in range(len(wires) - 1, -1, -1):
        qml.CRX(params[x], wires=[wires[x], wires[(x + 1) % len(wires)]])



In [None]:
# Ansatz functions
ansatz_functions = [ansatz5, ansatz6, ansatz13, ansatz14, ansatz19]

def kernel_ang(x1, x2, ansatz_fn):
    """Quantum kernel with angle encoding and ansatz."""
    # Apply encoding for x1
    qml.AngleEmbedding(x1, wires=range(n_qubits))
    
    # Apply ansatz for x1
    ansatz_fn(x1, wires=range(n_qubits))
        
    
    # Adjoint of ansatz for x2
    qml.adjoint(ansatz_fn)(x2, wires=range(n_qubits))

    
    # Adjoint of encoding for x2
    qml.adjoint(AngleEmbedding)(x2, wires=range(n_qubits))
    
    # Return probabilities
    return qml.probs(wires=range(n_qubits))


def kernel_amp(x1, x2):
    """Quantum kernel with amplitude encoding and ansatz."""
    # Apply encoding for x1
    qml.AmplitudeEmbedding(x1, wires=range(n_qubits), pad_with=0, normalize=True)

    # Adjoint of encoding for x2
    qml.adjoint(qml.AmplitudeEmbedding)(x2, wires=range(n_qubits), pad_with=0, normalize=True)
    
    # Return probabilities
    return qml.probs(wires=range(n_qubits))

def kernel_matrix_ang(A, B, ansatz_fn):
    """Compute the matrix whose entries are the kernel
    evaluated on pairwise data from sets A and B."""
    return np.array([[kernel_ang(a, b, ansatz_fn)[0] for b in B] for a in A])

def kernel_matrix_amp(A, B):
    """Compute the matrix whose entries are the kernel
    evaluated on pairwise data from sets A and B."""
    return np.array([[kernel_amp(a, b)[0] for b in B] for a in A])

In [None]:
import matplotlib.pyplot as plt

x1 = [0.1] * n_qubits  # Example input 1
x2 = [0.2] * n_qubits  # Example input 2
for a in ansatz_functions:
    qml.draw_mpl(kernel_ang)(x1, x2, a)
    plt.show()

qml.draw_mpl(kernel_amp)(x1, x2)
plt.show()

In [None]:
n_qubits=6

def test_kernels(config):
    """Test different kernel combinations and collect results."""
    results = []
    ansatz_functions = ["ansatz5", "ansatz6", "ansatz13", "ansatz14", "ansatz19"]
    random_states = [ 69, 77, 72, 38, 36, 55, 90, 98, 22, 57, 40, 80, 93, 75, 86]
    shots = [1024, 8192, None]
    encodings = ["amplitute", "angle"]
    for shot in shots:
        dev_kernel = qml.device("default.qubit", wires=n_qubits, shots = shot)
        @qml.qnode(dev_kernel)
        def kernel_ang(x1, x2, ansatz_fn):
            """Quantum kernel with angle encoding and ansatz."""
            # Apply encoding for x1
            qml.AngleEmbedding(x1, wires=range(n_qubits))
            
            # Apply ansatz for x1
            ansatz_fn(x1, wires=range(n_qubits))
                
            
            # Adjoint of ansatz for x2
            qml.adjoint(ansatz_fn)(x2, wires=range(n_qubits))

            
            # Adjoint of encoding for x2
            qml.adjoint(AngleEmbedding)(x2, wires=range(n_qubits))
            
            # Return probabilities
            return qml.probs(wires=range(n_qubits))


        @qml.qnode(dev_kernel)
        def kernel_amp(x1, x2):
            """Quantum kernel with amplitude encoding and ansatz."""
            # Apply encoding for x1
            qml.AmplitudeEmbedding(x1, wires=range(n_qubits), pad_with=0, normalize=True)

            # Adjoint of encoding for x2
            qml.adjoint(qml.AmplitudeEmbedding)(x2, wires=range(n_qubits), pad_with=0, normalize=True)
            
            # Return probabilities
            return qml.probs(wires=range(n_qubits))
        
        def kernel_matrix_ang(A, B, ansatz_fn):
            """Compute the matrix whose entries are the kernel
            evaluated on pairwise data from sets A and B."""
            return np.array([[kernel_ang(a, b, ansatz_fn)[0] for b in B] for a in A])

        def kernel_matrix_amp(A, B):
            """Compute the matrix whose entries are the kernel
            evaluated on pairwise data from sets A and B."""
            return np.array([[kernel_amp(a, b)[0] for b in B] for a in A])

        for k in random_states:
            for encoding in encodings:
                # Main processing
                samples_raw = get_raw_samples(data_config,k)
                samples_preprocessed = preprocess_samples(samples_raw, pipeline_amp)

                # Unpack final outputs
                X_train, X_test, y_train, y_test = (
                    samples_preprocessed.x_train,
                    samples_preprocessed.x_test,
                    samples_preprocessed.y_train,
                    samples_preprocessed.y_test,
                )

                i = 0
                if encoding == "angle":
                    for ansatz_fn in config:
                        K_train = kernel_matrix_ang(X_train, X_train, ansatz_fn)
                        K_test = kernel_matrix_ang(X_test, X_train, ansatz_fn)
                elif encoding == "amplitude":
                    K_train = kernel_matrix_amp(X_train, X_train)
                    K_test = kernel_matrix_amp(X_test, X_train)


                # Train and predict
                with dev_kernel.tracker:
                    svm = SVC(kernel="precomputed").fit(K_train, y_train)
                    y_pred = svm.predict(K_test)

                # Compute metrics
                f1 = f1_score(y_test, y_pred, average="binary")
                accuracy = accuracy_score(y_test, y_pred)
            

                # Collect results
                results.append({
                    "shots" : shot,
                    "random_state" : k,
                    "ansatz": ansatz_functions[i],
                    "f1_score": f1,
                    "accuracy": accuracy,
                })

                print(f"Ansatz: {ansatz_functions[i]}")
                print(f"F1 Score: {f1}, Accuracy: {accuracy}")
                i+=1
    return pd.DataFrame(results)

results= test_kernels(ansatz_functions)
results_df = pd.DataFrame(results)
print(results_df)

# Save to file (optional)
results_df.to_csv("kernel_tests_results_amp.csv", index=False)