In [24]:
!pip install qiskit
!pip install qiskit_machine_learning
#!pip install 'qiskit[visualization]'
!pip install qiskit-ibm-runtime
!pip install qiskit-aer

Defaulting to user installation because normal site-packages is not writeable
Collecting qiskit
  Obtaining dependency information for qiskit from https://files.pythonhosted.org/packages/91/6b/1a251a18497eda67213efd16369ab6f35b0ca7db325a5bda810661aad25e/qiskit-2.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading qiskit-2.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting rustworkx>=0.15.0 (from qiskit)
  Obtaining dependency information for rustworkx>=0.15.0 from https://files.pythonhosted.org/packages/4f/e2/e21187b255c6211d71db0d08a44fc16771038b2af41712d66c408d9bec16/rustworkx-0.16.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading rustworkx-0.16.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting stevedore>=3.0.0 (from qiskit)
  Obtaining dependency information for stevedore>=3.0.0 from https://files.pythonhosted.org/packages/f7/45/8c4ebc0c460e6ec38

In [1]:
import numpy as np
import csv
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

def load_data(filepath):

    with open(filepath) as csv_file:
        data_file = csv.reader(csv_file)
        temp = next(data_file)
        n_samples = int(temp[0])
        n_features = int(temp[1])
        target_names = np.array(temp[2:])
        data = np.empty((n_samples, n_features))
        target = np.empty((n_samples,), dtype=int)

        for i, ir in enumerate(data_file):
            data[i] = np.asarray(ir[:-1], dtype=np.float64)
            target[i] = np.asarray(ir[-1], dtype=int)

    return data, target, target_names

def load_botnetdga():

    data, target, target_names = load_data('dataset/BotnetDgaDataset.csv')

    return data, target

def botnetdga(training_size, test_size, n):

    class_labels = [r'benign', r'dga']

    data, target = load_botnetdga()
    sample_train, sample_test, label_train, label_test = train_test_split(data, target, train_size=training_size, test_size=test_size, random_state=0)
    
   # training_input = {key: (sample_train[label_train == k, :])[:training_size]
   #                   for k, key in enumerate(class_labels)}
   # test_input = {key: (sample_test[label_test == k, :])[:test_size]
   #               for k, key in enumerate(class_labels)}
    
    return sample_train, sample_test, label_train, label_test


In [4]:
# BotnetDGA data set
plot_data = False
training_size = 1000
test_size = 300
feature_dim = 7

sample_train, sample_test, label_train, label_test = botnetdga(training_size=training_size, test_size=test_size, n=feature_dim)

import torch
X_train = torch.tensor(sample_train, dtype=torch.float32)
X_test = torch.tensor(sample_test, dtype=torch.float32)
y_train = torch.tensor(label_train, dtype=torch.float32)
y_test = torch.tensor(label_test, dtype=torch.float32)


In [7]:
#Experiment 1-1-1: ZFeatureMap+RealAmplitudes+ADAM
#Experiment 1-1-2: ZFeatureMap+TwoLocal+ADAM
#Experiment 1-1-3: ZFeatureMap+EfficientSU2+ADAM

#Experiment 1-2-1: ZZFeatureMap+RealAmplitudes+ADAM
#Experiment 1-2-2: ZZFeatureMap+TwoLocal+ADAM
#Experiment 1-2-3: ZZFeatureMap+EfficientSU2+ADAM

import qiskit
from qiskit.circuit.library import TwoLocal, PauliFeatureMap, ZFeatureMap, ZZFeatureMap, NLocal, TwoLocal, RealAmplitudes, EfficientSU2, ExcitationPreserving
from qiskit.primitives import StatevectorEstimator as Estimator
from qiskit import QuantumCircuit
from qiskit.quantum_info import Pauli, SparsePauliOp

class QuantumNet(torch.nn.Module):
    def __init__(self, estimator, feature_map, ansatz, n_qubits, observables):
        super().__init__()
        self.estimator = estimator
        self.feature_map = feature_map
        self.ansatz = ansatz
        self.n_qubits = n_qubits
        self.observables = observables
        
        # Combine circuits
        self.qc = QuantumCircuit(feature_dim)
        self.qc.compose(self.feature_map, inplace=True)
        self.qc.compose(self.ansatz, inplace=True)

        self.input_params = list(self.feature_map.parameters)
        self.weight_params = list(self.ansatz.parameters)

        # Learnable weights
        self.weights = torch.nn.Parameter(torch.randn(len(self.weight_params), dtype=torch.float32))

    def forward(self, x_batch):
        results = []
        
        for x in x_batch:
            bind_params = {p: float(v) for p, v in zip(self.input_params, x.tolist())}
            bind_params.update({p: float(w) for p, w in zip(self.weight_params, self.weights)})

            pub = (self.qc, self.observables, bind_params)
            job = self.estimator.run([pub])

            result = job.result()[0]
            results.append(result.data.evs[0])
            
            results_np = np.array(results)  # combines into one NumPy ndarray
        return torch.tensor(results_np, requires_grad=True)

    
# build quantum circuit
estimator = Estimator()
feature_map = ZFeatureMap(feature_dim)
ansatz = RealAmplitudes(feature_dim, reps=2)
#ansatz= TwoLocal(feature_dim, ['ry', 'rz'], 'cz')
#ansatz = EfficientSU2(feature_dim)
observables = [[SparsePauliOp("Z" * feature_dim)]]

model = QuantumNet(estimator, feature_map, ansatz, feature_dim, observables)

# Training 
loss_fn = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

for epoch in range(100):
    print('epoch:', epoch)
    model.train()
    optimizer.zero_grad()
    logits = model(X_train)
    loss = loss_fn(logits, y_train.unsqueeze(1))
    loss.backward()
    optimizer.step()

    
# Evaluation
model.eval()
with torch.no_grad():
    predictions = torch.sigmoid(model(X_test)).round().squeeze()
    accuracy = (predictions == y_test).float().mean()
    print(f"Test Accuracy: {accuracy:.2%}")

epoch: 0
epoch: 1
epoch: 2
epoch: 3
epoch: 4
epoch: 5
epoch: 6
epoch: 7
epoch: 8
epoch: 9
epoch: 10
epoch: 11
epoch: 12
epoch: 13
epoch: 14
epoch: 15
epoch: 16
epoch: 17
epoch: 18
epoch: 19
Test Accuracy: 49.78%


In [13]:
#Experiment 2-1-1: ZFeatureMap+RealAmplitudes+COBYLA
#Experiment 2-1-2: ZFeatureMap+TwoLocal+COBYLA
#Experiment 2-1-3: ZFeatureMap+EfficientSU2+COBYLA

#Experiment 2-2-1: ZZFeatureMap+RealAmplitudes+COBYLA
#Experiment 2-2-2: ZZFeatureMap+TwoLocal+COBYLA
#Experiment 2-2-3: ZZFeatureMap+EfficientSU2+COBYLA

import numpy as np
import torch
from qiskit import QuantumCircuit
from qiskit.circuit.library import ZFeatureMap, RealAmplitudes
from qiskit.primitives import StatevectorEstimator as Estimator
from qiskit.quantum_info import SparsePauliOp
from scipy.optimize import minimize


class QuantumNet:
    def __init__(self, estimator, feature_map, ansatz, n_qubits, observable):
        self.estimator = estimator
        self.feature_map = feature_map
        self.ansatz = ansatz
        self.n_qubits = n_qubits
        self.observable = observable

        self.qc = QuantumCircuit(n_qubits)
        self.qc.compose(self.feature_map, inplace=True)
        self.qc.compose(self.ansatz, inplace=True)

        self.input_params = list(self.feature_map.parameters)
        self.weight_params = list(self.ansatz.parameters)

        self.weights = np.random.randn(len(self.weight_params))

    def forward(self, x_batch, weights):
        results = []
        for x in x_batch:
            bind_params = {p: float(v) for p, v in zip(self.input_params, x.tolist())}
            bind_params.update({p: float(w) for p, w in zip(self.weight_params, weights)})

            job = self.estimator.run([(self.qc, self.observable, bind_params)])
            result = job.result()[0]
            results.append(result.data.evs)
        return np.array(results)


# Initialize components
estimator = Estimator()
feature_map = ZZFeatureMap(feature_dim)
#ansatz = RealAmplitudes(feature_dim, reps=2)
#ansatz= TwoLocal(feature_dim, ['ry', 'rz'], 'cz')
ansatz = EfficientSU2(feature_dim)
observable = SparsePauliOp("Z" * feature_dim)

model = QuantumNet(estimator, feature_map, ansatz, feature_dim, observable)

loss_fn = torch.nn.BCEWithLogitsLoss()

# COBYLA-compatible cost function
def cobyla_cost(weights):
    logits = model.forward(X_train, weights)
    logits_tensor = torch.tensor(logits, dtype=torch.float32).unsqueeze(1)  # Fixed here
    loss = loss_fn(logits_tensor, y_train.unsqueeze(1))
    return loss.item()



# Run COBYLA
print("Starting COBYLA optimization...")
result = minimize(cobyla_cost, model.weights, method='COBYLA', options={'maxiter': 50})
print("Optimization complete.")

# Store best weights
model.weights = result.x
print("Final loss:", result.fun)

Starting COBYLA optimization...
Optimization complete.
Final loss: 0.6878038048744202


In [146]:
#Experiment 3: RawFeatureVector+RealAmplitudes+COBYLA

import numpy as np
import torch
from qiskit import QuantumCircuit
from qiskit.circuit.library import ZFeatureMap, RealAmplitudes
from qiskit.primitives import StatevectorEstimator as Estimator
from qiskit.quantum_info import SparsePauliOp
from scipy.optimize import minimize
from qiskit_machine_learning.circuit.library import RawFeatureVector


class QuantumNet:
    def __init__(self, estimator, feature_map, ansatz, n_qubits, observable):
        self.estimator = estimator
        self.feature_map = feature_map
        self.ansatz = ansatz
        self.n_qubits = n_qubits
        self.observable = observable

        self.qc = QuantumCircuit(n_qubits)
        self.qc.compose(self.feature_map, inplace=True)
        self.qc.compose(self.ansatz, inplace=True)

        self.input_params = list(self.feature_map.parameters)
        self.weight_params = list(self.ansatz.parameters)
        self.weights = np.random.randn(len(self.weight_params))

    def forward(self, x_batch, weights):
        results = []
        for x in x_batch:

            # Step 2: bind RawFeatureVector to create a new circuit per sample
            param_dict = dict(zip(self.feature_map.parameters, x))
            feature_bound = self.feature_map.assign_parameters(param_dict)

            # Step 3: compose with ansatz
            full_circuit = QuantumCircuit(self.n_qubits)
            full_circuit.compose(feature_bound, inplace=True)
            full_circuit.compose(self.ansatz, inplace=True)

            # Step 4: bind weights to the ansatz
            bind_params = {p: float(w) for p, w in zip(self.weight_params, weights)}

            # Step 5: evaluate
            job = self.estimator.run([(full_circuit, self.observable, bind_params)])
            result = job.result()[0]
            results.append(result.data.evs)

        return np.array(results)



def main():
    # Load data
    training_size = 1500
    test_size = 450
    feature_dim = 7
    feature_dimension = 8  # RawFeatureVector requires dim = 2^n
    n_qubits=3

    sample_train, sample_test, label_train, label_test = botnetdga(training_size, test_size, feature_dim)

    X_train = torch.tensor(sample_train, dtype=torch.float32)
    X_test = torch.tensor(sample_test, dtype=torch.float32)
    y_train = torch.tensor(label_train, dtype=torch.float32)
    y_test = torch.tensor(label_test, dtype=torch.float32)

    # Pad with zeros
    X_train_padded = np.zeros((training_size,feature_dimension))
    X_train_padded[:,:feature_dim] = X_train

    # Build vqc model
    estimator = Estimator()
    feature_map = RawFeatureVector(feature_dimension=feature_dimension)
    ansatz = RealAmplitudes(n_qubits, reps=2)
    observable = SparsePauliOp("Z" * n_qubits)

    model = QuantumNet(estimator, feature_map, ansatz, n_qubits, observable)
    loss_fn = torch.nn.BCEWithLogitsLoss()

    # COBYLA-compatible cost function
    def cobyla_cost(weights):
        logits = model.forward(X_train_padded, weights)
        logits_tensor = torch.tensor(logits, dtype=torch.float32).unsqueeze(1)
        loss = loss_fn(logits_tensor, y_train.unsqueeze(1))
        return loss.item()

    print("Starting COBYLA optimization...")
    result = minimize(cobyla_cost, model.weights, method='COBYLA', options={'maxiter': 20})
    print("Optimization complete.")

    model.weights = result.x
    print("Final loss:", result.fun)


if __name__ == "__main__":
    main()


Starting COBYLA optimization...
Optimization complete.
Final loss: 0.6226545572280884
