In [25]:
import os, numpy as np, torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets, transforms
import pennylane as qml

In [26]:
# --------- Settings ----------
TRAIN_COUNT = 600         # number of training images to use; start small (lower it further if too slow)
TEST_COUNT  = 200         # number of test images to use
CHANNELS    = 4           # number of quanvolution output channels; start with 4, try 8 once that works
EPOCHS      = 3           # training epochs for the classifier in main()
BATCH_TRAIN = 128         # DataLoader batch size for training
BATCH_TEST  = 256         # DataLoader batch size for evaluation
MAX_QNODE_BATCH = 64      # default for quanv_transform's max_batches argument
USE_LIGHTNING = True      # set True if lightning.qubit is installed (faster)
CACHE_DIR   = "./quanv_cache"  # directory where quanv_transform stores cached features
os.makedirs(CACHE_DIR, exist_ok=True)  # create the cache directory up front

In [27]:
def make_device(use_lightning=True, wires=4):
    """Create the PennyLane device used by the circuits.

    Args:
        use_lightning: when true, try ``lightning.qubit`` first.
        wires: number of wires passed to ``qml.device``.

    Returns:
        The ``lightning.qubit`` device if it was requested and could be created,
        otherwise a ``default.qubit`` device. The chosen backend is printed.
    """
    def _fallback():
        # Reference simulator; used when lightning is disabled or unavailable.
        device = qml.device("default.qubit", wires=wires)
        print("[Backend] default.qubit")
        return device

    if not use_lightning:
        return _fallback()

    try:
        # Use lightning when it is installed and compatible.
        device = qml.device("lightning.qubit", wires=wires)
        print("[Backend] lightning.qubit")
        return device
    except Exception as e:
        # Deliberate best-effort: report the reason and fall through to the default backend.
        print(f"[Backend] lightning.qubit 사용 불가 → default.qubit로 대체 ({e})")

    return _fallback()

# Shared 4-wire device, created once when this cell runs; patch_qnode is bound to it.
DEV = make_device(use_lightning=USE_LIGHTNING, wires=4)

[Backend] lightning.qubit


In [28]:
def _first_samples(dataset, count):
    """Stack the first ``count`` (image, label) pairs of ``dataset`` into two tensors."""
    # Fetch every sample exactly once: each dataset[i] access re-runs the transform pipeline,
    # so indexing separately for the image and for the label would do that work twice.
    samples = [dataset[i] for i in range(count)]
    images = torch.stack([img for img, _ in samples])
    labels = torch.tensor([label for _, label in samples])
    return images, labels


def load_mnist_10x10(train_count=TRAIN_COUNT, test_count=TEST_COUNT):
    """Load the first images of MNIST, resized to 10x10.

    The dataset is downloaded to ``./data`` if it is not already there.

    Args:
        train_count: number of leading training samples to keep.
        test_count: number of leading test samples to keep.

    Returns:
        ``((x_train, y_train), (x_test, y_test))`` where each ``x`` is a stacked image
        tensor of shape (N, 1, 10, 10) and each ``y`` is a 1-D tensor of integer labels.
    """
    tfm = transforms.Compose([transforms.ToTensor(), transforms.Resize((10, 10))])
    train_ds = datasets.MNIST(root="./data", train=True, download=True, transform=tfm)
    test_ds = datasets.MNIST(root="./data", train=False, download=True, transform=tfm)
    return _first_samples(train_ds, train_count), _first_samples(test_ds, test_count)

In [29]:
def random_weights(seed=0):
    """Draw reproducible rotation angles for the fixed single-qubit layer.

    Args:
        seed: seed for ``np.random.default_rng``.

    Returns:
        float32 array of shape (4, 2) with angles in [0, 2*pi): one (RZ, RX) pair per qubit.
    """
    generator = np.random.default_rng(seed)
    angles = generator.uniform(0, 2 * np.pi, size=(4, 2))
    return angles.astype(np.float32)

In [30]:
@qml.qnode(DEV, interface="torch")
def patch_qnode(pix4, weights):
    """Four-qubit circuit applied to one 2x2 image patch.

    Args:
        pix4: indexable holding 4 pixel values; entry ``w`` times pi is the RX angle on wire ``w``.
        weights: indexable as ``weights[w, 0]`` (RZ angle) and ``weights[w, 1]`` (RX angle)
            for each wire ``w`` in 0..3.

    Returns:
        Tuple of four PauliZ expectation values, one per wire.
    """
    qubits = range(4)

    # Data encoding: one pixel per qubit.
    for q in qubits:
        qml.RX(np.pi * pix4[q], wires=q)

    # Entanglement: CZ on the pairs (0,1), (2,3), (0,2), (1,3), in that order.
    for pair in ([0, 1], [2, 3], [0, 2], [1, 3]):
        qml.CZ(wires=pair)

    # Fixed single-qubit rotations: RZ followed by RX on every wire.
    for q in qubits:
        qml.RZ(weights[q, 0], wires=q)
        qml.RX(weights[q, 1], wires=q)

    # Important: return the four scalar measurements explicitly as a tuple.
    return tuple(qml.expval(qml.PauliZ(q)) for q in qubits)

# One fixed float32 angle tensor per output channel, seeded 1000, 1001, ... so they are reproducible.
WEIGHTS = [torch.tensor(random_weights(1000+c), dtype=torch.float32) for c in range(CHANNELS)]


In [31]:
# --------- Patch extraction & quanvolution ----------
def extract_patches_2x2_stride2(x):  # x: (N,1,10,10)
    """Cut the top-left 10x10 region of each image into non-overlapping 2x2 patches.

    Args:
        x: tensor of shape (N, 1, H, W) with H, W >= 10.

    Returns:
        float32 tensor of shape (N, 5, 5, 4); entry [n, i, j] holds the patch at rows
        2i..2i+1 and columns 2j..2j+1, flattened row by row.
    """
    n_images = x.shape[0]
    region = x[:, :, :10, :10]
    # Slide a size-2, step-2 window along height, then width -> (N, 1, 5, 5, 2, 2).
    windows = region.unfold(2, 2, 2).unfold(3, 2, 2)
    patches = torch.empty((n_images, 5, 5, 4), dtype=torch.float32)
    # copy_ performs the cast to float32, matching an element-wise assignment.
    patches.copy_(windows.reshape(n_images, 5, 5, 4))
    return patches

In [32]:
def _cache_path(cache_key):
    """Return the cache file path for ``cache_key``, always ending in ``.npz``."""
    fpath = os.path.join(CACHE_DIR, cache_key)
    # np.savez_compressed appends ".npz" when the name lacks it; normalising here keeps the
    # path that is looked up identical to the path that gets written.
    return fpath if fpath.endswith(".npz") else fpath + ".npz"


@torch.no_grad()
def quanv_transform(x, channels=CHANNELS, max_batches=MAX_QNODE_BATCH, cache_key=None):
    """Turn images into quanvolution features, optionally cached on disk.

    Every 2x2 patch is run through ``patch_qnode`` once per channel (using ``WEIGHTS[ch]``),
    and the mean of the four returned expectation values becomes that channel's feature.

    Args:
        x: image batch accepted by ``extract_patches_2x2_stride2``.
        channels: number of output channels; must not exceed ``len(WEIGHTS)``.
        max_batches: kept for backward compatibility. Circuits are evaluated one patch at a
            time, so this value does not influence the result.
        cache_key: file name inside ``CACHE_DIR``. When the file exists it is loaded and no
            circuit is run; otherwise the computed features are written to it. The key is
            not checked against ``x`` or ``channels``, so it should encode the configuration.

    Returns:
        ``(feats, labels)``: ``feats`` is a float32 tensor of shape (N, 5, 5, channels);
        ``labels`` is the cached ``"label"`` array as a tensor if the cache file has one,
        otherwise ``None``.
    """
    # Use the cache when available.
    fpath = _cache_path(cache_key) if cache_key is not None else None
    if fpath is not None and os.path.isfile(fpath):
        # The context manager closes the underlying file handle once the arrays are read.
        with np.load(fpath) as npz:
            feat = torch.tensor(npz["feat"])
            label = torch.tensor(npz["label"]) if "label" in npz else None
        return feat, label

    patches = extract_patches_2x2_stride2(x)  # (N,5,5,4)
    N = patches.shape[0]
    # Pre-filled output; an empty batch simply yields an empty (0,5,5,C) tensor.
    feats = torch.zeros((N, 5, 5, channels), dtype=torch.float32)

    for i in range(5):
        for j in range(5):
            for ch in range(channels):
                weights = WEIGHTS[ch]
                for n in range(N):
                    expvals = patch_qnode(patches[n, i, j], weights)  # 4 expectation values
                    # Tuple/list of scalars -> one tensor of shape (4,).
                    if isinstance(expvals, (list, tuple)):
                        expvals = torch.stack(expvals)
                    else:
                        expvals = torch.as_tensor(expvals)  # guard against mixed return types
                    feats[n, i, j, ch] = expvals.mean()

    if fpath is not None:
        np.savez_compressed(fpath, feat=feats.numpy())
    return feats, None

In [33]:
# --------- Classifier ----------
class Classifier(nn.Module):
    """Small fully connected head: flatten -> Linear(5*5*C, 64) -> ReLU -> Linear(64, 10)."""

    def __init__(self, channels=CHANNELS):
        """Build the two linear layers for features with ``channels`` channels on a 5x5 grid."""
        super().__init__()
        self.fc1 = nn.Linear(5*5*channels, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):  # x: (N,5,5,C)
        """Return class logits of shape (N, 10) for a feature batch ``x``."""
        # flatten() copies when the input is non-contiguous (e.g. a permuted tensor),
        # whereas view() would raise in that case.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

def evaluate(model, loader):
    """Compute the classification accuracy of ``model`` over ``loader``.

    The model is switched to eval mode and left in that mode.

    Args:
        model: module mapping an input batch to logits of shape (batch, classes).
        loader: iterable of ``(inputs, labels)`` batches.

    Returns:
        Fraction of correctly predicted labels as a float; 0.0 when the loader yields
        no samples (instead of dividing by zero).
    """
    model.eval()
    correct = total = 0
    with torch.no_grad():
        for xb, yb in loader:
            pred = model(xb).argmax(dim=1)
            correct += (pred == yb).sum().item()
            total += yb.numel()
    if total == 0:
        return 0.0
    return correct / total

In [34]:
def main(seed=0):
    """Run the whole pipeline: load data, build quanvolution features, train and evaluate.

    Test accuracy is printed after every epoch.

    Args:
        seed: passed to ``torch.manual_seed`` before the classifier is created so that weight
            initialisation and DataLoader shuffling repeat from run to run. ``None`` leaves
            the global RNG untouched.
    """
    # 1) Load data
    (x_train, y_train), (x_test, y_test) = load_mnist_10x10()
    print("data:", x_train.shape, x_test.shape)

    # 2) Build quanvolution features (cached on disk; the key encodes channel and sample counts)
    train_key = f"train_feat_C{CHANNELS}_N{TRAIN_COUNT}.npz"
    test_key  = f"test_feat_C{CHANNELS}_N{TEST_COUNT}.npz"
    x_train_feat, _ = quanv_transform(x_train, channels=CHANNELS, cache_key=train_key)
    x_test_feat,  _ = quanv_transform(x_test,  channels=CHANNELS, cache_key=test_key)
    print("features:", x_train_feat.shape, x_test_feat.shape)  # (N,5,5,C)

    # 3) Train / evaluate
    if seed is not None:
        # Seed right before the first stochastic step (layer init, then shuffling).
        torch.manual_seed(seed)
    clf = Classifier(CHANNELS)
    opt = torch.optim.Adam(clf.parameters(), lr=1e-3)
    loss_fn = nn.CrossEntropyLoss()

    train_loader = DataLoader(TensorDataset(x_train_feat, y_train), batch_size=BATCH_TRAIN, shuffle=True)
    test_loader  = DataLoader(TensorDataset(x_test_feat,  y_test),  batch_size=BATCH_TEST)

    for epoch in range(EPOCHS):
        clf.train()  # evaluate() leaves the model in eval mode, so switch back every epoch
        for xb, yb in train_loader:
            opt.zero_grad()
            logits = clf(xb)
            loss = loss_fn(logits, yb)
            loss.backward()
            opt.step()
        acc = evaluate(clf, test_loader)
        print(f"[epoch {epoch+1}] test_acc = {acc:.4f}")

In [35]:
# Run the pipeline only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

data: torch.Size([600, 1, 10, 10]) torch.Size([200, 1, 10, 10])
features: torch.Size([600, 5, 5, 4]) torch.Size([200, 5, 5, 4])
[epoch 1] test_acc = 0.1850
[epoch 2] test_acc = 0.1850
[epoch 3] test_acc = 0.1900
