In [None]:
import gradio as gr
import cv2
import torch
import numpy as np
from PIL import Image
import os
import time
import json
from typing import List, Dict, Tuple, Optional, Set
from pathlib import Path
import threading
import random
import warnings
import tempfile
import math

# 모든 FutureWarning를 무시합니다.
warnings.filterwarnings("ignore", category=FutureWarning)

# Grounding DINO imports
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection

# Hugging Face 토큰을 여기에 설정하세요
HUGGINGFACE_TOKEN = " "

# Deep SORT imports
try:
    from deep_sort_realtime.deepsort_tracker import DeepSort
    DEEPSORT_AVAILABLE = True
    print("Deep SORT import 성공")
except ImportError as e:
    print(f"Deep SORT import 오류: {e}")
    print("Deep SORT를 사용하려면 deep-sort-realtime을 설치하세요: pip install deep-sort-realtime")
    DEEPSORT_AVAILABLE = False
except Exception as e:
    print(f"Deep SORT 초기화 오류: {e}")
    DEEPSORT_AVAILABLE = False

# --- 시드 고정 함수 ---
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    os.environ['PYTHONHASHSEED'] = str(seed)

SEED_VALUE = 42
set_seed(SEED_VALUE)
print(f"Random seed set to {SEED_VALUE}")


class GroundingDINO:
    """Grounding DINO를 사용한 제로샷 객체 탐지 클래스"""
    
    def __init__(self, model_id: str = "IDEA-Research/grounding-dino-base", device: str = None):
        print("GroundingDINO 초기화 시작")
        self.model_id = model_id
        
        print(f"PyTorch 버전: {torch.__version__}")
        print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
        
        if device:
            self.device = device
        elif torch.cuda.is_available():
            self.device = "cuda"
        else:
            self.device = "cpu"
        
        print(f"선택된 디바이스: {self.device}")
        
        print(f"Grounding DINO 모델 로딩 중... (디바이스: {self.device})")
        self.processor = AutoProcessor.from_pretrained(model_id, token=HUGGINGFACE_TOKEN)
        self.model = AutoModelForZeroShotObjectDetection.from_pretrained(model_id, token=HUGGINGFACE_TOKEN)
        
        if self.device == "cuda":
            try:
                self.model = self.model.cuda()
                print("모델이 CUDA로 성공적으로 이동되었습니다.")
            except Exception as e:
                print(f"CUDA 이동 실패: {e}")
                self.device = "cpu"
                print("CPU로 대체합니다.")
        else:
            self.model = self.model.to(self.device)
        
        print(f"모델 로딩 완료: {model_id} (디바이스: {self.device})")
    
    def detect_objects(self, 
                      image: np.ndarray, 
                      text_queries: List[str],
                      box_threshold: float = 0.4,
                      text_threshold: float = 0.3) -> Dict:
        """이미지에서 텍스트 쿼리에 해당하는 객체들을 탐지"""
        print("detect_objects 함수 호출됨")
        print(f"입력 텍스트 쿼리: {text_queries}, 박스 임계값: {box_threshold}, 텍스트 임계값: {text_threshold}")
        
        if len(image.shape) == 3 and image.shape[2] == 3:
            pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        else:
            pil_image = Image.fromarray(image)
        
        processed_queries = []
        for query in text_queries:
            query = query.lower().strip()
            if not query.endswith('.'):
                query += '.'
            processed_queries.append(query)
        
        text_prompt = " . ".join(processed_queries)
        print(f"최종 텍스트 프롬프트: '{text_prompt}'")
        
        inputs = self.processor(
            text=text_prompt,
            images=pil_image,
            return_tensors="pt"
        )
        
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        
        with torch.no_grad():
            outputs = self.model(**inputs)
        
        # --- 수정된 부분: box_threshold와 text_threshold 제거 ---
        result = self.processor.post_process_grounded_object_detection(
            outputs,
            target_sizes=[pil_image.size[::-1]]
        )[0]
        # ----------------------------------------------------
        
        final_results = self._process_results(result, processed_queries, image.shape[:2])
        print(f"Grounding DINO 탐지 결과 (개수: {len(final_results['boxes'])})")
        return final_results
    
    def _process_results(self, result, queries: List[str], image_size: Tuple[int, int]) -> Dict:
        """탐지 결과를 후처리하여 중복 제거 및 정리"""
        print("_process_results 호출됨")
        boxes = result["boxes"].cpu().numpy()
        scores = result["scores"].cpu().numpy()
        
        if "text_labels" in result:
            text_labels = result["text_labels"]
            labels = [lbl.rstrip('.') for lbl in text_labels]
            ids = [queries.index(lbl + '.') if lbl + '.' in queries else -1 for lbl in labels]
        else:
            raw = result["labels"]
            ids = raw.cpu().numpy().astype(int).tolist() if not isinstance(raw, list) else list(map(int, raw))
            labels = [queries[i].rstrip('.') if 0 <= i < len(queries) else '' for i in ids]
        
        final = {"boxes": [], "scores": [], "labels": [], "label_ids": []}
        
        def iou(a, b):
            x1, y1, x2, y2 = a
            x1b, y1b, x2b, y2b = b
            xi1, yi1 = max(x1, x1b), max(y1, y1b)
            xi2, yi2 = min(x2, x2b), min(y2, y2b)
            if xi2 <= xi1 or yi2 <= yi1:
                return 0
            inter = (xi2 - xi1) * (yi2 - yi1)
            union = (x2 - x1) * (y2 - y1) + (x2b - x1b) * (y2b - y1b) - inter
            return inter / union
        
        for box, score, label, lid in zip(boxes, scores, labels, ids):
            keep = True
            for j, box2 in enumerate(final["boxes"]):
                if iou(box, box2) > 0.7:
                    keep = False
                    if score > final["scores"][j]:
                        final["boxes"][j] = box.tolist()
                        final["scores"][j] = float(score)
                        final["labels"][j] = label
                        final["label_ids"][j] = lid
                    break
            if keep:
                final["boxes"].append(box.tolist())
                final["scores"].append(float(score))
                final["labels"].append(label)
                final["label_ids"].append(lid)
        
        print(f"후처리 결과 (중복 제거 후 개수: {len(final['boxes'])})")
        return final


class DeepSortWrapper:
    """Deep SORT 래퍼 클래스"""
    
    def __init__(self, max_age: int = 30, n_init: int = 3, nms_max_overlap: float = 1.0):
        print("DeepSortWrapper 초기화 시작")
        if not DEEPSORT_AVAILABLE:
            raise ImportError("Deep SORT를 사용할 수 없습니다. deep-sort-realtime을 설치하세요.")
        
        self.tracker = DeepSort(
            max_age=max_age,
            n_init=n_init,
            nms_max_overlap=nms_max_overlap
        )
        print("DeepSortWrapper 초기화 완료")
    
    def update(self, detections: Dict, frame: np.ndarray) -> List[Dict]:
        """Deep SORT 업데이트"""
        print("DeepSort update 호출됨")
        if not detections['boxes']:
            print("탐지된 객체가 없어 Deep SORT 업데이트를 건너뜁니다.")
            return []
        
        try:
            raw_detections = []
            for box, score, label in zip(detections['boxes'], detections['scores'], detections['labels']):
                x1, y1, x2, y2 = box
                width = x2 - x1
                height = y2 - y1
                raw_detections.append(([x1, y1, width, height], score, label))
            
            tracks = self.tracker.update_tracks(raw_detections, frame=frame)
            
            results = []
            for track in tracks:
                if not track.is_confirmed():
                    continue
                
                try:
                    bbox = track.to_tlbr()
                    track_id = track.track_id
                    score = track.get_det_conf() if track.get_det_conf() is not None else 0.8
                    label = track.get_det_class() if track.get_det_class() is not None else 'unknown'
                    
                    if all(isinstance(x, (int, float)) for x in bbox) and len(bbox) == 4:
                        results.append({
                            'track_id': track_id,
                            'bbox': [float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])],
                            'score': float(score),
                            'label': label
                        })
                except Exception as track_error:
                    print(f"개별 트랙 처리 오류: {track_error}")
                    continue
            
            print(f"Deep SORT 업데이트 완료 (추적 중인 객체 수: {len(results)})")
            return results
        except Exception as e:
            print(f"Deep SORT update 오류: {e}")
            return []


class TrackingApp:
    def __init__(self):
        print("TrackingApp 초기화 시작")
        self.dino = GroundingDINO()
        self.is_running = False
        self.stop_event = threading.Event()
        self.video_path = ''
        self.text_queries = ''
        self.box_threshold = 0.4
        self.text_threshold = 0.3
        self.fps = 25
        self.max_age = 30
        self.deepsort = None
        self.json_results = []
        self.cap = None
        print("TrackingApp 초기화 완료")

    def process_and_generate(self, video_input, text_queries, box_threshold, text_threshold, fps, max_age):
        """추적 프로세스를 시작하고 프레임을 생성하는 함수"""
        print("process_and_generate 함수 호출됨")
        if self.is_running:
            return "이미 실행 중입니다. 중지 후 다시 시작해주세요.", "", None, None
        
        if not video_input:
            return "비디오 파일을 업로드해주세요.", "", None, None
            
        self.video_path = video_input.name
        self.text_queries = text_queries
        self.box_threshold = box_threshold
        self.text_threshold = text_threshold
        self.fps = int(fps)
        self.max_age = int(max_age)

        self.deepsort = DeepSortWrapper(max_age=self.max_age)
        
        print(f"입력 비디오 경로: {self.video_path}")
        print(f"추적 쿼리: {self.text_queries}")
        print(f"설정된 FPS: {self.fps}")
        print(f"설정된 max_age: {self.max_age}")

        self.is_running = True
        self.stop_event.clear()
        self.json_results = []
        
        self.cap = cv2.VideoCapture(self.video_path)
        if not self.cap.isOpened():
            print("비디오 소스를 열 수 없습니다.")
            self.is_running = False
            return "비디오를 열 수 없습니다.", "", np.zeros((480, 640, 3), dtype=np.uint8), None

        original_fps = self.cap.get(cv2.CAP_PROP_FPS)
        
        if self.fps >= original_fps:
            frame_skip_rate = 1
        else:
            frame_skip_rate = max(1, int(round(original_fps / self.fps)))
        
        print(f"원본 비디오 FPS: {original_fps:.2f}")
        print(f"추적 FPS({self.fps})를 위해 {frame_skip_rate} 프레임마다 한 번씩 처리합니다. (실제 FPS: {original_fps / frame_skip_rate:.2f})")

        frame_idx = 0
        
        # 새로운 인덱스 카운터 변수 추가
        processed_frame_idx = 0
        
        while not self.stop_event.is_set():
            ret, frame = self.cap.read()
            if not ret:
                print("비디오 파일의 끝에 도달했습니다. 추적을 중지합니다.")
                self.is_running = False
                break
            
            if self.stop_event.is_set():
                break

            if frame_idx % frame_skip_rate == 0:
                print(f"🎥 프레임 {frame_idx} 처리 중...")
                
                detections = self.dino.detect_objects(
                    image=frame,
                    text_queries=self.text_queries.split(','),
                    box_threshold=self.box_threshold,
                    text_threshold=self.text_threshold
                )
                tracked_objects = self.deepsort.update(detections, frame)

                self.json_results.append({
                    # 원본 프레임 인덱스 대신 새 인덱스 저장
                    "frame": processed_frame_idx,
                    "timestamp": self.cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0,
                    "objects": tracked_objects
                })
                
                processed_frame = self._draw_results(frame.copy(), tracked_objects)
                
                yield "추적 중...", "", processed_frame, None

                # 처리된 프레임 인덱스 증가
                processed_frame_idx += 1
            
            frame_idx += 1
            
        self.cap.release()
        print("백그라운드 추적 루프가 종료되었습니다.")
        
        json_output = json.dumps(self.json_results, indent=2)
        
        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json", encoding="utf-8") as tmp_file:
            tmp_file.write(json_output)
            json_file_path = tmp_file.name
        
        self.json_results = []
        
        yield "추적 완료.", json_output, processed_frame, json_file_path


    def stop_process(self):
        """추적 프로세스를 중지하고 JSON 결과를 반환하는 함수"""
        print("stop_process 함수 호출됨")
        if self.is_running:
            self.stop_event.set()
            print("중단 신호 발생. 현재 프레임 처리 후 중지됩니다.")
            return "추적 중단 신호가 전송되었습니다. 잠시만 기다려주세요.", "", None
        
        print("이미 중지되었습니다.")
        return "이미 중지되었습니다.", "", None

    def _draw_results(self, image, tracked_objects):
        """이미지에 탐지 및 추적 결과 그리기"""
        for obj in tracked_objects:
            try:
                x1, y1, x2, y2 = map(int, obj['bbox'])
                track_id = obj['track_id']
                label = obj['label']
                score = obj['score']

                color = self._get_color_for_id(track_id)
                cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
                
                text = f"ID:{track_id} {label} ({score:.2f})"
                cv2.putText(
                    image, text, (x1, y1 - 10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2
                )
            except Exception as e:
                print(f"시각화 오류: {e}, 객체 데이터: {obj}")
                continue
        return image

    def _get_color_for_id(self, track_id):
        """트랙 ID에 따라 고유한 색상 생성"""
        np.random.seed(int(track_id) * 12345)
        color = np.random.randint(0, 255, size=3, dtype="uint8").tolist()
        return tuple(map(int, color))


# 전역 인스턴스 생성
app = TrackingApp()

# Gradio UI 정의
with gr.Blocks(title="제로샷 객체 추적", enable_queue=True) as demo:
    gr.Markdown("## Grounding DINO + Deep SORT를 이용한 제로샷 객체 추적")
    gr.Markdown("동영상 파일을 업로드하고 텍스트 기반 객체 추적을 수행합니다.")
    
    with gr.Row():
        with gr.Column(scale=2):
            video_input = gr.File(
                label="동영상 파일 선택", 
                file_types=[".mp4", ".mov", ".avi"]
            )
            text_queries = gr.Textbox(
                label="탐지할 객체 쿼리 (쉼표로 구분)", 
                value="person, dog, car",
                placeholder="예: person, car, phone"
            )
            with gr.Row():
                box_threshold = gr.Slider(
                    0, 1, value=0.4, step=0.05, 
                    label="박스 임계값 (Box Threshold)"
                )
                text_threshold = gr.Slider(
                    0, 1, value=0.3, step=0.05, 
                    label="텍스트 임계값 (Text Threshold)"
                )
            with gr.Row():
                fps_slider = gr.Slider(
                    1, 60, value=30, step=1,
                    label="추적 FPS 설정 (Frame Per Second)"
                )
                max_age_slider = gr.Slider(
                    1, 100, value=30, step=1,
                    label="최대 추적 유지 프레임 수 (Max Age)"
                )
            
            with gr.Row():
                start_btn = gr.Button("추적 시작", variant="primary")
                stop_btn = gr.Button("추적 중지", variant="secondary")
            
            status_output = gr.Textbox(label="상태 메시지", interactive=False)
            
        with gr.Column(scale=3):
            output_video = gr.Image(
                label="추적 결과", 
                type="pil", 
                height=480
            )
            json_output = gr.Textbox(
                label="JSON 추적 결과",
                lines=10,
                interactive=False
            )
            json_file_output = gr.File(
                label="JSON 결과 다운로드",
                file_count="single",
                interactive=False
            )
    
    start_btn.click(
        app.process_and_generate,
        inputs=[video_input, text_queries, box_threshold, text_threshold, fps_slider, max_age_slider],
        outputs=[status_output, json_output, output_video, json_file_output]
    )
    
    stop_btn.click(
        app.stop_process,
        inputs=None,
        outputs=[status_output, json_output, json_file_output]
    )


if __name__ == "__main__":
    demo.queue().launch(server_name="129.254.81.86")

  from .autonotebook import tqdm as notebook_tqdm


Deep SORT import 성공
Random seed set to 42
TrackingApp 초기화 시작
GroundingDINO 초기화 시작
PyTorch 버전: 2.6.0+cu118
CUDA 사용 가능: True
선택된 디바이스: cuda
Grounding DINO 모델 로딩 중... (디바이스: cuda)


  with gr.Blocks(title="제로샷 객체 추적", enable_queue=True) as demo:


모델이 CUDA로 성공적으로 이동되었습니다.
모델 로딩 완료: IDEA-Research/grounding-dino-base (디바이스: cuda)
TrackingApp 초기화 완료
Running on local URL:  http://129.254.81.86:7860

To create a public link, set `share=True` in `launch()`.


IMPORTANT: You are using gradio version 3.50.0, however version 4.44.1 is available, please upgrade.
--------
process_and_generate 함수 호출됨
DeepSortWrapper 초기화 시작


  import pkg_resources


DeepSortWrapper 초기화 완료
입력 비디오 경로: /tmp/gradio/2deeef22d643d6ecd2b5b7f2cef60866335ccea8/건물진입.mp4
추적 쿼리: soldier, building, car, roadblock, tree
설정된 FPS: 30
설정된 max_age: 30
원본 비디오 FPS: 30.00
추적 FPS(30)를 위해 1 프레임마다 한 번씩 처리합니다. (실제 FPS: 30.00)
🎥 프레임 0 처리 중...
detect_objects 함수 호출됨
입력 텍스트 쿼리: ['soldier', ' building', ' car', ' roadblock', ' tree'], 박스 임계값: 0.4, 텍스트 임계값: 0.3
최종 텍스트 프롬프트: 'soldier. . building. . car. . roadblock. . tree.'
_process_results 호출됨
후처리 결과 (중복 제거 후 개수: 12)
Grounding DINO 탐지 결과 (개수: 12)
DeepSort update 호출됨
Deep SORT 업데이트 완료 (추적 중인 객체 수: 0)
🎥 프레임 1 처리 중...
detect_objects 함수 호출됨
입력 텍스트 쿼리: ['soldier', ' building', ' car', ' roadblock', ' tree'], 박스 임계값: 0.4, 텍스트 임계값: 0.3
최종 텍스트 프롬프트: 'soldier. . building. . car. . roadblock. . tree.'
_process_results 호출됨
후처리 결과 (중복 제거 후 개수: 12)
Grounding DINO 탐지 결과 (개수: 12)
DeepSort update 호출됨
Deep SORT 업데이트 완료 (추적 중인 객체 수: 0)
🎥 프레임 2 처리 중...
detect_objects 함수 호출됨
입력 텍스트 쿼리: ['soldier', ' building', ' car', ' roadblock', ' tree'], 박스 임