# VLM_SafetyGuidance Quick Start

물류창고 안전 관제 시스템 - 각 모듈 코드 참고

## 1. 전처리 (AI Hub -> YOLO)

In [None]:
# src/preprocessing/aihub_to_yolo.py 핵심 코드

import json
import shutil
from pathlib import Path

# 카테고리 매핑
CATEGORY_NAMES = {
    "01": "도크설비", "02": "보관", "03": "부가가치서비스",
    "04": "설비및장비", "05": "운반", "06": "입고",
    "07": "지게차", "08": "출고", "09": "파렛트렉",
    "10": "피킹분배", "11": "화재"
}

# 클래스 매핑 (57개) - AI Hub 공식 정의
CLASS_MAPPING = {
    # Static Object (SO) - 정적 객체 21개
    "SO-01": 0,   # 보관랙(선반)
    "SO-02": 1,   # 적재물류(그룹)
    "SO-03": 2,   # 물류(개별)
    "SO-05": 3,   # (미사용)
    "SO-06": 4,   # 도크
    "SO-07": 5,   # 출입문
    "SO-08": 6,   # 화물승강기
    "SO-09": 7,   # 차단멀티탭
    "SO-10": 8,   # 멀티탭
    "SO-11": 9,   # 개인 전열기구
    "SO-12": 10,  # 소화기
    "SO-13": 11,  # 작업 안전구역
    "SO-14": 12,  # 용접 작업 구역
    "SO-15": 13,  # 지게차 이동영역
    "SO-16": 14,  # 출입제한 구역
    "SO-17": 15,  # 화재 대피로
    "SO-18": 16,  # 안전펜스
    "SO-19": 17,  # 화기(용접기,토치)
    "SO-21": 18,  # 이물질(물,기름)
    "SO-22": 19,  # 가연물,인화물
    "SO-23": 20,  # 샌드위치판넬
    # Work Object (WO) - 동적 객체 8개
    "WO-01": 21,  # 작업자(작업복 착용)
    "WO-02": 22,  # 작업자(작업복 미착용)
    "WO-03": 23,  # 화물트럭
    "WO-04": 24,  # 지게차
    "WO-05": 25,  # 핸드파레트카
    "WO-06": 26,  # 롤테이너
    "WO-07": 27,  # 운반수레
    "WO-08": 28,  # 흡연
    # Unsafe Action (UA) - 위험 행동 13개
    "UA-01": 29,  # 지게차 시야 미확보
    "UA-02": 30,  # 지게차 적재 시 장애물
    "UA-03": 31,  # 3단 이상 평치 적재
    "UA-04": 32,  # 랙 적재상태 불량
    "UA-05": 33,  # 운반장비 불안정 적재
    "UA-06": 34,  # 화물 붕괴
    "UA-10": 35,  # 지게차 통로에 사람
    "UA-12": 36,  # 지게차 안전수칙 미준수
    "UA-13": 37,  # 지게차 화물 붕괴
    "UA-14": 38,  # 지게차 구역 내 작업자
    "UA-16": 39,  # 핸드파레트카 과적재
    "UA-17": 40,  # 용접구역 가연물 침범
    "UA-20": 41,  # 비흡연구역 흡연
    # Unsafe Condition (UC) - 위험 상태 15개
    "UC-02": 42,  # 입고 시 트럭 내 작업자
    "UC-06": 43,  # 출고 시 트럭 내 작업자
    "UC-08": 44,  # 지게차 통로 미표시
    "UC-09": 45,  # 도크 출입문 장애물
    "UC-10": 46,  # 도크 접차 시 후방 사람
    "UC-13": 47,  # 빈 파렛트 미정돈
    "UC-14": 48,  # 랙에 기대는 작업자
    "UC-15": 49,  # 파렛트 파손
    "UC-16": 50,  # 화물승강기 탑승
    "UC-17": 51,  # 과부하차단 없는 멀티탭
    "UC-18": 52,  # 소화기 미비치
    "UC-19": 53,  # 출입제한구역 문 열림
    "UC-20": 54,  # 화재대피로 적재물
    "UC-21": 55,  # 도크-트럭 분리
    "UC-22": 56,  # 지게차 영역 이탈
}

def convert_bbox_to_yolo(bbox, img_width, img_height):
    """바운딩박스를 YOLO 형식으로 변환 (normalized x_center, y_center, width, height)"""
    x, y, w, h = bbox
    x_center = (x + w / 2) / img_width
    y_center = (y + h / 2) / img_height
    norm_w = w / img_width
    norm_h = h / img_height
    return x_center, y_center, norm_w, norm_h

In [None]:
# 전처리 실행 (터미널 명령어)
!cd ../src && python -m preprocessing.aihub_to_yolo --folders 01 --sample 100

## 2. 학습 (YOLO 모델)

In [None]:
# src/training/train_yolo.py 핵심 코드

from ultralytics import YOLO
from pathlib import Path

class YOLOTrainer:
    def __init__(self, model_name="yolov8n.pt", device="auto"):
        self.model = YOLO(model_name)
        self.device = device
    
    def train(self, category, epochs=100, batch=16, imgsz=640):
        """YOLO 모델 학습"""
        # data.yaml 경로 찾기
        data_yaml = self._find_data_yaml(category)
        
        # 학습 실행
        results = self.model.train(
            data=data_yaml,
            epochs=epochs,
            batch=batch,
            imgsz=imgsz,
            device=self.device,
            project="../models",
            name=f"safety_{category}"
        )
        return results
    
    def _find_data_yaml(self, category):
        """카테고리별 data.yaml 경로 찾기"""
        CATEGORIES = {
            "01": "도크설비", "02": "보관", "03": "부가가치서비스",
            "04": "설비및장비", "05": "운반", "06": "입고",
            "07": "지게차", "08": "출고", "09": "파렛트렉",
            "10": "피킹분배", "11": "화재"
        }
        category_name = CATEGORIES.get(category)
        return f"../data/{category}_{category_name}/logistics_yolo/data.yaml"

In [None]:
# 학습 실행 (터미널 명령어)
!cd ../src && python -m training.train_yolo --category 01 --epochs 10

## 3. Monitoring (객체 탐지)

In [None]:
# src/monitoring/__init__.py 핵심 코드

from ultralytics import YOLO
import torch
from pathlib import Path

PROJECT_ROOT = Path("..")
MODELS_DIR = PROJECT_ROOT / "models"

def _find_best_model():
    """models/ 폴더에서 가장 최근 학습된 best.pt 찾기"""
    if not MODELS_DIR.exists():
        return None
    best_models = list(MODELS_DIR.rglob("weights/best.pt"))
    if not best_models:
        return None
    best_models.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    return str(best_models[0])

# 모델 로드
custom_model_path = _find_best_model()
model = YOLO(custom_model_path) if custom_model_path else YOLO('yolov8n.pt')
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# 위험 클래스 (57개 중 UA/UC) - AI Hub 공식 정의
ANOMALY_CLASSES = [
    # Unsafe Action (UA) - 위험 행동 13개
    "forklift_blind_spot",       # UA-01: 지게차 시야 미확보
    "forklift_obstacle_nearby",  # UA-02: 지게차 적재 시 장애물
    "stacking_3_levels_flat",    # UA-03: 3단 이상 평치 적재
    "rack_improper_stacking",    # UA-04: 랙 적재상태 불량
    "unstable_cargo_loading",    # UA-05: 운반장비 불안정 적재
    "cargo_collapse",            # UA-06: 화물 붕괴
    "person_in_forklift_path",   # UA-10: 지게차 통로에 사람
    "forklift_safety_violation", # UA-12: 지게차 안전수칙 미준수
    "forklift_cargo_collapse",   # UA-13: 지게차 화물 붕괴
    "worker_in_forklift_zone",   # UA-14: 지게차 구역 내 작업자
    "pallet_truck_over_stacking",# UA-16: 핸드파레트카 과적재
    "flammable_in_welding_zone", # UA-17: 용접구역 가연물 침범
    "smoking_in_no_smoke_zone",  # UA-20: 비흡연구역 흡연
    # Unsafe Condition (UC) - 위험 상태 15개
    "worker_in_truck_loading",   # UC-02: 입고 시 트럭 내 작업자
    "worker_in_truck_unloading", # UC-06: 출고 시 트럭 내 작업자
    "forklift_path_unmarked",    # UC-08: 지게차 통로 미표시
    "dock_door_obstacle",        # UC-09: 도크 출입문 장애물
    "person_behind_docking",     # UC-10: 도크 접차 시 후방 사람
    "pallet_disorganized",       # UC-13: 빈 파렛트 미정돈
    "worker_leaning_on_rack",    # UC-14: 랙에 기대는 작업자
    "pallet_damaged",            # UC-15: 파렛트 파손
    "worker_in_elevator",        # UC-16: 화물승강기 탑승
    "no_surge_protector",        # UC-17: 과부하차단 없는 멀티탭
    "no_fire_extinguisher",      # UC-18: 소화기 미비치
    "restricted_door_open",      # UC-19: 출입제한구역 문 열림
    "cargo_in_fire_escape",      # UC-20: 화재대피로 적재물
    "truck_dock_separated",      # UC-21: 도크-트럭 분리
    "forklift_outside_path",     # UC-22: 지게차 영역 이탈
]

## 4. Reasoning (위험 분석)

In [None]:
# src/reasoning/__init__.py 핵심 코드

import os
import base64
import json
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def analyze_risk_with_vlm(detection_result):
    """GPT-4o VLM으로 위험 분석"""
    image_path = detection_result.get("image_path")
    with open(image_path, "rb") as f:
        base64_image = base64.b64encode(f.read()).decode('utf-8')
    
    prompt = """
    당신은 공장의 AI 안전 관리자입니다.
    이미지를 분석하여 위험을 평가해주세요.
    - 위험 수준: "LOW", "MED", "HIGH"
    - 이유: 한국어 한 문장
    JSON: {"risk_level": "...", "reason": "..."}
    """
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
            ]
        }],
        response_format={"type": "json_object"}
    )
    return json.loads(response.choices[0].message.content)

## 5. Action (안전 가이드라인 생성)

In [None]:
# src/action/__init__.py 핵심 코드

def generate_safety_guideline(analysis_result):
    """위험 등급에 따라 다국어 안전 지침 생성"""
    risk_level = analysis_result.get("risk_level")
    reason = analysis_result.get("reason", "")
    
    if risk_level == "LOW":
        return {"status": "logged"}
    
    elif risk_level == "MED":
        print(f"확인 필요: {reason}")
        return {"status": "confirmation_requested"}
    
    elif risk_level == "HIGH":
        # LLM으로 다국어 지침 생성
        prompt = f"""
        상황: {reason}
        다국어 안전 지침 (한국어, 영어, 베트남어)
        JSON: {{"guideline_ko": "...", "guideline_en": "...", "guideline_vi": "..."}}
        """
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        guidelines = json.loads(response.choices[0].message.content)
        print(f"한국어: {guidelines.get('guideline_ko')}")
        print(f"English: {guidelines.get('guideline_en')}")
        print(f"Tiếng Việt: {guidelines.get('guideline_vi')}")
        return {"status": "generated", "guidelines": guidelines}

## 6. 전체 파이프라인 실행

In [None]:
# 파이프라인 실행
import sys
sys.path.append('../src')

from monitoring import detect_objects
from reasoning import analyze_risk_with_vlm
from action import generate_safety_guideline

# 테스트 이미지
image_path = "../data/01_도크설비/logistics_yolo/val/images/image_000001.jpg"

# 1. 객체 탐지
detection_result = detect_objects(image_path)
print("탐지:", detection_result["status"])

# 2. 위험 분석
if detection_result["status"] == "anomaly_detected":
    analysis_result = analyze_risk_with_vlm(detection_result)
    print("분석:", analysis_result)
    
    # 3. 가이드라인 생성
    if analysis_result:
        generate_safety_guideline(analysis_result)

In [None]:
# 또는 run.py로 실행
!cd ../src && python run.py --image ../data/01_도크설비/logistics_yolo/val/images/image_000001.jpg