In [None]:
import numpy as np                                          
from mindquantum.core.circuit import Circuit                
from mindquantum.core.gates import H, RX, RY, RZ,X    
from mindquantum.core.parameterresolver import PRGenerator  
from mindquantum.simulator import Simulator
from sklearn.datasets import fetch_openml
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split   
from mindquantum.algorithm.library import amplitude_encoder
from mindquantum.algorithm.nisq import HardwareEfficientAnsatz     
from mindquantum.core.operators import QubitOperator           # 导入QubitOperator模块，用于构造泡利算符
from mindquantum.core.operators import Hamiltonian             # 导入Hamiltonian模块，用于构建哈密顿量
import mindspore as ms                                                                         # 导入mindspore库并简写为ms
from mindquantum.framework import MQLayer,MQN2Layer                                              # 导入MQLayer
# 导入HardwareEfficientAnsatz
from mindquantum.core.gates import RY           
import torch
from torchvision import datasets, transforms# 导入量子门RY
from scipy.ndimage import zoom
import random

In [None]:
# 定义数据预处理
transform = transforms.Compose([
    transforms.ToTensor()
])
np.random.seed(10)
def filter_3_and_6(data):
    images, labels = data
    mask = (labels == 3) | (labels == 6)
    return images[mask], labels[mask]

# 下载和加载 MNIST 数据集
mnist_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=None)
filtered_data = filter_3_and_6((mnist_dataset.data, mnist_dataset.targets))
X_data, y = filtered_data  # X 图像数据 y 标签

# 分别对标签为 3 和 6 的数据进行随机抽样
def sample_data(X, y, label, sample_ratio=0.2):
    label_mask = (y == label)
    X_label = X[label_mask]
    y_label = y[label_mask]
    
    sample_size = int(len(y_label) * sample_ratio)
    sample_indices = np.random.choice(len(y_label), sample_size, replace=False)
    
    return X_label[sample_indices], y_label[sample_indices]

X_data_3, y_data_3 = sample_data(X_data, y, label=3, sample_ratio=0.2)
X_data_6, y_data_6 = sample_data(X_data, y, label=6, sample_ratio=0.2)

# 合并抽样后的数据
X_sampled = torch.cat((X_data_3, X_data_6), dim=0)
y_sampled = torch.cat((y_data_3, y_data_6), dim=0)

Compressed_X = np.array([zoom(img,0.4) for img in X_sampled])
Compressed_X = Compressed_X/255

# Compressed_X = Compressed_X[:1200]
# y_sampled = y_sampled[:1200]

# y_sampled[y_sampled ==3] =1
# y_sampled[y_sampled ==6] =0

# Compressed_X = torch.tensor(Compressed_X, dtype=torch.float32)

print(Compressed_X.shape, y_sampled.shape)


In [None]:
X_train, X_test, y_train, y_test = train_test_split(Compressed_X, y_sampled, test_size=0.2, random_state=0, shuffle=True) # 将数据集划分为训练集和测试集
y_train[y_train==3]=1
y_train[y_train==6]=0
y_test[y_test==3]=1
y_test[y_test==6]=0
print(X_train.shape)                                                                                   # 打印训练集中样本的数据类型
print(X_test.shape)                                                                                    # 打印测试集中样本的数据类型

In [None]:
def amplitude_param(pixels):
    param_rd = []
    _, parameterResolver = amplitude_encoder(pixels, 6)   
    for _, param in parameterResolver.items():
        param_rd.append(param)
    param_rd = np.array(param_rd)
    return param_rd

trian_params = np.array([amplitude_param(pixels=i.flatten()) for i in X_train])
test_params  = np.array([amplitude_param(pixels=i.flatten()) for i in X_test])
print(f'trian_params shape={trian_params.shape}')

In [38]:
# prg = PRGenerator('alpha')
# encoder = Circuit()
encoder, parameterResolver  = amplitude_encoder([0],6)
encoder = encoder.as_encoder()
encoder = encoder.no_grad()
ansatz = HardwareEfficientAnsatz(6, single_rot_gate_seq=[RY], entangle_gate=X, depth=3).circuit     # 通过
hams = [Hamiltonian(QubitOperator(f'Z{i}')) for i in [0,1]]

ms.set_context(mode=ms.PYNATIVE_MODE, device_target="CPU")
ms.set_seed(2)                                                     # 设置生成随机数的种子
circuit = encoder+ ansatz.as_ansatz()         
sim = Simulator('mqvector', 6)
grad_ops = sim.get_expectation_with_grad(hams,
                                         circuit,
                                         parallel_worker=5)
QuantumNet = MQLayer(grad_ops)          # 搭建量子神经网络

In [None]:
from mindspore.nn import SoftmaxCrossEntropyWithLogits                         # 导入SoftmaxCrossEntropyWithLogits模块，用于定义损失函数
from mindspore.nn import Adam                                                  # 导入Adam模块用于定义优化参数
from mindspore.train import Accuracy, Model, LossMonitor                       # 导入Accuracy模块，用于评估预测准确率
import mindspore as ms
from mindspore.dataset import NumpySlicesDataset
from torch.utils.data import DataLoader# 导入NumpySlicesDataset模块，用于创建模型可以识别的数据集



loss = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')            # 通过SoftmaxCrossEntropyWithLogits定义损失函数，sparse=True表示指定标签使用稀疏格式，reduction='mean'表示损失函数的降维方法为求平均值
opti = Adam(QuantumNet.trainable_params(), learning_rate=0.1)                  # 通过Adam优化器优化Ansatz中的参数，需要优化的是Quantumnet中可训练的参数，学习率设为0.1

model = Model(QuantumNet, loss, opti, metrics={'Acc': Accuracy()})             # 建立模型：将MindSpore Quantum构建的量子机器学习层和MindSpore的算子组合，构成一张更大的机器学习网络

y_test[y_test == 3] = 1
# 将值 6 替换为 0
y_test[y_test == 6] = 0
y_test_for_loader = y_test.numpy()

train_loader = NumpySlicesDataset({'features': trian_params, 'labels': y_train}, shuffle=False).batch(20) # 通过NumpySlicesDataset创建训练样本的数据集，shuffle=False表示不打乱数据，batch(5)表示训练集每批次样本点有5个
test_loader = NumpySlicesDataset({'features': test_params, 'labels': y_test},shuffle=False).batch(20)                   # 通过NumpySlicesDataset创建测试样本的数据集，batch(5)表示测试集每批次样本点有5个


class StepAcc(ms.Callback):                                                      # 定义一个关于每一步准确率的回调函数
    def __init__(self, model, test_loader):
        self.model = model
        self.test_loader = test_loader
        self.acc = []

    def on_train_step_end(self, run_context):
        
        self.acc.append(self.model.eval(self.test_loader, dataset_sink_mode=False)['Acc'])
        # print(f'ACC = {self.acc[-1]}')


monitor = LossMonitor(100)                                                       # 监控训练中的损失，每16步打印一次损失值

acc = StepAcc(model, test_loader)                                               # 使用建立的模型和测试样本计算预测的准确率

model.train(30, train_loader, callbacks=[monitor, acc], dataset_sink_mode=False)# 将上述建立好的模型训练20次

In [None]:
plt.plot(acc.acc)
plt.title('Statistics of accuracy', fontsize=20)
plt.xlabel('Steps', fontsize=20)
plt.ylabel('Accuracy', fontsize=20)