In [None]:
import time
import os
import copy
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import pennylane as qml
from pennylane import numpy as np

torch.manual_seed(42)
np.random.seed(42)


import matplotlib.pyplot as plt


os.environ["OMP_NUM_THREADS"] = "1"

In [None]:
n_qubits = 5                
step = 0.001              
batch_size = 32             
num_epochs = 50          
q_depth = 10                
gamma_lr_scheduler = 0.1    
q_delta = 0.01             

In [None]:
dev = qml.device("default.qubit", wires=n_qubits)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
print("Using device:", device)


In [None]:
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader


transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  
])


data_dir = './cifar100_data'


trainset = datasets.CIFAR100(root=data_dir, train=True, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=32, shuffle=True)


testset = datasets.CIFAR100(root=data_dir, train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=32, shuffle=False)

dataloaders = {
    'train': trainloader,
    'test': testloader
}


class_names = trainset.classes

In [None]:
from torch.utils.data import DataLoader, Subset
import numpy as np

def get_class_indices(dataset, classes):
    # 获取特定类别的索引
    indices = []
    for i in range(len(dataset)):
        # 检查数据集中样本的类别
        if dataset.targets[i] in classes:
            indices.append(i)
    return indices

# 您感兴趣的类别（第 3 和第 88 类）
classes_of_interest = [3, 88]  # 类别索引从 0 开始，因此减去 1

# 为训练集和测试集创建子集
train_indices = get_class_indices(trainset, classes_of_interest)
test_indices = get_class_indices(testset, classes_of_interest)

train_subset = Subset(trainset, train_indices)
test_subset = Subset(testset, test_indices)

# 创建数据加载器
trainloader = DataLoader(train_subset, batch_size=32, shuffle=True)
testloader = DataLoader(test_subset, batch_size=32, shuffle=False)

dataloaders = {
    'train': trainloader,
    'test': testloader
}

In [None]:
def H_layer(nqubits):
   
    for idx in range(nqubits):
        qml.Hadamard(wires=idx)


def RY_layer(w):
   
    for idx, element in enumerate(w):
        qml.RY(element, wires=idx)


def entangling_layer(nqubits):
   
    # In other words it should apply something like :
    # CNOT  CNOT  CNOT  CNOT...  CNOT
    #   CNOT  CNOT  CNOT...  CNOT
    for i in range(0, nqubits - 1, 2):  # Loop over even indices: i=0,2,...N-2
        qml.CNOT(wires=[i, i + 1])
    for i in range(1, nqubits - 1, 2):  # Loop over odd indices:  i=1,3,...N-3
        qml.CNOT(wires=[i, i + 1])

In [None]:
@qml.qnode(dev, interface="torch")
def quantum_net(q_input_features, q_weights_flat):
    """
    The variational quantum circuit.
    """


    q_weights = q_weights_flat.reshape(q_depth, n_qubits)


    H_layer(n_qubits)

    RY_layer(q_input_features)


    for k in range(q_depth):
        entangling_layer(n_qubits)
        RY_layer(q_weights[k])
    

  
    exp_vals = [qml.expval(qml.PauliZ(position)) for position in range(n_qubits)]
    return tuple(exp_vals)

In [None]:
@qml.qnode(dev, interface="torch")
def quantum_circuit():
    H_layer(n_qubits)
    RY_layer(np.random.random(n_qubits))
    entangling_layer(n_qubits)
    return qml.expval(qml.PauliZ(0))


drawn_circuit = qml.draw(quantum_circuit)


print(drawn_circuit())


In [None]:
class DressedQuantumNet(nn.Module):


    def __init__(self):
    

        super().__init__()
        self.pre_net = nn.Linear(100, n_qubits)
        self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))
        self.post_net = nn.Linear(n_qubits, 100)

    def forward(self, input_features):

        
        pre_out = self.pre_net(input_features)
        q_in = torch.tanh(pre_out) * np.pi / 2.0

        
        q_out = torch.Tensor(0, n_qubits).to(device)  
        for elem in q_in:
            q_out_elem_vals = quantum_net(elem.to(device), self.q_params.to(device)) 
            q_out_elem = torch.tensor(q_out_elem_vals).float().unsqueeze(0).to(device)
            q_out = torch.cat((q_out, q_out_elem))

        return self.post_net(q_out)

In [None]:
class SELayer(nn.Module):
    def __init__(self,channel, reduction):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class QCNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(6,9, kernel_size=3)
        self.conv2 = nn.Conv2d(9, 16, kernel_size=3)
        self.btch = nn.BatchNorm2d(16)
        self.conv3 = nn.Conv2d(3, 6, kernel_size=3)
        self.conv4 = nn.Conv2d(16, 16, kernel_size=3)
        #self.conv5 = nn.Conv2d(16, 16, kernel_size=3)
        self.se1 = SELayer(9 ,4)
        self.se2 = SELayer(16 ,4)
        self.dropout = nn.Dropout2d()
        self.fc1 = nn.Linear(1600, 256)
        self.fc2 = nn.Linear(256, 100)
        self.quantum_layer = DressedQuantumNet()

    def forward(self, x):
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv1(x))
        #x = self.se1(x)
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        #x = self.btch(x)
        x = F.relu(self.conv4(x))
        #x = F.relu(self.conv5(x))
        #x = self.se2(x)
        #x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = x.view(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        #x = self.dropout(x)
        x = self.fc2(x)
        x = self.quantum_layer(x)
        return x

In [None]:
model = QCNet()
#model = nn.DataParallel(model)
model.to(device)

In [None]:
optimizer = optim.Adam(model.parameters(), lr=step) 
criterion = nn.CrossEntropyLoss()

In [None]:
import time
import torch

def train_model(model, trainloader, criterion, optimizer, num_epochs, device):
    best_acc = 0.0  
    best_loss = float('inf') 

    for epoch in range(num_epochs): 
        start_time = time.time()  

        running_loss = 0.0
        correct = 0
        total = 0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
        

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if i % 100 == 99:    
                current_loss = running_loss / 100
                current_accuracy = 100 * correct / total
                print(f'[Epoch: {epoch + 1}, Batch: {i + 1}] Loss: {current_loss:.3f}, Accuracy: {current_accuracy:.2f}%')
                running_loss = 0.0
                correct = 0
                total = 0

        
        epoch_loss = running_loss / len(trainloader)
        epoch_acc = 100 * correct / total

        
        if epoch_acc > best_acc:
            best_acc = epoch_acc
        if epoch_loss < best_loss:
            best_loss = epoch_loss

        end_time = time.time()  
        epoch_duration = end_time - start_time  
        print(f'Epoch {epoch + 1} completed in {epoch_duration:.2f} seconds')

    print(f'Finished Training. Best Accuracy: {best_acc:.2f}%, Best Loss: {best_loss:.3f}')
    return best_acc, best_loss

In [None]:
import time
import torch

def test_model(model, trainloader, criterion, optimizer, num_epochs, device):
    best_acc = 0.0  
    best_loss = float('inf')  

    for epoch in range(num_epochs):  
        start_time = time.time()  

        running_loss = 0.0
        correct = 0
        total = 0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)


            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()

            running_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if i % 100 == 99:   
                current_loss = running_loss / 100
                current_accuracy = 100 * correct / total
                print(f'[Epoch: {epoch + 1}, Batch: {i + 1}] Loss: {current_loss:.3f}, Accuracy: {current_accuracy:.2f}%')
                running_loss = 0.0
                correct = 0
                total = 0

        
        epoch_loss = running_loss / len(trainloader)
        epoch_acc = 100 * correct / total

        
        if epoch_acc > best_acc:
            best_acc = epoch_acc
        if epoch_loss < best_loss:
            best_loss = epoch_loss

        end_time = time.time()  
        epoch_duration = end_time - start_time 
        print(f'Epoch {epoch + 1} completed in {epoch_duration:.2f} seconds')

    print(f'Finished Training. Best Accuracy: {best_acc:.2f}%, Best Loss: {best_loss:.3f}')
    return best_acc, best_loss

In [None]:
train_model(model, trainloader, criterion, optimizer, num_epochs, device)

In [None]:
test_model(model, testloader, criterion, optimizer, num_epochs, device)

In [None]:
import matplotlib.pyplot as plt
import torch

def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.5, 0.5, 0.5])
    std = np.array([0.5, 0.5, 0.5])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  

def visualize_model(model, num_images=6, fig_name="Predictions"):
    images_so_far = 0
    plt.figure(fig_name)
    model.eval()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['test']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'Predicted: {class_names[preds[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    return

In [None]:
visualize_model(model, num_images=8)
plt.show()

In [None]:
import torch
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt


model.eval() 


all_predictions = []
all_labels = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)

        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())


conf_matrix = confusion_matrix(all_labels, all_predictions)


plt.figure(figsize=(20, 16))
sns.heatmap(conf_matrix, annot=True, fmt='g')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()