In [None]:
import pennylane as qml
from pennylane import numpy as np
import torch
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchinfo import summary
import torch.nn as nn
#from torchsummary import summary
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset
from PIL import Image
import time
import logging
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import pickle

In [None]:
class Args:
    """Hyper-parameters shared by the circuit, the model and the training loop."""

    # Classical cross-attention settings.
    num_heads = 5        # heads of each nn.MultiheadAttention
    dim_crossAtt = 500   # embedding width used by the cross-attention blocks

    # Quantum circuit settings.
    n_qubits = 4                   # wires of the simulator device
    num_layers = 7                 # repetitions of each RX/RZ/RY rotation stack
    dim_amp = 2 ** 6               # not referenced anywhere in the visible code
    inter_step = n_qubits // 2 + 2 # wire offset used by the inter-modal CNOTs

    # Optimisation settings.
    bsz = 32      # DataLoader batch size
    epochs = 50   # number of training epochs (also T_max of the LR schedule)
    

def read_pickle(path):
    """Load and return the object stored in the pickle file at ``path``.

    NOTE: unpickling can execute arbitrary code, so only call this on files
    that come from a trusted source.
    """
    with open(path, "rb") as handle:
        return pickle.load(handle)

def save_pickle(path, feats):
    """Serialise ``feats`` with pickle into the file at ``path`` (overwriting it)."""
    with open(path, "wb") as handle:
        pickle.dump(feats, handle)

In [None]:
# Simulator device with one wire per qubit configured in Args.
dev = qml.device("default.qubit", wires=Args.n_qubits)
@qml.qnode(dev)
def circuit(inputs, **weights):
    """Variational fusion circuit evaluated on ``dev``.

    Gate sequence, in order:

    1. amplitude-embed ``inputs`` on all ``Args.n_qubits`` wires;
    2. ``Args.num_layers`` rounds of RX, RZ, RY rotations on every wire;
    3. parameter-free CNOTs inside each half of the register;
    4. ``Args.num_layers`` further rounds of RX, RZ, RY rotations;
    5. parameter-free CNOTs between the two halves of the register;
    6. Pauli-Z expectation value on every wire.

    Args:
        inputs: Values handed to ``qml.AmplitudeEmbedding`` with
            ``normalize=True``. Presumably a vector of length
            ``2 ** Args.n_qubits`` -- TODO confirm against the caller.
        **weights: Rotation angles keyed ``"weights_<k>"``. Layer ``l`` uses
            the ``6 * Args.n_qubits`` consecutive keys starting at
            ``k = Args.n_qubits * 6 * l``: the first three groups of
            ``Args.n_qubits`` keys feed RX, RZ, RY of step 2, the last three
            groups feed RX, RZ, RY of step 4.

    Returns:
        A list with one ``qml.expval(qml.PauliZ(...))`` per wire.
    """

    qml.AmplitudeEmbedding(inputs, wires=range(Args.n_qubits), normalize=True)

    for layer in range(Args.num_layers): 

        # First rotation stack ("encoding" in the original notes).
        # s is the index of the first weight belonging to this layer.
        s = Args.n_qubits * 6 * layer
        for idx, i in enumerate(range(Args.n_qubits)):
            qml.RX(weights["weights_"+str(i+s)], wires=idx)
    
        for idx, i in enumerate(range(Args.n_qubits)):
            qml.RZ(weights["weights_"+str(i+Args.n_qubits+s)], wires=idx)
    
        for idx, i in enumerate(range(Args.n_qubits)):
            qml.RY(weights["weights_"+str(i+Args.n_qubits*2+s)], wires=idx) 

    
    # Intra-modal fusion: entanglement with parameter-free gates.
    # The first half of the wires is labelled "image" and the second half
    # "text" by the original author.
    n_qubits_img = [i for i in range(int(Args.n_qubits/2))]
    n_qubits_txt = [i+int(Args.n_qubits/2) for i in range(int(Args.n_qubits/2))]

    # Image intra-modal: each wire controls its successor; the last wire of
    # the half controls wire 0.
    for i in n_qubits_img:
        if i < n_qubits_img[-1]:
            qml.CNOT(wires=[i, i+1])
        else:
            qml.CNOT(wires=[i, 0])

    # Text intra-modal: same pattern, wrapping to the first wire of the
    # second half.
    for i in n_qubits_txt:
        if i < n_qubits_txt[-1]:
            qml.CNOT(wires=[i, i+1])
        else:
            qml.CNOT(wires=[i, n_qubits_txt[0]])

    # Second rotation stack ("angle encoding <X, Z, Y>" in the original
    # notes); the gates are applied in the order RX, RZ, RY.
    for layer in range(Args.num_layers): 
        s = Args.n_qubits * 6 * layer

        # Uses the second half (offsets 3n, 4n, 5n) of this layer's weights.
        for idx, i in enumerate(range(Args.n_qubits)):
            qml.RX(weights["weights_"+str(i+Args.n_qubits*3+s)], wires=idx)
    
        for idx, i in enumerate(range(Args.n_qubits)):
            qml.RZ(weights["weights_"+str(i+Args.n_qubits*4+s)], wires=idx)
    
        for idx, i in enumerate(range(Args.n_qubits)):
            qml.RY(weights["weights_"+str(i+Args.n_qubits*5+s)], wires=idx)

#####################################################

    # NOTE(review): with Args.inter_step defined as int(n_qubits/2) + 2 this
    # condition is false, so the else branch below is the one that runs.
    if int(Args.n_qubits/2) == Args.inter_step:
    
        # Inter-modal fusion: wire i of one half is paired with wire
        # i +/- n_qubits/2 of the other half, in both directions.
        for i in range(Args.n_qubits):
            if i < int(Args.n_qubits/2):
                qml.CNOT(wires=[i, i+int(Args.n_qubits/2)])
            else:
                qml.CNOT(wires=[i, i-int(Args.n_qubits/2)])

    else:
        # Inter-modal fusion with a configurable wire offset.
        # loc_0 holds [control, target] pairs whose control is in the first
        # half; loc_1 holds the reversed pairs for the second half.
        loc_0, loc_1 = [], []
        for i in range(Args.n_qubits):
            if i < int(Args.n_qubits/2):
                if i + Args.inter_step <= Args.n_qubits - 1:
                    loc = [i, i+Args.inter_step]
                    loc_0.append(loc)
                else:
                    # Target would fall past the last wire: shift it back by
                    # half the register.
                    loc = [i, i+Args.inter_step-int(Args.n_qubits/2)]
                    loc_0.append(loc)
            else:
                # Map target -> control of the first-half pairs. The
                # comprehension's own `i` does not clobber the loop variable.
                # A KeyError is raised here if no first-half wire targeted
                # wire i.
                dic_reverse = dict([[i[1], i[0]] for i in loc_0])
                loc = [i, dic_reverse[i]]
                loc_1.append(loc)
        loc = loc_0 + loc_1
        for l in loc:
            qml.CNOT(wires=[l[0], l[1]])            
#####################################################
    
    
    # Quantum measurement: Pauli-Z expectation value of every wire.
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(Args.n_qubits)]


# Every rotation angle is registered as its own size-1 weight named
# "weights_<k>"; each layer consumes 6 * n_qubits of them (two RX/RZ/RY stacks).
weight_shapes = {
    f"weights_{k}": 1
    for k in range(Args.n_qubits * 6 * Args.num_layers)
}

# Initialise the angles with torch.nn.init.normal_ and wrap the QNode as a
# torch module so it can be used inside an nn.Module.
init_method_ = torch.nn.init.normal_
qlayer = qml.qnn.TorchLayer(
    circuit,
    weight_shapes=weight_shapes,
    init_method=init_method_,
)

In [None]:
class QCMFM(nn.Module):
  """Hybrid quantum-classical model that fuses image and text features.

  Pipeline of ``forward``: per-modality linear projection + LayerNorm,
  bidirectional cross-attention, flatten + linear reduction of each modality
  to ``2 ** Args.n_qubits / 2`` values, concatenation into a vector of
  ``2 ** Args.n_qubits`` values, the quantum layer ``qlayer``, and a final
  linear layer followed by ``log_softmax``.
  """

  # Token counts and feature widths expected by forward(); shared with
  # __init__ so that the flattened sizes always agree with the layer sizes.
  IMG_TOKENS, IMG_DIM = 49, 512
  TXT_TOKENS, TXT_DIM = 30, 768

  def __init__(self):
    super().__init__()
    # NOTE(review): `conv` is never called by forward(). It is kept so the
    # state_dict keys of already saved checkpoints still match.
    self.conv = nn.Sequential(
        nn.Conv2d(in_channels=2048, out_channels=512,
              kernel_size=2, stride=1, padding=0),
        nn.ReLU(), nn.MaxPool2d(kernel_size=2))

    # Project both modalities to the same width.
    self.fc_txt = nn.Sequential(nn.Linear(self.TXT_DIM, Args.dim_crossAtt), nn.ReLU())
    self.fc_img = nn.Sequential(nn.Linear(self.IMG_DIM, Args.dim_crossAtt), nn.ReLU())

    # Normalisation.
    self.lm_t = nn.LayerNorm(Args.dim_crossAtt)
    self.lm_i = nn.LayerNorm(Args.dim_crossAtt)

    # Cross-modal attention (image attends to text and vice versa).
    self.att_img_txt = nn.MultiheadAttention(embed_dim=Args.dim_crossAtt, num_heads=Args.num_heads, batch_first=True)
    self.att_txt_img = nn.MultiheadAttention(embed_dim=Args.dim_crossAtt, num_heads=Args.num_heads, batch_first=True)

    # Each modality is reduced to half of the amplitude vector so that the
    # concatenation has 2 ** n_qubits entries.
    half_amp = 2 ** Args.n_qubits // 2
    self.fc_t_res = nn.Sequential(nn.Linear(Args.dim_crossAtt * self.TXT_TOKENS, half_amp), nn.ReLU())
    self.fc_i_res = nn.Sequential(nn.Linear(Args.dim_crossAtt * self.IMG_TOKENS, half_amp), nn.ReLU())

    # Quantum circuit performing the multimodal fusion (module-level layer).
    self.qlayer = qlayer

    # Maps the n_qubits expectation values to 4 outputs
    # (presumably one per class -- TODO confirm).
    self.fc_q = nn.Linear(Args.n_qubits, 4)
    self.name = f"HQCMFM-qubits={Args.n_qubits}-layers={Args.num_layers}-step={Args.inter_step}"

  def forward(self, x_img, x_txt,):
    """Return log-probabilities of shape ``(batch, 4)``.

    Args:
      x_img: Image features, viewed as ``(batch, 49, 512)``.
      x_txt: Text features, viewed as ``(batch, 30, 768)``.
    """
    x_img = x_img.view(-1, self.IMG_TOKENS, self.IMG_DIM)  # image
    x_txt = x_txt.view(-1, self.TXT_TOKENS, self.TXT_DIM)  # text

    # Project to the shared width, then normalise.
    x_img = self.lm_i(self.fc_img(x_img))
    x_txt = self.lm_t(self.fc_txt(x_txt))

    # Cross-modal attention; the attention weights are not used.
    x_img_att, _ = self.att_img_txt(query=x_img, key=x_txt, value=x_txt)
    x_txt_att, _ = self.att_txt_img(query=x_txt, key=x_img, value=x_img)

    # Flatten tokens x width per sample. Deriving the size from the tensor
    # (instead of the former literal 500*49 / 500*30) keeps this in sync with
    # Args.dim_crossAtt.
    x_img_att_res = torch.flatten(x_img_att, start_dim=1)
    x_txt_att_res = torch.flatten(x_txt_att, start_dim=1)

    # Reduce each modality and concatenate into the amplitude vector.
    x_img_att_res = self.fc_i_res(x_img_att_res)
    x_txt_att_res = self.fc_t_res(x_txt_att_res)
    x_conca = torch.cat([x_img_att_res, x_txt_att_res], dim=-1)

    # Quantum multimodal fusion.
    x_qfusion = self.qlayer(x_conca)

    # Output head on the fused features.
    x_qfusion = self.fc_q(x_qfusion)

    return F.log_softmax(x_qfusion, -1)

In [None]:
# Load the per-class data; each class entry provides 'img', 'txt_emb' and 'labs'.
textiles_data_seq = read_pickle("data.pickle")

# Stack images and text embeddings of the four classes along the sample axis,
# always in the order chou, duan, jin, si.
imgs = np.concatenate(
    [textiles_data_seq[key]['img'] for key in ('chou', 'duan', 'jin', 'si')],
    axis=0,
)

txts = np.concatenate(
    [textiles_data_seq[key]['txt_emb'] for key in ('chou', 'duan', 'jin', 'si')],
    axis=0,
)

# Labels are joined with `+`, in the same class order as above.
labs = (
    textiles_data_seq['chou']['labs']
    + textiles_data_seq['duan']['labs']
    + textiles_data_seq['jin']['labs']
    + textiles_data_seq['si']['labs']
)

In [None]:
# Extract image features.

from tensorflow.keras.applications import VGG19, ResNet50, Xception
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Add, TimeDistributed, Multiply, Softmax, Reshape, MultiHeadAttention
from tensorflow.python.keras.utils import np_utils 

# ImageNet-pretrained VGG19 without its classification head, fed with
# 224x224x3 inputs.
base_model = VGG19(
    include_top=False,
    weights="imagenet",
    input_tensor=Input(shape=(224, 224, 3)),
)
head_model = base_model.output
pre_model = Model(inputs=base_model.input, outputs=head_model)

# NOTE(review): `imgs` is rebound from raw images to extracted features, so
# this cell is not idempotent -- re-running it feeds features back into VGG19.
imgs = pre_model.predict(imgs, batch_size=256)

In [None]:
# Build the dataset.

class LpDataset(Dataset):
  """Index-aligned (image, text, label) triples."""

  def __init__(self, imgs, txts, labels):
    """
    :param imgs: indexable collection of image inputs
    :param txts: indexable collection of text inputs
    :param labels: [0, 1, 2, ...]
    """
    self.imgs = imgs
    self.txts = txts
    self.labels = labels
    # NOTE(review): not read anywhere in this class.
    self.name2label = {
        "falang": 0,
        "jinyin": 1,
        "qiqi": 2,
        "tongqi": 3,
        "yushi": 4,
    }

  def __len__(self):
    # The label collection defines the dataset size.
    return len(self.labels)

  def __getitem__(self, idx):
    return self.imgs[idx], self.txts[idx], self.labels[idx]


# Split features and labels into train and test subsets.
# NOTE(review): the notebook never defined trainX_img / trainX_txt / trainY /
# testX_img / testX_txt / testY, so it failed with NameError on a fresh kernel.
# The original split is not recorded anywhere; the 80/20 stratified split with
# seed 42 below is an assumption -- adjust it to match the intended protocol.
(trainX_img, testX_img,
 trainX_txt, testX_txt,
 trainY, testY) = train_test_split(
    imgs, txts, labs,
    test_size=0.2,
    random_state=42,
    stratify=labs,
)

trainData = LpDataset(imgs=trainX_img, txts=trainX_txt, labels=trainY)
testData = LpDataset(imgs=testX_img, txts=testX_txt, labels=testY)

# Pinned host memory only helps (and only avoids a warning) when CUDA is used.
_pin_memory = torch.cuda.is_available()

# Training batches are drawn in random order.
trainLoader = DataLoader(dataset=trainData, batch_size=Args.bsz,
              shuffle=True,
              num_workers=0,
              pin_memory=_pin_memory)
# Evaluation does not need shuffling; a fixed order keeps saved predictions
# aligned with testX_* / testY.
testLoader = DataLoader(dataset=testData, batch_size=Args.bsz,
              shuffle=False,
              num_workers=0,
              pin_memory=_pin_memory)


def train(trainLoader, model, device, optimizer):
    """Run one optimisation pass over ``trainLoader``.

    Args:
        trainLoader: Iterable of batches whose first three items are the image
            inputs, the text inputs and the integer class targets.
        model: Module called as ``model(images, texts)`` that returns
            log-probabilities (the loss is ``F.nll_loss``).
        device: Device the batch tensors are moved to.
        optimizer: Optimizer stepped once per batch.
    """
    # Make sure training-only behaviour is active even if the model was left
    # in eval mode by an earlier evaluation.
    model.train()
    for batch in trainLoader:
        inputs_i, inputs_t, targets = (part.to(device) for part in batch[:3])
        outputs = model(inputs_i, inputs_t)
        loss = F.nll_loss(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Carriage return keeps the running loss on a single output line.
        print(f"loss: {loss.item()}", end="\r")

def test(testLoader, model, device):
    target_all = []
    output_all = []
    with torch.no_grad():
        for idx, testD in enumerate(testLoader):
            inputs_i, inputs_t, targets = testD[0], testD[1], testD[2]
            inputs_i, inputs_t, targets = inputs_i.to(device), inputs_t.to(device), targets.to(device)
            outputs = model(inputs_i, inputs_t)

            target_all.append(targets)
            output_all.append(outputs)
        target_all = torch.cat(target_all, dim=0)
        output_all = torch.cat(output_all, dim=0)

    _, indices = output_all.topk(1, dim=1)
    masks = indices.eq(target_all.view(-1, 1).expand_as(indices))
    size = target_all.shape[0]
    corrects = masks.sum().item()
    accuracy = corrects / size
    loss = F.nll_loss(output_all, target_all).item()

    print(f"test set accuracy: {accuracy}")
    print(f"test set loss: {loss}")

    return accuracy, loss, target_all, output_all

In [None]:
# Seed the NumPy-style and PyTorch random number generators.
np.random.seed(42)
torch.manual_seed(42)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# Build the model, the optimizer and the cosine learning-rate schedule.
model = QCMFM().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = CosineAnnealingLR(optimizer, T_max=Args.epochs)
accs, y_preds, y_trues = [], [], []

f1_best = 0.
for epoch in range(1, Args.epochs+1):
    # train
    print(f"******Train {epoch} epoch*******")
    s_time = time.time()
    train(trainLoader, model, device, optimizer)
    e_time = time.time()
    print(f"|| Training time: {e_time-s_time}")

    # Advance the schedule once per epoch; without this call the learning
    # rate stays at its initial value for the whole run.
    scheduler.step()

    # test
    acc_tes, loss_test, y_true_test, y_pred_test = test(testLoader, model, device)
    # Tensor.cpu() also works for tensors already on the CPU, so no
    # try/except fallback is needed here.
    y_true_test_ = y_true_test.cpu()
    y_pred_test_ = y_pred_test.cpu().argmax(dim=-1)

    # Keep the best model, using the macro F1 score as the criterion.
    f1 = f1_score(y_true_test_, y_pred_test_, average="macro")
    if f1 <= f1_best:
        continue

    f1_best = f1
    torch.save(model.state_dict(), "../pt/textiles/"+model.name+".pt")
    # Store the model outputs on the CPU so the pickle can be loaded on
    # machines without CUDA.
    save_pickle("../pt/textiles/"+model.name+".pickle", y_pred_test.cpu())
    save_pickle("../pt/textiles/"+model.name+"-yTrueTest.pickle", y_true_test_)

    # Only report the per-class metrics for epochs that set a new best F1.
    print(classification_report(y_true_test_, y_pred_test_, digits=4))