In [None]:
!nvidia-smi

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install ultralytics
!pip install opencv-python

In [None]:
# Train
from ultralytics import YOLO

model = YOLO("yolov8s.pt")
model.train(
    data="/content/drive/MyDrive/vehicle_latest_dataset/data.yaml",
    epochs=50,
    imgsz=640,
    batch=16,
    device=0,
    project="/content/drive/MyDrive/yolo_runs",
    patience=10,
    optimizer="AdamW",
    cos_lr=True,
    cache=True,
    save_period=1,
    plots=True
)

In [None]:
# Predict
import time
from ultralytics import YOLO

MODEL_PATH = "/content/drive/MyDrive/assets/traffic3.pt"
SOURCE_PATH = "/content/drive/MyDrive/assets/vehicle_vid_10.mp4"
OUTPUT_DIR = "/content/drive/MyDrive/yolo_runs/detection"
RUN_NAME = "predict_local"
IMG_SIZE = 640
CONF_THRESHOLD = 0.15

def main():
    start = time.time()
    model = YOLO(MODEL_PATH)
    print(f"Loaded model from: {MODEL_PATH}")
    results = model.predict(
        source=SOURCE_PATH,
        imgsz=IMG_SIZE,
        conf=CONF_THRESHOLD,
        project=OUTPUT_DIR,
        save=True,
        name=RUN_NAME,
        save_txt=True,
        save_conf=True,
        verbose=True,
        show=True,
        line_width=2
    )
if __name__ == "__main__":
    main()

In [None]:
# Lay frame video
import cv2

SOURCE_VIDEO = "/content/drive/MyDrive/assets/vehicle_vid_11.mp4"
OUTPUT_IMAGE = "roi_reference_frame1.png"

cap = cv2.VideoCapture(SOURCE_VIDEO)

if not cap.isOpened():
    print(f"Lỗi: Không thể mở video tại {SOURCE_VIDEO}")
else:
    # Đọc khung hình đầu tiên
    ret, frame = cap.read()
    if ret:
        # Lấy kích thước gốc
        height, width, _ = frame.shape
        print(f"Video gốc có kích thước: {width} x {height}")

        # Lưu khung hình lại
        cv2.imwrite(OUTPUT_IMAGE, frame)
        print(f"Đã lưu khung hình tham chiếu tại: {OUTPUT_IMAGE}")
    else:
        print("Lỗi: Không thể đọc frame từ video.")

    cap.release()

In [None]:
# File main project
import time
import os
import cv2
import numpy as np
from ultralytics import YOLO
from ultralytics.utils.files import increment_path
from pathlib import Path
from datetime import datetime

MODEL_PATH = "/content/drive/MyDrive/vehicle_counting_project/assets/traffic4.pt"
SOURCE_PATH = "/content/drive/MyDrive/vehicle_counting_project/assets/media/vehicle_vid_13.mp4"
OUTPUT_DIR = "/content/drive/MyDrive/vehicle_counting_project/yolo_runs"

BASE_RUN_NAME = "run"

IMG_SIZE = 640
CONF_THRESHOLD = 0.25
TRACKER_CONFIG = "/content/drive/MyDrive/vehicle_counting_project/assets/my_tracker.yaml"
POLYGON_ROI = np.array([[639, 764], [1718, 755], [1669, 552], [878, 552]], dtype=np.int32)

ROI_ALPHA = 0.3
ROI_COLOR = (255, 0, 0)
TEXT_COLOR = (255, 255, 255)

CLASS_COLOR_MAP = {
    'car': (0, 255, 0),
    'bus': (255, 0, 0),
    'truck': (0, 51, 102),
    'motor': (0, 255, 255)
}
DEFAULT_COLOR = (255, 255, 255)
TEXT_COUNTED_COLOR = (0, 0, 255)

def main():
    start_time = time.time()

    # --- INITIALIZATION ---
    model = YOLO(MODEL_PATH)
    print(f"[INFO] Model loaded successfully: {MODEL_PATH}")

    CLASS_NAMES = model.names
    print(f"[INFO] Model classes: {CLASS_NAMES}")

    vehicle_counts = {name: 0 for name in CLASS_NAMES.values()}
    id_da_dem = set()

    cap = cv2.VideoCapture(SOURCE_PATH)
    if not cap.isOpened():
        print(f"[ERROR] Unable to open source video: {SOURCE_PATH}")
        return

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps_video = cap.get(cv2.CAP_PROP_FPS)

    # Setup Output Directory
    base_output_dir = Path(OUTPUT_DIR) / BASE_RUN_NAME
    final_run_dir = increment_path(base_output_dir, exist_ok=False)
    os.makedirs(final_run_dir, exist_ok=True)

    run_folder_name = os.path.basename(final_run_dir)
    video_filename = f"video_{run_folder_name}.mp4"
    output_video_path = os.path.join(final_run_dir, video_filename)

    report_filename = f"report_{run_folder_name}.txt"
    output_report_path = os.path.join(final_run_dir, report_filename)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps_video, (frame_width, frame_height))

    print(f"[INFO] Output directory created: {final_run_dir}")
    print(f"[INFO] Initiating processing pipeline (stream=True)...")

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"[INFO] Total frames to process: {total_frames}")

    results_generator = model.track(
        source=SOURCE_PATH,
        imgsz=IMG_SIZE,
        conf=CONF_THRESHOLD,
        tracker=TRACKER_CONFIG,
        stream=True,
        verbose=False,
    )

    prev_frame_time = 0
    new_frame_time = 0
    fps_smoothed = 0
    alpha = 0.9

    # --- PROCESSING LOOP ---
    for frame_idx, results in enumerate(results_generator):

        # FPS Calculation
        new_frame_time = time.time()
        fps_instant = 0
        if prev_frame_time > 0:
            time_diff = new_frame_time - prev_frame_time
            if time_diff > 0:
                fps_instant = 1 / time_diff
        prev_frame_time = new_frame_time

        if fps_smoothed == 0:
            fps_smoothed = fps_instant
        else:
            fps_smoothed = (alpha * fps_smoothed) + ((1 - alpha) * fps_instant)

        fps_text = f"FPS: {int(fps_smoothed)}"

        # Progress Log
        if frame_idx % 100 == 0:
            print(f"  ... Processing: Frame {frame_idx}/{total_frames}")

        frame = results.orig_img

        # Draw ROI
        overlay = frame.copy()
        cv2.fillPoly(overlay, [POLYGON_ROI], ROI_COLOR)
        cv2.polylines(overlay, [POLYGON_ROI], isClosed=True, color=ROI_COLOR, thickness=2)
        cv2.addWeighted(overlay, ROI_ALPHA, frame, 1 - ROI_ALPHA, 0, dst=frame)

        # Draw Tracking & Counting
        boxes = results.boxes
        if boxes.id is not None:
            track_ids = boxes.id.int().tolist()
            xyxy_coords = boxes.xyxy.cpu().numpy()
            class_ids = boxes.cls.int().tolist()

            for i, track_id in enumerate(track_ids):
                x1, y1, x2, y2 = xyxy_coords[i]
                cls_name = CLASS_NAMES[class_ids[i]]

                anchor_point = (int((x1 + x2) / 2), int(y2))
                is_inside = cv2.pointPolygonTest(POLYGON_ROI, anchor_point, False)

                if is_inside > 0:
                    if track_id not in id_da_dem:
                        id_da_dem.add(track_id)
                        vehicle_counts[cls_name] += 1

                box_color = CLASS_COLOR_MAP.get(cls_name, DEFAULT_COLOR)
                text_draw_color = box_color
                if track_id in id_da_dem and is_inside > 0:
                    text_draw_color = TEXT_COUNTED_COLOR

                box_text = f"ID:{track_id} {cls_name}"
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), box_color, 2)
                cv2.putText(frame, box_text, (int(x1), int(y1) - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_draw_color, 2)

        # Draw FPS
        font_scale = max(0.5, frame_width / 1500)
        line_height = int(font_scale * 40)

        fps_box_width = int(frame_width * 0.12)
        fps_box_height = line_height

        overlay_fps = frame.copy()
        cv2.rectangle(overlay_fps, (0, 0), (fps_box_width, fps_box_height), (0, 0, 0), cv2.FILLED)
        cv2.addWeighted(overlay_fps, 0.6, frame, 0.4, 0, dst=frame)

        text_x = int(fps_box_width * 0.1)
        text_y = int(line_height * 0.75)
        cv2.putText(frame, fps_text, (text_x, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 255, 0), 2)

        # Draw Results
        box_width = int(frame_width * 0.15)
        start_x = frame_width - box_width
        num_classes = len(vehicle_counts)
        box_height = (num_classes * line_height) + int(line_height / 2)
        y_off = line_height

        overlay_box = frame.copy()
        cv2.rectangle(overlay_box, (start_x, 0), (frame_width, box_height), (0, 0, 0), cv2.FILLED)
        cv2.addWeighted(overlay_box, 0.6, frame, 0.4, 0, dst=frame)

        for vehicle_type, count in vehicle_counts.items():
            text = f"{vehicle_type.capitalize()}: {count}"
            cv2.putText(frame, text, (start_x + int(box_width*0.1), y_off),
                        cv2.FONT_HERSHEY_SIMPLEX, font_scale, TEXT_COLOR, 2)
            y_off += line_height

        video_writer.write(frame)

    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

    # --- FINALIZATION & REPORTING ---
    end_time = time.time()
    total_seconds = end_time - start_time
    minutes = int(total_seconds // 60)
    seconds = total_seconds % 60

    time_str = f"{minutes} min {seconds:.2f} sec"

    # 1. Print to Console (Professional English)
    print("\n" + "="*40)
    print("       PROCESSING COMPLETE       ")
    print("="*40)
    print(f"Total Execution Time : {time_str}")
    print(f"Results Directory    : {final_run_dir}")
    print("-" * 40)
    print("FINAL COUNT SUMMARY:")
    for vehicle_type, count in vehicle_counts.items():
        print(f"  - {vehicle_type.capitalize():<10}: {count}")
    print("="*40)

    # 2. Generate Text Report File
    try:
        with open(output_report_path, "w") as f:
            f.write("========================================\n")
            f.write("      VEHICLE COUNTING REPORT           \n")
            f.write("========================================\n")
            f.write(f"Date & Time      : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Model Used       : {os.path.basename(MODEL_PATH)}\n")
            f.write(f"Input Video      : {os.path.basename(SOURCE_PATH)}\n")
            f.write(f"Total Frames     : {total_frames}\n")
            f.write(f"Execution Time   : {time_str}\n")
            f.write("-" * 40 + "\n")
            f.write("FINAL COUNTS:\n")
            for vehicle_type, count in vehicle_counts.items():
                f.write(f"  - {vehicle_type.capitalize():<10}: {count}\n")
            f.write("========================================\n")
        print(f"[INFO] Report file generated successfully: report.txt")
    except Exception as e:
        print(f"[ERROR] Failed to generate report file: {e}")

if __name__ == "__main__":
    main()