# 1. 라이브러리 및 데이터 불러오기

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#### tensorflow_addons : f1_score을 쓰기 위해 설치

In [None]:
!pip install tensorflow_addons

In [None]:
# 기본
import numpy as np
import pandas as pd

import os
import glob
import shutil

import math
import random

import matplotlib.pyplot as plt
import seaborn as sns
import cv2

# 이미지 수정
import albumentations as A

# sklearn
from sklearn import preprocessing
from sklearn.model_selection import StratifiedKFold

# tensorflow
import tensorflow as tf
from tensorflow.keras import applications, callbacks
from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import Sequence, to_categorical
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import tensorflow_addons as tfa

# warning 알림 끄기
import warnings
warnings.filterwarnings(action='ignore')

# 2. 모델 생성

## 1) 클래스 및 함수 선언


#### 결과를 재현하기 위한 seed 고정

In [None]:
# seed 고정(random_state=42)
def seed_everything(seed : int = 42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    tf.random.set_seed(seed)

seed_everything(42)

In [None]:
# 각종 파라미터의 디폴트값
Default_param = {
    'IMG_SIZE' : 224,
    'EPOCHS' : 50,
    'LEARNING_RATE' : 1e-3,
    'BATCH_SIZE' : 16,
    'SEED' : 42
}

In [None]:
# csv 파일에서 이미지 파일의 주소를 가져오는 함수
def get_data(df, infer=False):

    # 라벨 데이터(화가 이름)가 없는 경우 이미지 파일의 주소만 가져옴 → test.csv
    if infer:
        return df['img_path'].values

    # 라벨 데이터(화가 이름)가 있는 경우 이미지 파일의 주소와 라벨을 모두 가져옴 → train.csv
    return df['img_path'].values, df['artist'].values 

#### 라벨 인코딩 및 이미지 파일 경로 조정

In [None]:
# train 불러오기
df = pd.read_csv('/content/drive/MyDrive/2nd_project/data/train.csv')

# 라벨 데이터 One-Hot Encoding
le = preprocessing.LabelEncoder()
df['artist'] = le.fit_transform(df['artist'].values)

# test 불러오기
test_df = pd.read_csv('/content/drive/MyDrive/2nd_project/data/test.csv')
test_df['img_path'] = test_df['img_path'].apply(lambda x:'/content/drive/MyDrive/2nd_project' + x[1:])
test_img_paths = get_data(test_df, infer=True)

#### 학습의 속도를 높여주는 scheduler
#### tensorflow에서는 torch와 다르게 scheduler를 제공하지 않으므로 직접 만들어야 함
#### 밑에서는 stepLR을 직접 구현

In [None]:
# scheduler
def lr_step_decay(epoch, lr):
    drop_rate = 0.5
    epochs_drop = 5.0
    return initial_learning_rate * math.pow(drop_rate, math.floor(epoch/epochs_drop))

In [None]:
def step_decay(epoch):
    start = 1e-3
    drop = 0.5
    epochs_drop = 5.0
    lr = start * (drop ** np.floor((epoch)/epochs_drop))
    return lr

lr_scheduler = callbacks.LearningRateScheduler(step_decay, verbose=1)

#### 이미지 데이터가 용량이 커서 한번에 다 불러올 수 없는 문제가 발생
#### → img_path만을 가지고 있다가 필요할 때만 이 클래스를 통해 미니배치마다 이미지를 cv2로 읽어서 가져옴

#### torch와 다르게 tensorflow는 dataloader가 없음
#### 따라서 dataset에서 '__len__'과 '__getitem__'에서 batch size만큼의 데이터를 가져올 수 있게 클래스를 수정

In [None]:
# 데이터를 불러오는 클래스
class CustomDataset(Sequence):
    def __init__(self, img_paths, augmentations, batch_size ,labels=None):
        self.labels = labels
        self.img_paths = img_paths
        self.batch_size = batch_size
        self.augment = augmentations
        self.length = len(img_paths)
        
    def __len__(self):
        return int(np.ceil(len(self.img_paths) / float(self.batch_size)))

    # 지정 배치 크기만큼 데이터를 로드
    def __getitem__(self, idx):
        if self.length >= (idx + 1) * self.batch_size:
            inds = np.arange(idx * self.batch_size,(idx + 1) * self.batch_size)
        else:
            inds = np.arange(idx * self.batch_size,self.length)

        img_path = self.img_paths[inds]
        batch_x = [cv2.cvtColor(cv2.imread(x), cv2.COLOR_BGR2RGB) for x in img_path]

        if self.labels is not None:
            batch_label = self.labels[inds]
      
            # augmentation을 적용해서 numpy array에 stack
            return np.stack([self.augment(image=x)["image"] for x in batch_x], axis=0), np.array(batch_label)
        else:
            return np.array([self.augment(image=x)["image"] for x in batch_x])

#### 이미지 사이즈 조정 및 이미지 데이터 증폭

In [None]:
train_transform = A.Compose(
    [
        A.Resize(p=1, height=224*2, width=224*2),
        A.RandomCrop(p=1,height=224,width=224),
        A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p = 0.5),
        A.OneOf([
                                            A.MotionBlur(p=1),
                                            A.OpticalDistortion(p=1),
                                            A.GaussNoise(p=1)
                ], p= 0.3),
        A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2, always_apply=False, p=0.3),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
        ])

test_transform = A.Compose(
    [
        A.Resize(height=224, width=224),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
        ])

## 2) 학습 진행

### EfficientNet_B4 베이스 모델 + StratifiedKFold
#### 전이학습에 EfficientNet_B4 모델을 사용한 이유 : ResNet 등 다른 모델에 비해 10배 가량 가벼우면서 훨씬 좋은 성능을 보여줌
#### StratifiedKFold : kfold에 라벨의 비율을 적용해서 데이터가 일정한 라벨 비율을 갖도록 하는 방법

#### fold 하나당 소요되는 시간이 매우 길기 때문에 fold 1개를 수행할 때마다 torch 파일로 저장
#### → 학습 모델을 통해 test 데이터를 예측한 결과도 npy 파일로 저장
#### → kfold에서 지정한 split 횟수만큼 반복한 후, 그 결과들을 활용하여 voting 진행
#### → 단순히 예측 결과를 더한 뒤 argmax함수를 적용하는 soft voting(probability voting이라고도 함)

#### torch와 유사한 형태로 구성
#### 여기서는 flatten과 batchnormalization을 주었지만 없어도 무방할 것으로 생각됨
#### torch에 비해 학습이 불안정하며 scheduler나 fold에 따라 그 불안정성이 더 심해지는 경우가 많음
#### scheduler의 파라메터 조정을 잘 해주지 않으면 제대로 된 학습을 하지 못하는 경우가 많음
#### f1_score는 최대 67% ~ 68% 정도 나오지만 중간에 학습이 끊기거나 일부 fold는 수렴이 느려 f1_score가 50% 대에 머물기도 함
#### 또한 torch에 비해 과적합이 심하고 val_loss와 val_f1_score의 차가 극단적으로 큰 경우가 생기며 epoch마다 값의 편차가 무척 큼

In [None]:
skf = StratifiedKFold(n_splits=5, shuffle=False) # 구현, 5 fold, shuffle 안함
t = df.artist # 라벨

# model 폴더 만드는 함수
os.mkdir('drive/MyDrive/2nd_project/model')

# StratifiedKFold 객체는 split에 라벨도 함께 적용하는 것으로, 라벨이 같은 비율을 갖도록 index를 반환해줌
for fold, (train_index, test_index) in enumerate(skf.split(np.zeros(len(t)), t)):

    # 하나의 fold를 실행하는 데 많은 시간이 걸리므로 fold 한 개마다 저장하는 식으로 했음
    if fold!=0:
        print(f'skip {fold}')
        continue

    # train split
    train_df = df.loc[train_index]

    # validation split
    val_df = df.loc[test_index]

    # train 이미지 주소 가져오기
    train_img_paths, train_labels = get_data(train_df)

    # val 이미지 주소 가져오기
    val_img_paths, val_labels = get_data(val_df)

    # One-Hot Encoding
    one_train_labels = tf.keras.utils.to_categorical(train_labels)
    one_val_labels = tf.keras.utils.to_categorical(val_labels)

    train_gen = CustomDataset(train_img_paths, train_transform ,Default_param['BATCH_SIZE'], one_train_labels)
    val_gen = CustomDataset(val_img_paths, train_transform, Default_param['BATCH_SIZE'], one_val_labels)
    test_gen = CustomDataset(test_img_paths, test_transform, Default_param['BATCH_SIZE'])

    # 모델 구성
    model = Sequential([applications.efficientnet.EfficientNetB4(weights='imagenet', include_top=False, input_shape=(224, 224, 3), pooling='avg', classes=1000),
                        Flatten(), BatchNormalization(), Dense(50, activation='softmax')])
    # cos_decay_ann = tf.keras.experimental.CosineDecayRestarts(initial_learning_rate=Default_param['LEARNING_RATE'], first_decay_steps=2, t_mul=1, m_mul=0.99, alpha=1e-5)
    optimizer = tf.keras.optimizers.Adam(learning_rate=Default_param['LEARNING_RATE'])

    # compile
    model.compile(optimizer=optimizer, loss=tf.keras.losses.categorical_crossentropy, metrics=[tfa.metrics.F1Score(num_classes=50, average='macro')])

    # 조기종료
    earlystop = callbacks.EarlyStopping(monitor='val_f1_score', mode='max', patience=10, restore_best_weights=True)

    # train 학습
    model.fit(train_gen,epochs=100, validation_data=val_gen, callbacks=[earlystop, lr_scheduler])
    model.save('/content/drive/MyDrive/2nd_project/kfold/' + str(fold) + '.keras')

    # test 예측
    predict = model.predict(test_gen)
    predict = np.array(predict)
    np.save('/content/drive/MyDrive/2nd_project/kfold/' + str(fold) + 'keras.npy', predict)

# 3. 결과

In [None]:
files = glob.glob('/content/drive/MyDrive/2nd_project/kfold/*keras.npy')
files

In [None]:
predict = np.load(files[0]) + np.load(files[1])
predict.shape

In [None]:
preds = le.inverse_transform(predict.argmax(1))
submit = pd.read_csv('/content/drive/MyDrive/2nd_project/data/sample_submission.csv')
submit['artist'] = preds
submit.to_csv('/content/drive/MyDrive/2nd_project/submit_keras0.csv', index=False)