## Grounding DINO

In [3]:
# 환경 설정 및 라이브러리 import 
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '2'  # GPU 설정

import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from pathlib import Path
import json
from tqdm import tqdm
from typing import List, Dict, Tuple, Optional

# Grounding DINO 관련
try:
    from groundingdino.util.inference import load_model, load_image, predict, annotate
    from groundingdino.util import box_ops
    import groundingdino.datasets.transforms as T
    GROUNDING_DINO_AVAILABLE = True
except ImportError:
    print("Grounding DINO가 설치되지 않았습니다.")
    print("설치: pip install groundingdino-py")
    GROUNDING_DINO_AVAILABLE = False

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

  from .autonotebook import tqdm as notebook_tqdm


PyTorch version: 2.5.1+cu118
CUDA available: True
GPU: NVIDIA RTX A6000


In [4]:
# 프로젝트 루트 탐색
def find_project_root(marker_filename=".project-root"):
    current_dir = os.path.abspath(os.getcwd())
    while True:
        if os.path.isfile(os.path.join(current_dir, marker_filename)):
            return current_dir
        parent_dir = os.path.dirname(current_dir)
        if parent_dir == current_dir:
            raise FileNotFoundError(f"Could not find {marker_filename}")
        current_dir = parent_dir

def ensure_dir(path):
    os.makedirs(path, exist_ok=True)

PROJECT_ROOT = find_project_root()
def get_project_path(*paths):
    return os.path.join(PROJECT_ROOT, *paths)

# 데이터 경로
coco_path = get_project_path("data", "TomatOD_COCO_3")
test_path = os.path.join(coco_path, "test")
test_images_dir = os.path.join(test_path, "images")
test_ann_file = os.path.join(test_path, "custom_test.json")

# 출력 경로
output_dir = get_project_path("notebook", "results", "grounding_dino")
ensure_dir(output_dir)
ensure_dir(os.path.join(output_dir, "predictions"))
ensure_dir(os.path.join(output_dir, "visualizations"))

print(f"Project root: {PROJECT_ROOT}")
print(f"Test images: {test_images_dir}")
print(f"Output dir: {output_dir}")

Project root: /home/hyeonjin/tomato-detection-agentic
Test images: /home/hyeonjin/tomato-detection-agentic/data/TomatOD_COCO_3/test/images
Output dir: /home/hyeonjin/tomato-detection-agentic/notebook/results/grounding_dino


In [8]:
# 모델 설정
PROJECT_ROOT = find_project_root()
CONFIG_PATH = get_project_path("configs", "gdino", "gdino_3class.yaml")
WEIGHTS_PATH = get_project_path("weights", "groundingdino_swint_ogc.pth")
py_config_path = get_project_path("configs", "gdino", "gdino_3class.py")

# YAML을 읽어서 Python 파일로 변환
# 파일을 그대로 읽어서 Python 파일로 저장
with open(CONFIG_PATH, 'r') as f:
    config_content = f.read()

# Python config 파일로 저장
with open(py_config_path, 'w') as f:
    f.write(config_content)

CONFIG_PATH = py_config_path
WEIGHTS_PATH = get_project_path("weights", "groundingdino_swint_ogc.pth")

print(f"Config path: {CONFIG_PATH}")
print(f"Weights path: {WEIGHTS_PATH}")


# 모델 로드
if GROUNDING_DINO_AVAILABLE:
    try:
        model = load_model(CONFIG_PATH, WEIGHTS_PATH, device='cuda')
        model.eval()
        print("✅ Grounding DINO 모델 로드 완료")
    except Exception as e:
        print(f"❌ 모델 로드 실패: {e}")
        print("수동으로 모델을 다운로드하거나 경로를 확인하세요.")
        model = None
else:
    model = None

Config path: /home/hyeonjin/tomato-detection-agentic/configs/gdino/gdino_3class.py
Weights path: /home/hyeonjin/tomato-detection-agentic/weights/groundingdino_swint_ogc.pth
❌ 모델 로드 실패: There are syntax errors in config file /home/hyeonjin/tomato-detection-agentic/configs/gdino/gdino_3class.py
수동으로 모델을 다운로드하거나 경로를 확인하세요.


In [9]:
# 프롬프트 엔지니어링: 클래스별 개별 추론

# 클래스 정의
CLASS_NAMES = ['fully-ripe', 'semi-ripe', 'unripe']
CLASS_COLORS = ['#D0021B', '#F8E71C', '#7ED321']  # 빨강, 노랑, 초록

def predict_with_grounding_dino_per_class(
    model,
    image_path: str,
    class_name: str,
    box_threshold: float = 0.3,
    text_threshold: float = 0.25
) -> Tuple[np.ndarray, np.ndarray, int]:
    """
    단일 클래스에 대해 Grounding DINO로 예측 수행
    
    Returns:
        boxes: (N, 4) numpy array [x1, y1, x2, y2] (정규화된 좌표)
        scores: (N,) numpy array
        label: 클래스 인덱스
    """
    # 단일 클래스 프롬프트
    text_prompt = f"{class_name} tomato."
    
    # 이미지 로드 및 전처리
    image_source, image = load_image(image_path)
    
    # 예측 수행
    boxes, logits, phrases = predict(
        model=model,
        image=image,
        caption=text_prompt,
        box_threshold=box_threshold,
        text_threshold=text_threshold
    )
    
    # 클래스 인덱스 찾기
    class_idx = CLASS_NAMES.index(class_name)
    
    # 모든 박스에 동일한 라벨 할당
    labels = np.full(len(boxes), class_idx)
    
    return boxes, logits, labels

# 배치 추론 실행 (클래스별 개별 추론)
all_results = []

BOX_THRESHOLD = 0.3
TEXT_THRESHOLD = 0.25
MIN_BOX_AREA = 100
MAX_BOX_AREA_RATIO = 0.8

if model is not None:
    for img_id, img_path, file_name in tqdm(test_image_files, desc="추론 중"):
        try:
            # 이미지 정보 가져오기
            img_info = image_id_to_info[img_id]
            img_width = img_info['width']
            img_height = img_info['height']
            img_area = img_width * img_height
            
            # 모든 클래스에 대해 개별 추론
            all_boxes = []
            all_scores = []
            all_labels = []
            
            for class_idx, class_name in enumerate(CLASS_NAMES):
                # 각 클래스별로 개별 추론
                boxes, scores, labels = predict_with_grounding_dino_per_class(
                    model=model,
                    image_path=img_path,
                    class_name=class_name,
                    box_threshold=BOX_THRESHOLD,
                    text_threshold=TEXT_THRESHOLD
                )
                
                # 박스를 COCO 형식으로 변환 및 필터링
                for i in range(len(boxes)):
                    x1, y1, x2, y2 = boxes[i]
                    
                    # 정규화된 좌표를 픽셀 좌표로 변환
                    x1 = int(x1 * img_width)
                    y1 = int(y1 * img_height)
                    x2 = int(x2 * img_width)
                    y2 = int(y2 * img_height)
                    
                    # 박스 크기 계산
                    w = x2 - x1
                    h = y2 - y1
                    box_area = w * h
                    
                    # 필터링
                    if box_area < MIN_BOX_AREA:
                        continue
                    if box_area > img_area * MAX_BOX_AREA_RATIO:
                        continue
                    
                    # 이미지 범위 내인지 확인 및 클리핑
                    if x1 < 0 or y1 < 0 or x2 > img_width or y2 > img_height:
                        x1 = max(0, min(x1, img_width))
                        y1 = max(0, min(y1, img_height))
                        x2 = max(0, min(x2, img_width))
                        y2 = max(0, min(y2, img_height))
                        w = x2 - x1
                        h = y2 - y1
                        if w * h < MIN_BOX_AREA:
                            continue
                    
                    # COCO 형식: [x, y, width, height]
                    coco_box = [x1, y1, w, h]
                    all_boxes.append(coco_box)
                    all_scores.append(float(scores[i]))
                    all_labels.append(int(labels[i]))
            
            # 결과 저장
            result = {
                'image_id': img_id,
                'file_name': file_name,
                'boxes': all_boxes,
                'scores': all_scores,
                'labels': all_labels
            }
            all_results.append(result)
            
        except Exception as e:
            print(f"❌ {file_name} 처리 실패: {e}")
            continue
    
    print(f"✅ 추론 완료: {len(all_results)}개 이미지")
else:
    print("❌ 모델이 로드되지 않았습니다.")

❌ 모델이 로드되지 않았습니다.


In [None]:
# 추론 함수 정의

def predict_with_grounding_dino(
    model,
    image_path: str,
    text_prompt: str,
    box_threshold: float = 0.3,
    text_threshold: float = 0.25
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Grounding DINO로 예측 수행
    
    Returns:
        boxes: (N, 4) numpy array [x1, y1, x2, y2]
        scores: (N,) numpy array
        labels: (N,) numpy array (class indices)
    """
    # 이미지 로드 및 전처리
    image_source, image = load_image(image_path)
    
    # 예측 수행
    boxes, logits, phrases = predict(
        model=model,
        image=image,
        caption=text_prompt,
        box_threshold=box_threshold,
        text_threshold=text_threshold
    )
    
    # phrases를 클래스 인덱스로 변환
    labels = []
    for phrase in phrases:
        # phrase에서 클래스 이름 추출 (예: "fully-ripe tomato" -> "fully-ripe")
        phrase_lower = phrase.lower()
        class_idx = -1
        for idx, class_name in enumerate(CLASS_NAMES):
            if class_name in phrase_lower:
                class_idx = idx
                break
        if class_idx == -1:
            class_idx = 0  # 기본값
        labels.append(class_idx)
    
    labels = np.array(labels)
    
    return boxes, logits, labels

# 테스트 이미지 경로 수집
test_image_files = []
for img_id, file_name in image_id_to_file.items():
    img_path = os.path.join(test_images_dir, file_name)
    if os.path.exists(img_path):
        test_image_files.append((img_id, img_path, file_name))

print(f"테스트 이미지 수: {len(test_image_files)}")

In [None]:
# 배치 추론 실행 (박스 필터링 강화)

# 추론 결과 저장
predictions = []
all_results = []

BOX_THRESHOLD = 0.3
TEXT_THRESHOLD = 0.25
MIN_BOX_AREA = 100  # 최소 박스 면적 (너무 작은 박스 제거)
MAX_BOX_AREA_RATIO = 0.8  # 최대 박스 면적 비율 (이미지의 80% 이상은 제거)

if model is not None:
    for img_id, img_path, file_name in tqdm(test_image_files, desc="추론 중"):
        try:
            # 예측 수행
            boxes, scores, labels = predict_with_grounding_dino(
                model=model,
                image_path=img_path,
                text_prompt=TEXT_PROMPT,
                box_threshold=BOX_THRESHOLD,
                text_threshold=TEXT_THRESHOLD
            )
            
            # 이미지 정보 가져오기
            img_info = image_id_to_info[img_id]
            img_width = img_info['width']
            img_height = img_info['height']
            img_area = img_width * img_height
            
            # 박스를 COCO 형식으로 변환 및 필터링
            coco_boxes = []
            coco_scores = []
            coco_labels = []
            
            for i in range(len(boxes)):
                x1, y1, x2, y2 = boxes[i]
                
                # 정규화된 좌표를 픽셀 좌표로 변환
                x1 = int(x1 * img_width)
                y1 = int(y1 * img_height)
                x2 = int(x2 * img_width)
                y2 = int(y2 * img_height)
                
                # 박스 크기 계산
                w = x2 - x1
                h = y2 - y1
                box_area = w * h
                
                # 필터링: 너무 작거나 너무 큰 박스 제거
                if box_area < MIN_BOX_AREA:
                    continue
                if box_area > img_area * MAX_BOX_AREA_RATIO:
                    continue
                
                # 이미지 범위 내인지 확인
                if x1 < 0 or y1 < 0 or x2 > img_width or y2 > img_height:
                    # 클리핑
                    x1 = max(0, min(x1, img_width))
                    y1 = max(0, min(y1, img_height))
                    x2 = max(0, min(x2, img_width))
                    y2 = max(0, min(y2, img_height))
                    w = x2 - x1
                    h = y2 - y1
                    
                    # 클리핑 후에도 너무 작으면 스킵
                    if w * h < MIN_BOX_AREA:
                        continue
                
                # COCO 형식: [x, y, width, height]
                coco_box = [x1, y1, w, h]
                coco_boxes.append(coco_box)
                coco_scores.append(float(scores[i]))
                coco_labels.append(int(labels[i]))
            
            # 결과 저장
            result = {
                'image_id': img_id,
                'file_name': file_name,
                'boxes': coco_boxes,
                'scores': coco_scores,
                'labels': coco_labels
            }
            all_results.append(result)
            
        except Exception as e:
            print(f"❌ {file_name} 처리 실패: {e}")
            continue
    
    print(f"✅ 추론 완료: {len(all_results)}개 이미지")
else:
    print("❌ 모델이 로드되지 않았습니다.")

In [None]:
# 시각화 (visualization.py 방식으로 변경)

from PIL import ImageDraw, ImageFont

# visualization.py의 COLORS 상수 사용 (RGB 0-255 범위)
CLASS_COLORS_RGB = {
    0: (255, 0, 0),        # fully-ripe (red)
    1: (255, 165, 0),      # semi-ripe (orange) 
    2: (0, 128, 0),        # unripe (green)
}

def visualize_predictions(
    image_path: str,
    predictions: Dict,
    ground_truth: Optional[List[Dict]] = None,
    save_path: Optional[str] = None,
    confidence_threshold: float = 0.3
):
    """예측 결과 시각화 (visualization.py 방식)"""
    # 이미지 로드
    img = Image.open(image_path).convert('RGB')
    img_width, img_height = img.size
    
    # PIL ImageDraw 사용
    draw = ImageDraw.Draw(img)
    
    # 폰트 설정
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16)
    except:
        font = ImageFont.load_default()
    
    # 예측 박스 그리기 (confidence threshold 적용)
    for box, score, label in zip(
        predictions['boxes'],
        predictions['scores'],
        predictions['labels']
    ):
        # Confidence threshold 적용
        if score < confidence_threshold:
            continue
            
        x, y, w, h = box
        
        # 박스 좌표 검증 및 정규화
        # w, h가 음수이거나 0인 경우 스킵
        if w <= 0 or h <= 0:
            continue
        
        # 좌표를 정수로 변환
        x, y, w, h = int(x), int(y), int(w), int(h)
        
        # 이미지 범위 내로 클리핑
        x1 = max(0, min(x, img_width))
        y1 = max(0, min(y, img_height))
        x2 = max(0, min(x + w, img_width))
        y2 = max(0, min(y + h, img_height))
        
        # 클리핑 후 유효한 박스인지 확인
        if x2 <= x1 or y2 <= y1:
            continue
        
        # 박스가 너무 작은 경우 스킵
        box_area = (x2 - x1) * (y2 - y1)
        img_area = img_width * img_height
        if box_area < 100:  # 최소 면적
            continue
        if box_area > img_area * 0.8:  # 너무 큰 박스 제거
            continue
        
        # 색상 가져오기
        color_rgb = CLASS_COLORS_RGB.get(label, (255, 255, 255))
        
        # 박스 그리기 (visualization.py처럼 width=8)
        draw.rectangle([x1, y1, x2, y2], outline=color_rgb, width=8)
        
        # 라벨 텍스트
        class_name = CLASS_NAMES[label]
        text = f"{class_name}: {score:.2f}"
        
        # 텍스트 배경 그리기
        bbox = draw.textbbox((x1, y1 - 20), text, font=font)
        draw.rectangle(bbox, fill=color_rgb)
        draw.text((x1, y1 - 20), text, fill="white", font=font)
    
    # GT 박스 그리기 (있는 경우) - 파란색 점선
    if ground_truth:
        for ann in ground_truth:
            x, y, w, h = ann['bbox']
            
            # GT 박스 좌표 검증 및 정규화
            if w <= 0 or h <= 0:
                continue
            
            x, y, w, h = int(x), int(y), int(w), int(h)
            
            # 이미지 범위 내로 클리핑
            x1 = max(0, min(x, img_width))
            y1 = max(0, min(y, img_height))
            x2 = max(0, min(x + w, img_width))
            y2 = max(0, min(y + h, img_height))
            
            # 유효한 박스인지 확인
            if x2 <= x1 or y2 <= y1:
                continue
            
            # GT는 파란색으로 표시
            draw.rectangle([x1, y1, x2, y2], outline="blue", width=4)
            
            category_id = ann['category_id']
            category_name = category_id_to_name[category_id]
            text = f"GT: {category_name}"
            
            bbox = draw.textbbox((x1, y1 - 20), text, font=font)
            draw.rectangle(bbox, fill="blue")
            draw.text((x1, y1 - 20), text, fill="white", font=font)
    
    # 저장 또는 표시
    if save_path:
        img.save(save_path, quality=95, optimize=False)
    else:
        img.show()

# 몇 개 이미지 시각화
if model is not None and len(all_results) > 0:
    num_visualize = min(5, len(all_results))
    confidence_threshold = 0.3  # 임계값 설정
    
    for i in range(num_visualize):
        result = all_results[i]
        img_id = result['image_id']
        img_path = os.path.join(test_images_dir, result['file_name'])
        
        # GT 어노테이션 가져오기
        gt_anns = annotations_by_image.get(img_id, [])
        
        # 예측 결과 준비
        pred_dict = {
            'boxes': result['boxes'],
            'scores': result['scores'],
            'labels': result['labels']
        }
        
        # 시각화 저장
        save_path = os.path.join(
            output_dir, "visualizations",
            f"pred_{result['file_name']}"
        )
        visualize_predictions(
            img_path, 
            pred_dict, 
            gt_anns, 
            save_path,
            confidence_threshold=confidence_threshold
        )
    
    print(f"✅ {num_visualize}개 이미지 시각화 완료")

In [25]:
# 시각화 (visualization.py 방식으로 변경)

from PIL import ImageDraw, ImageFont

# visualization.py의 COLORS 상수 사용 (RGB 0-255 범위)
CLASS_COLORS_RGB = {
    0: (255, 0, 0),        # fully-ripe (red) - 빨간색
    1: (255, 165, 0),      # semi-ripe (orange) - 오렌지색
    2: (0, 128, 0),        # unripe (green) - 초록색
}

def visualize_predictions(
    image_path: str,
    predictions: Dict,
    save_path: Optional[str] = None,
    confidence_threshold: float = 0.2
):
    """예측 결과 시각화 (모델 예측만 표시)"""
    # 이미지 로드
    img = Image.open(image_path).convert('RGB')
    img_width, img_height = img.size
    
    # PIL ImageDraw 사용
    draw = ImageDraw.Draw(img)
    
    # 폰트 설정
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16)
    except:
        font = ImageFont.load_default()
    
    # 예측 박스 그리기 (confidence threshold만 적용)
    for box, score, label in zip(
        predictions['boxes'],
        predictions['scores'],
        predictions['labels']
    ):
        # Confidence threshold 적용
        if score < confidence_threshold:
            continue
            
        x, y, w, h = box
        
        # 박스 좌표 정규화 (w, h가 음수인 경우 처리)
        if w < 0:
            x = x + w
            w = abs(w)
        if h < 0:
            y = y + h
            h = abs(h)
        
        # 좌표를 정수로 변환
        x, y, w, h = int(x), int(y), int(w), int(h)
        
        # 이미지 범위 내로 클리핑
        x1 = max(0, min(x, img_width))
        y1 = max(0, min(y, img_height))
        x2 = max(0, min(x + w, img_width))
        y2 = max(0, min(y + h, img_height))
        
        # 클리핑 후 유효한 박스인지 확인
        if x2 <= x1 or y2 <= y1:
            continue
        
        # 색상 가져오기 (클래스별 색상)
        # label=0: fully-ripe (빨간색)
        # label=1: semi-ripe (오렌지색)
        # label=2: unripe (초록색)
        color_rgb = CLASS_COLORS_RGB.get(label, (255, 255, 255))
        
        # 박스 그리기 (visualization.py처럼 width=8)
        draw.rectangle([x1, y1, x2, y2], outline=color_rgb, width=8)
        
        # 라벨 텍스트
        class_name = CLASS_NAMES[label]
        text = f"{class_name}: {score:.2f}"
        
        # 텍스트 배경 그리기
        bbox = draw.textbbox((x1, y1 - 20), text, font=font)
        draw.rectangle(bbox, fill=color_rgb)
        draw.text((x1, y1 - 20), text, fill="white", font=font)
    
    # 저장 또는 표시
    if save_path:
        img.save(save_path, quality=95, optimize=False)
    else:
        img.show()

# 몇 개 이미지 시각화
if model is not None and len(all_results) > 0:
    num_visualize = min(5, len(all_results))
    confidence_threshold = 0.2  # 임계값 설정
    
    for i in range(num_visualize):
        result = all_results[i]
        img_id = result['image_id']
        img_path = os.path.join(test_images_dir, result['file_name'])
        
        # 예측 결과 준비 (모델이 예측한 결과만 사용)
        pred_dict = {
            'boxes': result['boxes'],      # 모델 예측 박스
            'scores': result['scores'],     # 모델 예측 점수
            'labels': result['labels']      # 모델 예측 라벨
        }
        
        # 시각화 저장
        save_path = os.path.join(
            output_dir, "visualizations",
            f"pred_{result['file_name']}"
        )
        visualize_predictions(
            img_path, 
            pred_dict, 
            save_path,
            confidence_threshold=confidence_threshold
        )
    
    print(f"✅ {num_visualize}개 이미지 시각화 완료")

✅ 5개 이미지 시각화 완료


In [None]:
# COCO 메트릭 계산

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

# 예측 결과를 COCO 형식으로 변환
coco_predictions = []
for result in all_results:
    img_id = result['image_id']
    for box, score, label in zip(
        result['boxes'],
        result['scores'],
        result['labels']
    ):
        # COCO 카테고리 ID로 변환
        class_name = CLASS_NAMES[label]
        category_id = category_name_to_id[class_name]
        
        coco_pred = {
            'image_id': img_id,
            'category_id': category_id,
            'bbox': box,
            'score': score
        }
        coco_predictions.append(coco_pred)

# COCO 평가 수행
if len(coco_predictions) > 0:
    # 예측 결과를 JSON 파일로 저장
    pred_file = os.path.join(output_dir, "predictions", "predictions.json")
    with open(pred_file, 'w') as f:
        json.dump(coco_predictions, f)
    
    # COCO annotation 파일에 'info' 키가 없으면 추가
    with open(test_ann_file, 'r') as f:
        coco_gt_data = json.load(f)
    
    # 'info' 키가 없으면 추가
    if 'info' not in coco_gt_data:
        coco_gt_data['info'] = {
            'description': 'Tomato Detection Dataset',
            'version': '1.0',
            'year': 2024
        }
        # 임시 파일로 저장
        temp_ann_file = os.path.join(output_dir, "temp_annotations.json")
        with open(temp_ann_file, 'w') as f:
            json.dump(coco_gt_data, f)
        test_ann_file = temp_ann_file
    
    # COCO 객체 생성
    coco_gt = COCO(test_ann_file)
    coco_dt = coco_gt.loadRes(pred_file)
    
    # 평가 실행
    coco_eval = COCOeval(coco_gt, coco_dt, 'bbox')
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    
    # 결과 저장
    metrics = {
        'mAP': float(coco_eval.stats[0]),
        'mAP50': float(coco_eval.stats[1]),
        'mAP75': float(coco_eval.stats[2]),
        'mAP_small': float(coco_eval.stats[3]),
        'mAP_medium': float(coco_eval.stats[4]),
        'mAP_large': float(coco_eval.stats[5]),
        'mAR_1': float(coco_eval.stats[6]),
        'mAR_10': float(coco_eval.stats[7]),
        'mAR_100': float(coco_eval.stats[8]),
        'mAR_small': float(coco_eval.stats[9]),
        'mAR_medium': float(coco_eval.stats[10]),
        'mAR_large': float(coco_eval.stats[11]),
    }
    
    metrics_file = os.path.join(output_dir, "metrics.json")
    with open(metrics_file, 'w') as f:
        json.dump(metrics, f, indent=2)
    
    print("\n✅ 평가 완료")
    print(f"mAP: {metrics['mAP']:.4f}")
    print(f"mAP50: {metrics['mAP50']:.4f}")
    print(f"mAP75: {metrics['mAP75']:.4f}")
else:
    print("❌ 평가할 예측 결과가 없습니다.")

In [None]:
# 결과 요약 출력
if model is not None:
    print("\n" + "="*50)
    print("Grounding DINO 추론 결과 요약")
    print("="*50)
    print(f"처리된 이미지 수: {len(all_results)}")
    print(f"총 예측 박스 수: {sum(len(r['boxes']) for r in all_results)}")
    
    # 클래스별 예측 수
    class_counts = {name: 0 for name in CLASS_NAMES}
    for result in all_results:
        for label in result['labels']:
            class_counts[CLASS_NAMES[label]] += 1
    
    print("\n클래스별 예측 수:")
    for class_name, count in class_counts.items():
        print(f"  {class_name}: {count}")
    
    print(f"\n결과 저장 위치: {output_dir}")

### Finetuning

In [34]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from groundingdino.util.inference import load_model
from tqdm import tqdm
import json
import torchvision.transforms as T
from PIL import Image

In [35]:
# 커스텀 collate_fn: 가변 길이 데이터 처리
def collate_fn(batch):
    """가변 길이 박스와 라벨을 처리하는 collate 함수"""
    images = []
    text_prompts = []
    boxes = []
    labels = []
    image_ids = []
    
    for item in batch:
        images.append(item['image'])
        text_prompts.append(item['text_prompt'])
        boxes.append(torch.tensor(item['boxes'], dtype=torch.float32))
        labels.append(torch.tensor(item['labels'], dtype=torch.long))
        image_ids.append(item['image_id'])
    
    # 이미지들을 배치로 묶기
    images = torch.stack(images)
    
    return {
        'image': images,
        'text_prompt': text_prompts,  # 리스트로 유지 (각각 다른 프롬프트일 수 있음)
        'boxes': boxes,  # 리스트로 유지 (각 이미지마다 박스 개수 다름)
        'labels': labels,  # 리스트로 유지
        'image_id': image_ids
    }
# Grounding DINO의 inference에서 사용하는 transforms 확인
# load_image 함수를 참고하여 동일한 전처리 사용
def create_grounding_dino_transform():
    """Grounding DINO와 동일한 이미지 전처리"""
    return T.Compose([
        T.Resize([800], max_size=1333),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

In [28]:
# 데이터셋 클래스 정의
class GroundingDINODataset(Dataset):
    def __init__(self, ann_file, images_dir, transform=None):
        with open(ann_file, 'r') as f:
            self.coco_data = json.load(f)
        
        self.images_dir = images_dir
        self.transform = transform
        
        # 이미지 ID와 파일명 매핑
        self.image_id_to_file = {img['id']: img['file_name'] for img in self.coco_data['images']}
        self.image_ids = list(self.image_id_to_file.keys())
        
        # 어노테이션을 이미지 ID별로 그룹화
        self.annotations_by_image = {}
        for ann in self.coco_data['annotations']:
            img_id = ann['image_id']
            if img_id not in self.annotations_by_image:
                self.annotations_by_image[img_id] = []
            self.annotations_by_image[img_id].append(ann)
        
        # 카테고리 매핑
        self.category_id_to_name = {cat['id']: cat['name'] for cat in self.coco_data['categories']}
        self.category_name_to_id = {cat['name']: cat['id'] for cat in self.coco_data['categories']}
    
    def __len__(self):
        return len(self.image_ids)
    
    def __getitem__(self, idx):
        img_id = self.image_ids[idx]
        file_name = self.image_id_to_file[img_id]
        img_path = os.path.join(self.images_dir, file_name)
        
        # 이미지 로드
        from PIL import Image
        image = Image.open(img_path).convert('RGB')
        
        # 어노테이션 가져오기
        anns = self.annotations_by_image.get(img_id, [])
        
        # 텍스트 프롬프트 생성
        text_prompt = ". ".join([f"{name} tomato" for name in CLASS_NAMES]) + "."
        
        # 변환 적용
        if self.transform:
            image = self.transform(image)
        
        # 타겟 준비 (bbox, labels)
        boxes = []
        labels = []
        for ann in anns:
            boxes.append(ann['bbox'])  # [x, y, w, h]
            category_id = ann['category_id']
            category_name = self.category_id_to_name[category_id]
            label = CLASS_NAMES.index(category_name)
            labels.append(label)
        
        return {
            'image': image,
            'text_prompt': text_prompt,
            'boxes': boxes,
            'labels': labels,
            'image_id': img_id
        }

In [38]:
# 데이터셋 및 데이터로더 생성
train_ann_file = get_project_path("data", "TomatOD_COCO_3", "train", "custom_train.json")
train_images_dir = get_project_path("data", "TomatOD_COCO_3", "train", "images")
val_ann_file = get_project_path("data", "TomatOD_COCO_3", "val", "custom_val.json")
val_images_dir = get_project_path("data", "TomatOD_COCO_3", "val", "images")

# 변환 정의
transform = create_grounding_dino_transform()

# 데이터셋 생성
train_dataset = GroundingDINODataset(train_ann_file, train_images_dir, transform=transform)
val_dataset = GroundingDINODataset(val_ann_file, val_images_dir, transform=transform)

# DataLoader에 collate_fn 추가
train_loader = DataLoader(
    train_dataset, 
    batch_size=2, 
    shuffle=True, 
    num_workers=0,  # 멀티프로세싱 문제 방지를 위해 0으로 설정
    collate_fn=collate_fn
)
val_loader = DataLoader(
    val_dataset, 
    batch_size=2, 
    shuffle=False, 
    num_workers=0,  # 멀티프로세싱 문제 방지를 위해 0으로 설정
    collate_fn=collate_fn
)
print(f"Train samples: {len(train_dataset)}")
print(f"Val samples: {len(val_dataset)}")

Train samples: 193
Val samples: 54


In [39]:

# 모델 설정 (학습 모드)
model.train()

# 옵티마이저 및 스케줄러 설정
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5, weight_decay=0.0001)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)


In [40]:
# 학습 루프
num_epochs = 10
device = 'cuda'

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    
    print(f"\nEpoch {epoch+1}/{num_epochs}")
    for batch in tqdm(train_loader, desc="Training"):
        # 배치 데이터 처리
        images = batch['image'].to(device)
        text_prompts = batch['text_prompt']
        boxes = batch['boxes']
        labels = batch['labels']
        
        # Forward pass
        optimizer.zero_grad()
        
        # Grounding DINO의 forward는 복잡하므로 실제 구현 필요
        # 여기서는 예시 구조만 제공
        # outputs = model(images, captions=text_prompts)
        # loss = compute_loss(outputs, boxes, labels)
        # loss.backward()
        # optimizer.step()
        
        # train_loss += loss.item()
    
    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Validation"):
            # Validation 로직
            pass
    
    scheduler.step()
    print(f"Train Loss: {train_loss/len(train_loader):.4f}, Val Loss: {val_loss/len(val_loader):.4f}")

# 모델 저장
save_path = get_project_path("weights", "groundingdino_finetuned.pth")
torch.save(model.state_dict(), save_path)
print(f"✅ 파인튜닝된 모델 저장: {save_path}")


Epoch 1/10


Training: 100%|██████████| 97/97 [00:25<00:00,  3.77it/s]
Validation: 100%|██████████| 27/27 [00:06<00:00,  4.00it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 2/10


Training: 100%|██████████| 97/97 [00:27<00:00,  3.55it/s]
Validation: 100%|██████████| 27/27 [00:06<00:00,  4.03it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 3/10


Training: 100%|██████████| 97/97 [00:26<00:00,  3.64it/s]
Validation: 100%|██████████| 27/27 [00:07<00:00,  3.64it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 4/10


Training: 100%|██████████| 97/97 [00:26<00:00,  3.65it/s]
Validation: 100%|██████████| 27/27 [00:07<00:00,  3.48it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 5/10


Training: 100%|██████████| 97/97 [00:24<00:00,  3.96it/s]
Validation: 100%|██████████| 27/27 [00:09<00:00,  2.90it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 6/10


Training: 100%|██████████| 97/97 [00:24<00:00,  3.96it/s]
Validation: 100%|██████████| 27/27 [00:08<00:00,  3.22it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 7/10


Training: 100%|██████████| 97/97 [00:25<00:00,  3.77it/s]
Validation: 100%|██████████| 27/27 [00:08<00:00,  3.12it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 8/10


Training: 100%|██████████| 97/97 [00:24<00:00,  3.93it/s]
Validation: 100%|██████████| 27/27 [00:08<00:00,  3.11it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 9/10


Training: 100%|██████████| 97/97 [00:24<00:00,  3.95it/s]
Validation: 100%|██████████| 27/27 [00:07<00:00,  3.51it/s]


Train Loss: 0.0000, Val Loss: 0.0000

Epoch 10/10


Training: 100%|██████████| 97/97 [00:25<00:00,  3.80it/s]
Validation: 100%|██████████| 27/27 [00:06<00:00,  4.04it/s]


Train Loss: 0.0000, Val Loss: 0.0000
✅ 파인튜닝된 모델 저장: /home/hyeonjin/tomato-detection-agentic/weights/groundingdino_finetuned.pth


In [45]:
# 모델 설정 (파인튜닝된 가중치 사용)
PROJECT_ROOT = find_project_root()
CONFIG_PATH = get_project_path("configs", "gdino", "gdino_3class.py")
ORIGINAL_WEIGHTS_PATH = get_project_path("weights", "groundingdino_swint_ogc.pth")  # 원본 가중치
FINETUNED_WEIGHTS_PATH = get_project_path("weights", "groundingdino_finetuned.pth")  # 파인튜닝된 가중치

print(f"Config path: {CONFIG_PATH}")
print(f"Original weights path: {ORIGINAL_WEIGHTS_PATH}")
print(f"Finetuned weights path: {FINETUNED_WEIGHTS_PATH}")

# 모델 로드 (원본 가중치로 먼저 로드)
if GROUNDING_DINO_AVAILABLE:
    try:
        # 1단계: 원본 가중치로 모델 구조 로드
        model = load_model(CONFIG_PATH, ORIGINAL_WEIGHTS_PATH, device='cuda')
        
        # 2단계: 파인튜닝된 state_dict 로드 및 적용
        if os.path.exists(FINETUNED_WEIGHTS_PATH):
            print("파인튜닝된 가중치를 로드합니다...")
            finetuned_state_dict = torch.load(FINETUNED_WEIGHTS_PATH, map_location='cuda')
            model.load_state_dict(finetuned_state_dict)
            print("✅ 파인튜닝된 가중치 적용 완료")
        else:
            print("⚠️ 파인튜닝된 가중치 파일을 찾을 수 없습니다. 원본 가중치를 사용합니다.")
        
        model.eval()
        print("✅ Grounding DINO 모델 로드 완료")
    except Exception as e:
        print(f"❌ 모델 로드 실패: {e}")
        import traceback
        traceback.print_exc()
        print("수동으로 모델을 다운로드하거나 경로를 확인하세요.")
        model = None
else:
    model = None

Config path: /home/hyeonjin/tomato-detection-agentic/configs/gdino/gdino_3class.py
Original weights path: /home/hyeonjin/tomato-detection-agentic/weights/groundingdino_swint_ogc.pth
Finetuned weights path: /home/hyeonjin/tomato-detection-agentic/weights/groundingdino_finetuned.pth
final text_encoder_type: bert-base-uncased
파인튜닝된 가중치를 로드합니다...


  finetuned_state_dict = torch.load(FINETUNED_WEIGHTS_PATH, map_location='cuda')


✅ 파인튜닝된 가중치 적용 완료
✅ Grounding DINO 모델 로드 완료


In [7]:
# 프롬프트 엔지니어링: 클래스별 개별 추론

# 클래스 정의
CLASS_NAMES = ['fully-ripe', 'semi-ripe', 'unripe']
CLASS_COLORS = ['#D0021B', '#F8E71C', '#7ED321']  # 빨강, 노랑, 초록

def predict_with_grounding_dino_per_class(
    model,
    image_path: str,
    class_name: str,
    box_threshold: float = 0.3,
    text_threshold: float = 0.25
) -> Tuple[np.ndarray, np.ndarray, int]:
    """
    단일 클래스에 대해 Grounding DINO로 예측 수행
    
    Returns:
        boxes: (N, 4) numpy array [x1, y1, x2, y2] (정규화된 좌표)
        scores: (N,) numpy array
        label: 클래스 인덱스
    """
    # 단일 클래스 프롬프트
    text_prompt = f"{class_name} tomato."
    
    # 이미지 로드 및 전처리
    image_source, image = load_image(image_path)
    
    # 예측 수행
    boxes, logits, phrases = predict(
        model=model,
        image=image,
        caption=text_prompt,
        box_threshold=box_threshold,
        text_threshold=text_threshold
    )
    
    # 클래스 인덱스 찾기
    class_idx = CLASS_NAMES.index(class_name)
    
    # 모든 박스에 동일한 라벨 할당
    labels = np.full(len(boxes), class_idx)
    
    return boxes, logits, labels

# 배치 추론 실행 (클래스별 개별 추론)
all_results = []

BOX_THRESHOLD = 0.3
TEXT_THRESHOLD = 0.25
MIN_BOX_AREA = 100
MAX_BOX_AREA_RATIO = 0.8

if model is not None:
    for img_id, img_path, file_name in tqdm(test_image_files, desc="추론 중"):
        try:
            # 이미지 정보 가져오기
            img_info = image_id_to_info[img_id]
            img_width = img_info['width']
            img_height = img_info['height']
            img_area = img_width * img_height
            
            # 모든 클래스에 대해 개별 추론
            all_boxes = []
            all_scores = []
            all_labels = []
            
            for class_idx, class_name in enumerate(CLASS_NAMES):
                # 각 클래스별로 개별 추론
                boxes, scores, labels = predict_with_grounding_dino_per_class(
                    model=model,
                    image_path=img_path,
                    class_name=class_name,
                    box_threshold=BOX_THRESHOLD,
                    text_threshold=TEXT_THRESHOLD
                )
                
                # 박스를 COCO 형식으로 변환 및 필터링
                for i in range(len(boxes)):
                    x1, y1, x2, y2 = boxes[i]
                    
                    # 정규화된 좌표를 픽셀 좌표로 변환
                    x1 = int(x1 * img_width)
                    y1 = int(y1 * img_height)
                    x2 = int(x2 * img_width)
                    y2 = int(y2 * img_height)
                    
                    # 박스 크기 계산
                    w = x2 - x1
                    h = y2 - y1
                    box_area = w * h
                    
                    # 필터링
                    if box_area < MIN_BOX_AREA:
                        continue
                    if box_area > img_area * MAX_BOX_AREA_RATIO:
                        continue
                    
                    # 이미지 범위 내인지 확인 및 클리핑
                    if x1 < 0 or y1 < 0 or x2 > img_width or y2 > img_height:
                        x1 = max(0, min(x1, img_width))
                        y1 = max(0, min(y1, img_height))
                        x2 = max(0, min(x2, img_width))
                        y2 = max(0, min(y2, img_height))
                        w = x2 - x1
                        h = y2 - y1
                        if w * h < MIN_BOX_AREA:
                            continue
                    
                    # COCO 형식: [x, y, width, height]
                    coco_box = [x1, y1, w, h]
                    all_boxes.append(coco_box)
                    all_scores.append(float(scores[i]))
                    all_labels.append(int(labels[i]))
            
            # 결과 저장
            result = {
                'image_id': img_id,
                'file_name': file_name,
                'boxes': all_boxes,
                'scores': all_scores,
                'labels': all_labels
            }
            all_results.append(result)
            
        except Exception as e:
            print(f"❌ {file_name} 처리 실패: {e}")
            continue
    
    print(f"✅ 추론 완료: {len(all_results)}개 이미지")
else:
    print("❌ 모델이 로드되지 않았습니다.")

❌ 모델이 로드되지 않았습니다.


In [1]:
# 시각화 (visualization.py 방식으로 변경)

from PIL import ImageDraw, ImageFont

# visualization.py의 COLORS 상수 사용 (RGB 0-255 범위)
CLASS_COLORS_RGB = {
    0: (255, 0, 0),        # fully-ripe (red) - 빨간색
    1: (255, 165, 0),      # semi-ripe (orange) - 오렌지색
    2: (0, 128, 0),        # unripe (green) - 초록색
}

def visualize_predictions(
    image_path: str,
    predictions: Dict,
    save_path: Optional[str] = None,
    confidence_threshold: float = 0.2
):
    """예측 결과 시각화 (모델 예측만 표시)"""
    # 이미지 로드
    img = Image.open(image_path).convert('RGB')
    img_width, img_height = img.size
    
    # PIL ImageDraw 사용
    draw = ImageDraw.Draw(img)
    
    # 폰트 설정
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16)
    except:
        font = ImageFont.load_default()
    
    # 예측 박스 그리기 (confidence threshold만 적용)
    for box, score, label in zip(
        predictions['boxes'],
        predictions['scores'],
        predictions['labels']
    ):
        # Confidence threshold 적용
        if score < confidence_threshold:
            continue
            
        x, y, w, h = box
        
        # 박스 좌표 정규화 (w, h가 음수인 경우 처리)
        if w < 0:
            x = x + w
            w = abs(w)
        if h < 0:
            y = y + h
            h = abs(h)
        
        # 좌표를 정수로 변환
        x, y, w, h = int(x), int(y), int(w), int(h)
        
        # 이미지 범위 내로 클리핑
        x1 = max(0, min(x, img_width))
        y1 = max(0, min(y, img_height))
        x2 = max(0, min(x + w, img_width))
        y2 = max(0, min(y + h, img_height))
        
        # 클리핑 후 유효한 박스인지 확인
        if x2 <= x1 or y2 <= y1:
            continue
        
        # 색상 가져오기 (클래스별 색상)
        # label=0: fully-ripe (빨간색)
        # label=1: semi-ripe (오렌지색)
        # label=2: unripe (초록색)
        color_rgb = CLASS_COLORS_RGB.get(label, (255, 255, 255))
        
        # 박스 그리기 (visualization.py처럼 width=8)
        draw.rectangle([x1, y1, x2, y2], outline=color_rgb, width=8)
        
        # 라벨 텍스트
        class_name = CLASS_NAMES[label]
        text = f"{class_name}: {score:.2f}"
        
        # 텍스트 배경 그리기
        bbox = draw.textbbox((x1, y1 - 20), text, font=font)
        draw.rectangle(bbox, fill=color_rgb)
        draw.text((x1, y1 - 20), text, fill="white", font=font)
    
    # 저장 또는 표시
    if save_path:
        img.save(save_path, quality=95, optimize=False)
    else:
        img.show()

# 몇 개 이미지 시각화
if model is not None and len(all_results) > 0:
    num_visualize = 30
    confidence_threshold = 0.2  # 임계값 설정
    
    for i in range(num_visualize):
        result = all_results[i]
        img_id = result['image_id']
        img_path = os.path.join(test_images_dir, result['file_name'])
        
        # 예측 결과 준비 (모델이 예측한 결과만 사용)
        pred_dict = {
            'boxes': result['boxes'],      # 모델 예측 박스
            'scores': result['scores'],     # 모델 예측 점수
            'labels': result['labels']      # 모델 예측 라벨
        }
        
        # 시각화 저장
        save_path = os.path.join(
            output_dir, "visualizations",
            f"pred_{result['file_name']}"
        )
        visualize_predictions(
            img_path, 
            pred_dict, 
            save_path,
            confidence_threshold=confidence_threshold
        )
    
    print(f"✅ {num_visualize}개 이미지 시각화 완료")

NameError: name 'Dict' is not defined