In [None]:

# Notebook setup cell: each line is an IPython shell escape that installs a dependency.
# Pinned release-candidate builds of inference (with the yolo-world extra) and supervision.
!pip install -q inference-gpu[yolo-world]==0.9.12rc1
!pip install -q supervision==0.19.0rc3
# Web framework and ASGI server used to expose the detection endpoint.
!pip install fastapi uvicorn
# NOTE(review): no form/multipart parsing is visible in this notebook — confirm this is still needed.
!pip install python-multipart
# ngrok client used to open a public tunnel to the local server.
!pip install pyngrok
# DeepL client used to translate the requested object names to English.
!pip install deepl

In [None]:
import cv2
import supervision as sv
from tqdm import tqdm
from inference.models.yolo_world.yolo_world import YOLOWorld
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
import base64
import io
import matplotlib.pyplot as plt
from pyngrok import ngrok
import nest_asyncio
import os
import deepl
import pickle
import numpy as np
from scipy.ndimage import gaussian_filter1d
from scipy.interpolate import make_interp_spline

# FastAPI application that hosts the detection endpoint
app = FastAPI()

# Annotators used to draw bounding boxes and their labels on processed frames
BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator(thickness=2)
LABEL_ANNOTATOR = sv.LabelAnnotator(
    text_thickness=2,
    text_scale=1,
    text_color=sv.Color.BLACK,
)

# Load the YOLO-World model once at import time; it is shared by all requests
model = YOLOWorld(model_id="yolo_world/l")

@app.middleware("http")
async def add_ngrok_headers(request: Request, call_next):
    """Attach the ``ngrok-skip-browser-warning`` header to every response.

    The request is passed through to the next handler untouched; only the
    outgoing response is modified.
    """
    # NOTE(review): ngrok appears to look for this header on the incoming
    # *request*, so setting it on the response may have no effect — confirm
    # against the ngrok documentation and set it in the client if needed.
    outgoing = await call_next(request)
    outgoing.headers["ngrok-skip-browser-warning"] = "any_value"
    return outgoing

def _split_class_names(text):
    """Return the unique, non-empty, stripped names of a comma-separated string."""
    names = []
    for part in text.split(','):
        name = part.strip()
        # Skip blanks ("a,,b") and repeats: a repeated name would be appended
        # to the same per-class series twice for every sampled frame.
        if name and name not in names:
            names.append(name)
    return names


def _count_by_class(detections, classes):
    """Count the detections of a single frame for each requested class name."""
    counts = {cls: 0 for cls in classes}
    for det in detections:
        # Element 5 of each yielded item is the per-detection data dict.
        class_name = det[5].get('class_name')
        if class_name in counts:
            counts[class_name] += 1
    return counts


def _render_count_graph(bounding_box_counts, classes, graph_path):
    """Plot the smoothed per-class count series and save the figure as a PNG."""
    fig, ax = plt.subplots(figsize=(12, 8))
    try:
        smoothed_minima = []
        for cls in classes:
            # Cast to float first: gaussian_filter1d keeps the input dtype, so
            # integer counts would come back quantised to integers.
            counts = np.asarray(bounding_box_counts[cls], dtype=float)
            if counts.size == 0:
                continue
            counts_smoothed = gaussian_filter1d(counts, sigma=2)
            smoothed_minima.append(counts_smoothed.min())
            samples = np.arange(counts.size)
            if counts.size >= 4:
                # A cubic spline needs at least k + 1 = 4 data points.
                spline = make_interp_spline(samples, counts_smoothed, k=3)
                xs = np.linspace(samples.min(), samples.max(), 500)
                ys = spline(xs)
            else:
                xs, ys = samples, counts_smoothed
            ax.plot(xs, ys, label=cls, linewidth=3)

        if smoothed_minima:
            # Dashed reference line half a detection above the lowest smoothed count.
            threshold = min(smoothed_minima) + 0.5
            ax.axhline(y=threshold, color='black', linestyle='--')
            ax.legend()
        ax.grid(False)

        fig.savefig(graph_path)
    finally:
        # Figures created through pyplot stay registered until closed; without
        # this every request would leave one more figure in memory.
        plt.close(fig)


@app.post("/detect_objects")
async def detect_objects(request: Request):
    """Detect the requested objects in a video and return annotated results.

    The JSON request body must provide:

    * ``video_url``: a location ``cv2.VideoCapture`` can open.
    * ``object_name``: comma-separated object names; they are translated to
      English with DeepL and used as the YOLO-World classes.

    About five frames per second are sampled, run through the model, annotated
    and written to ``output_asset/<video>_output_video.mp4``. Per-sample class
    counts are pickled to ``<video>_detections.pkl`` in the working directory
    and plotted to ``output_asset/<video>_result.png``.

    The DeepL key is read from the ``DEEPL_AUTH_KEY`` environment variable when
    set, otherwise the placeholder below is used.

    Returns:
        dict: ``total_bounding_boxes`` (int), ``graph_data`` and ``video_data``
        (Base64-encoded PNG / MP4) and ``graph_path`` (path of the saved PNG).

    Raises:
        HTTPException: 400 when the body lacks the required string fields or
            no class name remains after translation; 500 when the video cannot
            be opened or no frame could be sampled from it.
    """
    # NOTE(review): everything below is blocking work inside an async handler,
    # and model.set_classes() mutates the shared module-level model, so
    # requests are effectively processed one at a time.
    data = await request.json()
    try:
        video_url = data["video_url"]
        object_name = data["object_name"]
    except (KeyError, TypeError) as err:
        raise HTTPException(
            status_code=400,
            detail="JSON body must contain 'video_url' and 'object_name'",
        ) from err
    if not isinstance(video_url, str) or not isinstance(object_name, str):
        raise HTTPException(
            status_code=400,
            detail="'video_url' and 'object_name' must be strings",
        )
    print(f"Object Name: {object_name}")
    print(f"Video URL: {video_url}")

    # Derive the output base name from the last path component. For real URLs
    # drop the query string / fragment so signed-URL tokens do not end up in
    # (or replace) the file name.
    name_source = video_url
    if "://" in video_url:
        name_source = video_url.split('?', 1)[0].split('#', 1)[0]
    video_name = os.path.basename(name_source)
    video_name_without_ext = os.path.splitext(video_name)[0]

    auth_key = os.environ.get("DEEPL_AUTH_KEY", "AUTHORIZATION TOKEN")
    translator = deepl.Translator(auth_key)

    result = translator.translate_text(object_name, target_lang="EN-US")
    classes = _split_class_names(result.text)
    if not classes:
        raise HTTPException(status_code=400, detail="No object names were provided")
    print(classes)
    model.set_classes(classes)  # set the class names the model should detect

    # NOTE(review): video_url comes straight from the client and is handed to
    # OpenCV as-is; restrict it (scheme / host allow-list) if this endpoint is
    # reachable by untrusted callers.
    cap = cv2.VideoCapture(video_url)
    if not cap.isOpened():
        cap.release()
        raise HTTPException(status_code=500, detail="Failed to open video URL")

    # Per-class list of bounding-box counts, one entry per sampled frame
    bounding_box_counts = {cls: [] for cls in classes}

    # Output video location
    output_dir = os.path.join(os.getcwd(), 'output_asset')
    os.makedirs(output_dir, exist_ok=True)
    target_video_path = os.path.join(output_dir, f'{video_name_without_ext}_output_video.mp4')

    out = None
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_area = frame_width * frame_height
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        process_fps = 5

        # The interval must be at least 1: sources slower than process_fps (or
        # reporting no FPS at all) would otherwise yield 0 and make the modulo
        # below raise ZeroDivisionError.
        if fps > 0:
            frame_interval = max(1, int(fps / process_fps))
            # Only every frame_interval-th frame is written, so the writer must
            # run at the sampled rate for the output to play at real-time speed.
            output_fps = fps / frame_interval
        else:
            frame_interval = 1
            output_fps = float(process_fps)

        out = cv2.VideoWriter(
            target_video_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            output_fps,
            (frame_width, frame_height),
        )

        for frame_idx in tqdm(range(frame_count)):
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % frame_interval != 0:
                continue

            results = model.infer(frame, confidence=0.08)
            detections = sv.Detections.from_inference(results).with_nms(threshold=0.1)
            # Discard boxes that cover half of the frame or more.
            detections = detections[(detections.area / frame_area) < 0.5]

            # Record the number of bounding boxes per class for this sample
            class_counts = _count_by_class(detections, classes)
            for cls in classes:
                bounding_box_counts[cls].append(class_counts[cls])

            annotated_frame = frame.copy()
            annotated_frame = BOUNDING_BOX_ANNOTATOR.annotate(annotated_frame, detections)
            annotated_frame = LABEL_ANNOTATOR.annotate(annotated_frame, detections)
            out.write(annotated_frame)
    finally:
        # Release the capture and writer even if inference fails mid-video.
        cap.release()
        if out is not None:
            out.release()

    sample_count = len(bounding_box_counts[classes[0]])
    if sample_count == 0:
        raise HTTPException(status_code=500, detail="No frames could be read from the video")

    total_bounding_boxes = sum(sum(counts) for counts in bounding_box_counts.values())

    # Persist the per-sample counts as (frame index, {class: count}) pairs
    frame_detections = [
        (i * frame_interval, {cls: bounding_box_counts[cls][i] for cls in classes})
        for i in range(sample_count)
    ]
    with open(f'{video_name_without_ext}_detections.pkl', 'wb') as f:
        pickle.dump(frame_detections, f)

    # Encode the annotated video as Base64
    with open(target_video_path, "rb") as video_file:
        video_base64 = base64.b64encode(video_file.read()).decode('utf-8')

    # Render the count graph and save it as a PNG
    graph_path = os.path.join(output_dir, f'{video_name_without_ext}_result.png')
    _render_count_graph(bounding_box_counts, classes, graph_path)

    # Encode the graph as Base64
    with open(graph_path, "rb") as image_file:
        graph_base64 = base64.b64encode(image_file.read()).decode('utf-8')

    return {
        "total_bounding_boxes": total_bounding_boxes,
        "graph_data": graph_base64,
        "video_data": video_base64,
        "graph_path": graph_path
    }

# Configure ngrok and run the FastAPI server
# Placeholder token: replace with a real ngrok auth token before running.
ngrok.set_auth_token("AUTHORIZATION TOKEN")
# NOTE(review): presumably required because the notebook kernel already runs
# an event loop that uvicorn has to share — confirm.
nest_asyncio.apply()
# Open the tunnel to local port 8000 (the port uvicorn binds below) before the
# blocking server call, so the public URL is printed first.
ngrok_tunnel = ngrok.connect(8000)
print("Public URL:", ngrok_tunnel.public_url)

import uvicorn

# Blocks until the server is stopped.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)