In [2]:
from google.colab import drive
drive.mount('/content/drive')

!unzip -q "/content/drive/MyDrive/dataset.zip" -d /content/dataset

Mounted at /content/drive


In [4]:
# =========================================
# 0) 기본 설정
# =========================================
# (필요 시) !pip install --upgrade --force-reinstall pandas scikit-learn

import os, glob, zipfile, warnings, random
from pathlib import Path
import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

warnings.filterwarnings("ignore")
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

# 혼합정밀 (GPU 있을 때만)
MIXED = False
if tf.config.list_physical_devices('GPU'):
    try:
        from tensorflow.keras import mixed_precision
        mixed_precision.set_global_policy('mixed_float16')
        MIXED = True
    except Exception:
        MIXED = False


# =========================================
# 0-A) 데이터셋 준비 (ZIP 없어도 동작)
#  - /content/dataset 안에 cup/ 폴더가 있으면 그대로 사용
#  - 현재 작업 폴더에서 최신 ZIP 자동 탐색 (dataset(2).zip 등 포함)
#  - 없으면 업로드 창 표시(Colab)
# =========================================
# =========================================
# 0-A) 데이터셋 준비 (드라이브 ZIP 우선 사용)
# =========================================
from google.colab import drive
drive.mount('/content/drive')

EXTRACT_DIR = "/content/dataset"
# 폴더로 이미 풀어둔 경우 👉 이 경로를 폴더로 지정하면 압축해제 생략
CUSTOM_DATASET_DIR = None  # 예: "/content/drive/MyDrive/dataset"  (폴더일 때만)

# 드라이브에 올린 ZIP 경로 (정확히 여기에 있다면 바로 사용)
PREFERRED_ZIP = "/content/drive/MyDrive/dataset.zip"

def ensure_dataset_dir():
    # 1) 사용자가 폴더를 직접 지정한 경우 (ZIP 아님)
    if CUSTOM_DATASET_DIR:
        assert os.path.isdir(CUSTOM_DATASET_DIR), f"경로가 폴더가 아닙니다: {CUSTOM_DATASET_DIR}"
        print(f"직접 지정 폴더 사용: {CUSTOM_DATASET_DIR}")
        return CUSTOM_DATASET_DIR

    os.makedirs(EXTRACT_DIR, exist_ok=True)

    # 이미 풀려 있으면 사용
    if os.path.isdir(os.path.join(EXTRACT_DIR, "cup")):
        print("기존 폴더 발견: 압축 해제 스킵")
        return EXTRACT_DIR

    # 2) 드라이브 고정 경로 ZIP 우선
    candidate_zips = []
    if os.path.isfile(PREFERRED_ZIP):
        candidate_zips.append(PREFERRED_ZIP)

    # 3) 현재 작업 폴더의 ZIP 자동 탐색 (dataset(2).zip 등 포함)
    candidate_zips += sorted(glob.glob("*.zip"), key=os.path.getmtime, reverse=True)

    # 4) 그래도 없으면 업로드 유도
    if not candidate_zips:
        try:
            from google.colab import files
            print("📤 ZIP이 없어 업로드 창을 엽니다. dataset.zip을 선택하세요.")
            uploaded = files.upload()
            candidate_zips = [os.path.join(os.getcwd(), k) for k in uploaded.keys()]
        except Exception as e:
            raise FileNotFoundError("*.zip 파일이 없고 업로드도 되지 않았습니다.") from e

    zip_path = candidate_zips[0]
    print(f"사용 ZIP: {zip_path}")

    # 압축 테스트 및 해제
    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            bad = zf.testzip()
            if bad:
                raise zipfile.BadZipFile(f"손상된 파일: {bad}")
            zf.extractall(EXTRACT_DIR)
    except zipfile.BadZipFile as e:
        raise RuntimeError(
            f"ZIP이 손상되었습니다: {e}\n"
            f"- 폴더를 다시 ZIP으로 압축해 재업로드하거나,\n"
            f"- Google Drive를 마운트해 폴더 경로를 직접 지정하세요."
        )

    # ZIP 내부에 'dataset/' 폴더가 한 겹 더 있으면 그 내부를 사용
    inner = os.path.join(EXTRACT_DIR, "dataset")
    if os.path.isdir(inner):
        print("↪️ 내부 'dataset' 폴더 발견: 해당 경로 사용")
        return inner

    return EXTRACT_DIR

DATASET_DIR = ensure_dataset_dir()
print("📁 DATASET_DIR:", DATASET_DIR)

# =========================================
# 1) 라벨 규칙 (객체 6클래스 + 재질 3클래스)
#  - 요청하신 매핑을 그대로 반영
# =========================================
OBJECT_CLASS_NAMES = ['cup_paper', 'cup_plastic', 'cupholder', 'lid', 'straw', 'none']
OBJECT_CLASS_MAP   = {n:i for i,n in enumerate(OBJECT_CLASS_NAMES)}

MATERIAL_CLASS_NAMES= ['paper', 'plastic', 'none']
MATERIAL_CLASS_MAP  = {n:i for i,n in enumerate(MATERIAL_CLASS_NAMES)}

IMG_EXTS = ('.jpg', '.jpeg', '.png', '.bmp', '.webp')

def infer_labels_from_path(path: str):
    """
    폴더 구조:
      - cup/paper/*, cup/plastic/*
      - cupholder/*
      - lid/*
      - straw/*
      - none/*
    재질 매핑:
      cup_paper → paper
      cup_plastic → plastic
      cupholder → paper
      lid → plastic
      straw → plastic
      none → none
    """
    base = Path(DATASET_DIR).resolve()
    rel  = Path(path).resolve().relative_to(base)
    top  = rel.parts[0].lower()
    sub  = rel.parts[1].lower() if len(rel.parts) > 1 else ''

    if top == 'cup':
        if sub == 'paper':
            return ('cup_paper', 'paper')
        elif sub == 'plastic':
            return ('cup_plastic', 'plastic')
        else:
            # cup 바로 아래 이미지인 경우 파일명 힌트
            name = Path(path).stem.lower()
            if 'paper' in name:
                return ('cup_paper', 'paper')
            elif 'plastic' in name:
                return ('cup_plastic', 'plastic')
            raise ValueError(f"[cup] 재질 하위폴더/힌트 불명확: {path}")

    elif top == 'cupholder':
        return ('cupholder', 'paper')

    elif top == 'lid':
        return ('lid', 'plastic')

    elif top == 'straw':
        return ('straw', 'plastic')  # 전부 plastic 고정

    elif top == 'none':
        return ('none', 'none')

    else:
        raise ValueError(f"알 수 없는 최상위 폴더: {top} (path={path})")


# =========================================
# 2) CSV 생성
# =========================================
image_paths, object_classes, materials = [], [], []

for root, _, files in os.walk(DATASET_DIR):
    for fname in files:
        if fname.lower().endswith(IMG_EXTS):
            fpath = os.path.join(root, fname)
            try:
                obj_cls, mat = infer_labels_from_path(fpath)
            except Exception as e:
                print("라벨 추론 스킵:", e)
                continue
            image_paths.append(fpath)
            object_classes.append(obj_cls)
            materials.append(mat)

if not image_paths:
    raise RuntimeError("이미지를 찾지 못했습니다. 폴더 구조/확장자 확인 필요.")

df = pd.DataFrame({
    "image_path": image_paths,
    "object_class": object_classes,
    "material_type": materials
})

print("원본 분포(객체):\n", df['object_class'].value_counts().sort_index())
print("원본 분포(재질):\n", df['material_type'].value_counts().sort_index())


# =========================================
# 3) 균형화 (객체 클래스별 정확히 500장)
# =========================================
TARGET_PER_CLASS = 500
balanced = []
for cls in OBJECT_CLASS_NAMES:
    g = df[df['object_class'] == cls]
    n = len(g)
    if n == 0:
        raise RuntimeError(f"'{cls}' 클래스 이미지가 없습니다.")
    if n == TARGET_PER_CLASS:
        balanced.append(g)
    elif n > TARGET_PER_CLASS:
        balanced.append(g.sample(n=TARGET_PER_CLASS, replace=False, random_state=SEED))
    else:
        balanced.append(g.sample(n=TARGET_PER_CLASS, replace=True, random_state=SEED))

df = pd.concat(balanced, ignore_index=True)

# 라벨 인코딩
df['object_label']   = df['object_class'].map(OBJECT_CLASS_MAP).astype(np.int32)
df['material_label'] = df['material_type'].map(MATERIAL_CLASS_MAP).astype(np.int32)

print("\n균형 분포(객체):\n", df['object_class'].value_counts().sort_index())
print("\n균형 분포(재질):\n", df['material_type'].value_counts().sort_index())

CSV_PATH = "/content/labels.csv"
df.to_csv(CSV_PATH, index=False)
print(f"\nlabels.csv 저장 완료: {CSV_PATH} (총 {len(df)}장)")


# =========================================
# 4) 데이터 분할 (Train/Val/Test = 70/15/15, 객체 기준 stratify)
# =========================================
train_df, temp_df = train_test_split(
    df, test_size=0.30, random_state=SEED, stratify=df['object_label']
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.50, random_state=SEED, stratify=temp_df['object_label']
)

print(f"\nSplit -> Train:{len(train_df)}  Val:{len(val_df)}  Test:{len(test_df)}")


# =========================================
# 5) tf.data 파이프라인
# =========================================
IMG_SIZE = (180, 180)
BATCH_SIZE = 32
AUTO = tf.data.AUTOTUNE

data_augmentation = tf.keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
], name="augmentation")

def load_and_preprocess(path, obj_label, mat_label, augment=False):
    img = tf.io.read_file(path)
    img = tf.io.decode_image(img, channels=3, expand_animations=False)
    img = tf.image.resize(img, IMG_SIZE, method='bilinear')
    img = preprocess_input(img)  # [-1,1]
    if augment:
        img = data_augmentation(img, training=True)
    return img, {'object_output': obj_label, 'material_output': mat_label}

def make_ds(dataframe, training=False):
    paths = dataframe['image_path'].astype(str).values
    obj   = dataframe['object_label'].astype(np.int32).values
    mat   = dataframe['material_label'].astype(np.int32).values
    ds = tf.data.Dataset.from_tensor_slices((paths, obj, mat))
    if training:
        ds = ds.shuffle(buffer_size=len(dataframe), seed=SEED, reshuffle_each_iteration=True)
    ds = ds.map(lambda p,o,m: load_and_preprocess(p,o,m,augment=training), num_parallel_calls=AUTO)
    ds = ds.batch(BATCH_SIZE).prefetch(AUTO)
    # 검증/테스트는 순서 보장(평가 신뢰도 ↑)
    if not training:
        opts = tf.data.Options()
        opts.experimental_deterministic = True
        ds = ds.with_options(opts)
    return ds

train_ds = make_ds(train_df, training=True)
val_ds   = make_ds(val_df,   training=False)
test_ds  = make_ds(test_df,  training=False)


# =========================================
# 6) 모델 (MobileNetV2 멀티아웃풋)
# =========================================
def build_model():
    base = MobileNetV2(input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3), include_top=False, weights='imagenet')
    base.trainable = False

    inp = tf.keras.Input(shape=(IMG_SIZE[0], IMG_SIZE[1], 3))
    x = base(inp, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.2)(x)
    x = layers.Dense(256, activation='relu')(x)

    dtype_out = 'float32' if MIXED else None
    object_output   = layers.Dense(len(OBJECT_CLASS_NAMES),   activation='softmax', dtype=dtype_out, name='object_output')(x)
    material_output = layers.Dense(len(MATERIAL_CLASS_NAMES), activation='softmax', dtype=dtype_out, name='material_output')(x)

    model = models.Model(inp, {'object_output': object_output, 'material_output': material_output})
    return model, base

model, base = build_model()

ckpt_path = "/content/best_model.keras"
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(ckpt_path, monitor='val_object_output_accuracy',
                                       mode='max', save_best_only=True, verbose=1),
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1),
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.4, patience=2, min_lr=1e-6, verbose=1)
]

# 1단계: Warm-up (베이스 고정)
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss={'object_output': 'sparse_categorical_crossentropy',
          'material_output': 'sparse_categorical_crossentropy'},
    metrics={'object_output': 'accuracy', 'material_output': 'accuracy'}
)
history_warm = model.fit(train_ds, validation_data=val_ds, epochs=7, callbacks=callbacks, verbose=1)

# 2단계: 상위 블록 파인튜닝 (BN 제외)
N_TRAIN = 50
for lyr in base.layers[-N_TRAIN:]:
    if not isinstance(lyr, layers.BatchNormalization):
        lyr.trainable = True

model.compile(
    optimizer=tf.keras.optimizers.Adam(2e-4),
    loss={'object_output': 'sparse_categorical_crossentropy',
          'material_output': 'sparse_categorical_crossentropy'},
    metrics={'object_output': 'accuracy', 'material_output': 'accuracy'}
)
history_ft = model.fit(train_ds, validation_data=val_ds, epochs=12, callbacks=callbacks, verbose=1)

FINAL_MODEL_PATH = "/content/multilabel_model.keras"
model.save(FINAL_MODEL_PATH)
print("모델 저장:", FINAL_MODEL_PATH)


# =========================================
# 7) 평가 (Val/Test) - 객체/재질
#  - val/test는 순서를 고정했으므로 df와 1:1 대응
# =========================================
def evaluate_split(name, ds, df_ref):
    preds = model.predict(ds, verbose=0)
    y_obj_pred = preds['object_output'].argmax(axis=1)
    y_mat_pred = preds['material_output'].argmax(axis=1)

    y_obj_true = df_ref['object_label'].values
    y_mat_true = df_ref['material_label'].values

    print(f"\n=== [{name}] Object Report ===")
    print(classification_report(y_obj_true, y_obj_pred, target_names=OBJECT_CLASS_NAMES, digits=4))
    print("Confusion Matrix (Object):\n", confusion_matrix(y_obj_true, y_obj_pred))

    print(f"\n=== [{name}] Material Report ===")
    print(classification_report(y_mat_true, y_mat_pred, target_names=MATERIAL_CLASS_NAMES, digits=4))
    print("Confusion Matrix (Material):\n", confusion_matrix(y_mat_true, y_mat_pred))

evaluate_split("VAL",  val_ds,  val_df)
evaluate_split("TEST", test_ds, test_df)


# =========================================
# 8) 추론 유틸
# =========================================
from tensorflow.keras.preprocessing import image as kimage
import matplotlib.pyplot as plt

def predict_image(img_path, none_threshold=0.55):
    p = Path(img_path)
    if not p.exists():
        print(f"파일 없음: {img_path}"); return
    img = kimage.load_img(img_path, target_size=IMG_SIZE)
    arr = kimage.img_to_array(img)
    x = tf.expand_dims(preprocess_input(arr), axis=0)

    preds = model.predict(x, verbose=0)
    obj_prob = preds['object_output'][0]
    mat_prob = preds['material_output'][0]
    o_idx = int(np.argmax(obj_prob))
    m_idx = int(np.argmax(mat_prob))

    # 낮은 확률이면 none으로 게이팅 (none 자체가 최고면 유지)
    if obj_prob[o_idx] < none_threshold and OBJECT_CLASS_NAMES[o_idx] != 'none':
        o_idx = OBJECT_CLASS_MAP['none']

    print(f"Object: {OBJECT_CLASS_NAMES[o_idx]} ({obj_prob[o_idx]*100:.2f}%)")
    print(f"Material: {MATERIAL_CLASS_NAMES[m_idx]} ({mat_prob[m_idx]*100:.2f}%)")

def predict_and_show(img_path, none_threshold=0.55):
    img = kimage.load_img(img_path, target_size=IMG_SIZE)
    arr = kimage.img_to_array(img).astype("uint8")
    x = tf.expand_dims(preprocess_input(arr), axis=0)

    preds = model.predict(x, verbose=0)
    obj_prob = preds['object_output'][0]
    mat_prob = preds['material_output'][0]
    o_idx = int(np.argmax(obj_prob))
    m_idx = int(np.argmax(mat_prob))
    if obj_prob[o_idx] < none_threshold and OBJECT_CLASS_NAMES[o_idx] != 'none':
        o_idx = OBJECT_CLASS_MAP['none']

    plt.imshow(arr); plt.axis("off")
    plt.title(f"{OBJECT_CLASS_NAMES[o_idx]} ({obj_prob[o_idx]*100:.1f}%)\n"
              f"{MATERIAL_CLASS_NAMES[m_idx]} ({mat_prob[m_idx]*100:.1f}%)")
    plt.show()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
사용 ZIP: /content/drive/MyDrive/dataset.zip
↪️ 내부 'dataset' 폴더 발견: 해당 경로 사용
📁 DATASET_DIR: /content/dataset/dataset
원본 분포(객체):
 object_class
cup_paper      500
cup_plastic    500
cupholder      500
lid            500
none           500
straw          500
Name: count, dtype: int64
원본 분포(재질):
 material_type
none        500
paper      1000
plastic    1500
Name: count, dtype: int64

균형 분포(객체):
 object_class
cup_paper      500
cup_plastic    500
cupholder      500
lid            500
none           500
straw          500
Name: count, dtype: int64

균형 분포(재질):
 material_type
none        500
paper      1000
plastic    1500
Name: count, dtype: int64

labels.csv 저장 완료: /content/labels.csv (총 3000장)

Split -> Train:2100  Val:450  Test:450
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf

In [5]:
df = pd.read_csv("/content/labels.csv")
print(df.groupby(['object_class','material_type']).size().sort_index())

object_class  material_type
cup_paper     paper            500
cup_plastic   plastic          500
cupholder     paper            500
lid           plastic          500
none          none             500
straw         plastic          500
dtype: int64
