In [12]:
import os
import cv2
import numpy as np
from glob import glob
from tqdm import tqdm
from ultralytics import YOLO

# 경로 설정
TRAIN_DIR = "D:/데이콘 250519 대회/open/train"
FILTERED_DIR = "D:/데이콘 250519 대회/filtered_train"

# YOLOv8 모델 로드 (기본 모델 사용 가능)
model = YOLO('yolov8n.pt')  # 또는 yolov8s.pt 등

# 필터링 조건 설정
def is_full_vehicle(detections, img_shape):
    h, w = img_shape[:2]

    for det in detections:
        cls = int(det.cls)
        # class 2: car, class 5: bus, class 7: truck
        if cls in [2, 5, 7]:
            x1, y1, x2, y2 = det.xyxy[0].cpu().numpy()
            box_w = x2 - x1
            box_h = y2 - y1
            area = box_w * box_h
            img_area = h * w
            center_x = (x1 + x2) / 2
            center_y = (y1 + y2) / 2

            aspect_ratio = box_w / box_h  # 가로/세로 비율

            # 조건 1: 차량이 중앙 근처에 있어야 함 (좀 더 엄격하게)
            if not (0.35*w < center_x < 0.65*w and 0.35*h < center_y < 0.65*h):
                continue

            # 조건 2: 차량 박스가 이미지 대비 일정 면적 이상 (0.35)
            if area / img_area < 0.35:
                continue

            # 조건 3: 차량 비율 필터링
            # 트렁크나 보닛 아래쪽에서 찍은 사진은 세로가 지나치게 크거나 작음
            # 예) 너무 세로로 긴 박스 (aspect_ratio < 0.8) 이거나 너무 납작한 박스 (aspect_ratio > 2.5) 제거
            if aspect_ratio < 0.8 or aspect_ratio > 2.5:
                continue

            # 위 조건 모두 통과하면 '전체 차량 외관 포함'으로 판단
            return True

    # 검출된 차량 중 조건 충족하는 게 없으면 False
    return False


# 필터링 수행
os.makedirs(FILTERED_DIR, exist_ok=True)
class_dirs = sorted(os.listdir(TRAIN_DIR))

filtered_count = 0
total_count = 0

for cls in tqdm(class_dirs, desc="클래스별 필터링"):
    input_dir = os.path.join(TRAIN_DIR, cls)
    output_dir = os.path.join(FILTERED_DIR, cls)
    os.makedirs(output_dir, exist_ok=True)

    image_paths = glob(os.path.join(input_dir, "*.jpg"))
    for img_path in image_paths:
        total_count += 1
        img = cv2.imread(img_path)
        results = model(img, verbose=False)[0]

        if is_full_vehicle(results.boxes, img.shape):
            filename = os.path.basename(img_path)
            cv2.imwrite(os.path.join(output_dir, filename), img)
            filtered_count += 1

print(f"🔍 전체 이미지 수: {total_count}")
print(f"✅ 필터링된 차량 외관 이미지 수: {filtered_count}")
print(f"🧼 이상치 제거 완료. 정제 비율: {filtered_count / total_count:.2%}")


클래스별 필터링: 100%|██████████| 396/396 [30:58<00:00,  4.69s/it]

🔍 전체 이미지 수: 33137
✅ 필터링된 차량 외관 이미지 수: 31822
🧼 이상치 제거 완료. 정제 비율: 96.03%





In [None]:
import os
import random
import numpy as np
import pandas as pd
from glob import glob
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score

import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.efficientnet import preprocess_input

# ✅ GPU 확인 코드
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"[INFO] {len(gpus)} GPU(s) available: {[gpu.name for gpu in gpus]}")
else:
    print("[INFO] No GPU available. Training will use CPU.")

# Config
CFG = {
    'IMG_SIZE': 224,
    'EPOCHS': 15,
    'LR': 3e-4,
    'BATCH_SIZE': 24,
    'SEED': 2025,
    'FOLDS': 5
}

# Set seed
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

set_seed(CFG['SEED'])

# ✅ 필터링된 이미지 경로 사용
TRAIN_DIR = "D:/데이콘 250519 대회/filtered_train"  # 변경된 경로
TEST_DIR = "D:/데이콘 250519 대회/open/test"
SAMPLE_SUB = "D:/데이콘 250519 대회/open/sample_submission.csv"

# Label mapping
label_list = sorted(os.listdir(TRAIN_DIR))
label2id = {v: i for i, v in enumerate(label_list)}
id2label = {i: v for v, i in label2id.items()}

# Load data
image_paths = glob(os.path.join(TRAIN_DIR, '*', '*.jpg'))
labels = [label2id[os.path.basename(os.path.dirname(p))] for p in image_paths]

# Data preprocessing
def load_and_preprocess(img_path):
    img = load_img(img_path, target_size=(CFG['IMG_SIZE'], CFG['IMG_SIZE']))
    img = img_to_array(img)
    img = preprocess_input(img)
    return img

# TF Dataset 생성 함수
def create_dataset(image_paths, labels=None, is_train=True):
    def gen():
        for i, path in enumerate(image_paths):
            img = load_and_preprocess(path)
            if labels is not None:
                yield img, labels[i]
            else:
                yield img

    if labels is not None:
        ds = tf.data.Dataset.from_generator(
            gen,
            output_types=(tf.float32, tf.int32),
            output_shapes=((CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3), ())
        )
    else:
        ds = tf.data.Dataset.from_generator(
            gen,
            output_types=tf.float32,
            output_shapes=(CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3)
        )

    if is_train:
        ds = ds.shuffle(1024)
    ds = ds.batch(CFG['BATCH_SIZE']).prefetch(tf.data.AUTOTUNE)
    return ds

# Model
def build_model(num_classes):
    base = tf.keras.applications.EfficientNetB0(include_top=False, input_shape=(CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3), weights='imagenet', pooling='avg')
    x = layers.Dense(num_classes, activation='softmax')(base.output)
    model = models.Model(inputs=base.input, outputs=x)
    return model

# Cross-validation training
skf = StratifiedKFold(n_splits=CFG['FOLDS'], shuffle=True, random_state=CFG['SEED'])

for fold, (train_idx, val_idx) in enumerate(skf.split(image_paths, labels)):
    print(f"\n### Fold {fold+1}")

    train_paths = [image_paths[i] for i in train_idx]
    val_paths = [image_paths[i] for i in val_idx]
    train_labels = [labels[i] for i in train_idx]
    val_labels = [labels[i] for i in val_idx]

    train_ds = create_dataset(train_paths, train_labels, is_train=True)
    val_ds = create_dataset(val_paths, val_labels, is_train=False)

    model = build_model(num_classes=len(label2id))
    model.compile(optimizer=optimizers.Adam(CFG['LR']),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(train_ds,
              validation_data=val_ds,
              epochs=CFG['EPOCHS'],
              verbose=1)

# Inference
test_paths = sorted(glob(os.path.join(TEST_DIR, '*.jpg')))
test_ds = create_dataset(test_paths, is_train=False)

preds = model.predict(test_ds)
print("Inference 완료")

# Submission
submission = pd.read_csv(SAMPLE_SUB)
for idx, class_name in enumerate(label2id.keys()):
    submission[class_name] = preds[:, idx]

submission.to_csv("submission.csv", index=False)
print("submission.csv 저장 완료")


[INFO] 1 GPU(s) available: ['/physical_device:GPU:0']

### Fold 1 시작 ###
Epoch 1/15
   1061/Unknown - 134s 118ms/step - loss: 2.8717 - accuracy: 0.4431
Epoch 1: val_accuracy improved from -inf to 0.18507, saving model to ./models\best_model_fold1.h5


TypeError: Unable to serialize [     2.0897      2.1129      2.1082] to JSON. Unrecognized type <class 'tensorflow.python.framework.ops.EagerTensor'>.

In [26]:
import os
import random
import numpy as np
import pandas as pd
from glob import glob
from sklearn.model_selection import StratifiedKFold

import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.efficientnet import preprocess_input

# GPU 확인
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"[INFO] {len(gpus)} GPU(s) available: {[gpu.name for gpu in gpus]}")
else:
    print("[INFO] No GPU available. Training will use CPU.")

# Config
CFG = {
    'IMG_SIZE': 224,
    'EPOCHS': 15,
    'LR': 3e-4,
    'BATCH_SIZE': 24,
    'SEED': 2025,
    'FOLDS': 5,
    'AUTOTUNE': tf.data.AUTOTUNE,
    'PATIENCE': 3,
    'MODEL_SAVE_DIR': './models'
}

os.makedirs(CFG['MODEL_SAVE_DIR'], exist_ok=True)

# Seed 고정
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

set_seed(CFG['SEED'])

# 데이터 경로
TRAIN_DIR = "D:/데이콘 250519 대회/filtered_train"
TEST_DIR = "D:/데이콘 250519 대회/open/test"
SAMPLE_SUB = "D:/데이콘 250519 대회/open/sample_submission.csv"

# 레이블 매핑
label_list = sorted(os.listdir(TRAIN_DIR))
label2id = {v: i for i, v in enumerate(label_list)}
id2label = {i: v for v, i in label2id.items()}

# 이미지 경로 및 레이블 수집
image_paths = glob(os.path.join(TRAIN_DIR, '*', '*.jpg'))
labels = [label2id[os.path.basename(os.path.dirname(p))] for p in image_paths]

# 데이터 증강 함수 (train 전용)
def augment_image(image):
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.image.random_contrast(image, lower=0.9, upper=1.1)
    return image

# 이미지 불러오기 및 전처리 함수
def load_and_preprocess(img_path):
    img = load_img(img_path, target_size=(CFG['IMG_SIZE'], CFG['IMG_SIZE']))
    img = img_to_array(img)
    img = preprocess_input(img)
    return img

# TF Dataset 생성 함수
def create_dataset(image_paths, labels=None, is_train=True):
    def gen():
        for i, path in enumerate(image_paths):
            img = load_and_preprocess(path)
            if labels is not None:
                yield img, labels[i]
            else:
                yield img

    if labels is not None:
        ds = tf.data.Dataset.from_generator(
            gen,
            output_types=(tf.float32, tf.int32),
            output_shapes=((CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3), ())
        )
    else:
        ds = tf.data.Dataset.from_generator(
            gen,
            output_types=tf.float32,
            output_shapes=(CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3)
        )

    if is_train:
        ds = ds.map(lambda x, y: (augment_image(x), y), num_parallel_calls=CFG['AUTOTUNE'])
        ds = ds.shuffle(buffer_size=1024, seed=CFG['SEED'])
    ds = ds.batch(CFG['BATCH_SIZE']).prefetch(CFG['AUTOTUNE'])
    return ds

# 모델 생성 함수
def build_model(num_classes):
    base = tf.keras.applications.EfficientNetB0(
        include_top=False, input_shape=(CFG['IMG_SIZE'], CFG['IMG_SIZE'], 3),
        weights='imagenet', pooling='avg'
    )
    x = layers.Dense(num_classes, activation='softmax')(base.output)
    model = models.Model(inputs=base.input, outputs=x)
    return model

# 총 스텝 계산 (epoch당 스텝 수 * 전체 epoch 수)
total_steps = (len(image_paths) // CFG['BATCH_SIZE']) * CFG['EPOCHS']

# CosineDecay 학습률 스케줄러 생성
lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=CFG['LR'],
    decay_steps=total_steps
)

# Cross-validation
skf = StratifiedKFold(n_splits=CFG['FOLDS'], shuffle=True, random_state=CFG['SEED'])

fold_val_acc = []

for fold, (train_idx, val_idx) in enumerate(skf.split(image_paths, labels)):
    print(f"\n### Fold {fold+1} 시작 ###")

    train_paths = [image_paths[i] for i in train_idx]
    val_paths = [image_paths[i] for i in val_idx]
    train_labels = [labels[i] for i in train_idx]
    val_labels = [labels[i] for i in val_idx]

    train_ds = create_dataset(train_paths, train_labels, is_train=True)
    val_ds = create_dataset(val_paths, val_labels, is_train=False)

    model = build_model(num_classes=len(label2id))

    optimizer = optimizers.Adam(learning_rate=lr_schedule)  # CosineDecay를 옵티마이저에 직접 적용

    model.compile(optimizer=optimizer,
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    checkpoint_path = os.path.join(CFG['MODEL_SAVE_DIR'], f"best_model_fold{fold+1}.h5")
    early_stop = callbacks.EarlyStopping(monitor='val_accuracy', patience=CFG['PATIENCE'], restore_best_weights=True, verbose=1)
    model_ckpt = callbacks.ModelCheckpoint(checkpoint_path, monitor='val_accuracy', save_best_only=True, verbose=1)

    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=CFG['EPOCHS'],
        callbacks=[early_stop, model_ckpt],  # lr_scheduler 콜백 제거
        verbose=1
    )

    best_val_acc = max(history.history['val_accuracy'])
    print(f"Fold {fold+1} Best Validation Accuracy: {best_val_acc:.4f}")
    fold_val_acc.append(best_val_acc)

print(f"\n평균 Validation Accuracy: {np.mean(fold_val_acc):.4f}")

# 테스트 데이터셋 예측
print("테스트 데이터셋에 대해 예측 진행 중...")
test_paths = sorted(glob(os.path.join(TEST_DIR, '*.jpg')))
test_ds = create_dataset(test_paths, is_train=False)

preds = model.predict(test_ds)
pred_classes = np.argmax(preds, axis=1)

print("Inference 완료")

# 제출 파일 생성
submission = pd.read_csv(SAMPLE_SUB)
submission['category'] = [id2label[c] for c in pred_classes]

submission.to_csv("submission.csv", index=False)
print("submission.csv 저장 완료")


[INFO] 1 GPU(s) available: ['/physical_device:GPU:0']

### Fold 1 시작 ###
Epoch 1/15
   1061/Unknown - 135s 119ms/step - loss: 2.8717 - accuracy: 0.4432
Epoch 1: val_accuracy improved from -inf to 0.18507, saving model to ./models\best_model_fold1.h5


TypeError: Unable to serialize [     2.0897      2.1129      2.1082] to JSON. Unrecognized type <class 'tensorflow.python.framework.ops.EagerTensor'>.