# Contagem de veículos (v0.1.0)
Para a câmera dos primeiros 23 minutos - https://www.youtube.com/watch?v=3uwRHtiyMnI

In [1]:
from pathlib import Path
from copy import deepcopy

import cv2
from ultralytics import YOLO

In [2]:
model = YOLO('yolov8x')

In [3]:
video_path = 'camera.mp4'  # Local video path or RSTP Url
limit_count_not_detected = 30  # How many frames without detect a track id until remove from history
debug = True  # Whether to save predictions images

if debug:
    Path('debug').mkdir(parents=True, exist_ok=True)

# If model was already executed, reset tracker id
if model.predictor:
    model.predictor.trackers[0].reset_id()

counter = {'left': 0, 'right': 0}
detections = {}
idx_frame = 0

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise ConnectionError('Not possible to load image')

is_rtsp = video_path.startswith('rtsp://')

while True:
    ret, frame = cap.read()
    if not ret:
        if is_rtsp:
            print('Failed to receive image from RTSP stream')
            continue
        else:
            print('Video processed successfully!')
            break
        
    results_video = model.track(source=frame,  # image or video; single value or a list; URL, PIL (RGB), CV2 (BGR), ...
                                conf=0.25,
                                iou=0.7,
                                imgsz=640,
                                classes=[2, 3, 5, 7],
                                show=False,
                                save=False,
                                save_txt=False,  # Save bbox coordination
                                save_conf=False,  # save_txt must be True
                                save_crop=False,
                                verbose=False,
                                persist=True,  # To continue tracker_id
                               )

    if debug:
        img = deepcopy(frame)
        cv2.line(img, (649, 590), (180, 590), (0, 0, 255), 3)  # Left
        cv2.line(img, (650, 531), (1023, 531), (255, 0, 0), 3)  # Right
        cv2.line(img, (650, 1280), (650, 0), (255, 0, 255), 3)  # Division
    
    for result in results_video:
        for idx in range(len(result.boxes.cls)):
            track_id = int(result.boxes.id[idx])
            
            center_x = int((result.boxes.xyxy[idx][0] + result.boxes.xyxy[idx][2]) / 2)
            center_y = int((result.boxes.xyxy[idx][1] + result.boxes.xyxy[idx][3]) / 2)
    
            side = 'left' if center_x < 650 else 'right'
            if side == 'left':
                position_y = 'up' if center_y < 590 else 'down'
            else:
                position_y = 'up' if center_y < 531 else 'down'
    
            if debug:
                circle_color = (0, 0, 200) if side == 'left' else (200, 0, 0)
                cv2.circle(img, (center_x, center_y), color=circle_color, radius=1, thickness=10)
                cv2.putText(img, f'{track_id}-{int(result.boxes.cls[idx])}', (center_x, center_y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
            
            # Track_id already detected
            if track_id in detections:
                detections[track_id]['count_not_detected'] = 0
                if detections[track_id]['position_y'] != position_y:
                    # Only consider those who crossed the line at the correct position_y, otherwise, treat it as a mistake and ignore it
                    if (side == 'left' and position_y == 'down') or (side == 'right' and position_y == 'up'):
                        counter[side] += 1
                        detections[track_id]['position_y'] = position_y
            # New track_id
            else:
                detections[track_id] = {'count_not_detected': 0, 'side': side, 'position_y': position_y}
    
    if debug:
        # White board
        cv2.rectangle(img, (10, 10), (230, 95), (255, 255, 255), thickness=-1)
        cv2.rectangle(img, (10, 10), (230, 95), (0, 0, 0), thickness=2)
        cv2.putText(img, f'Left: {counter['left']}', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        cv2.putText(img, f'Right: {counter['right']}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        cv2.imwrite(f'debug/{idx_frame}.png', img)
        idx_frame += 1
    
    # Remove any detections that no longer exist for N consecutive frames
    for key in list(detections.keys()):
        detections[key]['count_not_detected'] += 1
        if detections[key]['count_not_detected'] >= limit_count_not_detected:
            del detections[key]

### Converte debug imagens para vídeo
Não faz parte do sistema principal, utilizado apenas para facilitar no debug.

In [4]:
import cv2
import os
import glob

# Path to the folder containing images
image_folder = 'debug'
pattern = os.path.join(image_folder, '*.png')

# Get list of image file paths matching the pattern
image_paths = glob.glob(pattern)

# Sort image paths based on the numeric value of the filename (e.g., 0.png, 1.png, 3.png)
def get_index_from_filename(path):
    filename = os.path.basename(path)         # e.g., '3.png'
    index_str = os.path.splitext(filename)[0] # e.g., '3'
    return int(index_str)                     # Convert to integer

# Sort using the extracted index
image_paths = sorted(image_paths, key=get_index_from_filename)

# Check if there are any images to process
if not image_paths:
    print("No images found.")
    exit()

# Read the first image to get dimensions
sample_img = cv2.imread(image_paths[0])
height, width, _ = sample_img.shape

# Video output settings
output_video_path = 'output_video.mp4'
fps = 30
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

# Write each image to the video
for img_path in image_paths:
    img = cv2.imread(img_path)
    if img is None:
        print(f"Failed to load image: {img_path}")
        continue
    video_writer.write(img)

# Release the video writer and finish
video_writer.release()
print(f"Video saved as: {output_video_path}")

Video saved as: output_video.mp4
