In [62]:
import pennylane as qml
from pennylane import numpy as np
from tensorflow import keras
from sklearn import decomposition
from pennylane.optimize import NesterovMomentumOptimizer
import cv2
from skimage.feature import local_binary_pattern
import random

In [63]:
n_epochs = 30   # Number of optimization epochs
n_train = 400    # Size of the train dataset
n_test = 240     # Size of the test dataset
n_dim = 784       # 需要降到多少维
target_label_list = [0,1] #需要提取数据的标签列表

In [64]:
SAVE_PATH = "QNN/data/" # Data saving folder
PREPROCESS = True          # If False, skip processing and load data from SAVE_PATH
PCA_DR = False              # 是否进行PCA降维处理
select_samples_with_labels = True       # 是否挑选特定标签的数据
FS_state = True              # 是否进行图像特征提取
fs_type = 'random'               #图像特征提取的方式，'LBP','HOG','random'
load_random_index = False    # 是否加载随机选择特征的索引
random_selected_feature_num = 200 #随机选择像素的个数 
mnist_dataset = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist_dataset.load_data()

In [65]:
#做2分类两种方案，1、抽取0和1作为数据集， 2、将0-4作为第一类对应0，5-9作为第二类对应1
def extract_data_with_label(origin_data, origin_label, target_label_list):
    new_data = []
    new_labels = []
    for i in range(len(origin_label)):
        for j in range(len(target_label_list)):
            if(origin_label[i] == target_label_list[j]):
                new_data.append(origin_data[i,:,:])
                new_labels.append(origin_label[i])
    new_data = np.array(new_data)
    new_labels = np.array(new_labels)
    return new_data, new_labels

if select_samples_with_labels == True:
    test_images, test_labels = extract_data_with_label(test_images, test_labels, target_label_list)
    train_images, train_labels = extract_data_with_label(train_images, train_labels, target_label_list)  

# Reduce dataset size
train_images = train_images[:n_train]
train_labels = train_labels[:n_train]
test_images = test_images[:n_test]
test_labels = test_labels[:n_test]

In [66]:
def normalization(image):
    image -= image.min()
    image = image / (image.max() - image.min())
    image *= 255
    image = image.astype(np.uint8)
    return image

def extract_HOG_features(origin_image):
    origin_image = normalization(origin_image)
    cell_size = (6,6)
    num_cells_per_block = (2,2)
    block_size = (num_cells_per_block[0] * cell_size[0], num_cells_per_block[1] * cell_size[1])
    x_cells = origin_image.shape[1] // cell_size[0]
    y_cells = origin_image.shape[0] // cell_size[1]
    h_stride = 1
    v_stride = 1
    block_stride = (cell_size[0] * h_stride, cell_size[1] * v_stride)
    num_bins = 9
    win_size = (x_cells * cell_size[0], y_cells * cell_size[1])
    hog = cv2.HOGDescriptor(win_size, block_size, block_stride, cell_size, num_bins)
    hog_descriptor = hog.compute(origin_image)
    return hog_descriptor

def fs_with_HOG(ori_imgs):
    new_images = []
    for idx, img in enumerate(ori_imgs):
        new_images.append(extract_HOG_features(img))
    new_images = np.array(new_images)
    return new_images

pixel_num = np.shape(test_images)[1] * np.shape(test_images)[2]


def get_random_selected_list(selected_feature_num): #生成随机列表，用于选择特定的像素
    selected_index_list = []

    if load_random_index == True:
        selected_index_list = np.load(SAVE_PATH + "selected_index_list_" + str(random_selected_feature_num) + "_01.npy")
        selected_index_list = selected_index_list.tolist()
    else: 
        for i in range(selected_feature_num):
            selected_index_list.append(random.randint(0,pixel_num-1))

        selected_index_list = np.array(selected_index_list, requires_grad=False)
        np.save(SAVE_PATH + "selected_index_list_" + str(random_selected_feature_num) + "_01.npy", selected_index_list)

    return selected_index_list


def fs_with_random(ori_imgs, selected_index_list): # 随机选择若干个像素
    new_images = []
    selected_index_list.sort()
    ori_imgs = np.reshape(ori_imgs,(np.shape(ori_imgs)[0],-1))

    for i in range(len(selected_index_list)):
        new_images.append(ori_imgs[:,selected_index_list[i]])

    new_images = np.array(new_images, requires_grad=False)
    new_images = np.transpose(new_images)
    print('dim:',np.shape(new_images))
    return new_images 

def fs_with_LBP(ori_imgs, radius): # radius为LBP算法中范围半径的取值
    n_points = 8 * radius
    new_images = []
    for idx, img in enumerate(ori_imgs):
        new_images.append(local_binary_pattern(img, n_points, radius))
    new_images = np.array(new_images, requires_grad=False)
    return new_images

In [67]:
random_index_list = get_random_selected_list(random_selected_feature_num)

def feature_selection(images, fs_type):
    processed_images = []
    if fs_type == 'LBP':
        processed_images = fs_with_LBP(images, 1)
    elif fs_type == 'HOG':
        processed_images = fs_with_HOG(images)
    elif fs_type == 'random':
        processed_images = fs_with_random(images, random_index_list)

    return processed_images 

if FS_state == True:
    test_images = feature_selection(test_images, fs_type)
    train_images = feature_selection(train_images, fs_type)


# Normalize pixel values within 0 and 1
train_images = train_images / (np.max(train_images) - np.min(train_images))
test_images = test_images / (np.max(test_images) - np.min(test_images))

def pca_with_origin_data(ori_data, n_dim):
    #进行PCA降维
    n_sample = np.shape(ori_data)[0]
    ori_data = ori_data.reshape(n_sample,-1)   
    pca = decomposition.PCA(n_components = n_dim)
    return pca.fit_transform(ori_data) 

if PCA_DR == True: 
    test_images = pca_with_origin_data(test_images,n_dim)
    train_images = pca_with_origin_data(train_images,n_dim)

dim: (240, 200)
dim: (400, 200)


In [68]:
n_wires = int(np.ceil(np.log(n_dim) / np.log(2)))
dev = qml.device("default.qubit", wires=n_wires)

def layer(W):

    for i in range(n_wires):
        qml.Rot(W[i,0], W[i,1], W[i,2], wires=i)
        if i == n_wires - 1:
            qml.CNOT(wires = [i, 0])
        else:
            qml.CNOT(wires = [i, i+1])

@qml.qnode(dev)
def circuit(weights, f=False):
    #初态制备
    qml.AmplitudeEmbedding(f, wires=range(n_wires), normalize=True, pad_with=0.)

    for W in weights:
        layer(W)

    return qml.expval(qml.PauliZ(0))

In [69]:
def variational_classifier(weights, bias, x):
    return circuit(weights, x) + bias

def square_loss(labels, predictions):
    loss = 0
    for l, p in zip(labels, predictions):
        loss = loss + (l - p) ** 2

    loss = loss / len(labels)
    return loss

def accuracy(labels, predictions):

    loss = 0
    for l, p in zip(labels, predictions):
        if abs(l - p) < 1e-5:
            loss = loss + 1
    loss = loss / len(labels)

    return loss

def cost(weights, bias, features, labels):
    predictions = [variational_classifier(weights, bias, f) for f in features]
    return square_loss(labels, predictions)


In [70]:
def process_img_to_features(image):
    img = image.flatten()
    return img

if PREPROCESS == True: #将图像数据拉成1维
    new_train_images = []
    print("pre-processing of train images:")
    for idx, img in enumerate(train_images):
        print("{}/{}        ".format(idx + 1, np.shape(train_images)[0]), end="\r")
        new_train_images.append(process_img_to_features(img))
    train_images = np.array(new_train_images, requires_grad=False)

    new_test_images = []
    print("\npre-processing of test images:")
    for idx, img in enumerate(test_images):
        print("{}/{}        ".format(idx + 1, np.shape(test_images)[0]), end="\r")
        new_test_images.append(process_img_to_features(img))
    test_images = np.array(new_test_images, requires_grad=False)

    # Save pre-processed images
    np.save(SAVE_PATH + "new_train_images_" + str(n_dim) + fs_type + "_01.npy", train_images)
    np.save(SAVE_PATH + "new_test_images_" + str(n_dim) + fs_type + "_01.npy", test_images)

pre-processing of train images:
1/400        2/400        3/400        4/400        5/400        6/400        7/400        8/400        9/400        10/400        11/400        12/400        13/400        14/400        15/400        16/400        17/400        18/400        19/400        20/400        21/400        22/400        23/400        24/400        25/400        26/400        27/400        28/400        29/400        30/400        31/400        32/400        33/400        34/400        35/400        36/400        37/400        38/400        39/400        40/400        41/400        42/400        43/400        44/400        45/400        46/400        47/400        48/400        49/400        50/400        51/400        52/400        53/400        54/400        55/400        56/400        57/400        58/400        59/400        60/400        61/400        62/400        63/400        64/400        65/400        66

In [None]:
train_images = np.load(SAVE_PATH + "new_train_images_" + str(n_dim) + fs_type + "_01.npy")
test_images = np.load(SAVE_PATH + "new_test_images_" + str(n_dim) + fs_type + "_01.npy")

train_labels = train_labels * 2 - np.ones(len(train_labels)) # shift label from {0, 1} to {-1, 1}
test_labels  =  test_labels * 2 - np.ones(len(test_labels))

#初始化参数
np.random.seed(0)
num_qubits = n_wires
num_layers = 6
weights_init = 0.01 * np.random.randn(num_layers, num_qubits, 3, requires_grad=True)
bias_init = np.array(0.0, requires_grad=True)

opt = NesterovMomentumOptimizer(0.01)
batch_size = 5

weights = weights_init
bias = bias_init

best_result_list = [0,0,0,0]

for it in range(n_epochs):

    # Update the weights by one optimizer step
    batch_index = np.random.randint(0, len(train_images), (batch_size,))
    X_batch = train_images[batch_index]
    Y_batch = train_labels[batch_index]

    weights, bias, _, _ = opt.step(cost, weights, bias, X_batch, Y_batch)
    
    # Compute predictions on train and validation set
    predictions_train = [np.sign(variational_classifier(weights, bias, f)) for f in train_images]
    predictions_val = [np.sign(variational_classifier(weights, bias, f)) for f in test_images]

    # Compute accuracy on train and validation set
    acc_train = accuracy(train_labels, predictions_train)
    acc_val = accuracy(test_labels, predictions_val)

    print(
        "Iter: {:5d} | Cost: {:0.7f} | Acc train: {:0.7f} | Acc validation: {:0.7f} "
        "".format(it + 1, cost(weights, bias, train_images, train_labels), acc_train, acc_val)
    )
    if(acc_val > best_result_list[3]):
        best_result_list = [it + 1, cost(weights, bias, train_images, train_labels), acc_train, acc_val]
    elif(acc_val == best_result_list[3]):
        if(cost(weights, bias, train_images, train_labels) < best_result_list[1]):
            best_result_list = [it + 1, cost(weights, bias, train_images, train_labels), acc_train, acc_val]

print(
    "best result----\nIter: {:5d} | Cost: {:0.7f} | Acc train: {:0.7f} | Acc validation: {:0.7f} "
    "".format(best_result_list[0], best_result_list[1], best_result_list[2], best_result_list[3])
)

print('the number of features:', np.shape(test_images)[1])