In [None]:
pip install torchvision

In [None]:
pip install scikit-learn

In [None]:
pip install torch

In [7]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

from torchvision import datasets, transforms

# Qiskit imports (Estimator は Aer 版があれば優先、無ければ Terra 版を使用)
from qiskit import QuantumCircuit
from qiskit.circuit import Parameter
from qiskit.quantum_info import SparsePauliOp
try:
    from qiskit_aer.primitives import Estimator as QiskitEstimator
except Exception:
    from qiskit.primitives import Estimator as QiskitEstimator


In [8]:
# Simple Baseline Linear Model
class LinearModelBaseline(nn.Module):
    def __init__(self, image_size, num_classes=10):
        super(LinearModelBaseline, self).__init__()
        self.image_size = image_size

        # Classical part
        self.classifier = nn.Linear(image_size, num_classes)

    def forward(self, x):
        # Data is already flattened
        output = self.classifier(x)
        return output

In [9]:
# LinearModel with PCA
class LinearModelPCA(nn.Module):
    def __init__(self, image_size, pca_components, num_classes=10):
        super(LinearModelPCA, self).__init__()
        self.image_size = image_size
        self.pca_components = pca_components

        # Classical part
        self.classifier = nn.Linear(image_size + pca_components, num_classes)

    def forward(self, x, x_pca):
        # Data is already flattened, just concatenate
        combined_features = torch.cat((x, x_pca), dim=1)
        output = self.classifier(combined_features)
        return output

In [10]:
# --- Qiskit-based Reservoir layer (non-trainable) --------------------------
class QiskitReservoirLayer(nn.Module):
    """
    固定パラメータの量子回路（リザーバ）で PCA 特徴をエンコードし、
    観測演算子（既定は Z と ZZ）に対する期待値ベクトルを返す。
    """
    def __init__(self, input_size, n_qubits=8, n_layers=2, observables="Z+ZZ", seed=42, shots=None):
        super().__init__()
        self.input_size = input_size
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.observables = observables

        # 回路テンプレート（angle encoding + 軽いエンタングラ）
        self.x_params = [Parameter(f"x{i}") for i in range(input_size)]
        rng = np.random.default_rng(seed)
        thetas = rng.uniform(0, 2*np.pi, size=(n_layers, n_qubits))

        qc = QuantumCircuit(n_qubits)
        # angle encoding: x_i を順番に RY へ（round-robin で割当）
        for i in range(input_size):
            qc.ry(self.x_params[i], i % n_qubits)
        for l in range(n_layers):
            for q in range(n_qubits):
                qc.rz(float(thetas[l, q]), q)
            for q in range(n_qubits - 1):
                qc.cx(q, q+1)
        self.template = qc

        # 観測集合を構成（Z_i と Z_i Z_j）
        pauli_list = []
        if "Z" in observables:
            for i in range(n_qubits):
                s = ["I"] * n_qubits
                s[i] = "Z"
                pauli_list.append(("".join(reversed(s)), 1.0))
        if "ZZ" in observables:
            for i in range(n_qubits):
                for j in range(i+1, n_qubits):
                    s = ["I"] * n_qubits
                    s[i] = "Z"; s[j] = "Z"
                    pauli_list.append(("".join(reversed(s)), 1.0))
        self.ops = [SparsePauliOp.from_list([p]) for p in pauli_list]
        self.output_size = len(self.ops)

        # Estimator（Aer があればサンプリング、無ければ解析モード）
        self.estimator = QiskitEstimator() if shots is None else QiskitEstimator(shots=shots)

    @torch.no_grad()
    def forward(self, x_pca: torch.FloatTensor) -> torch.FloatTensor:
        """
        x_pca: [B, input_size]
        return: [B, output_size] （各観測の期待値）
        """
        device = x_pca.device
        x_np = x_pca.detach().cpu().numpy()
        features = []
        for vec in x_np:
            bound = self.template.bind_parameters({p: float(v) for p, v in zip(self.x_params, vec)})
            circuits = [bound] * len(self.ops)
            res = self.estimator.run(circuits, self.ops).result()
            features.append(res.values)
        return torch.tensor(np.array(features), dtype=torch.float32, device=device)

# definition of QuantumReservoir class - Qiskit reservoir + linear classifier
class QuantumReservoir(nn.Module):
    def __init__(self, image_size, pca_components, n_qubits=8, n_layers=2, observables="Z+ZZ", num_classes=10):
        super(QuantumReservoir, self).__init__()
        self.image_size = image_size
        self.pca_components = pca_components
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.observables = observables

        # Quantum part (non-trainable reservoir)
        self.quantum_layer = QiskitReservoirLayer(
            input_size=pca_components,
            n_qubits=n_qubits,
            n_layers=n_layers,
            observables=observables
        )

        # Classical part
        self.classifier = nn.Linear(
            image_size + self.quantum_layer.output_size,
            num_classes
        )

        print(f"\nQiskit Reservoir Created:")
        print(f"  Input size (PCA components): {pca_components}")
        print(f"  Quantum output size: {self.quantum_layer.output_size}  # observables={observables}")
        print(f"  Total features to classifier: {image_size + self.quantum_layer.output_size}")

    def forward(self, x, x_pca):
        q_out = self.quantum_layer(x_pca)
        combined = torch.cat((x, q_out), dim=1)
        return self.classifier(combined)

In [11]:
# MNIST を torchvision から取得（Merlin 依存を排除）
transform = transforms.ToTensor()
train_ds = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
test_ds  = datasets.MNIST(root="./data", train=False, download=True, transform=transform)

# 速度・再現性のため元コードに合わせてサブセット（6000/600）を使用
N_TRAIN, N_TEST = 6000, 600
train_images = train_ds.data[:N_TRAIN].view(N_TRAIN, -1).float() / 255.0
test_images  = test_ds.data[:N_TEST].view(N_TEST, -1).float() / 255.0
train_labels = train_ds.targets[:N_TRAIN]
test_labels  = test_ds.targets[:N_TEST]

X_train = train_images.clone().detach()
X_test  = test_images.clone().detach()
y_train = train_labels.clone().detach().long()
y_test  = test_labels.clone().detach().long()

print(f"Dataset loaded (torchvision): {len(X_train)} training samples, {len(X_test)} test samples")


100.0%
100.0%
100.0%
100.0%

Dataset loaded (torchvision): 6000 training samples, 600 test samples





In [12]:
n_components = 8
# Qiskit reservoir parameters
n_qubits = 8
n_layers = 2
observables = "Z+ZZ"  # 必要に応じて観測集合を拡張可（メモ参照）

In [13]:
# train PCA
from sklearn.decomposition import PCA
pca = PCA(n_components=n_components)

# Note: Data is already flattened
X_train_flat = X_train
X_test_flat = X_test

pca.fit(X_train_flat)
X_train_pca = torch.FloatTensor(pca.transform(X_train_flat))
X_test_pca = torch.FloatTensor(pca.transform(X_test_flat))
print(X_train_pca)

tensor([[ 0.5324,  1.3737, -0.1363,  ..., -1.1627,  0.7187,  0.1862],
        [ 4.2491,  1.2599, -1.9936,  ..., -1.1285, -0.2313, -0.5979],
        [-0.2846, -1.7457,  1.1546,  ...,  0.3840, -3.6403,  0.9226],
        ...,
        [ 0.0393,  2.6523, -2.9366,  ...,  0.3909,  0.4807, -1.8218],
        [ 1.1493,  0.2049, -1.3050,  ..., -3.4043,  0.2989,  1.2935],
        [ 0.8179, -3.1770, -1.8832,  ...,  0.4889,  1.4898, -0.6843]])


In [14]:
# define corresponding linear model for comparison
linear_model = LinearModelBaseline(X_train_flat.shape[1])

In [15]:
# define model using pca featues
pca_model = LinearModelPCA(X_train_flat.shape[1], n_components)

In [16]:
# define hybrid model (Qiskit reservoir)
hybrid_model = QuantumReservoir(
    image_size=X_train_flat.shape[1],
    pca_components=n_components,
    n_qubits=n_qubits,
    n_layers=n_layers,
    observables=observables
)
# フォトニック回路可視化は削除（Qiskit 版は Estimator のみ使用）


Qiskit Reservoir Created:
  Input size (PCA components): 8
  Quantum output size: 36  # observables=Z+ZZ
  Total features to classifier: 820


  self.quantum_layer = QiskitReservoirLayer(
  self.quantum_layer = QiskitReservoirLayer(


In [17]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer_linear = torch.optim.Adam(linear_model.parameters(), lr=0.001)
optimizer_pca = torch.optim.Adam(pca_model.parameters(), lr=0.001)
optimizer_hybrid = torch.optim.Adam(hybrid_model.parameters(), lr=0.001)

# Create DataLoader for batching
batch_size = 128
train_dataset = torch.utils.data.TensorDataset(X_train, X_train_pca, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Training loop
num_epochs = 25

history = {
      'hybrid': {'loss': [], 'accuracy': []},
      'pca': {'loss': [], 'accuracy': []},
      'linear': {'loss': [], 'accuracy': []},
      'epochs': []
}

for epoch in range(num_epochs):
    running_loss_hybrid = 0.0
    running_loss_linear = 0.0
    running_loss_pca = 0.0

    hybrid_model.train()
    linear_model.train()
    pca_model.train()

    for i, (images, pca_features, labels) in enumerate(train_loader):
        # Hybrid model - Forward and Backward pass
        outputs = hybrid_model(images, pca_features)
        loss = criterion(outputs, labels)
        optimizer_hybrid.zero_grad()
        loss.backward()
        optimizer_hybrid.step()
        running_loss_hybrid += loss.item()

        # Comparative linear model - Forward and Backward pass
        outputs = linear_model(images)
        loss = criterion(outputs, labels)
        optimizer_linear.zero_grad()
        loss.backward()
        optimizer_linear.step()
        running_loss_linear += loss.item()

        # Comparative pca model - Forward and Backward pass
        outputs = pca_model(images, pca_features)
        loss = criterion(outputs, labels)
        optimizer_pca.zero_grad()
        loss.backward()
        optimizer_pca.step()
        running_loss_pca += loss.item()

    avg_loss_hybrid = running_loss_hybrid/len(train_loader)
    avg_loss_linear = running_loss_linear/len(train_loader)
    avg_loss_pca = running_loss_pca/len(train_loader)

    history['hybrid']['loss'].append(avg_loss_hybrid)
    history['linear']['loss'].append(avg_loss_linear)
    history['pca']['loss'].append(avg_loss_pca)

    history['epochs'].append(epoch + 1)

    hybrid_model.eval()
    linear_model.eval()
    pca_model.eval()
    with torch.no_grad():
        outputs = hybrid_model(X_test, X_test_pca)
        _, predicted = torch.max(outputs, 1)
        hybrid_accuracy = (predicted == y_test).sum().item() / y_test.size(0)

        outputs = linear_model(X_test)
        _, predicted = torch.max(outputs, 1)
        linear_accuracy = (predicted == y_test).sum().item() / y_test.size(0)

        outputs = pca_model(X_test, X_test_pca)
        _, predicted = torch.max(outputs, 1)
        pca_accuracy = (predicted == y_test).sum().item() / y_test.size(0)

    history['hybrid']['accuracy'].append(hybrid_accuracy)
    history['linear']['accuracy'].append(linear_accuracy)
    history['pca']['accuracy'].append(pca_accuracy)

    print(f'Epoch [{epoch+1}/{num_epochs}], LOSS -- Hybrid: {avg_loss_hybrid:.4f}, Linear: {avg_loss_linear:.4f}, PCA: {avg_loss_pca:.4f}'+
          f', ACCURACY -- Hybrid: {hybrid_accuracy:.4f}, Linear: {linear_accuracy:.4f}, PCA: {pca_accuracy:.4f}')

AttributeError: 'QuantumCircuit' object has no attribute 'bind_parameters'

In [None]:
import matplotlib.pyplot as plt
def plot_training_comparison(history):
    # Create a figure with two subplots side by side
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 6))

    # Define colors and styles for consistency
    model_styles = {
      'hybrid': {'color': 'blue', 'linestyle': '-', 'marker': 'o'},
      'pca': {'color': 'green', 'linestyle': '-', 'marker': 's'},
      'linear': {'color': 'red', 'linestyle': '-', 'marker': '^'}
    }

    # Plot loss curves
    for model_name, style in model_styles.items():
      if model_name in history:
          ax1.plot(
              history['epochs'],
              history[model_name]['loss'],
              color=style['color'],
              linestyle=style['linestyle'],
              marker=style['marker'],
              markevery=max(1, len(history['epochs'])//10),  # Show markers at 10 points
              label=f'{model_name.capitalize()} Model'
          )

    ax1.set_title('Training Loss Comparison', fontsize=14)
    ax1.set_xlabel('Epochs', fontsize=12)
    ax1.set_ylabel('Loss', fontsize=12)
    ax1.legend(fontsize=12)
    ax1.grid(True, alpha=0.3)

    # Plot accuracy curves
    for model_name, style in model_styles.items():
      if model_name in history:
          ax2.plot(
              history['epochs'],
              history[model_name]['accuracy'],
              color=style['color'],
              linestyle=style['linestyle'],
              marker=style['marker'],
              markevery=max(1, len(history['epochs'])//10),  # Show markers at 10 points
              label=f'{model_name.capitalize()} Model'
          )

    ax2.set_title('Training Accuracy Comparison', fontsize=14)
    ax2.set_xlabel('Epochs', fontsize=12)
    ax2.set_ylabel('Accuracy', fontsize=12)
    ax2.legend(fontsize=12)
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

# Call the function to generate the plot
plot_training_comparison(history)