In [None]:
import torch
import torchvision

# Environment sanity check: report library versions and the visible CUDA device.
print(torch.__version__)         # 2.3.1+cu118
print(torchvision.__version__)   # 0.18.1+cu118
print(torch.cuda.is_available()) # True
# Querying the device name raises on a CPU-only machine, so only ask for it
# when CUDA is actually usable.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
else:
    print("No CUDA device available")

2.3.1+cu118
0.18.1+cu118
True
NVIDIA GeForce RTX 4060 Laptop GPU


In [None]:
import os
import cv2
import time
import base64
import torch
import threading
import numpy as np
import cvzone
import logging
from ultralytics import YOLO
from openai import OpenAI
import json
from dotenv import load_dotenv

# === Configuration ===
# Video file opened by the script entry point (relative path).
VIDEO_PATH = "circulation.mp4"
# Default weights file passed to ultralytics.YOLO by SmartVehicleDetector.
YOLO_MODEL_PATH = "yolo12m.pt"
# Directory that receives the cropped vehicle images (created on demand).
OUTPUT_FOLDER = "cropped_vehicles"
# Detections whose class name is not in this set are ignored.
ALLOWED_CLASSES = {"car", "truck", "bus", "motorcycle", "bicycle"}

# Load environment variables (load_dotenv must run before the os.getenv lookup below).
load_dotenv()
# os.getenv returns None when API_KEY is not set.
api_key_env = os.getenv("API_KEY")

# Initialize OpenAI-compatible client (e.g., OpenRouter)
# This single module-level client is shared by every LLM request in this file.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=api_key_env,
)

# Logger setup
# Level INFO means logging.debug(...) calls in this file are not emitted.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

class SmartVehicleDetector:
    """Track vehicles in a video and describe each one with a vision LLM.

    Every frame is resized to 1020x600 and passed to a YOLO tracker. When the
    bottom-right corner of an allowed-class box lies inside ``self.area``, the
    vehicle is cropped from the un-annotated frame and sent (once per track
    ID) to the LLM on a background thread. Parsed results are appended to a
    per-day JSON file.
    """

    # Model identifier used for every LLM request.
    LLM_MODEL = "google/gemini-2.5-flash-lite"
    # Total time (seconds) start_processing() waits for pending LLM workers
    # before returning; workers are daemon threads and die with the process.
    WORKER_JOIN_TIMEOUT = 120.0

    def __init__(self, video_file: str, yolo_model_path: str = YOLO_MODEL_PATH):
        """Load the model, open the video and prepare the output locations.

        Args:
            video_file: Path of the video to process.
            yolo_model_path: Path of the YOLO weights file.

        Raises:
            RuntimeError: If the YOLO model cannot be loaded.
            FileNotFoundError: If the video cannot be opened.
        """
        self._load_yolo_model(yolo_model_path)
        self.cap = self._open_video_file(video_file)
        # Detection zone, in coordinates of the resized 1020x600 frame.
        self.area = np.array([(420, 407), (382, 448), (940, 456), (930, 419)], np.int32)
        self.processed_track_ids = set()
        self.detected_country = "Unknown"
        self.current_date = time.strftime("%Y-%m-%d")

        # Worker threads append to vehicle_data_list and rewrite the JSON file;
        # the lock serializes both. RLock lets _save_json_data() be called
        # either on its own or while the lock is already held.
        self._data_lock = threading.RLock()
        self._worker_threads = []

        self.output_json_path = f"vehicle_data_{self.current_date}.json"
        # Start from what an earlier run on the same day already saved, so
        # the first save of this run does not wipe it.
        self.vehicle_data_list = self._load_existing_entries()

        self.cropped_images_folder = OUTPUT_FOLDER
        os.makedirs(self.cropped_images_folder, exist_ok=True)

    def _load_existing_entries(self):
        """Return entries already in the JSON output file, creating it if absent.

        An unreadable or malformed file yields an empty list (with a warning)
        rather than aborting start-up.
        """
        if not os.path.exists(self.output_json_path):
            with open(self.output_json_path, "w", encoding="utf-8") as f:
                json.dump([], f, indent=4)
            logging.info(f"📝 JSON output initialized: {self.output_json_path}")
            return []

        try:
            with open(self.output_json_path, "r", encoding="utf-8") as f:
                existing = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logging.warning(f"⚠️ Could not read existing JSON output, starting empty: {e}")
            return []

        if not isinstance(existing, list):
            logging.warning("⚠️ Existing JSON output is not a list, starting empty.")
            return []
        logging.info(f"📝 Resuming JSON output with {len(existing)} existing entries: {self.output_json_path}")
        return existing

    def _load_yolo_model(self, path: str):
        """Load the YOLO weights at ``path`` and move the model to GPU if available.

        Raises:
            RuntimeError: If loading or moving the model fails (original error chained).
        """
        try:
            self.yolo_model = YOLO(path)
            self.names = self.yolo_model.names
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            self.yolo_model.to(self.device)
            logging.info(f"✅ YOLO model loaded on {self.device.upper()}")
        except Exception as e:
            raise RuntimeError(f"❌ Failed to load YOLO model: {e}") from e

    def _open_video_file(self, path: str):
        """Open ``path`` with OpenCV and return the capture object.

        Raises:
            FileNotFoundError: If OpenCV cannot open the file.
        """
        cap = cv2.VideoCapture(path)
        if not cap.isOpened():
            cap.release()
            raise FileNotFoundError(f"❌ Could not open video file: {path}")
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        return cap

    def encode_image_to_base64(self, image):
        """Return ``image`` JPEG-encoded as a base64 string.

        Raises:
            ValueError: If OpenCV reports that JPEG encoding failed.
        """
        ok, buffer = cv2.imencode(".jpg", image)
        if not ok:
            raise ValueError("JPEG encoding failed")
        return base64.b64encode(buffer).decode("utf-8")

    def _ask_vision_model(self, prompt, base64_image):
        """Send ``prompt`` plus one base64 JPEG to the LLM and return its stripped text reply."""
        completion = client.chat.completions.create(
            model=self.LLM_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
                    ]
                }
            ]
        )
        # content may be None for an empty reply; normalize to "".
        return (completion.choices[0].message.content or "").strip()

    def detect_country_from_scene(self, frame):
        """Send the first clean frame to the LLM to estimate the country.

        Returns:
            The country name returned by the model, or "Unknown" when the
            request fails or the reply is empty.
        """
        try:
            base64_img = self.encode_image_to_base64(frame)

            prompt = (
                "You are an expert in geolocation and vehicle classification. "
                "From the image provided, analyze the visual scene (vehicles, road signs, license plates, environment) "
                "and deduce the most likely country. Only return the name of the country."
            )

            country_name = self._ask_vision_model(prompt, base64_img) or "Unknown"
            logging.info(f"🌍 Country detected from scene: {country_name}")
            return country_name
        except Exception as e:
            # Best effort: country detection must never stop video processing.
            logging.error(f"❌ Failed to detect country: {e}")
            return "Unknown"

    def analyze_image_with_openai(self, image_path):
        """Send a cropped vehicle image to the LLM for recognition based on the detected country.

        Returns:
            The model's raw text reply (expected to be a markdown table), or
            "" when reading the image or the request fails.
        """
        try:
            with open(image_path, "rb") as img_file:
                base64_image = base64.b64encode(img_file.read()).decode("utf-8")

            prompt = (
                f"You are an expert in vehicle recognition. The vehicle is from {self.detected_country}. "
                "Analyze the image and extract:\n"
                "1. Vehicle Type (e.g. car, truck, bus, motorcycle, bicycle)\n"
                "2. Vehicle Color (e.g. red, black, white)\n"
                "3. Vehicle Brand (e.g. Toyota, Ford)\n\n"
                "Return only this table:\n"
                "| Vehicle Type | Vehicle Color | Vehicle Company |\n"
                "|--------------|---------------|-----------------|\n"
            )

            return self._ask_vision_model(prompt, base64_image)
        except Exception as e:
            logging.error(f"❌ OpenAI vehicle analysis failed: {e}")
            return ""

    @staticmethod
    def _parse_vehicle_table(response):
        """Extract ``(type, color, company)`` rows from a markdown table reply.

        Header and separator rows are recognised by content rather than by
        position, so a reply that contains only the data row, or that wraps
        the table in extra text or code fences, is still parsed.
        """
        rows = []
        for raw_line in response.splitlines():
            line = raw_line.strip()
            if not line.startswith("|"):
                continue
            # Drop exactly one leading and (if present) one trailing pipe so
            # that an empty last cell is preserved.
            inner = line[1:-1] if line.endswith("|") and len(line) > 1 else line[1:]
            cells = [cell.strip() for cell in inner.split("|")]
            if len(cells) != 3:
                continue
            if all(cell and not cell.strip("-:") for cell in cells):
                continue  # separator row such as |-----|:---:|-----|
            if cells[0].lower() == "vehicle type":
                continue  # header row echoed back by the model
            rows.append(tuple(cells))
        return rows

    def process_crop_image(self, image, track_id):
        """Save the crop to disk, send it to the LLM and record the parsed result.

        Runs on a worker thread. Failures are logged and never raised.
        """
        timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
        image_filename = os.path.join(self.cropped_images_folder, f"vehicle_{track_id}_{timestamp}.jpg")
        try:
            written = cv2.imwrite(image_filename, image)
        except cv2.error as e:
            logging.error(f"❌ Failed to write crop for track ID {track_id}: {e}")
            return
        if not written:
            logging.error(f"❌ Failed to write crop for track ID {track_id}: {image_filename}")
            return

        response = self.analyze_image_with_openai(image_filename)
        rows = self._parse_vehicle_table(response)
        if not rows:
            logging.warning(f"⚠️ No vehicle data parsed for track ID {track_id}")
            return

        # Append and persist under the lock so concurrent workers cannot
        # interleave writes to the same file.
        with self._data_lock:
            for v_type, v_color, v_brand in rows:
                entry = {
                    "timestamp": timestamp,
                    "track_id": track_id,
                    "vehicle_type": v_type,
                    "vehicle_color": v_color,
                    "vehicle_company": v_brand
                }
                self.vehicle_data_list.append(entry)
                logging.info(f"🚗 JSON entry added for track ID {track_id}")
            self._save_json_data()

    def crop_and_process(self, clean_frame, box, track_id):
        """Crop vehicle from the clean frame and send for analysis.

        Each track ID is analysed at most once. Detections without a tracker
        ID (negative ``track_id``) are skipped: they would all share one
        placeholder ID, so only the first would ever be analysed.
        """
        if track_id < 0 or track_id in self.processed_track_ids:
            return

        frame_h, frame_w = clean_frame.shape[:2]
        x1, y1, x2, y2 = map(int, box)
        # Clamp to the frame: a negative start would wrap around in a numpy slice.
        x1, x2 = max(0, x1), min(frame_w, x2)
        y1, y2 = max(0, y1), min(frame_h, y2)
        if x2 <= x1 or y2 <= y1:
            # Degenerate box; leave the ID unprocessed so a later frame can retry.
            return

        # Copy so the worker does not keep the whole frame alive through a view.
        cropped_image = clean_frame[y1:y2, x1:x2].copy()
        self.processed_track_ids.add(track_id)

        worker = threading.Thread(target=self.process_crop_image, args=(cropped_image, track_id), daemon=True)
        # Forget finished workers so the list does not grow with the video length.
        self._worker_threads = [t for t in self._worker_threads if t.is_alive()]
        self._worker_threads.append(worker)
        worker.start()

    def process_video_frame(self, clean_frame):
        """Detect and track vehicles in a single frame.

        Returns:
            The resized (1020x600) frame annotated with boxes, IDs, class
            names and the detected country.
        """
        # cv2.resize returns a new array, so the input frame is left untouched.
        frame = cv2.resize(clean_frame, (1020, 600))
        # Un-annotated copy used for crops, so drawings never end up in them.
        clean_frame_resized = frame.copy()

        results = self.yolo_model.track(frame, persist=True, device=self.device)

        if results and results[0].boxes is not None:
            detections = results[0].boxes
            boxes = detections.xyxy.int().cpu().tolist()
            class_ids = detections.cls.int().cpu().tolist()
            # boxes.id is None when the tracker assigned no IDs; -1 marks "untracked".
            track_ids = detections.id.int().cpu().tolist() if detections.id is not None else [-1] * len(boxes)

            for box, track_id, class_id in zip(boxes, track_ids, class_ids):
                class_name = self.names[class_id]
                if class_name not in ALLOWED_CLASSES:
                    continue

                x1, y1, x2, y2 = map(int, box)
                # The bottom-right corner of the box decides zone membership.
                if cv2.pointPolygonTest(self.area, (x2, y2), False) >= 0:
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 1)
                    cvzone.putTextRect(frame, f"ID: {track_id}", (x2, y2), 1, 1)
                    cvzone.putTextRect(frame, class_name, (x1, y1), 1, 1)
                    self.crop_and_process(clean_frame_resized, box, track_id)

        cvzone.putTextRect(frame, f"Country: {self.detected_country}", (10, 30), 1, 2)
        return frame

    def _save_json_data(self):
        """Save vehicle data list to JSON file.

        The data is written to a temporary file and then moved over the real
        one, so an interrupted write cannot leave a truncated JSON file.
        Errors are logged, not raised.
        """
        tmp_path = f"{self.output_json_path}.tmp"
        try:
            with self._data_lock:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(self.vehicle_data_list, f, indent=4)
                os.replace(tmp_path, self.output_json_path)
            logging.debug("✅ JSON file updated.")
        except Exception as e:
            logging.error(f"❌ Error saving JSON file: {e}")

    def _wait_for_workers(self):
        """Wait up to WORKER_JOIN_TIMEOUT seconds in total for pending LLM workers."""
        deadline = time.monotonic() + self.WORKER_JOIN_TIMEOUT
        pending = list(self._worker_threads)
        for worker in pending:
            worker.join(timeout=max(0.0, deadline - time.monotonic()))

        still_running = [worker for worker in pending if worker.is_alive()]
        if still_running:
            logging.warning(f"⚠️ {len(still_running)} vehicle analyses did not finish in time and may be lost.")
        self._worker_threads = still_running

    def start_processing(self):
        """Run detection over the whole video, showing an annotated preview.

        The first frame is used to estimate the country and is then processed
        like every other frame. Processing stops at the end of the video or
        when "q" is pressed. The capture and the preview window are released
        even if processing raises, and pending LLM workers are given time to
        finish so their results reach the JSON file.
        """
        logging.info("🎬 Starting video processing...")
        try:
            ret, frame = self.cap.read()
            if ret:
                self.detected_country = self.detect_country_from_scene(frame)

            while ret:
                annotated_frame = self.process_video_frame(frame)
                cv2.polylines(annotated_frame, [self.area], True, (0, 255, 0), 2)
                cv2.imshow("Smart Vehicle Detector", annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
                ret, frame = self.cap.read()
        finally:
            self.cap.release()
            cv2.destroyAllWindows()
            self._wait_for_workers()

        logging.info(f"✅ Data saved to {self.output_json_path}")

# Script entry point: build a detector for the configured video and process it
# until the video ends or "q" is pressed in the preview window.
if __name__ == "__main__":
    processor = SmartVehicleDetector(VIDEO_PATH)
    processor.start_processing()
