In [None]:
import cv2
import json
import torch
import numpy as np
from datetime import datetime
from ultralytics import YOLO
from shapely.geometry import Point, Polygon
from collections import defaultdict
from sqlalchemy.orm import Session
from be_fastapi.app.database.session import SessionLocal
from be_fastapi.app.models.zone import Zone 

class VideoProcessor:
    """Run YOLO detection + tracking on a video source and describe each vehicle.

    For every tracked vehicle (classes listed in ``VEHICLE_CLASSES``) a frame
    yields a dict holding its box, confidence, estimated speed, the zone its
    centre lies in, how long it has been in each zone, and helmet / licence
    plate attributes matched by bounding-box overlap.  Zones are polygons
    loaded from the database for ``source_id``.

    NOTE(review): ``track_history`` and ``zone_entry_times`` keep one entry per
    track ID and nothing removes entries for tracks that have disappeared, so
    they grow for as long as the stream runs.
    """

    # Maximum number of centre points remembered per track.
    MAX_HISTORY = 30

    def __init__(self, model_path='be_fastapi\\app\\engine\\ckpt\\best.onnx', source_id=None):
        """
        model_path: checkpoint (.pt or .onnx)
        source_id: source ID used to look up zones in the database
        """
        self.source_id = source_id
        if model_path.endswith('.onnx'):
            # ONNX checkpoints are always run on CPU here.
            self.device = 'cpu'
            print(f"[Source {self.source_id}] Running ONNX mode on CPU")
        else:
            self.device = 0 if torch.cuda.is_available() else 'cpu'
            device_label = torch.cuda.get_device_name(0) if self.device == 0 else 'CPU'
            print(f"[Source {self.source_id}] Using device: {device_label}")

        self.model = YOLO(model_path, task='detect')

        self.CLASS_NAMES = {
            0: 'Car', 1: 'Bus', 2: 'Truck', 3: 'Motorcycle',
            4: 'Person', 5: 'Traffic Light',
            6: 'Helmet', 7: 'No Helmet', 8: 'License Plate'
        }

        self.VEHICLE_CLASSES = [0, 1, 2, 3]
        self.ATTRIBUTE_CLASSES = [6, 7, 8]

        # Load zones from DB. Compare against None so that a source ID of 0
        # is still treated as a real ID.
        self.zones = {}
        if self.source_id is not None:
            self.load_zones_from_db(self.source_id)
        else:
            print("No source_id provided, cannot load zones.")

        # Temporary per-track memory:
        #   track_history[track_id]    -> list of recent (cx, cy) centres
        #   zone_entry_times[track_id] -> {zone name: datetime of entry}
        self.track_history = defaultdict(list)
        self.zone_entry_times = defaultdict(dict)

    def load_zones_from_db(self, source_id):
        """
        Load zones from the database for the given source_id.

        Replaces ``self.zones`` with a ``{zone name: Polygon}`` mapping.
        Coordinates may be stored as a list or as a JSON string; a zone needs
        at least three points.  Errors are reported with ``print`` and the
        offending zone (or the whole load) is skipped instead of raising.
        """
        db: Session = SessionLocal()
        try:
            db_zones = db.query(Zone).filter(Zone.source_id == source_id).all()

            self.zones = {}
            for z in db_zones:
                try:
                    points = z.coordinates
                    if isinstance(points, str):
                        points = json.loads(points)  # convert to list if stored as string
                    if points and isinstance(points, list) and len(points) >= 3:
                        self.zones[z.name] = Polygon(points)
                    else:
                        print(f"Zone '{z.name}' has invalid coordinates.")
                except Exception as e:
                    # One malformed zone must not prevent the others from loading.
                    print(f"Error parsing zone '{z.name}': {e}")
        except Exception as e:
            print(f"Error connecting to Database when loading zones: {e}")
        finally:
            db.close()

    def calculate_iou(self, box1, box2):
        """
        Calculate Intersection over Union (IoU) of two bounding boxes.

        box1, box2: [x_min, y_min, x_max, y_max]
        Returns 0 when the union area is zero (e.g. two degenerate boxes).
        """
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])

        intersection = max(0, x2 - x1) * max(0, y2 - y1)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union = area1 + area2 - intersection

        if union == 0:
            return 0
        return intersection / union

    def estimate_speed(self, track_id, current_center, fps=30.0, scale_factor=0.05):
        """
        Estimate speed in km/h from the pixel movement since the previous frame.

        track_id: ID of the tracked object
        current_center: (x, y) centre of the object in the current frame
        fps: frames per second used to turn per-frame movement into per-second
        scale_factor: conversion factor from pixels to metres (0.05 m/pixel)

        ``process_video`` appends ``current_center`` to the track history
        *before* calling this method, so the previous position is the
        second-to-last history entry.  Comparing against the last entry (as
        was done before) always measured a distance of zero.

        Returns 0.0 while fewer than two positions are known for the track.
        """
        # .get() avoids creating an empty history entry for unknown track IDs.
        history = self.track_history.get(track_id)
        if not history or len(history) < 2:
            return 0.0
        prev_center = history[-2]
        pixel_dist = np.hypot(current_center[0] - prev_center[0],
                              current_center[1] - prev_center[1])
        speed_mps = pixel_dist * scale_factor * fps
        return float(round(speed_mps * 3.6, 1))

    def _match_attributes(self, veh_box, attributes):
        """Return (has_helmet, license_text) from attribute boxes overlapping veh_box."""
        has_helmet = None
        license_text = "Unknown"
        for attr in attributes:
            if self.calculate_iou(veh_box, attr.xyxy[0].tolist()) <= 0.01:
                continue
            attr_id = int(attr.cls[0])
            if attr_id == 6:
                has_helmet = True
            elif attr_id == 7:
                has_helmet = False
            elif attr_id == 8:
                license_text = "DETECTED_PLATE"
        return has_helmet, license_text

    def _update_zone_times(self, track_id, center_point):
        """Update zone entry times for a track; return (current zone name, durations in seconds)."""
        now = datetime.now()
        current_zone_name = None
        zone_durations = {}
        for z_name, z_poly in self.zones.items():
            if z_poly.contains(center_point):
                # When zones overlap, the last matching zone wins.
                current_zone_name = z_name
                entries = self.zone_entry_times.setdefault(track_id, {})
                entered_at = entries.setdefault(z_name, now)
                zone_durations[z_name] = round((now - entered_at).total_seconds(), 1)
            elif track_id in self.zone_entry_times:
                # Left the zone: forget the entry time so a re-entry restarts the clock.
                self.zone_entry_times[track_id].pop(z_name, None)
        return current_zone_name, zone_durations

    def _describe_vehicle(self, veh, attributes):
        """Build the per-object payload dict for one tracked vehicle box."""
        track_id = int(veh.id[0])
        cls_id = int(veh.cls[0])
        x1, y1, x2, y2 = veh.xyxy[0].tolist()
        center_tuple = ((x1 + x2) / 2, (y1 + y2) / 2)

        # Record the current centre first; estimate_speed relies on this order.
        history = self.track_history[track_id]
        history.append(center_tuple)
        if len(history) > self.MAX_HISTORY:
            del history[:-self.MAX_HISTORY]

        # Match helmet / licence-plate attributes to this vehicle.
        has_helmet, license_text = self._match_attributes([x1, y1, x2, y2], attributes)
        current_zone_name, zone_durations = self._update_zone_times(
            track_id, Point(center_tuple[0], center_tuple[1])
        )

        return {
            "track_id": track_id,
            "class_name": self.CLASS_NAMES[cls_id],
            "class_id": cls_id,
            "bbox": [int(x1), int(y1), int(x2), int(y2)],
            "confidence": float(veh.conf[0]),
            "speed_kmh": self.estimate_speed(track_id, center_tuple),
            "direction_angle": 0.0,
            "current_zone": current_zone_name,
            "zone_duration_seconds": zone_durations,
            "attributes": {
                "has_helmet": has_helmet,
                "license_plate_text": license_text
            }
        }

    def _draw_zones(self, frame):
        """Outline every zone on the frame and label it with its name (debug aid)."""
        for z_name, z_poly in self.zones.items():
            pts = np.array(z_poly.exterior.coords, np.int32).reshape((-1, 1, 2))
            cv2.polylines(frame, [pts], True, (0, 255, 255), 2)
            # Plain ints: some OpenCV builds reject NumPy scalars as coordinates.
            label_origin = (int(pts[0][0][0]), int(pts[0][0][1]))
            cv2.putText(frame, z_name, label_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 255), 2)

    def process_video(self, video_source=0, on_frame=None):
        """
        Read frames from ``video_source``, track objects and display the result.

        video_source: anything ``cv2.VideoCapture`` accepts (file path, index, ...)
        on_frame: optional callable receiving the per-frame payload dict
            ``{"source_id", "frame_timestamp", "objects"}``.  It is only called
            for frames that contain at least one detection.  When omitted the
            payload is built but not emitted anywhere.

        The loop ends when the source runs out of frames or 'q' is pressed.
        The capture and the OpenCV windows are released even if processing
        raises.
        """
        window_name = f"Cam {self.source_id}"
        cap = cv2.VideoCapture(video_source)
        try:
            if not cap.isOpened():
                print(f"[Source {self.source_id}] Could not open video source: {video_source}")

            while cap.isOpened():
                success, frame = cap.read()
                if not success:
                    break

                # Run detection and tracking
                results = self.model.track(
                    frame,
                    persist=True,
                    tracker="bytetrack.yaml",
                    device=self.device,
                    verbose=False,
                    conf=0.25
                )

                # If no detections, just show original frame
                if not results or not results[0].boxes:
                    cv2.imshow(window_name, frame)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        break
                    continue

                # Process results
                boxes = results[0].boxes
                current_timestamp = datetime.utcnow().isoformat() + "Z"  # ISO 8601 format

                # Classify boxes into vehicles and attributes
                vehicles = []
                attributes = []
                for box in boxes:
                    cls_id = int(box.cls[0])
                    if cls_id in self.VEHICLE_CLASSES:
                        vehicles.append(box)
                    elif cls_id in self.ATTRIBUTE_CLASSES:
                        attributes.append(box)

                # Describe each vehicle that the tracker has assigned an ID to
                dsl_objects = [
                    self._describe_vehicle(veh, attributes)
                    for veh in vehicles
                    if veh.id is not None
                ]

                final_json = {
                    "source_id": str(self.source_id),  # send the camera ID along
                    "frame_timestamp": current_timestamp,
                    "objects": dsl_objects
                }
                if on_frame is not None:
                    on_frame(final_json)

                annotated_frame = results[0].plot()
                # Draw the zones on the image for debugging
                self._draw_zones(annotated_frame)

                cv2.imshow(window_name, annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            cap.release()
            cv2.destroyAllWindows()

if __name__ == "__main__":
    # Stand-alone entry point for testing the AI engine.
    # Run this file directly to debug and check that the model works correctly.
    print("="*60, "üöÄ Starting AI Engine Test Mode", "="*60, sep="\n")

    # --- Configuration ---
    TEST_SOURCE_ID = 1  # camera ID in the database
    MODEL_PATH = 'be_fastapi\\app\\engine\\ckpt\\best.onnx'  # path to the model
    VIDEO_SOURCE = 'test_video.mp4'  # a video file, or 0 for the webcam

    # --- Create the processor ---
    try:
        processor = VideoProcessor(model_path=MODEL_PATH, source_id=TEST_SOURCE_ID)

        print(f"‚úÖ Model loaded successfully: {MODEL_PATH}")
        print(f"‚úÖ Loaded {len(processor.zones)} zones from database for Source ID {TEST_SOURCE_ID}")

        if not processor.zones:
            print("‚ö†Ô∏è  WARNING: No zones found! Make sure database has zones for this source_id.")

        print("\nüìπ Starting video processing...", "   Press 'q' to quit\n", sep="\n")

        # --- Run the video processing loop ---
        processor.process_video(video_source=VIDEO_SOURCE)

    except FileNotFoundError:
        print(
            f"‚ùå ERROR: Model file not found at: {MODEL_PATH}",
            "   Please check the path and make sure the model file exists.",
            sep="\n",
        )
    except Exception as e:
        # Last-resort handler for the test harness: report and show the traceback.
        print(f"‚ùå ERROR: {e}")
        import traceback
        traceback.print_exc()
    finally:
        print("\n" + "="*60, "üõë AI Engine stopped", "="*60, sep="\n")

In [2]:
# VSCode.Cell
import torch
from ultralytics import YOLO
import gc

# Turn off the weights_only safety check while the checkpoint is loaded.
# NOTE(review): weights_only=False lets torch.load unpickle arbitrary objects,
# so this must only ever be pointed at a trusted checkpoint file.
_original_load = torch.load


def patched_load(*args, **kwargs):
    """Forward to the original torch.load with weights_only forced to False."""
    return _original_load(*args, **{**kwargs, 'weights_only': False})


torch.load = patched_load

try:
    print("üîÑ Loading model...")
    model = YOLO('D:\\projects\\Vehicle_Fault_Dectection\\be_fastapi\\app\\engine\\ckpt\\best.pt')

    print("üîÑ Exporting to ONNX (without simplify)...")
    # simplify is disabled to avoid a crash during export.
    onnx_path = model.export(
        format='onnx', imgsz=640, dynamic=True,
        simplify=False, opset=12, half=False,
    )

    print("‚úÖ Export th√†nh c√¥ng!", f"üìÅ File: {onnx_path}", sep="\n")

    # Drop the PyTorch model before loading the exported one.
    del model
    gc.collect()

    # Check that the exported ONNX file can be loaded.
    print("\nüß™ Testing ONNX model...")
    onnx_model = YOLO(onnx_path)
    print("‚úÖ ONNX ho·∫°t ƒë·ªông OK!")

except Exception as e:
    print(f"‚ùå L·ªói: {e}")
finally:
    # Always undo the monkey-patch, whether or not the export succeeded.
    torch.load = _original_load

üîÑ Loading model...
üîÑ Exporting to ONNX (without simplify)...
Ultralytics YOLOv8.1.34 🚀 Python-3.11.14 torch-2.6.0+cpu CPU (13th Gen Intel Core(TM) i9-13900HX)
YOLOv10s summary (fused): 293 layers, 8041926 parameters, 0 gradients, 24.5 GFLOPs

[34m[1mPyTorch:[0m starting from 'D:\projects\Vehicle_Fault_Dectection\be_fastapi\app\engine\ckpt\best.pt' with input shape (1, 3, 640, 640) BCHW and output shape(s) (1, 300, 6) (93.3 MB)

[34m[1mONNX:[0m starting export with onnx 1.20.0 opset 12...
[34m[1mONNX:[0m export success ‚úÖ 0.7s, saved as 'D:\projects\Vehicle_Fault_Dectection\be_fastapi\app\engine\ckpt\best.onnx' (27.6 MB)

Export complete (2.4s)
Results saved to D:\projects\Vehicle_Fault_Dectection\be_fastapi\app\engine\ckpt
Predict:         yolo predict task=detect model=D:\projects\Vehicle_Fault_Dectection\be_fastapi\app\engine\ckpt\best.onnx imgsz=640  
Validate:        yolo val task=detect model=D:\projects\Vehicle_Fault_Dectection\be_fastapi\app\engine\ckp