## base code

In [None]:
# ===== train_and_log.py (요청 사양 저장 형식) =====
import os, fnmatch, random, time, json, pickle
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, Callback
from sklearn.metrics import mean_squared_error, f1_score, accuracy_score

print("tf.__version__:", tf.__version__)
print("GPU:", "Available" if tf.config.list_physical_devices('GPU') else "Not available")

# --------------------------
# 설정 (이 3가지만 바꿔 쓰세요)
# --------------------------
data_root = "주소"                 # 압축 풀린 루트 폴더
model_output_dir = "저장 파일 이름" # 저장 폴더
model_name = "모델 이름"           # 실험/모델 이름

epochs = 30
batch_size = 32
lr = 1e-4

os.makedirs(model_output_dir, exist_ok=True)

# 저장 파일 경로들
ACC_CSV = os.path.join(model_output_dir, f"{model_name}_accuracy.csv")
F1_CSV  = os.path.join(model_output_dir, f"{model_name}_f1.csv")
FINAL_H5 = os.path.join(model_output_dir, f"{model_name}_final.h5")
TIME_TXT = os.path.join(model_output_dir, f"{model_name}_time.txt")
SIZE_TXT = os.path.join(model_output_dir, f"{model_name}_size.txt")
# PARAMS_TXT = os.path.join(model_output_dir, f"{model_name}_params.txt")  # 파라미터 수로 저장하고 싶으면 사용

# --------------------------
# 데이터 로딩(전처리 없음)
# --------------------------
train_dir = os.path.join(data_root, "train")
test_dir  = os.path.join(data_root, "test")

def collect_paths_angles(folder, pattern="*.png"):
    paths, labels = [], []
    for fn in os.listdir(folder):
        if fnmatch.fnmatch(fn, pattern):
            path = os.path.join(folder, fn)
            angle = int(fn[-7:-4])  # 예: ..._015.png → 15
            paths.append(path)
            labels.append(angle)
    return paths, labels

X_train, y_train = collect_paths_angles(train_dir)
X_test,  y_test  = collect_paths_angles(test_dir)

assert len(X_train) and len(X_test), "train/test 폴더에 .png 이미지가 있는지 확인하세요."
print(f"Train: {len(X_train)}  |  Test: {len(X_test)}")

import cv2
def load_image(path, as_rgb=True):
    img = cv2.imread(path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(f"이미지 로드 실패: {path}")
    if as_rgb:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img.astype(np.float32)  # 전처리 없음 (사전 전처리 가정)

# 입력 크기 추론
INPUT_SHAPE = load_image(X_train[0]).shape
print("INPUT_SHAPE:", INPUT_SHAPE)

def image_data_generator(paths, labels, batch):
    n = len(paths)
    while True:
        batch_imgs, batch_lbls = [], []
        for _ in range(batch):
            i = random.randint(0, n-1)
            batch_imgs.append(load_image(paths[i]))
            batch_lbls.append(labels[i])
        yield np.asarray(batch_imgs, dtype=np.float32), np.asarray(batch_lbls, dtype=np.int32)

train_gen = image_data_generator(X_train, y_train, batch_size)
valid_gen = image_data_generator(X_test,  y_test,  batch_size)

steps_per_epoch = max(1, len(X_train)//batch_size)
val_steps       = max(1, len(X_test)//batch_size)

# --------------------------
# 모델
# --------------------------
def build_model(input_shape):
    m = models.Sequential([
        layers.Input(shape=input_shape),
        layers.Conv2D(24, (5,5), strides=(2,2), activation='relu'),
        layers.Conv2D(36, (5,5), strides=(2,2), activation='relu'),
        layers.Conv2D(48, (5,5), strides=(2,2), activation='relu'),
        layers.Conv2D(64, (3,3), activation='relu'),
        layers.Conv2D(64, (3,3), activation='relu'),
        layers.Flatten(),
        layers.Dense(100, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(50, activation='relu'),
        layers.Dense(10, activation='relu'),
        layers.Dense(1)  # 회귀 출력(각도)
    ])
    m.compile(optimizer=tf.keras.optimizers.Adam(lr), loss='mse',
              metrics=[tf.keras.metrics.MeanAbsoluteError(name="mae")])
    return m

model = build_model(INPUT_SHAPE)
model.summary()

# --------------------------
# 고정 평가셋(매 epoch 동일)
# --------------------------
EVAL_MAX = min(200, len(X_test))
eval_paths = X_test[:EVAL_MAX]
eval_labels = np.asarray(y_test[:EVAL_MAX], dtype=np.int32)
eval_images = np.asarray([load_image(p) for p in eval_paths], dtype=np.float32)

# --------------------------
# 콜백: 에폭마다 accuracy / f1를 파일로 저장
# --------------------------
class SaveAccF1PerEpoch(Callback):
    def __init__(self, X, y, acc_csv, f1_csv):
        super().__init__()
        self.X, self.y = X, y
        self.acc_csv = acc_csv
        self.f1_csv  = f1_csv
        # 헤더 생성(없으면)
        if not os.path.exists(self.acc_csv):
            with open(self.acc_csv, "w", encoding="utf-8") as f:
                f.write("epoch,accuracy\n")
        if not os.path.exists(self.f1_csv):
            with open(self.f1_csv, "w", encoding="utf-8") as f:
                f.write("epoch,f1_macro\n")

    def on_epoch_end(self, epoch, logs=None):
        y_pred = self.model.predict(self.X, verbose=0).squeeze()
        y_pred_cls = np.round(y_pred).astype(np.int32)

        acc = accuracy_score(self.y, y_pred_cls)
        f1  = f1_score(self.y, y_pred_cls, average="macro", zero_division=0)

        with open(self.acc_csv, "a", encoding="utf-8") as f:
            f.write(f"{epoch+1},{acc:.6f}\n")
        with open(self.f1_csv, "a", encoding="utf-8") as f:
            f.write(f"{epoch+1},{f1:.6f}\n")

        print(f"[Epoch {epoch+1}] accuracy={acc:.4f} | f1_macro={f1:.4f}")

epoch_cb = SaveAccF1PerEpoch(eval_images, eval_labels, ACC_CSV, F1_CSV)

# 얼리스톱(선택)
es_cb   = EarlyStopping(monitor="val_loss", mode="min", patience=3, restore_best_weights=True)

# --------------------------
# 학습
# --------------------------
t0 = time.time()
history = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=epochs,
    validation_data=valid_gen,     # test를 validation으로 사용
    validation_steps=val_steps,
    callbacks=[epoch_cb, es_cb],   # 체크포인트 콜백 제거(요청)
    verbose=1,
    shuffle=True
)
train_time_sec = time.time() - t0
print(f"학습시간(초): {train_time_sec:.2f}")

# --------------------------
# 모델 저장(.h5) + 크기/시간 저장
# --------------------------
model.save(FINAL_H5)

# 모델 파일 크기(바이트) 저장
model_size_bytes = os.path.getsize(FINAL_H5)
with open(SIZE_TXT, "w", encoding="utf-8") as f:
    f.write(str(model_size_bytes))
# 파라미터 수로 저장하고 싶으면 위 2줄 대신 아래 2줄 사용:
# with open(SIZE_TXT, "w", encoding="utf-8") as f:
#     f.write(str(model.count_params()))

# 학습시간(초) 저장
with open(TIME_TXT, "w", encoding="utf-8") as f:
    f.write(f"{train_time_sec:.6f}")

print("\n저장 완료:")
print(" -", ACC_CSV)
print(" -", F1_CSV)
print(" -", FINAL_H5)
print(" -", TIME_TXT)
print(" -", SIZE_TXT)
# ===== end =====


## UNet

In [2]:
# ===== train_and_log_unet_resized.py (U-Net 기반 각도 분류, 이미지 resize 적용) =====
import os, fnmatch, random, time
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, Callback
from sklearn.metrics import f1_score, accuracy_score
import cv2

print("tf.__version__:", tf.__version__)
print("GPU:", "Available" if tf.config.list_physical_devices('GPU') else "Not available")

# --------------------------
# 설정
# --------------------------
data_root = "/content/drive/MyDrive/2025_한이음_경량핑/data/sampled"
model_output_dir = "/content/drive/MyDrive/2025_한이음_경량핑/model_output"
model_name = "UNet_resized"

epochs = 30
batch_size = 32
lr = 1e-4
target_size = (64, 192)  # height x width, MaxPooling 단계 맞춤

os.makedirs(model_output_dir, exist_ok=True)

# 저장 파일 경로
ACC_CSV = os.path.join(model_output_dir, f"{model_name}_accuracy.csv")
F1_CSV  = os.path.join(model_output_dir, f"{model_name}_f1.csv")
FINAL_H5 = os.path.join(model_output_dir, f"{model_name}_final.h5")
TIME_TXT = os.path.join(model_output_dir, f"{model_name}_time.txt")
SIZE_TXT = os.path.join(model_output_dir, f"{model_name}_size.txt")

# --------------------------
# 데이터 로딩
# --------------------------
train_dir = os.path.join(data_root, "train")
test_dir  = os.path.join(data_root, "test")

def collect_paths_angles(folder, pattern="*.png"):
    paths, labels = [], []
    for fn in os.listdir(folder):
        if fnmatch.fnmatch(fn, pattern):
            path = os.path.join(folder, fn)
            name, _ = os.path.splitext(fn)
            angle_str = name.split('_')[-1]
            try:
                angle = int(angle_str)
            except ValueError:
                raise ValueError(f"파일명에서 각도 추출 실패: {fn}")
            paths.append(path)
            labels.append(angle)
    return paths, labels

X_train, y_train = collect_paths_angles(train_dir)
X_test,  y_test  = collect_paths_angles(test_dir)

assert len(X_train) and len(X_test), "train/test 폴더에 .png 이미지가 있는지 확인하세요."
print(f"Train: {len(X_train)}  |  Test: {len(X_test)}")

# --------------------------
# 이미지 로드 및 resize
# --------------------------
def load_image(path, as_rgb=True, target_size=target_size):
    img = cv2.imread(path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(f"이미지 로드 실패: {path}")
    if as_rgb:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (target_size[1], target_size[0]))  # width, height 순서
    return img.astype(np.float32)

INPUT_SHAPE = (*target_size, 3)
print("INPUT_SHAPE:", INPUT_SHAPE)

# --------------------------
# 각도 클래스 변환 (45,90,135 → 0,1,2)
# --------------------------
angle_classes = {45: 0, 90: 1, 135: 2}
y_train = [angle_classes[a] for a in y_train]
y_test  = [angle_classes[a] for a in y_test]

# --------------------------
# 데이터 제너레이터
# --------------------------
def image_data_generator(paths, labels, batch):
    n = len(paths)
    while True:
        batch_imgs, batch_lbls = [], []
        for _ in range(batch):
            i = random.randint(0, n-1)
            batch_imgs.append(load_image(paths[i]))
            batch_lbls.append(labels[i])
        yield np.asarray(batch_imgs, dtype=np.float32), np.asarray(batch_lbls, dtype=np.int32)

train_gen = image_data_generator(X_train, y_train, batch_size)
valid_gen = image_data_generator(X_test,  y_test,  batch_size)

steps_per_epoch = max(1, len(X_train)//batch_size)
val_steps       = max(1, len(X_test)//batch_size)

# --------------------------
# 모델 (U-Net + Conv2DTranspose)
# --------------------------
def build_unet_model_fixed(input_shape, num_classes=3):
    inputs = tf.keras.Input(shape=input_shape)

    # Encoder
    c1 = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(32, 3, activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D(2, padding='same')(c1)

    c2 = layers.Conv2D(64, 3, activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(64, 3, activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D(2, padding='same')(c2)

    c3 = layers.Conv2D(128, 3, activation='relu', padding='same')(p2)
    c3 = layers.Conv2D(128, 3, activation='relu', padding='same')(c3)
    p3 = layers.MaxPooling2D(2, padding='same')(c3)

    # Bottleneck
    b = layers.Conv2D(256, 3, activation='relu', padding='same')(p3)
    b = layers.Conv2D(256, 3, activation='relu', padding='same')(b)

    # Decoder
    u3 = layers.Conv2DTranspose(128, 2, strides=2, padding='same')(b)
    u3 = layers.Concatenate()([u3, c3])
    c4 = layers.Conv2D(128, 3, activation='relu', padding='same')(u3)
    c4 = layers.Conv2D(128, 3, activation='relu', padding='same')(c4)

    u2 = layers.Conv2DTranspose(64, 2, strides=2, padding='same')(c4)
    u2 = layers.Concatenate()([u2, c2])
    c5 = layers.Conv2D(64, 3, activation='relu', padding='same')(u2)
    c5 = layers.Conv2D(64, 3, activation='relu', padding='same')(c5)

    u1 = layers.Conv2DTranspose(32, 2, strides=2, padding='same')(c5)
    u1 = layers.Concatenate()([u1, c1])
    c6 = layers.Conv2D(32, 3, activation='relu', padding='same')(u1)
    c6 = layers.Conv2D(32, 3, activation='relu', padding='same')(c6)

    # Classification head
    x = layers.GlobalAveragePooling2D()(c6)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = tf.keras.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

model = build_unet_model_fixed(INPUT_SHAPE, num_classes=3)
model.summary()

# --------------------------
# 고정 평가셋
# --------------------------
EVAL_MAX = min(200, len(X_test))
eval_paths = X_test[:EVAL_MAX]
eval_labels = np.asarray(y_test[:EVAL_MAX], dtype=np.int32)
eval_images = np.asarray([load_image(p) for p in eval_paths], dtype=np.float32)

# --------------------------
# 콜백: 에폭마다 accuracy / f1 저장
# --------------------------
class SaveAccF1PerEpoch(Callback):
    def __init__(self, X, y, acc_csv, f1_csv):
        super().__init__()
        self.X, self.y = X, y
        self.acc_csv = acc_csv
        self.f1_csv  = f1_csv
        if not os.path.exists(self.acc_csv):
            with open(self.acc_csv, "w", encoding="utf-8") as f:
                f.write("epoch,accuracy\n")
        if not os.path.exists(self.f1_csv):
            with open(self.f1_csv, "w", encoding="utf-8") as f:
                f.write("epoch,f1_macro\n")

    def on_epoch_end(self, epoch, logs=None):
        y_pred = self.model.predict(self.X, verbose=0)
        y_pred_cls = np.argmax(y_pred, axis=1)

        acc = accuracy_score(self.y, y_pred_cls)
        f1  = f1_score(self.y, y_pred_cls, average="macro", zero_division=0)

        with open(self.acc_csv, "a", encoding="utf-8") as f:
            f.write(f"{epoch+1},{acc:.6f}\n")
        with open(self.f1_csv, "a", encoding="utf-8") as f:
            f.write(f"{epoch+1},{f1:.6f}\n")

        print(f"[Epoch {epoch+1}] accuracy={acc:.4f} | f1_macro={f1:.4f}")

epoch_cb = SaveAccF1PerEpoch(eval_images, eval_labels, ACC_CSV, F1_CSV)
es_cb   = EarlyStopping(monitor="val_loss", mode="min", patience=3, restore_best_weights=True)

# --------------------------
# 학습
# --------------------------
t0 = time.time()
history = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=epochs,
    validation_data=valid_gen,
    validation_steps=val_steps,
    callbacks=[epoch_cb, es_cb],
    verbose=1,
    shuffle=True
)
train_time_sec = time.time() - t0
print(f"학습시간(초): {train_time_sec:.2f}")

# --------------------------
# 모델 저장
# --------------------------
model.save(FINAL_H5)

model_size_bytes = os.path.getsize(FINAL_H5)
with open(SIZE_TXT, "w", encoding="utf-8") as f:
    f.write(str(model_size_bytes))

with open(TIME_TXT, "w", encoding="utf-8") as f:
    f.write(f"{train_time_sec:.6f}")

print("\n저장 완료:")
print(" -", ACC_CSV)
print(" -", F1_CSV)
print(" -", FINAL_H5)
print(" -", TIME_TXT)
print(" -", SIZE_TXT)

# ===== end =====


tf.__version__: 2.19.0
GPU: Available
Train: 4560  |  Test: 1137
INPUT_SHAPE: (64, 192, 3)


Epoch 1/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12s/step - accuracy: 0.3368 - loss: 2.8585 [Epoch 1] accuracy=0.0000 | f1_macro=0.0000
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1658s[0m 12s/step - accuracy: 0.3367 - loss: 2.8498 - val_accuracy: 0.3330 - val_loss: 1.1001
Epoch 2/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5s/step - accuracy: 0.3416 - loss: 1.1016[Epoch 2] accuracy=0.4150 | f1_macro=0.1955
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m651s[0m 5s/step - accuracy: 0.3416 - loss: 1.1016 - val_accuracy: 0.5446 - val_loss: 1.0919
Epoch 3/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2s/step - accuracy: 0.3749 - loss: 1.0918[Epoch 3] accuracy=0.1000 | f1_macro=0.0909
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m263s[0m 2s/step - accuracy: 0.3749 - loss: 1.0918 - val_accuracy: 0.3893 - val_loss: 1.0665
Epoch 4/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[



학습시간(초): 3126.10

저장 완료:
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UNet_resized_accuracy.csv
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UNet_resized_f1.csv
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UNet_resized_final.h5
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UNet_resized_time.txt
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UNet_resized_size.txt


## SCNN

In [None]:
# ===== train_and_log_scnn.py (SCNN 기반 각도 분류, 이미지 resize 적용) =====
import os, fnmatch, random, time
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, Callback
from sklearn.metrics import f1_score, accuracy_score
import cv2

print("tf.__version__:", tf.__version__)
print("GPU:", "Available" if tf.config.list_physical_devices('GPU') else "Not available")

# --------------------------
# 설정
# --------------------------
data_root = "/content/drive/MyDrive/2025_한이음_경량핑/data/sampled"
model_output_dir = "/content/drive/MyDrive/2025_한이음_경량핑/model_output"
model_name = "SCNN_resized"

epochs = 30
batch_size = 32
lr = 1e-4
target_size = (64, 192)  # height x width, MaxPooling 단계 맞춤

os.makedirs(model_output_dir, exist_ok=True)

# 저장 파일 경로
ACC_CSV = os.path.join(model_output_dir, f"{model_name}_accuracy.csv")
F1_CSV  = os.path.join(model_output_dir, f"{model_name}_f1.csv")
FINAL_H5 = os.path.join(model_output_dir, f"{model_name}_final.h5")
TIME_TXT = os.path.join(model_output_dir, f"{model_name}_time.txt")
SIZE_TXT = os.path.join(model_output_dir, f"{model_name}_size.txt")

# --------------------------
# 데이터 로딩
# --------------------------
train_dir = os.path.join(data_root, "train")
test_dir  = os.path.join(data_root, "test")

def collect_paths_angles(folder, pattern="*.png"):
    paths, labels = [], []
    for fn in os.listdir(folder):
        if fnmatch.fnmatch(fn, pattern):
            path = os.path.join(folder, fn)
            name, _ = os.path.splitext(fn)
            angle_str = name.split('_')[-1]
            try:
                angle = int(angle_str)
            except ValueError:
                raise ValueError(f"파일명에서 각도 추출 실패: {fn}")
            paths.append(path)
            labels.append(angle)
    return paths, labels

X_train, y_train = collect_paths_angles(train_dir)
X_test,  y_test  = collect_paths_angles(test_dir)

assert len(X_train) and len(X_test), "train/test 폴더에 .png 이미지가 있는지 확인하세요."
print(f"Train: {len(X_train)}  |  Test: {len(X_test)}")

# --------------------------
# 이미지 로드 및 resize
# --------------------------
def load_image(path, as_rgb=True, target_size=target_size):
    img = cv2.imread(path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(f"이미지 로드 실패: {path}")
    if as_rgb:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (target_size[1], target_size[0]))  # width, height 순서
    return img.astype(np.float32)

INPUT_SHAPE = (*target_size, 3)
print("INPUT_SHAPE:", INPUT_SHAPE)

# --------------------------
# 각도 클래스 변환 (45,90,135 → 0,1,2)
# --------------------------
angle_classes = {45: 0, 90: 1, 135: 2}
y_train = [angle_classes[a] for a in y_train]
y_test  = [angle_classes[a] for a in y_test]

# --------------------------
# 데이터 제너레이터
# --------------------------
def image_data_generator(paths, labels, batch):
    n = len(paths)
    while True:
        batch_imgs, batch_lbls = [], []
        for _ in range(batch):
            i = random.randint(0, n-1)
            batch_imgs.append(load_image(paths[i]))
            batch_lbls.append(labels[i])
        yield np.asarray(batch_imgs, dtype=np.float32), np.asarray(batch_lbls, dtype=np.int32)

train_gen = image_data_generator(X_train, y_train, batch_size)
valid_gen = image_data_generator(X_test,  y_test,  batch_size)

steps_per_epoch = max(1, len(X_train)//batch_size)
val_steps       = max(1, len(X_test)//batch_size)

# --------------------------
# SCNN 모델 구조 정의
# --------------------------
def build_scnn_model(input_shape, num_classes=3):
    inputs = tf.keras.Input(shape=input_shape)

    # Encoder
    c1 = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(32, 3, activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D(2, padding='same')(c1)

    c2 = layers.Conv2D(64, 3, activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(64, 3, activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D(2, padding='same')(c2)

    c3 = layers.Conv2D(128, 3, activation='relu', padding='same')(p2)
    c3 = layers.Conv2D(128, 3, activation='relu', padding='same')(c3)
    p3 = layers.MaxPooling2D(2, padding='same')(c3)

    # Bottleneck
    b = layers.Conv2D(256, 3, activation='relu', padding='same')(p3)
    b = layers.Conv2D(256, 3, activation='relu', padding='same')(b)

    # SCNN Layer: spatial message passing
    scnn_row = layers.Conv2D(256, (1, 3), activation='relu', padding='same')(b)
    scnn_col = layers.Conv2D(256, (3, 1), activation='relu', padding='same')(b)

    # Decoder
    u3 = layers.Conv2DTranspose(128, 2, strides=2, padding='same')(scnn_row)
    u3 = layers.Concatenate()([u3, c3])
    c4 = layers.Conv2D(128, 3, activation='relu', padding='same')(u3)
    c4 = layers.Conv2D(128, 3, activation='relu', padding='same')(c4)

    u2 = layers.Conv2DTranspose(64, 2, strides=2, padding='same')(c4)
    u2 = layers.Concatenate()([u2, c2])
    c5 = layers.Conv2D(64, 3, activation='relu', padding='same')(u2)
    c5 = layers.Conv2D(64, 3, activation='relu', padding='same')(c5)

    u1 = layers.Conv2DTranspose(32, 2, strides=2, padding='same')(c5)
    u1 = layers.Concatenate()([u1, c1])
    c6 = layers.Conv2D(32, 3, activation='relu', padding='same')(u1)
    c6 = layers.Conv2D(32, 3, activation='relu', padding='same')(c6)

    # Classification head
    x = layers.GlobalAveragePooling2D()(c6)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = tf.keras.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

model = build_scnn_model(INPUT_SHAPE, num_classes=3)
model.summary()

# --------------------------
# 고정 평가셋
# --------------------------
EVAL_MAX = min(200, len(X_test))
eval_paths = X_test[:EVAL_MAX]
eval_labels = np.asarray(y_test[:EVAL_MAX], dtype=np.int32)
eval_images = np.asarray([load_image(p) for p in eval_paths], dtype=np.float32)

# --------------------------
# 콜백: 에폭마다 accuracy / f1 저장
# --------------------------
class SaveAccF1PerEpoch(Callback):
    def __init__(self, X, y, acc_csv, f1_csv):
        super().__init__()
        self.X, self.y = X, y
        self.acc_csv = acc_csv
        self.f1_csv  = f1_csv
        if not os.path.exists(self.acc_csv):
            with open(self.acc_csv, "w", encoding="utf-8") as f:
                f.write("epoch,accuracy\n")
        if not os.path.exists(self.f1_csv):
            with open(self.f1_csv, "w", encoding="utf-8") as f:
                f.write("epoch,f1_macro\n")

    def on_epoch_end(self, epoch, logs=None):
        y_pred = self.model.predict(self.X, verbose=0)
        y_pred_cls = np.argmax(y_pred, axis=1)

        acc = accuracy_score(self.y, y_pred_cls)
        f1  = f1_score(self.y, y_pred_cls, average="macro", zero_division=0)

        with open(self.acc_csv, "a", encoding="utf-8") as f:
            f.write(f"{epoch+1},{acc:.6f}\n")
        with open(self.f1_csv, "a", encoding="utf-8") as f:
            f.write(f"{epoch+1},{f1:.6f}\n")

        print(f"[Epoch {epoch+1}] accuracy={acc:.4f} | f1_macro={f1:.4f}")

epoch_cb = SaveAccF1PerEpoch(eval_images, eval_labels, ACC_CSV, F1_CSV)
es_cb   = EarlyStopping(monitor="val_loss", mode="min", patience=3, restore_best_weights=True)

# --------------------------
# 학습
# --------------------------
t0 = time.time()
history = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=epochs,
    validation_data=valid_gen,
    validation_steps=val_steps,
    callbacks=[epoch_cb, es_cb],
    verbose=1,
    shuffle=True
)
train_time_sec = time.time() - t0
print(f"학습시간(초): {train_time_sec:.2f}")

# --------------------------
# 모델 저장
# --------------------------
model.save(FINAL_H5)

model_size_bytes = os.path.getsize(FINAL_H5)
with open(SIZE_TXT, "w", encoding="utf-8") as f:
    f.write(str(model_size_bytes))

with open(TIME_TXT, "w", encoding="utf-8") as f:
    f.write(f"{train_time_sec:.6f}")

print("\n저장 완료:")
print(" -", ACC_CSV)
print(" -", F1_CSV)
print(" -", FINAL_H5)
print(" -", TIME_TXT)
print(" -", SIZE_TXT)

# ===== end =====


## UFLD

In [11]:
# ===== train_and_log_ufld_from_scnn.py =====
import os, fnmatch, random, time
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, Callback
from sklearn.metrics import f1_score, accuracy_score
import cv2

print("tf.__version__:", tf.__version__)
print("GPU:", "Available" if tf.config.list_physical_devices('GPU') else "Not available")

# --------------------------
# 설정
# --------------------------
data_root = "/content/drive/MyDrive/2025_한이음_경량핑/data/sampled"
model_output_dir = "/content/drive/MyDrive/2025_한이음_경량핑/model_output"
model_name = "UFLD_resized"

epochs = 30
batch_size = 32
lr = 1e-4
target_size = (64, 192)  # H, W

os.makedirs(model_output_dir, exist_ok=True)

# 저장 파일 경로
ACC_CSV = os.path.join(model_output_dir, f"{model_name}_accuracy.csv")
F1_CSV  = os.path.join(model_output_dir, f"{model_name}_f1.csv")
FINAL_H5 = os.path.join(model_output_dir, f"{model_name}_final.h5")
TIME_TXT = os.path.join(model_output_dir, f"{model_name}_time.txt")
SIZE_TXT = os.path.join(model_output_dir, f"{model_name}_size.txt")

# --------------------------
# 데이터 로딩
# --------------------------
train_dir = os.path.join(data_root, "train")
test_dir  = os.path.join(data_root, "test")

def collect_paths_angles(folder, pattern="*.png"):
    paths, labels = [], []
    for fn in os.listdir(folder):
        if fnmatch.fnmatch(fn, pattern):
            path = os.path.join(folder, fn)
            name, _ = os.path.splitext(fn)
            angle_str = name.split('_')[-1]
            try:
                angle = int(angle_str)
            except ValueError:
                raise ValueError(f"파일명에서 각도 추출 실패: {fn}")
            paths.append(path)
            labels.append(angle)
    return paths, labels

X_train, y_train = collect_paths_angles(train_dir)
X_test,  y_test  = collect_paths_angles(test_dir)
print(f"Train: {len(X_train)}  |  Test: {len(X_test)}")

# --------------------------
# 이미지 로드 및 resize
# --------------------------
def load_image(path, as_rgb=True, target_size=target_size):
    img = cv2.imread(path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(f"이미지 로드 실패: {path}")
    if as_rgb:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (target_size[1], target_size[0]))
    img = img.astype(np.float32) / 255.0
    return img

INPUT_SHAPE = (*target_size, 3)
print("INPUT_SHAPE:", INPUT_SHAPE)

# --------------------------
# 각도 클래스 변환
# --------------------------
angle_classes = {45:0, 90:1, 135:2}
y_train = [angle_classes[a] for a in y_train]
y_test  = [angle_classes[a] for a in y_test]
NUM_ANGLE_CLASSES = 3

# --------------------------
# 데이터 제너레이터
# --------------------------
def image_data_generator(paths, labels, batch):
    n = len(paths)
    while True:
        batch_imgs, batch_lbls = [], []
        for _ in range(batch):
            i = random.randint(0, n-1)
            batch_imgs.append(load_image(paths[i]))
            batch_lbls.append(labels[i])
        yield np.asarray(batch_imgs, dtype=np.float32), np.asarray(batch_lbls, dtype=np.int32)

train_gen = image_data_generator(X_train, y_train, batch_size)
valid_gen = image_data_generator(X_test,  y_test,  batch_size)

steps_per_epoch = max(1, len(X_train)//batch_size)
val_steps       = max(1, len(X_test)//batch_size)

# --------------------------
# ResNet-18 backbone 정의
# --------------------------
def conv_bn_relu(x, filters, k=3, s=1):
    x = layers.Conv2D(filters, k, strides=s, padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    return x

def basic_block(x, filters, s=1):
    shortcut = x
    x = conv_bn_relu(x, filters, 3, s)
    x = layers.Conv2D(filters, 3, padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    if shortcut.shape[-1] != filters or s != 1:
        shortcut = layers.Conv2D(filters, 1, strides=s, padding='same', use_bias=False)(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)
    x = layers.Add()([x, shortcut])
    x = layers.ReLU()(x)
    return x

def resnet18_backbone(inputs):
    x = conv_bn_relu(inputs, 32, 7, 2)
    x = layers.MaxPooling2D(3, strides=2, padding='same')(x)
    # layer1
    x = basic_block(x, 32, 1)
    x = basic_block(x, 32, 1)
    # layer2
    x = basic_block(x, 64, 2)
    x = basic_block(x, 64, 1)
    # layer3
    x = basic_block(x, 128, 2)
    x = basic_block(x, 128, 1)
    # layer4
    x = basic_block(x, 256, 2)
    x = basic_block(x, 256, 1)
    return x

# --------------------------
# UFLD 모델 정의 (lane loss 없이 angle head만)
# --------------------------
def build_ufld_model(input_shape, num_angle_classes=3, lr=1e-4):
    inputs = layers.Input(shape=input_shape)
    feat_map = resnet18_backbone(inputs)
    gap = layers.GlobalAveragePooling2D()(feat_map)

    # Angle head
    x = layers.Dense(128, activation='relu')(gap)
    x = layers.Dropout(0.5)(x)
    angle_out = layers.Dense(num_angle_classes, activation='softmax', name="angle_cls")(x)

    model = tf.keras.Model(inputs, angle_out)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

model = build_ufld_model(INPUT_SHAPE, num_angle_classes=NUM_ANGLE_CLASSES, lr=lr)
model.summary()

# --------------------------
# 고정 평가셋
# --------------------------
EVAL_MAX = min(200, len(X_test))
eval_paths = X_test[:EVAL_MAX]
eval_labels = np.asarray(y_test[:EVAL_MAX], dtype=np.int32)
eval_images = np.asarray([load_image(p) for p in eval_paths], dtype=np.float32)

# --------------------------
# 콜백: 에폭마다 accuracy / f1 저장
# --------------------------
class SaveAccF1PerEpoch(Callback):
    def __init__(self, X, y, acc_csv, f1_csv):
        super().__init__()
        self.X, self.y = X, y
        self.acc_csv = acc_csv
        self.f1_csv  = f1_csv
        if not os.path.exists(self.acc_csv):
            with open(self.acc_csv, "w") as f: f.write("epoch,accuracy\n")
        if not os.path.exists(self.f1_csv):
            with open(self.f1_csv, "w") as f: f.write("epoch,f1_macro\n")
    def on_epoch_end(self, epoch, logs=None):
        y_pred = self.model.predict(self.X, verbose=0)
        y_pred_cls = np.argmax(y_pred, axis=1)
        acc = accuracy_score(self.y, y_pred_cls)
        f1  = f1_score(self.y, y_pred_cls, average="macro", zero_division=0)
        with open(self.acc_csv, "a") as f: f.write(f"{epoch+1},{acc:.6f}\n")
        with open(self.f1_csv, "a") as f: f.write(f"{epoch+1},{f1:.6f}\n")
        print(f"[Epoch {epoch+1}] accuracy={acc:.4f} | f1_macro={f1:.4f}")

epoch_cb = SaveAccF1PerEpoch(eval_images, eval_labels, ACC_CSV, F1_CSV)
es_cb = EarlyStopping(monitor="val_loss", mode="min", patience=3, restore_best_weights=True)

# --------------------------
# 학습
# --------------------------
t0 = time.time()
history = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    validation_data=valid_gen,
    validation_steps=val_steps,
    epochs=epochs,
    callbacks=[epoch_cb, es_cb],
    verbose=1,
    shuffle=True
)
train_time_sec = time.time() - t0
print(f"학습시간(초): {train_time_sec:.2f}")

# --------------------------
# 모델 저장
# --------------------------
model.save(FINAL_H5)
model_size_bytes = os.path.getsize(FINAL_H5)
with open(SIZE_TXT, "w") as f: f.write(str(model_size_bytes))
with open(TIME_TXT, "w") as f: f.write(f"{train_time_sec:.6f}")

print("\n저장 완료:")
print(" -", ACC_CSV)
print(" -", F1_CSV)
print(" -", FINAL_H5)
print(" -", TIME_TXT)
print(" -", SIZE_TXT)

tf.__version__: 2.19.0
GPU: Available
Train: 4560  |  Test: 1137
INPUT_SHAPE: (64, 192, 3)


Epoch 1/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17s/step - accuracy: 0.6102 - loss: 0.8073 [Epoch 1] accuracy=1.0000 | f1_macro=1.0000
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2478s[0m 17s/step - accuracy: 0.6107 - loss: 0.8064 - val_accuracy: 0.3232 - val_loss: 1.5432
Epoch 2/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6s/step - accuracy: 0.7223 - loss: 0.5794[Epoch 2] accuracy=1.0000 | f1_macro=1.0000
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m894s[0m 6s/step - accuracy: 0.7224 - loss: 0.5793 - val_accuracy: 0.3411 - val_loss: 1.2641
Epoch 3/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - accuracy: 0.7754 - loss: 0.5026[Epoch 3] accuracy=0.1150 | f1_macro=0.1031
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m383s[0m 3s/step - accuracy: 0.7754 - loss: 0.5026 - val_accuracy: 0.3536 - val_loss: 1.6227
Epoch 4/30
[1m142/142[0m [32m━━━━━━━━━━━━━━━━━━━━[



학습시간(초): 4212.93

저장 완료:
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UFLD_resized_accuracy.csv
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UFLD_resized_f1.csv
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UFLD_resized_final.h5
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UFLD_resized_time.txt
 - /content/drive/MyDrive/2025_한이음_경량핑/model_output/UFLD_resized_size.txt
