# Library

In [1]:
from typing import List, Dict, Any, Tuple, Callable, Union, Optional

import sys, os, cv2, re
import torch
from torch import nn
import numpy as np
import pandas as pd

from scripts.default_setting import *

from GGUtils.utils.path import do_or_load, GetAbsolutePath
from GGUtils.img.viewer import show_img, show_imgs
from GGUtils.utils.utils import time_checker
from GGUtils.log.progressbar import ProgressBar
from GGDL.utils import set_seed_everything, make_basic_directory, GetDevice, Option
from GGDL.idx_dict.key_df import make_basic_key_df, binary_label_convertor
from GGDL.idx_dict.make_dict import make_stratified_idx_dict
from GGDL.data_loader.classification import ImgDataset, GetLoader, show_dataset_img, show_batch_imgs
from GGDL.model.vision import Classification, TimmHelper
from GGDL.model.fine_tuning import Tuner
from GGDL.model.optimzer import Optim, LabelDtype
from GGDL.pipeline.pipeline import BackPropagation
from GGDL.pipeline.log import Log
from GGDL.metrics.classification import MetricsClassification

from GGImgMorph.scenario import sample_augment      # 증강 알고리즘

# Option

In [2]:
# Process id of the running kernel; later passed to the log instance and
# used in the early-stopping checkpoint path.
PROCESS_ID = os.getpid()
print("해당 process의 id: ", PROCESS_ID)

# Verbosity of the training status output
VERBOSE = 0

# Whether to save the log
SAVE_LOG = True

# Whether to run model training and inference as a code test.
# When CODE_TEST is a float instead of None, the total amount of data is
# reduced to that ratio and the whole process runs on the reduced data.
CODE_TEST = 0.1
if CODE_TEST is not None:
    print(f"Code Test: True, 전체 데이터의 {CODE_TEST}로 프로세스를 처리한다.")

해당 process의 id:  419802
Code Test: True, 전체 데이터의 0.1로 프로세스를 처리한다.


### Option 1. model

In [3]:
# Search timm for the names of the models we want to use
# (the pattern is a regular expression matched against model names).
_model_name_ptn = "mobilenetv1_.+_r224"
TimmHelper.search(_model_name_ptn)

['mobilenetv1_100.ra4_e3600_r224_in1k',
 'mobilenetv1_100h.ra4_e3600_r224_in1k',
 'mobilenetv1_125.ra4_e3600_r224_in1k']

In [4]:
# Model-related settings
MODEL_NAME = 'mobilenetv1_100.ra4_e3600_r224_in1k'
IMG_CHANNEL = 3              # number of image channels
CLASS_SIZE = 0               # number of classes the model predicts
PRETRAINED = True            # whether to use pre-trained weights
USE_AMP = True               # whether to use AMP
USE_CLIPPING = True          # whether to use gradient clipping


# Custom header
class HeaderBlock(nn.Module):
    """
    One stage of the custom classification head.

    Applies, in order: BatchNorm1d -> Linear -> GELU -> Dropout.

    Args:
        input_dim (int): number of input features
        output_dim (int): number of output features of the linear layer
        dropout_prob (float): dropout probability applied after the activation
    """
    def __init__(self, input_dim:int, output_dim:int, dropout_prob:float):
        super().__init__()
        # Sub-modules are registered in the same order they are applied in forward().
        self.batch_norm = nn.BatchNorm1d(num_features=input_dim)
        self.linear = nn.Linear(in_features=input_dim, out_features=output_dim)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(p=dropout_prob)

    def forward(self, x):
        """Normalize, project, activate and apply dropout to ``x``."""
        normalized = self.batch_norm(x)
        activated = self.gelu(self.linear(normalized))
        return self.dropout(activated)


def custom_header(x:int)->nn.Module:
    """
    Build the custom classification head attached to the backbone.

    Design notes (author's rationale):

    Pre-Activation Batch Normalization
    - Used because this is the header of a deep backbone model, to address
      the internal covariate shift problem.

    BottleNeck
    - Expands the information and then keeps only the important part, aiming
      for high expressiveness while maintaining computational efficiency.

    Args:
        x (int): number of input features of the head

    Returns:
        nn.Module: three HeaderBlock stages (x -> 1024 -> 512 -> 128)
            followed by a final Linear(128, 1)
    """
    # (input_dim, output_dim, dropout_prob) for each HeaderBlock, in order
    stage_specs = (
        (x, 1024, 0.1),
        (1024, 512, 0.3),
        (512, 128, 0.5),
    )
    stages = [
        HeaderBlock(input_dim=in_dim, output_dim=out_dim, dropout_prob=prob)
        for in_dim, out_dim, prob in stage_specs
    ]
    # Single output unit for the final layer
    return nn.Sequential(*stages, nn.Linear(128, 1))

CUSTOM_HEAD_FN = custom_header
EXTRA_ACTIVATION_FN = "sigmoid"     # an extra activation function that the model output passes through

# Metrics used for model inference
METRICS = MetricsClassification(is_binary=True, threshold=0.5)
# Type of the model
MODEL_TYPE = "classification"
# Model log settings
LOG_INS = Log(log_dir=LOG, process_id=PROCESS_ID, model_type=MODEL_TYPE, save_log=SAVE_LOG)

### Option 2. pipeline setting

In [5]:
# Check the GPU status
GET_DEVICE = GetDevice()
GET_DEVICE.summary()

CUDA is available.
GPU size: 2
------------------------------------------------------------
GPU number: 0
Name: NVIDIA GeForce RTX 3080 Ti
Computer capability: 8.6
VRAM: 12GB
------------------------------------------------------------
GPU number: 1
Name: NVIDIA GeForce GTX 750
Computer capability: 5.0
VRAM: 1GB
------------------------------------------------------------


In [6]:
# Device settings
GPU = 0
DEVICE = GET_DEVICE(GPU)
torch.cuda.set_device(DEVICE)

# Data loader settings
DATASET_CLASS = ImgDataset           # dataset class
TRAIN_DATASET = {
    "augments":sample_augment, 
    "resize":224,
    "resize_how":0,                  # resize method
    "resize_how_list":[2, 3, 4],     # list of methods used for random resize
    "resize_padding_color":"random"  # pixel color used when padding during resize
}
VALID_DATASET = {
    "resize":224,
    "resize_how":2,                  # resize method
    "resize_padding_color":"black"  # pixel color used when padding during resize
}
BATCH_SIZE = 16
WORKER = 0                          # num_worker of the DataLoader

# Path used for early stopping
ESTOP_PATH = f"{SOURCE}/{ESPOINT_DIR}/process_{PROCESS_ID}"

# Whether to re-initialize the base directories where files generated during the process are stored
MAKE_NEW_DEFAULT_DIR = True

### Option 3. Hyper Parameter

In [7]:
# Hyper-parameters; the same dict is handed to Optim here and to Option later,
# and 'clipping_max_norm' is read when BackPropagation is built.
HP_DICT = {
    'epochs':5,
    'lr':0.0001,
    'betas':(0.9, 0.999),
    'eps':1e-08,
    'clipping_max_norm':5
}
OPTIM_INS = Optim(name='Adam', hp_dict=HP_DICT)
LOSS_FN = nn.BCEWithLogitsLoss()
# Set the label dtype to match the loss function
LABEL_DTYPE_INS = LabelDtype(loss_fn=LOSS_FN)

# Process
### Process 1. make key_df
* key_df는 img의 절대 경로("path")와 label("label") 두 개의 컬럼으로 구성된 DataFrame 이다.

In [8]:
# 경로 정보
TRAIN_SET = "/mnt/d/rawdata/dogs-vs-cats/train/"        # train set의 경로
TEST_SET = "/mnt/d/rawdata/dogs-vs-cats/test1/"         # test set의 경로

In [9]:
# Create key_df
path_list = GetAbsolutePath(None).get_all_path(parents_path=TRAIN_SET)
key_df = make_basic_key_df(
    paths=path_list,
    # The label is the part of the file name before its first "."
    # (e.g. ".../train/cat.0.jpg" -> "cat"): the regex strips everything up to the last "/".
    labels=[re.split(r".+/", i, maxsplit=1)[1].split('.')[0] for i in path_list]
)
# Convert the label to a binary label
key_df['label'] = binary_label_convertor(array=key_df['label'], positive_class='dog')

# Display key_df for reference
key_df

Unnamed: 0,path,label
0,/mnt/d/rawdata/dogs-vs-cats/train/cat.0.jpg,0
1,/mnt/d/rawdata/dogs-vs-cats/train/cat.1.jpg,0
2,/mnt/d/rawdata/dogs-vs-cats/train/cat.10.jpg,0
3,/mnt/d/rawdata/dogs-vs-cats/train/cat.100.jpg,0
4,/mnt/d/rawdata/dogs-vs-cats/train/cat.1000.jpg,0
...,...,...
24995,/mnt/d/rawdata/dogs-vs-cats/train/dog.9995.jpg,1
24996,/mnt/d/rawdata/dogs-vs-cats/train/dog.9996.jpg,1
24997,/mnt/d/rawdata/dogs-vs-cats/train/dog.9997.jpg,1
24998,/mnt/d/rawdata/dogs-vs-cats/train/dog.9998.jpg,1


### Process 2. make idx_dict

In [10]:
MAKE_NEW_IDX_DICT = True                            # whether to create a new idx_dict
IDX_DICT_PATH = f"{SOURCE}/idx_dict.pickle"         # path where idx_dict is saved

# How idx_dict is created
K_SIZE = 5                  # size of the k-fold (stratified sampling)
TEST_RATIO = 0.2            # test dataset ratio
VALID_RATIO = 0.1           # validation dataset ratio; not created when None

In [11]:
# Create the base directories
make_basic_directory(source=SOURCE, estop_dir=ESPOINT_DIR, log=LOG, result=RESULT, make_new=MAKE_NEW_DEFAULT_DIR)

# Create idx_dict
# NOTE(review): presumably do_or_load either calls `fn` with the remaining keyword
# arguments and saves the result to `savepath`, or loads it from there - confirm in GGUtils.
IDX_DICT = do_or_load(
    savepath=IDX_DICT_PATH, makes_new=MAKE_NEW_IDX_DICT, 
    fn=make_stratified_idx_dict,
    key_df=key_df, stratified_columns=['label'], is_binary=True,
    path_col='path', label_col='label', 
    k_size=K_SIZE, test_ratio=TEST_RATIO, valid_ratio=VALID_RATIO, 
    code_test=CODE_TEST
)

### Process 3. make option instance

In [12]:
# Bundle every setting defined above into a single Option instance,
# which the training cells below read from.
option = Option(
    # process / logging
    process_id=PROCESS_ID, verbose=VERBOSE, log_ins=LOG_INS,

    # model
    model_name=MODEL_NAME, pretrained=PRETRAINED, device=DEVICE, use_amp=USE_AMP, use_clipping=USE_CLIPPING,
    img_channel=IMG_CHANNEL, class_size=CLASS_SIZE, custom_header=CUSTOM_HEAD_FN, extra_activation_fn=EXTRA_ACTIVATION_FN,
    metrics=METRICS,

    # optimization
    optimizer=OPTIM_INS, loss_fn=LOSS_FN, label_dtype_fn=LABEL_DTYPE_INS, 
    tuner_how=2, hp_dict=HP_DICT, 

    # data loading
    dataset_class=DATASET_CLASS, trainset_kwargs=TRAIN_DATASET, validset_kwargs=VALID_DATASET, batch_size=BATCH_SIZE, worker=WORKER,

    # index dictionary and output locations
    idx_dict=IDX_DICT, results_parents=RESULT, espoint_parents=f"{SOURCE}/{ESPOINT_DIR}"
)

### Process 4. model training

In [13]:
# Fix every seed before training
set_seed_everything(seed=SEED)
# Create the log directory and file
option.log_ins.make()

# k-fold cross validation
# The loop breaks after its first iteration, so only the first fold is selected;
# `k` and `k_idx_dict` stay bound for the cells below.
for k in option.idx_dict.keys():
    k_idx_dict = option.idx_dict[k]     # idx_dict of this fold
    break

In [14]:
option.log_ins.k = k        # set the k value of the log

# Define the data loaders
loader = GetLoader(
    idx_dict=k_idx_dict, dataset_class=option.dataset_class, 
    batch_size=option.batch_size, workers=option.worker
)
loader(key="train", **option.trainset_kwargs)
# The test and validation splits both use the validation dataset settings.
loader(key="test", **option.validset_kwargs)
if "valid" in k_idx_dict:
    loader(key="valid", **option.validset_kwargs)

# Define the model
model = Classification(
    model_name=option.model_name, pretrained=option.pretrained, 
    channel=option.img_channel, class_size=option.class_size,
    custom_head_fn=option.custom_header
).to(option.device)

# Optimizer settings
optimizer = option.optimizer(param=model.parameters())
# The gradient scaler is only created when AMP is enabled
grad_scaler = torch.GradScaler(device=option.device) if option.use_amp else None
back_propagation = BackPropagation(
    optimizer=optimizer, use_amp=option.use_amp, grad_scaler=grad_scaler,
    use_clipping=option.use_clipping, 
    max_norm=option.hp_dict['clipping_max_norm'] if 'clipping_max_norm' in option.hp_dict else None
)

# Define the fine-tuning method
# NOTE(review): `how=2` is hard-coded here although Option was also given tuner_how=2 -
# confirm whether this should be read from `option` instead.
tuner = Tuner(model, how=2, freezing_ratio=0.9)
tuner(epoch=0)      # initial change of the model parameters

In [15]:
import time
from torch.utils import data

In [16]:
class Classification:
    """
    Training / inference driver for an image classification model.

    Holds the model, the data loaders, the optimizer, the back-propagation
    helper and the option bundle, and provides one-epoch training
    (``_fit_iterator``) and evaluation (``inference``).

    NOTE(review): this class has the same name as the ``Classification`` model
    wrapper imported from ``GGDL.model.vision``. Once this cell has run, the
    name refers to this class, so re-running the model definition cell would
    call this constructor instead. Consider renaming one of them.
    """
    def __init__(self, model, loader, optimizer, back_propagation, option):
        self.model = model
        self.loader = loader
        self.optimizer = optimizer
        self.back_propagation = back_propagation
        self.option = option


    def fit(self):
        """Placeholder for the full training loop; not implemented yet."""
        pass


    def _autocast_device_type(self)->str:
        """
        Return the bare device type ('cuda', 'cpu', ...) of ``option.device``.

        ``torch.autocast`` expects a device type string, whereas ``option.device``
        may be a ``torch.device`` or an indexed string; normalizing through
        ``torch.device`` accepts all of these forms.
        """
        return torch.device(self.option.device).type


    def _fit_iterator(self, epoch_log_txt:str)->Tuple[float, float]:
        """
        Train the model for one pass over ``self.loader.train``.

        Args:
            epoch_log_txt (str): epoch-level log text prepended to the iterator progress text

        Returns:
            Tuple[float, float]: mean training loss and training accuracy of this pass
        """
        pbar_ins = ProgressBar(header="Iterator ", time_log=True, memory_log=False, step=5, verbose=self.option.verbose)
        # Per-batch results are collected and concatenated once after the loop;
        # the empty float32 seed keeps the result dtype the same as incremental concatenation.
        output_chunks = [np.array([], dtype=np.float32)]
        label_chunks = [np.array([], dtype=np.float32)]
        loss_list = []

        # Switch to training mode
        self.model.train()
        for i, (pbar_txt, (imgs, labels)) in enumerate(pbar_ins(self.loader.train)):

            # Load to device
            imgs = imgs.to(self.option.device)
            # Cast the label dtype to what the loss function needs and reshape to a column
            labels = self.option.label_dtype_fn(labels).reshape(-1, 1).to(self.option.device)
            # Train on this batch and get flattened numpy arrays back
            loss, output_array, label_array = self._fit_and_flatten_in_one_iter(imgs, labels)
            # Collect the results
            loss_list.append(loss.item())
            output_chunks.append(output_array)
            label_chunks.append(label_array)
            # Print the log at the end of each iteration
            pbar_ins.end_of_iterator(progress_txt=f"{epoch_log_txt}{pbar_txt}", i=i)

        stack_output = np.concatenate(output_chunks, axis=0)
        stack_label = np.concatenate(label_chunks, axis=0)

        # Mean loss and accuracy over the whole pass
        train_loss = np.mean(loss_list)
        train_accuracy = self.option.metrics(predict=stack_output, label=stack_label, accuracy_only=True)
        return train_loss, train_accuracy


    def _fit_and_flatten_in_one_iter(
            self, imgs:torch.Tensor, labels:torch.Tensor
        )->Tuple[torch.Tensor, np.ndarray, np.ndarray]:
        """
        Train the model on one batch and return the predictions and labels
        as 1-dimensional numpy arrays.

        Args:
            imgs (torch.Tensor): image tensor
            labels (torch.Tensor): label tensor

        Returns:
            Tuple[torch.Tensor, np.ndarray, np.ndarray]: loss, flattened model output, flattened labels
        """
        # AMP: the forward pass and the loss run under autocast when enabled
        with torch.autocast(device_type=self._autocast_device_type(), enabled=self.option.use_amp):
            output = self.model(imgs)
            loss = self.option.loss_fn(output, labels)

        # Back propagation
        self.back_propagation(loss)

        # Flatten output and labels and convert them to numpy arrays
        output = self.output_handler(tensor=output, extra_activation_fn=self.option.extra_activation_fn)
        labels = self.output_handler(tensor=labels)

        return loss, output, labels


    @torch.inference_mode()
    def inference(self, loader:data.DataLoader):
        """
        Evaluate the model on every batch of ``loader``.

        Args:
            loader (data.DataLoader): iterable yielding (imgs, labels) batches

        Returns:
            Tuple[float, float]: mean loss and accuracy over the loader
        """
        output_chunks = [np.array([], dtype=np.float32)]
        label_chunks = [np.array([], dtype=np.float32)]
        loss_list = []

        # Switch to evaluation mode
        self.model.eval()
        for imgs, labels in loader:

            imgs = imgs.to(self.option.device)
            labels = self.option.label_dtype_fn(labels).reshape(-1, 1).to(self.option.device)
            # Run inference and get flattened numpy arrays back
            loss, output_array, label_array = self._inference_and_flatten(imgs, labels)
            # Collect the results
            loss_list.append(loss.item())
            output_chunks.append(output_array)
            label_chunks.append(label_array)

        stack_output = np.concatenate(output_chunks, axis=0)
        stack_label = np.concatenate(label_chunks, axis=0)

        # Mean loss and accuracy over the loader
        eval_loss = np.mean(loss_list)
        eval_accuracy = self.option.metrics(predict=stack_output, label=stack_label, accuracy_only=True)
        return eval_loss, eval_accuracy


    def _inference_and_flatten(
            self, imgs:torch.Tensor, labels:torch.Tensor
        )->Tuple[torch.Tensor, np.ndarray, np.ndarray]:
        """
        Run the model on one batch without gradient tracking and return the
        predictions and labels as 1-dimensional numpy arrays.

        Args:
            imgs (torch.Tensor): image tensor
            labels (torch.Tensor): label tensor

        Returns:
            Tuple[torch.Tensor, np.ndarray, np.ndarray]: loss, flattened model output, flattened labels
        """
        # Kept so that the method is also gradient-free when called directly
        with torch.no_grad():
            output = self.model(imgs)
            loss = self.option.loss_fn(output, labels)

        # Wait for the GPU work of this batch to finish before moving on to CPU work
        if torch.cuda.is_available():
            torch.cuda.synchronize()

        # Flatten output and labels and convert them to numpy arrays
        output = self.output_handler(tensor=output, extra_activation_fn=self.option.extra_activation_fn)
        labels = self.output_handler(tensor=labels)

        return loss, output, labels


    def output_handler(self, tensor:torch.Tensor, extra_activation_fn:Optional[str]=None)->np.ndarray:
        """
        Convert a tensor (model output or labels) to a 1-dimensional numpy array.
        >>> If the model output must pass through an extra activation function
            (sigmoid, softmax), that activation is applied first.

        Args:
            tensor (torch.Tensor): model output or label tensor
            extra_activation_fn (Optional[str], optional): extra activation function. Defaults to None.
                - 'sigmoid' and 'softmax' (over dim=1) are available

        Returns:
            np.ndarray: the tensor flattened to a 1-dimensional array

        Raises:
            ValueError: if ``extra_activation_fn`` is neither None nor a known name
        """
        act_fns = {'sigmoid':torch.sigmoid, 'softmax': lambda x: torch.softmax(x, dim=1)}
        if extra_activation_fn is not None and extra_activation_fn not in act_fns:
            # A misspelled name used to be ignored silently, which would make the
            # metrics threshold raw logits instead of probabilities.
            raise ValueError(
                f"Unknown extra_activation_fn: {extra_activation_fn!r}; expected one of {sorted(act_fns)} or None"
            )

        # Detach first so the activation below does not extend the autograd graph
        tensor = tensor.detach()
        act_fn = act_fns.get(extra_activation_fn, None)
        if act_fn is not None:
            tensor = act_fn(tensor)

        # numpy has no bfloat16, so such outputs are up-cast before conversion
        if tensor.dtype == torch.bfloat16:
            tensor = tensor.float()

        # Move to CPU, convert to numpy and flatten to 1 dimension
        return tensor.to('cpu').numpy().flatten()
        

In [17]:
# Build the trainer defined in the previous cell from the objects prepared above.
TEST_INS = Classification(
    model=model, loader=loader, optimizer=optimizer, 
    back_propagation=back_propagation, option=option
)

In [None]:
class EarlyStopping:
    """
    Skeleton of an early-stopping helper.

    Only the configuration and state attributes are set up so far;
    calling the instance does nothing yet.

    Args:
        patience (int): stored as ``self.patience``. Defaults to 5.
        delta (float): stored as ``self.delta``. Defaults to 0.0.
        mode (str): stored as ``self.mode``. Defaults to 'min'.
        verbose (bool): stored as ``self.verbose``. Defaults to False.
        path (str): stored as ``self.path``. Defaults to 'checkpoint.pt'.
        auto_remove (bool): stored as ``self.auto_remove``. Defaults to True.
    """
    def __init__(
            self,
            patience:int=5, delta:float=0.0, mode:str='min',
            verbose:bool=False, path:str='checkpoint.pt',
            auto_remove:bool=True
        ):
        # State that changes while training runs
        self.stop = False               # whether training should be ended by early stopping
        self.best_score = None          # best score so far

        # Fixed configuration
        self.patience, self.delta, self.mode = patience, delta, mode
        self.verbose, self.path = verbose, path
        self.auto_remove = auto_remove


    def __call__(self, val_loss:float, model:torch.nn.Module):
        """Not implemented yet; does nothing and returns None."""
        return None


In [19]:
pbar_ins = ProgressBar(header="Epoch    ", time_log=True, memory_log=True, step=1, verbose=0)

# Per-epoch history of accuracy and loss for each split
acc_dict = {"train":[], "valid":[]}
loss_dict = {"train":[], "valid":[]}

# The "valid" loader is only created when the fold's idx_dict contains a "valid" split.
# NOTE(review): getattr covers the case where the loader has no `valid` attribute at all - confirm against GetLoader.
valid_loader = getattr(TEST_INS.loader, "valid", None)

epoch_log_txt = None
for bar_txt, epoch in pbar_ins(range(option.hp_dict['epochs'])):

    # Log text at the start of the epoch
    epoch_log_txt = TEST_INS.option.log_ins.epoch_log_txt(epoch_log_txt=epoch_log_txt, bar_txt=bar_txt)

    # Model training
    train_loss, train_acc = TEST_INS._fit_iterator(epoch_log_txt)
    loss_dict['train'].append(train_loss)
    acc_dict['train'].append(train_acc)

    # Validation
    if valid_loader is not None:
        valid_loss, valid_acc = TEST_INS.inference(loader=valid_loader)
        # Record the validation results (the training values were appended here before)
        loss_dict['valid'].append(valid_loss)
        acc_dict['valid'].append(valid_acc)
    else:
        valid_loss, valid_acc = (None, None)

    # Log text for the model performance
    performance_txt = TEST_INS.option.log_ins.log_text(
        train_acc=train_acc, train_loss=train_loss, valid_acc=valid_acc, valid_loss=valid_loss)

    # Log at the end of the epoch iteration
    epoch_log_txt, epoch_log_dict = pbar_ins.end_of_iterator(
        progress_txt=bar_txt + "\n" + performance_txt, i=epoch)

Epoch    [#########################################]( 5/ 5)
[Metrics] [Train]: acc: 1.0, loss: 0.0242 / [valid]: acc: 1.0, loss: 0.0156
[time] spent: 0:02:08.74, eta: 0:00:25.74, 25.749s/it, current: 2024.11.19 17:43:34
[memory] used:6.469 gb/62.707 gb(11.6 %), rest:55.430 gb, process:1.914 gb, cpu:50.7 %


In [20]:
# from torch.optim import lr_scheduler


# class Scheduler:
#     def __init__(self, name:str, hp_dict:Dict[str, Any]):
#         self.methods = {
#             'LambdaLR': lr_scheduler.LambdaLR,
#             'MultiplicativeLR': lr_scheduler.MultiplicativeLR,




#             'StepLR': lr_scheduler.StepLR,
#             'MultiStepLR': lr_scheduler.MultiStepLR,
#             'ExponentialLR': lr_scheduler.ExponentialLR,
#             'CosineAnnealingLR': lr_scheduler.CosineAnnealingLR,
#             'CyclicLR': lr_scheduler.CyclicLR,
#             'CosineAnnealingWarmRestarts': lr_scheduler.CosineAnnealingWarmRestarts,
#             'ReduceLROnPlateau': lr_scheduler.ReduceLROnPlateau,
#             'OneCycleLR': lr_scheduler.OneCycleLR,
#             'PolynomialLR': lr_scheduler.PolynomialLR,
#             'LinearLR': lr_scheduler.LinearLR,
#             'ConstantLR': lr_scheduler.ConstantLR,
            
            
            
            
            
#             'ChainedScheduler': lr_scheduler.ChainedScheduler,
#             'SequentialLR': lr_scheduler.SequentialLR,
            
#         }


#     def lambdalr(self):
#         pass


# class Schedulers:
#     def __init__(self):
#         pass