### Resnet 34 모델 만들기

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class BasicBlock(nn.Module):
    def __init__(self, in_channels, last_channels, stride=1, downsample=None):
        '''
        입력 채널수, 중간 채널 수, 최종 채널수
        stride는 기본 1. stage 별로 feature map의 크기를 줄일 경우 2
        downsample은 stride가 2일 경우 BasicBlock 입력 전 값도 1x1 conv, stride 2를 적용하여 사이즈를 줄이는 Conv block
        '''
        super().__init__()
        self.conv_block = nn.Sequential(
            # 첫번째 3x3 Conv. stage별로 Feature Map의 크기를 줄일 시 첫번째 Conv에서 줄임(3x3 kernel에 stride=2로)
            nn.Conv2d(in_channels, last_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(last_channels),
            nn.ReLU(),
            # 두번째 3x3 Conv
            nn.Conv2d(last_channels, last_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(last_channels),
        )
        self.stride = stride
        self.downsample = downsample

    def forward(self, x):
        identity = x
        out = self.conv_block(x)
        if self.downsample is not None:
            identity = self.downsample(x)
        
        out += identity
        out = F.relu(out)

        return out

In [None]:
class ResNet34(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv_block_01 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )
        # 3, 4, 6, 3개의 BasicBlock들로 이루어진 stage들. 
        self.stage_01 = self.make_basic_stage(in_channels=64, last_channels=64, stride=1, blocks=3)
        self.stage_02 = self.make_basic_stage(in_channels=64, last_channels=128, stride=2, blocks=4)
        self.stage_03 = self.make_basic_stage(in_channels=128, last_channels=256, stride=2, blocks=6)
        self.stage_04 = self.make_basic_stage(in_channels=256, last_channels=512, stride=2, blocks=3)

        self.adaptive_pool = nn.AdaptiveAvgPool2d(output_size=1)
        self.fc = nn.Linear(512, num_classes)

    def make_basic_stage(self, in_channels, last_channels, stride, blocks):
        # 모든 BasicBlock들을 담을 List
        layers = []
        downsample = None
        # 함수의 인자로 stride가 1이 아닌 2가 들어올 경우 downsample 생성. 
        if stride != 1:
            downsample = nn.Sequential(
                nn.Conv2d(in_channels=in_channels, out_channels=last_channels, 
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(num_features=last_channels)
            )
        # 각 stage의 첫번째 Block. 함수의 stride 인자가 1 또는 2인지에 따라 생성된 downsample을 BasicBlock의 인자로 입력. 
        layers.append(BasicBlock(in_channels=in_channels, last_channels=last_channels,
                                     stride=stride, downsample=downsample))
        for _ in range(1, blocks):
            # 각 stage의 첫번째 Block을 제외하고는 모두 stride=1, downsample=None 
            layers.append(BasicBlock(in_channels=last_channels, last_channels=last_channels, 
                                    stride=1, downsample=None))
        
        #layer list에 있는 모든 BasicBlock들을 Sequential로 연결하여 반환 
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv_block_01(x)
        x = self.stage_01(x)
        x = self.stage_02(x)
        x = self.stage_03(x)
        x = self.stage_04(x)
        
        # GAP 및 최종 Classifier Layer forward
        x = self.adaptive_pool(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)

        return x

### ResNet 50 모델 살펴 보기
* Bottleneck Block이 개별 stage 별로 3, 4, 6, 3 개로 구성.
* Bottleneck Block은 1x1 Conv block, 3x3 Conv Block, 1x1 Conv Block으로 구성됨.
* Stage의 첫번째 Block에서 Feature Map의 크기를 절반으로 줄일 경우 3x3 Conv에 stride 2를 적용

In [None]:
from torchvision import models

resnet50_model = models.resnet50(weights=None)
print(resnet50_model)

In [None]:
from torchinfo import summary

summary(model=resnet50_model, input_size=(1, 3, 224, 224),
        col_names=['input_size', 'output_size', 'num_params'], 
        row_settings=['var_names'])

### ResNet 50용 Residual(Identity) Block 생성(BottleneckBlock)
* 1x1 Conv block -> 3x3 Conv Block -> 1x1 Conv Block으로 구성된 BottleneckBlock 생성
* 첫번째 Conv block에서 입력 채널의 갯수를 (주로)감소 시키며, 마지막 1x1 Conv Block에서는 채널 수를 증가 시킴
* stride가 2로 Feature map의 크기를 줄여야 할 경우 중간 3x3 Conv Block에서 stride 2를 적용
* downsample은 stage별 첫 BottleneckBlock에 적용됨.
![Residual Block Bottleneck](https://github.com/chulminkw/CNN_PG_Torch/blob/main/image/bottleneck_block_real.png?raw=true)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class BottleneckBlock(nn.Module):
    def __init__(self, in_channels, mid_channels, last_channels, stride=1, downsample=None):
        super().__init__()
        self.conv_block = nn.Sequential(
            # 첫번째 1x1 Conv, 채널의 갯수 감소
            nn.Conv2d(in_channels=in_channels, out_channels=mid_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(),
            # 중간 3x3 Conv, stage별로 Feature Map의 크기를 줄일 시 두번째 Conv에서 줄임(3x3 kernel에 stride=2로)
            nn.Conv2d(mid_channels, mid_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(),
            # 마지막 1x1 Conv. 채널의 갯수를 다시 증가.
            nn.Conv2d(mid_channels, last_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(last_channels)
        )
        self.downsample = downsample

    def forward(self, x):
        identity = x
        out = self.conv_block(x)
        if self.downsample is not None:
            identity = self.downsample(x)
        
        out += identity
        out = F.relu(out)

        return out

In [None]:
downsample = nn.Sequential(
    nn.Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False),
    nn.BatchNorm2d(256)
)
bottleneck = BottleneckBlock(in_channels=64, mid_channels=64, last_channels=256, stride=1, downsample=downsample)
print(bottleneck)
print(resnet50_model.layer1[0])

In [None]:
from torchinfo import summary

bottleneck = BottleneckBlock(in_channels=64, mid_channels=64, last_channels=256, 
                             stride=1, downsample=downsample)

summary(model=bottleneck, input_size=(1, 64, 56, 56),
        col_names=['input_size', 'output_size', 'num_params'], 
        row_settings=['var_names'])

### BottleneckBlock을 연속으로 연결하여 Stage를 만드는 함수 생성

In [None]:
def make_bottleneck_stage(in_channels, mid_channels, last_channels, stride, blocks):
    '''
    in_channels는 downsample의 입력 채널수, last_channels는 downsample의 출력 채널수
    stage별 첫번째 Bottleneck Block이 아닌 경우는 나머지 Bottleneck Block의 생성 인자로
    in_channels에 함수 인자 last_channels를 입력. 
    '''
    # 모든 BasicBlock들을 담을 List
    layers = []
    downsample = None
    # stride 여부와 관계없이 downsample이 적용됨. downsample의 stride는 함수의 인자로 전달됨. 
    downsample = nn.Sequential(
        nn.Conv2d(in_channels=in_channels, out_channels=last_channels, 
                  kernel_size=1, stride=stride, bias=False),
        nn.BatchNorm2d(num_features=last_channels)
    )
    # 각 stage의 첫번째 Block. stride는 함수의 인자로 전달됨. 
    layers.append(BottleneckBlock(in_channels=in_channels, mid_channels=mid_channels, last_channels=last_channels,
                                 stride=stride, downsample=downsample))
    for _ in range(1, blocks):
        # 각 stage의 첫번째 Block을 제외하고는 모두 stride=1, downsample은 None
        layers.append(BottleneckBlock(in_channels=last_channels, mid_channels=mid_channels, last_channels=last_channels, 
                                stride=1, downsample=None))

    #layer list에 있는 모든 BottleBlock들을 Sequential로 연결하여 반환 
    return nn.Sequential(*layers)
    

In [None]:
stage_01 = make_bottleneck_stage(in_channels=64, mid_channels=64, last_channels=256, stride=1, blocks=3)
print(stage_01)

In [None]:
base_mid_channels = 128

stage_02 = make_bottleneck_stage(in_channels=base_mid_channels * 2, 
                                 mid_channels=base_mid_channels, 
                                 last_channels=base_mid_channels * 4, stride=2, blocks=4)
print(stage_02)

### ResNet 50 모델 만들기

In [None]:
class ResNet50(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv_block_01 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )
        # 3, 4, 6, 3개의 Bottleneck Block들로 이루어진 stage들. 
        self.stage_01 = self.make_bottleneck_stage(in_channels=64, mid_channels=64,
                                                   last_channels=256, stride=1, blocks=3)
        self.stage_02 = self.make_bottleneck_stage(in_channels=256, mid_channels=128,
                                                   last_channels=512, stride=2, blocks=4)
        self.stage_03 = self.make_bottleneck_stage(in_channels=512, mid_channels=256, 
                                                   last_channels=1024, stride=2, blocks=6)
        self.stage_04 = self.make_bottleneck_stage(in_channels=1024, mid_channels=512,
                                                   last_channels=2048, stride=2, blocks=3)
        # print(self.modules)
        # for m in self.modules():
        #     if isinstance(m, nn.Conv2d):
        #         nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
        #     elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
        #         nn.init.constant_(m.weight, 1)
        #         nn.init.constant_(m.bias, 0)

        self.adaptive_pool = nn.AdaptiveAvgPool2d(output_size=1)
        self.fc = nn.Linear(2048, num_classes)

    def make_bottleneck_stage(self, in_channels, mid_channels, last_channels, stride, blocks):
        '''
        in_channels는 downsample의 입력 채널수, last_channels는 downsample의 출력 채널수
        stage별 첫번째 Bottleneck Block이 아닌 경우는 나머지 Bottleneck Block의 생성 인자로
        in_channels에 함수 인자 last_channels를 입력. 
        '''
        # 모든 BasicBlock들을 담을 List
        layers = []
        downsample = None
        # stride 여부와 관계없이 downsample이 적용됨. downsample의 stride는 함수의 인자로 전달됨. 
        downsample = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=last_channels, 
                      kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(num_features=last_channels)
        )
        # 각 stage의 첫번째 Block. stride는 함수의 인자로 전달됨. 
        layers.append(BottleneckBlock(in_channels=in_channels, mid_channels=mid_channels, last_channels=last_channels,
                                     stride=stride, downsample=downsample))
        for _ in range(1, blocks):
            # 각 stage의 첫번째 Block을 제외하고는 모두 stride=1, downsample은 None
            layers.append(BottleneckBlock(in_channels=last_channels, mid_channels=mid_channels, last_channels=last_channels, 
                                    stride=1, downsample=None))
    
        #layer list에 있는 모든 BottleBlock들을 Sequential로 연결하여 반환 
        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = self.conv_block_01(x)
        x = self.stage_01(x)
        x = self.stage_02(x)
        x = self.stage_03(x)
        x = self.stage_04(x)
        
        # GAP 및 최종 Classifier Layer forward
        x = self.adaptive_pool(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)

        return x

In [None]:
my_resnet50_model = ResNet50(num_classes=1000)
summary(model=my_resnet50_model, input_size=(1, 3, 224, 224),
        col_names=['input_size', 'output_size', 'num_params'],
        depth=4,
        row_settings=['var_names'])

In [None]:
torch_resnet50_model = models.resnet50(weights='DEFAULT')
summary(model=torch_resnet50_model, input_size=(1, 3, 224, 224),
        col_names=['input_size', 'output_size', 'num_params'], 
        row_settings=['var_names'])

### ResNet 50 모델 학습 및 평가
* Flowers 데이터 세트로 학습 및 평가
* Trainer 클래스는 Modular 활용

In [None]:
!ls /kaggle/input

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

def create_flowers_meta_df(file_dir):
    paths = [] # 이미지 파일 경로 리스트
    
    labels = [] # 꽃 종류
    
    # os.walk()를 이용하여 특정 디렉토리 밑에 있는 모든 하위 디렉토리를 모두 조사. 
    # kaggle/input/flowers-dataset 하위 디렉토리 밑에 jpg 확장자를 가진 파일이 모두 이미지 파일임
    # kaggle/input/flowers-dataset 밑으로 하위 디렉토리 존재
    for dirname, _, filenames in os.walk(file_dir):
        for filename in filenames:
            # 이미지 파일이 아닌 파일도 해당 디렉토리에 있음.
            if '.jpg' in filename:
                # 파일의 절대 경로를 file_path 변수에 할당. 
                file_path = dirname+'/'+ filename
                paths.append(file_path)
                
                # 파일의 절대 경로에 daily, dandelion, roses, sunflowers, tulips에 따라 labels에 값 할당.               label_gubuns.append('daisy')
                if 'daisy' in file_path:
                    labels.append('daisy')
                elif 'dandelion' in file_path:
                    labels.append('dandelion')
                elif 'roses' in file_path:
                    labels.append('rose')
                elif 'sunflowers' in file_path:
                    labels.append('sunflowers')
                elif 'tulips' in file_path:
                    labels.append('tulips')
    # DataFrame 메타 데이터 생성. 
    data_df = pd.DataFrame({'path':paths, 
                            'label':labels})
    # Target값  변환
    label_mapping = {'daisy': 0, 'dandelion': 1, 'rose': 2, 'sunflowers': 3, 'tulips': 4}
    data_df['target'] = data_df['label'].map(label_mapping)

    return data_df

In [None]:
from sklearn.model_selection import train_test_split

data_df = create_flowers_meta_df('/kaggle/input/flowers-dataset') # /kaggle/input

# 전체 데이터 세트에서 학습(전체의 70%)과 테스트용(전체의 30%) 메타 정보 DataFrame 생성.
train_df, test_df = train_test_split(data_df, test_size=0.3, stratify=data_df['target'], random_state=2025)
# 기존 학습 DataFrame을 다시 학습과 검증 DataFrame으로 분할. 80%가 학습, 20%가 검증
tr_df, val_df = train_test_split(train_df, test_size=0.2, stratify=train_df['target'], random_state=2025)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
from PIL import Image

BATCH_SIZE = 16

class FlowerDataset(Dataset):
    # 이미지 파일리스트, 타겟 파일리스트, transforms 등 이미지와 타겟 데이터 가공에 필요한 인자들을 입력 받음
    def __init__(self, image_paths, targets=None, transform=None):
        self.image_paths = image_paths
        self.targets = targets
        self.transform = transform
    
    # 전체 건수를 반환
    def __len__(self):
        return len(self.image_paths)
        
    # idx로 지정된 하나의 image, label을 tensor 형태로 반환
    def __getitem__(self, idx):    
        # PIL을 이용하여 이미지 로딩하고 PIL Image 객체 반환.
        pil_image = Image.open(self.image_paths[idx])
        # 보통은 transform이 None이 되는 경우는 거의 없음(Tensor 변환이라도 있음)
        image = self.transform(pil_image)

        if self.targets is not None:
            # 개별 target값을 tensor로 변환.
            target = torch.tensor(self.targets[idx])
            return image, target
        # 테스트 데이터의 경우 targets가 입력 되지 않을 수 있으므로 이를 대비. 
        else:
            return image

def create_tr_val_loader(tr_df, val_df, tr_transform, val_transform):
    tr_dataset = FlowerDataset(image_paths=tr_df['path'].to_list(), 
                            targets=tr_df['target'].to_list(), transform=tr_transform)
    val_dataset = FlowerDataset(image_paths=val_df['path'].to_list(), 
                            targets=val_df['target'].to_list(), transform=val_transform)
    
    tr_loader = DataLoader(tr_dataset, batch_size = BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=2*BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)

    return tr_loader, val_loader

In [None]:
IMG_SIZE = 224
IMG_MEANS = [0.485, 0.456, 0.406] # ImageNet 데이터세트의 이미지 채널별 평균값
IMG_STD = [0.229, 0.224, 0.225] # ImageNet 데이터세트의 이미지 채널별 표준편차값

tr_transform = T.Compose([
            T.RandomHorizontalFlip(p=0.3),
            T.RandomVerticalFlip(p=0.3),
            T.RandomApply([T.CenterCrop(size=(200,200))], p=0.4),
            T.Resize(size=(IMG_SIZE, IMG_SIZE)),
            T.ToTensor(), T.Normalize(mean=IMG_MEANS, std=IMG_STD)
])
    
val_transform = T.Compose([
            T.Resize(size=(IMG_SIZE, IMG_SIZE)),
            T.ToTensor(), T.Normalize(mean=IMG_MEANS, std=IMG_STD)
])

tr_loader, val_loader = create_tr_val_loader(tr_df=tr_df, val_df=val_df, 
                                             tr_transform=tr_transform, val_transform=val_transform)
images, labels = next(iter(tr_loader))
print(images.shape, labels.shape)

#### Trainer 클래스 생성 및 적용
* Trainer 클래스는 https://raw.githubusercontent.com/chulminkw/CNN_PG_Torch/main/modular/v1/utils.py?raw=true 로 download 후 import 함

In [None]:
# /kaggle/working/modular/v1 디렉토리에 utils.py 파일 다운로드
!rm -rf ./modular/v1
!mkdir -p ./modular/v1
!wget -O ./modular/v1/utils.py https://raw.githubusercontent.com/chulminkw/CNN_PG_Torch/main/modular/v1/utils.py?raw=true
!ls ./modular/v1

import sys

# 반드시 system path를 아래와 같이 잡아줘야 함. 
sys.path.append('/kaggle/working')

#아래가 수행되는지 반드시 확인
from modular.v1.utils import Trainer, Predictor, ModelCheckpoint, EarlyStopping

In [None]:
import torch
import torch.nn as nn
from torch.optim import SGD, Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau

NUM_INPUT_CHANNELS = 3
# 5개의 꽃 종류
NUM_CLASSES = 5

# 직접 구현한 ResNet50 모델을 이용
model = ResNet50(num_classes=NUM_CLASSES)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = Adam(model.parameters(), lr=4e-4) #lr = 1e-4
loss_fn = nn.CrossEntropyLoss()
scheduler = ReduceLROnPlateau(
            optimizer=optimizer, mode='min', factor=0.5, patience=5, threshold=0.01, min_lr=1e-6)

# 검증 데이터 정확도 기준 checkpoint 파일 생성. 
model_checkpoint = ModelCheckpoint('/kaggle/working/checkpoints', monitor='val_acc', mode='max')

trainer = Trainer(model=model, loss_fn=loss_fn, optimizer=optimizer,
                  train_loader=tr_loader, val_loader=val_loader, scheduler=scheduler,
                  callbacks=[model_checkpoint], device=device)
# 학습 및 평가.
history = trainer.fit(60)

In [None]:
from modular.v1.utils import Predictor

test_image_paths = test_df['path'].to_list()
test_targets = test_df['target'].to_list()

IMG_SIZE=224
test_transform = T.Compose([
                        T.Resize(size=(IMG_SIZE, IMG_SIZE)),
                        T.ToTensor(), 
                        T.Normalize(mean=[0.485, 0.456, 0.406], 
                                    std=[0.229, 0.224, 0.225])
])

test_dataset = FlowerDataset(image_paths=test_image_paths, 
                            targets=test_targets, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4, pin_memory=True)

# 가장 검증 성능이 좋은 weight 파일을 모델로 로딩. 
# state_dict = torch.load('/kaggle/working/checkpoints/checkpoint_epoch_37_val_acc_0.8444.pt', weights_only=True)
# best_trained_model = ResNet50(num_classes=NUM_CLASSES)
# best_trained_model.load_state_dict(state_dict)

#또는 맨 마지막 epoch로 학습된 model weight
best_trained_model = trainer.get_trained_model()

predictor = Predictor(model=best_trained_model, device=device)
eval_metric = predictor.evaluate(test_loader)
print(f'test dataset evaluation:{eval_metric:.4f}')