In [1]:
!pip install timm

In [3]:
import os
import random
import gc

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

import tensorflow as tf
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, Input, Concatenate, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers.schedules import ExponentialDecay

from sklearn.model_selection import train_test_split

In [25]:
# Hyper-parameters and input geometry read by the cells below.
CONFIG = {
    'seed': 42,               # passed to set_seed and to the train/validation split
    'img_size': 224,          # images are resized to img_size x img_size
    'channels': 3,            # channel count requested from the JPEG decoder
    'train_batch_size': 32,
    'valid_batch_size': 16,   # also used for the test dataset
    'learning_rate': 1e-3,    # initial value of the exponential-decay schedule
    'decay_rate': 0.96,
    'decay_steps': 100,
}

AUTOTUNE = tf.data.experimental.AUTOTUNE  
# AUTOTUNE asks tf.data to tune the value dynamically at runtime --> reduces computation time.
# It is passed as `num_parallel_calls` to `map` and as the buffer size to `prefetch` below.

In [6]:
def set_seed(seed):
    """Seed NumPy, Python's ``random`` and TensorFlow with the same value.

    Also sets two environment variables: ``TF_CPP_MIN_LOG_LEVEL`` to ``'2'``
    and ``PYTHONHASHSEED`` to ``str(seed)``.

    NOTE(review): Python reads ``PYTHONHASHSEED`` at interpreter start-up, so
    assigning it here presumably only affects child processes, not the hashing
    of the already-running kernel — confirm that is sufficient.

    Args:
        seed: Value handed unchanged to every seeding function.
    """
    for seeder in (np.random.seed, random.seed, tf.random.set_seed):
        seeder(seed)
    os.environ.update({
        'TF_CPP_MIN_LOG_LEVEL': '2',
        'PYTHONHASHSEED': str(seed),
    })

# Apply the configured seed to every random number generator that set_seed handles.
set_seed(CONFIG['seed'])

In [8]:
# Folders the image paths are built from (`<dir>/<Id>.jpg`).
train_dir, test_dir = (
    '../input/petfinder-pawpularity-score/train',
    '../input/petfinder-pawpularity-score/test',
)

# Data Processing

In [10]:
# Metadata tables; the `Id` column is the image file stem (see the helpers below).
df = pd.read_csv('../input/petfinder-pawpularity-score/train.csv')
df_test = pd.read_csv('../input/petfinder-pawpularity-score/test.csv')


def get_train_file_path(id):
    """Return the path of the training JPEG named `id` inside `train_dir`."""
    return f'{train_dir}/{id}.jpg'


def get_test_file_path(id):
    """Return the path of the test JPEG named `id` inside `test_dir`."""
    return f'{test_dir}/{id}.jpg'


df['file_path'] = df['Id'].apply(get_train_file_path)
# Build the test paths from the test frame's own ids. The previous code used
# df['Id'] (training ids), which produced paths to files that do not exist in
# the test folder and only lined up with df_test through index alignment.
df_test['file_path'] = df_test['Id'].apply(get_test_file_path)

In [17]:
# Data loading and processing
def image_processing(is_labelled):
    """Return the random-augmentation function to map over a dataset.

    The augmentation is a random left/right flip followed by random
    saturation and random contrast adjustments, each called with the bounds
    0.95 and 1.05.

    Args:
        is_labelled: When truthy, the returned function takes
            ``(image, label)`` and passes the label through untouched;
            otherwise it takes and returns the image only.

    Returns:
        A callable suitable for ``Dataset.map``.
    """
    def augment(image):
        flipped = tf.image.random_flip_left_right(image)
        recoloured = tf.image.random_saturation(flipped, 0.95, 1.05)
        return tf.image.random_contrast(recoloured, 0.95, 1.05)

    if not is_labelled:
        return augment

    def augment_with_label(img, label):
        # Only the image is transformed; the label rides along.
        return augment(img), label

    return augment_with_label


def image_read(is_labelled):
    """Return the function that turns a file path into a model-ready image.

    The image is read from disk, decoded as JPEG with ``CONFIG['channels']``
    channels, cast to float32, resized to a ``CONFIG['img_size']`` square and
    passed through the EfficientNet ``preprocess_input``.

    Args:
        is_labelled: When truthy, the returned function takes
            ``(file_path, label)`` and passes the label through untouched;
            otherwise it takes the file path only.

    Returns:
        A callable suitable for ``Dataset.map``.
    """
    def decode(file_path):
        # Load the image.
        side = CONFIG['img_size']
        raw_bytes = tf.io.read_file(file_path)
        pixels = tf.image.decode_jpeg(raw_bytes, channels=CONFIG['channels'])
        pixels = tf.cast(pixels, tf.float32)
        pixels = tf.image.resize(pixels, (side, side))
        # preprocess_input adapts the image to the format the model requires.
        return tf.keras.applications.efficientnet.preprocess_input(pixels)

    if not is_labelled:
        return decode

    def decode_with_label(file_path, label):
        return decode(file_path), label

    return decode_with_label

# Creating the Dataset
def create_dataset(df, batch_size, is_labelled=False, augment=False, shuffle=False):
    """Build a batched, prefetched ``tf.data`` pipeline of decoded images.

    Args:
        df: DataFrame with a ``file_path`` column and, when ``is_labelled``
            is true, a ``Pawpularity`` column.
        batch_size: Number of elements per batch.
        is_labelled: Yield ``(image, label)`` pairs instead of bare images.
        augment: Map the random augmentations from ``image_processing`` over
            the decoded images.
        shuffle: Reshuffle the elements on every iteration.

    Returns:
        The resulting ``tf.data.Dataset``.
    """
    paths = df['file_path'].values
    if is_labelled:
        dataset = tf.data.Dataset.from_tensor_slices((paths, df['Pawpularity'].values))
    else:
        dataset = tf.data.Dataset.from_tensor_slices(paths)

    if shuffle:
        # Shuffle the lightweight path/label records *before* decoding. The
        # shuffle buffer then holds short strings rather than decoded float32
        # images, so it can cover the whole frame and give a uniform shuffle
        # instead of the partial one a fixed 1024-element buffer provides.
        # max(..., 1) keeps the buffer size valid for an empty frame.
        dataset = dataset.shuffle(max(len(df), 1), reshuffle_each_iteration=True)

    dataset = dataset.map(image_read(is_labelled), num_parallel_calls=AUTOTUNE)
    if augment:
        dataset = dataset.map(image_processing(is_labelled), num_parallel_calls=AUTOTUNE)

    return dataset.batch(batch_size).prefetch(AUTOTUNE)

In [18]:
# Split the data: hold out 25% of the labelled rows for validation,
# stratified on the Pawpularity target.
trn, val = train_test_split(
    df,
    test_size=0.25,
    shuffle=True,
    stratify=df['Pawpularity'],
    random_state=CONFIG['seed'],
)

# Build the datasets: only the training pipeline is augmented and shuffled;
# the test pipeline is unlabelled and reuses the validation batch size.
train = create_dataset(
    trn, CONFIG['train_batch_size'],
    is_labelled=True, augment=True, shuffle=True,
)
valid = create_dataset(
    val, CONFIG['valid_batch_size'],
    is_labelled=True, augment=False, shuffle=False,
)
test = create_dataset(
    df_test, CONFIG['valid_batch_size'],
    is_labelled=False, augment=False, shuffle=False,
)

# CNN Model

In [21]:
# Load the pretrained EfficientNetB0 from a saved Keras .h5 model file.
# NOTE(review): whether this file includes a pooling layer / classification head
# is not visible here — confirm its output shape suits the Dense layers that are
# stacked on top of it in the next cell.
img_mod = '../input/keras-applications-models/EfficientNetB0.h5'
efnet = tf.keras.models.load_model(img_mod)

# Freeze the backbone: the EfficientNet layers will not be trained.
efnet.trainable = False

In [24]:
# Model architecture: frozen EfficientNet backbone followed by a small
# regression head. The layer classes are the ones imported at the top of the
# notebook from tensorflow.keras.layers.
# NOTE(review): the single output unit uses ReLU, which clamps predictions at 0
# and passes no gradient while its pre-activation is negative — confirm this is
# intended rather than a linear output.
model = Sequential([
    # Receives the image: (img_size, img_size, channels).
    Input(shape=(CONFIG['img_size'], CONFIG['img_size'], CONFIG['channels'])),
    efnet,
    BatchNormalization(),
    Dropout(0.2),
    Dense(64, activation='relu'),
    Dense(1, activation='relu'),
])

# Training

In [27]:
# Early stopping: watch the validation loss, allow 5 epochs without
# improvement, then restore the best weights seen.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# Learning-rate schedule: exponential decay configured from CONFIG, applied in
# discrete steps (staircase) rather than continuously.
lr_scheduler = ExponentialDecay(
    initial_learning_rate=CONFIG['learning_rate'],
    decay_rate=CONFIG['decay_rate'],
    decay_steps=CONFIG['decay_steps'],
    staircase=True,
)

In [29]:
# Compile with MSE loss, reporting RMSE as the metric; Adam follows the
# exponential-decay schedule defined above.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr_scheduler),
    loss='mse',
    metrics=[tf.keras.metrics.RootMeanSquaredError()],
)

# Fit for at most 20 epochs; early stopping may end training sooner.
# `predictor` keeps whatever `fit` returns (the training history).
predictor = model.fit(
    train,
    validation_data=valid,
    epochs=20,
    callbacks=[early_stopping],
)

In [None]:
# Predict on the (unlabelled, unshuffled) test dataset.
# NOTE(review): `pred` presumably has shape (n_rows, 1) given the single-unit
# output layer — confirm the column assignment below handles that as intended.
pred = model.predict(test)

# Fill the sample submission's target column with the predictions.
# NOTE(review): this relies on sample_submission.csv listing rows in the same
# order as test.csv — confirm.
submit = pd.read_csv('../input/petfinder-pawpularity-score/sample_submission.csv')
submit['Pawpularity'] = pred
# NOTE(review): no visible cell writes `submit` to disk; the frame is only
# displayed here — confirm a later cell saves it.
submit