### The source for this project is available here: [click here](https://github.com/finyster/carflow.git)

## Main Function

### A Part

In [23]:
# line_selector_routes.py

import cv2
import yaml
import argparse
import sys
from pathlib import Path

"""
line_selector_routes.py
-----------------------
Interactively define traffic-counting routes for an intersection video.

• Left-click two points to create one line.  
  - **Entry line (green)** is drawn first,  
  - **Exit line (red)** follows automatically.  
  Each entry/exit pair constitutes one route.
• Press **r** to reset and redraw, **s** to save, **Esc** to quit without saving.
• The script writes/updates `config.yaml` with keys:
    video_path: <input video>
    routes:
      - entry: {p1: [x, y], p2: [x, y]}
        exit : {p1: [x, y], p2: [x, y]}

Author : Lin J-H and Hjc , 2025
"""

# ------------------------------------------------------------------
# 1) Parse command-line arguments; fall back to default paths when
#    none are supplied.
# ------------------------------------------------------------------
parser = argparse.ArgumentParser(description="Select entry/exit lines for each route")

# --video is optional; the help text states the real default path.
parser.add_argument(
    "--video",
    required=False,
    default="videos/demo.mp4",
    help="影片檔案路徑（預設：videos/demo.mp4）"
)

# --config is optional and defaults to config.yaml.
parser.add_argument(
    "--config",
    required=False,
    default="config.yaml",
    help="YAML 設定檔路徑（預設：config.yaml）"
)

# parse_known_args() ignores the extra --f=... argument that Jupyter passes
# to the kernel, which would otherwise make argparse abort.
args, _ = parser.parse_known_args()

# ------------------------------------------------------------------
# 2) Validate the video path and load (or initialise) the configuration
# ------------------------------------------------------------------
video_path = args.video
config_path = args.config

# The video must exist before any window is opened.
if not Path(video_path).is_file():
    print(f"[Error] 無法找到影片檔：{video_path}")
    sys.exit(1)

# Load the existing config so unrelated keys survive the rewrite; a missing
# or empty file simply starts from an empty mapping.
try:
    if Path(config_path).is_file():
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f) or {}
    else:
        cfg = {}
except yaml.YAMLError as e:
    print(f"[Error] 讀取 YAML 失敗：\n{e}")
    sys.exit(1)

# The save step assigns cfg["video_path"] and cfg["routes"], which only works
# on a mapping. Reject a list/scalar top level here, before the user spends
# time drawing lines that could not be saved.
if not isinstance(cfg, dict):
    print(f"[Error] 設定檔內容必須是 YAML mapping：{config_path}")
    sys.exit(1)

# ------------------------------------------------------------------
# 3) Open the video with OpenCV and grab the first frame as the canvas
# ------------------------------------------------------------------
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
if not ret:
    # Release the capture handle before aborting so it is not leaked.
    cap.release()
    raise RuntimeError(f"Unable to read video file: {video_path}")

clicks = []          # pending click points for the line currently being drawn
routes = []          # finished/in-progress routes, in drawing order
route_idx = 1        # 1-based number of the route currently being drawn
line_type = "entry"  # toggles between drawing entry (green) and exit (red) lines

# Mouse callback: collect two clicks → draw line, store coordinates.
def mouse_cb(event, x, y, flags, _):
    """Handle a mouse event on the selection window.

    Every left-button press records the point and marks it on the preview
    frame. The second press completes a line: it is drawn and labelled
    ``route<N>_<entry|exit>`` and stored in the module-level ``routes`` list.
    An entry line starts a new route dict; the following exit line completes
    that dict and advances the route number.

    Each route is stored with a ``"name"`` (``route<N>``, matching the label
    drawn on screen) so the saved configuration identifies its routes.

    Parameters
    ----------
    event : int
        OpenCV mouse event code; only ``cv2.EVENT_LBUTTONDOWN`` is handled.
    x, y : int
        Pointer position of the event.
    flags, _ :
        Unused; present because OpenCV passes them to every mouse callback.
    """
    global clicks, line_type, route_idx
    if event != cv2.EVENT_LBUTTONDOWN:
        return

    clicks.append((x, y))
    cv2.circle(frame, (x, y), 5, (0, 255, 255), -1)
    if len(clicks) != 2:
        return  # still waiting for the second end point

    p1, p2 = clicks
    color = (0, 255, 0) if line_type == "entry" else (0, 0, 255)
    name = f"route{route_idx}_{line_type}"
    # Draw the line on the preview frame and label it.
    cv2.line(frame, p1, p2, color, 2)
    cv2.putText(frame, name, p1, cv2.FONT_HERSHEY_SIMPLEX,
                0.8, color, 2, cv2.LINE_AA)

    segment = {"p1": list(p1), "p2": list(p2)}
    if line_type == "entry":
        # Start a new route; its exit line is attached by the next pair of clicks.
        routes.append({"name": f"route{route_idx}", "entry": segment})
        line_type = "exit"
    else:
        routes[-1]["exit"] = segment
        line_type = "entry"
        route_idx += 1
    clicks.clear()

cv2.namedWindow("Select Entry/Exit Lines", cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback("Select Entry/Exit Lines", mouse_cb)
print("Click TWO points to make ONE line. Draw an entry line first (green), then an exit line (red). Repeat for as many routes as needed.")
print("Press 's' to save, 'r' to reset and redraw, or Esc to exit without saving.")

# Real-time preview loop: listen for user key commands.
while True:
    cv2.imshow("Select Entry/Exit Lines", frame)
    key = cv2.waitKey(1) & 0xFF
    if key == ord('r'):
        # Rewind to the first frame to obtain a clean, unannotated canvas.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        ret, fresh_frame = cap.read()
        if not ret:
            # Keep the current canvas: handing None to imshow would crash.
            print("[Warning] Could not re-read the first frame; reset skipped.")
            continue
        frame = fresh_frame
        routes.clear()
        clicks.clear()
        route_idx = 1
        line_type = "entry"
        print("Reset complete. You can start drawing again.")
    elif key == ord('s'):
        break
    elif key == 27:  # Esc
        cap.release()
        cv2.destroyAllWindows()
        print("Exit without saving.")
        sys.exit(0)

cap.release()
cv2.destroyAllWindows()

# ------------------------------------------------------------------
# 4) Save the routes back to config.yaml
# ------------------------------------------------------------------
# Pressing 's' after an entry line but before its exit line leaves a
# half-defined route at the end of the list. Only routes with both lines are
# written, so the saved file never contains a route without an exit line.
complete_routes = [r for r in routes if "entry" in r and "exit" in r]
dropped = len(routes) - len(complete_routes)
if dropped:
    print(f"[Warning] Discarded {dropped} incomplete route(s) that had no exit line.")

cfg["video_path"] = video_path
cfg["routes"]     = complete_routes

with open(config_path, "w", encoding="utf-8") as f:
    yaml.dump(cfg, f, sort_keys=False, allow_unicode=True)

print(f"Wrote {config_path}. Total routes saved: {len(complete_routes)} (each includes an entry and exit line).")


Click TWO points to make ONE line. Draw an entry line first (green), then an exit line (red). Repeat for as many routes as needed.
Press 's' to save, 'r' to reset and redraw, or Esc to exit without saving.
Wrote config.yaml. Total routes saved: 2 (each includes an entry and exit line).


### B Part

In [24]:
# utils/tracker.py
# Minimal centroid tracker used by CarFlow project.
# Author: <Your Name> · 2025

import math

class Tracker:
    """
    Simple centroid-based tracker.

    How it works
    ------------
    1.  The caller passes a list of bounding boxes in **(x, y, w, h)** format.
        Each box is assumed to be a detection for the current video frame.
    2.  For every detection we compute its centroid (cx, cy) and look for the
        **nearest** stored centroid that is closer than ``max_distance``
        pixels and has not already been matched in this call.
        * If a match is found, we keep the same object ID.
        * Otherwise a **new object ID** is assigned.
    3.  The tracker returns a list shaped like
        ``[[x, y, w, h, id], ...]`` (one row per input box, in input order)
        so downstream code can draw the box and display a consistent ID.

    Each ID is handed out at most once per ``update`` call, so two nearby
    detections in the same frame never share an ID. Matching is greedy in
    input order, not a globally optimal assignment.

    This implementation purposefully stays minimal:
      * **No track termination** - IDs live forever once created.
      * **No motion prediction / Kalman filter** - a pure distance check.
      * **No appearance features** - suitable for small-scale demos.

    Parameters
    ----------
    max_distance : int, default=70
        Maximum allowable Euclidean distance (in pixels) between the centroid of
        a detection and an existing track in order to be considered the same
        object. The comparison is strict (``distance < max_distance``).
    """
    def __init__(self, max_distance=70):
        self.center_points = {}          # Maps object ID -> last known centroid (cx, cy)
        self.id_count      = 0           # Next unused object ID
        self.max_distance  = max_distance

    def update(self, rectangles):
        """Assign an object ID to every box of the current frame.

        Parameters
        ----------
        rectangles : iterable of (x, y, w, h)
            Detections for the current frame.

        Returns
        -------
        list of [x, y, w, h, id]
            One entry per input box, in the same order as ``rectangles``.
        """
        objects_bbs_ids = []
        claimed = set()  # IDs already used in this call (matched or newly created)

        # Iterate over every detection in the current frame
        for (x, y, w, h) in rectangles:
            cx, cy = x + w // 2, y + h // 2

            # Pick the closest still-unclaimed track inside the distance gate.
            matched_id = None
            best_dist = self.max_distance
            for oid, (px, py) in self.center_points.items():
                if oid in claimed:
                    continue
                dist = math.hypot(cx - px, cy - py)
                if dist < best_dist:
                    matched_id, best_dist = oid, dist

            if matched_id is None:  # No matching track found -> create a NEW object ID
                matched_id = self.id_count
                self.id_count += 1

            claimed.add(matched_id)
            self.center_points[matched_id] = (cx, cy)
            objects_bbs_ids.append([x, y, w, h, matched_id])

        # Note: we do not delete stale IDs in this minimal version.
        return objects_bbs_ids


### C Part

In [30]:
"""
main.py — CarFlow project
=========================
Run vehicle detection, minimal centroid tracking, and bi‑directional route counting
for an intersection video.

Pipeline
--------
1. Load *config.yaml* to obtain:
   • video path  
   • list of counting routes, each with an **entry** and **exit** line
2. Use **YOLOv8** for object detection (cars, buses, trucks, motorcycles).
3. Pass all bounding boxes to a simple centroid tracker (utils/tracker.py) to obtain
   persistent object IDs.
4. For every track, decide whether its *centroid path* crosses an entry / exit line.  
   • Crossing *entry* assigns a **route** but **does NOT** increment the counter.  
   • Crossing *exit* assigns a **serial number** **and** increments class counter.
5. Draw bounding boxes, labels, route lines, live statistics, FPS.
6. Write an annotated video (MP4) plus a CSV summary
   ```csv
   route,class,count
   route1,car,120
   route1,bus,3
   ...
   ```
"""
import os, csv, time, yaml, cv2
import numpy as np
from collections import defaultdict
from ultralytics import YOLO

# ---------- Geometry helpers ----------
def ccw(a, b, c):
    """Orientation test for three 2-D points.

    Returns True when the cross product of the vectors ``a -> b`` and
    ``a -> c`` is strictly positive, i.e. ``c`` lies strictly on one fixed
    side of the directed line ``a -> b``. Collinear points give False.
    """
    return (c[1]-a[1])*(b[0]-a[0]) > (b[1]-a[1])*(c[0]-a[0])

def crossed(p_prev, p_now, l1, l2):
    """Return True when the move ``p_prev -> p_now`` crosses segment ``l1-l2``.

    Two segments intersect only if each one straddles the line through the
    other, so both conditions are checked:

    * ``p_prev`` and ``p_now`` lie on opposite sides of the line ``l1-l2``
      (the object really moved across the counting line), and
    * ``l1`` and ``l2`` lie on opposite sides of the line ``p_prev-p_now``
      (the crossing happened between the line's end points).

    Checking only the second condition would also report a crossing for an
    object that is merely heading towards the line without reaching it.
    A zero-length move never counts as a crossing.
    """
    moved_across_line = ccw(p_prev, l1, l2) != ccw(p_now, l1, l2)
    within_line_extent = ccw(p_prev, p_now, l1) != ccw(p_prev, p_now, l2)
    return moved_across_line and within_line_extent

def get_color(cls):
    """Return the drawing colour (a 3-tuple of ints) for a class name.

    Known classes are car, bus, truck and motorcycle; any other value gets
    the neutral grey fallback.
    """
    palette = dict(
        car=(0, 255, 255),
        bus=(255, 0, 0),
        truck=(0, 0, 255),
        motorcycle=(0, 255, 0),  # motorcycles are drawn in green
    )
    try:
        return palette[cls]
    except KeyError:
        return (200, 200, 200)

# ---------- Read configuration ----------
# Read explicitly as UTF-8: the route selector writes config.yaml as UTF-8
# with allow_unicode=True, so a platform-default codec could mis-decode it.
with open("config.yaml", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

routes = cfg["routes"]             # Entry / exit line definitions
# The processing loop identifies routes by rt["name"]. Config files whose
# routes carry no name get positional names (route1, route2, ...) instead of
# failing later with a KeyError.
for route_no, route_cfg in enumerate(routes, start=1):
    route_cfg.setdefault("name", f"route{route_no}")
TARGET_CLASSES = cfg.get("classes",["car","bus","truck", "motorcycle"])

# ---------- YOLOv8 model ----------
model = YOLO(cfg.get("model_path","yolov8l.pt"))
# One class name per line; the line index is used as the model's class id.
with open("coco.txt") as class_file:
    CLASSES = [c.strip() for c in class_file]
CLS2ID = {c:i for i,c in enumerate(CLASSES)}

# ---------- Centroid tracker ----------
tracker = Tracker()   # Minimal centroid tracker

# ---------- Video I/O ----------
cap = cv2.VideoCapture(cfg["video_path"])
if not cap.isOpened():
    # Fail early: an unopened capture reports a 0x0 frame size and would
    # silently produce an empty annotated video.
    raise RuntimeError(f"Unable to open video file: {cfg['video_path']}")
W,H = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 30   # fall back to 30 when the container reports 0
os.makedirs("results",exist_ok=True)
out = cv2.VideoWriter("results/annotated.mp4",cv2.VideoWriter_fourcc(*"mp4v"),FPS,(W,H))

# ---------- Counters & bookkeeping ----------
route_serials = defaultdict(lambda: defaultdict(int))   # route -> class -> last serial issued
vehicle_info  = {}   # id → {route,serial,class}
route_counts  = defaultdict(lambda: defaultdict(int))   # route -> class -> vehicles counted
last_center   = {}   # id -> centroid seen on the previously processed frame
# defaultdict structures track per-route counts, serial numbering and centroid history

# ---------- Main loop ----------
frame_idx, t0 = 0, time.time()
# Loop invariant: a frame is analysed only when frame_idx is a multiple of
# this stride; all other frames are copied to the output unannotated.
frame_stride = cfg.get("skip_frame",1)+1

# try/finally guarantees the capture and writer are released even when a
# frame raises, so the output video is still finalised.
try:
    # The preview window only needs to be created once.
    cv2.namedWindow("CarFlow", cv2.WINDOW_NORMAL)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1
        if frame_idx % frame_stride:
            out.write(frame)
            continue

        # ---------- YOLO detection ----------
        results = model(frame,verbose=False)[0]
        dets    = []  # [x1,y1,x2,y2]
        det_cls = []  # class name, parallel to dets
        for box in results.boxes:
            cls_name = CLASSES[int(box.cls[0])]
            if cls_name not in TARGET_CLASSES:
                continue
            x1,y1,x2,y2 = map(int,box.xyxy[0].tolist())
            dets.append([x1,y1,x2,y2])
            det_cls.append(cls_name)

        # ---------- Update tracker ----------
        # The tracker expects (x, y, w, h), so convert corner pairs to width/height.
        cv_rectangles = [[x1,y1,x2-x1,y2-y1] for (x1,y1,x2,y2) in dets]
        tracks = tracker.update(cv_rectangles)         # returns [x,y,w,h,id]

        # ---------- Track-wise processing ----------
        # tracks is paired with det_cls by position (one track row per detection).
        for (x, y, w, h, tid), cls_name in zip(tracks, det_cls):
            x1, y1, x2, y2 = x, y, x + w, y + h
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
            cur = (cx, cy)

            # A line-crossing test needs a previous centroid, so it is only
            # possible for IDs that were already seen on an earlier processed frame.
            if tid in last_center:
                prev_center = last_center[tid]

                # STEP-1  :  assign a route when the centroid first crosses an *entry* line (no counting yet)
                if tid not in vehicle_info:
                    for rt in routes:
                        ent1, ent2 = tuple(rt["entry"]["p1"]), tuple(rt["entry"]["p2"])
                        if crossed(prev_center, cur, ent1, ent2):
                            vehicle_info[tid] = {
                                "route": rt["name"],
                                "serial": None,  # serial number not assigned yet
                                "class": cls_name
                            }
                            break

                # STEP-2  :  assign serial + increment counter only after crossing the *exit* line
                elif vehicle_info[tid]["serial"] is None:
                    rt_name = vehicle_info[tid]["route"]
                    rt_cfg = next(r for r in routes if r["name"] == rt_name)
                    ext1, ext2 = tuple(rt_cfg["exit"]["p1"]), tuple(rt_cfg["exit"]["p2"])
                    if crossed(prev_center, cur, ext1, ext2):
                        route_serials[rt_name][cls_name] += 1
                        serial = route_serials[rt_name][cls_name]
                        vehicle_info[tid]["serial"] = serial    # serial assigned here
                        route_counts[rt_name][cls_name] += 1    # counter incremented here

            # Store current centroid for next-frame comparison
            last_center[tid] = cur

            # Draw the bounding box and its label
            color = get_color(cls_name)
            cv2.rectangle(frame,(x1,y1),(x2,y2),color,2)
            if tid in vehicle_info and vehicle_info[tid]["serial"]:
                label = f'{vehicle_info[tid]["route"]}:{vehicle_info[tid]["serial"]} {cls_name}'
            else:
                label = f'{cls_name} ID:{tid}'
            cv2.putText(frame,label,(x1,y1-10),cv2.FONT_HERSHEY_SIMPLEX,0.6,color,2)

        # ---------- Draw entry / exit lines ----------
        for rt in routes:
            ent1,ent2 = tuple(rt["entry"]["p1"]),tuple(rt["entry"]["p2"])
            ext1,ext2 = tuple(rt["exit"]["p1"]), tuple(rt["exit"]["p2"])
            cv2.line(frame,ent1,ent2,(0,255,0),2)
            cv2.putText(frame,f'{rt["name"]}_entry',ent1,cv2.FONT_HERSHEY_SIMPLEX,0.6,(0,255,0),2)
            cv2.line(frame,ext1,ext2,(0,0,255),2)
            cv2.putText(frame,f'{rt["name"]}_exit',ext1,cv2.FONT_HERSHEY_SIMPLEX,0.6,(0,0,255),2)

        # ---------- Draw live statistics ----------
        y0 = 40
        for i,(rt,cnts) in enumerate(route_counts.items()):
            txt = f'{rt}: '+'  '.join([f'{k}={v}' for k,v in cnts.items()])
            cv2.putText(frame,txt,(20,y0+i*30),cv2.FONT_HERSHEY_SIMPLEX,0.8,(255,255,0),2)

        # ---------- Draw FPS ----------
        # Average throughput since start (all frames read, including skipped ones)
        fps = frame_idx/(time.time()-t0+1e-6)
        cv2.putText(frame,f'FPS:{fps:.1f}',(W-150,30),cv2.FONT_HERSHEY_SIMPLEX,0.8,(255,255,255),2)
        cv2.imshow("CarFlow",frame)
        out.write(frame)
        if cv2.waitKey(1)&0xFF==27:   # Esc stops processing early
            break
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()

# ---------- Write CSV summary ----------
# Flatten the nested route -> class -> count mapping into rows, header first.
os.makedirs("results", exist_ok=True)
summary_rows = [["route", "class", "count"]]
for route_name, per_class in route_counts.items():
    summary_rows.extend(
        [route_name, class_name, total] for class_name, total in per_class.items()
    )
with open("results/counts.csv", "w", newline="") as f:
    csv.writer(f).writerows(summary_rows)
print("Summary written to results/counts.csv")
print("Annotated video saved to results/annotated.mp4")


Summary written to results/counts.csv
Annotated video saved to results/annotated.mp4


## Annotated Video Download.

### Annotated Link
[annotated.mp4](results/annotated.mp4)
<video src="results/annotated.mp4" width="50" controls></video>