In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !cp "/content/drive/MyDrive/dcgan/dataset/archive.zip" /content/

In [None]:
# # /content/portrait_paintings 에 압축 해제
# !unzip -q /content/archive.zip -d /content/portrait_paintings

In [None]:
# # 압축 푼 이미지들을 portraits/ 하위로 이동 (혹은 복사)
# !cp -r /content/portrait_paintings/* "/content/drive/MyDrive/dcgan/dataset/"

## 1. 코랩 환경 + 데이터 전처리/정규화 + 로더 확인

In [None]:
!pip -q install kornia

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━[0m [32m1.0/1.1 MB[0m [31m131.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m93.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import math, os, glob, re, json, time, random
import numpy as np
import torch, torchvision         # 파이토치와 이미지 처리 라이브러리
import torch.nn as nn             # 신경망 계층 정의
import matplotlib.pyplot as plt
import kornia.augmentation as K   # 이미지 증강 라이브러리
from pathlib import Path
from torchvision import transforms, datasets
from torch.utils.data import DataLoader # 배치 단위 데이터 로더

In [None]:
print("PyTorch :", torch.__version__)
print("CUDA    :", torch.cuda.is_available())
print("Torchvision :", torchvision.__version__)
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device  :", device) # 실제 사용할 디바이스 정보 출력

PyTorch : 2.8.0+cu126
CUDA    : True
Torchvision : 0.23.0+cu126
Device  : cuda


## 폴더 생성

- 체크포인트/샘플 이미지/로그 등을 Drive에 저장해서, 세션이 끊겨도 이어서 학습할 수 있게


In [None]:
PROJECT_NAME = "dcgan_portraits128"  # 출력(결과) 폴더 이름

# ---- 출력(체크포인트/샘플/로그) 루트 ----
DRIVE_ROOT_OUT = "/content/drive/MyDrive/dcgan/" # 결과물을 모아둘 상위 폴더
SAVE_DIR       = f"{DRIVE_ROOT_OUT}/{PROJECT_NAME}"       # 프로젝트 전용 출력 폴더

# 하위 출력 폴더 (관리 편의)
CKPT_DIR   = f"{SAVE_DIR}/ckpts"                          # 체크포인트(.pt) 저장
SAMPLE_DIR = f"{SAVE_DIR}/samples"                        # 생성 샘플 이미지 저장
LOG_DIR    = f"{SAVE_DIR}/logs"                           # 로그/메모 등

# ---- 데이터셋(ImageFolder) 경로 ----
DATA_DIR  = "/content/drive/MyDrive/dcgan/datasets"  # ImageFolder의 root
CLASS_DIR = f"{DATA_DIR}/Images"                          # 실제 이미지가 들어있는 '클래스' 폴더

# 폴더 생성 (출력 전용)
os.makedirs(CKPT_DIR, exist_ok=True)                      # ckpts 폴더 생성
os.makedirs(SAMPLE_DIR, exist_ok=True)                    # samples 폴더 생성
os.makedirs(LOG_DIR, exist_ok=True)                       # logs 폴더 생성

# 구조 검증 (안전장치)
DATA_DIR_PATH  = Path(DATA_DIR)
CLASS_DIR_PATH = Path(CLASS_DIR)

assert DATA_DIR_PATH.exists(), f"[경고] DATA_DIR가 존재하지 않습니다: {DATA_DIR}"
assert CLASS_DIR_PATH.exists(), f"[경고] CLASS_DIR가 존재하지 않습니다: {CLASS_DIR}\n" \
                                f"이미지가 '/DCGAN/datasets/images'에 있는지 확인하세요."

has_any_image = any(CLASS_DIR_PATH.glob("*.jpg")) or any(CLASS_DIR_PATH.glob("*.jpeg")) or any(CLASS_DIR_PATH.glob("*.png"))
assert has_any_image, f"[경고] {CLASS_DIR} 내에 이미지(.jpg/.jpeg/.png)가 보이지 않습니다."

# 경로 출력 (최종 확인)
print("SAVE_DIR   :", SAVE_DIR)
print("CKPT_DIR   :", CKPT_DIR)
print("SAMPLE_DIR :", SAMPLE_DIR)
print("LOG_DIR    :", LOG_DIR)
print("DATA_DIR   :", DATA_DIR, "(ImageFolder root)")
print("CLASS_DIR  :", CLASS_DIR,  "(클래스 폴더)")

SAVE_DIR   : /content/drive/MyDrive/dcgan//dcgan_portraits128
CKPT_DIR   : /content/drive/MyDrive/dcgan//dcgan_portraits128/ckpts
SAMPLE_DIR : /content/drive/MyDrive/dcgan//dcgan_portraits128/samples
LOG_DIR    : /content/drive/MyDrive/dcgan//dcgan_portraits128/logs
DATA_DIR   : /content/drive/MyDrive/dcgan/datasets (ImageFolder root)
CLASS_DIR  : /content/drive/MyDrive/dcgan/datasets/Images (클래스 폴더)


## 설정 값(에폭=100 포함) + 시드 고정
- 에폭: 100 → 1차 학습(100ep) 후에 품질 점검/튜닝 시작
- 이후 단계에서 체크포인트 매니저/훈련 루프를 연결할 것

In [None]:
def seed_all(seed=42):
    # 시드(seed)는 무작위(random) 결과를 일정하게 고정하는 값임
    # → 모델 학습 중간에 중단 후 '체크포인트'에서 이어갈 때, 같은 결과를 재현하려면 필요함
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    # cuDNN 최적화 기능: 속도는 빠르지만 결과가 조금씩 달라질 수 있음
    # → 완전 재현성이 꼭 필요하다면 False로 두는 게 안전
    torch.backends.cudnn.benchmark = True

# 시드값을 42로 고정 → 학습 시 항상 같은 조건에서 시작하게 됨
seed_all(42)

## 학습 설정값 모음 (Config) -> 여기를 변경

In [None]:
CFG = dict(
    img_size=128,
    z_dim=128,         # 잠재공간(latent space) 차원. 노이즈 벡터 크기
    ngf=64,
    ndf=64,
    batch_size=64,

    lr=2e-4, # 초반

    lr_g=2e-4,  # 직전: 4e-4 -> 2.5e-4 (950 -> 965)
    lr_d=2.6e-5,   # 직전: 5e-5/ 2e-5→2.6e-5

    # 학습률과 Adam 옵티마이저 파라미터 (DCGAN 논문 권장값)
    beta1=0.5, beta2=0.999,

    epochs=1500,
    device=device
)
CFG

{'img_size': 128,
 'z_dim': 128,
 'ngf': 64,
 'ndf': 64,
 'batch_size': 64,
 'lr': 0.0002,
 'lr_g': 0.0002,
 'lr_d': 2.6e-05,
 'beta1': 0.5,
 'beta2': 0.999,
 'epochs': 1500,
 'device': 'cuda'}

학습률 변화 기록

- Generator 학습률: 2e-4 -> 2.5e-4 -> 3e-4 -> 4e-4
- Discriminator 학습률 : 1.5e-4 -> 1e-4 -> 7e-5 -> 5e-5

950 에포크에서 1000 에포크 학습하는 동안 사용한 학습률
- lr_g=4e-4
- lr_d=5e-5

학습 상태
- 판별자의 로스값이 0.5에서 +-0.2 정도로 진동하는 반면 생성자는 2에서 6사의 값으로 진동함. 3, 4의 빈도가 높았음

문제 상황
- 모드 붕괴(mode collapse)

- 해석 : 판별자가 과적합/포화 직전 균형, 이때 G가 학습 신호를 충분히 못 받는것 같음

- 현재 목표. 950의 이미지 확인. -> 후반부 디테일(눈·코·입) 수렴하는 것을 목적으로 함. (학습 쌓아 생성자의 세부 묘사를 이끌기)


In [None]:
CFG.update({
    "use_inst_noise": True,   # 인스턴스 노이즈 켜기/끄기
    "inst_noise_sigma": 0.10, # 시작 σ (예: 0.05→0.10로 강화)
})

In [None]:
class InstanceNoise(nn.Module):
    """
    D 입력으로 들어가기 직전에 x += N(0, sigma) 형태로 가우시안 노이즈를 더해주는 모듈.
    real/fake 둘 다에 똑같이 적용해야 균형이 깨지지 않습니다.
    """
    def __init__(self, sigma=0.05):
        super().__init__()
        self.sigma = sigma

    def forward(self, x):
        if self.sigma <= 0:
            return x
        return x + torch.randn_like(x) * self.sigma


## 데이터로더

- 1) DataLoader에서 사용하는 "비미분" 전처리/증강
    - 여기서는 학습 전에 이미지를 고정된 방식으로 가공함
    - GAN은 마지막에 Tanh 활성화를 쓰므로 [-1,1] 범위로 정규화하는 게 핵심

In [None]:
def make_loader_transforms(img_size=128):
    resize_to = int(img_size * 1.125)  # 입력 크기보다 약간 크게 리사이즈 후 센터크롭 (128px → 약 144~160px)
    return transforms.Compose([
        transforms.Resize(resize_to),                        # 이미지 크기를 키움
        transforms.CenterCrop(img_size),                     # 중앙 기준으로 원하는 크기만큼 자름
        transforms.RandomHorizontalFlip(p=0.5),              # 50% 확률로 좌우 반전
        transforms.ColorJitter(brightness=0.10, contrast=0.10,
                               saturation=0.05, hue=0.02),   # 색상/밝기/대비를 약하게 랜덤 조정
        transforms.ToTensor(),                               # [H,W,C] 이미지를 [C,H,W] 텐서로 변환 + [0,1]로 정규화
        transforms.Normalize((0.5,)*3, (0.5,)*3),            # [0,1] → [-1,1] 범위로 변환
    ])

## 미분가능 증강
- 판별기 입력 직전에 적용
- D가 너무 강한 상황이니 DiffAug를 “조금 더” 강하게 해서 D를 덜 확신하게 만들면 G가 숨 쉴 공간이 생김

In [None]:
class DiffAug(nn.Module):
    """
    미분가능 증강.
    - 'recovery': 디테일 회복용(색/블러/노이즈 OFF, 아주 약한 기하)
    - 'mild'/'medium'/'strong': 점진적 강화
    """
    def __init__(self, img_size=128, strength="medium"):
        super().__init__()
        self.img_size = img_size
        self.set_strength(strength)

    def set_strength(self, strength="medium"):
        s = str(strength).lower()
        if s not in {"recovery", "mild", "medium", "strong"}:
            s = "medium"

        # 1) 기본값(모든 분기에서 공통으로 존재하도록 미리 선언)
        p_flip      = 0.5
        aff_deg     = 2.0
        aff_tr      = 0.02
        aff_scale   = (0.99, 1.01)
        aff_p       = 0.5

        use_color   = False; b_delta = 0.0; c_delta = 0.0; bc_p = 0.0
        use_blur    = False; blur_p  = 0.0; blur_ks = (3, 3)
        use_noise   = False; noise_p = 0.0; noise_std = 0.01

        # 2) 강도별로 값만 수정
        if s == "recovery":
            p_flip = 0.5
            aff_deg, aff_tr, aff_scale, aff_p = 2.0, 0.02, (0.99, 1.01), 0.5
            # 나머지(use_color/use_blur/use_noise)는 False 유지

        elif s == "mild":
            p_flip = 0.6
            aff_deg, aff_tr, aff_scale, aff_p = 3.0, 0.03, (0.98, 1.03), 0.6
            use_color, b_delta, c_delta, bc_p = True, 0.06, 0.06, 0.3

        elif s == "strong":
            p_flip = 0.8
            aff_deg, aff_tr, aff_scale, aff_p = 6.0, 0.05, (0.95, 1.05), 0.8
            use_color, b_delta, c_delta, bc_p = True, 0.10, 0.10, 0.5
            use_blur, blur_p, blur_ks        = True, 0.30, (3, 3)
            use_noise, noise_p, noise_std    = True, 0.20, 0.01

        else:  # medium
            p_flip = 0.7
            aff_deg, aff_tr, aff_scale, aff_p = 4.0, 0.04, (0.97, 1.04), 0.7
            use_color, b_delta, c_delta, bc_p = True, 0.08, 0.08, 0.4
            use_blur, blur_p, blur_ks        = True, 0.15, (3, 3)
            use_noise, noise_p, noise_std    = True, 0.10, 0.01

        # 3) ops 구성(색/블러/노이즈는 flag에 따라 조건부 추가)
        ops = [
            K.RandomHorizontalFlip(p=p_flip),
            K.RandomAffine(
                degrees=aff_deg,
                translate=(aff_tr, aff_tr),
                scale=aff_scale,
                p=aff_p,
                align_corners=False,
            ),
        ]
        if use_color:
            ops += [
                K.RandomBrightness(b_delta, p=bc_p),
                K.RandomContrast(c_delta, p=bc_p),
            ]
        if use_blur and blur_p > 0:
            ops.append(K.RandomGaussianBlur(kernel_size=blur_ks, sigma=(0.1, 1.0), p=blur_p))
        if use_noise and noise_p > 0:
            ops.append(K.RandomGaussianNoise(mean=0.0, std=noise_std, p=noise_p))

        self.augs = nn.Sequential(*ops)
        self._strength = s

    def forward(self, x):
        return self.augs(x)


In [None]:
def adjust_diffaug(diffaug, d_val, g_val, lf, global_step,
                   last_change_step, cool_down=600):
    """
    d_val: D total loss (BCE+R1 포함이면 가능하면 'pure D loss'로 넣는 게 더 깔끔)
    g_val: G loss
    lf   : D의 fake loss (가짜를 얼마나 쉽게 잡는지)
    """
    # 전환 너무 잦지 않게
    if global_step - last_change_step < cool_down:
        return last_change_step  # 유지

    curr = diffaug.strength
    target = curr

    # --- 우선순위 1: 체커보드/고주파 과잉 의심 구간 ---
    # 가짜를 너무 쉽게 잡음 + G가 흔들리는 패턴일 때는 증강 최소화
    if lf < 0.05 and g_val > 1.6:
        target = "recovery"

    # --- 균형 범위: mild 유지 ---
    elif 1.0 <= d_val <= 1.6 and 0.8 <= g_val <= 2.2:
        target = "mild"

    # --- D가 너무 약하고(G가 이김) 오버피팅/저주파 수렴 위험: medium까지 ---
    elif d_val < 0.9 and g_val > 2.5:
        target = "medium"

    # 그 외엔 안전하게 recovery로 수렴
    else:
        target = "recovery"

    # strong은 금지(체커보드 시기)
    if target == "strong":
        target = "medium"

    if target != curr:
        diffaug.set_strength(target)
        return global_step  # 변경된 시점 기록
    return last_change_step


### 숨기기


In [None]:
# class DiffAug(nn.Module):
#     """
#     미분가능 증강. 'strength'로 강도를 제어하고, 필요 시 에폭에 따라 동적으로 조절할 수 있게 설계.
#     회화 데이터 특성상 색/기하 변환은 '과하지 않게'가 원칙.
#     """
#     def __init__(self, img_size=128, strength="medium"):
#         super().__init__()
#         self.img_size = img_size
#         self.set_strength(strength)  # 내부적으로 self.augs를 구성

#     def set_strength(self, strength="medium"):
#         strength = strength.lower()
#         if strength not in {"mild", "medium", "strong", "recovery"}:
#             strength = "medium"

#         if strength == "recovery":
#             # 선명도 회복용: 색/블러/노이즈 OFF, 아주 약한 기하 변환만
#             p_flip = 0.5
#             aff_deg, aff_tr, aff_scale, aff_p = 2.0, 0.02, (0.99, 1.01), 0.5
#             use_color = False
#             use_blur  = False
#             use_noise = False

#         if strength == "mild":
#             p_flip = 0.6
#             aff_deg, aff_tr, aff_scale, aff_p = 3.0, 0.03, (0.98, 1.03), 0.6
#             b_delta, c_delta, bc_p = 0.06, 0.06, 0.3
#             blur_p, blur_ks = 0.0, (3, 3)       # 기본은 블러 없음
#             noise_p, noise_std = 0.0, 0.01      # 기본은 노이즈 없음

#         elif strength == "strong":
#             p_flip = 0.8
#             aff_deg, aff_tr, aff_scale, aff_p = 6.0, 0.05, (0.95, 1.05), 0.8
#             b_delta, c_delta, bc_p = 0.10, 0.10, 0.5
#             blur_p, blur_ks = 0.3, (3, 3)       # 약한 가우시안 블러 추가 (회화 질감 보존 위해 약하게)
#             noise_p, noise_std = 0.2, 0.01      # 약한 가우시안 노이즈 (너무 세게 X)

#         else:  # medium
#             p_flip = 0.7
#             aff_deg, aff_tr, aff_scale, aff_p = 4.0, 0.04, (0.97, 1.04), 0.7
#             b_delta, c_delta, bc_p = 0.08, 0.08, 0.4
#             blur_p, blur_ks = 0.15, (3, 3)
#             noise_p, noise_std = 0.1, 0.01

#         # Kornia augs (모두 미분가능)
#         ops = [
#             K.RandomHorizontalFlip(p=p_flip),
#             K.RandomAffine(degrees=aff_deg, translate=(aff_tr, aff_tr),
#                            scale=aff_scale, p=aff_p),
#             K.RandomBrightness(b_delta, p=bc_p),
#             K.RandomContrast(c_delta, p=bc_p),
#         ]
#         if strength != "recovery" and use_color:
#             # 색 변환은 회화에 과하면 디테일 손실 → 회복 단계에서는 OFF
#             ops += [K.RandomBrightness(b_delta, p=bc_p),
#                     K.RandomContrast(c_delta, p=bc_p)]
#         if strength != "recovery" and use_blur and blur_p > 0:
#             ops.append(K.RandomGaussianBlur(kernel_size=blur_ks, sigma=(0.1, 1.0), p=blur_p))
#         if strength != "recovery" and use_noise and noise_p > 0:
#             ops.append(K.RandomGaussianNoise(mean=0.0, std=noise_std, p=noise_p))
#         # if blur_p > 0:
#         #     ops.append(K.RandomGaussianBlur(kernel_size=blur_ks, sigma=(0.1, 1.0), p=blur_p))
#         # if noise_p > 0:
#         #     ops.append(K.RandomGaussianNoise(mean=0.0, std=noise_std, p=noise_p))

#         self.augs = nn.Sequential(*ops)
#         self._strength = strength

#     @torch.no_grad()
#     def example_show(self, x):
#         return self.augs(x)

#     def forward(self, x):
#         return self.augs(x)

## 체크포인트 저장 주기 정책

In [None]:
def should_save_ckpt(epoch: int, max_epoch_plan: int = 500) -> bool:
    """
    epoch: 현재 1부터 증가한다고 가정.
    max_epoch_plan: 전체 계획된 마지막 에폭(예: 500)
    """
    if epoch <= 100:
        return (epoch % 20 == 0)  # 20, 40, 60, 80, 100
    else:
        # 101~max_epoch_plan: 5 에폭마다
        return (epoch % 5 == 0) and (epoch <= max_epoch_plan)

# 간단 테스트
# for e in [1,20,40,99,100,101,105,110,500,501]:
#     if should_save_ckpt(e, max_epoch_plan=500):
#         print("save @", e)


### [Step 4-C] Matplotlib로 에폭마다 샘플 저장 헬퍼

In [None]:
def denorm_to_numpy(t):
    # 학습에 쓰던 텐서를 시각화/저장하기 위해 numpy 배열로 변환하는 함수
    t = t.detach().cpu().clamp_(-1, 1)    # 학습 그래프(detach) 끊고 CPU로 옮긴 뒤 값 범위를 [-1,1]로 자름
    t = (t + 1.0) / 2.0                   # [-1,1] → [0,1] 범위로 변환 (이미지 표현용)
    t = t.permute(0, 2, 3, 1).numpy()     # (B,C,H,W) → (B,H,W,C) 로 차원 순서 변경 후 numpy 배열로 변환
    return t

def save_grid_matplotlib(tensor_bchw, save_path, nrow=8, title=None):
    # 여러 이미지를 격자(grid) 형태로 저장하는 함수
    arr = denorm_to_numpy(tensor_bchw)    # 텐서를 numpy 이미지 배열로 변환
    B = arr.shape[0]                      # 이미지 개수(batch 크기)
    ncol = nrow                           # 열 개수 (기본 8)
    nrow_actual = math.ceil(B / ncol)     # 필요한 행 개수 계산

    # matplotlib 서브플롯 생성 (행×열 구조)
    fig, axes = plt.subplots(nrow_actual, ncol, figsize=(ncol*2, nrow_actual*2))
    axes = np.atleast_2d(axes)            # axes를 2차원 배열로 맞춰줌 (행/열 구조 고정)

    idx = 0
    for r in range(nrow_actual):          # 각 행 순회
        for c in range(ncol):             # 각 열 순회
            ax = axes[r, c]
            ax.axis('off')                # 축(좌표 눈금) 숨기기
            if idx < B:                   # 이미지가 남아있으면 출력
                ax.imshow(arr[idx])       # idx번째 이미지 표시
            idx += 1

    if title is not None:                 # 전체 제목 옵션
        plt.suptitle(title)
    plt.tight_layout()                    # 레이아웃 자동 정리 (간격 조정)
    os.makedirs(os.path.dirname(save_path), exist_ok=True)  # 저장 경로 없으면 생성
    plt.savefig(save_path, dpi=150)       # 이미지 저장 (해상도 150dpi)
    plt.close(fig)                        # 메모리 절약 위해 figure 닫기


### [Step 4-D] 성능 vs 재현성 설정 예시

In [None]:
def set_performance_mode():
    # 빠른 훈련 모드 (권장 기본값)
    # - 입력 이미지 크기가 고정이라면 cuDNN이 최적화된 알고리즘을 골라 성능 ↑
    torch.backends.cudnn.benchmark = True     # 여러 알고리즘 중 가장 빠른 것 자동 선택
    torch.backends.cudnn.deterministic = False # 결과가 실행마다 달라질 수 있음 (속도 우선)
    try:
        torch.use_deterministic_algorithms(False) # 가능한 경우 비결정적 알고리즘 허용
    except Exception:
        pass
    print("[Mode] Performance (benchmark=True)")

def set_strict_deterministic_mode():
    # 엄격한 재현성 모드
    # - 속도는 느리지만, 실행할 때마다 똑같은 결과가 나옴 (bit 단위 동일)
    torch.backends.cudnn.benchmark = False    # 빠른 알고리즘 탐색 끔
    torch.backends.cudnn.deterministic = True # 항상 같은 알고리즘만 사용
    try:
        torch.use_deterministic_algorithms(True) # 모든 연산에서 결정적 알고리즘 강제
    except Exception:
        pass
    print("[Mode] Strict Deterministic (benchmark=False, deterministic=True)")

# 기본적으로는 성능 모드 권장 (GAN 학습은 속도/안정성이 중요하기 때문)
set_performance_mode()

# 만약 "실험을 다시 돌려도 완전히 같은 결과"가 꼭 필요하다면 아래로 교체:
# set_strict_deterministic_mode()

[Mode] Performance (benchmark=True)


## 모델 정의

In [None]:
# 성능 모드: 입력크기 고정일 때 빠름 (완전 재현성 필요하면 4단계의 deterministic 모드로 변경) ----
set_performance_mode()  # cuDNN benchmark 활성화하여 속도 우선(입력 크기 고정일 때 효과적)

# ---- 데이터셋/로더 (4단계의 make_loader_transforms 사용) ----
TFM = make_loader_transforms(CFG["img_size"])  # 정규화([-1,1]) 및 약한 증강(Flip/ColorJitter/CenterCrop) 포함된 변환 생성

# 여기서는 DATA_DIR 아래에 'Images' 폴더가 클래스 폴더로 인식되어 하나의 클래스가 됨.
ds = datasets.ImageFolder(root=DATA_DIR, transform=TFM)

# 학습용 DataLoader 구성
loader = DataLoader(
    ds,                                # 위에서 만든 ImageFolder 데이터셋
    batch_size=CFG["batch_size"],      # 배치 크기(메모리에 맞춰 조정)
    shuffle=True,                      # 매 epoch마다 샘플 섞기
    num_workers=4,                     # 병렬 로딩 워커 수(Colab에서는 2~4 권장)
    pin_memory=True,                   # CUDA 전송 최적화( GPU 사용 시 권장 )
    drop_last=True                     # 마지막 배치가 작으면 버려서 배치 크기 일정하게 유지
)

# 로더/데이터셋 정보 출력(클래스 확인 포함)
print(f"[Data] root       = {DATA_DIR}")          # 현재 ImageFolder root 경로
print(f"[Data] class_dir  = {CLASS_DIR}")         # 실제 이미지가 들어있는 폴더(검증됨)
print(f"[Data] class_names= {ds.classes}")        # 예상: ['Images']
print(f"[Data] images     = {len(ds)}")           # 전체 이미지 수
print(f"[Data] batches/ep = {len(loader)}")       # 에폭당 배치 수

[Mode] Performance (benchmark=True)
[Data] root       = /content/drive/MyDrive/dcgan/datasets
[Data] class_dir  = /content/drive/MyDrive/dcgan/datasets/Images
[Data] class_names= ['Images']
[Data] images     = 5734
[Data] batches/ep = 89




In [None]:
# 판별자 정의
class Generator(nn.Module):
    def __init__(self, z_dim=128, ngf=64, nc=3):
        # z_dim : 잠재 벡터 차원
        # ngf   : feature map 크기(scale), generator 채널 폭 결정
        # nc    : 출력 채널 수 (컬러 이미지라면 3)

        super().__init__()
        self.main = nn.Sequential(
            # 4x4
            # 입력: (z_dim) 크기의 잠재벡터 → ConvTranspose2d로 4x4 feature map 생성
            nn.ConvTranspose2d(z_dim, ngf*8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf*8),  # 채널 정규화 → 학습 안정화
            nn.ReLU(True),          # 활성화 함수(ReLU)

            # 4x4 → 8x8
            nn.ConvTranspose2d(ngf*8, ngf*4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf*4), nn.ReLU(True),

            # 16x16
            nn.ConvTranspose2d(ngf*4, ngf*2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf*2), nn.ReLU(True),

            # 32x32
            nn.ConvTranspose2d(ngf*2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf), nn.ReLU(True),

            # 64x64
            nn.ConvTranspose2d(ngf, ngf//2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf//2), nn.ReLU(True),

            # 128x128
            nn.ConvTranspose2d(ngf//2, nc, 4, 2, 1, bias=False),
            nn.Tanh(),  # 출력 범위 [-1,1]
        )
    def forward(self, z):
        # z : (batch, z_dim, 1, 1) 크기의 잠재 벡터 입력
        return self.main(z) # 출력: (batch, nc, 128, 128)

In [None]:
# 생성자 정의
class Discriminator(nn.Module):
    def __init__(self, ndf=64, nc=3):
        """
        ndf : D의 채널 폭 스케일(64/96/128 등)
        nc  : 입력 채널 수(컬러=3)
        """
        super().__init__()

        def block(in_c, out_c, bn=True):
            """
            공용 다운샘플 블록
            Conv(stride=2)로 해상도를 절반으로 줄이고,
            LeakyReLU로 비선형성, (선택) BatchNorm으로 학습 안정화
            """
            layers = [nn.Conv2d(in_c, out_c, 4, 2, 1, bias=False),  # k=4, s=2, p=1 → H,W 절반
                      nn.LeakyReLU(0.2, inplace=True)]
            if bn:
                # DCGAN 권장: 첫 블록 제외하고는 BN 사용
                layers.insert(1, nn.BatchNorm2d(out_c))
            return nn.Sequential(*layers)

        self.main = nn.Sequential(
            # 입력: (B, nc, 128, 128)
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),  # 128x128 → 64x64  (주석 수정 포인트!)
            nn.LeakyReLU(0.2, inplace=True),          # 첫 블록은 BN 없음(논문 권장)

            block(ndf,   ndf*2, bn=True),  # 64x64  → 32x32
            block(ndf*2, ndf*4, bn=True),  # 32x32  → 16x16
            block(ndf*4, ndf*8, bn=True),  # 16x16  →  8x8

            # 8 -> 4  (한 층 추가해서 4x4 만들기)
            nn.Conv2d(ndf*8, ndf*16, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf*16),
            nn.LeakyReLU(0.2, inplace=True),

            # 4 -> 1
            nn.Conv2d(ndf*16, 1, 4, 1, 0, bias=False)
        )

    def forward(self, x):
        # 출력: (B,) 스칼라 로짓
        return self.main(x).view(-1)

In [None]:
# 초기화 함수
def weights_init_dcgan(m):
    """
    DCGAN 권장 초기화:
    - Conv/ConvTranspose: N(0, 0.02)
    - BatchNorm.weight  : N(1, 0.02)
      BatchNorm.bias    : 0
    목적: 초기에 각 층의 출력 분포가 안정되도록 하여
         G/D 모두에서 그래디언트 흐름이 막히지 않게 함.
    """
    classname = m.__class__.__name__

    if 'Conv' in classname or 'ConvTranspose' in classname:
        nn.init.normal_(m.weight.data, 0.0, 0.02)

    elif 'BatchNorm' in classname:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.zeros_(m.bias.data)

In [None]:
# 미분가능 증강
diffaug = DiffAug(img_size=CFG["img_size"]).to(CFG["device"])

## 체크포인트

In [None]:
def latest_ckpt_path(save_dir: str):
    """
    save_dir(=CKPT_DIR)에서 'ckpt_e*_s*.pt' 패턴을 정렬하여 가장 최신 경로 반환.
    latest.txt가 있으면 그 경로를 우선 사용.
    """
    cand = sorted(glob.glob(os.path.join(save_dir, "ckpt_e*_s*.pt")))
    if not cand:
        lat = os.path.join(save_dir, "latest.txt")
        if os.path.exists(lat):
            p = Path(lat).read().strip()
            return p if os.path.exists(p) else None
        return None
    return cand[-1]

def save_ckpt(epoch, step, netG, netD, optG, optD):
    """
    원자적 저장(.tmp → rename)으로 중단/전원off 상황에서도 파일 무결성 보장.
    latest.txt에 방금 저장한 경로를 기록해 이어학습이 자동으로 최신을 로드.
    """
    # ⬇️ CKPT_DIR로 변경
    fname = f"ckpt_e{epoch:03d}_s{step:06d}.pt"
    path  = os.path.join(CKPT_DIR, fname)
    tmp   = path + ".tmp"

    torch.save({
        "epoch": epoch, "step": step,
        "G": netG.state_dict(),
        "D": netD.state_dict(),
        "optG": optG.state_dict(),
        "optD": optD.state_dict(),
        "cfg": CFG,
    }, tmp)
    os.replace(tmp, path)

    # latest 포인터 업데이트 (CKPT_DIR 안에 저장)
    with open(os.path.join(CKPT_DIR, "latest.txt"), "w") as f:
        f.write(path)

    print(f"[CKPT] saved: {path}")
    return path

def load_ckpt_if_any(netG, netD, optG, optD):
    """
    CKPT_DIR에서 최신 체크포인트를 찾아 로드.
    없으면 (0,0) 반환 → 스크래치 학습 시작.
    """
    path = latest_ckpt_path(CKPT_DIR)  # ⬅️ CKPT_DIR 사용
    if path is None:
        print("[CKPT] no checkpoint, start fresh.")
        return 0, 0

    ckpt = torch.load(path, map_location=CFG["device"])
    netG.load_state_dict(ckpt["G"])
    netD.load_state_dict(ckpt["D"])
    optG.load_state_dict(ckpt["optG"])
    optD.load_state_dict(ckpt["optD"])
    start_epoch = int(ckpt.get("epoch", 0))
    start_step  = int(ckpt.get("step", 0))

    print(f"[CKPT] loaded: {path} (epoch {start_epoch}, step {start_step})")
    return start_epoch, start_step


## 학습 루프

In [None]:
def train_dcgan(max_epoch_plan=2000):
    device = CFG["device"]

    # 모델/옵티마/로스 구성
    netG = Generator(CFG["z_dim"], CFG["ngf"]).to(device)  # Generator 생성 후 디바이스로 이동
    netD = Discriminator(CFG["ndf"]).to(device)            # Discriminator 생성 후 디바이스로 이동
    netG.apply(weights_init_dcgan); netD.apply(weights_init_dcgan)  # DCGAN 권장 가중치 초기화 적용

    criterion = nn.BCEWithLogitsLoss()               # 시그모이드 내장형 BCE (안정적)

    # TTUR: G/D 학습률 분리 (CFG에 lr_g, lr_d 존재)
    optG = torch.optim.Adam(                         # G 옵티마이저(Adam)
        netG.parameters(), lr=CFG["lr_g"],
        betas=(CFG["beta1"], CFG["beta2"])
    )
    optD = torch.optim.Adam(                         # D 옵티마이저(Adam)
        netD.parameters(), lr=CFG["lr_d"],
        betas=(CFG["beta1"], CFG["beta2"])
    )


    # DiffAug / Instance Noise
    diffaug = DiffAug(img_size=CFG["img_size"],
                      strength=CFG.get("diffaug_strength", "medium")).to(device)
    inst_noise = InstanceNoise(CFG["inst_noise_sigma"]).to(device) if CFG.get("use_inst_noise", False) else nn.Identity()


    # 이어학습: 최신 체크포인트가 있으면 로드
    start_epoch, global_step = load_ckpt_if_any(netG, netD, optG, optD)  # 없으면 (0,0) 반환 → 스크래치 시작
    z_fixed = torch.randn(64, CFG["z_dim"], 1, 1, device=device)  # 8x8 그리드로 보기 좋게 64장

    real_val = 0.9 # 라벨 스무딩: real=0.9 (fake는 0.0, gen 목표는 1.0)

    print(f"[Train] start from epoch={start_epoch+1}, steps={global_step}")  # 재개 지점 로그


    # 에폭 루프 (예: 처음엔 100, 이후 500까지 재실행)
    for epoch in range(start_epoch + 1, CFG["epochs"] + 1):
        netG.train(); netD.train() # 학습 모드로 전환(BN/Dropout 등)

        # Instance Noise: σ 선형 감쇠(느리게)  ⇒ epochs * 1.5
        if isinstance(inst_noise, InstanceNoise):
            inst_noise.sigma = max(
                0.02, CFG["inst_noise_sigma"] * (1 - epoch / (CFG["epochs"] * 1.5))
            )

        # DiffAug 강도 스테이지 스케줄 (초중반 강하게, 후반 완화)
        # T = CFG["epochs"]
        # p = epoch / T
        # if p < 0.45:
        #     diffaug.set_strength("strong")   # 초반: D를 어렵게 → G가 숨 쉴 공간 확보
        # elif p < 0.80:
        #     diffaug.set_strength("medium")   # 중반: 균형 유지
        # else:
        #     diffaug.set_strength("mild")     # 후반: 디테일 수렴 방해 최소화


        # 미니배치 루프
        for real, _ in loader:
            real = real.to(device)
            B = real.size(0)

            # 스킵분기에서 로깅 변수 초기화
            loss_real = torch.tensor(0.0, device=device)
            loss_fake = torch.tensor(0.0, device=device)
            loss_D    = torch.tensor(0.0, device=device)

            # D 업데이트 빈도 설정
            do_update_D = (global_step % 4 == 0) # 4스텝 중 1번만

            # --------- Discriminator 업데이트 ---------
            if do_update_D:
                optD.zero_grad(set_to_none=True)

                # real 경로
                real_in = inst_noise(diffaug(real))
                d_real  = netD(real_in)
                y_real  = torch.full((B,), real_val, device=device, dtype=d_real.dtype)
                loss_real = criterion(d_real, y_real)

                # fake 경로
                z = torch.randn(B, CFG["z_dim"], 1, 1, device=device)
                with torch.no_grad():
                    fake = netG(z)
                fake_in = inst_noise(diffaug(fake.detach()))
                d_fake  = netD(fake_in)
                y_fake  = torch.zeros(B, device=device, dtype=d_fake.dtype)
                loss_fake = criterion(d_fake, y_fake)

                loss_D = loss_real + loss_fake
                # -----
                if global_step % 16 == 0:  # 빈도 조절
                  real_in_r1 = real_in.detach().requires_grad_(True)
                  d_real_r1  = netD(real_in_r1).sum()
                  grad = torch.autograd.grad(d_real_r1, real_in_r1, create_graph=True)[0]
                  r1 = grad.view(B, -1).pow(2).sum(1).mean()
                  loss_D = loss_D + (10.0/2.0)*r1  # gamma=10 권장 시작
                # -----
                loss_D.backward()
                torch.nn.utils.clip_grad_norm_(netD.parameters(), 5.0)
                optD.step()
            else:
                # D 건너뛸 때 로깅 변수 안전히 유지
                if 'loss_real' not in locals(): loss_real = torch.tensor(0.0, device=device)
                if 'loss_fake' not in locals(): loss_fake = torch.tensor(0.0, device=device)
                if 'loss_D'   not in locals(): loss_D   = torch.tensor(0.0, device=device)


            # --------- Generator 업데이트 -------------
            optG.zero_grad(set_to_none=True)
            z2 = torch.randn(B, CFG["z_dim"], 1, 1, device=device)
            fake2 = netG(z2)
            d_fake2 = netD(inst_noise(diffaug(fake2)))
            # y_gen   = torch.ones(B, device=device, dtype=d_fake2.dtype)
            # 둘 다 real_val(=0.9)로 통일
            y_gen  = torch.full((B,), real_val, device=device, dtype=d_fake2.dtype)
            loss_G = criterion(d_fake2, y_gen)
            loss_G.backward()
            torch.nn.utils.clip_grad_norm_(netG.parameters(), 5.0)
            optG.step()

            # --------- 상태 기반 DiffAug 보정 ----------
            # 200 스텝마다 최근 상태를 보고 임시 전환(너무 잦은 전환 방지)
            last_da_change_step = -10**9  # 추가

            if global_step % 200 == 0:
                try:
                    d_val = float(loss_D.item())
                    g_val = float(loss_G.item())
                    lf    = float(loss_fake.item())
                    last_da_change_step = adjust_diffaug(
                        diffaug, d_val, g_val, lf, global_step,
                        last_change_step=last_da_change_step, cool_down=600
                    )
                    if lf < 0.01:                    # 아주 낮을 때만 강제 strong
                        diffaug.set_strength("strong")
                    # G가 너무 이기면 완화
                    elif (g_val < 1.8 and d_val > 1.5):
                        diffaug.set_strength("mild")
                    # else: 스테이지 스케줄 유지
                except Exception:
                    pass


            # --------- 로깅 ----------
            if global_step % 200 == 0:
                print(f"[Ep {epoch:03d}] step {global_step:06d} | "
                      f"D:{loss_D.item():.3f} (r{loss_real.item():.3f}/f{loss_fake.item():.3f}) | "
                      f"G:{loss_G.item():.3f}")

            global_step += 1


        # 에폭 종료: 샘플 시각화 저장(Matplotlib)
        with torch.no_grad():
            netG.eval()
            sample = netG(z_fixed)  # [-1,1]
        img_path = f"{SAMPLE_DIR}/epoch_{epoch:03d}.png"
        save_grid_matplotlib(sample, img_path, nrow=8, title=f"Epoch {epoch}")
        # print(f"[Viz] saved: {img_path}")

        # 에폭 저장 규칙(0~100: 20ep, 101~500: 5ep)
        if should_save_ckpt(epoch, max_epoch_plan=max_epoch_plan):
            save_ckpt(epoch, global_step, netG, netD, optG, optD)

    # 최초 러닝 종료 안내(100 후 500까지 이어학습 가이드)
    print(f"[Train] finished. epochs={CFG['epochs']}.")

In [None]:
# 실행
# train_dcgan(max_epoch_plan=1500)

[CKPT] loaded: /content/drive/MyDrive/dcgan//dcgan_portraits128/ckpts/ckpt_e1055_s093895.pt (epoch 1055, step 93895)
[Train] start from epoch=1056, steps=93895
[Ep 1057] step 094000 | D:1.416 (r0.777/f0.620) | G:0.793
[Ep 1059] step 094200 | D:1.393 (r0.825/f0.568) | G:0.759
[CKPT] saved: /content/drive/MyDrive/dcgan//dcgan_portraits128/ckpts/ckpt_e1060_s094340.pt
[Ep 1061] step 094400 | D:1.443 (r0.732/f0.701) | G:0.754
[Ep 1063] step 094600 | D:1.399 (r0.869/f0.530) | G:0.861


KeyboardInterrupt: 

## D/G Loss 해석 가이드


### 1. **이상적 균형 상태**

* `loss_D ≈ 0.5 ~ 1.5`
* `loss_G ≈ 0.5 ~ 2.0`
* **특징**: D가 real/fake를 완벽히 구분하지 못하고, G도 꾸준히 발전 → 적당히 균형 잡힘.
* 이미지: 점점 사람 얼굴 구조가 뚜렷해지고, 에폭이 지날수록 눈·코·입이 구분됨.

<br>

### 2. **D가 너무 강함 (loss\_D ↓, loss\_G ↑)**

* `loss_D << 0.3`, `loss_G >> 3`
* **현상**:

  * D가 real/fake를 거의 완벽히 구분 → G가 제대로 학습 못 함
  * G 출력: 뭉개지거나 noise 같은 이미지
* **조치**:

  * D regularization → 라벨 스무딩(이미 적용), **instance noise** 추가
  * 학습률 줄이기: `lr_D ↓` or `전체 lr ↓`
  * 증강 약화 (DiffAug 너무 강하면 G가 힘들 수 있음)

<br>

### 3. **G가 너무 강함 (loss\_D ↑, loss\_G ↓)**

* `loss_D >> 3`, `loss_G << 0.3`
* **현상**:

  * D가 거의 속음, fake도 real처럼 인식
  * G가 모드 붕괴(mode collapse) → 같은 얼굴만 반복 출력
* **조치**:

  * D capacity 늘리기 (ndf ↑)
  * 증강 강도 ↑ (D가 구분할 힘을 얻도록)
  * 학습률 조정: `lr_G ↓`

<br>

### 4. **둘 다 높은 상태 (loss\_D ≈ 2~~5, loss\_G ≈ 2~~5)**

* **현상**:

  * 둘 다 제대로 학습 못 하는 상황 → 발산 위험
  * 이미지: 거의 잡음 수준 유지
* **조치**:

  * learning rate ↓
  * batch size ↑ (가능하다면)
  * weight init 다시 확인

<br>

### 5. **둘 다 낮은 상태 (loss\_D ≈ 0.1~~0.3, loss\_G ≈ 0.1~~0.3)**

* **현상**:

  * G와 D 모두 confidence 너무 높음 → 학습 거의 멈춤 (gradient vanish)
  * 출력: 항상 같은 퀄리티, 더 개선 안 됨
* **조치**:

  * lr ↑ 시도
  * dropout/instance noise 추가

<br>

## 📌 모니터링 팁

1. **비율 체크**

   * D/G 손실이 **항상 한쪽만 너무 유리하지 않은지** 확인 (한쪽이 완전히 낮으면 문제).
2. **시각화**

   * 손실 로그만 보지 말고 **매 에폭 샘플 이미지** 비교 → 눈·코·입 뭉개짐, 모드 붕괴, 노이즈 여부 직접 확인.
3. **학습 후반**

   * G/D 손실이 어느 정도 진동하면서 균형을 유지하는 게 가장 건강한 상태.
   * GAN은 “둘 다 0에 수렴”하지 않습니다. (그건 오히려 collapse 신호일 수 있어요)

<br>

✅ 정리

* `loss_D`가 너무 낮고 `loss_G`만 높다 → D 약화 필요.
* `loss_D`가 높고 `loss_G`만 낮다 → G 약화 필요.
* 둘 다 발산/수렴하면 → lr·batch·증강 조정.
* 가장 중요한 건 **샘플 이미지 품질**을 같이 보면서 해석하는 것.

