<a href="https://colab.research.google.com/github/Tuevu110405/AIO_Module_7/blob/feature%2Ftraining/detection3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO


In [None]:
model = YOLO("yolov11l.pt")

video_path = "samples/vietnam.mp4"
cap = cv2.VideoCapture(video_path)

In [None]:
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

video_name = video_path.split("/")[-1]
output_path = f"run/{video_name.split('.')[0]}_tracked.mp4"
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

In [None]:
#Store the track history
track_history = defaultdict(lambda: [])
while cap.isOpened():
    sucess, frame = cap.read()

    if sucess:
        results = model.track(frame, persist=True, show=False)
        boxes = results[0].boxes.xywh.cpu()
        try:
            track_ids = results[0].boxes.id
            if track_ids is not None:
                track_ids = track_ids.int().cpu().tolist()

            else:
                track_ids = []

        except AttributeError:
            track_ids = []

        annotated_frame = results[0].plot()

        if track_ids:
            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box
                track = track_history[track_id]
                track.append((float(x), float(y)))

                if len(track) > 120:
                    track.pop(0)

                points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
                cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=4)



        out.write(annotated_frame)
    else:
        break

cap.release()
out.release()
print(f"Video has been saved to {output_path}")




In [None]:
import argparse
from collections import defaultdict
import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO
from loguru import logger


In [None]:
def load_config():
    return {
        "model_path": "yolo11x.pt",
        "track_history_length": 120,
        "batch_size" : 4,
        "track_color" : (230, 230, 230),
    }

def initialize_video(video_path):
    cap = cv2.VideoCapture(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    video_name = video_path.split("/")[-1]
    output_path = f"run/{video_name.split('.')[0]}_tracked.mp4"
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

    return cap, out, output_path

In [None]:
def update_track_history(
        track_history,
        last_seen,
        track_ids,
        frame_count,
        batch_size,
        frame_idx,
        history_length,
):

    current_tracks = set(track_ids)
    for track_id in list(track_history.keys()):
        if track_id not in current_tracks:
            last_seen[track_id] = frame_count - (batch_size - frame_idx - 1)

        elif frame_count - last_seen[track_id] > history_length:
            del track_history[track_id]
            del last_seen[track_id]




In [None]:
def draw_tracks(frame, boxes, track_ids, track_history, config):
    if not track_ids:
        return frame

    for box, track_id in zip(boxes, track_ids):
        x, y, w, h = box
        track = track_history[track_id]
        track.append((float(x), float(y)))
        if len(track) > config["track_history_length"]:
            track.pop(0)

        points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
        cv2.polylines(frame,
                      [points],
                      isClosed=False,
                      color = config["track_color"],
                      thickness = config["line_thickness"])

    return frame



In [None]:
def process_batch(model, batch_frames, track_history, frame_count, config):
    results = model.track(
        batch_frames,
        persist = True,
        tracker = "botsort.yaml",
        show = False,
        verbose = False,
        iou = 0.5,
    )

    processed_frames = []
    for frame_idx, result in enumerate(results):
        boxes = result.boxes.xywh.cpu()
        track_ids = (
            result.boxes.id.int().cpu().tolist() if result.boxes.id is not None
    else []
        )

        update_track_history(
            track_history,
            last_seen,
            track_ids,
            frame_count,
            len(batch_frames),
            frame_idx,
            config["track_history_length"],
        )

        annotated_frame = result.plot(font_size = 4, line_width = 2)
        annotated_frame = draw_tracks(
            annotated_frame, boxes, track_ids, track_history, config
        )
        processed_frames.append(annotated_frame)

    return processed_frames


In [None]:
def main(video_path):
    CONFIG = load_config()
    model(CONFIG.get("model_path", "yolo11x.pt"))

    cap, out, output_path = initialize_video(video_path)
    track_history = defaultdict(int)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    with tqdm(
        total=total_frames,
        desc = "Processing frames",
        colour = "green",
    ) as pbar:
        frame_count = 0
        batch_frames = []

        while cap.isOpened():
            success, frame = cap.read()

            if not success:
                break

            batch_frames.append(frame)
            frame_count += 1

            if len(batch_frames) == CONFIG["batch_size"] or frame_count == total_frames:
                try:
                    processed_frames = process_batch(
                        model,
                        batch_frames,
                        track_history,
                        last_seen,
                        frame_count,
                        CONFIG,
                    )
                    for frame in processed_frames:
                        out.write(frame)
                        pbar.update(1)
                    batch_frames = []

                except Exception as e:
                    logger.error(
                        f"Error when handling frames {frame_count - len(batch_frames) + 1} to {frame_count} : {str(e)}"
                    )
                    batch_frames = []
                    continue

    try:
        cap.release()
        out.release()
        cv2.destroyAllWindows()
        logger.info(f"{output_path}")

    except Exception as e:
        logger.error(f"{str(e)}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--video-path", type=str, default = "samples/vietnam-2.mp4")
    args = parser.parser_args()
    main(args.video_path)


Object counting

In [None]:
import cv2
from ultralytics import solutions

In [None]:
cap = cv2.VideoCapture("samples/highway.mp4")
assert cap.isOpened(), "Error reading video file"
w, h, fps = (
    int(cap.get(x))
    for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS)
)

In [None]:
#Define region points
#region_points = [(20, 400), (1080, 400)]
region_points = [
    (430, 700),
    (1600, 700),
    (1600, 1080),
    (430, 1080),
]

In [None]:
video_writer = cv2.VideoWriter(
    "./run/highway_tracked.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h)
)


In [None]:
counter = solutions.ObjectCounter(
    show = False,
    region = region_points,
    model = "yolo11x.pt"
)

In [None]:
while cap.isOpened():
    sucess, im0 = cap.read()
    if not sucess:
        print(
            "Video frame is empty or video processing has been sucessfully completed."
        )
        break
    im0 = counter.count(im0)
    video_writer.write(im0)

cap.release()
video_writer.release()
cv2.destroyAllWindows()

In [None]:
from ultralytics import YOLOWorld
from ultralytics.engine.results import Boxes

from src.utils import save_detection_results

In [None]:
model = YOLOWorld("yolov8s-world.pt")

In [None]:
model.set_classes(["bus"])

In [None]:
results: Boxes = model.predict("samples/bus.jpg")

In [None]:
save_detection_results(results)