In [1]:
import os, glob
import numpy as np
import cv2
from collections import defaultdict

In [2]:
# Output directory of the detection run; the per-track speed CSV is written into it later.
CSV = "./runs/detect/test_crash_ver2"
# Video whose FPS and frame size are read below.
VIDEO = "./runs/detect/test_crash_ver2/crash_ver2.mp4"
# Directory that is globbed for the label .txt files.
LABELS_DIR = "./runs/detect/test_crash_ver2/labels"

In [3]:
### Read the video properties needed to de-normalize bounding boxes
### (label coordinates are on a 0~1 scale; W/H convert them to image pixels).
# NOTE: the name H (frame height) is rebound to the homography matrix in the
# homography section; re-run this cell before using H as a height again.

cap = cv2.VideoCapture(VIDEO)
try:
    # A wrong path or unsupported codec does not raise; it yields an unopened capture.
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {VIDEO}")
    fps  = cap.get(cv2.CAP_PROP_FPS)
    W    = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H    = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
finally:
    # Always free the capture handle, even when validation fails.
    cap.release()

# fps is used as a divisor when frame indices are turned into seconds,
# so refuse to continue with a missing or zero value.
if not fps or fps <= 0:
    raise RuntimeError("Could not read FPS from video; set fps manually.")

속도 구하기 방법 2가지
1. pixel 당 거리를 임의로 지정하기
    - 쉽지만 부정확하다. 원근(perspective)을 고려하지 않기 때문에 대안이 될 수 없다.
2. homography 이용
    - 화면에서 기준이 되는 지점들(최소 4개)의 실제 측정값을 넣은 뒤 homography를 계산한다.
    - homography는 실제 CCTV 화면의 좌표를 real-world 좌표로 변환하기 위해 사용한다.

### 방법 1

In [5]:
# Hand-picked scale for method 1: meters assumed to be covered by one image pixel (no perspective correction).
METERS_PER_PIXEL = 0.08

In [6]:
import re

def numeric_key(path):
    """Sort key: the integer sitting right before ``.txt`` in the file name.

    Example: ``test_video_1001.txt`` -> ``1001``. Returns -1 when the base
    name has no digits directly in front of the ``.txt`` suffix.
    """
    filename = os.path.basename(path)
    hit = re.search(r"(\d+)(?=\.txt$)", filename)
    if hit is None:
        return -1
    return int(hit.group(1))

def parse_line_norm6(line, W, H):
    """
    Parse one label line of the form ``class cx cy w h track_id``.

    cx, cy, w, h are normalized (0..1) and are scaled to pixels here:
    x-values by W, y-values by H.

    Args:
        line: raw text line from a label file.
        W: image width in pixels.
        H: image height in pixels.

    Returns:
        dict with keys "track_id", "cx", "cy", "w", "h" (box center and size
        in pixels), or None when the line does not have exactly 6 fields or
        when a field is not a usable number.
    """
    fields = line.split()
    if len(fields) != 6:
        return None
    _cls, cx, cy, w, h, tid = fields
    try:
        cx, cy, w, h = (float(v) for v in (cx, cy, w, h))
        # Going through float() also accepts an id written like "3.0".
        tid = int(float(tid))
    except (ValueError, OverflowError):
        # Non-numeric, nan or inf field: report a parse failure as documented
        # instead of aborting the whole frame loop.
        return None
    # de-normalize
    return {"track_id": tid, "cx": cx * W, "cy": cy * H, "w": w * W, "h": h * H}


In [29]:
import os, glob, itertools



# Quick sanity check of the label directory.
# NOTE: plain sorted() orders the names lexicographically
# (test_1, test_10, test_100, ...), not by the number in the file name.
all_txts = sorted(glob.glob(os.path.join(LABELS_DIR, "*.txt")))
print("labels_dir exists:", os.path.isdir(LABELS_DIR))
print("# of txt files:", len(all_txts))
print("First 5 files:", all_txts[:5])

# Peek at just the first few lines of a file
def peek_lines(path, n=3):
    """Return up to the first `n` lines of `path`, each stripped of surrounding whitespace.

    A file with fewer than `n` lines yields only the lines it has; a
    non-positive `n` yields an empty list.
    """
    from itertools import islice  # local import keeps this cell self-contained

    with open(path, "r") as f:
        # islice stops quietly at end of file, unlike repeated next(f),
        # which raises StopIteration on short files.
        return [line.strip() for line in islice(f, max(n, 0))]
# Print the first lines of the first label file, if any file was found.
if all_txts:
    sample = all_txts[0]
    print("\nSample file:", sample)
    print("\n".join(peek_lines(sample, n=5)))


labels_dir exists: True
# of txt files: 1350
First 5 files: ['./runs/detect/test1/test_ver2/labels/test_1.txt', './runs/detect/test1/test_ver2/labels/test_10.txt', './runs/detect/test1/test_ver2/labels/test_100.txt', './runs/detect/test1/test_ver2/labels/test_1000.txt', './runs/detect/test1/test_ver2/labels/test_1001.txt']

Sample file: ./runs/detect/test1/test_ver2/labels/test_1.txt
1 0.619589 0.481689 0.0486497 0.061562 1
1 0.377282 0.640223 0.0814116 0.0951284 2
1 0.359535 0.559343 0.0715641 0.0903697 3
1 0.602564 0.302533 0.0292859 0.0333566 4
1 0.685362 0.212117 0.0189686 0.0248631 5


In [7]:
# Collect every label file, ordered by the number embedded in its name
frame_files = sorted(glob.glob(os.path.join(LABELS_DIR, "*.txt")), key=numeric_key)
print("frame files:", len(frame_files))

# Timestamps come from the frame number in the file name rather than from the
# position in the sorted list, so a frame without a label file (presumably one
# with no detections) does not shift the timing of every later frame.
# If any file lacks a numeric suffix, fall back to the list position for all files.
frame_numbers = [numeric_key(p) for p in frame_files]
use_frame_numbers = all(n >= 0 for n in frame_numbers)

tracks = defaultdict(list)   # tid -> [(t, y_bottom)]
for fi, path in enumerate(frame_files):
    t = (frame_numbers[fi] if use_frame_numbers else fi) / fps
    with open(path, "r") as f:
        for line in f:
            rec = parse_line_norm6(line, W, H)
            if rec is None:
                continue
            tid = rec["track_id"]
            # bottom edge of the box: center y plus half the box height
            y_bottom = rec["cy"] + rec["h"]/2.0
            tracks[tid].append((t, y_bottom))

print("unique track IDs:", len(tracks))

# compute speeds
# Method 1: convert vertical pixel displacement to meters with the fixed
# METERS_PER_PIXEL scale and derive per-track and overall average speeds.
per_track_avg_kmh = {}
per_track_inst = defaultdict(list)  # tid -> [(t_mid, v_mps)]

for tid, seq in tracks.items():
    # a single observation gives no displacement to measure
    if len(seq) < 2:
        continue
    # order the observations by time before pairing consecutive ones
    seq = sorted(seq, key=lambda z: z[0])
    total_d, total_t = 0.0, 0.0
    for (t0, y0), (t1, y1) in zip(seq[:-1], seq[1:]):
        dt = t1 - t0
        # skip pairs with (near-)zero time difference to avoid dividing by ~0
        if dt <= 1e-6: 
            continue
        dy_px = abs(y1 - y0)
        dist_m = dy_px * METERS_PER_PIXEL        # assumption: vertical pixel motion ≈ along-lane meters
        v_mps = dist_m / dt
        total_d += dist_m
        total_t += dt
        # store the instantaneous speed at the midpoint time of the step
        per_track_inst[tid].append((0.5*(t0+t1), v_mps))
    if total_t > 0:
        # total distance / total time, converted from m/s to km/h
        per_track_avg_kmh[tid] = (total_d/total_t) * 3.6

# Unweighted mean of the per-track averages (each track counts once).
if per_track_avg_kmh:
    overall_avg_kmh = float(np.mean(list(per_track_avg_kmh.values())))
    print(f"Overall average speed: {overall_avg_kmh:.1f} km/h  (cars: {len(per_track_avg_kmh)})")
else:
    print("No multi-frame tracks with IDs.")



frame files: 1350
unique track IDs: 174
Overall average speed: 17.8 km/h  (cars: 142)


ㅋㅋㅋ 될 리가 없지 — 평균 17.8 km/h는 비현실적으로 낮다. 원근을 무시한 고정 pixel 스케일(방법 1)로는 속도를 제대로 구할 수 없다.

### 방법 2

- 한국 고속도로 차로 폭: 3.6미터
- 점선 길이: 8미터
- 점선 사이 간격: 12미터


In [4]:
# Re-open the video; the next cell reads its first frame and releases the handle.
cap = cv2.VideoCapture(VIDEO)

In [5]:
# Grab the first frame and save it as the reference image for picking homography points
try:
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    ret, frame = cap.read()
finally:
    # Release the capture even if seeking or reading raises.
    cap.release()

# A failed read would otherwise leave no image (or a stale one) for the next cells.
if not ret:
    raise RuntimeError(f"Could not read the first frame from {VIDEO}")

out_path = "./frame_1.jpg"
# imwrite reports failure through its return value instead of raising,
# so only announce success when it actually succeeded.
ok = cv2.imwrite(out_path, frame)
if not ok:
    raise IOError(f"Could not write {out_path}")
print("Saved")

Saved


In [6]:
# Empty placeholder; a later cell rebinds img_points to the clicked image points.
img_points = np.array([])

In [7]:
### Select homography points

frame_path = "frame_1.jpg"
img = cv2.imread(frame_path)
# imread returns None (no exception) when the file is missing or unreadable;
# fail here with a clear message instead of later inside cv2.imshow.
if img is None:
    raise FileNotFoundError(f"Could not read image: {frame_path}")

# Pixel coordinates of the clicks, in click order.
points = []

def click_event(event, x, y, flags, param):
    """Mouse callback: record each left-button click in `points` and mark it on `img`.

    Any other mouse event is ignored. `flags` and `param` are part of the
    callback signature and are not used.
    """
    if event != cv2.EVENT_LBUTTONDOWN:
        return
    print(f"Clicked: ({x}, {y})")
    points.append((x, y))
    # filled dot of radius 5 at the clicked pixel, then refresh the window
    cv2.circle(img, (x, y), 5, (0, 0, 255), -1)
    cv2.imshow("frame", img)

# Show the frame and attach the click handler to the same window name.
cv2.imshow("frame", img)
cv2.setMouseCallback("frame", click_event)
print("Left-click points in the order you want; press any key when done.")
# Block until a key is pressed, then close the window.
cv2.waitKey(0)
cv2.destroyAllWindows()

# The click order matters: each point is paired row-by-row with real_coords later.
print("Collected points:", points)



Left-click points in the order you want; press any key when done.
Clicked: (1616, 1948)
Clicked: (1907, 1485)
Clicked: (1220, 1793)
Clicked: (1569, 1423)
Collected points: [(1616, 1948), (1907, 1485), (1220, 1793), (1569, 1423)]


In [8]:
# Clicked pixel coordinates as a float32 array, one (x, y) row per click.
img_points = np.array(points, dtype = np.float32)
### Fill in the matching values based on real highway measurements
# Korean highway
"""
real_coords = np.array([
    [0,0], 
    [0, 8], 
    [3.6, 0], 
    [3.6, 8], 
    [0, 40], 
    [0, 48], 
    [3.6, 40], 
    [3.6, 48]], dtype = np.float32)
"""
# test: real-world coordinates, one row per clicked image point, in click order.
# NOTE(review): units are presumably meters, and the 3.5 used here differs from
# the 3.6 m lane width listed in the notes for this method — confirm which is intended.
real_coords = np.array([
    [3.5,0], 
    [3.5, 8], 
    [0, 0], 
    [0, 8]], dtype = np.float32)

In [6]:
# A homography needs at least 4 point pairs, matched row for row.
if len(img_points) < 4 or len(img_points) != len(real_coords):
    raise ValueError(
        f"Need >= 4 matching point pairs; got {len(img_points)} image points "
        f"and {len(real_coords)} real-world points."
    )

# NOTE: this rebinds H to the 3x3 homography; the first video-properties cell
# uses the same name for the frame height in pixels.
# NOTE(review): the RANSAC reprojection threshold (2.0) is measured in the
# destination coordinates, i.e. the units of real_coords, not pixels — confirm
# the value once more than 4 point pairs are used.
H, _ = cv2.findHomography(img_points, real_coords, cv2.RANSAC, 2.0)
# Estimation can fail (e.g. degenerate point layout); H is then None.
if H is None:
    raise RuntimeError("cv2.findHomography could not estimate a homography from the given points.")
print("Homography Matrix:\n", H)

NameError: name 'img_points' is not defined

In [10]:
"""
VIDEO = "./videos/test_video.mp4" 
LABELS_DIR = "./runs/detect/test1/test_ver1/labels
"""


# Filtering knobs (tweak as needed)
# MIN_TRACK_SECONDS is compared against the summed duration of the accepted
# steps of a track, not against its first-to-last time span.
MIN_TRACK_SECONDS = 3        # ignore very short tracks
MAX_STEP_M = 25.0              # ignore single-frame jumps > this (meters), helps with ID swaps
USE_MEDIAN_INSTANT = False     # if True, per-track speed = median of inst speeds; else distance/time


In [11]:
# ====== HELPERS ======
def natural_key(path):
    """Natural-sort key built from the base name of `path`.

    Digit runs are compared as integers, so "test_video_1.txt" sorts before
    "test_video_10.txt", which sorts before "test_video_100.txt".
    """
    key = []
    for chunk in re.split(r"(\d+)", os.path.basename(path)):
        key.append(int(chunk) if chunk.isdigit() else chunk)
    return key

def img_to_ground(x_px, y_px, H):
    """Map an image pixel to ground-plane coordinates through the homography H.

    The pixel is lifted to homogeneous form (x, y, 1), multiplied by H, and
    the first two components are divided by the third.

    Returns:
        (X, Y) as Python floats, in whatever units H maps to
        (meters when H was fitted against metric ground coordinates).
    """
    homog = H @ np.array([x_px, y_px, 1.0], dtype=float)
    scale = homog[2]
    return float(homog[0] / scale), float(homog[1] / scale)

def parse_line_6col(line, W, Himg):
    """Parse ``class cx cy w h track_id`` and return the box's bottom-center in pixels.

    cx, cy, w, h are normalized (0..1); x-values are scaled by W and
    y-values by Himg.

    Args:
        line: raw text line from a label file.
        W: image width in pixels.
        Himg: image height in pixels.

    Returns:
        (track_id, x_bc, y_bc), where x_bc is the box center x and y_bc is the
        box bottom edge (center y plus half the height), both in pixels.
        None when the line does not have exactly 6 fields or a field is not a
        usable number.
    """
    parts = line.split()
    if len(parts) != 6:
        return None
    _cls, cx, cy, w, h, tid = parts
    try:
        # w is not needed for the bottom-center but is still validated.
        cx, cy, w, h = (float(v) for v in (cx, cy, w, h))
        # Going through float() also accepts an id written like "3.0".
        tid = int(float(tid))
    except (ValueError, OverflowError):
        # Malformed numeric field: skip the line like any other bad record
        # instead of aborting the whole frame loop.
        return None
    # denormalize to pixels, then take the bottom-center of the box
    x_bc = cx * W
    y_bc = cy * Himg + 0.5 * (h * Himg)
    return tid, x_bc, y_bc


In [12]:
# ====== PREP ======
# Video props: frame rate and frame size. The height is stored as Himg so it
# does not clash with H, which holds the homography in this section.
cap = cv2.VideoCapture(VIDEO)
fps  = cap.get(cv2.CAP_PROP_FPS)
W    = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
Himg = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# fps is the divisor that turns frame indices into seconds; stop if it is missing or zero.
if not fps or fps <= 0:
    raise RuntimeError("Could not read FPS from video; set fps manually.")


In [13]:
import re

In [14]:
# np.shape() also copes with H being None or a plain number (e.g. when the
# homography cell did not run and H still holds the frame height), so this
# raises the intended ValueError instead of an AttributeError.
if np.shape(H) != (3, 3):
    raise ValueError("Homography must be 3x3.")
# Frame txts, in natural (numeric-aware) file-name order
txt_files = sorted(glob.glob(os.path.join(LABELS_DIR, "*.txt")), key=natural_key)
if not txt_files:
    raise FileNotFoundError(f"No .txt files found in {LABELS_DIR}")


In [15]:
# ====== BUILD TRACKS IN METERS ======
tracks_xy = defaultdict(list)   # tid -> list of (t_sec, X_m, Y_m)

def _frame_number(path):
    """Return the integer directly before ".txt" in the file name, or None if absent."""
    tokens = natural_key(path)
    if len(tokens) >= 2 and tokens[-1] == ".txt" and isinstance(tokens[-2], int):
        return tokens[-2]
    return None

# Timestamps come from the frame number in the file name rather than from the
# position in the sorted list, so a frame without a label file (presumably one
# with no detections) does not shift the timing of every later frame.
# If any file lacks a numeric suffix, fall back to the list position for all files.
frame_numbers = [_frame_number(p) for p in txt_files]
use_frame_numbers = all(n is not None for n in frame_numbers)

for fi, path in enumerate(txt_files):
    t = (frame_numbers[fi] if use_frame_numbers else fi) / fps  # seconds
    with open(path, "r") as f:
        for line in f:
            parsed = parse_line_6col(line, W, Himg)
            if parsed is None:
                continue
            tid, x_bc, y_bc = parsed
            # project the box's bottom-center onto the ground plane
            X, Y = img_to_ground(x_bc, y_bc, H)
            tracks_xy[tid].append((t, X, Y))


In [None]:
# ====== SPEEDS ======
# Per-track speed from consecutive ground-plane positions, with step-level
# outlier rejection (MAX_STEP_M) and a minimum accepted duration (MIN_TRACK_SECONDS).
per_track_avg_kmh = {}
per_track_inst = {}  # tid -> list of instantaneous speeds (m/s)

for tid, seq in tracks_xy.items():
    # a single observation gives no displacement to measure
    if len(seq) < 2: 
        continue
    # sort in place by timestamp (this reorders the list stored in tracks_xy)
    seq.sort(key=lambda z: z[0])

    # instantaneous speeds with simple outlier filtering
    inst = []
    dsum = 0.0
    tsum = 0.0
    for (t0, X0, Y0), (t1, X1, Y1) in zip(seq[:-1], seq[1:]):
        dt = t1 - t0
        # skip pairs with (near-)zero time difference to avoid dividing by ~0
        if dt <= 1e-6:
            continue
        dx = X1 - X0
        dy = Y1 - Y0
        dist = float(np.hypot(dx, dy))  # meters
        if dist > MAX_STEP_M:
            # likely ID switch or bad projection for this step; skip this step only
            continue
        v = dist / dt  # m/s
        inst.append(v)
        # only accepted steps contribute to the distance and time totals
        dsum += dist
        tsum += dt

    # tsum counts accepted steps only, so heavily filtered tracks are dropped here too
    if tsum < MIN_TRACK_SECONDS:
        continue

    if inst:
        if USE_MEDIAN_INSTANT:
            v_mps = float(np.median(inst))
        else:
            v_mps = dsum / tsum
        # m/s -> km/h
        per_track_avg_kmh[tid] = v_mps * 3.6
        per_track_inst[tid] = inst

# Overall average across cars: unweighted mean of the per-track averages.
if per_track_avg_kmh:
    overall_avg_kmh = float(np.mean(list(per_track_avg_kmh.values())))
else:
    # no track passed the filters; 0.0 here means "no data", not a measured speed
    overall_avg_kmh = 0.0

print(f"Tracks with valid speed: {len(per_track_avg_kmh)}")
print(f"Overall average speed: {overall_avg_kmh:.1f} km/h")

# Show a few examples (first 10 tracks in dict insertion order)
for tid, v in list(per_track_avg_kmh.items())[:10]:
    print(f"Track {tid}: {v:.1f} km/h")

# (Optional) persist the per-track averages as CSV: a header row, then one
# row per track id in ascending id order, speed formatted with 3 decimals.
import csv
with open(f"{CSV}/per_track_speeds.csv", "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["track_id", "avg_kmh"])
    for tid, kmh in sorted(per_track_avg_kmh.items()):
        w.writerow([tid, f"{kmh:.3f}"])

Tracks with valid speed: 22
Overall average speed: 54.1 km/h
Track 1: 34.6 km/h
Track 2: 24.2 km/h
Track 3: 33.0 km/h
Track 4: 40.6 km/h
Track 5: 40.8 km/h
Track 6: 43.2 km/h
Track 7: 33.6 km/h
Track 8: 41.9 km/h
Track 9: 38.0 km/h
Track 10: 29.0 km/h
