<a href="https://colab.research.google.com/github/RLungWu/NCKU-CVDL-2025/blob/main/%E5%90%B3%E4%BD%B6%E5%80%9E_N26140901_Midterm_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Theme1

In [1]:
import tensorflow as tf
print(tf.config.list_physical_devices('GPU'))

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [2]:
!nvidia-smi

Thu Nov 27 08:51:40 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   62C    P8             11W /   70W |       2MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

#### Download Dataset

In [3]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("sanikamal/rock-paper-scissors-dataset")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'rock-paper-scissors-dataset' dataset.
Path to dataset files: /kaggle/input/rock-paper-scissors-dataset


#### v2 test

In [None]:
import os
import json
import tensorflow as tf
import numpy as np
from pathlib import Path

# ==========================================
# 1. 全域設定 (極限效能版)
# ==========================================
# 如果路徑不同請自行修改
#path = "/content/Rock-Paper-Scissors"

root = Path(path) / "Rock-Paper-Scissors"

SEED = 77
AUTOTUNE = tf.data.AUTOTUNE

# RAM 若足夠大，設 dataset 大小；否則設 1000 左右
SHUFFLE_BUFFER = 1000

# [極限優化] 暴力拉大 Batch Size 以填滿 VRAM
# 如果 VRAM OOM，請降為 128 或 64
BATCH_SIZE = 64
IMG_SIZE = (224, 224)

# 開啟混合精度加速 (VRAM 省一半，速度變快)
tf.keras.mixed_precision.set_global_policy('mixed_float16')

# ==========================================
# 2. 資料集準備 (修正 Validation 結構)
# ==========================================

# --- A. 建立 Train/Test 的 file list ---
train_dir = root / "train"
test_dir = root / "test"

# 使用 tf.keras.utils.image_dataset_from_directory 雖然方便，
# 但為了實作 uint8 trick，我們改用底層 API 手動控制流程，這樣最穩。
# 這裡我們只抓路徑，不做讀取。
train_ds_raw = tf.keras.utils.image_dataset_from_directory(
    train_dir,
    labels="inferred",
    label_mode="int",
    image_size=IMG_SIZE,
    batch_size=None, # 設為 None 以獲得單張圖片的 dataset，方便我們自定義 pipeline
    shuffle=True,
    seed=SEED
)

test_ds_raw = tf.keras.utils.image_dataset_from_directory(
    test_dir,
    labels="inferred",
    label_mode="int",
    image_size=IMG_SIZE,
    batch_size=None,
    shuffle=False
)

# --- B. 修正 Validation Pipeline (Pure TF) ---
# 解決你原本 RAM 壓力的核心：不使用 Python Loop 和 py_function
label_map = {"rock": 0, "paper": 1, "scissors": 2}
val_files = sorted((root / "validation").glob("*.png"))

val_paths = []
val_labels = []

print(f"正在解析 Validation 檔案... 共 {len(val_files)} 張")
for p in val_files:
    name = p.name.lower()
    found = False
    for cls_name, idx in label_map.items():
        if cls_name in name:
            val_paths.append(str(p))
            val_labels.append(idx)
            found = True
            break
    if not found:
        print(f"Warning: 無法識別類別 - {p.name}")

# 建立純文字路徑的 dataset
val_ds_paths = tf.data.Dataset.from_tensor_slices((val_paths, val_labels))

# 定義讀取函數 (Pure TF) - 這是 Validation 專用的讀取器
def load_val_image(path, label):
    img = tf.io.read_file(path)
    img = tf.io.decode_png(img, channels=3)
    img = tf.image.resize(img, IMG_SIZE)
    # 統一轉為 uint8，對齊 Train 的格式
    img = tf.cast(img, tf.uint8)
    return img, label

val_ds_raw = val_ds_paths.map(load_val_image, num_parallel_calls=AUTOTUNE)

# ==========================================
# 3. 核心優化 Pipeline (The uint8 Trick)
# ==========================================

# 定義 Augmentation (只在 GPU/Float 階段執行)
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
    tf.keras.layers.RandomContrast(0.15),
    tf.keras.layers.RandomBrightness(factor=0.1),
])

def apply_pipeline(ds, training=False):
    # 【階段 1】：前處理 (Pre-Cache)
    # 如果是 image_dataset_from_directory 出來的資料已經是 float32，我們要轉回 uint8
    def to_uint8(img, label):
        img = tf.cast(img, tf.uint8)
        # 確保 resize 是正確的 (雙重保險)
        img = tf.image.resize(img, IMG_SIZE)
        img = tf.cast(img, tf.uint8)
        return img, label

    ds = ds.map(to_uint8, num_parallel_calls=AUTOTUNE)

    # 【階段 2】：Cache (關鍵！)
    # 此時記憶體中存的是 uint8 (體積只有 float32 的 1/4)
    # 這樣你可以放心 cache 而不爆 RAM
    ds = ds.cache()

    # 【階段 3】：Shuffle (只對 Training)
    if training:
        ds = ds.shuffle(SHUFFLE_BUFFER, seed=SEED, reshuffle_each_iteration=True)

    # 【階段 4】：後處理 (Post-Cache) - 轉 Float、歸一化、Augmentation
    def process_on_gpu(img, label):
        # 轉回 Float32
        img = tf.cast(img, tf.float32)
        # 歸一化 (0~1)
        img = img / 255.0

        if training:
            # Augmentation 必須在 float32 下進行
            img = data_augmentation(img, training=True)
        return img, label

    # 進行 batching，然後再做 GPU 運算
    ds = ds.batch(BATCH_SIZE)
    ds = ds.map(process_on_gpu, num_parallel_calls=AUTOTUNE)

    # 【階段 5】：Prefetch
    ds = ds.prefetch(AUTOTUNE)
    return ds

print("正在建構高效能 Pipeline...")
train_ds = apply_pipeline(train_ds_raw, training=True)
val_ds   = apply_pipeline(val_ds_raw, training=False)
test_ds  = apply_pipeline(test_ds_raw, training=False)
print("Pipeline 建構完成。")

# ==========================================
# 4. 模型與訓練 (XLA 加速)
# ==========================================

def build_scratch_model(input_shape=(*IMG_SIZE, 3), num_classes=3):
    inputs = tf.keras.Input(shape=input_shape)

    # 因為我們在 pipeline 做了 /255.0，這裡不需要再 Rescaling

    x = tf.keras.layers.Conv2D(24, 3, padding="same")(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.MaxPool2D()(x)

    for filters in [64, 128, 256]:
        x = tf.keras.layers.Conv2D(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)
        x = tf.keras.layers.Conv2D(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)
        x = tf.keras.layers.MaxPool2D()(x)

    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dropout(0.4)(x)

    # 確保輸出層是 float32 (為了 Mixed Precision 穩定性)
    outputs = tf.keras.layers.Dense(num_classes, activation="softmax", dtype='float32')(x)
    return tf.keras.Model(inputs, outputs)

model = build_scratch_model()

# 編譯模型：開啟 XLA (jit_compile=True)
# 根據 Batch Size 256 調整 Learning Rate
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=2e-3), # 稍微調高 LR
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=["accuracy"],
    jit_compile=True # <--- 核武器：XLA 編譯加速
)

callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        "scratch_best.keras",
        monitor="val_accuracy",
        save_best_only=True
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5
    ),
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=10, restore_best_weights=True
    )
]

print(f"開始訓練... Batch Size: {BATCH_SIZE}, XLA: Enabled")
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=40,
    callbacks=callbacks
)

Found 2520 files belonging to 3 classes.
Found 372 files belonging to 3 classes.
正在解析 Validation 檔案... 共 33 張
正在建構高效能 Pipeline...
Pipeline 建構完成。
開始訓練... Batch Size: 64, XLA: Enabled
Epoch 1/40
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m63s[0m 1s/step - accuracy: 0.3344 - loss: 1.3825 - val_accuracy: 0.3333 - val_loss: 1.3705 - learning_rate: 0.0020
Epoch 2/40
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m61s[0m 888ms/step - accuracy: 0.3273 - loss: 1.1443 - val_accuracy: 0.3939 - val_loss: 1.1005 - learning_rate: 0.0020
Epoch 3/40
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 858ms/step - accuracy: 0.3372 - loss: 1.1296 - val_accuracy: 0.4545 - val_loss: 1.0984 - learning_rate: 0.0020
Epoch 4/40
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 863ms/step - accuracy: 0.3229 - loss: 1.1289 - val_accuracy: 0.3333 - val_loss: 1.1000 - learning_rate: 0.0020
Epoch 5/40
[1m32/40[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m7s[0

#### EDA

In [71]:
import os

# path = "/content/Rock-Paper-Scissors"

for root, dirs, files in os.walk(path):
    for d in dirs:
        print(os.path.join(root, d))

/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors
/kaggle/input/rock-paper-scissors-dataset/rock-paper-scissors
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/validation
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/test
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/train
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/test/paper
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/test/rock
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/test/scissors
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/train/paper
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/train/rock
/kaggle/input/rock-paper-scissors-dataset/Rock-Paper-Scissors/train/scissors
/kaggle/input/rock-paper-scissors-dataset/rock-paper-scissors/Rock-Paper-Scissors
/kaggle/input/rock-paper-scissors-dataset/rock-paper-scissors/Rock-Paper-Scissors/validation
/kaggle/input/rock-paper-scissors-dataset/rock-p

In [72]:
from pathlib import Path
import json

root = Path(path) / "Rock-Paper-Scissors"
splits = ["train", "validation", "test"]
summary = {}

for split in splits:
    split_dir = root / split
    summary[split] = {}
    if split_dir.is_dir():
        if any(child.is_dir() for child in split_dir.iterdir()):
            # train/test
            for cls_dir in sorted(split_dir.iterdir()):
                if cls_dir.is_dir():
                    summary[split][cls_dir.name] = len(list(cls_dir.glob("*.png")))
        else:
            # validation
            summary[split] = {"rock": 0, "paper": 0, "scissors": 0}
            for img_path in split_dir.glob("*.png"):
                name = img_path.name.lower()
                for cls in summary[split]:
                    if cls in name:
                        summary[split][cls] += 1
                        break
    summary[split]["total"] = sum(summary[split].values())

summary["overall_total"] = sum(summary[s]["total"] for s in splits)

with open("data_summary.json", "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2, ensure_ascii=False)

print(json.dumps(summary, indent=2, ensure_ascii=False))


{
  "train": {
    "paper": 840,
    "rock": 840,
    "scissors": 840,
    "total": 2520
  },
  "validation": {
    "rock": 11,
    "paper": 11,
    "scissors": 11,
    "total": 33
  },
  "test": {
    "paper": 124,
    "rock": 124,
    "scissors": 124,
    "total": 372
  },
  "overall_total": 2925
}


#### Clean Dataset

In [73]:
import tensorflow as tf
import numpy as np
from pathlib import Path

SEED = 77
AUTOTUNE = tf.data.AUTOTUNE
SHUFFLE_BUFFER = 256
PATH = path
ROOT = root
IMG_SIZE = (224, 224)
BATCH_SIZE = 128

train_ds_raw = tf.keras.utils.image_dataset_from_directory(
    ROOT / "train",
    labels="inferred",
    label_mode="int",
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=False
)
test_ds_raw = tf.keras.utils.image_dataset_from_directory(
    ROOT / "test",
    labels="inferred",
    label_mode="int",
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=False
)




Found 2520 files belonging to 3 classes.
Found 372 files belonging to 3 classes.


In [74]:
# ==========================================
# Optimized Validation Pipeline (Pure TF)
# ==========================================

# 1. 解析路徑與標籤 (這部分在 Python 端只跑一次，生成 list，沒問題)
label_map = {"rock": 0, "paper": 1, "scissors": 2}
val_files = sorted((root / "validation").glob("*.png"))

val_path_strs = [str(p) for p in val_files]
val_labels = []

for p in val_files:
    name = p.name.lower()
    # 這裡的邏輯保持你原本的寫法，但建議檢查是否會有 unmatched 的情況
    for cls_name, idx in label_map.items():
        if cls_name in name:
            val_labels.append(idx)
            break
    else:
        # 處理異常情況，避免 label 對不齊
        print(f"Warning: No label found for {p.name}")
        val_labels.append(-1) # 標記錯誤

# 2. 定義 Pure TF Loading Function
def load_and_process_image(path, label):
    # 使用 TF 原生 IO，不經過 Python Interpreter
    img_raw = tf.io.read_file(path)
    img = tf.io.decode_png(img_raw, channels=3)

    # 這裡直接 Resize，確保記憶體中只持有小圖
    img = tf.image.resize(img, IMG_SIZE)

    # 轉換型別 (雖然 base_preprocess 會轉，但在這裡轉比較保險)
    img = tf.cast(img, tf.float32)

    return img, label

# 3. 建構 Dataset
val_ds_raw = tf.data.Dataset.from_tensor_slices((val_path_strs, val_labels))

# 過濾掉無法標記的數據 (若有)
val_ds_raw = val_ds_raw.filter(lambda x, y: y != -1)

# 平行化處理 map
val_ds_raw = val_ds_raw.map(load_and_process_image, num_parallel_calls=AUTOTUNE)

# Batching (注意：這裡先不要 prefetch，統一在最後做)
val_ds_raw = val_ds_raw.batch(BATCH_SIZE)

# ==========================================
# End of Optimization
# ==========================================

#### Augumentation

In [75]:
import tensorflow as tf

base_preprocess = tf.keras.Sequential([
    tf.keras.layers.Resizing(*IMG_SIZE),
    tf.keras.layers.Rescaling(1./255, dtype=tf.float32),
], name="base_preprocess")

data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
    tf.keras.layers.RandomContrast(0.15),
    tf.keras.layers.RandomBrightness(factor=0.1),
], name="data_augmentation")


def preprocess_dataset(ds, training=False):
    # 1. 第一階段：只做 Resize，保持 uint8 (0-255)
    # 移除 Rescaling，確保 Tensor 還是 uint8
    def initial_load(path, label):
        img = tf.io.read_file(path)
        img = tf.io.decode_png(img, channels=3)
        img = tf.image.resize(img, IMG_SIZE)
        # resize 預設輸出 float32，我們強轉回 uint8 省記憶體
        img = tf.cast(img, tf.uint8)
        return img, label

    ds = ds.map(initial_load, num_parallel_calls=AUTOTUNE)

    # 2. 關鍵時刻：Cache uint8 數據
    # 此時記憶體佔用量只有 float32 版本的 1/4
    ds = ds.cache()

    # 3. 第二階段：即時轉換 (On-the-fly casting)
    # 從 Cache 拿出來後，再轉成 float32 並歸一化
    def subsequent_process(img, label):
        img = tf.cast(img, tf.float32)
        img = img / 255.0  # 手動 Rescaling
        return img, label

    ds = ds.map(subsequent_process, num_parallel_calls=AUTOTUNE)

    if training:
        ds = ds.shuffle(SHUFFLE_BUFFER)
        # Augmentation 這裡會自動處理 float32
        ds = ds.map(lambda x, y: (data_augmentation(x, training=True), y),
                    num_parallel_calls=AUTOTUNE)

    return ds.prefetch(AUTOTUNE)


train_ds = preprocess_dataset(train_ds_raw, training=True)
val_ds   = preprocess_dataset(val_ds_raw)
test_ds  = preprocess_dataset(test_ds_raw)


TypeError: in user code:

    File "/tmp/ipython-input-3588197059.py", line 21, in initial_load  *
        img = tf.io.read_file(path)

    TypeError: Input 'filename' of 'ReadFile' Op has type float32 that does not match expected type of string.


#### Model Building

In [None]:
tf.keras.mixed_precision.set_global_policy('mixed_float16')

def build_scratch_model(input_shape=(224, 224,3), num_classes=3):
    inputs = tf.keras.Input(shape=input_shape)
    x = tf.keras.layers.Conv2D(24, 3, padding="same")(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.MaxPool2D()(x)

    for filters in [64, 128, 256, 512]:
        x = tf.keras.layers.Conv2D(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)
        x = tf.keras.layers.Conv2D(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)
        x = tf.keras.layers.MaxPool2D()(x)

    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dropout(0.4)(x)
    outputs = tf.keras.layers.Dense(num_classes, activation="softmax")(x)
    return tf.keras.Model(inputs, outputs)


#### Training

In [None]:
model = build_scratch_model()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=2e-3),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=["accuracy"],
    jit_compile=True
)

callbacks = [
    tf.keras.callbacks.ModelCheckpoint("scratch_best.keras",
                                       monitor="val_accuracy",
                                       save_best_only=True),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5),
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=6, restore_best_weights=True)
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=40,
    callbacks=callbacks
)


#### Plot

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.plot(history.history["loss"], label="train")
plt.plot(history.history["val_loss"], label="val")
plt.title("Loss"); plt.legend()
plt.subplot(1,2,2)
plt.plot(history.history["accuracy"], label="train")
plt.plot(history.history["val_accuracy"], label="val")
plt.title("Accuracy"); plt.legend()
plt.show()


In [None]:
import seanborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

test_loss, test_acc = model.evaluate(test_ds)
print(f"Test acc: {test_acc:.3f}")

y_true, y_pred = [], []
for imgs, labels in test_ds:
    probs = model.predict(imgs, verbose=0)
    y_true.extend(labels.numpy())
    y_pred.extend(np.argmax(probs, axis=1))

cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=label_map.keys(),
            yticklabels=label_map.keys())
plt.xlabel("Predicted"); plt.ylabel("True"); plt.show()
print(classification_report(y_true, y_pred, target_names=list(label_map.keys())))
