# Version optimized
Phiên bản này được tối ưu hóa bằng cách sử dụng batching và cải thiện hiệu suất xử lý.

In [25]:
# Loguru tự động ghi lại traceback (dấu vết lỗi) mà không cần cấu hình thêm
! pip install loguru



In [26]:
# ! pip install opencv-python opencv-contrib-python

In [27]:
# from google.colab import drive
# drive.mount('/content/drive')

In [28]:
! pip install imageio[pyav]



In [29]:
! pip install ultralytics



In [30]:
import argparse
from collections import defaultdict
import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO
from loguru import logger
import imageio.v3 as iio

## Định nghĩa cấu hình và khởi tạo video

In [31]:
def load_config():
    return {
        "model_path": "./model/yolo11l.pt",
        "track_history_length": 120,
        "batch_size": 64, 
        "line_thickness": 4,
        "track_color": [230, 230, 230] # wi
    }

def initialize_video(video_path):
    cap = cv2.VideoCapture(video_path)
    file_name = video_path.split("/")[-1].split(".")[0]
    output_path = f"./{file_name}_tracking.mp4"

    return cap, output_path

## Cập nhật lịch sử theo dõi: Cập nhật lịch sử theo dõi và loại bỏ các track cũ.

In [32]:
def update_track_history(track_history, last_seen, track_ids, frame_count, batch_size, frame_idx, history_length):
    """Update tracking history and remove old tracks."""
    current_tracks = set(track_ids)
    for track_id in list(track_history.keys()): 
        if track_id in current_tracks:
            # Cập nhật thời điểm cuối cùng đối tượng được nhìn thấy
            last_seen[track_id] = frame_count - (batch_size - frame_idx - 1)
        elif frame_count - last_seen[track_id] > history_length:
            # Xóa đối tượng nếu đã quá lâu không xuất hiện
            del track_history[track_id] 
            del last_seen[track_id]  

## Vẽ các đường theo dõi trên frame

In [33]:
def draw_tracks(boxes, track_ids, track_history, annotated_frame, config):
    for box, track_id in zip(boxes, track_ids):
        x, y, w, h = box
        track = track_history[track_id]
        track.append((float(x), float(y)))  # x, y center point
        # retain 120 tracks for 120 frames
        if len(track) > config['track_history_length']:
            track.pop(0)

            # Draw the tracking lines
        points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
        cv2.polylines(annotated_frame, [points], isClosed=False, color=     
            config['track_color'], thickness=config['line_thickness'])

        return annotated_frame

## Xử lý một batch các frames

In [34]:
def process_batch (model, batch_frames, track_history, last_seen, frame_count, config):
    """Process a batch of frames through YOLO model"""
    results = model.track(
        batch_frames,
        persist=True,
        tracker="botsort.yaml",
        show=False,
        verbose=False,
        iou=0.5,
    )

    processed_frames = []
    for frame_idx, result in enumerate(results):
        boxes = result.boxes.xywh.cpu()
        track_ids = result.boxes.id.int().cpu().tolist()
        # print(boxes)
        # print(track_ids)
        annotated_frame = result.plot(font_size=4, line_width=2)
        annotated_frame = draw_tracks(boxes, track_ids, track_history, annotated_frame, config)
        processed_frames.append(annotated_frame)
    return processed_frames

## Main Processing video

In [35]:
def main(video_path):
    config = load_config()
    model = YOLO(config['model_path'])
    cap, output_path = initialize_video(video_path)
    track_history = defaultdict(list)
    last_seen = defaultdict(int)

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    with iio.imopen(output_path, "w", plugin="pyav") as writer:
        writer.init_video_stream("libx264", fps=fps)
        with tqdm(total=total_frames, desc="Processing frames", colour="green") as pbar:
            frame_count = 0
            batch_frames = []

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    print("Lỗi đọc frame")
                    # logger.info("No more frames to process. Exiting loop.")
                    break
                frame_count += 1
                batch_frames.append(frame)

                if len(batch_frames) == config['batch_size'] or frame_count == total_frames:
                    try:
                        processed_frames = process_batch(model, batch_frames, track_history, last_seen, frame_count, config)
                        for id_batch, frame in enumerate(processed_frames):
                            writer.write_frame(frame)
                            pbar.update(1)

                        batch_frames = []
                    except Exception as e:
                        # logger.error(
                        #     f"An error occurred while processing frames: {e}")
                        # logger.exception(e)  # Log the full traceback for debugging
                        print(e)

        cap.release()
        print(f"The video was successfully saved to {output_path}")

In [36]:
main('/content/drive/MyDrive/AIO/data/data_ex3_module7/video/highway.mp4')

Processing frames: 100%|[32m██████████[0m| 1275/1275 [39:36<00:00,  1.86s/it]


Lỗi đọc frame
The video was successfully saved to ./highway_tracking.mp4
