FPL

In [1]:
import sys
from pathlib import Path

ROOT = Path.cwd().resolve()   # 보통 FPL 폴더에서 노트북 실행중이면 이게 루트
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))


import os, sys
from pathlib import Path
import cv2
import numpy as np
import pandas as pd

# 현재 노트북이 있는 폴더(FPL) 기준으로 src 절대경로 계산
SRC_DIR = (Path(os.getcwd()) / "src").resolve()

print("CWD:", os.getcwd())
print("SRC_DIR:", SRC_DIR)
print("SRC exists?:", SRC_DIR.exists())

sys.path.insert(0, str(SRC_DIR))
print("sys.path[0]:", sys.path[0])

from src.fpl_data_io import build_image_index, load_dataset
from src.fpl_features import extract_color_hs_3x3, extract_hog_3x3
from src.fpl_models import encode_road_labels, train_color_svm, train_hog_pca_svm_by_dims
from src.fpl_fusion import fuse_probabilities
from src.fpl_metrics import eval_accuracy, eval_confusion, eval_report
from src.fpl_detail_models import train_and_save_detail_models
from src.fpl_knn_models import train_and_save_knn_models

from src.fpl_features import (
    extract_color_hs_full,
    extract_hog_full,
    extract_lbp_full,
)

from src.fpl_models import (
    encode_road_labels,
    train_color_svm,
    train_lbp_svm,
    train_hog_pca_svm_by_dims,
    eval_svm,
    fit_sigmoid_calibrator,
    predict_proba_custom,
    fuse_probabilities,
    evaluate_fusion,
)

print("imports OK")


CWD: /home/hanseong/vscode/ML_code/FPL/FPL
SRC_DIR: /home/hanseong/vscode/ML_code/FPL/FPL/src
SRC exists?: True
sys.path[0]: /home/hanseong/vscode/ML_code/FPL/FPL/src
imports OK


In [2]:
# === PATHS ===
TRAINING_LABEL_CSV = "/home/hanseong/gdrive/ML_FPL_training_data/training_labels_plus.csv"
TEST_LABEL_CSV  = "/home/hanseong/gdrive/ML_FPL_test_data/test_labels_plus.csv"

test_path = "/home/hanseong/gdrive/ML_FPL_test_data"
training_path = "/home/hanseong/gdrive/ML_FPL_training_data"



In [3]:
# training/test 폴더 둘 다 훑어서 filename -> fullpath 인덱스 생성
image_index = build_image_index(training_path, test_path)

print("indexed files:", len(image_index))
print("sample:", list(image_index.items())[:3])


indexed files: 2974
sample: [('donhwamunro_11_da_A_raw_0824.jpg', '/home/hanseong/gdrive/ML_FPL_training_data/jpg/donhwamunro_11_da_A_raw_0824.jpg'), ('donhwamunro_11_da_A_raw_0825.jpg', '/home/hanseong/gdrive/ML_FPL_training_data/jpg/donhwamunro_11_da_A_raw_0825.jpg'), ('donhwamunro_11_da_A_raw_0826.jpg', '/home/hanseong/gdrive/ML_FPL_training_data/jpg/donhwamunro_11_da_A_raw_0826.jpg')]


In [4]:
train = load_dataset(TRAINING_LABEL_CSV, image_index, resize=True)
test  = load_dataset(TEST_LABEL_CSV, image_index, resize=True)

Training_origin_data = train["images"]
Test_data = test["images"]

training_road_label = train["road_labels"]
test_road_label = test["road_labels"]

training_photo_id = train["photo_ids"]
test_photo_id = test["photo_ids"]

training_detail = train["details"]
test_detail = test["details"]

training_filename = train["filenames"]
test_filename = test["filenames"]

print("Loaded train images:", len(Training_origin_data), "missed:", len(train["missed"]))
print("Loaded test  images:", len(Test_data), "missed:", len(test["missed"]))

print("Train sample image shapes:", {Training_origin_data[i].shape for i in range(min(5, len(Training_origin_data)))})
print("Test  sample image shapes:", {Test_data[i].shape for i in range(min(5, len(Test_data)))})


Loaded train images: 2231 missed: 0
Loaded test  images: 743 missed: 0
Train sample image shapes: {(682, 1024, 3)}
Test  sample image shapes: {(682, 1024, 3)}


In [5]:
training_x = train["xs"]; training_y = train["ys"]
test_x     = test["xs"];  test_y     = test["ys"]


In [6]:
y_train_road, y_test_road, road_label_map = encode_road_labels(training_road_label, test_road_label)

print("num roads:", len(road_label_map))
print("y_train_road unique:", np.unique(y_train_road))
print("y_test_road  unique:", np.unique(y_test_road))

# 매핑 일부 확인
items = list(road_label_map.items())
print("road_label_map sample:", items[:20])


num roads: 12
y_train_road unique: [ 0  1  2  3  4  5  6  7  8  9 10 11]
y_test_road  unique: [ 0  1  2  3  4  5  6  7  8  9 10 11]
road_label_map sample: [('donhwamunro', 0), ('donhwamunro_11', 1), ('donhwamunro_11_da', 2), ('donhwamunro_11_ga', 3), ('donhwamunro_11_na', 4), ('samildaero', 5), ('samildaero_26', 6), ('samildaero_28', 7), ('samildaero_30', 8), ('samildaero_32', 9), ('samildaero_32_ga', 10), ('suporo_28', 11)]


In [7]:
# HOG
X_hog_train = extract_hog_3x3(
    Training_origin_data,
    hog_size=(128, 128),
    orientations=9,
    pixels_per_cell=(8, 8),
    cells_per_block=(2, 2)
)
X_hog_test = extract_hog_3x3(
    Test_data,
    hog_size=(128, 128),
    orientations=9,
    pixels_per_cell=(8, 8),
    cells_per_block=(2, 2)
)

print("X_hog_train:", X_hog_train.shape)
print("X_hog_test :", X_hog_test.shape)

# Color (H,S)
X_color_train = extract_color_hs_3x3(Training_origin_data, h_bins=30, s_bins=32)
X_color_test  = extract_color_hs_3x3(Test_data, h_bins=30, s_bins=32)

print("X_color_train:", X_color_train.shape)
print("X_color_test :", X_color_test.shape)


X_hog_train: (2231, 72900)
X_hog_test : (743, 72900)
X_color_train: (2231, 558)
X_color_test : (743, 558)


In [21]:
from src.fpl_models import train_color_svm, train_hog_pca_svm_by_dims
color_svm = train_color_svm(X_color_train, y_train_road, C=10, gamma="scale")
P_color_test = color_svm.predict_proba(X_color_test)

print("P_color_test:", P_color_test.shape)

# 1) test 예측 라벨 (확률이 가장 큰 클래스)
y_pred_color = np.argmax(P_color_test, axis=1)

# 2) 정확도
acc_color = eval_accuracy(y_test_road, y_pred_color)
print("Color SVM Test Accuracy:", acc_color)

# 3) confusion matrix + report
cm_color = eval_confusion(y_test_road, y_pred_color)
print("\nColor SVM Confusion Matrix:\n", cm_color)

print("\nColor SVM Classification Report:\n")
print(eval_report(y_test_road, y_pred_color))


P_color_test: (743, 12)
Color SVM Test Accuracy: 0.7254374158815612

Color SVM Confusion Matrix:
 [[49  1  0  0  6  0  0  0  2  2  2  0]
 [ 2 35  0  0  1  0  0  2  0  0  1  2]
 [ 0  0 39  1  3  2  2  1  1  1  0  8]
 [ 2  0  0 44 11  3  0  0  2  3  5  2]
 [ 1  1  5  9 65  3  0  0  4  5  6  1]
 [ 2  1  2  1  1 51  0  1  0  3  2  2]
 [ 0  1  1  2  0  0 21  1  0  0  0  0]
 [ 0  3  1  2  2  1  0 25  0  1  1  2]
 [ 0  2  0  0  4  1  0  1 47  3  0  1]
 [ 1  0  0  2  8  4  0  2  3 58  0  1]
 [ 0  0  1  2  6  3  0  0  1  2 27  1]
 [ 2  1  2  3  5  1  3  0  0  0  2 78]]

Color SVM Classification Report:

              precision    recall  f1-score   support

           0     0.8305    0.7903    0.8099        62
           1     0.7778    0.8140    0.7955        43
           2     0.7647    0.6724    0.7156        58
           3     0.6667    0.6111    0.6377        72
           4     0.5804    0.6500    0.6132       100
           5     0.7391    0.7727    0.7556        66
           6     0.

In [22]:
PCA_DIMS = [2, 8, 16, 32, 64, 128, 256]  # 원하는 대로 조절

hog_pack = train_hog_pca_svm_by_dims(
    X_train=X_hog_train,
    y_train=y_train_road,
    X_test=X_hog_test,
    y_test=y_test_road,
    pca_dims=PCA_DIMS,
    C=10,
    gamma="scale"
)

hog_svm_models = hog_pack["hog_svm_models"]
hog_pca_test_features = hog_pack["hog_pca_test_features"]
hog_test_acc = hog_pack["test_acc"]
hog_time_report = hog_pack["time_report"]

print("\n===== HOG+PCA SVM summary =====")
for d in PCA_DIMS:
    tr = hog_time_report[d]
    print(
        f"PCA {d:>3d} | "
        f"Test Acc={hog_test_acc[d]:.4f} | "
        f"scale={tr['scale_sec']:.3f}s pca={tr['pca_sec']:.3f}s svm={tr['svm_fit_sec']:.3f}s total={tr['total_sec']:.3f}s"
    )



===== HOG+PCA SVM summary =====
PCA   2 | Test Acc=0.1965 | scale=1.120s pca=0.638s svm=0.857s total=2.797s
PCA   8 | Test Acc=0.3580 | scale=1.138s pca=0.727s svm=0.750s total=2.790s
PCA  16 | Test Acc=0.4980 | scale=1.096s pca=0.866s svm=0.946s total=3.095s
PCA  32 | Test Acc=0.5236 | scale=1.127s pca=1.029s svm=0.995s total=3.350s
PCA  64 | Test Acc=0.5141 | scale=1.124s pca=1.536s svm=1.094s total=3.999s
PCA 128 | Test Acc=0.4926 | scale=1.143s pca=2.525s svm=1.436s total=5.451s
PCA 256 | Test Acc=0.4293 | scale=1.122s pca=2.960s svm=2.457s total=7.179s


ALPHA Line regression

In [23]:
from sklearn.linear_model import LogisticRegression

# =========================
# 입력 전제(이미 있어야 함)
# =========================
# PCA_DIMS: list[int]
# hog_svm_models: dict[dim -> SVC(probability=True)]
# hog_pca_test_features: dict[dim -> np.ndarray (N_test, dim)]
# P_color_test: np.ndarray (N_test, K)
# y_test_road: np.ndarray (N_test,)
# eval_accuracy: 함수(없으면 accuracy_score로 대체 가능)

fusion_lr_models = {}      # dim -> LogisticRegression
fusion_alpha = {}          # dim -> float (shape vs color 요약 alpha)
P_fusion_by_dim = {}       # dim -> np.ndarray (N_test, K)
fusion_acc = {}            # dim -> float

K = P_color_test.shape[1]

print("===== LR Fusion per PCA dim =====")

for d in PCA_DIMS:
    # 1) HOG 확률 (N_test, K)
    P_shape_test = hog_svm_models[d].predict_proba(hog_pca_test_features[d])

    # shape 체크
    assert P_shape_test.shape == P_color_test.shape, (
        f"[dim={d}] P_shape_test {P_shape_test.shape} vs P_color_test {P_color_test.shape} mismatch"
    )

    # 2) fusion 입력 (N_test, 2K)
    X_fuse = np.hstack([P_shape_test, P_color_test])

    # 3) 회귀(로지스틱)로 fusion 학습 (요구대로 test 라벨 사용)
    lr = LogisticRegression(
    solver="lbfgs",
    max_iter=2000
    )

    lr.fit(X_fuse, y_test_road)

    # 4) fusion 확률 출력 (N_test, K)
    P_fusion_test = lr.predict_proba(X_fuse)

    # 5) 정확도(참고)
    y_pred = np.argmax(P_fusion_test, axis=1).astype(np.int64)
    acc = eval_accuracy(y_test_road, y_pred)

    # 6) "alpha(형태 비중)" 요약값 계산
    # lr.coef_ shape: (K, 2K)
    W = lr.coef_
    w_shape = np.mean(np.abs(W[:, :K]))
    w_color = np.mean(np.abs(W[:, K:]))

    alpha_hat = float(w_shape / (w_shape + w_color + 1e-12))  # 0~1, 형태 영향 비중

    # 저장
    fusion_lr_models[d] = lr
    fusion_alpha[d] = alpha_hat
    P_fusion_by_dim[d] = P_fusion_test
    fusion_acc[d] = float(acc)

    # 출력
    print(f"[PCA dim={d:>3d}] alpha_hat(shape)={alpha_hat:.3f} | TestAcc={acc:.4f} | P_fusion_test={P_fusion_test.shape}")

# dim별 요약
print("\n===== Summary =====")
for d in PCA_DIMS:
    print(f"dim={d:>3d} | alpha_hat={fusion_alpha[d]:.3f} | acc={fusion_acc[d]:.4f} | P_fusion={P_fusion_by_dim[d].shape}")


===== LR Fusion per PCA dim =====
[PCA dim=  2] alpha_hat(shape)=0.205 | TestAcc=0.7362 | P_fusion_test=(743, 12)
[PCA dim=  8] alpha_hat(shape)=0.315 | TestAcc=0.7389 | P_fusion_test=(743, 12)
[PCA dim= 16] alpha_hat(shape)=0.374 | TestAcc=0.7631 | P_fusion_test=(743, 12)
[PCA dim= 32] alpha_hat(shape)=0.385 | TestAcc=0.7550 | P_fusion_test=(743, 12)
[PCA dim= 64] alpha_hat(shape)=0.386 | TestAcc=0.7604 | P_fusion_test=(743, 12)
[PCA dim=128] alpha_hat(shape)=0.389 | TestAcc=0.7725 | P_fusion_test=(743, 12)
[PCA dim=256] alpha_hat(shape)=0.372 | TestAcc=0.7577 | P_fusion_test=(743, 12)

===== Summary =====
dim=  2 | alpha_hat=0.205 | acc=0.7362 | P_fusion=(743, 12)
dim=  8 | alpha_hat=0.315 | acc=0.7389 | P_fusion=(743, 12)
dim= 16 | alpha_hat=0.374 | acc=0.7631 | P_fusion=(743, 12)
dim= 32 | alpha_hat=0.385 | acc=0.7550 | P_fusion=(743, 12)
dim= 64 | alpha_hat=0.386 | acc=0.7604 | P_fusion=(743, 12)
dim=128 | alpha_hat=0.389 | acc=0.7725 | P_fusion=(743, 12)
dim=256 | alpha_hat=0.372

In [24]:
best_dim = max(fusion_acc, key=fusion_acc.get)

y_pred_best = np.argmax(P_fusion_by_dim[best_dim], axis=1)

cm = eval_confusion(y_test_road, y_pred_best)
print(f"Confusion Matrix (LR-Fusion, PCA={best_dim}):\n", cm)

print(f"\nClassification Report (LR-Fusion, PCA={best_dim}):\n")
print(eval_report(y_test_road, y_pred_best))


Confusion Matrix (LR-Fusion, PCA=128):
 [[51  1  0  0  5  0  0  0  1  2  2  0]
 [ 2 35  0  1  3  0  0  0  0  0  1  1]
 [ 0  0 41  1  2  1  0  1  0  1  0 11]
 [ 1  0  0 51 10  2  0  0  2  1  2  3]
 [ 1  1  3  8 71  2  0  0  2  3  5  4]
 [ 0  1  2  0  3 54  0  1  0  1  2  2]
 [ 0  1  1  2  0  0 21  1  0  0  0  0]
 [ 1  2  1  2  1  1  0 26  0  1  1  2]
 [ 0  2  0  0  3  1  0  1 48  3  0  1]
 [ 1  0  0  2  8  2  0  1  2 63  0  0]
 [ 0  0  0  1  8  2  0  0  1  2 28  1]
 [ 1  0  1  1  8  1  0  0  0  0  0 85]]

Classification Report (LR-Fusion, PCA=128):

              precision    recall  f1-score   support

           0     0.8793    0.8226    0.8500        62
           1     0.8140    0.8140    0.8140        43
           2     0.8367    0.7069    0.7664        58
           3     0.7391    0.7083    0.7234        72
           4     0.5820    0.7100    0.6396       100
           5     0.8182    0.8182    0.8182        66
           6     1.0000    0.8077    0.8936        26
           7

models save

In [25]:

import joblib

MODEL_DIR = "FPL_models"
os.makedirs(MODEL_DIR, exist_ok=True)

# =========================
# best_dim 기준 모델 선택
# =========================
best_dim = best_dim   # 이미 위에서 계산된 값 사용

hog_scaler = hog_pack["hog_pca_models"][best_dim][0]
hog_pca    = hog_pack["hog_pca_models"][best_dim][1]
hog_svm    = hog_pack["hog_svm_models"][best_dim]

# LR-fusion 모델 (있다면)
best_fusion_lr = fusion_lr_models[best_dim]

# =========================
# 모델 저장
# =========================
joblib.dump(hog_svm,        f"{MODEL_DIR}/hog_svm_dim{best_dim}.pkl")
joblib.dump(hog_pca,        f"{MODEL_DIR}/hog_pca_dim{best_dim}.pkl")
joblib.dump(hog_scaler,     f"{MODEL_DIR}/hog_scaler_dim{best_dim}.pkl")

joblib.dump(color_svm,      f"{MODEL_DIR}/color_svm.pkl")
joblib.dump(best_fusion_lr, f"{MODEL_DIR}/fusion_lr_dim{best_dim}.pkl")

joblib.dump(road_label_map, f"{MODEL_DIR}/road_label_map.pkl")

print("Models saved with best_dim =", best_dim)


Models saved with best_dim = 128


In [26]:
import importlib, inspect
import fpl_knn_models
import fpl_detail_models

print("loaded from:", fpl_knn_models.__file__)
importlib.reload(fpl_knn_models)
print("loaded from:", fpl_detail_models.__file__)
importlib.reload(fpl_detail_models)

print("new signature:", inspect.signature(fpl_knn_models.train_and_save_knn_models))

# ✅ 이 줄이 중요: 로컬 이름을 '새 함수'로 다시 바인딩
train_and_save_knn_models = fpl_knn_models.train_and_save_knn_models


loaded from: /home/hanseong/vscode/ML_code/FPL/FPL/src/fpl_knn_models.py
loaded from: /home/hanseong/vscode/ML_code/FPL/FPL/src/fpl_detail_models.py
new signature: (X_hog_train, X_color_train, training_road_label, training_detail, training_x, training_y, out_dir, hog_pca_dim=128, n_neighbors=7, detail_roads={'donhwamunro_11_da', 'donhwamunro_11_na', 'donhwamunro_11_ga', 'suporo_28'}, min_samples=10, training_paths=None, feature_tag='full')


In [27]:
MODEL_DIR = "FPL_models"  
ALPHA_FIXED = float(fusion_alpha[best_dim])   # 모든 디테일 도로에 동일 적용
PCA_DIM_DETAIL = best_dim                     # 디테일 HOG PCA도 동일 dim 사용(원하면 숫자 고정해도 됨)

training_paths = np.array([image_index.get(fn, "") for fn in training_filename], dtype=object)
# 1) detail 도로들에 대해 detail 분류용 (Color SVM + HOG SVM) 저장
detail_pack = train_and_save_detail_models(
    X_hog_train=X_hog_train,
    X_color_train=X_color_train,
    training_road_label=training_road_label,
    training_detail=training_detail,

    out_dir=MODEL_DIR,                
    alpha_shape=ALPHA_FIXED,          
    hog_pca_dim=PCA_DIM_DETAIL,       

    C=10,
    gamma="scale",

    min_total_samples=20,             
    min_samples_per_detail=8          
)

detail_pack


# 2) 모든 도로명에 대해 KNN(위치 회귀) 저장 + detail 도로는 A/B/...별 KNN 추가 저장
train_and_save_knn_models(
    X_hog_train=X_hog_train,
    X_color_train=X_color_train,
    training_road_label=training_road_label,
    training_detail=training_detail,
    training_x=training_x,
    training_y=training_y,
    out_dir=MODEL_DIR,
    hog_pca_dim=best_dim,
    n_neighbors=10,
    min_samples=10,
    training_paths=training_paths
)

[SAVED] detail models road=donhwamunro_11_da | classes=['A', 'B', 'C'] | n=177 | pca=128
[SAVED] detail models road=donhwamunro_11_ga | classes=['A', 'B', 'C'] | n=213 | pca=128
[SAVED] detail models road=donhwamunro_11_na | classes=['A', 'B', 'C', 'D'] | n=300 | pca=128
[SAVED] detail models road=suporo_28 | classes=['A', 'B', 'C', 'D', 'E'] | n=296 | pca=128
[SAVED] KNN(full) road=donhwamunro | n=189
[SAVED] KNN(full) road=donhwamunro_11 | n=130
[SAVED] KNN(full) road=donhwamunro_11_da | n=177
  [SAVED] KNN(full) road=donhwamunro_11_da detail=A | n=90
  [SAVED] KNN(full) road=donhwamunro_11_da detail=B | n=49
  [SAVED] KNN(full) road=donhwamunro_11_da detail=C | n=38
[SAVED] KNN(full) road=donhwamunro_11_ga | n=213
  [SAVED] KNN(full) road=donhwamunro_11_ga detail=A | n=46
  [SAVED] KNN(full) road=donhwamunro_11_ga detail=B | n=79
  [SAVED] KNN(full) road=donhwamunro_11_ga detail=C | n=88
[SAVED] KNN(full) road=donhwamunro_11_na | n=300
  [SAVED] KNN(full) road=donhwamunro_11_na deta

{'saved': [('donhwamunro', 'road'),
  ('donhwamunro_11', 'road'),
  ('donhwamunro_11_da', 'road'),
  ('donhwamunro_11_da', 'A'),
  ('donhwamunro_11_da', 'B'),
  ('donhwamunro_11_da', 'C'),
  ('donhwamunro_11_ga', 'road'),
  ('donhwamunro_11_ga', 'A'),
  ('donhwamunro_11_ga', 'B'),
  ('donhwamunro_11_ga', 'C'),
  ('donhwamunro_11_na', 'road'),
  ('donhwamunro_11_na', 'A'),
  ('donhwamunro_11_na', 'B'),
  ('donhwamunro_11_na', 'C'),
  ('donhwamunro_11_na', 'D'),
  ('samildaero', 'road'),
  ('samildaero_26', 'road'),
  ('samildaero_28', 'road'),
  ('samildaero_30', 'road'),
  ('samildaero_32', 'road'),
  ('samildaero_32_ga', 'road'),
  ('suporo_28', 'road'),
  ('suporo_28', 'A'),
  ('suporo_28', 'B'),
  ('suporo_28', 'C'),
  ('suporo_28', 'D'),
  ('suporo_28', 'E')],
 'knn_root': 'FPL_models/knn_models_full',
 'pca_dim_actual': 128,
 'feature_tag': 'full'}

not 3x3

In [None]:
# Color (HS hist) - full image
X_color_train = extract_color_hs_full(Training_origin_data, h_bins=60, s_bins=64, sizes=None)
X_color_test  = extract_color_hs_full(Test_data,            h_bins=60, s_bins=64, sizes=None)

print("X_color_train:", X_color_train.shape, X_color_train.dtype)

# HOG - full image (추가 resize 없음)
X_hog_train = extract_hog_full(
    Training_origin_data,
    hog_sizes=None,            # 중요: 추가 resize 없음
    orientations=12,
    pixels_per_cell=(8, 8),
    cells_per_block=(2, 2),
)
X_hog_test = extract_hog_full(
    Test_data,
    hog_sizes=None,
    orientations=12,
    pixels_per_cell=(8, 8),
    cells_per_block=(2, 2),
)

print("X_hog_train:", X_hog_train.shape, X_hog_train.dtype)

# LBP - full image (추가 resize 없음)
X_lbp_train = extract_lbp_full(Training_origin_data, resize=None, P=24, R=3, method="uniform")
X_lbp_test  = extract_lbp_full(Test_data,            resize=None, P=24, R=3, method="uniform")

print("X_lbp_train:", X_lbp_train.shape, X_lbp_train.dtype)


X_color_train: (2231, 124) float32


In [None]:
from sklearn.preprocessing import StandardScaler
sc_color = StandardScaler()
Xc_tr = sc_color.fit_transform(X_color_train)
Xc_te = sc_color.transform(X_color_test)

Xh_tr, Xh_te = X_hog_train, X_hog_test  

sc_lbp = StandardScaler()
Xl_tr = sc_lbp.fit_transform(X_lbp_train)
Xl_te = sc_lbp.transform(X_lbp_test)

print("scaled color:", Xc_tr.shape, "scaled lbp:", Xl_tr.shape)

model evaluation

In [None]:
svm_color = train_color_svm(Xc_tr, y_train_road, C=10, gamma="scale")
svm_lbp   = train_lbp_svm(Xl_tr, y_train_road, C=10, gamma="scale")

res_color = eval_svm(svm_color, Xc_tr, y_train_road, Xc_te, y_test_road, name="COLOR_SVM")
res_lbp   = eval_svm(svm_lbp,   Xl_tr, y_train_road, Xl_te, y_test_road, name="LBP_SVM")

pd.DataFrame([res_color, res_lbp])


In [None]:
pca_dims = [64, 128, 256, 512]   # 필요하면 더 추가/조정
hog_result = train_hog_pca_svm_by_dims(
    X_hog_train, y_train_road,
    X_hog_test,  y_test_road,
    pca_dims=pca_dims,
    C=10,
    gamma="scale",
)

df_hog = pd.DataFrame({
    "dim": pca_dims,
    "train_acc": [hog_result["train_acc"][d] for d in pca_dims],
    "test_acc":  [hog_result["test_acc"][d]  for d in pca_dims],
    "total_sec": [hog_result["time_report"][d]["total_sec"] for d in pca_dims],
}).sort_values("test_acc", ascending=False)

df_hog


In [None]:
best_dim = int(df_hog.iloc[0]["dim"])
print("best_dim:", best_dim)

# best_dim의 (scaler,pca)와 svm
hog_scaler, hog_pca = hog_result["hog_pca_models"][best_dim]
svm_hog = hog_result["hog_svm_models"][best_dim]

Xh_tr = hog_result["hog_pca_train_features"][best_dim]
Xh_te = hog_result["hog_pca_test_features"][best_dim]

res_hog = eval_svm(svm_hog, Xh_tr, y_train_road, Xh_te, y_test_road, name=f"HOG_PCA{best_dim}_SVM")
pd.DataFrame([res_hog])


In [None]:
# sigmoid 캘리브레이터 (학습데이터로 fit)
cal_color = fit_sigmoid_calibrator(svm_color, Xc_tr, y_train_road, q_lo=0.10, q_hi=0.90, p_lo=0.05, p_hi=0.95)
cal_lbp   = fit_sigmoid_calibrator(svm_lbp,   Xl_tr, y_train_road, q_lo=0.10, q_hi=0.90, p_lo=0.05, p_hi=0.95)
cal_hog   = fit_sigmoid_calibrator(svm_hog,   Xh_tr, y_train_road, q_lo=0.10, q_hi=0.90, p_lo=0.05, p_hi=0.95)

# 테스트에서 "커스텀 확률" 생성
P_color = predict_proba_custom(svm_color, Xc_te, method="sigmoid", calibrator=cal_color, power=1.0)
P_lbp   = predict_proba_custom(svm_lbp,   Xl_te, method="sigmoid", calibrator=cal_lbp,   power=1.0)
P_hog   = predict_proba_custom(svm_hog,   Xh_te, method="sigmoid", calibrator=cal_hog,   power=1.0)

print(P_color.shape, P_lbp.shape, P_hog.shape)
print("row-sum check:", P_color[0].sum(), P_lbp[0].sum(), P_hog[0].sum())


In [None]:
# 가중치(원하는대로 조절)
w_hog   = 0.6
w_color = 0.3
w_lbp   = 0.1

P_final = fuse_probabilities([P_hog, P_color, P_lbp], weights=[w_hog, w_color, w_lbp])

final_acc, final_pred = evaluate_fusion(P_final, y_test_road)
print("Final Fusion Accuracy:", final_acc)

# 개별 모델도 확률 기반 argmax로 accuracy 찍어보기(참고)
acc_hog, _   = evaluate_fusion(P_hog, y_test_road)
acc_color, _ = evaluate_fusion(P_color, y_test_road)
acc_lbp, _   = evaluate_fusion(P_lbp, y_test_road)

pd.DataFrame([
    {"model":"HOG(sigmoid)",   "acc":acc_hog},
    {"model":"COLOR(sigmoid)", "acc":acc_color},
    {"model":"LBP(sigmoid)",   "acc":acc_lbp},
    {"model":"FUSION",         "acc":final_acc},
])


models_mark2 save

In [None]:
import os, joblib
from datetime import datetime

MODEL_DIR = "FPL_models"
os.makedirs(MODEL_DIR, exist_ok=True)

best_dim = best_dim  # 이미 계산된 값

# (HOG road 분류) pack에서 best_dim 모델 꺼내기
hog_scaler = hog_pack["hog_pca_models"][best_dim][0]
hog_pca    = hog_pack["hog_pca_models"][best_dim][1]
hog_svm    = hog_pack["hog_svm_models"][best_dim]

best_fusion_lr = fusion_lr_models[best_dim] if "fusion_lr_models" in globals() else None

ts = datetime.now().strftime("%Y%m%d_%H%M%S")

# ✅ full feature 세팅을 tag에 박아두면 나중에 헷갈릴 일 없음
tag = f"FULL_HS60x64_HOGori12_LBP24R3_dim{best_dim}_{ts}"

# ===== road models =====
joblib.dump(hog_svm,    f"{MODEL_DIR}/hog_svm_{tag}.pkl")
joblib.dump(hog_pca,    f"{MODEL_DIR}/hog_pca_{tag}.pkl")
joblib.dump(hog_scaler, f"{MODEL_DIR}/hog_scaler_{tag}.pkl")

joblib.dump(color_svm,  f"{MODEL_DIR}/color_svm_{tag}.pkl")  # Color SVM은 Xc_tr로 학습했다고 가정

# ===== LBP SVM (있으면) =====
if "lbp_svm" in globals():
    joblib.dump(lbp_svm, f"{MODEL_DIR}/lbp_svm_{tag}.pkl")   # LBP SVM은 Xl_tr로 학습했다고 가정

# ===== calibrator (있으면) =====
if "cal_hog" in globals():
    joblib.dump(cal_hog, f"{MODEL_DIR}/cal_hog_{tag}.pkl")
if "cal_color" in globals():
    joblib.dump(cal_color, f"{MODEL_DIR}/cal_color_{tag}.pkl")
if "cal_lbp" in globals():
    joblib.dump(cal_lbp, f"{MODEL_DIR}/cal_lbp_{tag}.pkl")

# ✅ 여기! 지금 너가 만든 scaler 변수명으로 저장
# (app에서 읽는 파일명과 호환되게 유지)
joblib.dump(sc_color, f"{MODEL_DIR}/color_scaler_{tag}.pkl")
joblib.dump(sc_lbp,   f"{MODEL_DIR}/lbp_scaler_{tag}.pkl")

# ===== fusion weights (있으면) =====
if all(v in globals() for v in ["w_hog", "w_color", "w_lbp"]):
    joblib.dump({"w_hog": w_hog, "w_color": w_color, "w_lbp": w_lbp},
                f"{MODEL_DIR}/fusion_weights_{tag}.pkl")

# ===== LR-fusion (있으면) =====
if best_fusion_lr is not None:
    joblib.dump(best_fusion_lr, f"{MODEL_DIR}/fusion_lr_{tag}.pkl")

# ===== label map =====
joblib.dump(road_label_map, f"{MODEL_DIR}/road_label_map_{tag}.pkl")

print("Models saved. best_dim =", best_dim)
print("Tag =", tag)
print("Dir =", MODEL_DIR)


detail_SVM and kNN

In [None]:
import importlib, inspect
import fpl_knn_models
import fpl_detail_models

print("loaded from:", fpl_knn_models.__file__)
importlib.reload(fpl_knn_models)
print("loaded from:", fpl_detail_models.__file__)
importlib.reload(fpl_detail_models)

print("new signature:", inspect.signature(fpl_knn_models.train_and_save_knn_models))

# ✅ 이 줄이 중요: 로컬 이름을 '새 함수'로 다시 바인딩
train_and_save_knn_models = fpl_knn_models.train_and_save_knn_models


loaded from: /home/hanseong/vscode/ML_code/FPL/FPL/src/fpl_knn_models.py
new signature: (X_hog_train, X_color_train, training_road_label, training_detail, training_x, training_y, out_dir, hog_pca_dim=128, n_neighbors=7, detail_roads={'donhwamunro_11_ga', 'suporo_28', 'donhwamunro_11_da', 'donhwamunro_11_na'}, min_samples=10, training_paths=None)


In [None]:
MODEL_DIR = "FPL_models" 
ALPHA_FIXED = float(fusion_alpha[best_dim]) 
# 모든 디테일 도로에 동일 적용 PCA_DIM_DETAIL = best_dim 
# # 디테일 HOG PCA도 동일 dim 사용(원하면 숫자 고정해도 됨)

# X_hog_full_train, X_color_full_train  (N, D) 형태

detail_pack = train_and_save_detail_models(
    X_hog_train=Xh_tr,
    X_color_train=Xc_tr,   # ✅ 스케일된 color
    training_road_label=training_road_label,
    training_detail=training_detail,
    out_dir=MODEL_DIR,
    alpha_shape=ALPHA_FIXED,
    hog_pca_dim=best_dim,
    C=10, gamma="scale",
    min_total_samples=20,
    min_samples_per_detail=8,
    feature_tag="full",    # (내가 전에 준 버전 기준)
)

knn_pack = train_and_save_knn_models(
    X_hog_train=Xh_tr,
    X_color_train=Xc_tr,   # ✅ 스케일된 color
    training_road_label=training_road_label,
    training_detail=training_detail,
    training_x=training_x,
    training_y=training_y,
    out_dir=MODEL_DIR,
    hog_pca_dim=best_dim,
    n_neighbors=10,
    min_samples=10,
    training_paths=training_paths,
    feature_tag="full",
)



In [None]:

from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

def euclid_dist(a, b):
    # a,b: (N,2)
    return np.sqrt(np.sum((a - b) ** 2, axis=1))

def clean_detail_arr(detail_arr):
    # detail이 NaN/None/"0" 섞여있어도 안전하게 문자열로
    out = []
    for d in detail_arr:
        s = str(d).strip()
        if s.lower() in ("nan", "none", ""):
            s = "0"
        out.append(s)
    return np.array(out, dtype=object)

KNN_ROOT = os.path.join(MODEL_DIR, "knn_models")

knn_scaler = joblib.load(os.path.join(KNN_ROOT, "knn_hog_scaler_pca128.pkl"))
knn_pca    = joblib.load(os.path.join(KNN_ROOT, "knn_hog_pca_pca128.pkl"))

def make_knn_Z(X_hog, X_color):
    Xh_p = knn_pca.transform(knn_scaler.transform(X_hog))
    return np.hstack([Xh_p, X_color])



In [None]:
MODEL_DIR = "FPL_models"
ALPHA_FIXED = 0.389  # 네가 쓰는 고정 alpha (원하면 바꿔도 됨)

ROAD = "suporo_28"   # 여기만 바꿔서 도로별로 평가 가능

dt = clean_detail_arr(test_detail)

# test 중에서 해당 road + detail 있는 샘플만
idx = np.where((test_road_label == ROAD) & (dt != "0"))[0]
print("Eval samples:", len(idx))

if len(idx) == 0:
    print("해당 road/detail 샘플이 test에 없음")
else:
    base = os.path.join(MODEL_DIR, "detail_models", ROAD)

    hog_svm    = joblib.load(os.path.join(base, "hog_svm.pkl"))
    hog_pca    = joblib.load(os.path.join(base, "hog_pca.pkl"))
    hog_scaler = joblib.load(os.path.join(base, "hog_scaler.pkl"))
    color_svm  = joblib.load(os.path.join(base, "color_svm.pkl"))

    detail_map = joblib.load(os.path.join(base, "detail_label_map.pkl"))
    inv_map = {v:k for k,v in detail_map.items()}

    Xh = X_hog_test[idx]
    Xc = X_color_test[idx]
    y_true = dt[idx]  # 'A','B','C'...

    # HOG pipeline
    Xh_s = hog_scaler.transform(Xh)
    Xh_p = hog_pca.transform(Xh_s)

    # prob
    P_shape = hog_svm.predict_proba(Xh_p)
    P_color = color_svm.predict_proba(Xc)

    # fusion
    P_fuse = ALPHA_FIXED * P_shape + (1 - ALPHA_FIXED) * P_color

    y_pred_idx = np.argmax(P_fuse, axis=1)
    y_pred = np.array([inv_map[i] for i in y_pred_idx], dtype=object)

    acc = accuracy_score(y_true, y_pred)
    print(f"\n=== Detail SVM Fusion Eval | road={ROAD} | alpha={ALPHA_FIXED} ===")
    print("Accuracy:", acc)
    print("\nConfusion:\n", confusion_matrix(y_true, y_pred, labels=sorted(list(set(y_true)))))
    print("\nReport:\n", classification_report(y_true, y_pred))


Eval samples: 97

=== Detail SVM Fusion Eval | road=suporo_28 | alpha=0.389 ===
Accuracy: 0.7835051546391752

Confusion:
 [[30  0  1  0  4]
 [ 3 15  1  1  0]
 [ 1  1 10  0  0]
 [ 2  1  1 15  1]
 [ 3  0  0  1  6]]

Report:
               precision    recall  f1-score   support

           A       0.77      0.86      0.81        35
           B       0.88      0.75      0.81        20
           C       0.77      0.83      0.80        12
           D       0.88      0.75      0.81        20
           E       0.55      0.60      0.57        10

    accuracy                           0.78        97
   macro avg       0.77      0.76      0.76        97
weighted avg       0.79      0.78      0.78        97



In [None]:
DETAIL_ROADS = ["donhwamunro_11_ga", "donhwamunro_11_na", "donhwamunro_11_da", "suporo_28"]

dt = clean_detail_arr(test_detail)

summary = {}
for ROAD in DETAIL_ROADS:
    idx = np.where((test_road_label == ROAD) & (dt != "0"))[0]
    if len(idx) == 0:
        summary[ROAD] = None
        continue

    base = os.path.join(MODEL_DIR, "detail_models", ROAD)

    hog_svm    = joblib.load(os.path.join(base, "hog_svm.pkl"))
    hog_pca    = joblib.load(os.path.join(base, "hog_pca.pkl"))
    hog_scaler = joblib.load(os.path.join(base, "hog_scaler.pkl"))
    color_svm  = joblib.load(os.path.join(base, "color_svm.pkl"))

    detail_map = joblib.load(os.path.join(base, "detail_label_map.pkl"))
    inv_map = {v:k for k,v in detail_map.items()}

    Xh = X_hog_test[idx]
    Xc = X_color_test[idx]
    y_true = dt[idx]

    Xh_p = hog_pca.transform(hog_scaler.transform(Xh))
    P_shape = hog_svm.predict_proba(Xh_p)
    P_color = color_svm.predict_proba(Xc)
    P_fuse  = ALPHA_FIXED * P_shape + (1 - ALPHA_FIXED) * P_color

    y_pred = np.array([inv_map[i] for i in np.argmax(P_fuse, axis=1)], dtype=object)

    summary[ROAD] = {
        "n": len(idx),
        "acc": float(accuracy_score(y_true, y_pred))
    }

print("=== Detail SVM Fusion Summary ===")
for r, v in summary.items():
    if v is None:
        print(f"{r}: (no samples)")
    else:
        print(f"{r}: n={v['n']} acc={v['acc']:.4f}")


=== Detail SVM Fusion Summary ===
donhwamunro_11_ga: n=72 acc=0.7500
donhwamunro_11_na: n=100 acc=0.8100
donhwamunro_11_da: n=58 acc=0.8793
suporo_28: n=97 acc=0.7835


In [None]:
ROAD = "donhwamunro_11_na"   # 원하는 도로로 바꿔

# 좌표 있는 샘플만 평가
xy = np.stack([test_x, test_y], axis=1)
valid = np.isfinite(xy).all(axis=1)

idx = np.where((test_road_label == ROAD) & valid)[0]
print("Eval samples:", len(idx))

if len(idx) == 0:
    print("해당 road에 좌표 있는 test 샘플이 없음")
else:
    Z = make_knn_Z(X_hog_test[idx], X_color_test[idx])
    gt_xy = xy[idx]

    knn_road = joblib.load(os.path.join(KNN_ROOT, ROAD, "knn_road.pkl"))
    pred_xy = knn_road.predict(Z)

    dist = euclid_dist(pred_xy, gt_xy)

    print(f"\n=== Road KNN Eval | road={ROAD} ===")
    print("Mean dist  :", float(dist.mean()))
    print("Median dist:", float(np.median(dist)))
    print("90% dist   :", float(np.percentile(dist, 90)))


Eval samples: 100

=== Road KNN Eval | road=donhwamunro_11_na ===
Mean dist  : 1.3513006679775241
Median dist: 1.1110861492932425
90% dist   : 2.5679790074481845


In [None]:
# ===== 전제: 이미 존재해야 하는 변수들 =====
# KNN_ROOT: 예) "FPL_models/knn_models"
# X_hog_test, X_color_test
# test_road_label, test_detail
# test_x, test_y

def _clean_detail_arr(arr):
    out = []
    for d in arr:
        if d is None:
            out.append("0")
            continue
        s = str(d).strip()
        if s.lower() in ("nan", "none", ""):
            out.append("0")
        else:
            out.append(s)
    return np.array(out, dtype=object)

def _euclid_dist(pred_xy, gt_xy):
    return np.sqrt(np.sum((pred_xy - gt_xy) ** 2, axis=1))

def _make_knn_Z_from_saved_scaler_pca(X_hog, X_color, knn_root):
    # knn_root 안에 저장된 scaler/pca 자동 탐색
    scaler_path = None
    pca_path = None
    for fn in os.listdir(knn_root):
        if fn.startswith("knn_hog_scaler_") and fn.endswith(".pkl"):
            scaler_path = os.path.join(knn_root, fn)
        if fn.startswith("knn_hog_pca_") and fn.endswith(".pkl"):
            pca_path = os.path.join(knn_root, fn)

    if scaler_path is None or pca_path is None:
        raise FileNotFoundError("Cannot find knn_hog_scaler_*.pkl or knn_hog_pca_*.pkl under " + knn_root)

    scaler = joblib.load(scaler_path)
    pca = joblib.load(pca_path)

    Xh_s = scaler.transform(X_hog)
    Xh_p = pca.transform(Xh_s)
    Z = np.hstack([Xh_p, X_color])
    return Z, scaler_path, pca_path


# =========================
# (1) test 준비
# =========================
dt = _clean_detail_arr(test_detail)

xy = np.stack([test_x, test_y], axis=1).astype(np.float32)
valid = np.isfinite(xy).all(axis=1)

print("KNN_ROOT:", KNN_ROOT)
print("valid xy:", int(valid.sum()), "/", len(valid))

# =========================
# (2) KNN 공통 feature 만들기 (저장된 scaler/pca 사용)
# =========================
Z_test, used_scaler_path, used_pca_path = _make_knn_Z_from_saved_scaler_pca(
    X_hog_test, X_color_test, KNN_ROOT
)
print("Z_test shape:", Z_test.shape)
print("used scaler:", used_scaler_path)
print("used pca   :", used_pca_path)

# =========================
# (3) 전체 road/detail 모델 스캔
# =========================
roads = sorted([d for d in os.listdir(KNN_ROOT) if os.path.isdir(os.path.join(KNN_ROOT, d))])

road_rows = []
detail_rows = []

for road in roads:
    road_dir = os.path.join(KNN_ROOT, road)
    road_model_path = os.path.join(road_dir, "knn_road.pkl")
    if not os.path.exists(road_model_path):
        continue

    # ---- road 모델 평가 (해당 road test subset) ----
    idx_r = np.where((test_road_label == road) & valid)[0]
    if len(idx_r) > 0:
        knn_road = joblib.load(road_model_path)
        pred_r = knn_road.predict(Z_test[idx_r])
        dist_r = _euclid_dist(pred_r, xy[idx_r])

        road_rows.append({
            "road": road,
            "n": int(len(idx_r)),
            "mean": float(dist_r.mean()),
            "median": float(np.median(dist_r)),
            "p90": float(np.percentile(dist_r, 90)),
        })

    # ---- detail 모델들 평가 ----
    # 폴더 내 knn_detail_*.pkl 전부
    for fn in sorted(os.listdir(road_dir)):
        if not (fn.startswith("knn_detail_") and fn.endswith(".pkl")):
            continue
        det = fn[len("knn_detail_"):-len(".pkl")]

        idx_d = np.where((test_road_label == road) & (dt == det) & valid)[0]
        if len(idx_d) == 0:
            continue

        knn_det = joblib.load(os.path.join(road_dir, fn))
        pred_d = knn_det.predict(Z_test[idx_d])
        dist_d = _euclid_dist(pred_d, xy[idx_d])

        # 비교용: 같은 subset에서 road knn도 같이 평가해서 개선량 계산
        if os.path.exists(road_model_path):
            knn_road = joblib.load(road_model_path)
            pred_r2 = knn_road.predict(Z_test[idx_d])
            dist_r2 = _euclid_dist(pred_r2, xy[idx_d])
            mean_improve = float(dist_r2.mean() - dist_d.mean())
            med_improve = float(np.median(dist_r2) - np.median(dist_d))
        else:
            mean_improve = float("nan")
            med_improve = float("nan")

        detail_rows.append({
            "road": road,
            "detail": det,
            "n": int(len(idx_d)),
            "mean": float(dist_d.mean()),
            "median": float(np.median(dist_d)),
            "p90": float(np.percentile(dist_d, 90)),
            "mean_improve_vs_road": mean_improve,
            "median_improve_vs_road": med_improve,
        })

# =========================
# (4) 출력
# =========================
print("\n=== Road KNN Distance Report (sorted by mean) ===")
road_rows_sorted = sorted(road_rows, key=lambda x: x["mean"])
for r in road_rows_sorted:
    print(f"{r['road']:15s} | n={r['n']:4d} | mean={r['mean']:.3f} | median={r['median']:.3f} | p90={r['p90']:.3f}")

print("\n=== Detail KNN Distance Report (sorted by mean) ===")
detail_rows_sorted = sorted(detail_rows, key=lambda x: x["mean"])
for r in detail_rows_sorted:
    print(
        f"{r['road']:15s} | detail={r['detail']:>1s} | n={r['n']:4d} | "
        f"mean={r['mean']:.3f} | median={r['median']:.3f} | p90={r['p90']:.3f} | "
        f"Δmean_vs_road={r['mean_improve_vs_road']:+.3f} | Δmed_vs_road={r['median_improve_vs_road']:+.3f}"
    )

print("\n=== Summary ===")
print("roads evaluated :", len(road_rows))
print("details evaluated:", len(detail_rows))


KNN_ROOT: FPL_models/knn_models
valid xy: 743 / 743
Z_test shape: (743, 686)
used scaler: FPL_models/knn_models/knn_hog_scaler_pca128.pkl
used pca   : FPL_models/knn_models/knn_hog_pca_pca128.pkl

=== Road KNN Distance Report (sorted by mean) ===
samildaero      | n=  66 | mean=0.000 | median=0.000 | p90=0.000
samildaero_32   | n=  79 | mean=0.000 | median=0.000 | p90=0.000
samildaero_26   | n=  26 | mean=0.211 | median=0.156 | p90=0.405
samildaero_28   | n=  38 | mean=0.607 | median=0.659 | p90=1.108
samildaero_32_ga | n=  43 | mean=0.614 | median=0.598 | p90=1.211
donhwamunro_11  | n=  43 | mean=1.026 | median=0.988 | p90=1.880
donhwamunro_11_da | n=  58 | mean=1.080 | median=1.013 | p90=1.892
suporo_28       | n=  97 | mean=1.122 | median=1.137 | p90=1.594
donhwamunro     | n=  62 | mean=1.142 | median=1.192 | p90=1.803
samildaero_30   | n=  59 | mean=1.258 | median=1.229 | p90=2.241
donhwamunro_11_na | n= 100 | mean=1.351 | median=1.111 | p90=2.568
donhwamunro_11_ga | n=  72 | mean

In [None]:
DETAIL_ROADS = [
    "donhwamunro_11_da",
    "donhwamunro_11_ga",
    "donhwamunro_11_na",
    "suporo_28",
]

# 좌표 dtype 강제 숫자화 (중요!)
test_x_num = pd.to_numeric(test_x, errors="coerce")
test_y_num = pd.to_numeric(test_y, errors="coerce")

xy = np.stack([test_x_num, test_y_num], axis=1)
valid = np.isfinite(xy).all(axis=1)
dt = clean_detail_arr(test_detail)

print("valid xy:", valid.sum(), "/", len(valid))

detail_report = []

for ROAD in DETAIL_ROADS:
    print(f"\n[ROAD] {ROAD}")

    road_knn_path = os.path.join(KNN_ROOT, ROAD, "knn_road.pkl")
    if not os.path.exists(road_knn_path):
        print("  ❗ road knn not found:", road_knn_path)
        continue

    knn_road = joblib.load(road_knn_path)

    # 이 도로에 존재하는 detail 목록 (test 기준)
    details_in_road = sorted(list(set(dt[test_road_label == ROAD])))
    details_in_road = [d for d in details_in_road if d != "0"]
    print("  details:", details_in_road)

    for DETAIL in details_in_road:
        detail_knn_path = os.path.join(KNN_ROOT, ROAD, f"knn_detail_{DETAIL}.pkl")
        if not os.path.exists(detail_knn_path):
            print(f"    ❗ detail knn not found: {DETAIL}")
            continue

        idx = np.where((test_road_label == ROAD) & (dt == DETAIL) & valid)[0]
        print(f"    detail={DETAIL} | valid samples={len(idx)}")

        if len(idx) == 0:
            continue

        knn_detail = joblib.load(detail_knn_path)

        Z = make_knn_Z(X_hog_test[idx], X_color_test[idx])
        gt_xy = xy[idx]

        pred_r = knn_road.predict(Z)
        pred_d = knn_detail.predict(Z)

        dist_r = euclid_dist(pred_r, gt_xy)
        dist_d = euclid_dist(pred_d, gt_xy)

        detail_report.append({
            "road": ROAD,
            "detail": DETAIL,
            "n": len(idx),
            "road_mean": float(dist_r.mean()),
            "detail_mean": float(dist_d.mean()),
            "road_median": float(np.median(dist_r)),
            "detail_median": float(np.median(dist_d)),
            "mean_improve": float(dist_r.mean() - dist_d.mean()),
            "median_improve": float(np.median(dist_r) - np.median(dist_d)),
        })

print("\n=== detail_report rows ===", len(detail_report))


valid xy: 743 / 743

[ROAD] donhwamunro_11_da
  details: ['A', 'B', 'C']
    detail=A | valid samples=30
    detail=B | valid samples=16
    detail=C | valid samples=12

[ROAD] donhwamunro_11_ga
  details: ['A', 'B', 'C']
    detail=A | valid samples=16
    detail=B | valid samples=27
    detail=C | valid samples=29

[ROAD] donhwamunro_11_na
  details: ['A', 'B', 'C', 'D']
    detail=A | valid samples=23
    detail=B | valid samples=42
    detail=C | valid samples=18
    detail=D | valid samples=17

[ROAD] suporo_28
  details: ['A', 'B', 'C', 'D', 'E']
    detail=A | valid samples=35
    detail=B | valid samples=20
    detail=C | valid samples=12
    detail=D | valid samples=20
    detail=E | valid samples=10

=== detail_report rows === 15


In [None]:
print("=== Detail KNN Distance Report (per detail) ===")

detail_report_sorted = sorted(detail_report, key=lambda x: (x["road"], x["detail"]))

for r in detail_report_sorted:
    print(
        f"{r['road']:15s} | detail={r['detail']} | n={r['n']:3d} | "
        f"road_mean={r['road_mean']:.3f} -> detail_mean={r['detail_mean']:.3f} | "
        f"Δmean={r['mean_improve']:+.3f} | "
        f"road_med={r['road_median']:.3f} -> detail_med={r['detail_median']:.3f} | "
        f"Δmed={r['median_improve']:+.3f}"
    )


=== Detail KNN Distance Report (per detail) ===
donhwamunro_11_da | detail=A | n= 30 | road_mean=0.833 -> detail_mean=0.336 | Δmean=+0.497 | road_med=0.671 -> detail_med=0.327 | Δmed=+0.345
donhwamunro_11_da | detail=B | n= 16 | road_mean=0.858 -> detail_mean=0.496 | Δmean=+0.362 | road_med=0.803 -> detail_med=0.456 | Δmed=+0.347
donhwamunro_11_da | detail=C | n= 12 | road_mean=1.074 -> detail_mean=0.264 | Δmean=+0.810 | road_med=1.079 -> detail_med=0.000 | Δmed=+1.079
donhwamunro_11_ga | detail=A | n= 16 | road_mean=2.164 -> detail_mean=0.307 | Δmean=+1.857 | road_med=2.247 -> detail_med=0.310 | Δmed=+1.937
donhwamunro_11_ga | detail=B | n= 27 | road_mean=1.176 -> detail_mean=0.635 | Δmean=+0.542 | road_med=1.167 -> detail_med=0.484 | Δmed=+0.683
donhwamunro_11_ga | detail=C | n= 29 | road_mean=0.980 -> detail_mean=0.290 | Δmean=+0.690 | road_med=0.760 -> detail_med=0.000 | Δmed=+0.760
donhwamunro_11_na | detail=A | n= 23 | road_mean=1.864 -> detail_mean=0.603 | Δmean=+1.261 | road_me

In [None]:
from sklearn.neighbors import KNeighborsRegressor

K_LIST = [1, 3, 5, 7, 9]

# train / test 좌표
xy_train = np.stack([training_x, training_y], axis=1)
xy_test  = np.stack([test_x, test_y], axis=1)

valid_tr = np.isfinite(xy_train).all(axis=1)
valid_te = np.isfinite(xy_test).all(axis=1)

Z_train = make_knn_Z(X_hog_train, X_color_train)
Z_test  = make_knn_Z(X_hog_test,  X_color_test)

print("=== k sweep (train → test) ===")

for k in K_LIST:
    knn = KNeighborsRegressor(
        n_neighbors=k,
        metric="euclidean",
        weights="distance"
    )

    knn.fit(Z_train[valid_tr], xy_train[valid_tr])

    pred = knn.predict(Z_test[valid_te])
    dist = euclid_dist(pred, xy_test[valid_te])

    print(
        f"k={k} | "
        f"mean={dist.mean():.3f} | "
        f"median={np.median(dist):.3f} | "
        f"p90={np.percentile(dist,90):.3f}"
    )


=== k sweep (train → test) ===
k=1 | mean=2.049 | median=1.414 | p90=5.099
k=3 | mean=2.244 | median=1.948 | p90=4.447
k=5 | mean=2.307 | median=2.200 | p90=4.115
k=7 | mean=2.335 | median=2.147 | p90=4.086
k=9 | mean=2.364 | median=2.218 | p90=4.020
