https://github.com/wongwsvincent/Pennylane_quantum-classifier/blob/main/Variational%20quantum%20circuit.ipynb

In [1]:
import pandas as pd
import glob
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from qiskit import QuantumCircuit, transpile, Aer, execute
from qiskit.visualization import plot_histogram
from qiskit.circuit.library import ZZFeatureMap
from qiskit_machine_learning.kernels import QuantumKernel
from qiskit_machine_learning.algorithms import QSVC

import random

In [2]:
# Read the combined dataset csv

df = pd.read_csv('combined_dataset.csv')
df

Unnamed: 0,F1,F2,F3,F4
0,410,316,749,520
1,513,297,739,509
2,511,297,738,491
3,492,273,733,558
4,472,285,710,555
...,...,...,...,...
45051,507,546,475,470
45052,512,498,489,500
45053,519,507,462,513
45054,498,528,450,533


In [3]:
# Transpose

df = df.transpose()
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,45046,45047,45048,45049,45050,45051,45052,45053,45054,45055
F1,410,513,511,492,472,451,487,498,499,488,...,507,523,537,508,489,507,512,519,498,475
F2,316,297,297,273,285,321,325,330,304,294,...,522,500,532,541,564,546,498,507,528,553
F3,749,739,738,733,710,752,767,758,737,695,...,467,486,466,452,468,475,489,462,450,480
F4,520,509,491,558,555,527,520,494,545,554,...,470,493,495,522,503,470,500,513,533,519


In [6]:
import pandas as pd
from scipy.signal import butter, filtfilt

# Assuming your EEG dataset is stored in a pandas DataFrame called 'df'
# The EEG data should be in columns representing different channels

# Define the filter parameters
lowcut = 0.1  # Lower cutoff frequency in Hz
highcut = 30  # Upper cutoff frequency in Hz
fs = 1000  # Sampling frequency in Hz
padlen = 27  # Padding length

# Apply bandpass filter to each channel in the dataset
filtered_data = pd.DataFrame()
for column in df.columns:
    # Pad the input vector to meet the minimum length requirement
    padded_x = df[column].values.tolist() + [0] * padlen

    # Apply Butterworth bandpass filter
    b, a = butter(4, [lowcut / (fs / 2), highcut / (fs / 2)], btype='band')
    filtered_channel = filtfilt(b, a, padded_x, padtype='odd', padlen=3*(max(len(b), len(a))-1))

    # Remove the padded values
    filtered_channel = filtered_channel[:-padlen]

    filtered_data[column] = filtered_channel

  filtered_data[column] = filtered_channel


In [7]:
filtered_data

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,45046,45047,45048,45049,45050,45051,45052,45053,45054,45055
0,347.466939,434.702802,433.004611,416.925868,399.983526,382.195532,412.687135,421.997431,422.853181,413.533987,...,429.592743,443.150963,455.008311,430.450724,414.355882,429.594233,433.835867,439.765435,421.982747,402.502436
1,304.30206,379.107787,377.553507,364.025436,349.398324,334.187253,360.401706,368.243058,369.119303,360.995792,...,373.966555,385.757445,395.939374,374.95091,361.12604,374.038548,377.799056,382.88021,367.703546,351.048915
2,261.672935,324.235862,322.825053,311.804732,299.459747,286.785826,308.786322,315.183378,316.077427,309.134056,...,319.077527,329.124816,337.656298,320.181977,308.593191,319.217636,322.502173,326.747813,314.136627,300.264081
3,220.100108,270.787954,269.519688,260.923021,250.795114,240.580331,258.490973,263.49202,264.39866,258.604809,...,265.639607,273.989977,280.920048,266.851987,257.432767,265.843207,268.661956,272.097392,261.972145,250.796169


In [8]:
import pandas as pd
import numpy as np
from scipy.stats import skew, kurtosis
from scipy.signal import welch

segment_size = 256
feature_df = pd.DataFrame(columns=["mean", "variance", "skewness", "kurtosis", "psd", "frequency"])

# Iterate over each row
for rownum in range(filtered_data.shape[0]):
    # Extract segments of segment_size
    for i in range(0, filtered_data.shape[1], segment_size):
        # Extract current segment
        segment = filtered_data.iloc[rownum, i:i+segment_size]

        # Calculate features for the segment
        features = []
        features.append(segment.mean())
        features.append(segment.var())
        features.append(skew(segment))
        features.append(kurtosis(segment))

        # Calculate power spectral density (PSD)
        f, psd = welch(
          segment)
        features.append(psd.mean())  # Mean of PSD


        features.append(rownum)  # Append target frequency

        # Append features to feature_df
        feature_df.loc[len(feature_df)] = features

In [9]:
feature_df

Unnamed: 0,mean,variance,skewness,kurtosis,psd,frequency
0,425.906361,4637.904896,-0.009724,-0.826606,6410.837739,0.0
1,551.659794,2304.352609,-0.451809,-0.003270,3887.976974,0.0
2,635.578799,2900.197315,0.970588,0.921783,2905.458353,0.0
3,552.335149,22790.486041,0.377306,-1.223400,16129.607092,0.0
4,501.169717,3458.247232,-0.936677,0.136579,2900.989626,0.0
...,...,...,...,...,...,...
699,266.158515,193.060911,-0.283150,-0.635364,310.609248,3.0
700,241.872231,103.425253,0.104029,-0.368583,188.273587,3.0
701,256.513993,104.256258,0.088085,-0.253929,209.712606,3.0
702,300.312222,188.100847,-0.573447,0.722081,195.639759,3.0


In [10]:
feature_df['frequency'] = feature_df['frequency'].astype('int64')

In [11]:
import numpy as np
import pandas as pd
from scipy.signal import hilbert
from sklearn.decomposition import FastICA

# Segment size
segment_size = 256

# Create an empty DataFrame to store the HHT coefficients
hht_df = pd.DataFrame()

# Iterate over each segment
for i in range(0, len(filtered_data), segment_size):
    # Extract the current segment
    segment = filtered_data.iloc[i:i+segment_size]

    # Handle missing values by interpolation
    segment_interpolated = segment.interpolate()

    # Apply ICA to the segment
    ica = FastICA(n_components=1)
    ica_components = ica.fit_transform(segment_interpolated.T)

    # Perform Hilbert-Huang Transform on each ICA component
    for j in range(ica_components.shape[1]):
        # Calculate the instantaneous amplitude using Hilbert transform
        analytic_signal = hilbert(ica_components[:, j])
        amplitude_envelope = np.abs(analytic_signal)

        # Create a column name for the component
        column_name = f"HHT_Component_{j+1}"

        # Add the HHT coefficients to the DataFrame
        hht_df = pd.concat([hht_df, pd.DataFrame(amplitude_envelope, columns=[column_name])], axis=0, ignore_index=True)
# Truncate the DataFrame to the desired shape (704, 2)
hht_df = hht_df.iloc[:704]

# Print the shape of the HHT DataFrame
print("HHT DataFrame Shape:", hht_df.shape)

HHT DataFrame Shape: (704, 1)




In [24]:
hht_df

Unnamed: 0,HHT_Component_1
0,0.003950
1,0.002150
2,0.000641
3,0.001005
4,0.001921
...,...
699,0.006515
700,0.007861
701,0.009023
702,0.008149


In [26]:
# Rearranging columns

freq_col = feature_df['frequency']
feature_df.drop(columns=['frequency'], inplace=True)
feature_df['HHT_Component_1'] = hht_df['HHT_Component_1']
feature_df['frequency'] = freq_col
feature_df

Unnamed: 0,mean,variance,skewness,kurtosis,psd,HHT_Component_1,frequency
0,425.906361,4637.904896,-0.009724,-0.826606,6410.837739,0.003950,0
1,551.659794,2304.352609,-0.451809,-0.003270,3887.976974,0.002150,0
2,635.578799,2900.197315,0.970588,0.921783,2905.458353,0.000641,0
3,552.335149,22790.486041,0.377306,-1.223400,16129.607092,0.001005,0
4,501.169717,3458.247232,-0.936677,0.136579,2900.989626,0.001921,0
...,...,...,...,...,...,...,...
699,266.158515,193.060911,-0.283150,-0.635364,310.609248,0.006515,3
700,241.872231,103.425253,0.104029,-0.368583,188.273587,0.007861,3
701,256.513993,104.256258,0.088085,-0.253929,209.712606,0.009023,3
702,300.312222,188.100847,-0.573447,0.722081,195.639759,0.008149,3


In [27]:
# feature_df.to_csv('/Users/sinner/Desktop/feature_df_6.csv', index=False)

In [28]:
# # Feature Scaling

# from sklearn.preprocessing import MinMaxScaler

# scaled_df = feature_df.copy()

# columns_to_standardize = scaled_df.columns[:-1]

# # Create a subset DataFrame with the selected columns
# df_to_standardize = scaled_df[columns_to_standardize]

# scaler = MinMaxScaler()
# df_standardized = pd.DataFrame(scaler.fit_transform(df_to_standardize))

# # Update the original DataFrame with the standardized values
# scaled_df[columns_to_standardize] = df_standardized

# scaled_df

In [2]:
feature_df = pd.read_csv("feature_df_6.csv")
feature_df

Unnamed: 0,mean,variance,skewness,kurtosis,psd,HHT_Component_1,frequency
0,425.906361,4637.904896,-0.009724,-0.826606,6410.837739,0.003950,0
1,551.659794,2304.352609,-0.451809,-0.003270,3887.976974,0.002150,0
2,635.578799,2900.197315,0.970588,0.921783,2905.458353,0.000641,0
3,552.335149,22790.486041,0.377306,-1.223400,16129.607092,0.001005,0
4,501.169717,3458.247232,-0.936677,0.136579,2900.989626,0.001921,0
...,...,...,...,...,...,...,...
699,266.158515,193.060911,-0.283150,-0.635364,310.609248,0.006515,3
700,241.872231,103.425253,0.104029,-0.368583,188.273587,0.007861,3
701,256.513993,104.256258,0.088085,-0.253929,209.712606,0.009023,3
702,300.312222,188.100847,-0.573447,0.722081,195.639759,0.008149,3


In [11]:
# Feature Scaling

from sklearn.preprocessing import MinMaxScaler

scaled_df = feature_df.copy()

columns_to_standardize = scaled_df.columns[:-1]

# Create a subset DataFrame with the selected columns
df_to_standardize = scaled_df[columns_to_standardize]

scaler = MinMaxScaler()
df_standardized = pd.DataFrame(scaler.fit_transform(df_to_standardize))

# Update the original DataFrame with the standardized values
scaled_df[columns_to_standardize] = df_standardized

scaled_df

Unnamed: 0,mean,variance,skewness,kurtosis,psd,HHT_Component_1,frequency
0,0.490051,0.100536,0.686403,0.101319,0.071774,0.286590,0
1,0.674511,0.048798,0.556955,0.218854,0.042853,0.130745,0
2,0.797607,0.062008,0.973451,0.350910,0.031590,0.000000,0
3,0.675502,0.503004,0.799731,0.044674,0.183184,0.031565,0
4,0.600450,0.074381,0.414980,0.238818,0.031539,0.110863,0
...,...,...,...,...,...,...,...
699,0.255726,0.001987,0.606341,0.128619,0.001845,0.508802,3
700,0.220101,0.000000,0.719711,0.166704,0.000442,0.625406,3
701,0.241579,0.000018,0.715043,0.183071,0.000688,0.725989,3
702,0.305824,0.001877,0.521338,0.322401,0.000527,0.650360,3


In [12]:
feature_df = scaled_df.copy()

In [21]:
import pennylane as qml
import torch
import pandas as pd
import numpy as np
from torch.autograd import Variable
import torch.optim as optim

np.random.seed(0)
torch.manual_seed(0)

num_classes = 4
margin = 0.20
feature_size = 6
batch_size = 500
lr_adam = 0.001
train_split = 0.80
# # the number of the required qubits is calculated from the number of features
num_qubits = int(np.ceil(np.log2(feature_size)))
num_layers = 7
total_iterations = 100

dev = qml.device("default.qubit", wires=num_qubits)

# defining a basic block
def layer(W):
    for i in range(num_qubits):
        qml.Rot(W[i, 0], W[i, 1], W[i, 2], wires=i)
    for j in range(num_qubits - 1):
        qml.CNOT(wires=[j, j + 1])
    if num_qubits >= 2:
        # Apply additional CNOT to entangle the last with the first qubit
        qml.CNOT(wires=[num_qubits - 1, 0])

# defining the quantum circuit, in which the number of layers depends on the number of weights taken
def circuit(weights, feat=None):
    # Encoding the features into the amplitude vector of the n qubits/wires
    # 'normalize' is set to True, to ensure the L2-norm of features equals to one
    qml.templates.embeddings.AmplitudeEmbedding(feat, range(num_qubits), pad_with=0., normalize=True)
    for W in weights:
        layer(W)
    return qml.expval(qml.PauliZ(0))

# creating a variational circuit for each one-vs-all classifier, and appending to qnodes
qnodes = []
for iq in range(num_classes):
    qnode = qml.QNode(circuit, dev, interface="torch")
    qnodes.append(qnode)
    
    
# defining a variational circuit for each one-vs-all classifier, where an additional classical bias term is added
def variational_classifier(q_circuit, params, feat):
    weights = params[0]
    bias = params[1]
#     return q_circuit(weights, feat=feat) + bias
    return q_circuit(weights, feat=feat)

# This function takes a number of samples and compute the average multiclass loss
def multiclass_svm_loss(q_circuits, all_params, feature_vecs, true_labels):
    loss = 0
    num_samples = len(true_labels)
    for i, feature_vec in enumerate(feature_vecs):
        # Compute the score given to this sample by the classifier corresponding to the
        # true label. So for a true label of 1, get the score computed by classifer 1,
        # which distinguishes between "class 1" or "not class 1".
        s_true = variational_classifier(
            q_circuits[int(true_labels[i])], # pick circuit based on true label
            (all_params[0][int(true_labels[i])], all_params[1][int(true_labels[i])]), # weight and bias
            feature_vec
        )
        s_true = s_true.float()

        li = 0
        # Get the scores computed for this sample by the other classifiers
        for j in range(num_classes):
            if j != int(true_labels[i]):
                s_j = variational_classifier(
                    q_circuits[j], (all_params[0][j], all_params[1][j]), feature_vec
                )
                s_j = s_j.float()
                li += torch.max(torch.zeros(1).float(), s_j + margin - s_true)
        loss += li

    return loss / num_samples

# This function takes a number of samples and return the predicted labels
def classify(q_circuits, all_params, feature_vecs, labels):
    predicted_labels = []
    for i, feature_vec in enumerate(feature_vecs):
        scores = np.zeros(num_classes)
        for c in range(num_classes):
            score = variational_classifier(
                q_circuits[c], (all_params[0][c], all_params[1][c]), feature_vec
            )
            scores[c] = float(score)
        pred_class = np.argmax(scores)
        predicted_labels.append(pred_class)
    return predicted_labels

from sklearn.metrics import accuracy_score

def accuracy(labels, predictions):
    accuracy = accuracy_score(labels, predictions)
    return accuracy
    

# Load up the data
def load_and_process_data():
    
    ### CHANGE THIS PART
    
    
    
    df = feature_df
    
    
    ###
    
    
    data=df.to_numpy()
    unique_elements, counts_elements = np.unique(data[:,-1], return_counts=True)
    
    X = torch.tensor(data[:, 0:feature_size])

    Y = torch.tensor(data[:, -1])
    return X, Y


from sklearn.model_selection import train_test_split

# Create a train and test split.
def split_data(feature_vecs, Y):

    X = feature_vecs
    y = Y

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=0, stratify=y)
    return X_train, X_test, y_train, y_test

In [None]:
def training(features, Y):
    num_data = Y.shape[0]
    feat_vecs_train, feat_vecs_test, Y_train, Y_test = split_data(features, Y)
    num_train = Y_train.shape[0]
    q_circuits = qnodes

    # Initialize the parameters, with Gaus(0, 0.1) distribution
    all_weights = [
        Variable(0.1 * torch.randn(num_layers, num_qubits, 3), requires_grad=True)
        for i in range(num_classes)
    ]
    all_bias = [Variable(0.1 * torch.ones(1), requires_grad=True) for i in range(num_classes)]
    optimizer = optim.Adam(all_weights + all_bias, lr=lr_adam)
    params = (all_weights, all_bias)
    print("Num params: ", 3 * num_layers * num_qubits * 3 + 3)

    costs, train_acc, test_acc = [], [], []

    # train the variational classifier
    for it in range(total_iterations):
        batch_index = np.random.randint(0, num_train, (batch_size,))
        feat_vecs_train_batch = feat_vecs_train[batch_index]
        Y_train_batch = Y_train[batch_index]

        optimizer.zero_grad()
        curr_cost = multiclass_svm_loss(q_circuits, params, feat_vecs_train_batch, Y_train_batch)
        curr_cost.backward()
        optimizer.step()

        # Compute predictions on train and validation set
        predictions_train = classify(q_circuits, params, feat_vecs_train, Y_train)
        predictions_test = classify(q_circuits, params, feat_vecs_test, Y_test)
        acc_train = accuracy(Y_train, predictions_train)
        acc_test = accuracy(Y_test, predictions_test)

        print(
            "Iter: {:5d} | Cost: {:0.7f} | Acc train: {:0.7f} | Acc test: {:0.7f} "
            "".format(it + 1, curr_cost.item(), acc_train, acc_test)
        )

        costs.append(curr_cost.item())
        train_acc.append(acc_train)
        test_acc.append(acc_test)

    return costs, train_acc, test_acc


# now run the training algorithm and plot the results
features, Y = load_and_process_data()
costs, train_acc, test_acc = training(features, Y)

import matplotlib.pyplot as plt

fig, ax1 = plt.subplots()
iters = np.arange(0, total_iterations, 1)
colors = ["tab:red", "tab:blue"]
ax1.set_xlabel("Iteration", fontsize=17)
ax1.set_ylabel("Cost", fontsize=17, color=colors[0])
ax1.plot(iters, costs, color=colors[0], linewidth=4)
ax1.tick_params(axis="y", labelsize=14, labelcolor=colors[0])

ax2 = ax1.twinx()
ax2.set_ylabel("Test Acc.", fontsize=17, color=colors[1])
ax2.plot(iters, test_acc, color=colors[1], linewidth=4)

ax2.tick_params(axis="x", labelsize=14)
ax2.tick_params(axis="y", labelsize=14, labelcolor=colors[1])

plt.grid(False)
plt.tight_layout()
plt.show()

Num params:  192
Iter:     1 | Cost: 0.5680646 | Acc train: 0.2664298 | Acc test: 0.2482270 
