In [1]:
import os
import shutil
from pathlib import Path

try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    DATA_DIR = Path('/content/unpacked/')
    PACK_DIR = Path('/content/drive/My Drive/colab_drive/prepacked.zip')
    shutil.copy(PACK_DIR, '/content/')
    !unzip -o -q /content/prepacked.zip -d {DATA_DIR}
else:
    DATA_DIR= Path(os.path.join(os.getcwd(), "../data/")).resolve()
DATA_DIR

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


PosixPath('/content/unpacked')

In [2]:
import re
import glob
import pandas as pd
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tqdm.notebook import tqdm
from tensorflow import keras
from tensorflow.keras import layers

# CSV 로드 및 정리, 본인 경로에 맞게 변환
CSV_PATH = DATA_DIR / 'whiskies_recategorized.csv'
IMAGE_SIZE = (256, 256)
RANDOM_STATE = 42

In [3]:
df = pd.read_csv(CSV_PATH, dtype={"id": str})
df["id"] = df["id"].astype(str).str.strip().str.replace(r"\.0$", "", regex=True)
df["category"] = df["category"].astype(str).str.strip()
paths = [DATA_DIR / p for p in df["local_full_path"]]

bar = tqdm(paths, desc="Processing Images", unit="img")

# 이미지 로드
X_list = []
for p in bar:
    with Image.open(p) as im:
        im = im.convert("RGB")
        im = im.resize(IMAGE_SIZE)
        arr = np.asarray(im, dtype=np.uint8)
        X_list.append(arr)
X = np.stack(X_list, axis=0)

Processing Images:   0%|          | 0/2943 [00:00<?, ?img/s]

In [4]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import numpy as np

# 라벨 인코딩
labels = df["category"].values
le = LabelEncoder()
y_int = le.fit_transform(labels)

# test 분리
X_rest, X_test, y_rest, y_test = train_test_split(
    X, y_int,
    test_size=0.2,
    #random_state=RANDOM_STATE,
    stratify=y_int
)

# train / valid 분리
X_train, X_valid, y_train, y_valid = train_test_split(
    X_rest, y_rest,
    test_size=0.2,
    #random_state=RANDOM_STATE,
    stratify=y_rest
)

print("X_train:", X_train.shape)
print("X_valid:", X_valid.shape)
print("X_test :", X_test.shape)

print("y_train 분포:", np.bincount(y_train))
print("y_valid 분포:", np.bincount(y_valid))
print("y_test  분포:", np.bincount(y_test))

print("class mapping:", dict(zip(le.classes_, range(len(le.classes_)))))

X_train: (1883, 256, 256, 3)
X_valid: (471, 256, 256, 3)
X_test : (589, 256, 256, 3)
y_train 분포: [316 148  97 468  49 147 155 503]
y_valid 분포: [ 79  37  24 117  12  37  39 126]
y_test  분포: [ 99  46  30 146  15  46  49 158]
class mapping: {'Blended': 0, 'Bourbon': 1, 'Brandy': 2, 'Other': 3, 'Rye': 4, 'SM_40_43': 5, 'SM_43_46': 6, 'SM_G46': 7}


In [5]:
X_train = X_train.astype("float32", copy=False)
X_valid = X_valid.astype("float32", copy=False)
X_test = X_test.astype("float32", copy=False)
X_train /= 255;
X_valid /= 255;
X_test /= 255;

### Pre-exsisting models builder
이미 잘 알려진 모델과의 성능을 비교하는 차원에서, 동일한 데이터로 기성모델을 학습합니다.
예시로써는 MobileNet, EfficientNet등이 있겠습니다.

In [6]:
def build_mobilenet(input_shape=(256,256,3), num_classes=8):
    base = keras.applications.MobileNetV2(
        include_top=False,
        weights="imagenet",
        input_shape=input_shape,
        pooling="avg",
    )
    base.trainable = False
    x = layers.Dense(256, activation="gelu")(base.output)
    x = layers.Dropout(0.3)(x)
    output = layers.Dense(num_classes, activation="softmax")(x)
    model = keras.Model(base.input, output)
    return model

In [7]:
def build_efficientnet_b0(input_shape=(256,256,3), num_classes=8):
    base = keras.applications.EfficientNetB0(
        include_top=False,
        weights="imagenet",
        input_shape=input_shape,
        pooling="avg",
    )
    base.trainable = False
    x = layers.Dense(256, activation="gelu")(base.output)
    x = layers.Dropout(0.3)(x)
    output = layers.Dense(num_classes, activation="softmax")(x)
    model = keras.Model(base.input, output)
    return model

In [8]:
def build_resnet50(input_shape=(256,256,3), num_classes=8):
    base = keras.applications.ResNet50(
        include_top=False,
        weights="imagenet",
        input_shape=input_shape,
        pooling="avg",
    )
    base.trainable = False
    x = layers.Dense(256, activation="gelu")(base.output)
    x = layers.Dropout(0.3)(base.output)
    output = layers.Dense(num_classes, activation="softmax")(x)
    model = keras.Model(base.input, output)
    return model

In [9]:
from sklearn.metrics import f1_score
from tensorflow.keras.callbacks import Callback

class F1ScoreCallback(Callback):
    def __init__(self, X_val, y_val, start_from_epoch=12, patient=3):
        super().__init__()
        self.X_val = X_val
        self.y_val = y_val
        self.f1_scores = [] #this is for cumilating f1 per epoch
        self.start_from_epoch = start_from_epoch
        self.patient = patient
        self.out = 0
        self.best_f1 = -1

    def on_epoch_end(self, epoch, logs=None):
        y_pred = self.model.predict(self.X_val, verbose=0)
        y_pred = np.argmax(y_pred, axis=1)

        if self.y_val.ndim == 2:
            y_true = np.argmax(self.y_val, axis=1)
        else:
            y_true = self.y_val

        f1 = f1_score(y_true, y_pred, average='macro')
        self.f1_scores.append(f1)
        logs['val_macro_f1'] = f1

        if f1 > self.best_f1:
            self.best_f1 = f1

        if 1 < epoch and epoch > self.start_from_epoch and f1 < self.f1_scores[-2]:
            print(f"\nNon Improvement detected at EP : {epoch}, f1 : {f1}")
            self.out += 1

        if self.out >= self.patient:
            print(f"\nStopping at EP : {epoch}, f1 : {f1}")
            self.model.stop_training = True

In [10]:
f1_cb = F1ScoreCallback(X_valid, y_valid)

  base = keras.applications.MobileNetV2(


In [16]:
mobilenet_model = build_mobilenet()
mobilenet_model.compile(optimizer= "rmsprop",
              loss = "sparse_categorical_crossentropy",
              metrics=['accuracy'])

  base = keras.applications.MobileNetV2(


In [None]:
mobilenet_model.fit(X_train, y_train, epochs=30, batch_size=128, callbacks=[f1_cb])

Epoch 1/30
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m97s[0m 6s/step - accuracy: 0.2953 - loss: 2.0691 - val_macro_f1: 0.2755
Epoch 2/30
[1m 2/15[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m55s[0m 4s/step - accuracy: 0.4824 - loss: 1.5071 