In [17]:
from ultralytics import YOLO
import os
import shutil
from sklearn.model_selection import train_test_split
import torch
import ultralytics
import cv2
import matplotlib.pyplot as plt
from collections import defaultdict
from pathlib import Path
import glob
import optuna
from optuna.visualization import (
    plot_optimization_history,
    plot_param_importances,
    plot_parallel_coordinate,
    plot_slice
)
import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image, ImageEnhance, ImageOps
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'

In [18]:
print(f"PyTorch 버전: {torch.__version__}")
print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
print(f"Ultralytics 버전: {ultralytics.__version__}")
print(f"CUDA 버전: {torch.version.cuda}")

model = YOLO('yolov8n.pt')
print(f"모델 작업 유형: {model.task}")

PyTorch 버전: 2.0.1+cu117
CUDA 사용 가능: True
Ultralytics 버전: 8.3.0
CUDA 버전: 11.7
모델 작업 유형: detect


In [19]:
# 자동 정제 스크립트 (박스 우선 유지)

def clean_labels(label_dir):
    for filename in os.listdir(label_dir):
        filepath = os.path.join(label_dir, filename)
        with open(filepath, 'r') as f:
            lines = [l for l in f.readlines() if len(l.split()) == 5]
        
        with open(filepath, 'w') as f:
            f.writelines(lines)
        
clean_labels("C:/Users/user/Downloads/dataset/datasetHard/labels/train")
clean_labels("C:/Users/user/Downloads/dataset/datasetHard/labels/val")
clean_labels("C:/Users/user/Downloads/dataset/datasetHard/labels/test")

In [20]:
# 비어있는 파일이 있는 경우 성능 저하를 야기할 수 있으므로 비어 있는 label 파일과 이미지도 같이 삭제하는 파일

def clean_empty_labels(label_dir, image_dir=None, delete_images=False):
    label_dir = Path(label_dir)
    if image_dir:
        image_dir = Path(image_dir)

    removed_labels = 0
    removed_images = 0

    for label_file in label_dir.glob("*.txt"):
        if label_file.stat().st_size == 0:  # 파일이 비어있는지 확인
            # 라벨 파일 삭제
            label_file.unlink()
            removed_labels += 1

            if delete_images and image_dir:
                # 이미지 파일명 추정 (확장자 jpg, png 등)
                stem = label_file.stem
                for ext in ['.jpg', '.jpeg', '.png', '.bmp']:
                    img_file = image_dir / f"{stem}{ext}"
                    if img_file.exists():
                        img_file.unlink()
                        removed_images += 1
                        break

    print(f"삭제된 비어있는 라벨 파일: {removed_labels}개")
    if delete_images:
        print(f"함께 삭제된 이미지 파일: {removed_images}개")

        
# 라벨과 이미지 모두 삭제
clean_empty_labels(
    label_dir=r"C:\Users\user\Downloads\dataset\datasetHard\labels\train",
    image_dir=r"C:\Users\user\Downloads\dataset\datasetHard\images\train",
    delete_images=True
)

clean_empty_labels(
    label_dir=r"C:\Users\user\Downloads\dataset\datasetHard\labels\val",
    image_dir=r"C:\Users\user\Downloads\dataset\datasetHard\images\val",
    delete_images=True
)

clean_empty_labels(
    label_dir=r"C:\Users\user\Downloads\dataset\datasetHard\labels\test",
    image_dir=r"C:\Users\user\Downloads\dataset\datasetHard\images\test",
    delete_images=True
)

삭제된 비어있는 라벨 파일: 0개
함께 삭제된 이미지 파일: 0개
삭제된 비어있는 라벨 파일: 0개
함께 삭제된 이미지 파일: 0개
삭제된 비어있는 라벨 파일: 0개
함께 삭제된 이미지 파일: 0개


In [21]:
# 비어있는 파일이 존재하는지 확인하는 코드

def check_labels(label_path):
    empty_found = False  # 비어있는 파일이 발견되었는지 여부
    for lbl_file in Path(label_path).glob("*.txt"):
        with open(lbl_file) as f:
            lines = f.readlines()
            if not lines:
                print(f"Empty file: {lbl_file}")
                empty_found = True
            for line in lines:
                parts = line.strip().split()
                if len(parts) != 5:
                    print(f"Invalid line in {lbl_file}: {line}")
    if not empty_found:
        print(f"[{label_path}] 비어있는 파일이 존재하지 않습니다.")

check_labels(r"C:\Users\user\Downloads\dataset\datasetHard\labels\train")
check_labels(r"C:\Users\user\Downloads\dataset\datasetHard\labels\val")
check_labels(r"C:\Users\user\Downloads\dataset\datasetHard\labels\test")

[C:\Users\user\Downloads\dataset\datasetHard\labels\train] 비어있는 파일이 존재하지 않습니다.
[C:\Users\user\Downloads\dataset\datasetHard\labels\val] 비어있는 파일이 존재하지 않습니다.
[C:\Users\user\Downloads\dataset\datasetHard\labels\test] 비어있는 파일이 존재하지 않습니다.


In [22]:
# YOLO 학습 과정에서 생성되는 캐시 파일 삭제하는 코드

cache_files = glob.glob(r"C:/Users/user/Downloads/dataset/datasetHard/labels/*.cache")
for f in cache_files:
    os.remove(f)
print(f"{len(cache_files)}개의 캐시 파일 삭제 완료")


0개의 캐시 파일 삭제 완료


In [23]:
# 클래스별 인스턴스 수를 확인할 수 있는 코드

def count_instance_per_class(name, label_dir):
    class_counts = defaultdict(int)

    for label_file in Path(label_dir).glob("*.txt"):
        with open(label_file) as f:
            for line in f:
                if line.strip():
                    class_id = int(line.split()[0])
                    class_counts[class_id] += 1

    print("{} 클래스별 인스턴스 수:".format(name))
    class_counts = dict(sorted(class_counts.items()))
    for cls, cnt in class_counts.items():
        print(f"Class {cls}: {cnt}개")

count_instance_per_class("train", r"C:\Users\user\Downloads\dataset\datasetHard\labels\train")
count_instance_per_class("val", r"C:\Users\user\Downloads\dataset\datasetHard\labels\val")
count_instance_per_class("test", r"C:\Users\user\Downloads\dataset\datasetHard\labels\test")

train 클래스별 인스턴스 수:
Class 0: 4256개
Class 1: 2596개
Class 2: 5858개
Class 3: 786개
val 클래스별 인스턴스 수:
Class 0: 408개
Class 1: 137개
Class 2: 251개
Class 3: 140개
test 클래스별 인스턴스 수:
Class 0: 192개
Class 1: 284개
Class 2: 135개
Class 3: 176개


In [None]:
# 하이퍼파리미터 + 파라미터 튜닝 하는 부분

def objective(trial):
    try:
        # 하이퍼파라미터 서제스쳔
        lr0 = trial.suggest_float('lr0', 1e-4, 1e-1, log=True)
        batch = trial.suggest_categorical('batch', [8, 16, 32])
        mosaic = trial.suggest_float('mosaic', 0.0, 1.0)
        epochs = 50  # 고정값 사용

        # 모델 초기화 및 학습
        model = YOLO('yolov8n.pt')
        results = model.train(
            data=r"C:\Users\user\Downloads\dataset\datasetHard\data.yaml",
            epochs=epochs,
            lr0=lr0,
            batch=batch,
            mosaic=mosaic,
            imgsz=800,
            device='cuda',
            patience=20,
            verbose=False,  # 학습 로그 간소화
            amp=True,       # 메모리 절약
            plots=False     # Optuna 실행시 플롯 생성 방지
        )

        # mAP50 값 추출 (Ultralytics 버전별 호환성 처리)
        try:
            map50 = results.metrics['metrics/val_map_0.5']
        except AttributeError:
            map50 = results.metrics.map_50  # 8.3.0+ 버전

        return map50

    except Exception as e:
        print(f"Trial failed with error: {e}")
        return 0.0  # 실패시 0 반환

# Optuna 스터디 실행
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42)  # 재현성 위해 시드 고정
)
study.optimize(objective, n_trials=10, show_progress_bar=True)

# 결과 출력
print("\n=== Best parameters ===")
print(study.best_params)
print(f"Best mAP50: {study.best_value:.3f}")

# 시각화
plot_optimization_history(study).show()
plot_param_importances(study).show()

In [None]:
# 최적화 하이퍼파라미터와 파러미터 간 상관관계, 파라미터별 성능 분포 시각화 파트

# 1. 최적화 히스토리(각 trial별 best value 변화)
fig1 = plot_optimization_history(study)
fig1.show()

# 2. 하이퍼파라미터 중요도(어떤 파라미터가 성능에 영향이 큰지)
fig2 = plot_param_importances(study)
fig2.show()

# 3. 파라미터 간 상관관계(Parallel Coordinate)
fig3 = plot_parallel_coordinate(study)
fig3.show()

# 4. 파라미터별 성능 분포(Slice Plot)
fig4 = plot_slice(study)
fig4.show()

In [13]:
# threshold를 변경해가면서 test

def test_model_with_thresholds(model,test_params):
    results_list = []
    print("\n📊 Threshold별 테스트 결과")
    print("Threshold |   mAP50   | Precision |  Recall")
    print("---------------------------------------------")

    test_params['conf'] = 0.01
    results = model.val(**test_params)
        # 결과 저장 및 출력
    map50 = results.box.map50
    precision = np.mean(results.box.p)
    recall = np.mean(results.box.r)
    print(f"  {map50:.4f}  |  {precision:.4f}  |  {recall:.4f}")
    results_list.append({
        'threshold': 0.01,
        'mAP50': map50,
        'precision': precision,
        'recall': recall
    })
    return results_list

# 1. 모델 로드
best_model_path = "runs/detect/train54/weights/best.pt"
print("Best model path:", best_model_path)
model = YOLO(best_model_path)

# 2. 테스트 파라미터 설정
test_params = {
    'data': r"C:\Users\user\Downloads\dataset\datasetHard\data.yaml",
    'split': 'test',
    'batch': 32,
    'name': 'disaster_detection_test',
    'plots': True,
    'save_json': False
}
 
results = test_model_with_thresholds(model, test_params)

Best model path: runs/detect/train54/weights/best.pt

📊 Threshold별 테스트 결과
Threshold |   mAP50   | Precision |  Recall
---------------------------------------------
Ultralytics 8.3.0  Python-3.10.9 torch-2.0.1+cu117 CUDA:0 (NVIDIA GeForce RTX 3080, 10240MiB)
Model summary (fused): 186 layers, 2,685,148 parameters, 0 gradients, 6.8 GFLOPs


[34m[1mval: [0mScanning C:\Users\user\Downloads\dataset\dataset\labels\test.cache... 561 images, 0 backgrounds, 0 corrupt: 100%|█[0m
                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100%|██████████| 18/18 [00:45


                   all        561        687      0.883      0.827      0.884      0.668
                 flood        135        207       0.86      0.816       0.85      0.712
                  fire        106        142      0.924      0.965      0.976      0.697
              accident        236        249      0.902      0.775      0.885      0.626
              sinkhole         84         89      0.844      0.753      0.824      0.635
Speed: 0.8ms preprocess, 2.4ms inference, 0.0ms loss, 1.0ms postprocess per image
Results saved to [1mruns\detect\disaster_detection_test31[0m
  0.8835  |  0.8825  |  0.8272


In [None]:
# 랜덤 5개 이미지에 대한 테스트 시각화

def visualize_extreme_diverse_predictions(model):
    test_images_dir = r"C:\Users\user\Downloads\dataset\datasetHard\images\test"
    all_images = os.listdir(test_images_dir)

    # 파일명에서 첫 단어로 그룹화
    image_groups = defaultdict(list)
    for img in all_images:
        if img:
            group_key = img.split('_')[0]  # 예: 'cat_01.jpg' → 'cat'
            image_groups[group_key].append(img)

    # 각 그룹에서 하나씩 뽑아 다양성 확보
    selected_images = [random.choice(group) for group in image_groups.values()]

    # 그룹이 5개 이상이면 랜덤하게 5개만 선택
    if len(selected_images) > 5:
        selected_images = random.sample(selected_images, 5)

    # 선택된 이미지에 대해 예측 및 결과 저장
    for img_name in selected_images:
        img_path = os.path.join(test_images_dir, img_name)
        results = model.predict(
            source=img_path,
            save=True,
            conf=0.01,  # 신뢰도 임계값
            save_dir='test_results'  # 예측 결과 저장 경로
        )
        print(f"{img_name} 예측 완료 → test_results 디렉토리 확인")

visualize_extreme_diverse_predictions(model)

In [None]:
from ultralytics import YOLO
import cv2 as cv
from collections import Counter

model = YOLO("runs/detect/train54/weights/best.pt")

# 클래스별 threshold 설정 (예시: 클래스 이름 기준)
class_thresholds = {
    'sinkhole': 0.01,    # sinkhole은 낮은 threshold로 더 많이 탐지
    'fire': 0.3,         # fire은 높은 threshold로 정확도 우선
    'accident': 0.7,
    'flood': 0.5
}

# 클래스 이름을 ID로 변환 (data.yaml의 names 순서와 일치해야 함)
class_names = model.names
class_id_to_threshold = {
    class_id: class_thresholds[class_name] 
    for class_id, class_name in class_names.items() 
    if class_name in class_thresholds
}

sink_video_path = "C:\\Users\\user\\Downloads\\Man on a scooter plunges into sinkhole.mp4"
acci_video_path = "C:\\Users\\user\\Downloads\\Shocking rear-end crash in Wordsley caught on CCTV.mp4"
fire_video_path = "C:\\Users\\user\\Downloads\\RAW Traffic camera shows vehicle fire on I-94E near Clearwater.mp4"
flood_video_path = "C:\\Users\\user\\Downloads\\Security camera footage captures deadly flood in Quito   AFP.mp4"
normal_video_path = "C:\\Users\\user\\Downloads\\ordinary.mp4"
ezfire_video_path = "C:\\Users\\user\\Downloads\\easy_fire.mp4"

cap = cv.VideoCapture(normal_video_path)

fps = cap.get(cv.CAP_PROP_FPS) or 30

frame_buffer = []
threshold_frames = 10
consecutive_detection_required = 5
detection_count = 0
detected_classes = []
event_start_frame = None

while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_buffer.append(frame)

    if len(frame_buffer) == 15:
        current_frame = int(cap.get(cv.CAP_PROP_POS_FRAMES))
        batch_start_frame = current_frame - 15

        # 1. 낮은 임계값으로 모든 후보 탐지 (클래스별 필터링을 위해)
        results = model.predict(frame_buffer, imgsz=800, device='cuda', batch=15, conf=0.01)

        detected_frames = 0
        current_batch_classes = []
        
        # 2. 클래스별 threshold 적용하여 필터링
        for result in results:
            valid_detections = []
            if len(result.boxes) > 0:
                for box in result.boxes:
                    cls = int(box.cls.item())
                    conf = box.conf.item()
                    # 해당 클래스의 threshold 확인
                    if cls in class_id_to_threshold and conf >= class_id_to_threshold[cls]:
                        valid_detections.append(cls)
            
            # 유효한 detection이 있는 경우만 카운트
            if len(valid_detections) > 0:
                detected_frames += 1
                current_batch_classes.extend(valid_detections)

        # 나머지 로직은 동일
        if detected_frames >= threshold_frames:
            detection_count += 1
            detected_classes.extend(current_batch_classes)
            
            if detection_count == 1:
                event_start_frame = batch_start_frame
                
            print(f"Detection {detection_count} times in a row")
        else:
            detection_count = 0
            detected_classes = []
            event_start_frame = None

        if detection_count >= consecutive_detection_required:
            event_end_frame = current_frame - 1
            start_time_sec = event_start_frame / fps
            end_time_sec = event_end_frame / fps
            
            start_min = int(start_time_sec // 60)
            start_sec = int(start_time_sec % 60)
            end_min = int(end_time_sec // 60)
            end_sec = int(end_time_sec % 60)
            
            class_counts = Counter(detected_classes)
            
            print("\n🚨 Event detected!")
            print(f"⏰ Time: {start_min}:{start_sec:02d} ~ {end_min}:{end_sec:02d}")
            print("📦 Detected classes (with class-specific thresholds):")
            for cls, count in class_counts.items():
                print(f"- {model.names[cls]} (threshold: {class_id_to_threshold[cls]}): {count} times")
            
            detection_count = 0
            detected_classes = []
            event_start_frame = None

        frame_buffer = []

cap.release()
print("Processing finished.")

In [None]:
"C:\Users\user\Desktop\asdasdqw.png"