In [14]:
# 필요한 라이브러리와 모듈을 임포트합니다.
import os
import numpy as np
import pandas as pd
import ast
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from keras_tuner import RandomSearch, HyperModel
from tensorflow.keras.callbacks import EarlyStopping

In [15]:
# 주어진 디렉토리에서 데이터를 로드하고 전처리하는 함수입니다.
def load_and_preprocess_data(directory, sample_fraction=0.01):
    all_drawings = []  # 모든 그림 데이터를 저장할 리스트
    all_labels = []  # 모든 라벨(단어)을 저장할 리스트
    # 디렉토리 내의 모든 파일을 순회합니다.
    for filename in os.listdir(directory):
        if filename.endswith('.csv'):  # CSV 파일인 경우
            filepath = os.path.join(directory, filename)  # 파일 경로 생성
            # CSV 파일을 읽어 들이면서 'drawing'과 'word' 컬럼만 사용합니다.
            data = pd.read_csv(filepath, usecols=['drawing', 'word'], nrows=100)  # 첫 100개의 row만 로드
            data['drawing'] = data['drawing'].apply(ast.literal_eval)  # 'drawing' 컬럼의 문자열을 파이썬 객체로 변환
            all_labels.extend(data['word'].tolist())  # 라벨(단어) 데이터를 리스트에 추가
            for drawing in data['drawing']:  # 각 그림 데이터에 대해
                image = drawing_to_np(drawing, size=64)  # 그림 데이터를 넘파이 배열 이미지로 변환
                all_drawings.append(image)  # 변환된 이미지를 리스트에 추가
    return np.array(all_drawings), np.array(all_labels)  # 넘파이 배열로 변환하여 반환

In [16]:
# 'drawing' 데이터를 넘파이 배열로 변환하는 함수입니다.
def drawing_to_np(drawing, size=64):
    img = np.zeros((256, 256), dtype=np.uint8)  # 초기 이미지 배열 생성
    for stroke in drawing:  # 각 스트로크에 대해
        for x, y in zip(stroke[0], stroke[1]):  # 스트로크의 x, y 좌표에 대해
            img[y, x] = 255  # 해당 좌표를 흰색으로 채움
    img = tf.image.resize(img[..., np.newaxis], (size, size)).numpy()  # 이미지 크기를 조정
    img = img / 255.0  # 이미지를 정규화
    return img.squeeze()  # 불필요한 차원 제거 후 반환

In [25]:
# 하이퍼파라미터를 조정하여 모델을 구축하는 클래스입니다.
class MyHyperModel(HyperModel):
    def __init__(self, input_shape, num_classes):
        self.input_shape = input_shape  # 모델 입력 형태
        self.num_classes = num_classes  # 분류할 클래스의 수
        
    def build(self, hp):  # 모델 구축 메소드
        model = Sequential([
            Conv2D(
                filters=hp.Int('conv_1_filters', min_value=32, max_value=128, step=32),  # 첫 번째 Conv2D 층의 필터 수를 조정
                kernel_size=(3, 3),  # 커널 크기는 3x3
                activation='relu',  # 활성화 함수는 ReLU
                input_shape=self.input_shape  # 입력 형태 지정
            ),
            MaxPooling2D(2, 2),  # 첫 번째 MaxPooling 층
            Conv2D(
                filters=hp.Int('conv_2_filters', min_value=64, max_value=256, step=32),  # 두 번째 Conv2D 층의 필터 수를 조정
                kernel_size=(3, 3),  # 커널 크기는 3x3
                activation='relu'  # 활성화 함수는 ReLU
            ),
            MaxPooling2D(2, 2),  # 두 번째 MaxPooling 층
            Flatten(),  # Flatten 층
            Dense(
                units=hp.Int('dense_units', min_value=100, max_value=500, step=50),  # Dense 층의 유닛 수를 조정
                activation='relu'  # 활성화 함수는 ReLU
            ),
            Dropout(hp.Float('dropout', min_value=0, max_value=0.5, step=0.1)),  # Dropout 비율을 조정
            Dense(self.num_classes, activation='softmax')  # 출력 층, 클래스 수에 따라 softmax 활성화 함수 사용
        ])
        
        model.compile(optimizer=tf.keras.optimizers.Adam(hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='LOG')),  # 학습률을 조정
                      loss='categorical_crossentropy',  # 손실 함수는 범주형 교차 엔트로피
                      metrics=['accuracy'])  # 평가 지표는 정확도
        return model  # 구축된 모델 반환

In [20]:
# 데이터 로드 및 전처리
images, labels = load_and_preprocess_data('C:\\Users\\OWJ\\Downloads\\data\\train_simplified')
label_encoder = LabelEncoder()  # 라벨 인코더 인스턴스 생성
encoded_labels = label_encoder.fit_transform(labels)  # 라벨을 정수 인코딩
categorical_labels = to_categorical(encoded_labels)  # 정수 인코딩된 라벨을 원-핫 인코딩
X_train, X_val, y_train, y_val = train_test_split(images, categorical_labels, test_size=0.2, random_state=42)  # 데이터를 훈련 세트와 검증 세트로 분할

In [36]:
# X_train과 X_val 배열을 4차원으로 변경
X_train = X_train.reshape(-1, 64, 64, 1)  # -1은 샘플 수를 자동으로 계산하도록 합니다.
X_val = X_val.reshape(-1, 64, 64, 1)

# 데이터 증강 설정
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True
)

# 데이터 증강 적용
train_generator = datagen.flow(X_train, y_train, batch_size=32)
# 검증 데이터셋에는 rescale만 적용 (증강 적용 X)
val_gen = ImageDataGenerator(rescale=1./255).flow(X_val, y_val, batch_size=32)

In [28]:
# 하이퍼파라미터 튜닝 설정
hypermodel = MyHyperModel(input_shape=(64, 64, 1), num_classes=len(np.unique(labels)))  # 하이퍼모델 인스턴스 생성
tuner = RandomSearch(
    hypermodel,  # 하이퍼모델 지정
    objective='val_accuracy',  # 목표는 검증 세트의 정확도 최대화
    max_trials=10,  # 시도할 최대 트라이얼 수
    executions_per_trial=2,  # 트라이얼 당 실행 횟수
    directory='my_dir',  # 튜닝 로그를 저장할 디렉토리
    project_name='quick_draw_tuning'  # 프로젝트 이름
)

Reloading Tuner from my_dir\quick_draw_tuning\tuner0.json


In [29]:
# EarlyStopping 콜백 설정
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

In [30]:
tuner.search(x=train_generator, epochs=10, validation_data=(X_val, y_val), callbacks=[early_stopping])  # 하이퍼파라미터 검색 시작

In [31]:
# 최적의 하이퍼파라미터로 모델 훈련
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]  # 최적의 하이퍼파라미터 추출
model = tuner.hypermodel.build(best_hps)  # 최적의 하이퍼파라미터로 모델 구축
model.fit(train_generator, epochs=50, validation_data=(X_val, y_val), callbacks=[early_stopping])  # 모델 훈련

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50


<keras.src.callbacks.History at 0x1b893285210>

In [33]:
# 예측된 클래스 확률을 기반으로 MAP@3 점수를 계산하는 함수
def calculate_map3_for_batch(predictions, true_labels):
    total_score = 0.0
    for i in range(predictions.shape[0]):  # 배치 내 각 샘플에 대해
        top_3_preds_indices = np.argsort(predictions[i])[-3:][::-1]  # 상위 3개 예측
        true_label = np.argmax(true_labels[i])  # 실제 라벨
        score = 0.0
        if true_label in top_3_preds_indices:  # 실제 라벨이 상위 3개 예측에 포함되는 경우
            rank = np.where(top_3_preds_indices == true_label)[0][0] + 1  # 랭크 계산
            score = 1.0 / rank  # 점수 계산
        total_score += score
    map3 = total_score / predictions.shape[0]  # 평균 점수 계산
    return map3

In [34]:
# 검증 데이터 제너레이터를 사용하여 모델의 MAP@3 점수를 계산하는 함수
def evaluate_model_with_map3(model, validation_generator, X_val):
    total_map3_score = 0.0
    num_batches = 0
    validation_steps = len(X_val) // validation_generator.batch_size  # 처리할 전체 배치 수 계산
    
    for batch_images, batch_labels in validation_generator:  # 검증 데이터 제너레이터에서 배치를 반복해서 가져옴
        predictions = model.predict(batch_images)  # 현재 배치에 대한 예측
        map3_score = calculate_map3_for_batch(predictions, batch_labels)  # 현재 배치에 대한 MAP@3 점수 계산
        total_map3_score += map3_score  # 총 점수에 추가
        num_batches += 1
        
        if num_batches >= validation_steps:  # 모든 배치를 처리했으면 종료
            break
    
    average_map3_score = total_map3_score / num_batches  # 평균 MAP@3 점수 계산
    return average_map3_score

In [37]:
# 모델과 검증 데이터셋의 제너레이터를 함수에 전달하여 평가
average_map3_score = evaluate_model_with_map3(model, val_gen, X_val)
print(f"Average MAP@3 Score across all batches: {average_map3_score}")  # 평균 MAP@3 점수 출력

Average MAP@3 Score across all batches: 0.003242924528301886
