In [None]:
from pydantic import BaseModel

class VideoInput(BaseModel):
    """Parameters describing one stream recording request."""
    # Base name of the output file (without extension).
    name: str
    # Free-text description of the source.
    description: str
    # Stream URL handed to ffmpeg as its input.
    url: str
    # Recording length in seconds.
    duration: int
    # Root folder under which a per-recording sub-folder is created.
    output_folder: str = "output"

class DetectionInput(BaseModel):
    """Parameters describing one detection/tracking job over a recorded video."""
    # Human-readable job name and description.
    name: str
    description: str
    # Identifier of the video to analyse.
    video_id: str
    # Identifiers of the regions to count objects in.
    region_ids: list[str]
    # Weights file passed to the YOLO constructor.
    model_name: str = "yolo11n.pt"
    # Tracker configuration passed to the tracking call.
    tracker: str = "botsort.yaml"
    # Class indices to keep.
    classes: list[int] = [0]
    # Maximum number of frames to process; values <= 0 mean "no limit".
    max_frames: int = -1
    # Whether to write the results to a JSON file. The original `save: True`
    # used the value True as the type annotation and declared no default.
    save: bool = True

class RenderInput(BaseModel):
    """Parameters for a render request; not used by the code visible in this file."""
    # Presumably the id of a recorded video - TODO confirm against the consumer.
    video_id: str
    # Presumably the id of a finished detection job - TODO confirm against the consumer.
    detection_id: str

In [None]:
import os
import uuid
import json
import subprocess
from datetime import datetime

class VideoInput(BaseModel):
    """Parameters describing one stream recording request, as consumed by download_video."""
    # Base name of the output file (without extension).
    name: str
    # Free-text description of the source; copied into the metadata file.
    description: str
    # Stream URL handed to ffmpeg as its input.
    url: str
    # Recording length in seconds.
    duration: int
    # Root folder under which a per-recording sub-folder is created.
    output_folder: str = "output"

def download_video(video_input: VideoInput):
    """
    Record a video stream (.m3u8) for a fixed duration and store its metadata.

    A fresh UUID is generated for the recording. The stream is copied with
    ffmpeg to ``<output_folder>/<video_id>/<name>_<timestamp>.mp4`` and a
    ``metadata.json`` file describing the recording is written next to it.

    Args:
        video_input (VideoInput): Recording request. Fields read:
            name: Base name of the output file (without extension).
            description: Description of the CCTV source.
            url: Stream URL (m3u8).
            duration: Recording length in seconds.
            output_folder: Root output folder.

    Returns:
        None. When ffmpeg exits with a non-zero status the error is printed
        and no metadata file is written.
    """
    video_id = str(uuid.uuid4())
    # Every recording gets its own sub-folder named after its UUID.
    output_folder = os.path.join(video_input.output_folder, video_id)
    os.makedirs(output_folder, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_filename = f"{video_input.name}_{timestamp}.mp4"
    output_path = os.path.join(output_folder, output_filename)

    ffmpeg_cmd = [
        'ffmpeg',
        '-y',                               # overwrite an existing output file
        '-i', video_input.url,
        '-t', str(video_input.duration),    # stop after this many seconds
        '-c', 'copy',                       # copy the streams without re-encoding
        output_path,
    ]

    print(' '.join(ffmpeg_cmd))

    print(f"[INFO] Saving stream to {output_path} for {video_input.duration} seconds...")
    try:
        subprocess.run(ffmpeg_cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] Gagal menyimpan stream: {e}")
        return
    print(f"[DONE] Video saved: {output_path}")

    # Persist the metadata next to the recording.
    metadata = {
        "id": video_id,
        "name": video_input.name,
        "description": video_input.description,
        "url": video_input.url,
        "duration": video_input.duration,
        "created_at": datetime.now().isoformat(),
        "video_path": output_path,
    }

    metadata_path = os.path.join(output_folder, "metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)

    print(f"[INFO] Metadata saved to {metadata_path}")


In [None]:
import os
import uuid
import cv2
import json
from ultralytics import YOLO
from datetime import datetime, timedelta
import numpy as np

class DetectionInput(BaseModel):
    """Parameters describing one detection/tracking job, as consumed by detect_video."""
    # Human-readable job name and description.
    name: str
    description: str
    # Identifier of the video to analyse; copied into every result record.
    video_id: str
    # Identifiers of the regions to count objects in.
    region_ids: list[str]
    # Weights file passed to the YOLO constructor.
    model_name: str = "yolo11n.pt"
    # Tracker configuration passed to the tracking call.
    tracker: str = "botsort.yaml"
    # Class indices to keep.
    classes: list[int] = [0]
    # Maximum number of frames to process; values <= 0 mean "no limit".
    max_frames: int = -1
    # Whether to write the results to a JSON file. The original `save: True`
    # used the value True as the type annotation and declared no default.
    save: bool = True

def detect_video(
    detection_input: DetectionInput
):
    """
    Track objects in a video frame by frame and count them per region.

    Every frame is passed to the YOLO tracker. For each region, one record is
    produced per frame listing the kept detections, whether the centre of
    each bounding box lies inside (or on the edge of) the region polygon, and
    how many do.

    Args:
        detection_input (DetectionInput): Job parameters. Fields read:
            video_id, model_name, tracker, classes, max_frames, save.

    Returns:
        list[dict]: One record per (frame, region) with the keys
        ``video_id``, ``video_filename``, ``frame_number``, ``timestamp``,
        ``region_id``, ``region_name``, ``region_description``,
        ``region_polygon``, ``objects`` and ``count``. When ``save`` is true
        the same list is also written as
        ``detection_results_<job_id>_<job_timestamp>.json`` next to the video.

    Raises:
        AssertionError: If the video file cannot be opened.
    """
    # Local import: the surrounding cell only imports `datetime` and `timedelta`.
    from datetime import timezone

    job_id = str(uuid.uuid4())

    # Job parameters come from the request object.
    model_name = detection_input.model_name
    tracker = detection_input.tracker
    classes = detection_input.classes
    max_frames = detection_input.max_frames
    save = detection_input.save
    video_id = detection_input.video_id

    # NOTE(review): `video_path`, `video_start_timestamp` and
    # `region_definitions` are not fields of DetectionInput and are not
    # defined in this cell. They are still resolved as module-level names here
    # and have to be derived from `video_id` / `region_ids` (or supplied as
    # globals) before this function can run - TODO confirm the intended source.
    cap = cv2.VideoCapture(video_path)
    assert cap.isOpened(), "Error reading video file"

    results_json = []
    try:
        model = YOLO(model_name)
        class_names = model.names
        # Names of the requested classes, resolved once instead of per box.
        allowed_names = {class_names[c] for c in classes}

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps == 0:
            fps = 30  # fallback default

        video_filename = os.path.basename(video_path)

        # Parse the recording start time. Aware values are normalised to naive
        # UTC so that appending "Z" below never yields "...+00:00Z".
        video_start_dt = datetime.fromisoformat(video_start_timestamp.replace("Z", "+00:00"))
        if video_start_dt.tzinfo is not None:
            video_start_dt = video_start_dt.astimezone(timezone.utc).replace(tzinfo=None)

        # Timestamp of this detection job (UTC), used in the output file name.
        job_timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

        # Polygons converted once to the array form pointPolygonTest expects.
        regions = [
            (region, np.array(region['polygon'], dtype=np.int32))
            for region in region_definitions
        ]

        frame_number = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret or (max_frames > 0 and frame_number >= max_frames):
                break

            # Timestamp of the current frame, derived from its index and the fps.
            current_timestamp = video_start_dt + timedelta(seconds=frame_number / fps)
            timestamp_iso = current_timestamp.isoformat() + "Z"

            result = model.track(
                frame,
                persist=True,
                conf=0.25,
                verbose=False,
                tracker=tracker,
                classes=classes
            )[0]

            # Parse the boxes once per frame; only the inside test is per region.
            boxes = result.boxes
            detections = []
            for i in range(len(boxes)):
                cls_name = class_names[int(boxes.cls[i].item())]
                if cls_name not in allowed_names:
                    continue

                bbox = boxes.xyxy[i].tolist()
                center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
                detections.append((
                    {
                        "id": int(boxes.id[i].item()) if boxes.id is not None else None,
                        "bbox": [int(v) for v in bbox],
                        "class": cls_name,
                        "confidence": float(boxes.conf[i].item()),
                    },
                    center,
                ))

            for region, polygon in regions:
                frame_objects = []
                count = 0

                for detection, center in detections:
                    # >= 0 means the centre is inside the polygon or on its edge.
                    inside = cv2.pointPolygonTest(polygon, center, False) >= 0
                    if inside:
                        count += 1

                    # Copy the bbox so records of different regions share no lists.
                    frame_objects.append({
                        **detection,
                        "bbox": list(detection["bbox"]),
                        "inside_region": inside,
                    })

                results_json.append({
                    "video_id": video_id,
                    "video_filename": video_filename,
                    "frame_number": frame_number,
                    "timestamp": timestamp_iso,
                    "region_id": region['id'],
                    "region_name": region['name'],
                    "region_description": region['description'],
                    "region_polygon": region['polygon'],
                    "objects": frame_objects,
                    "count": count
                })

            frame_number += 1
    finally:
        # Release the capture even if model loading or tracking fails.
        cap.release()

    if save:
        output_filename = f"detection_results_{job_id}_{job_timestamp}.json"
        output_path = os.path.join(os.path.dirname(video_path), output_filename)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(results_json, f, indent=2)

        print(output_path)

    return results_json
