### train, test csv파일 업로드, 데이터 분석

In [None]:
import pandas as pd
# Load the training table from disk.
train_path = "train.csv"
train_df = pd.read_csv(train_path)

# Report the table's dimensions and its column names (one per output line),
# then leave the first rows as the value displayed by the cell.
print(train_df.shape, train_df.columns.tolist(), sep="\n")
train_df.head()

In [None]:
# Load the test table from disk.
test_path = "test.csv"
test_df = pd.read_csv(test_path)

# Report the table's dimensions and its column names (one per output line),
# then leave the first rows as the value displayed by the cell.
print(test_df.shape, test_df.columns.tolist(), sep="\n")
test_df.head()

In [None]:
# Name of the key column shared by the CSV tables.
ID_COL = "ID"

# Fifteen target columns: five fields for each of the three obstacle slots,
# ordered slot by slot (probaObstacle1, x1, dx1, y1, dy1, probaObstacle2, ...).
TARGET_COLS = [
    f"{field}{slot}"
    for slot in (1, 2, 3)
    for field in ("probaObstacle", "x", "dx", "y", "dy")
]

### 이미지 기반 Feature Vector 추출

In [None]:
# 기본 import + 유틸 / Box

import os
import cv2
import numpy as np
import pandas as pd
from dataclasses import dataclass

@dataclass
class Box:
    """Axis-aligned rectangle with an attached score.

    ``iou`` treats (x, y) as the minimum corner of the rectangle and
    (x + w, y + h) as its maximum corner.
    """
    x: int  # minimum x coordinate (left edge when used to slice image columns)
    y: int  # minimum y coordinate (top edge when used to slice image rows)
    w: int  # extent along x
    h: int  # extent along y
    # Ranking value; extract_candidates stores the connected component's pixel
    # area here, and boxes created without a score get 1.0.
    score: float = 1.0

def clamp(v, lo, hi):
    """Limit ``v`` to the closed range ``[lo, hi]`` and return the result."""
    # Apply the upper bound first, then the lower bound.
    capped = min(hi, v)
    return max(lo, capped)

def iou(a: Box, b: Box) -> float:
    """Return the intersection-over-union of two boxes.

    Each box spans (x, y) to (x + w, y + h). The result is 0.0 when the
    boxes do not overlap or when the union area is not positive.
    """
    # Signed overlap along each axis; a negative value means a gap.
    overlap_w = min(a.x + a.w, b.x + b.w) - max(a.x, b.x)
    overlap_h = min(a.y + a.h, b.y + b.h) - max(a.y, b.y)

    inter = max(0, overlap_w) * max(0, overlap_h)
    union = a.w * a.h + b.w * b.h - inter
    if union > 0:
        return inter / union
    return 0.0


In [None]:
# 후보 bbox 추출 (rail mask ROI 기반)

def extract_candidates(frame_bgr: np.ndarray, rail_mask: np.ndarray,
                       min_area=400, min_side=15, topk=30):
    """
    Propose candidate boxes from edge blobs found inside the rail region.

    The frame is converted to grayscale and restricted to the pixels where
    ``rail_mask`` is non-zero. Canny edges of the blurred image are closed,
    dilated, masked again and split into 8-connected components. Every
    component that passes the size filters becomes a ``Box`` whose ``score``
    is the component's pixel area.

    Args:
        frame_bgr: colour frame (converted with COLOR_BGR2GRAY).
        rail_mask: uint8 (H, W) mask, rail=255 / background=0 (or any 0 vs.
            non-zero encoding); only ``> 0`` is tested.
        min_area: components with a smaller pixel area are dropped.
        min_side: components narrower or shorter than this are dropped.
        topk: maximum number of boxes returned.

    Returns:
        Up to ``topk`` boxes, largest area first.
    """
    assert frame_bgr is not None and rail_mask is not None

    # Normalise the mask to 0/255 so it can be used as an OpenCV mask.
    roi_mask = (rail_mask > 0).astype(np.uint8) * 255

    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    gray = cv2.bitwise_and(gray, gray, mask=roi_mask)

    smoothed = cv2.GaussianBlur(gray, (5, 5), 0)
    edge_map = cv2.Canny(smoothed, 60, 140)

    # Close gaps and thicken the edges so that one object forms one blob,
    # then cut away anything that grew outside the rail region.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
    edge_map = cv2.morphologyEx(edge_map, cv2.MORPH_CLOSE, kernel, iterations=2)
    edge_map = cv2.dilate(edge_map, kernel, iterations=1)
    edge_map = cv2.bitwise_and(edge_map, edge_map, mask=roi_mask)

    n_labels, _labels, stats, _ = cv2.connectedComponentsWithStats(edge_map, connectivity=8)

    frame_h, frame_w = gray.shape
    max_w, max_h = frame_w * 0.9, frame_h * 0.9

    candidates = []
    # Label 0 is skipped, as in the usual background convention.
    for left, top, width, height, area in stats[1:n_labels]:
        too_small = area < min_area or width < min_side or height < min_side
        too_large = width > max_w or height > max_h
        if too_small or too_large:
            continue
        candidates.append(Box(int(left), int(top), int(width), int(height), float(area)))

    ranked = sorted(candidates, key=lambda b: b.score, reverse=True)
    return ranked[:topk]


In [None]:
# KLT (LK optical flow)로 박스 내부 motion 측정

def box_motion_median(frame1_bgr: np.ndarray, frame2_bgr: np.ndarray, box: Box) -> float:
    """
    Median optical-flow displacement (in pixels) of corners found inside ``box``.

    Corners are detected in the first frame within the box (clipped to the
    image), tracked into the second frame with pyramidal Lucas-Kanade, and
    the median length of the successfully tracked displacement vectors is
    returned. Returns 0.0 when the clipped box is empty, no corner is found,
    or no corner could be tracked.
    """
    gray_prev = cv2.cvtColor(frame1_bgr, cv2.COLOR_BGR2GRAY)
    gray_next = cv2.cvtColor(frame2_bgr, cv2.COLOR_BGR2GRAY)

    img_h, img_w = gray_prev.shape[0], gray_prev.shape[1]
    left = clamp(box.x, 0, img_w - 1)
    top = clamp(box.y, 0, img_h - 1)
    right = clamp(box.x + box.w, 0, img_w)
    bottom = clamp(box.y + box.h, 0, img_h)

    patch = gray_prev[top:bottom, left:right]
    if patch.size == 0:
        return 0.0

    corners = cv2.goodFeaturesToTrack(patch, maxCorners=120, qualityLevel=0.01, minDistance=5)
    if corners is None:
        return 0.0

    # Corners are relative to the patch; shift them to full-image coordinates.
    corners[:, 0, 0] += left
    corners[:, 0, 1] += top

    tracked, status, _ = cv2.calcOpticalFlowPyrLK(
        gray_prev, gray_next, corners, None,
        winSize=(21, 21), maxLevel=3,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
    )

    # Keep only the points whose status flag is 1.
    found = status == 1
    start_pts = corners[found]
    end_pts = tracked[found]
    if len(start_pts) == 0:
        return 0.0

    shift = end_pts - start_pts
    lengths = np.sqrt(shift[:, 0]**2 + shift[:, 1]**2)
    return float(np.median(lengths))


In [None]:
# edge density (bbox 내부 edge 비율) 계산

def edge_density(frame_bgr: np.ndarray, box: Box) -> float:
    """
    Fraction of pixels inside ``box`` that Canny marks as edges.

    The box is clipped to the image first; 0.0 is returned when the clipped
    region is empty.
    """
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    img_h, img_w = gray.shape[0], gray.shape[1]

    left = clamp(box.x, 0, img_w - 1)
    top = clamp(box.y, 0, img_h - 1)
    right = clamp(box.x + box.w, 0, img_w)
    bottom = clamp(box.y + box.h, 0, img_h)

    patch = gray[top:bottom, left:right]
    if patch.size == 0:
        return 0.0

    edge_map = cv2.Canny(cv2.GaussianBlur(patch, (5, 5), 0), 60, 140)
    # Share of non-zero pixels in the edge map.
    return float(np.count_nonzero(edge_map) / edge_map.size)


In [None]:
# 두 프레임 후보 매칭 + feature vector 추출
def make_paths(sample_id: int, base_dir: str):
    """Return the (frame 1, frame 2, rail mask) file paths for one sample id.

    The id is zero-padded to four digits, e.g. 1 -> "0001", 12 -> "0012".
    """
    prefix = format(sample_id, "04d")
    suffixes = ("_1.jpg", "_2.jpg", "_rail.png")
    return tuple(os.path.join(base_dir, prefix + suffix) for suffix in suffixes)


def _empty_feature_row(sample_id, num_candidates) -> dict:
    """Build the feature row used when no candidate pair could be matched."""
    return {
        "id": sample_id,
        # Always a float so the column dtype matches the regular rows.
        "num_candidates": float(num_candidates),
        "min_motion": 0.0,
        "mean_motion": 0.0,
        "max_iou": 0.0,
        "max_area_ratio": 0.0,
        "stationary_score_1": 0.0,
        "stationary_score_2": 0.0,
        "stationary_score_3": 0.0,
        "edge_density_mean": 0.0,
    }


def _greedy_iou_pairs(boxes1, boxes2):
    """Pair each box of ``boxes1`` with its best-IoU, still unused box of ``boxes2``.

    Boxes are processed in the given order and a box of ``boxes2`` is used at
    most once. Pairs with zero overlap are never produced. Returns a list of
    ``(box1, box2, iou_value)`` tuples.
    """
    pairs = []
    used2 = set()
    for b1 in boxes1:
        best_j, best_v = -1, 0.0
        for j, b2 in enumerate(boxes2):
            if j in used2:
                continue
            v = iou(b1, b2)
            if v > best_v:
                best_j, best_v = j, v
        # best_j is only set by a strictly positive IoU, so no extra test on
        # best_v is required here.
        if best_j >= 0:
            used2.add(best_j)
            pairs.append((b1, boxes2[best_j], best_v))
    return pairs


def extract_feature_vector_for_id(sample_id: int, base_dir: str) -> dict:
    """
    Compute the image-based feature row for one sample.

    ``base_dir`` is expected to contain ``{id}_1.jpg``, ``{id}_2.jpg`` and
    ``{id}_rail.png`` (ids 1-2319, 2320-3361, ...), with the id zero-padded
    as done by ``make_paths``.

    Steps: candidate boxes are extracted from both frames inside the rail
    mask, greedily matched by IoU, and for every matched pair the union box
    is scored by motion, area ratio and edge density. The per-pair values are
    then aggregated into one row.

    Returns:
        dict with keys ``id``, ``num_candidates``, ``min_motion``,
        ``mean_motion``, ``max_iou``, ``max_area_ratio``,
        ``stationary_score_1..3`` and ``edge_density_mean``. All metrics are
        0.0 when either frame has no candidates or nothing could be matched;
        ``num_candidates`` is the candidate count of frame 1 as a float.

    Raises:
        FileNotFoundError: if any of the three files cannot be read.
    """
    img1_path, img2_path, mask_path = make_paths(sample_id, base_dir)

    f1 = cv2.imread(img1_path, cv2.IMREAD_COLOR)
    f2 = cv2.imread(img2_path, cv2.IMREAD_COLOR)
    rail = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

    if f1 is None or f2 is None or rail is None:
        raise FileNotFoundError(
            f"Failed to load files for id={sample_id}\n"
            f"img1={img1_path} (None? {f1 is None})\n"
            f"img2={img2_path} (None? {f2 is None})\n"
            f"mask={mask_path} (None? {rail is None})"
        )

    # Number of rail pixels; the small offset avoids a division by zero for
    # an empty mask.
    rail_area = float(np.count_nonzero(rail)) + 1e-6

    # 1) Candidate boxes for each frame.
    c1 = extract_candidates(f1, rail)
    c2 = extract_candidates(f2, rail)
    num_candidates = len(c1)

    # Without candidates in either frame there is nothing to match.
    if not c1 or not c2:
        return _empty_feature_row(sample_id, num_candidates)

    # 2) Match frame-1 boxes to frame-2 boxes and score each matched pair.
    matches = []
    for b1, b2, pair_iou in _greedy_iou_pairs(c1, c2):
        # Smallest box enclosing both matched boxes.
        ux = min(b1.x, b2.x)
        uy = min(b1.y, b2.y)
        ux2 = max(b1.x + b1.w, b2.x + b2.w)
        uy2 = max(b1.y + b1.h, b2.y + b2.h)
        ubox = Box(ux, uy, ux2 - ux, uy2 - uy)

        # Motion measured inside the union box.
        m = box_motion_median(f1, f2, ubox)

        # Union box area relative to the rail area.
        area_ratio = float((ubox.w * ubox.h) / rail_area)

        # Stationary score: high IoU and low motion raise it, weighted by the
        # area ratio. The 0.5 in the denominator damps the score for
        # near-zero motion (it is much larger than a numerical epsilon).
        score = (pair_iou / (m + 0.5)) * (0.2 + area_ratio)

        matches.append({
            "iou": pair_iou,
            "motion": m,
            "area_ratio": area_ratio,
            "stationary_score": score,
            "edge_density": edge_density(f1, ubox),
        })

    if not matches:
        return _empty_feature_row(sample_id, num_candidates)

    motions = [d["motion"] for d in matches]
    ious = [d["iou"] for d in matches]
    area_ratios = [d["area_ratio"] for d in matches]
    eds = [d["edge_density"] for d in matches]

    # Three highest stationary scores, padded with zeros when fewer exist.
    top_scores = sorted((d["stationary_score"] for d in matches), reverse=True)[:3]
    top_scores += [0.0] * (3 - len(top_scores))

    return {
        "id": sample_id,
        "num_candidates": float(num_candidates),
        "min_motion": float(np.min(motions)),
        "mean_motion": float(np.mean(motions)),
        "max_iou": float(np.max(ious)),
        "max_area_ratio": float(np.max(area_ratios)),
        "stationary_score_1": float(top_scores[0]),
        "stationary_score_2": float(top_scores[1]),
        "stationary_score_3": float(top_scores[2]),
        "edge_density_mean": float(np.mean(eds)),
    }


In [None]:
# 여러 id에 대해 feature DataFrame 만들기 (train/test 공통)

def build_feature_df(ids, base_dir: str, verbose_every=50) -> pd.DataFrame:
    """Extract one feature row per id and collect the rows into a DataFrame.

    Args:
        ids: sample ids; each is converted with ``int`` before extraction.
        base_dir: directory passed on to ``extract_feature_vector_for_id``.
        verbose_every: print a progress line after every this many ids;
            a falsy value disables the progress output.
    """
    records = []
    for done, sample_id in enumerate(ids, start=1):
        records.append(extract_feature_vector_for_id(int(sample_id), base_dir))
        # Skip the progress line unless reporting is on and a multiple is hit.
        if not verbose_every or done % verbose_every:
            continue
        print(f"Processed {done}/{len(ids)}")
    return pd.DataFrame(records)


In [None]:
import re
import os

def scan_ids(base_dir: str):
    """
    Collect sample ids from files named '####_1.jpg' inside ``base_dir``.

    The four digits are converted to an int ("0001" -> 1). Returns the
    distinct ids in ascending order.
    """
    name_pattern = re.compile(r"^(\d{4})_1\.jpg$")
    hits = (name_pattern.match(filename) for filename in os.listdir(base_dir))
    found = {int(hit.group(1)) for hit in hits if hit}
    return sorted(found)


In [None]:
# Extract features for train (ids 1-2319) and test (ids 2320-3361).

base_dir = "./imagesLevelCrossing"  # your image folder
# Ids are discovered from the '####_1.jpg' files present in base_dir.
all_ids = scan_ids(base_dir)

# Split the discovered ids by the fixed id ranges of the two sets.
train_ids = [i for i in all_ids if 1 <= i <= 2319]
test_ids  = [i for i in all_ids if 2320 <= i <= 3361]

print("train ids:", len(train_ids), train_ids[:10])
print("test ids :", len(test_ids), test_ids[:10])

# One feature row per id; a progress line is printed every 50 ids.
X_train_features = build_feature_df(train_ids, base_dir, verbose_every=50)
X_test_features  = build_feature_df(test_ids,  base_dir, verbose_every=50)

X_train_features.head()


In [None]:
# Preview the first 30 rows of the test feature table.
X_test_features.head(30)

In [None]:
# Preview the first 20 rows of the train feature table.
X_train_features.head(20)

In [None]:
# List the column names of the train feature table.
print(X_train_features.columns.tolist())

### 이미지 기반으로 추출한 feature와 y를 id로 merge시킴

In [None]:
# Required-column checks: stop early when a CSV lacks a column used later.
missing_train = [col for col in [ID_COL, *TARGET_COLS] if col not in train_df.columns]
if missing_train:
    raise ValueError(f"train.csv에 필요한 컬럼이 없음: {missing_train}")

# The test table only needs the ID column.
missing_test = [] if ID_COL in test_df.columns else [ID_COL]
if missing_test:
    raise ValueError(f"test.csv에 필요한 컬럼이 없음: {missing_test}")

In [None]:
# Merge the image-based feature tables with the CSV tables on the ID key.

# 1) The feature tables must carry the "id" column written by the extractor.
#    Explicit raises are used instead of assert so the check still runs when
#    Python is started with -O.
if "id" not in X_train_features.columns:
    raise ValueError("X_train_features has no 'id' column")
if "id" not in X_test_features.columns:
    raise ValueError("X_test_features has no 'id' column")

# 2) Rename id -> ID so the key matches the CSV column name.
X_train_feat = X_train_features.rename(columns={"id": ID_COL}).copy()
X_test_feat  = X_test_features.rename(columns={"id": ID_COL}).copy()

# 3) Use the same integer dtype for the key on both sides of each merge.
train_df[ID_COL] = train_df[ID_COL].astype(int)
test_df[ID_COL]  = test_df[ID_COL].astype(int)
X_train_feat[ID_COL] = X_train_feat[ID_COL].astype(int)
X_test_feat[ID_COL]  = X_test_feat[ID_COL].astype(int)

# 4) train: image features + targets
train_merged = X_train_feat.merge(train_df[[ID_COL] + TARGET_COLS], on=ID_COL, how="inner")

# 5) test: image features + ID (no targets)
test_merged = X_test_feat.merge(test_df[[ID_COL]], on=ID_COL, how="inner")

print("train_merged:", train_merged.shape)
print("test_merged :", test_merged.shape)

# An inner join drops unmatched IDs without any message. Report how many CSV
# IDs have no image features, because every test ID lost here will also be
# absent from the predictions built from test_merged.
print("train IDs without image features:",
      len(set(train_df[ID_COL]) - set(X_train_feat[ID_COL])))
print("test IDs without image features :",
      len(set(test_df[ID_COL]) - set(X_test_feat[ID_COL])))

In [None]:
# Preview the first 30 rows of the merged test table.
test_merged.head(30)

In [None]:
# Preview the first 30 rows of the merged train table.
train_merged.head(30)

In [None]:
# List the column names of the merged train table.
print(train_merged.columns.tolist())

In [None]:
# List the column names of the train CSV table.
print(train_df.columns.tolist())

### 학습용 X / y 최종 분리

In [None]:
# Split the merged tables into model inputs / targets and build the exist label.

# Every merged column that is neither the key nor a target is a feature.
FEATURE_COLS = [col for col in train_merged.columns if col not in [ID_COL, *TARGET_COLS]]

X_train = train_merged.loc[:, FEATURE_COLS].copy()
y_train = train_merged.loc[:, TARGET_COLS].copy()

X_test = test_merged.loc[:, FEATURE_COLS].copy()

# A sample is labelled 1 when the largest of its three obstacle
# probabilities is above zero, otherwise 0.
y_exist = (
    train_merged[["probaObstacle1", "probaObstacle2", "probaObstacle3"]]
    .max(axis=1)
    .gt(0)
    .astype(int)
)

print("X_train:", X_train.shape, "y_train:", y_train.shape, "X_test:", X_test.shape)
print("exist rate:", y_exist.mean())


### 모델 학습 + 예측 + sample_submission 형식 저장

In [None]:
from sklearn.ensemble import HistGradientBoostingClassifier, HistGradientBoostingRegressor
from sklearn.multioutput import MultiOutputRegressor
import numpy as np

# 1) Existence classifier: predicts whether a sample has any obstacle.
clf = HistGradientBoostingClassifier(
    max_depth=6,
    learning_rate=0.05,
    max_iter=400,
    random_state=42
)
clf.fit(X_train, y_exist)

# 2) Regressor for the 15 targets, trained only on samples with exist == 1.
reg = MultiOutputRegressor(
    HistGradientBoostingRegressor(
        max_depth=6,
        learning_rate=0.05,
        max_iter=700,
        random_state=42
    )
)

mask_pos = (y_exist.values == 1)
reg.fit(X_train[mask_pos], y_train[mask_pos])

print("trained: clf + reg")

# ----- test prediction -----
prob_exist = clf.predict_proba(X_test)[:, 1]
pred_exist = (prob_exist >= 0.5).astype(int)

pred_reg = reg.predict(X_test)

# Default values used when no vehicle is predicted (rule from the task
# document): probability 0, position 0.5, size 0 for each of the three slots.
default_row = np.array([
    0.0, 0.5, 0.0, 0.5, 0.0,
    0.0, 0.5, 0.0, 0.5, 0.0,
    0.0, 0.5, 0.0, 0.5, 0.0,
], dtype=float)

# Rows classified as "no obstacle" are overwritten with the default row.
pred_reg[pred_exist == 0] = default_row

# Clip every target to [0, 1] in place. This covers the same columns as
# clipping column 0 and columns 1: separately, in a single call.
np.clip(pred_reg, 0, 1, out=pred_reg)

# ----- save in the sample_submission layout -----
sample_path = "sample_submission.csv"
sample_sub = pd.read_csv(sample_path)
sub_cols = sample_sub.columns.tolist()

pred_df = pd.DataFrame(pred_reg, columns=TARGET_COLS)

out = pd.DataFrame()
if ID_COL in sub_cols:
    # IDs come from test_merged so they stay aligned with the prediction rows.
    out[ID_COL] = test_merged[ID_COL].values
    target_cols_needed = [c for c in sub_cols if c != ID_COL]
else:
    target_cols_needed = sub_cols

# Copy the targets in the column order of the sample submission.
for c in target_cols_needed:
    if c not in pred_df.columns:
        raise ValueError(f"sample_submission 컬럼 '{c}'가 예측 컬럼(TARGET_COLS)에 없음. 컬럼명 확인 필요.")
    out[c] = pred_df[c].values

# Report the path that was actually written (the message used to name a
# different file than the one saved).
submission_path = "LYJ_0103_submission.csv"
out.to_csv(submission_path, index=False)
print(f"Saved: {submission_path}", out.shape)
out.head()


In [None]:
# Find out which IDs fail to line up between train.csv and the image features.
train_ids_csv = set(train_df[ID_COL].tolist())
train_ids_img = set(X_train_feat[ID_COL].tolist())

print("train.csv에만 있는 ID 개수:", len(train_ids_csv.difference(train_ids_img)))
print("이미지 feature에만 있는 ID 개수:", len(train_ids_img.difference(train_ids_csv)))

# Show up to ten example IDs from each side.
print("csv only sample:", sorted(train_ids_csv - train_ids_img)[:10])
print("img only sample:", sorted(train_ids_img - train_ids_csv)[:10])
