In [1]:
import pennylane as qml
from pennylane import numpy as np
from tensorflow import keras
import random
import pymrmr
import pandas as pd
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

In [2]:
n_train = 100    # Size of the train dataset
n_test = 60     # Size of the test dataset
n_dim = 8       # 需要降到多少维
target_label_list = [0,1,2] #需要提取数据的标签列表
run_numbers = 1           #重复运行多少次,每次随机设置gate-encoding 线路      

In [3]:
SAVE_PATH = "QNN/data/" # Data saving folder
PREPROCESS = True          # If False, skip processing and load data from SAVE_PATH
select_samples_with_labels = True       # 是否挑选特定标签的数据
FS_state = True              # 是否进行图像特征提取
fs_type = 'mRMR'               #图像特征提取的方式,'random','mRMR'
load_selected_index = False   # 是否加载随机选择或者mRMR特征的索引
mnist_dataset = keras.datasets.mnist  
(train_images, train_labels), (test_images, test_labels) = mnist_dataset.load_data()

In [4]:
#做2分类两种方案，抽取0和1作为数据集，
def extract_data_with_label(origin_data, origin_label, target_label_list):
    new_data = []
    new_labels = []
    for i in range(len(origin_label)):
        for j in range(len(target_label_list)):
            if(origin_label[i] == target_label_list[j]):
                new_data.append(origin_data[i,:,:])
                new_labels.append(origin_label[i])
    new_data = np.array(new_data)
    new_labels = np.array(new_labels)
    return new_data, new_labels

if select_samples_with_labels == True:
    test_images, test_labels = extract_data_with_label(test_images, test_labels, target_label_list)
    train_images, train_labels = extract_data_with_label(train_images, train_labels, target_label_list)  

# Reduce dataset size
train_images = train_images[:n_train]
train_labels = train_labels[:n_train]
test_images = test_images[:n_test]
test_labels = test_labels[:n_test]

In [5]:
def get_random_selected_list(selected_feature_num): #生成随机列表，用于选择特定的像素
    selected_index_list = []
    pixel_num = np.shape(test_images)[1] * np.shape(test_images)[2]
    
    if load_random_index == True:
        selected_index_list = np.load(SAVE_PATH + "selected_index_list_" + str(random_selected_feature_num) + "_01.npy")
        selected_index_list = selected_index_list.tolist()
    else: 
        for i in range(selected_feature_num):
            selected_index_list.append(random.randint(0,pixel_num-1))

        selected_index_list = np.array(selected_index_list, requires_grad=False)
        np.save(SAVE_PATH + "selected_index_list_" + str(random_selected_feature_num) + "_01.npy", selected_index_list)

    return selected_index_list

def get_mRMR_selected_list_with_train_data(orgin_imgs, labels, n_dim):
    selected_index_list = []

    if load_selected_index == True:
        selected_index_list = np.load(SAVE_PATH + "mRMR_selected_index_list_" + str(n_dim) + "_01.npy")
    else:
        labels = np.reshape(labels, (np.shape(labels)[0], -1))
        #mRMR应该是对训练集提取特征子集，然后在测试集上选取一样的特征子集来测试效果
        orgin_imgs = np.reshape(orgin_imgs,(np.shape(orgin_imgs)[0],-1))
        data = np.concatenate((labels,orgin_imgs), axis=1)

        row_index_list = []
        row_name = []
        for i in range(len(labels)):
            row_name = 'Row_' + str(i+1)
            row_index_list.append(row_name)

        column_index_list = []
        column_name = []

        for i in range(np.shape(orgin_imgs)[1] + 1):
            column_name = 'Colum_' + str(i+1)
            column_index_list.append(column_name)

        data_df = pd.DataFrame(data, index=row_index_list, columns = column_index_list)
        selected_index_list = pymrmr.mRMR(data_df, 'MID', n_dim)
        np.save(SAVE_PATH + "mRMR_selected_index_list_" + str(n_dim) + "_01.npy", selected_index_list)
    return selected_index_list

def fs_with_random(ori_imgs, selected_index_list): # 随机选择若干个像素
    new_images = []
    selected_index_list.sort()
    ori_imgs = np.reshape(ori_imgs,(np.shape(ori_imgs)[0],-1))

    for i in range(len(selected_index_list)):
        new_images.append(ori_imgs[:,selected_index_list[i]])

    new_images = np.array(new_images, requires_grad=False)
    new_images = np.transpose(new_images)
    print('dim:',np.shape(new_images))
    return new_images 

def fs_with_mRMR(ori_imgs, labels, selected_index_list):
    #mRMR应该是对训练集提取特征子集，然后在测试集上选取一样的特征子集来测试效果
    labels = np.reshape(labels, (np.shape(labels)[0], -1))
    ori_imgs = np.reshape(ori_imgs,(np.shape(ori_imgs)[0],-1))
    data = np.concatenate((labels,ori_imgs), axis=1)

    row_index_list = []
    row_name = []
    for i in range(len(labels)):
        row_name = 'Row_' + str(i+1)
        row_index_list.append(row_name)

    column_index_list = []
    column_name = []

    for i in range(np.shape(ori_imgs)[1] + 1):
        column_name = 'Colum_' + str(i+1)
        column_index_list.append(column_name)

    data_df = pd.DataFrame(data, index=row_index_list, columns = column_index_list)
    new_images = data_df[selected_index_list]
    return np.array(new_images, dtype='float64')

In [6]:
def feature_selection(images, labels, fs_type, selected_index_list):
    processed_images = []
    if fs_type == 'random':
        #random_index_list = get_random_selected_list(n_dim)
        processed_images = fs_with_random(images, selected_index_list)
    elif fs_type == 'mRMR':
        #mRMR_index_list = get_mRMR_selected_list_with_train_data(train_images, train_labels, n_dim)
        processed_images = fs_with_mRMR(images, labels, selected_index_list)

    return processed_images 

if FS_state == True:
    selected_index_list = []
    if fs_type == 'mRMR':
        selected_index_list = get_mRMR_selected_list_with_train_data(train_images, train_labels, n_dim)
    elif fs_type == 'random':
        selected_index_list = get_random_selected_list(n_dim)

    test_images = feature_selection(test_images, test_labels, fs_type, selected_index_list)
    train_images = feature_selection(train_images, train_labels, fs_type, selected_index_list)


# Normalize pixel values within 0 and 1
train_images = train_images / (np.max(train_images) - np.min(train_images))
test_images = test_images / (np.max(test_images) - np.min(test_images))

#当维数特别低时候，比如少于20维时，有些数据就全为0，需要剔除,label要同步处理
train_remove_index = train_images.sum(axis=1)!=0
test_remove_index = test_images.sum(axis=1)!=0

train_images = train_images[train_remove_index,:]
test_images = test_images[test_remove_index,:]

#process labels
train_labels = np.reshape(train_labels, (np.shape(train_labels)[0], -1))
test_labels = np.reshape(test_labels, (np.shape(test_labels)[0], -1))

train_labels = train_labels[train_remove_index,:]
test_labels = test_labels[test_remove_index,:]

train_labels = train_labels.reshape(-1)
test_labels = test_labels.reshape(-1)

In [7]:
def process_img_to_features(image):
    img = image.flatten()
    return img

if PREPROCESS == True: #将图像数据拉成1维
    new_train_images = []
    print("pre-processing of train images:")
    for idx, img in enumerate(train_images):
        print("{}/{}        ".format(idx + 1, np.shape(train_images)[0]), end="\r")
        new_train_images.append(process_img_to_features(img))
      
    train_images = np.array(new_train_images, requires_grad=False)

    new_test_images = []
    print("\npre-processing of test images:")
    for idx, img in enumerate(test_images):
        print("{}/{}        ".format(idx + 1, np.shape(test_images)[0]), end="\r")
        new_test_images.append(process_img_to_features(img))
   
    test_images = np.array(new_test_images, requires_grad=False)

    # Save pre-processed images
    np.save(SAVE_PATH + "new_train_images_" + str(n_dim) + fs_type + "_01.npy", train_images)
    np.save(SAVE_PATH + "new_test_images_" + str(n_dim) + fs_type + "_01.npy", test_images)


# Load pre-processed images
train_images = np.load(SAVE_PATH + "new_train_images_" + str(n_dim) + fs_type + "_01.npy")
test_images = np.load(SAVE_PATH + "new_test_images_" + str(n_dim) + fs_type + "_01.npy")



pre-processing of train images:
1/100        2/100        3/100        4/100        5/100        6/100        7/100        8/100        9/100        10/100        11/100        12/100        13/100        14/100        15/100        16/100        17/100        18/100        19/100        20/100        21/100        22/100        23/100        24/100        25/100        26/100        27/100        28/100        29/100        30/100        31/100        32/100        33/100        34/100        35/100        36/100        37/100        38/100        39/100        40/100        41/100        42/100        43/100        44/100        45/100        46/100        47/100        48/100        49/100        50/100        51/100        52/100        53/100        54/100        55/100        56/100        57/100        58/100        59/100        60/100        61/100        62/100        63/100        64/100        65/100        66

In [8]:
# scaling the inputs is important since the embedding we use is periodic
X_train_scaler = StandardScaler().fit(train_images)
train_images = X_train_scaler.transform(train_images)

X_test_scaler = StandardScaler().fit(test_images)
test_images = X_test_scaler.transform(test_images)

In [9]:
n_qubits = n_dim

dev_kernel = qml.device("default.qubit", wires=n_qubits)

projector = np.zeros((2**n_qubits, 2**n_qubits))
projector[0, 0] = 1

#随机生成100种路线，8根线路，RX，RY，RY就有3**8，乘以 H门有2**8 乘以CNOT门有2**8，一共有429981696
#用列表代表线路门的排列：一共3层，第一层0代表RX门，1代表RY门，2代表RZ门；第二层1代表使用H门，0代表不使用，第三层0代表不用CNOT门，1代表设置CNOT门，
#且与下一个qubit链接，比如目前在第1个qubit，则CNOT门是链接第1和第2qubit。
def generate_circuit_code(wires):
    circuit_code = []
    for i in range(3):
        for j in range(wires):
            if(i == 0):
                circuit_code.append(random.randint(0,2))
            else:
                circuit_code.append(random.randint(0,1))

    return circuit_code

In [None]:
def layer(x, wires, curcuit_code):
    """Building block of the embedding ansatz"""
    for j, wire in enumerate(wires):
        for i in range(np.shape(curcuit_code)[1]):
            if i == 0:
                if(curcuit_code[j,i] == 0):
                    qml.RX(x[j], wires=[wire])
                elif(curcuit_code[j,i] == 1):
                    qml.RY(x[j], wires=[wire])
                elif(curcuit_code[j,i] == 2):
                    qml.RZ(x[j], wires=[wire])
            elif i == 1:
                if(curcuit_code[j,i] == 1):
                    qml.Hadamard(wires=[wire])
            elif i == 2:
                if(curcuit_code[j,i] == 1):
                    if j == len(wires) - 1:
                        qml.CNOT(wires = [j, 0])
                    else:
                        qml.CNOT(wires = [j, j+1])
       

def ansatz(x, curcuit_code, wires):
    """The embedding ansatz"""
    layer(x, wires, curcuit_code)


adjoint_ansatz = qml.adjoint(ansatz)

@qml.qnode(dev_kernel)
def kernel_circuit(x1, x2, curcuit_code):
    """The quantum kernel."""
    ansatz(x1, curcuit_code, wires=range(n_qubits))
    adjoint_ansatz(x2, curcuit_code, wires=range(n_qubits))
    return qml.expval(qml.Hermitian(projector, wires=range(n_qubits)))

def multiple_kernel(x1,x2, curcuit_code_list):
    value_list = []
    for i in range(len(curcuit_code_list)):
        value_list.append(kernel_circuit(x1,x2, curcuit_code_list[i]))
    value = 0.4 * value_list[0] + 0.1 * value_list[1] + 0.25 * value_list[2] + 0.25 * value_list[3]
    #return np.mean(value_list)
    return value

def kernel_matrix(A, B, curcuit_code):
    """Compute the matrix whose entries are the kernel
       evaluated on pairwise data from sets A and B."""
    return np.array([[kernel_circuit(a, b, curcuit_code) for b in B] for a in A])

index_list = [7,12,19,28]
kernel_function_list = []
curcuit_code_list =  np.loadtxt("circuit_code_cls3_mRMR_2.txt", dtype='int')
curcuit_code_opt_list = []

for i in range(len(index_list)):
    curcuit_code = curcuit_code_list[index_list[i]]
    curcuit_code = np.reshape(curcuit_code, (n_dim,3), order='F')
    curcuit_code_opt_list.append(curcuit_code)

def multiple_kernel_matrix(A, B, curcuit_code_list):
    """Compute the matrix whose entries are the kernel
       evaluated on pairwise data from sets A and B."""
    return np.array([[multiple_kernel(a, b, curcuit_code_list) for b in B] for a in A])


accuracy_list = []
for i in range(run_numbers):
    #print(multiple_kernel_matrix(train_images[0], train_images[0], curcuit_code_opt_list))
    kernel_function = lambda A, B: multiple_kernel_matrix(A, B, curcuit_code_opt_list)
    svm = SVC(kernel=kernel_function, decision_function_shape = 'ovr').fit(train_images, train_labels)
    predictions = svm.predict(test_images)

    print(predictions)
    print(test_labels)

    accuracy_test = accuracy_score(predictions, test_labels)
    print(f"The accuracy of a kernel is {accuracy_test:.3f}")
    accuracy_list.append(accuracy_test)



np.savetxt("circuit_code_cls3_" + fs_type + "0830.txt", curcuit_code_list, fmt = '%d')
np.savetxt("accuracy_test_cls3_" + fs_type + "0830.txt", accuracy_list, fmt = '%s')

print('max accuracy is :', max(accuracy_list))
print('min accuracy is :', min(accuracy_list))