In [1]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
from efficientnet_pytorch import EfficientNet

# 시드 고정
seed = 2021
deterministic = True

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
if deterministic:
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# ======================
# 1. Early Stopping
# ======================
class EarlyStopping:
    def __init__(self, patience=10, delta=0.001):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None or val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True


# ======================
# 2. Transformer Encoder
# ======================
"""
PyTorch 내장 모듈: nn.TransformerEncoderLayer, nn.TransformerEncoder
- d_model: 토큰 임베딩 차원
- nhead: 멀티헤드 셀프 어텐션의 헤드 개수
- dim_feedforward: FFN(hidden) 차원
- num_layers: Transformer EncoderLayer 쌓는 깊이
"""
class TransformerEncoder(nn.Module):
    def __init__(self, d_model=256, nhead=4, num_layers=2, dim_feedforward=512):
        super(TransformerEncoder, self).__init__()
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            batch_first=True  # (B, N, E) 형태로 입력
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    def forward(self, x):
        # x: (B, N, d_model)
        # TransformerEncoder -> (B, N, d_model)
        out = self.transformer_encoder(x)
        return out


# ======================
# 3. Hybrid CNN + Transformer
# ======================
class HybridCNNTransformer(nn.Module):
    """
    1) EfficientNet 으로부터 2D 특징 추출
    2) Flatten & Embedding -> Transformer Encoder
    3) 최종 FC로 분류
    """
    def __init__(self, model_name="efficientnet-b0", num_classes=6, freeze_backbone=False, 
                 d_model=256, nhead=4, num_layers=2):
        super(HybridCNNTransformer, self).__init__()
        
        # 1) EfficientNet 백본
        self.backbone = EfficientNet.from_pretrained(model_name)
        
        # (옵션) 동결
        if freeze_backbone:
            for param in self.backbone.parameters():
                param.requires_grad = False
        
        # 마지막 FC 제거
        in_features = self.backbone._fc.in_features  # b0의 경우 1280
        self.backbone._fc = nn.Identity()
        
        # 2) 채널 맞추기 (2D->1D로 보낼 때 임베딩 차원과 매칭)
        #    (1280 -> d_model)
        self.conv1x1 = nn.Conv2d(in_features, d_model, kernel_size=1)
        
        # 3) Transformer Encoder
        self.transformer = TransformerEncoder(
            d_model=d_model, 
            nhead=nhead, 
            num_layers=num_layers, 
            dim_feedforward=2*d_model
        )
        
        # 4) 최종 분류기
        #    Transformer 출력을 평균 풀링 or CLS 토큰 개념 사용
        #    여기서는 단순 global average over sequence
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes)
        )
        
    def forward(self, x):
        """
        x: (B, 3, H, W)
        1) backbone.extract_features -> (B, 1280, H', W')
        2) conv1x1 -> (B, d_model, H', W')
        3) Flatten: (B, d_model, H'*W')
        4) (B, H'*W', d_model) 형태로 transpose
        5) Transformer Encoder
        6) Sequence-wise pooling -> classification
        """
        # (a) CNN Backbone
        x = self.backbone.extract_features(x)    # (B, 1280, H', W')  (b0기준 H'/W' ~ 7)
        
        # (b) Channel 1280 -> d_model
        x = self.conv1x1(x)                     # (B, d_model, H', W')
        
        # (c) Flatten (B, d_model, H'*W') -> (B, H'*W', d_model)
        B, C, H_, W_ = x.shape
        x = x.view(B, C, H_*W_).transpose(1, 2)  # (B, N=H'*W', d_model=C)
        
        # (d) Transformer
        x = self.transformer(x)                 # (B, N, d_model)
        
        # (e) Sequence-wise Global Avg
        #    x.mean(dim=1): (B, d_model)
        x = x.mean(dim=1)
        
        # (f) Classification
        x = self.classifier(x)                  # (B, num_classes)
        return x


# ======================
# 4. 하이퍼파라미터
# ======================
model_name = 'efficientnet-b0'
num_classes = 6
batch_size = 32
epochs = 20
learning_rate = 0.001
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ======================
# 5. 데이터셋 로드
# ======================
image_size = EfficientNet.get_image_size(model_name)
transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225])
])

dataset_path = 'C:/Users/IIALAB/Desktop/kdm/solar/kaggle/input/solar-panel-images/Faulty_solar_panel'
dataset = datasets.ImageFolder(root=dataset_path, transform=transform)

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# ======================
# 6. 모델 초기화
# ======================
model = HybridCNNTransformer(
    model_name=model_name, 
    num_classes=num_classes, 
    freeze_backbone=False,
    d_model=256,      # Transformer 차원
    nhead=4,          # Multi-head 개수
    num_layers=2      # TransformerEncoderLayer 쌓는 수
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# ======================
# 7. 학습/평가 함수
# ======================
def train(model, loader, criterion, optimizer):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * images.size(0)
        _, pred = outputs.max(1)
        correct += (pred == labels).sum().item()
        total += labels.size(0)
    return running_loss/total, correct/total

def evaluate(model, loader, criterion):
    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item() * images.size(0)
            _, pred = outputs.max(1)
            correct += (pred == labels).sum().item()
            total += labels.size(0)
    return running_loss/total, correct/total

# ======================
# 8. 학습 루프
# ======================
best_test_acc = 0.0
best_epoch = 0
early_stopping = EarlyStopping(patience=5)

for epoch in range(epochs):
    train_loss, train_acc = train(model, train_loader, criterion, optimizer)
    test_loss, test_acc = evaluate(model, test_loader, criterion)

    if test_acc > best_test_acc:
        best_test_acc = test_acc
        best_epoch = epoch + 1

    early_stopping(test_loss)
    if early_stopping.early_stop:
        print("Early stopping triggered")
        break

    print(f"[Epoch {epoch+1}/{epochs}]")
    print(f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"  Test  Loss: {test_loss:.4f}  | Test Acc: {test_acc:.4f}")

print(f"\nTraining complete! Best Test Accuracy: {best_test_acc:.4f} at Epoch {best_epoch}")


Loaded pretrained weights for efficientnet-b0
[Epoch 1/20]
  Train Loss: 1.1113 | Train Acc: 0.6229
  Test  Loss: 1.7787  | Test Acc: 0.4407
[Epoch 2/20]
  Train Loss: 0.5824 | Train Acc: 0.8136
  Test  Loss: 1.1869  | Test Acc: 0.6384
[Epoch 3/20]
  Train Loss: 0.5900 | Train Acc: 0.8008
  Test  Loss: 1.2858  | Test Acc: 0.5763
[Epoch 4/20]
  Train Loss: 0.4091 | Train Acc: 0.8785
  Test  Loss: 0.8398  | Test Acc: 0.7853
[Epoch 5/20]
  Train Loss: 0.4289 | Train Acc: 0.8545
  Test  Loss: 0.6478  | Test Acc: 0.8475
[Epoch 6/20]
  Train Loss: 0.4264 | Train Acc: 0.8757
  Test  Loss: 0.6715  | Test Acc: 0.7684
[Epoch 7/20]
  Train Loss: 0.5154 | Train Acc: 0.8277
  Test  Loss: 1.0415  | Test Acc: 0.6949
[Epoch 8/20]
  Train Loss: 0.3160 | Train Acc: 0.9110
  Test  Loss: 0.9947  | Test Acc: 0.7514
[Epoch 9/20]
  Train Loss: 0.2379 | Train Acc: 0.9336
  Test  Loss: 0.8310  | Test Acc: 0.7797
Early stopping triggered

Training complete! Best Test Accuracy: 0.8475 at Epoch 5
