## Basic version

### Import các thư viện cần thiết

In [None]:
!pip install ultralytics



In [None]:
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO

In [None]:
# Load the detection model from the 'yolo11l.pt' weights file; the tracking
# loop further down calls model.track() on every frame.
model = YOLO('yolo11l.pt')

# Open the input video for frame-by-frame reading. `video_path` is also used
# later to derive the output file name.
video_path = '/content/vietnam.mp4'
cap = cv2.VideoCapture(video_path)


### Thiết lập VideoWriter để lưu video kết quả

In [None]:
# Read the source video's frame size and frame rate so the output matches it.
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as a float: truncating a fractional rate (e.g. 29.97
# -> 29) would make the written video play slower than the source.
fps = cap.get(cv2.CAP_PROP_FPS)

# Create the VideoWriter. The output is named after the part of the input
# file name before its first dot, with a "_tracked.mp4" suffix, under /content.
video_name = video_path.split('/')[-1]
output_path = f"/content/{video_name.split('.')[0]}_tracked.mp4"
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

### Khởi tạo lịch sử theo dõi và vòng lặp xử lý frames

In [None]:
# Per-track list of recent (x, y) box-center points, keyed by tracker ID.
track_history = defaultdict(list)

# Process the video frame by frame. The try/finally guarantees that the
# capture and the writer are released (so the output file is finalized) even
# if tracking raises part-way through the video.
try:
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            # No frame could be read: stop processing.
            break

        # Run YOLO11 tracking on the frame, persisting tracks between frames.
        results = model.track(frame, persist=True, show=False)

        # Boxes as (x_center, y_center, width, height).
        boxes = results[0].boxes.xywh.cpu()
        # `boxes.id` is None when no tracks were found in this frame.
        track_ids = results[0].boxes.id
        track_ids = track_ids.int().cpu().tolist() if track_ids is not None else []

        # Start from the frame annotated with the detections.
        annotated_frame = results[0].plot()

        # Extend and draw each track's trail; zip() makes this a no-op when
        # there are no track IDs.
        for box, track_id in zip(boxes, track_ids):
            x, y, w, h = box
            track = track_history[track_id]
            track.append((float(x), float(y)))  # x, y center point

            # Keep at most the 120 most recent points per track.
            if len(track) > 120:
                track.pop(0)

            # Draw the trail as an open polyline through the stored points.
            points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
            cv2.polylines(
                annotated_frame,
                [points],
                isClosed=False,
                color=(230, 230, 230),
                thickness=4
            )

        # Write the annotated frame to the output video.
        out.write(annotated_frame)
finally:
    # Release everything, whether or not processing completed.
    cap.release()
    out.release()
print(f'video has been saved to {output_path}')


0: 384x640 19 persons, 6 cars, 26 motorcycles, 2 buss, 2 trucks, 1369.7ms
Speed: 5.2ms preprocess, 1369.7ms inference, 7.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 19 persons, 6 cars, 26 motorcycles, 1 bus, 2 trucks, 4202.3ms
Speed: 4.4ms preprocess, 4202.3ms inference, 2.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 19 persons, 5 cars, 25 motorcycles, 1 bus, 3 trucks, 2036.5ms
Speed: 4.5ms preprocess, 2036.5ms inference, 12.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 19 persons, 5 cars, 25 motorcycles, 1 bus, 4 trucks, 2552.5ms
Speed: 4.6ms preprocess, 2552.5ms inference, 5.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 18 persons, 5 cars, 25 motorcycles, 1 bus, 4 trucks, 1 backpack, 2267.2ms
Speed: 4.7ms preprocess, 2267.2ms inference, 2.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 18 persons, 5 cars, 25 motorcycles, 1 bus, 4 trucks, 1 backpack, 2814.5ms
Speed: 5.0ms preprocess, 2814.5ms i

## Optimized version

### Import các thư viện cần thiết

In [None]:
!pip install loguru

Collecting loguru
  Downloading loguru-0.7.3-py3-none-any.whl.metadata (22 kB)
Downloading loguru-0.7.3-py3-none-any.whl (61 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/61.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.6/61.6 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: loguru
Successfully installed loguru-0.7.3


In [None]:
import argparse
from collections import defaultdict
import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO
from loguru import logger

### Định nghĩa cấu hình và khởi tạo video

In [None]:
def load_config():
    """Return a fresh dictionary holding the pipeline's default settings.

    Keys: 'model_path', 'track_history_length', 'batch_size',
    'line_thickness' and 'track_color'.
    """
    settings = dict(
        model_path='yolo11x.pt',
        track_history_length=120,
        batch_size=64,
        line_thickness=4,
        track_color=(230, 230, 230),
    )
    return settings

def initialize_video(video_path):
    """Open the input video and create a matching writer for the result.

    Args:
        video_path: Path of the video to read. The output file name is taken
            from the last '/'-separated component, up to its first dot.

    Returns:
        A tuple ``(cap, out, output_path)``: the ``cv2.VideoCapture`` for the
        input, the ``cv2.VideoWriter`` for the output, and the output path
        (``/content/<name>_tracked.mp4``).
    """
    cap = cv2.VideoCapture(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Keep the frame rate as a float: truncating a fractional rate (e.g.
    # 29.97 -> 29) would make the written video play slower than the source.
    fps = cap.get(cv2.CAP_PROP_FPS)

    video_name = video_path.split('/')[-1]
    output_path = f"/content/{video_name.split('.')[0]}_tracked.mp4"
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    return cap, out, output_path

### Cập nhật lịch sử theo dõi

In [None]:
def update_track_history(
    track_history,
    last_seen,
    track_ids,
    frame_count,
    batch_size,
    frame_idx,
    history_length
):
    """Record when each track was last seen and drop tracks that went stale.

    Args:
        track_history: Mapping of track ID to its stored points. Stale
            entries are removed in place.
        last_seen: Mapping of track ID to the 1-based index of the last frame
            in which it appeared. Updated in place.
        track_ids: Track IDs present in the frame being processed.
        frame_count: Number of frames read so far, i.e. the index of the last
            frame of the current batch.
        batch_size: Number of frames in the current batch.
        frame_idx: Position of the frame being processed inside the batch
            (0-based).
        history_length: Number of frames a track may stay unseen before it is
            removed.

    Returns:
        None. Both mappings are modified in place.
    """
    # Absolute (1-based) index of the frame being processed: frame_count
    # points at the last frame of the batch, so step back to this frame.
    current_frame = frame_count - (batch_size - frame_idx - 1)
    current_tracks = set(track_ids)

    # Stamp every visible track, including ones that have no history entry
    # yet. Otherwise a new track would look as if it was last seen at frame 0
    # and be pruned the first time it is missing.
    for track_id in current_tracks:
        last_seen[track_id] = current_frame

    # Prune tracks not seen for more than history_length frames, measured
    # from the frame being processed rather than from the end of the batch.
    for track_id in set(track_history) | set(last_seen):
        if track_id in current_tracks:
            continue
        # .get() keeps this working for plain dicts; an ID with no timestamp
        # is treated as last seen at frame 0.
        if current_frame - last_seen.get(track_id, 0) > history_length:
            track_history.pop(track_id, None)
            last_seen.pop(track_id, None)

### Vẽ các đường theo dõi trên frame

In [None]:
def draw_tracks(frame, boxes, track_ids, track_history, config):
    """Extend each track with its current box point and draw its trail.

    Args:
        frame: Image to draw on. It is passed to ``cv2.polylines`` and
            returned.
        boxes: Iterable of ``(x, y, w, h)`` boxes, paired positionally with
            ``track_ids``. ``x`` and ``y`` are stored as the track point.
        track_ids: Track IDs for ``boxes``. If empty, nothing is drawn.
        track_history: Mapping of track ID to a list of ``(x, y)`` points.
            Updated in place; missing IDs get a new list.
        config: Mapping providing ``'track_history_length'``,
            ``'track_color'`` and ``'line_thickness'``.

    Returns:
        The ``frame`` object that was passed in.
    """
    if not track_ids:
        return frame

    for box, track_id in zip(boxes, track_ids):
        x, y, w, h = box
        # setdefault works for both plain dicts and defaultdicts.
        track = track_history.setdefault(track_id, [])
        track.append((float(x), float(y)))  # x, y center point

        # Keep at most config['track_history_length'] points per track.
        if len(track) > config['track_history_length']:
            track.pop(0)

        # Draw the trail on the frame we were given. Drawing on a name other
        # than this parameter would hit an undefined or stale global image.
        points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
        cv2.polylines(
            frame,
            [points],
            isClosed=False,
            color=config['track_color'],
            thickness=config['line_thickness']
        )
    return frame

### Xử lý một batch các frames

In [None]:
def process_batch(model, batch_frames, track_history, last_seen, frame_count, config):
    """Track objects across a batch of frames and return the annotated frames.

    The whole batch is passed to ``model.track`` in one call. Each result then
    has its track bookkeeping updated, is rendered with ``result.plot``, and
    gets the track trails drawn on top.

    Returns:
        A list with one annotated frame per result, in result order.
    """
    tracked = model.track(
        batch_frames,
        persist=True,
        tracker='botsort.yaml',
        show=False,
        verbose=False,
        iou=0.5,
    )

    rendered = []
    for position, detection in enumerate(tracked):
        centers = detection.boxes.xywh.cpu()

        # boxes.id is None when this frame has no tracked objects.
        raw_ids = detection.boxes.id
        if raw_ids is None:
            ids = []
        else:
            ids = raw_ids.int().cpu().tolist()

        update_track_history(
            track_history,
            last_seen,
            ids,
            frame_count,
            len(batch_frames),
            position,
            config['track_history_length'],
        )

        canvas = detection.plot(font_size=4, line_width=2)
        rendered.append(
            draw_tracks(canvas, centers, ids, track_history, config)
        )
    return rendered

### Hàm chính xử lý video

In [None]:
def _process_and_write_batch(
    model, out, pbar, batch_frames, track_history, last_seen, frame_count, config
):
    """Run one batch through process_batch and write the resulting frames.

    Any failure is logged and the batch is skipped (best effort), so one bad
    batch does not abort the whole video.
    """
    try:
        processed_frames = process_batch(
            model,
            batch_frames,
            track_history,
            last_seen,
            frame_count,
            config
        )
        for processed in processed_frames:
            out.write(processed)
            pbar.update(1)
    except Exception as e:
        first_frame = frame_count - len(batch_frames) + 1
        logger.error(
            f'Error when handling frames {first_frame} to {frame_count}: {e}'
        )


def main(video_path):
    """Track objects in a video and write the annotated result to disk.

    Frames are read from ``video_path``, grouped into batches of
    ``batch_size`` (from ``load_config``), processed with ``process_batch``,
    and written to the path returned by ``initialize_video``.

    Args:
        video_path: Path of the input video.
    """
    config = load_config()
    model = YOLO(config.get('model_path', 'yolo11x.pt'))

    cap, out, output_path = initialize_video(video_path)
    track_history = defaultdict(list)
    last_seen = defaultdict(int)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    try:
        with tqdm(
            # The reported frame count can be missing; fall back to an
            # open-ended progress bar in that case.
            total=total_frames if total_frames > 0 else None,
            desc='Processing frames',
            colour='green'
        ) as pbar:
            frame_count = 0
            batch_frames = []

            while cap.isOpened():
                success, frame = cap.read()
                if not success:
                    break

                frame_count += 1
                batch_frames.append(frame)

                if len(batch_frames) >= config['batch_size']:
                    _process_and_write_batch(
                        model, out, pbar, batch_frames,
                        track_history, last_seen, frame_count, config
                    )
                    batch_frames = []

            # Flush the final partial batch. This must not depend on the
            # reported frame count, which may differ from the number of
            # frames actually read; otherwise the last frames are dropped.
            if batch_frames:
                _process_and_write_batch(
                    model, out, pbar, batch_frames,
                    track_history, last_seen, frame_count, config
                )
    finally:
        # Always release the capture and writer so the output file is closed,
        # even if reading or processing raised.
        try:
            cap.release()
            out.release()
            cv2.destroyAllWindows()
            logger.info(f'{output_path}')
        except Exception as e:
            logger.error(f'{str(e)}')

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--video-path', type=str, default='/content/vietnam.mp4')
    # Notebook kernels (Jupyter/Colab) put their own arguments on the command
    # line, e.g. `-f <kernel.json>`. parse_known_args() ignores them instead
    # of exiting with "unrecognized arguments".
    args, _unknown = parser.parse_known_args()
    main(args.video_path)

usage: colab_kernel_launcher.py [-h] [--video-path VIDEO_PATH]
colab_kernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-c879c4d1-e4dd-4ad4-94ee-9334fff9352d.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
