In [22]:
import cv2
import supervision as sv
from ultralytics import YOLOv10

# 加载模型
model = YOLOv10('yolov10n.pt')
box_annotator = sv.BoxAnnotator()  # 使用新的 BoxAnnotator 类
label_annotator = sv.LabelAnnotator()

# 修改图片路径，确保路径正确
image_path = r'./images_import/CarsOnTheRoad.png'  # 替换为你的图片路径
image = cv2.imread(image_path)

# 检查图片是否加载成功
if image is None:
    print(f"Failed to load image. Check the file path: {image_path}")
else:
    # 执行目标检测
    results = model(image)[0]
    detections = sv.Detections.from_ultralytics(results)

    # 标记检测到的物体
    annotated_image = box_annotator.annotate(
        scene=image, detections=detections
    )
    annotated_image = label_annotator.annotate(
        scene=annotated_image, detections=detections
    )

    # 创建一个窗口显示图片
    window_name = "Object Detection"
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)  # 可调整窗口大小
    cv2.resizeWindow(window_name, 1280, 720)  # 设置窗口大小
    cv2.imshow(window_name, annotated_image)

    # 保存标记后的图片
    output_path = r'./images_import/CarsonTheRoad_annotated.jpg'
    cv2.imwrite(output_path, annotated_image)
    print(f"Annotated image saved to {output_path}")

    # 等待用户按键关闭窗口
    cv2.waitKey(0)
    cv2.destroyAllWindows()



0: 384x640 1 person, 19 cars, 2 buss, 1 truck, 25.7ms
Speed: 3.0ms preprocess, 25.7ms inference, 4.0ms postprocess per image at shape (1, 3, 384, 640)
Annotated image saved to ./images_import/CarsonTheRoad_annotated.jpg


In [6]:
import cv2
import supervision as sv
from ultralytics import YOLOv10
from collections import Counter

# 加载 YOLOv10 模型（无需 weights_only 参数）
model = YOLOv10('yolov10n.pt')
box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()

# 指定视频文件路径或使用摄像头 (0)
video_path = r'./images_import/CarVideo.mp4'  # 替换为你的视频文件路径
cap = cv2.VideoCapture(video_path)

# 检查视频是否成功打开
if not cap.isOpened():
    print(f"Failed to open video. Check the file path: {video_path}")
    exit()

# 创建计数器用于统计每种物体的总数
total_count = Counter()

# 处理视频帧
while True:
    ret, frame = cap.read()
    if not ret:
        print("End of video stream or error reading frame.")
        break

    # 目标检测
    results = model(frame)[0]
    detections = sv.Detections.from_ultralytics(results)

    # 提取类别索引，并获取对应的标签名称
    class_ids = results.boxes.cls.cpu().numpy().astype(int)
    labels = [model.names[id] for id in class_ids]

    # 更新总计数
    total_count.update(labels)

    # 标注检测框和标签
    annotated_frame = box_annotator.annotate(
        scene=frame, detections=detections
    )
    annotated_frame = label_annotator.annotate(
        scene=annotated_frame, detections=detections
    )

    # 显示标注后的帧
    cv2.imshow("Video Detection", annotated_frame)

    # 按 'q' 键退出视频播放
    if cv2.waitKey(1) & 0xFF == ord('q'):
        print("Video playback stopped by user.")
        break

# 输出统计结果
print("Total Detection Count:")
for label, quantity in total_count.items():
    print(f"{label}: {quantity}")

# 释放视频捕获对象并关闭所有窗口
cap.release()
cv2.destroyAllWindows()



0: 384x640 1 person, 6 cars, 1 truck, 12.5ms
Speed: 2.0ms preprocess, 12.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6 cars, 15.9ms
Speed: 2.0ms preprocess, 15.9ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 11.2ms
Speed: 2.0ms preprocess, 11.2ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 12.7ms
Speed: 1.0ms preprocess, 12.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 8.3ms
Speed: 2.0ms preprocess, 8.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 1 truck, 9.1ms
Speed: 2.0ms preprocess, 9.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 8 cars, 1 truck, 10.6ms
Speed: 1.0ms preprocess, 10.6ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 9 cars, 1 truck, 

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from filterpy.kalman import KalmanFilter

# SORT 算法实现
def linear_assignment(cost_matrix):
    try:
        from scipy.optimize import linear_sum_assignment
        x, y = linear_sum_assignment(cost_matrix)
        return np.array(list(zip(x, y)))
    except ImportError:
        raise ImportError("Install `scipy` for linear sum assignment.")

def iou_batch(bb_test, bb_gt):
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
              + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
    return o

def convert_bbox_to_z(bbox):
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.
    y = bbox[1] + h / 2.
    s = w * h
    r = w / float(h)
    return np.array([x, y, s, r]).reshape((4, 1))

def convert_x_to_bbox(x, score=None):
    w = np.sqrt(x[2] * x[3])
    h = x[2] / w
    if score is None:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
    else:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))

class KalmanBoxTracker(object):
    count = 0
    def __init__(self, bbox):
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        self.kf.F = np.array([[1, 0, 0, 0, 1, 0, 0],
                              [0, 1, 0, 0, 0, 1, 0],
                              [0, 0, 1, 0, 0, 0, 1],
                              [0, 0, 0, 1, 0, 0, 0],
                              [0, 0, 0, 0, 1, 0, 0],
                              [0, 0, 0, 0, 0, 1, 0],
                              [0, 0, 0, 0, 0, 0, 1]])
        self.kf.H = np.array([[1, 0, 0, 0, 0, 0, 0],
                              [0, 1, 0, 0, 0, 0, 0],
                              [0, 0, 1, 0, 0, 0, 0],
                              [0, 0, 0, 1, 0, 0, 0]])
        self.kf.R[2:, 2:] *= 10.
        self.kf.P[4:, 4:] *= 1000.
        self.kf.P *= 10.
        self.kf.Q[-1, -1] *= 0.01
        self.kf.Q[4:, 4:] *= 0.01
        self.kf.x[:4] = convert_bbox_to_z(bbox)
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

    def update(self, bbox):
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(convert_bbox_to_z(bbox))

    def predict(self):
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        return convert_x_to_bbox(self.kf.x)
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    if (len(trackers) == 0):
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)

    iou_matrix = iou_batch(detections, trackers)

    if min(iou_matrix.shape) > 0:
        a = (iou_matrix > iou_threshold).astype(np.int32)
        if a.sum(1).max() == 1 and a.sum(0).max() == 1:
            matched_indices = np.stack(np.where(a), axis=1)
        else:
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty(shape=(0, 2))

    unmatched_detections = []
    for d, det in enumerate(detections):
        if (d not in matched_indices[:, 0]):
            unmatched_detections.append(d)
    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if (t not in matched_indices[:, 1]):
            unmatched_trackers.append(t)

    matches = []
    for m in matched_indices:
        if (iou_matrix[m[0], m[1]] < iou_threshold):
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        else:
            matches.append(m.reshape(1, 2))
    if (len(matches) == 0):
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)

    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
class Sort(object):
    def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0

    def update(self, dets=np.empty((0, 5))):
        self.frame_count += 1
        trks = np.zeros((len(self.trackers), 5))
        to_del = []
        ret = []
        for t, trk in enumerate(trks):
            pos = self.trackers[t].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)):
                to_del.append(t)
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.trackers.pop(t)
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks, self.iou_threshold)
        for m in matched:
            self.trackers[m[1]].update(dets[m[0], :])
        for i in unmatched_dets:
            trk = KalmanBoxTracker(dets[i, :])
            self.trackers.append(trk)
        i = len(self.trackers)
        for trk in reversed(self.trackers):
            d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                ret.append(np.concatenate((d, [trk.id + 1])).reshape(1, -1))
            i -= 1
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 5))

# 主逻辑整合 YOLO 和 SORT
model = YOLO('yolov8n.pt')
vehicle_classes = ['car', 'truck', 'bus', 'motorcycle']
tracker = Sort(max_age=3, min_hits=3, iou_threshold=0.3)
cap = cv2.VideoCapture('./images_import/CarVideo.mp4')
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
output_video = cv2.VideoWriter(
    'output_with_vehicle_count.mp4',
    cv2.VideoWriter_fourcc(*'mp4v'),
    fps,
    (frame_width, frame_height)
)
vehicle_count = set()

while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = model(frame_rgb)
    detections = results[0]
    bboxes = detections.boxes.xyxy.cpu().numpy()
    scores = detections.boxes.conf.cpu().numpy()
    class_ids = detections.boxes.cls.cpu().numpy().astype(int)
    vehicle_detections = []
    for bbox, score, class_id in zip(bboxes, scores, class_ids):
        if model.names[class_id] in vehicle_classes:
            vehicle_detections.append(np.append(bbox, score))
    vehicle_detections = np.array(vehicle_detections)
    tracks = tracker.update(vehicle_detections)
    for track in tracks:
        x1, y1, x2, y2, track_id = track.astype(int)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, f"ID {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        vehicle_count.add(track_id)
    cv2.putText(frame, f"Total Vehicles: {len(vehicle_count)}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
    cv2.imshow('Vehicle Tracking', frame)
    output_video.write(frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
output_video.release()
cv2.destroyAllWindows()
print(f"Total unique vehicles: {len(vehicle_count)}")
