<a href="https://colab.research.google.com/github/hwkim3330/blog/blob/main/%EC%A0%9C1%ED%9A%8C_%ED%80%80%ED%85%80AI_%EA%B2%BD%EC%A7%84%EB%8C%80%ED%9A%8C_baseline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================================================================
# STEP 0: 라이브러리 설치 및 임포트
# ==============================================================================
# 필수 라이브러리를 설치합니다.
!pip install pennylane --quiet

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, Subset, random_split

import torchvision
from torchvision import transforms

import pennylane as qml
from pennylane import numpy as np

from tqdm.auto import tqdm
import os
from datetime import datetime

# ==============================================================================
# STEP 1: 하이퍼파라미터 및 환경 설정
# ==============================================================================
# 이곳의 값들을 조정하며 추가 실험을 할 수 있습니다.
class HParams:
    # 데이터 및 학습 관련
    BATCH_SIZE = 64
    EPOCHS = 15  # 충분한 학습을 위해 에폭 증가
    LEARNING_RATE = 1e-3
    VALIDATION_SPLIT = 0.1 # 학습 데이터 중 10%를 검증에 사용

    # 양자 회로(QNN) 관련
    N_QUBITS = 4 # 양자 회로에 사용할 큐빗 수 (최대 8)
    N_LAYERS = 4 # 양자 회로의 깊이 (StronglyEntanglingLayers 반복 횟수)
    Q_DEVICE = "default.qubit" # PennyLane 시뮬레이터

    # 실행 환경
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    SEED = 42

# 재현성을 위한 시드 고정
torch.manual_seed(HParams.SEED)
np.random.seed(HParams.SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(HParams.SEED)

print(f"Hyperparameters loaded. Using device: {HParams.DEVICE}")
print(f"Quantum circuit config: {HParams.N_QUBITS} qubits, {HParams.N_LAYERS} layers.")


# ==============================================================================
# STEP 2: 데이터셋 준비 (데이터 증강 포함)
# ==============================================================================
# 학습 데이터에는 회전, 뒤집기 등 데이터 증강을 적용하여 모델의 일반화 성능 향상
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# 테스트 데이터는 증강을 적용하지 않음
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# FashionMNIST 데이터셋 다운로드
full_train_ds = torchvision.datasets.FashionMNIST("./", train=True, download=True, transform=train_transform)
test_ds = torchvision.datasets.FashionMNIST("./", train=False, download=True, transform=test_transform)

# T-shirt(0)와 Shirt(6) 라벨만 필터링
train_mask = (full_train_ds.targets == 0) | (full_train_ds.targets == 6)
train_indices = torch.where(train_mask)[0]
full_train_ds.targets[full_train_ds.targets == 6] = 1 # 라벨 6을 1로 변환
binary_train_ds = Subset(full_train_ds, train_indices)

# 학습/검증 데이터 분리
num_train = int((1 - HParams.VALIDATION_SPLIT) * len(binary_train_ds))
num_val = len(binary_train_ds) - num_train
train_ds, val_ds = random_split(binary_train_ds, [num_train, num_val])

# 데이터로더 생성
train_loader = DataLoader(train_ds, batch_size=HParams.BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=HParams.BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_ds, batch_size=HParams.BATCH_SIZE, shuffle=False) # 테스트 시에도 배치 처리로 속도 향상

print(f"Data loaded. Train: {len(train_ds)}, Validation: {len(val_ds)}, Test: {len(test_ds)}")

# ==============================================================================
# STEP 3: 하이브리드 모델 설계 (CNN + QNN)
# ==============================================================================
torch.set_default_dtype(torch.float64)

class HybridClassifier(nn.Module):
    def __init__(self, n_qubits, n_layers):
        super().__init__()
        self.n_qubits = n_qubits

        # 1. 클래식 파트 (CNN) - 특징 추출기
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(8, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # 클래식 파트의 최종 출력을 양자 회로의 입력 수에 맞게 조정
        self.fc_classic = nn.Sequential(
            nn.Linear(16 * 7 * 7, 128),
            nn.ReLU(),
            nn.Linear(128, self.n_qubits) # 최종 출력을 n_qubits개로 맞춤
        )

        # 2. 양자 파트 (QNN)
        self.q_device = qml.device(HParams.Q_DEVICE, wires=self.n_qubits)
        # 학습 가능한 양자 회로의 가중치
        q_weight_shape = (n_layers, self.n_qubits, 3)
        self.q_weights = nn.Parameter(torch.rand(q_weight_shape) * 2 * torch.pi)

        # qml.qnode 데코레이터를 사용하여 파이토치와 호환되는 양자 회로 정의
        @qml.qnode(self.q_device, interface="torch", diff_method="backprop")
        def quantum_circuit(features, weights):
            # 데이터 인코딩: 클래식 데이터를 양자 상태로 변환
            qml.AngleEmbedding(features, wires=range(self.n_qubits))
            # 학습 가능한 변분 회로(Variational Circuit)
            qml.StronglyEntanglingLayers(weights, wires=range(self.n_qubits))
            # 측정: 0번 큐빗의 Pauli-Z 연산자 기댓값을 반환
            return qml.expval(qml.PauliZ(0))

        self.qnn = quantum_circuit

        # 3. 최종 분류기
        # 양자 회로의 출력(-1 ~ 1)을 클래스 로짓(logits)으로 변환
        self.fc_classifier = nn.Linear(1, 2)

    def forward(self, x):
        # x.shape: [batch, 1, 28, 28]
        x = self.cnn(x)
        x = x.view(x.size(0), -1) # Flatten
        x = self.fc_classic(x) # shape: [batch, n_qubits]

        # 양자 회로를 배치 단위로 실행
        q_out_list = []
        for features in x:
            q_out = self.qnn(features, self.q_weights)
            q_out_list.append(q_out)

        q_out_batch = torch.stack(q_out_list).view(-1, 1) # shape: [batch, 1]

        # 최종 분류
        logits = self.fc_classifier(q_out_batch) # shape: [batch, 2]
        return F.log_softmax(logits, dim=1)

# ==============================================================================
# STEP 4: 대회 규격 검증
# ==============================================================================
model = HybridClassifier(HParams.N_QUBITS, HParams.N_LAYERS).to(HParams.DEVICE)

# 더미 입력을 만들어 회로 제약 검증
dummy_features = torch.randn(HParams.N_QUBITS, dtype=torch.float64)
dummy_weights = torch.randn((HParams.N_LAYERS, HParams.N_QUBITS, 3), dtype=torch.float64)
specs = qml.specs(model.qnn)(dummy_features, dummy_weights)

total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print("─" * 30)
print("대회 규격 자동 검증")
print("─" * 30)
try:
    assert specs["num_tape_wires"] <= 8, f"❌ 큐빗 수 초과 ({specs['num_tape_wires']} > 8)"
    print(f"✅ 큐빗 수: {specs['num_tape_wires']} (제한: 8)")

    assert specs['resources'].depth <= 30, f"❌ 회로 깊이 초과 ({specs['resources'].depth} > 30)"
    print(f"✅ 회로 깊이: {specs['resources'].depth} (제한: 30)")

    num_quantum_params = model.q_weights.numel()
    assert num_quantum_params <= 60, f"❌ 학습 퀀텀 파라미터 수 초과 ({num_quantum_params} > 60)"
    print(f"✅ 퀀텀 파라미터 수: {num_quantum_params} (제한: 60)")

    assert total_params <= 50000, f"❌ 학습 전체 파라미터 수 초과 ({total_params} > 50000)"
    print(f"✅ 전체 파라미터 수: {total_params} (제한: 50000)")

    print("\n🎉 모든 규격 통과! 학습을 시작합니다.")
except AssertionError as e:
    print(f"\n🚨 규격 위반! {e}")
print("─" * 30)


# ==============================================================================
# STEP 5: 모델 학습 및 검증
# ==============================================================================
optimizer = AdamW(model.parameters(), lr=HParams.LEARNING_RATE)
scheduler = CosineAnnealingLR(optimizer, T_max=len(train_loader) * HParams.EPOCHS)
loss_func = nn.NLLLoss()

best_val_acc = 0.0
BEST_MODEL_PATH = "best_model.pth"

for epoch in range(HParams.EPOCHS):
    # --- 학습 ---
    model.train()
    total_train_loss = 0
    train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{HParams.EPOCHS} [Train]", leave=False)
    for data, target in train_bar:
        data, target = data.to(HParams.DEVICE), target.to(HParams.DEVICE)

        optimizer.zero_grad()
        output = model(data)
        loss = loss_func(output, target)
        loss.backward()
        optimizer.step()
        scheduler.step()

        total_train_loss += loss.item()
        train_bar.set_postfix(loss=loss.item())

    avg_train_loss = total_train_loss / len(train_loader)

    # --- 검증 ---
    model.eval()
    total_val_loss = 0
    correct = 0
    total = 0
    val_bar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{HParams.EPOCHS} [Val]", leave=False)
    with torch.no_grad():
        for data, target in val_bar:
            data, target = data.to(HParams.DEVICE), target.to(HParams.DEVICE)
            output = model(data)
            loss = loss_func(output, target)
            total_val_loss += loss.item()

            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += data.size(0)

    avg_val_loss = total_val_loss / len(val_loader)
    val_acc = 100. * correct / total

    print(f"Epoch {epoch+1:02d}/{HParams.EPOCHS} | "
          f"Train Loss: {avg_train_loss:.4f} | "
          f"Val Loss: {avg_val_loss:.4f} | "
          f"Val Acc: {val_acc:.2f}%")

    # 최고 성능 모델 저장
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print(f"✨ New best model saved with accuracy: {best_val_acc:.2f}%")


# ==============================================================================
# STEP 6: 최종 추론 및 제출 파일 생성
# ==============================================================================
print("\nLoading best model for final inference...")
model.load_state_dict(torch.load(BEST_MODEL_PATH))
model.eval()

all_preds = []
all_targets = []

with torch.no_grad():
    for data, target in tqdm(test_loader, desc="Final Inference"):
        data = data.to(HParams.DEVICE)
        logits = model(data)
        pred = logits.argmax(dim=1)
        all_preds.append(pred.cpu())
        all_targets.append(target) # target은 이미 CPU에 있음

y_pred = torch.cat(all_preds).numpy().astype(int)
y_true = torch.cat(all_targets).numpy().astype(int)

# --- 평가 및 결과 저장 ---
# 0(T-shirt/top)과 6(Shirt) 라벨만 평가
test_mask = (y_true == 0) | (y_true == 6)
target_samples_count = test_mask.sum()

# 모델의 예측 결과(0, 1)를 원래 라벨(0, 6)으로 되돌림
y_pred_mapped = np.where(y_pred == 1, 6, y_pred)

# 0과 6 라벨에 대한 정확도 계산
final_acc = (y_pred_mapped[test_mask] == y_true[test_mask]).mean()
print(f"\nFinal Accuracy on Target Labels (0/6): {final_acc:.4f}")

# --- 제출 파일 생성 ---
now = datetime.now().strftime("%Y%m%d_%H%M%S")
y_pred_filename = f"y_pred_{now}.csv"
np.savetxt(y_pred_filename, y_pred_mapped, fmt="%d", delimiter=',')
print(f"Submission file '{y_pred_filename}' created.")


# ==============================================================================
# STEP 7: 결과 파일 다운로드
# ==============================================================================
try:
    from google.colab import files
    files.download(y_pred_filename)
    print("File download initiated.")
except ImportError:
    print("\nNot in Google Colab. Please download the file manually:")
    print(f"-> {os.path.join(os.getcwd(), y_pred_filename)}")