In [1]:
from ultralytics import YOLO
import os
import numpy as np
import pandas as pd
import torch
from PIL import Image
from ensemble_boxes import weighted_boxes_fusion
from tqdm import tqdm

def _gather_model_boxes(preds, width, height, conf_threshold):
    """Flatten one model's predictions into normalized WBF inputs.

    Args:
        preds: Iterable of dicts with the keys ``'boxes'``, ``'conf'`` and
            ``'cls'``. Each value must support ``.cpu().numpy()``; every row
            of ``'boxes'`` is unpacked as ``x1, y1, x2, y2``.
        width: Divisor used to normalize the x coordinates.
        height: Divisor used to normalize the y coordinates.
        conf_threshold: Only detections with ``conf > conf_threshold`` are kept.

    Returns:
        A ``(boxes, scores, labels)`` tuple of plain Python lists: boxes as
        ``[x1, y1, x2, y2]`` in the ``[0, 1]`` range, scores as floats and
        labels as ints.
    """
    norm = np.array([width, height, width, height], dtype=np.float64)
    boxes, scores, labels = [], [], []
    for pred in preds:
        confs = pred['conf']
        if confs.shape[0] == 0:
            continue
        # Convert each tensor once instead of calling .item()/.cpu() per box;
        # float64 keeps the threshold comparison exact for float32 inputs.
        conf_np = confs.cpu().numpy().astype(np.float64)
        keep = conf_np > conf_threshold
        if not keep.any():
            continue
        box_np = pred['boxes'].cpu().numpy().astype(np.float64)[keep]
        cls_np = pred['cls'].cpu().numpy()[keep]
        # WBF expects coordinates in [0, 1]; clipping here yields the same
        # boxes WBF would produce, without one warning per out-of-range value.
        boxes.extend(np.clip(box_np / norm, 0.0, 1.0).tolist())
        scores.extend(conf_np[keep].tolist())
        # astype truncates toward zero, like int() on the scalar value.
        labels.extend(cls_np.astype(np.int64).tolist())
    return boxes, scores, labels


def ensemble_predictions(predictions_list, width, height, iou_threshold=0.5, conf_threshold=0.05):
    """Fuse the detections of several models with Weighted Boxes Fusion.

    Args:
        predictions_list: One entry per model. Each entry is a list of dicts
            with the keys ``'boxes'`` (rows of ``x1, y1, x2, y2`` in pixel
            coordinates of the ``width`` x ``height`` image), ``'conf'`` and
            ``'cls'``.
        width: Image width in pixels, used to normalize and restore x values.
        height: Image height in pixels, used to normalize and restore y values.
        iou_threshold: Passed to ``weighted_boxes_fusion`` as ``iou_thr``.
        conf_threshold: Detections at or below this confidence are dropped
            before fusion; it is also passed to WBF as ``skip_box_thr``.

    Returns:
        A list of dicts with ``'bbox'`` (array of four pixel coordinates),
        ``'conf'`` and ``'cls'``, one per fused box. The list is empty when
        no detection survives the confidence filter.
    """
    boxes_list, scores_list, labels_list = [], [], []
    for preds in predictions_list:
        model_boxes, model_scores, model_labels = _gather_model_boxes(
            preds, width, height, conf_threshold
        )
        boxes_list.append(model_boxes)
        scores_list.append(model_scores)
        labels_list.append(model_labels)

    # Nothing to fuse: same result WBF would give, without the call.
    if not any(boxes_list):
        return []

    # Run WBF on the normalized boxes.
    boxes, scores, labels = weighted_boxes_fusion(
        boxes_list, scores_list, labels_list,
        iou_thr=iou_threshold,
        skip_box_thr=conf_threshold,
    )
    # Scale the fused boxes back to pixel coordinates.
    boxes = boxes * np.array([width, height, width, height], dtype=np.float64)

    return [
        {'bbox': box, 'conf': score, 'cls': label}
        for box, score, label in zip(boxes, scores, labels)
    ]

# Load the best checkpoint of each of the five fold runs (fold0 .. fold4).
models = [
    YOLO(f'/data/ephemeral/home/baseline/Yolo11/Yolo11_epoch100_fold{fold_idx}/weights/best.pt')
    for fold_idx in range(5)
]

def _build_tta_views(image, scales, flip):
    """Return ``(image, (w, h), is_flipped)`` tuples for every TTA variant."""
    views = []
    for scale in scales:
        # resize() stretches to exactly ``scale``; the aspect ratio is NOT
        # preserved, which is why x and y are rescaled separately later on.
        # Image.LANCZOS is the same filter as the removed Image.ANTIALIAS alias.
        resized = image.resize(scale, Image.LANCZOS)
        views.append((resized, scale, False))  # original orientation
        if flip:
            mirrored = resized.transpose(Image.FLIP_LEFT_RIGHT)
            views.append((mirrored, scale, True))  # horizontally flipped
    return views


def _predict_views(model, views, width, height):
    """Run ``model`` on every view and map its boxes back to the original image."""
    model_predictions = []
    for aug_image, scale_size, is_flipped in views:
        # NOTE(review): no ``imgsz`` is passed, so the model presumably resizes
        # every view to its own default inference size; the TTA scales may then
        # have little effect on the network input resolution - confirm.
        results = model(aug_image, verbose=False)
        for result in results:
            if result.boxes is None or len(result.boxes) == 0:
                continue
            # Clone so the in-place edits below do not touch the tensor
            # owned by ``result``.
            boxes = result.boxes.xyxy.clone()
            if is_flipped:
                # Mirror x back: new x1 = W - old x2, new x2 = W - old x1.
                boxes[:, [0, 2]] = scale_size[0] - boxes[:, [2, 0]]
            # Rescale from the resized view to the original image size.
            boxes[:, [0, 2]] *= width / scale_size[0]
            boxes[:, [1, 3]] *= height / scale_size[1]
            model_predictions.append({
                'boxes': boxes,
                'conf': result.boxes.conf.clone(),
                'cls': result.boxes.cls.clone(),
            })
    return model_predictions


def main():
    """Run multi-model TTA inference on the test set and write a submission CSV.

    Every image in the test directory is resized to several scales (optionally
    mirrored), passed through each model in the module-level ``models`` list,
    and the per-model detections are fused with ``ensemble_predictions``. The
    result is written as a CSV with the columns ``PredictionString`` and
    ``image_id``.
    """
    test_dir = '/data/ephemeral/home/dataset/test'
    output_dir = '/data/ephemeral/home/baseline/Yolo11'
    output_file = os.path.join(output_dir, 'Yolo11_100_TTA_test.csv')

    os.makedirs(output_dir, exist_ok=True)

    prediction_strings = []
    file_names = []

    # Process the image files in ascending name order.
    image_list = sorted(
        img for img in os.listdir(test_dir)
        if img.lower().endswith(('.jpg', '.jpeg', '.png'))
    )

    # TTA configuration: (width, height) targets and horizontal flipping.
    image_scales = [(600, 600), (800, 800), (1024, 1024), (1333, 800), (1333, 1333)]
    flip = True

    for img_name in tqdm(image_list):
        img_path = os.path.join(test_dir, img_name)

        # Load the image; the context manager releases the file handle early.
        with Image.open(img_path) as raw_image:
            original_image = raw_image.convert('RGB')
        width, height = original_image.size

        augmented_images = _build_tta_views(original_image, image_scales, flip)

        # One list of predictions per model, covering all augmentations.
        predictions_list = [
            _predict_views(model, augmented_images, width, height)
            for model in models
        ]

        # Fuse the per-model predictions with WBF.
        ensemble_preds = ensemble_predictions(predictions_list, width, height)

        # Convert to the submission format: "cls conf x1 y1 x2 y2 ...".
        prediction_string = ' '.join([
            f"{int(pred['cls'])} {pred['conf']:.6f} "
            f"{pred['bbox'][0]:.2f} {pred['bbox'][1]:.2f} {pred['bbox'][2]:.2f} {pred['bbox'][3]:.2f}"
            for pred in ensemble_preds
        ])

        prediction_strings.append(prediction_string)
        file_names.append(f"test/{img_name}")

        # Release cached GPU memory every 100 images.
        if len(prediction_strings) % 100 == 0:
            torch.cuda.empty_cache()

    # Save the final predictions as CSV (without the DataFrame index).
    submission = pd.DataFrame({
        'PredictionString': prediction_strings,
        'image_id': file_names
    })

    submission.to_csv(output_file, index=False)
    print(f"앙상블 예측이 {output_file}에 저장되었습니다.")

# Run the inference pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()


  resized_image = original_image.resize(scale, Image.ANTIALIAS)
  3%|▎         | 136/4871 [05:38<3:16:17,  2.49s/it]


KeyboardInterrupt: 