## 필요한 모듈들 임포트

## Fine tuning
사전 훈련된 모델을 기반으로 아키텍쳐를 새로운 목적에 맞게 변형하고 이미 학습된 모델의 가중치를 미세하게 조정해 학습시키는 방법
    - 사전 훈련된 모델의 가중치를 초기값으로 사용하고 추가로 학습
    - CNN 베이스의 사전학습 모델을 사용할 때에는 이전에 학습한 내용들을 모두 잊어버릴 위험이 있기 때문에 작은 learning rate를 사용하는것이 바람직함
[방법1] 모델 전체를 새로 학습
- 사전 학습 모델의 구조만 사용하면서, 자신의 데이터셋에 맞게 모델을 전부 새롭게 학습시키는 방법
- 데이터의 크기가 크고 유사성이 작을 때 사용
[방법2] Conv base는 일부분 고정(Freezing)하고 나머지 Conv Base 계층과 Classifier를 새롭게 학습
- Conv base: 합성곱 층과 풀링 층이 여러 겹 쌓여있는 부분으로 특징을 추출하는 역할
- Classifier: 주로 완전연결계층으로 구성되며 Conv base가 추출한 특징들을 잘 학습해 각각의 샘플들을 알맞은 class로 분류
- 낮은 레벨의 계층은 일반적이고 독리적인 특징(신형성)을 추출하고, 높은 레벨의 계층은 구체적이고 명확한 특징(형상)을 추출
- 크기가  크고 유사성도 높은 데이터셋일 때
[방법3] Conv Base는 고정하고 Classifier만 새로 학습
- 컴퓨팅 연산 능력이 부족하거나, 데이터셋이 너무 작을 때, 혹은 적용하려는 task와 사전학습에 쓰인 데이터가 매우 비슷할 때 사용

[크기가 작고 유사성도 작은 데이터]
- 방법2를 쓰되 조금 더 깊은 계층까지 새로 학습시키
- Data Augmentation 하기

[Feature Extraction]
사전 훈련된 모델의 하위 층을 동결하고, 상위 층을 새로운 작업을 위해 수정하지 않고 사용하는 것 -> 데이터분류기(마지막 완전연결층) 부분만 새로 만드는 것
    - 사전 훈련된 모델(고정): 중요한 특성 추출(학습 X)
    - 데이터분류기: 추출된 특성을 입력받아 최종적으로 이미지에 대한 클래스를 분류(학습O)
    - 가능한 모델: Xception, Inception V3, ResNet50, VGG16, VGG19, MobileNet

[ 분류기 수정하는 방법 ]
Fine tunning을 하기 전 input값으로 받는 이미지의 크기는 무조건 확인해야함
    - 대부분의 모델은 (224, 224)이지만 inception_v3의 경우 (299, 299)
1. print(model)로 마지막 층의 in_features 확인
2. nn.Linear(in_features, num_classes)로 클래스 수 변경
    - 예시1: Resnet
        - (fc): Linear(in_features=512, out_features=1000, bias=True)
            - => model.fc = nn.Linear(512, num_classes)
    - 예시2: Alexnet
        - (classifier): Sequential(... (6): Linear(in_features=4096, out_features=1000, bias=True)
            - => model.classifier[6] = nn.Linear(4096, num_classes)
    - 예시3(특이구조): Squeezenet
        - - output은 classifier의 첫번째 레이어인 1X1 convolution layer로부터 나옴
        - (classifier): Sequential((0): Dropout(p=0.5)  (1): Conv2d(512, 1000, kernel_size=13, stride=1, padding=0)  (2): ReLU(inplace)  (3): AvgPool2d(kernel_size=13, stride=1, padding=0))
            -  => model.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
    - 예시4(특이구조): Inception v3
        - Rethinking을 하기 때문에 학습과정에서 output이 두개의 레이어로부터 나옴
        - 두번째 output(auxiliary output): AuxLogits 부분을 포함하는 네트워크에 포함됨
        - 주된 output은 네트워크의 마지막 레이어에서 출력됨
            -  test 시에는 이 output만 고려함
        - (AuxLogits): InceptionAux(... (fc): Linear(in_feature=768, out_features=1000, bias=True) ... (fc): Linear(in_features=2048, out_features=1000, bias=True)
            -  finetune 하기 위해서는 두 레이어를 reshape 해줘야함
            -  => model.AuxLogits.fc = nn.Linear(768, num_classes)
            -  model.fc = nn.Linear(2048, nuum_classes)

[ 사용가능한 함수들 ]
- get_model(name, **config): 모델 이름과 환경설정을 인수로 받아 해당 모델의 인스턴스를 반환
    - get_model("quantized_mobilenet_v3_large", weights="DEFAULT")
- get_model_weight(name): 해당 모델의 열거형 가중치 클래스 반환
    - 예1: get_model_weights("quantized_mobilenet_v3_large")
    - 예2: get_model_weights(torchvision.models.quantization.mobilenet_v3_large)
    - 예1 = 예2
- get_weight(name): 열거형 가중치 변수의 이름으로 값을 가져옴
    - 예: get_weight("MobileNet_V3_Large_QuantizedWeights.DEFAULT")
- list_models(\[module]): 해당 이름으로 등록된 모델 목록을 반환


In [18]:
import copy
import time
from tqdm.notebook import tqdm as tqdm_notebook
import cv2
import torch
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
from torchvision import datasets, models
from torch.utils.data import random_split, DataLoader

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
# 사용가능한 모델 확인
for name in dir(torchvision.models):
    print(name)

AlexNet
AlexNet_Weights
ConvNeXt
ConvNeXt_Base_Weights
ConvNeXt_Large_Weights
ConvNeXt_Small_Weights
ConvNeXt_Tiny_Weights
DenseNet
DenseNet121_Weights
DenseNet161_Weights
DenseNet169_Weights
DenseNet201_Weights
EfficientNet
EfficientNet_B0_Weights
EfficientNet_B1_Weights
EfficientNet_B2_Weights
EfficientNet_B3_Weights
EfficientNet_B4_Weights
EfficientNet_B5_Weights
EfficientNet_B6_Weights
EfficientNet_B7_Weights
EfficientNet_V2_L_Weights
EfficientNet_V2_M_Weights
EfficientNet_V2_S_Weights
GoogLeNet
GoogLeNetOutputs
GoogLeNet_Weights
Inception3
InceptionOutputs
Inception_V3_Weights
MNASNet
MNASNet0_5_Weights
MNASNet0_75_Weights
MNASNet1_0_Weights
MNASNet1_3_Weights
MaxVit
MaxVit_T_Weights
MobileNetV2
MobileNetV3
MobileNet_V2_Weights
MobileNet_V3_Large_Weights
MobileNet_V3_Small_Weights
RegNet
RegNet_X_16GF_Weights
RegNet_X_1_6GF_Weights
RegNet_X_32GF_Weights
RegNet_X_3_2GF_Weights
RegNet_X_400MF_Weights
RegNet_X_800MF_Weights
RegNet_X_8GF_Weights
RegNet_Y_128GF_Weights
RegNet_Y_16GF_We

In [4]:
# 데이터 확인
for batch in train_dataloader:
    images, labels = batch

    first_image, first_label = images[0], labels[0]
    break
print(first_image, first_label)

tensor([[[-0.4911, -0.5253, -0.5596,  ..., -0.6794, -0.6794, -0.6623],
         [-0.4739, -0.5082, -0.4911,  ..., -0.6623, -0.6623, -0.6623],
         [-0.4739, -0.4397, -0.4226,  ..., -0.6452, -0.6452, -0.6623],
         ...,
         [-0.3369, -0.4054, -0.4226,  ..., -0.7822, -0.7822, -0.7822],
         [-0.3198, -0.3883, -0.4226,  ..., -0.7822, -0.7822, -0.7822],
         [-0.3198, -0.3883, -0.4397,  ..., -0.7993, -0.7822, -0.7822]],

        [[ 0.1352,  0.1001,  0.0651,  ..., -0.1975, -0.1975, -0.1800],
         [ 0.1176,  0.1176,  0.1001,  ..., -0.1800, -0.1800, -0.1800],
         [ 0.0826,  0.1527,  0.1352,  ..., -0.1625, -0.1625, -0.1800],
         ...,
         [ 0.1702,  0.1001,  0.0476,  ..., -0.2675, -0.2675, -0.2675],
         [ 0.1877,  0.1176,  0.0476,  ..., -0.2675, -0.2675, -0.2675],
         [ 0.1877,  0.1176,  0.0476,  ..., -0.2500, -0.2325, -0.2325]],

        [[ 0.8797,  0.8448,  0.8099,  ...,  0.4962,  0.4962,  0.5136],
         [ 0.8622,  0.8622,  0.8448,  ...,  0

## 모델 구성

In [5]:
# 모델 로드
"""
        MobileNet V3 main class

        Args:
            inverted_residual_setting (List[InvertedResidualConfig]): Network structure
            last_channel (int): The number of channels on the penultimate layer
            num_classes (int): Number of classes
            block (Optional[Callable[..., nn.Module]]): Module specifying inverted residual building block for mobilenet
            norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use
            dropout (float): The droupout probability
        """
# MobileNetV3 Large 모델 구현
class CustomMobileNetV3Large(nn.Module):
    def __init__(self, num_classes=100):
        super(CustomMobileNetV3Large, self).__init__()
        self.features = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.DEFAULT, progress=True).features
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(960, num_classes, 1, 1, 0),
            nn.Flatten()
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

## 모델 학습

In [6]:
# 모델 초기화
model = CustomMobileNetV3Large(num_classes=500).to(device)
model.train()  # 모델을 학습 모드로 설정

# Fine-tuning을 위한 손실함수 및 옵티마이저  설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Fine-tuning을 위한 반복 학습
num_epochs = 2
for epoch in tqdm_notebook(range(num_epochs), desc='Epochs'):
    running_loss = 0.0
    since = time.time()
    # leave=False 설정으로 출력메세지를 깔끔하게 유지
    for i, data in enumerate(tqdm_notebook(train_dataloader, desc='Batches', leave=False)):
      inputs, labels = data
      inputs, labels = inputs.to(device), labels.to(device)
      optimizer.zero_grad()
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      running_loss += loss.item()

    time_elapsed = start - time.time()
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_dataloader)} ({time_elapsed})')

Epochs:   0%|          | 0/2 [00:00<?, ?it/s]

Batches:   0%|          | 0/1001 [00:00<?, ?it/s]

KeyboardInterrupt: 

## 모델 추론 - 테스트 데이터셋

In [None]:
# 테스트 데이터셋
model.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in tqdm_notebook(valid_dataloader):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy}')

## 모델 추론 - 임의의 이미지

In [None]:
img = cv2.imread('apple.jpg')
img = transform_test(img).unsqueeze(0)

with torch.no_grad():
    outputs = model(img.to(device))

_, predicted_class = torch.max(outputs, 1)
predicted_class = predicted_class.item()
predicted_class

## 작업중

## 기본설정

In [31]:
model_list = ['resnet', 'alexnet', 'vgg', 'squeezenet', 'densenet', 'inception', 'mobilenet']
# model list로부터 사용할 모델 고르기
model_name = 'mobilenet'

# 데이터셋에 있는 클래스 수
num_classes = 500
# 학습 배치사이즈
batch_size = 128
# 학습 에폭
num_epochs = 5
# 피쳐 추출을 위한 Flag
#   False: 모델 전체를 파인튜닝
#   True: 매개변수만 업데이트
feature_extract = True

## Helper Functions

In [40]:
# 모델학습 헬퍼 함수
def train_models(model, dataloaders, criterion, optimizer, num_epochs=10, is_inception=False):
    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch} / {num_epochs - 1}')
        print('-' * 10)

        # 각 에폭은 학습/검증 phase를 가짐
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # 트레이닝 모드 설정
            else:
                model.eval()  # 추론 모드 설정

            running_loss = 0.0
            running_corrects = 0

            # 데이터 학습하기
            for inputs, labels in tqdm_notebook(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                # 파라미터 기울기 초기화
                optimizer.zero_grad()

                # forward
                # 학습 모드인 경우에만 history 추적
                with torch.set_grad_enabled(phase == 'train'):
                    # 모델의 ouptut과 loss를 구함
                    # inception의 경우 학습시 auxiliary output이 있는 특수 케이스임.
                    #   학습시: final output과 auxiliary output을 더하는 과정이 필요함
                    #   테스트시: final output만 고려
                    # Auxilary output을 같이 고려해야하는 학습단계
                    if is_inception and phase == 'train':
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4 * loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # 학습(backward + optimize)
                    if phase == 'train':
                        loss.backwawrd()
                        optimizer.step()

                # loss 구하기
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {.4f}'.format(phase, epoch_loss, epoch_acc))

            # 모델 깊은복사
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    # 베스트 모델 가중치를 로드
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

#### 모델 파라미터의 requires_grad 속성
- 기본적으로 사전  훈련된 모델을 로드할 때 모든 매개변수의 requires_grad = True로 설정되어 있음
    - True: 처음부터 훈련하거나 fine tuning할 때는 True
    - False(Freeze): 기존 layer의 weight를 고정

In [34]:
def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

## 모델 초기화

In [35]:
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True, quantize=False):
    # 모델마다 다르게 지정될 변수들 초기화
    model_ft = None
    input_size = 0

    if model_name == 'resnet':
        if quantize:
            '''Quantized Resnet50'''
            weights = models.quantization.ResNet50_QuantizedWeights.DEFAULT
            model_ft = models.quantization.resnet50(weights=weights, quantize=True)
        else:
            '''Resnet50'''
            model_ft = models.resnet50(pretrained=use_pretrained)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        set_parameter_requires_grad(model_ft, feature_extract)
        input_size = 224

    elif model_name == 'mobilenet':
        if quantize:
            '''Quantized Mobilenet v3 large'''
            weights = models.quantization.MobileNet_V3_Large_QuantizedWeights.DEFAULT
            model_ft = models.quantization.mobilenet_v3_large(weigts=weights, quantize=True)
            num_ftrs = model_ft.classifier[3].in_features
            model_ft.classifier[3] = nn.Linear(num_ftrs, num_classes)
        else:
            model_ft = CustomMobileNetV3Large(num_classes=500).to(device)
        set_parameter_requires_grad(model_ft, feature_extract)
        input_size = 224

    elif model_name == 'alexnet':
        '''AlexNet'''
        model_ft = models.alexnet(pretrianed=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_featrues
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == 'vgg':
        '''VGG11_bn'''
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == 'squeezenet':
        '''Squeezenet 1.0'''
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
        model_ft.num_classes = num_classes
        input_size = 224

    elif model_name == 'densenet':
        '''Densenet 121'''
        model_ft = models.densenet121(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == 'inception':
        '''Inception V3'''
        if quantize:
            weights = models.quantization.Inception_V3_QuantizedWeights.DEFAULT
            model_ft = models.quantization.inception_v3(weights=weights, quantize=True)
        else:
            model_ft = models.inception_v3(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        # Auxilary net
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        # primary net
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 299  # 다른 모델과 다르게 299 사이즈를 사용
    else:
        print('모델의 이름을 잘못 입력하여 종료합니다...')
        exit()

    return model_ft, input_size

In [36]:
# 데이터 경로
data_root = f'D:/data/training/sources/cropped'

# 데이터 변환
data_transforms = {
    'train': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# ImageFolder로 전체 데이터셋 생성
image_datasets = {x: datasets.ImageFolder(data_root, data_transforms[x]) for x in ['train', 'val']}
batch_size = 128
dataloaders_dict = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True, num_workers=4) for x in ['train', 'val']}

# # 데이터셋 분할 (예: 80% 훈련, 20% 유효성 검사)
# train_size = int(0.8 * len(dataset))
# valid_size = len(dataset) - train_size
# train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size])
#
# # 데이터 로더 생성
# batch_size = 128
# train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

In [41]:
model_name = 'mobilenet'
num_classes = 500
feature_extract = False
use_pretrained = True
quantize = True

model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=use_pretrained, quantize=quantize)
model_ft = model_ft.to(device)
print(model_name)

train_models(model_ft, dataloaders_dict, criterion, optimizer, num_epochs=10, is_inception=False)

mobilenet
Epoch 0 / 9
----------


  0%|          | 0/1251 [00:00<?, ?it/s]

RuntimeError: empty_strided not supported on quantized tensors yet see https://github.com/pytorch/pytorch/issues/74540

In [None]:
# 모델 인스턴스 생성하고 구조 확인하기
# 1. 일반 레즈넷, 마지막 fc레이어만 변경
model_name = 'resnet'
num_classes = 500
feature_extract = False
use_pretrained = True
quantize = False

model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True, quantize=False)
print(model_name)
print(model_ft)

In [None]:
# 2. 양자화된 레즈넷, 마지막 fc레이어만 변경 - QuantizedLinear 나와야하는데 일반 Linear만나옴
model_name = 'resnet'
num_classes = 100
feature_extract = False
use_pretrained = True
quantize = True

model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True, quantize=True)
print('quantized ' + model_name)
print(model_ft)

In [None]:
# 헬퍼함수 없이 만든 양자화된 레즈넷
weights = models.quantization.ResNet50_QuantizedWeights.DEFAULT
model_ft = models.quantization.resnet50(weights=weights, quantize=True)
print(model_ft)

In [21]:
weights = models.quantization.MobileNet_V3_Large_QuantizedWeights.DEFAULT
model_ft = models.quantization.mobilenet_v3_large(weigts=weights, quantize=True)
print(model)



CustomMobileNetV3Large(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
        )
      )
    )
    (2): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 64, kernel_size=(1, 1), stride

  1%|          | 1/81 [00:00<00:55,  1.44it/s]

{'file_name': [1, 2], 'dir': [0, 1]}


  2%|▏         | 2/81 [00:01<00:51,  1.53it/s]

{'file_name': [1, 2], 'dir': [0, 1]}


  4%|▎         | 3/81 [00:01<00:46,  1.66it/s]

{'file_name': [1, 2], 'dir': [0, 1]}


  5%|▍         | 4/81 [00:02<00:44,  1.73it/s]

{'file_name': [1, 2], 'dir': [0, 1]}


  6%|▌         | 5/81 [00:02<00:42,  1.81it/s]

{'file_name': [1, 2], 'dir': [0, 1]}


  6%|▌         | 5/81 [00:03<00:53,  1.42it/s]

{'file_name': [1, 2], 'dir': [0, 1]}




KeyboardInterrupt: 