# **Title Name :  개 VS 고양이 이진분류**

<p style="font-weight:bolder; font-size : 21px">
   RegDate : 2023.12.18
<p>

------------------------------------------------------------

In [1]:
#===============================================================================
#                               초기 입력값 설문조사
#===============================================================================

# ▶ 프로젝트의 카테고리는?
PROJECT_CATEGORY = 'classification'

# ▶ 프로젝트 파일의 버전은?
MODELING_VERSION = 'v0.0.3'

# ▶ 사용할 모델은?
MODEL_NAME = 'efficientnet_b4'

# ▶ Batch 를 몇으로 지정할까요?
BATCH = 32

# ▶ Epochs를 몇으로 지정할까요?
EPOCHS = 100

# [!]압축이 필요한 데이터셋입니까?
NEED_UNZIP = True

# [!]루트경로에 압축을 해제합니까? (코랩전용 : 단발성)
UNZIP_TO_ROOT = True

#======[옵션]=========================================================================

# ▶ 현재 코랩 폴더경로를 복사합니까?(코랩용 : 자동설정 //안쓸경우 주석처리)
COPY_CURRENT_FOLDER_PATH_FOR_COLAB = \
'/content/drive/MyDrive/프로젝트/[CV]개&고양이 분류/'

# ▶ 현재 캐글 대회 주소를 복사합니까?(캐글용 : 자동설정 //안쓸경우 주석처리)
COPY_CURRENT_COMPETITIONS_URL_FOR_KAGGLE = \
'https://www.kaggle.com/competitions/dogs-vs-cats-redux-kernels-edition'
#===============================================================================
#  ⬇⬇⬇ 설문의 상세설명이 필요하다면 `최하단` 부록1의 설명참고 ⬇⬇⬇

# 1. 환경설정
-------------

In [2]:
#===============================================================================
# ▶ [모듈] 불러오기
#===============================================================================

# 시스템
import os
import sys
import random
from glob import glob
from time import time

# 데이터분석
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

#이미지작업
from PIL import Image
import cv2

# 파이토치
from torch import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader , Dataset ,ConcatDataset, random_split
from torchsummary import summary
from torch.autograd import Variable

# CV용 토치비전
import torchvision
from torchvision.transforms import transforms

# 전이학습
import torchvision.models as models


# 사이킷런
import sklearn
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import GroupKFold, train_test_split


# 유틸
import gc
import psutil
from tqdm.auto import tqdm
import warnings
warnings.filterwarnings('ignore')

# 기타추가옵션
import copy
import zipfile

# 시각화테마
sns.set_style(style='white')
plt.style.use('dark_background')

  from .autonotebook import tqdm as notebook_tqdm
  _torch_pytree._register_pytree_node(


In [3]:
#===============================================================================
# ▶ [트리거] 설정모음
#===============================================================================
IS_GOOGLE = True if 'google.colab'                 in sys.modules   else False
IS_KAGGLE = True if 'KAGGLE_KERNEL_RUN_TYPE'       in os.environ    else False
IS_LOCAL  = True if  not (IS_GOOGLE or IS_KAGGLE)                   else False

In [4]:
#===============================================================================
# ▶ [에러] 설정모음
#===============================================================================
class ModelNotFoundError(Exception):
    def __init__(self, model_name):
        self.model_name = model_name
        self.message = f"Model '{model_name}'을 찾을 수 없습니다. \
                         필요하다면 사전에 추가하거나 오탈자를 확인하세요."
        super().__init__(self.message)
    def __str__(self):
        return self.message

class ZipFileNotFoundError(Exception):
    def __init__(self, base_path):
        self.base_path = base_path
        self.message = f"'{base_path}' 경로에 압축 파일이 없거나,\
                          지원하지 않는 확장자입니다."
        super().__init__(self.message)

    def __str__(self):
        return self.message

In [5]:
#===============================================================================
# ▶ [폴더경로] 설정모음
#===============================================================================
class Directory():

    def __init__(self):
        self._creat_working_folders = None
        self._root_path = None
        self._working_path = None
        self._temp_path = None
        self._save_path = None
        self._base_path = None
        self._unzip_path = None
        self._zip_path = None
        self._train_path = None
        self._test_path = None
        self._save_model_pt = None
        self._save_submission_csv = None
        self._save_model_pt_by_10_epoch_in_temp = None
        self.copied_raw_path  = COPY_CURRENT_FOLDER_PATH_FOR_COLAB \
            if IS_GOOGLE else COPY_CURRENT_COMPETITIONS_URL_FOR_KAGGLE \
            if IS_KAGGLE else None


    @property
    def creat_working_folders(self):
        if self._creat_working_folders is None:
            self._creat_working_folders = create_working_folders(self.working_path)
        return self._creat_working_folders


    @property
    def root_path(self):
        if self._root_path is None:
            self._root_path = get_root_path()
        return self._root_path

    @property
    def working_path(self):
        if self._working_path is None:
            self._working_path = get_working_path()
        return self._working_path

    @property
    def temp_path(self):
        if self._temp_path is None:
            self._temp_path = get_temp_path(self.working_path)
        return self._temp_path

    @property
    def save_path(self):
        if self._save_path is None:
            self._save_path = get_save_path(self.working_path)
        return self._save_path

    @property
    def base_path(self):
        if self._base_path is None:
            self._base_path = get_base_path(self.working_path)
        return self._base_path

    @property
    def unzip_path(self):
        if self._unzip_path is None:
            self._unzip_path = get_unzip_path()
        return self._unzip_path

    @property
    def zip_path(self):
        if self._zip_path is None:
            self._zip_path = get_zip_path(self.working_path)
        return self._zip_path

    @property
    def train_path(self):
        if self._train_path is None:
            self._train_path = get_train_path(self.working_path)
        return self._train_path

    @property
    def test_path(self):
        if self._test_path is None:
            self._test_path = get_test_path(self.working_path)
        return self._test_path

    @property
    def save_model_pt(self):
        if self._save_model_pt is None:
            self._save_model_pt = f'[Category_{PROJECT_CATEGORY}]EPC{EPOCHS}_BAT{BATCH}_{MODEL_NAME}_{MODELING_VERSION}.pt'
        return self._save_model_pt

    @property
    def save_model_pt_by_10_epoch_in_temp(self):
        if self._save_model_pt_by_10_epoch_in_temp is None:
            self._save_model_pt_by_10_epoch_in_temp = f'EPC{EPOCHS}_BAT{BATCH}_{MODEL_NAME}_{MODELING_VERSION}.pt'
        return self._save_model_pt_by_10_epoch_in_temp

    @property
    def save_submission_csv(self):
        if self._save_submission_csv is None:
            self._save_submission_csv = f'[Category_{PROJECT_CATEGORY}]EPC{EPOCHS}_BAT{BATCH}_{MODEL_NAME}_Submission.csv'
        return self._save_submission_csv
# 객체인스턴스생성
directory = Directory()

In [6]:
#===============================================================================
# ▶ [딕셔너리] 설정모음
#===============================================================================
class Dict():
    def __init__(self):
        self.dictionary = None

    @property
    def train_path(self):
        if self.dictionary is None:
            self.dictionary =\
            {
                'zipfiles': ['train.zip', 'test.zip'],
            }
        return self.dictionary
# 객체인스턴스생성
dictionary = Dict()

In [7]:
#===============================================================================
# ▶ [유틸함수] 설정모음
#===============================================================================
# 구분자설정
def get_seperater() :
    return os.path.sep


# 라이브러리 설치 설정(코랩,캐글)
def install_modules() :
    if not IS_LOCAL :
        '''
        ⬇⬇⬇설치할 라이브러리를 추가⬇⬇⬇
        '''
        !pip install -qqq timm
        !pip install -qqq efficientnet_pytorch
        import timm
        import efficientnet_pytorch
        print('module install complete')

# 코랩 마운트설정
def mount_colab() :
    from google.colab import drive
    drive.mount('/content/drive')





# 루트경로설정(코랩용)
def get_root_path() :
    if IS_GOOGLE :
        root_path = '/content/'
        return root_path
    pass

# 작업장 경로설정
def get_working_path() :
    copied_raw_path = directory.copied_raw_path
    print(copied_raw_path)
    # 코랩용
    if IS_GOOGLE:
        mount_colab()
        parent_path_name = copied_raw_path.split('MyDrive')[1].split('/')[1]
        project_name = copied_raw_path.split('MyDrive')[1].split('/')[2]
        working_path = f'/content/drive/MyDrive/{parent_path_name}/{project_name}'
    # 캐글용
    elif IS_KAGGLE :
        project_name = copied_raw_path.split('competitions')[1].split('/')[1]
        working_path = f'/kaggle/input/{project_name}'
    # 로컬용
    elif IS_LOCAL :
        # working_path = os.getcwd() # 절대경로로 표시됨
        # working_path = './'        # .//data 형태로표시되서 에러발생할수도있음
        working_path = os.path.relpath(os.getcwd()) # '.' 만 출력 './data 형태로 표시됨
    return working_path


# 작업장 폴더생성
def create_working_folders(working_path):
    current_directory = working_path
    folders_to_check = ['save', 'temp', 'data']

    for folder_name in folders_to_check:
        folder_path = os.path.join(current_directory, folder_name)
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
            print(f"폴더 생성 완료 : {folder_path}")
        else:
            print(f"폴더가 이미 존재합니다: {folder_path}")

# 데이터 경로설정
def get_base_path(working_path) :
    sep = get_seperater()
    return working_path + f'{sep}data{sep}'

# 저장용 경로설정
def get_save_path(working_path) :
    sep = get_seperater()
    return working_path + f'{sep}save{sep}'

# 임시용 경로설정
def get_temp_path(working_path) :
    sep = get_seperater()
    return working_path + f'{sep}temp{sep}'

# 압축파일 경로설정
def get_zip_path(working_path) :
    base_path = get_base_path(working_path)
    # 폴더에 압축파일이 없다면, 예외처리
    if not has_zipfile_in(base_path):
        base_path = None
        raise ZipFileNotFoundError(base_path)
    return base_path

# 압축해제 경로설정(root || data)
def get_unzip_path() :
    if IS_GOOGLE and UNZIP_TO_ROOT :
        upzip_path = '/content/'
    else :
        working_path = get_working_path()
        base_path    = get_base_path(working_path)
        upzip_path   = base_path
    return upzip_path

# 압축파일 존재여부
def has_zipfile_in(base_path) :
    answer = False
    ext_list = ['.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.xz', '.tar.gz', '.tar.bz2', '.tar.xz']
    for file_name in os.listdir(base_path) :
        name, ext = os.path.splitext(file_name)
        if ext in ext_list:
            answer = True
            break
    return answer

# 압축해제 설정
def extract_zipfile(zipfiles,zip_path,unzip_path) :
    '''
    args : unzip_path\n
    example :\n
    >>> unzip_data_in(unzip_path->base_path) : data 폴더용\n
    >>> unzip_data_in(unzip_path->root_path) : content 폴더용\n
    '''
    global NEED_UNZIP
    if NEED_UNZIP :
        # 로컬 or 캐글 or 코랩루트 (3 of 1택)
        if  UNZIP_TO_ROOT or not IS_GOOGLE :
            print('압축푸는중..')
            for file_name in tqdm(zipfiles) :
                with zipfile.ZipFile(zip_path + file_name) as target_zip :
                    target_zip.extractall(unzip_path)
                    print(f'{file_name} has been successfully extracted.')
        # 코랩내부
        else :
            %cd {upzip_path}
            print('압축푸는중..')
            for file_name in tqdm(zipfiles) :
                with zipfile.ZipFile(zip_path + file_name) as target_zip :
                    target_zip.extractall(unzip_path)
                    print(f'{file_name} has been successfully extracted.')
            %cd {root_path}

    # 압축해제 비활성화(여러번 압축되지않도록하기위함)
    else :
        print('message : 압축 해제가 이미 완료 되었습니다')
    NEED_UNZIP = False

# 트레인데이터 경로설정
def get_train_path(working_path) :
    if UNZIP_TO_ROOT :
        root_path  = get_root_path()
        train_path = root_path + 'train/'
    else :
        base_path    = get_base_path(working_path)
        train_path   = base_path + 'train/'
    return  train_path

# 테스트데이터 경로설정
def get_test_path(working_path) :
    if UNZIP_TO_ROOT :
        root_path  = get_root_path()
        test_path = root_path + 'test/'
    else :
        base_path    = get_base_path(working_path)
        test_path    = base_path + f'test'
    return  test_path


# 경로 구분자 컨버터설정
def convert_path_separator(path):
    # 기존경로에서 구분자 추출
    original_separator = os.path.sep
    # 로컬(Windows)인지 확인
    is_local = os.name == 'nt' # 'nt'는윈도우
    # 변경할 구분자선택
    new_separator = '\\' if is_local else '/'
    # 기존 구분자를 변경하고 재조립
    converted_path = path.replace(original_separator, new_separator)

    return converted_path






# 시드설정
def set_seed(SEED):
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    torch.cuda.manual_seed(SEED)
    os.environ['PYTHONHASHSEED'] = str(SEED)

# Cudnn시드 결정론 설정
def set_deterministic():
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark     = False

# 버전 및 코어체크
def print_moule_version(module_name=None) :
    '''
    info : 기본적으로 numpy,pandas,seaborn,pyplot,pytorch의\n
           버전과 현재 cpu코어수를 출력합니다.\n
           추가로 모듈을 인자로 입력받아 버전을 출력할수 있습니다.\n

    dependency : numpy , pandas, seaborn, pyplot, pytorch\n

    example :
            >>> print_moule_version(torchvision)\n
            >>> --------------------------------\n
            >>> selected_Ver   :  0.16.1+cpu버전\n
            >>> --------------------------------\n
    '''
    print('-'*20+'기본모듈'+'-'*20)
    print(f'numpy_Ver   :  {np.__version__}버전')
    print('-'*50)
    print(f'pandas_Ver  :  {pd.__version__}버전')
    print('-'*50)
    print(f'seaborn_Ver :  {sns.__version__}버전')
    print('-'*50)
    print(f'torch_Ver   :  {torch.__version__}버전')
    if module_name :
        print('-'*20+'선택한모듈'+'-'*20)
        print(f'selected_Ver   :  {module_name.__version__}버전')
    print('-'*21+'CPU코어수'+'-'*21)
    print(f'cpu_count   :  {os.cpu_count()}코어')
    print('-'*50)


# 메모리청소
def clean_memory():
    gc.collect()
    torch.cuda.empty_cache()

    # 메모리 정보 확인
    mem = psutil.virtual_memory()

    # 메모리사용량 90이상일 경우 Swap메모리 클리닝(SSD,HDD등...)
    if mem.percent > 90:
        print("현재 사용 중인 메모리가 너무 높습니다!\nSwap 메모리를 비웁니다")
        try:
            psutil.swap_memory()
        except psutil.NoSuchProcess:
            print("해당 프로세스가 존재하지 않습니다!")

    print("메모리 청소 완료")


# 시간계산
def get_elapsed_time(start_time,end_time) :
    elapsed_time =  end_time - start_time
    hours   = int(elapsed_time // 3600)
    minutes = int((elapsed_time % 3600) // 60)
    seconds = int(elapsed_time % 60)
    print(f' 총 소요시간 : {hours}시간 {minutes}분 {seconds}초')


# 모델별 적정 이미지사이즈 로드
def get_img_size_for_each_model(MODEL_NAME) :
    if MODEL_NAME   == 'efficientnet_b0' :
        img_size_h, img_size_w  = 224,224
    elif MODEL_NAME == 'efficientnet_b1' :
        img_size_h, img_size_w  = 240,240
    elif MODEL_NAME == 'efficientnet_b2' :
        img_size_h, img_size_w  = 260,260
    elif MODEL_NAME == 'efficientnet_b3' :
        img_size_h, img_size_w  = 300,300
    elif MODEL_NAME == 'efficientnet_b4' :
        img_size_h, img_size_w  = 380,380
    elif MODEL_NAME == 'efficientnet_b5' :
        img_size_h, img_size_w  = 456,456
    else : # 디폴트값
        img_size_h, img_size_w  = 256,256
    return img_size_h, img_size_w




# 데이터로더 에서 shape출력
def get_shape_from_dataloader(dataloader) :
    for batch, (img_tensor, label) in enumerate(dataloader):
        print('dataloder : ')
        print(f'\t shape of tensor X [N,C,H,W] : {img_tensor.shape}')
        print(f'\t shape of tensor y           : {label.shape}')
        # print("X->img tensor1: ", X)                                                           # X      가 3채널의 텐서정보? + 배치사이즈
        # print("X->img tensor1: ", X[0])                                                        # X[0]1개가 3채널의 텐서정보?, X[0]이 4개까지확인 가능
        # print("X->img tensor2: ", X[0][0])                                                     # X[0]3개가 2채널의 텐서정보?, X[0]이 4개까지확인 가능
        # print("X->img tensor3: ", X[0][0][0])                                                  # X[0]3개가 1채널의 텐서정보?, X[0]이 4개까지확인 가능
        print("\t X->img tensor4: ", img_tensor[0][0][0][0])                                     # X[0]4개가 1픽셀의 스칼라값?, X[0]이 4개까지확인 가능
        print("\t y->label      : \n\t", label)                                                  # y      가 개인지 고양이인지 이진분류값의 `텐서`모음
        print("\t y->label size   : ", len(label),'=batch_size')
        # print("y->label      : ", y[0])
        print()
        break



# 작업환경 설정
def set_working_environ() :
    mount_colab()
    directory = Directory()
    root_path = directory.root_path # <- 폴더세팅 start
    working_path = directory.working_path
    base_path = directory.base_path
    create_working_folders(base_path)
    save_path = directory.save_path
    temp_path = directory.temp_path
    zip_path = directory.zip_path
    unzip_path = directory.unzip_path
    zipfiles =  ['train.zip','test.zip']
    extract_zipfile(zipfiles,zip_path,unzip_path)

    print('working enviroment setting complete.')

In [8]:
#===============================================================================
# ▶ [파라미터] 설정모음
#===============================================================================
class Config :
    seed = 2023
    deterministic = True

    num_workers = int(os.cpu_count())-6 if IS_LOCAL else int(os.cpu_count())
    batch_size = BATCH  # 최상단 설문
    lerning_rate = 1e-3
    epochs = EPOCHS     # 최상단 설문
    img_size_h, img_size_w = get_img_size_for_each_model(MODEL_NAME)  #디폴트값=256
    val_ratio = 0.1
    num_classes = 2

    mean = (0.48805536, 0.45548936, 0.41698721)
    std  = (0.26211068, 0.25550992, 0.25817073)
    # mean = (0.485, 0.456, 0.406)
    # std  = (0.229, 0.224, 0.225)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') #.cuda()가 더편함

# 1.1 코드실행
-------------------------------------------

In [9]:
# 모듈설치&로드
install_modules()
import timm
import efficientnet_pytorch

In [10]:
# 환경설정 코드 자동실행
set_working_environ()

ModuleNotFoundError: No module named 'google.colab'

# 2. 전처리
---------------------------------------------------

In [None]:
#===============================================================================
# ▶ [wrong 라벨링] 설정모음
#===============================================================================
noise = [
            'cat.3672', 'cat.4338', 'cat.5351', 'cat.5418', 'cat.7377',
            'cat.7564', 'cat.8456', 'cat.9171', 'cat.10029', 'cat.10712',
            'cat.11184', 'cat.12272',
            'dog.1043', 'dog.1194', 'dog.1773', 'dog.2614', 'dog.4367',
            'dog.5604', 'dog.6475', 'dog.8736', 'dog.9517', 'dog.10237',
            'dog.10747', 'dog.10801', 'dog.11299', 'dog.12376'
        ]
cat_dog = [
            'cat.724', 'cat.1450', 'cat.2159', 'cat.3731', 'cat.3738',
            'cat.3822', 'cat.4104', 'cat.4688', 'cat.5355', 'cat.5583',
            'cat.7194', 'cat.7920', 'cat.9444', 'cat.9882', 'cat.10181',
            'cat.10266', 'cat.10863', 'cat.11222', 'cat.11724',
            'dog.8507', 'dog.11538'
           ]
photo_unavailable = ['cat.11565', 'dog.2877', 'dog.10401', 'dog.10797']

# 삭제하거나 label 변경이 필요한 것
to_delete = noise + cat_dog + photo_unavailable
to_cat = ['dog.4334', 'dog.11731']
to_dog = ['cat.4085']

In [None]:
#===============================================================================
# ▶ [전처리함수] 설정모음
#===============================================================================

# 데이터 추출
def get_raw_files() :
    directory = Directory()
    train_list = sorted(glob(os.path.join(directory.train_path,'*.jpg')))
    test_list  = sorted(glob(os.path.join(directory.test_path,'*.jpg')))
    print('train size / test size : ','전처리전(25000)','/', len(test_list))
    print('train size / test size : ', len(train_list),'/', len(test_list))
    return train_list, test_list

# 개, 고양이 클래스추출(for train)
def get_class_category_for_train(raw_file):
    file_name = os.path.basename(raw_file)
    category, img_id, _ = file_name.split(".")
    if category == 'dog':
        return 1
    else:
        return 0


# 고유이미지번호(ids) 추출(for test)
def get_ids_for_test(raw_file) :
    file_name = os.path.basename(raw_file)
    if  IS_LOCAL :
        img_id=  int(file_name.split('\\')[-1].split('.')[0])
    else :
        img_id= int(file_name.split('/')[-1].split('.')[0])
    return img_id

# 이미지 데이터 라벨링 전처리(삭제&이름변경)
def preprocess_image(train_path, to_delete, to_cat, to_dog):
    if  UNZIP_TO_ROOT:
        for fname in to_delete:
            os.remove(train_path + fname + '.jpg')
        for fname in to_cat:
            f2name = fname.replace('dog', 'cat2')
            os.rename(train_path + fname + '.jpg', train_path + f2name + '.jpg')
        for fname in to_dog:
            f2name = fname.replace('cat', 'dog2')
            os.rename(train_path + fname + '.jpg', train_path + f2name + '.jpg')
    else :
        pass

# 이미지 메타데이터 확인
def get_metadata_of_image(raw_files) :
    raw_file = raw_files[Config.seed]
    img_info = Image.open(raw_file)
    return img_info

# 이미지데이터 채널 가져오기
def get_channel_size(raw_file) :
    img_info = Image.open(raw_file)
    return 3 if img_info.mode == 'RGB' else 1


In [None]:
#===============================================================================
# ▶ [증강함수] 설정모음 (!warn :This is agumentation, it'll increase total images)
#===============================================================================
# 이미지 출력
def show_img(raw_file) :
    img = cv2.imread(raw_file)
    plt.imshow(img)

# 이미지 리사이즈
def resize_img(raw_file) :
    h = Config.img_size_h
    w = Config.img_size_w
    img = cv2.imread(raw_file)
    resize_img = cv2.resize(img, dsize=(h, w))
    print(resize_img)
    return resize_img

# 이미지 플립
def flip_img(raw_file):
    flipped = cv2.flip(raw_file, 1)
    return flipped

# 이미지 노이즈추가
def add_noise_img(raw_file):
    channel_size = get_channel_size(raw_file)
    h = Config.img_size_h
    w = Config.img_size_w
    noise = ( (h/6) * np.random.random((h, w,channel_size)) )
    raw_file = raw_file + noise
    raw_file[raw_file>255] = 255  # 234개만 노이즈생성
    return raw_file

# 증강한 이미지출력
def show_agumented_img(raw_file) :

    # 리사이즈, Flip, 노이즈 추가 이미지를 한 번에 출력
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # 이미지 리사이즈
    resize_version = resize_img(raw_file)
    # Flip 버전 생성
    flip_version = flip_img(resize_version)
    # 노이즈 추가 버전 생성
    noise_version = add_noise_img(resize_version)

    # 원본 이미지
    axes[0].imshow(resize_version)
    axes[0].set_title("Original Image")
    # Flip 이미지
    axes[1].imshow(flip_version)
    axes[1].set_title("Flipped Image")
    # 노이즈 추가 이미지
    axes[2].imshow(noise_version)
    axes[2].set_title("Noised Image")

# 3. 데이터셋 생성
---------------------------------------------------

In [None]:
#======================================================
# ▶ [클래스정의] 데이터셋 정의
#======================================================

# 트랜스폼 클래스
class ImageTransform():
    def __init__(self, splite_mode='train'):
        self.split_mode = splite_mode
        if self.split_mode == 'train' :
            self.transform = \
            transforms.Compose \
            ([
                transforms.Resize((Config.img_size_h,Config.img_size_w)),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                transforms.Normalize(mean=Config.mean,std=Config.std)
            ])
        else : # val, test
            self.transform = \
            transforms.Compose\
            ([
                transforms.Resize((Config.img_size_h,Config.img_size_w)),
                # transforms.CenterCrop(Config.img_size_h,Config.img_size_w),
                transforms.ToTensor(),
                transforms.Normalize(mean=Config.mean,std=Config.std)
            ])
    def __call__(self, img):
        return self.transform(img)

# 데이터셋 클래스
class DogsVsCatsDataset(Dataset) :
    def __init__(self,file_list,transform) :
        self.file_list  = file_list
        self.transform  = transform

    def __len__(self) :
        return len(self.file_list)

    def __getitem__(self, idx):
        raw_file = self.file_list[idx]

        # 이미지텐서화
        img_info = Image.open(raw_file)
        img_tensor = self.transform(img_info)

        # 이미지라벨링
        if self.transform.split_mode == 'test' :
            ids = get_ids_for_test(raw_file)
            return img_tensor, ids
        else :
            label = get_class_category_for_train(raw_file)
            return img_tensor, label




In [None]:
#======================================================
# ▶ Dataset & Loader 객체생성 (트/테/발 설정)
#======================================================
train_list, test_list = get_raw_files()
# preprocess_image(directory.train_path, to_delete, to_cat, to_dog)
train_list, val_list  = train_test_split(train_list, test_size = Config.val_ratio, random_state=Config.seed, shuffle=True)

train_dataset = DogsVsCatsDataset(train_list,ImageTransform(splite_mode='train'))
val_dataset   = DogsVsCatsDataset(val_list,ImageTransform(splite_mode='val'))
test_dataset  = DogsVsCatsDataset(test_list,ImageTransform(splite_mode='test'))

train_loader = DataLoader(train_dataset,Config.batch_size,shuffle=True)
val_loader   = DataLoader(val_dataset,Config.batch_size,shuffle=False)
test_loader  = DataLoader(test_dataset,Config.batch_size,shuffle=False)

/content/drive/MyDrive/프로젝트/[CV]개&고양이 분류/
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
train size / test size :  전처리전(25000) / 12500
train size / test size :  25000 / 12500


In [None]:
# 훈련 데이터로더 shape 체크
get_shape_from_dataloader(train_loader)

dataloder : 
	 shape of tensor X [N,C,H,W] : torch.Size([32, 3, 456, 456])
	 shape of tensor y           : torch.Size([32])
	 X->img tensor4:  tensor(1.5642)
	 y->label      : 
	 tensor([0, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0,
        1, 1, 1, 0, 0, 1, 1, 1])
	 y->label size   :  32 =batch_size



# 4. 전이학습(with 커스텀튜닝)
--------------------------

In [None]:
#===============================================================================
# ▶ [모델링함수] 설정모음
#===============================================================================

# 전이학습 모델 로드
def get_transfer_model(MODEL_NAME,num_classes, model_dict=None):
    # 사전에서 모델을 로드
    if  MODEL_NAME in model_dict:
        return model_dict[MODEL_NAME]

    # torchvision에서 모델을 로드
    elif hasattr(models, MODEL_NAME):
        return getattr(models, MODEL_NAME)(pretrained=True)

    # timm에서 모델을 로드.
    try:
        return timm.create_model(MODEL_NAME, pretrained=True, num_classes=num_classes)
    except ValueError:
        pass

    # 어떤 방법으로도 모델을 가져오지 못했을 경우 예외발생
    raise ModelNotFoundError

# 모델 프리셋
def get_model_dict(num_classes) :
    model_dict =\
    {
        'efficientnet_b0': timm.create_model(MODEL_NAME, pretrained=True, num_classes=num_classes),
        'efficientnet_b1': timm.create_model(MODEL_NAME, pretrained=True, num_classes=num_classes),
        'efficientnet_b2': timm.create_model(MODEL_NAME, pretrained=True, num_classes=num_classes),
        'efficientnet_b3': timm.create_model(MODEL_NAME, pretrained=True, num_classes=num_classes),
        'efficientnet_b4': timm.create_model(MODEL_NAME, pretrained=True, num_classes=num_classes),
        'efficientnet_b5': timm.create_model(MODEL_NAME, pretrained=True, num_classes=num_classes),
        'resnet50'       : models.resnet50,
    }
    return model_dict

# 프리징(분류문제용:EfficientNet)
def freeze_pretrained_model_for_classifier(model,model_dict,MODEL_NAME='efficientnet_b1'):
    """
    Info : 사전 훈련된 모델의 파라미터를 프리징 합니다.\n
           classifier 레이어만 훈련하고 나머지 레이어는 훈련하지 않습니다.\n
           모델명이 arguments에 없을경우 디폴트값으로\n
           `efficientnet_b1`이 설정됩니다.\n

    Args:
        model: Pre-trained model, \n
        model_dict : model dictionary preset\n
        MODEL_NAME : source model name

    Returns:
        params_to_freeze   : 훈련하지 않을 파라미터 리스트
        params_to_optimizer: 훈련할 파라미터 리스트
    """
    # 인자로들어온 모델명이 있는지 모델 딕셔너리와 비교후, final layer식별자 지정
    if any(keyword in MODEL_NAME for keyword in model_dict):
        final_layer = "classifier"
    else:
        final_layer = None  # 모델추가예정 e.g. _fc, head, ...

    # classifier층 이름 얻기(일반적으로 분류문제인경우 마지막 계층 식별)
    classifier_names = [name for name, _ in model.named_parameters() if final_layer in name]

    # 동결할 파라미터 리스트생성
    params_to_freeze = [param for name, param in model.named_parameters() if name not in classifier_names]

    # 훈련할 파라미터 리스트생성
    params_to_optimizer = [param for name, param in model.named_parameters() if name in classifier_names]

    # 훈련하지 않을 파라미터를 프리징
    for param in params_to_freeze:
      param.requires_grad = False
    # 훈련시킬 파라미터는 optimizer로 보냄
    for param in params_to_optimizer :
      param.requires_grad = True

    return params_to_freeze, params_to_optimizer


# 커스텀분류층 추가(옵션)
def add_custom_classifier(model) :
    # 모델 파라미터 커스터마이징
    num_features = model.classifier[1].in_features
    model.classifier = nn.Sequential()

    # 커스텀분류층 추가
    custom_classifier = nn.Sequential(
                                        nn.Linear(int(num_features), 512),
                                        nn.ReLU(),

                                        nn.Linear(512, 2),
                                        nn.Sigmoid() # or Softmax()
                                    )
    # 새로운층을 적용
    model.classifier = custom_classifier


# 파라미터 플로우 브리핑
def show_summary_of_params(model) :
    train_list,test_list = get_raw_files()
    channel_size = get_channel_size(train_list[Config.seed])
    summary(model,(channel_size,Config.img_size_h,Config.img_size_w))


In [None]:
#===============================================================================
# ▶ 모델링 코드진행
#===============================================================================
model_dict = get_model_dict(Config.num_classes)
model = get_transfer_model(MODEL_NAME,Config.num_classes,model_dict)
model = model.cuda()
params_to_freeze, params_to_optimizer = freeze_pretrained_model_for_classifier(model,model_dict,MODEL_NAME)
show_summary_of_params(model)

model.safetensors:   0%|          | 0.00/122M [00:00<?, ?B/s]

/content/drive/MyDrive/프로젝트/[CV]개&고양이 분류/
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
train size / test size :  전처리전(25000) / 12500
train size / test size :  25000 / 12500
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 48, 228, 228]           1,296
          Identity-2         [-1, 48, 228, 228]               0
              SiLU-3         [-1, 48, 228, 228]               0
    BatchNormAct2d-4         [-1, 48, 228, 228]              96
            Conv2d-5         [-1, 48, 228, 228]             432
          Identity-6         [-1, 48, 228, 228]               0
              SiLU-7         [-1, 48, 228, 228]               0
    BatchNormAct2d-8         [-1, 48, 228, 228]              96
            Conv2d-9             [-1, 12, 1, 1]             588
             SiLU-10     

# 5. 훈련 & 검증 평가
-------------------------------------------------------

In [None]:
#===============================================================================
# ▶ [훈련함수] 설정모음
#===============================================================================

# optimizer 생성
optimizer = optim.Adam(params_to_optimizer,Config.lerning_rate)

# loss function 생성
criterion = nn.CrossEntropyLoss()


In [None]:
#======================================================
# ▶ 훈련함수 정의
#======================================================
def fit(model, train_loader, val_loader):
    start = time()
    clean_memory()
    print('='*70)

    # 공통 평가지표 초기화
    best_train_score = 0.0
    best_val_score = 0.0
    val_correct = 0.0

    # 미니배치 트레이닝
    for epoch in tqdm(range(Config.epochs)):
        # 훈련 평가지표 초기화
        total_loss = 0.0
        n_correct = 0.0

        # 훈련모드
        model.train()

        # [훈련용] 반복추출
        for batch_idx, (tensor, label) in enumerate(train_loader):

            # GPU용 검증 파라미터(CrossEntrophy)
            tensor = tensor.cuda()
            label = label.cuda()

            # 순전파
            output = model(tensor)
            loss = criterion(output, label)

            # 역전파
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # CrossEntrophy용 정확도(맞은갯수 예측)
            predicted = torch.max(model(tensor), dim=1)[1]
            n_correct += (predicted == label).sum()

            # 결과 계산
            total_loss += loss.item()
            average_loss = total_loss / len(train_loader)
            str_train_accuracy = f'{batch_idx*Config.batch_size} / {len(train_loader.dataset)}'
            train_accuracy = float(n_correct * 100) / float(Config.batch_size * (batch_idx+1))

            # 결과 출력
            if batch_idx % 400 == 0:
                print('-'*70)
                print(f'Epoch : {epoch}')
                print(f'[{str_train_accuracy}]\t Train Accuracy(정확도):{train_accuracy:.2f}%')
                print(f'\t\t Train Average Loss (평균손실률) :{average_loss:.2f}')

        #======================================================
        # ▶ 검증!
        #======================================================
        # 그래디언트 초기화
        with torch.no_grad():

            # 평가모드
            model.eval()
            # 검증 평가지표 초기화
            val_total_loss = 0.0
            val_correct = 0.0

            # [검증용] 반복추출
            for batch_idx, (tensor, label) in enumerate(val_loader):

                # GPU용 검증 파라미터(CrossEntrophy)
                tensor = tensor.cuda()
                label = label.cuda()

                # 순전파
                val_output = model(tensor)
                val_loss = criterion(val_output, label)

                # CrossEntrophy용 정확도(맞은갯수 예측)
                val_predicted = torch.max(model(tensor), dim=1)[1]
                val_correct += (val_predicted == label).sum()

                # 결과 계산
                val_total_loss += val_loss.item()
                val_average_loss = val_total_loss / len(val_loader)
                str_val_accuracy = f'{batch_idx*Config.batch_size} / {len(val_loader.dataset)}'
                val_accuracy = float(val_correct * 100) / float(Config.batch_size * (batch_idx+1))

                # 결과 출력
                if batch_idx % 100 == 0:
                    print('-'*70)
                    print(f'Epoch : {epoch}')
                    print(f'[{str_val_accuracy}]\t Validation Accuracy(정확도):{val_accuracy:.2f}%')
                    print(f'\t\t Validation Average Loss (평균손실률) :{val_average_loss:.2f}')

        if epoch % 5 == 0 :
            torch.save(model.state_dict(),directory.temp_path+f'[STOPED_EPC_{epoch}]'+directory.save_model_pt_by_10_epoch_in_temp)
            print(f"epoch:{epoch}\ntemp폴더에 모델 저장 성공!")


        print('='*70)
        if train_accuracy >= best_train_score:
            best_train_score = train_accuracy
            best_train_loss = average_loss
            print(f'Best Train Acc: {best_train_score:.2f}%')
            print(f'Best Train Loss: {best_train_loss:.2f}')

        print('='*70)
        if val_accuracy >= best_val_score:
            best_val_score = val_accuracy
            best_val_loss = val_average_loss
            print(f'Best Val Acc: {best_val_score:.2f}%')
            print(f'Best Val Loss: {best_val_loss:.2f}')

    print('='*70)
    clean_memory()
    end = time()
    print('='*70)
    return start, end

In [None]:
zipfiles = ['train.zip','test.zip']
extract_zipfile(zipfiles,directory.zip_path,directory.unzip_path)

/content/drive/MyDrive/프로젝트/[CV]개&고양이 분류/
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
message : 압축 해제가 이미 완료 되었습니다


In [None]:
# 훈련&검증 실행
start_time, end_time = fit(model, train_loader, val_loader)
get_elapsed_time(start_time,end_time)

메모리 청소 완료


  0%|          | 0/100 [00:00<?, ?it/s]

----------------------------------------------------------------------
Epoch : 0
[0 / 22500]	 Train Accuracy(정확도):62.50%
		 Train Average Loss (평균손실률) :0.00
----------------------------------------------------------------------
Epoch : 0
[12800 / 22500]	 Train Accuracy(정확도):88.28%
		 Train Average Loss (평균손실률) :0.39
----------------------------------------------------------------------
Epoch : 0
[0 / 2500]	 Validation Accuracy(정확도):100.00%
		 Validation Average Loss (평균손실률) :0.00
Best Train Acc: 91.18%
Best Train Loss: 0.49
Best Val Acc: 96.12%
Best Val Loss: 0.16
----------------------------------------------------------------------
Epoch : 1
[0 / 22500]	 Train Accuracy(정확도):96.88%
		 Train Average Loss (평균손실률) :0.00
----------------------------------------------------------------------
Epoch : 1
[12800 / 22500]	 Train Accuracy(정확도):96.50%
		 Train Average Loss (평균손실률) :0.10
----------------------------------------------------------------------
Epoch : 1
[0 / 2500]	 Validation Accurac

KeyboardInterrupt: ignored

In [None]:
#======================================================
# ▶ 모델저장
#======================================================
torch.save(model.state_dict(),directory.save_path+directory.save_model_pt)
print("모델 저장 성공!")

# 6. 예측(Inference)
----------------------------------------------------------

In [None]:
#======================================================
# ▶ 모델불러오기 (모델 아키텍쳐를동일하게 세팅)
#======================================================
model.load_state_dict(torch.load(directory.save_path+directory.save_model_pt))
print("모델 로드 성공!")

In [None]:
#======================================================
# ▶ 예측함수 정의
#======================================================
# 예측평가
def evaluate(model, test_loader):
    start = time()
    clean_memory()
    print('-'*70)

    # 평가모드
    model.eval()

    iid_list = []
    preds_list = []

    # 그래디언트 초기화
    with torch.no_grad():

        for img_tensor, ids in tqdm(test_loader):
            ch = img_tensor.shape[0]
            h  = img_tensor.shape[1]
            w  = img_tensor.shape[2]
            img_tensor = img_tensor.reshape(1, ch, h,w).cuda() # (1 of batch_size, channel_size, height, width)
            outputs = model(img_tensor)
            preds = F.softmax(outputs, dim=1)[:,1]

            # 반환용 리스트
            iid_list.append(ids)
            preds_list += preds.tolist()

    print("="*70)
    clean_memory()
    end = time()
    print("_"*70)
    get_elapsed_time(start,end)
    return iid_list, preds_list

# submmision제출
def create_and_save_submission_csv(iid_list, preds_list):
     submission_df = pd.DataFrame({
         'id': iid_list,
         'label': preds_list
     })

     submission_df.sort_values(by='id', inplace=True)
     submission_df.reset_index(drop=True, inplace=True)
     display(submission_df)
     submission_df.to_csv(Directory.save_path +Directory.save_submission_csv, index=False)
     print('Submission 저장 완료!')

In [None]:
# 예측성능 채점
iid_list, preds_list= evaluate(model, test_dataset)

# 7. 제출(submission)
------------------------------------------------------------

In [None]:
# 예측결과
create_and_save_submission_csv(iid_list,preds_list)