In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
import time
import os

In [None]:
def __init__(self, model_path='trained_yolov8m.pt', input_video='HK_Falldown.mp4', output_video='output.mp4'):
        """
        비디오 파일 기반 쓰러짐 감지 시스템 초기화
        
        Args:
            model_path (str): 훈련된 YOLOv8 모델 경로
            input_video (str): 입력 비디오 파일 경로
            output_video (str): 출력 비디오 파일 경로
        """
        # YOLO 모델 로드
        self.model = YOLO(model_path)
        
        # 비디오 파일 설정
        self.input_video_path = input_video
        self.output_video_path = output_video
        self.cap = None
        self.out = None
        
        # 비디오 정보 저장 변수
        self.fps = 0
        self.width = 0
        self.height = 0
        self.total_frames = 0
        self.current_frame = 0
        
        # 쓰러짐 감지 상태 변수
        self.falldown_start_time = None
        self.falldown_start_frame = None
        self.falldown_detected = False
        self.warning_triggered = False
        #duration : 쓰러짐 유지 시간
        self.current_falldown_duration = 0
        
        # 바운딩 박스 위치 추적을 위한 변수
        self.previous_falldown_boxes = []
        self.position_tolerance = 50  # 픽셀 단위 허용 거리
        self.max_missing_frames = 10  # 최대 연속 미탐지 프레임 수
        self.missing_frame_count = 0
        
        # 쓰러짐 이벤트 기록 (시간, 지속시간 등)
        self.falldown_events = []

In [None]:
def initialize_video(self):
        """
        비디오 파일 초기화 및 설정
        Returns:
            bool: 초기화 성공 여부
        """
        # 입력 비디오 파일 열기
        self.cap = cv2.VideoCapture(self.input_video_path)
        #비디오 파일 존재 유무 확인
        if not self.cap.isOpened():
            print(f"오류: 입력 비디오 파일을 열 수 없습니다: {self.input_video_path}")
            return False
        
        # 비디오 정보 가져오기
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        #콘솔에 출력되는 정보
        print(f"비디오 정보:")
        print(f"  해상도: {self.width}x{self.height}")
        print(f"  FPS: {self.fps}")
        print(f"  총 프레임 수: {self.total_frames}")
        print(f"  총 길이: {self.total_frames/self.fps:.2f}초")
        
        # 출력 비디오 설정
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # 코덱 설정
        self.out = cv2.VideoWriter(self.output_video_path, fourcc, self.fps, (self.width, self.height))
        
        if not self.out.isOpened():
            print(f"오류: 출력 비디오 파일을 생성할 수 없습니다: {self.output_video_path}")
            return False
        
        return True

In [None]:
def get_current_time_seconds(self):
        """
        현재 프레임의 시간(초) 계산
        Returns:
            float: 현재 시간(초)
        """
        return self.current_frame / self.fps

In [None]:
def format_time(self, seconds):
        """
        초를 mm:ss 형식으로 변환
        Args:
            seconds (float): 초
        Returns:
            str: mm:ss 형식의 시간
        """
        minutes = int(seconds // 60)
        secs = int(seconds % 60)
        return f"{minutes:02d}:{secs:02d}"




In [None]:
def calculate_box_center(self, x1, y1, x2, y2):
        """
        바운딩 박스의 중심점 계산
        Args:
            x1, y1, x2, y2: 바운딩 박스 좌표
        Returns:
            center_x, center_y: 중심점 좌표
        """
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2
        return center_x, center_y
def calculate_distance(self, pos1, pos2):
        """
        두 점 사이의 유클리드 거리 계산
        
        Args:
            pos1, pos2: (x, y) 좌표 튜플
            
        Returns:
            distance: 두 점 사이의 거리
        """
        return np.sqrt((pos1[0] - pos2[0])**2 + (pos1[1] - pos2[1])**2)
    

In [None]:
def is_similar_position(self, current_boxes, previous_boxes):
        """
        현재 바운딩 박스들이 이전 바운딩 박스들과 비슷한 위치에 있는지 확인
        
        Args:
            current_boxes: 현재 프레임의 쓰러짐 바운딩 박스 리스트
            previous_boxes: 이전 프레임의 쓰러짐 바운딩 박스 리스트
            
        Returns:
            bool: 비슷한 위치에 있으면 True
        """
        if not previous_boxes or not current_boxes:
            return False
        
        # 각 현재 박스에 대해 이전 박스들과의 거리 확인
        for current_box in current_boxes:
            current_center = self.calculate_box_center(*current_box)
            
            # 이전 박스들 중 하나라도 허용 거리 내에 있으면 연속성 인정
            for previous_box in previous_boxes:
                previous_center = self.calculate_box_center(*previous_box)
                distance = self.calculate_distance(current_center, previous_center)
                
                if distance <= self.position_tolerance:
                    return True
        
        return False

In [None]:
def detect_objects(self, frame):
        """
        프레임에서 객체 탐지 수행
        
        Args:
            frame: 입력 프레임
            
        Returns:
            results: YOLO 탐지 결과
        """
        results = self.model(frame)
        return results
    
    def process_detections(self, results, frame):
        """
        탐지 결과 처리 및 쓰러짐 감지 로직
        
        Args:
            results: YOLO 탐지 결과
            frame: 현재 프레임
            
        Returns:
            processed_frame: 처리된 프레임
        """
        current_time_seconds = self.get_current_time_seconds()
        current_falldown_boxes = []

In [None]:
class VideoFalldownDetectionSystem:
   # 탐지된 객체들 처리
        for result in results:
            boxes = result.boxes
            if boxes is not None:
                for box in boxes:
                    # 클래스 이름 가져오기
                    class_id = int(box.cls[0])
                    class_name = self.model.names[class_id]
                    confidence = float(box.conf[0])
                    
                    # 바운딩 박스 좌표
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    
                    # falldown 클래스 확인
                    if class_name.lower() == 'falldown' and confidence > 0.5:
                        current_falldown_boxes.append((x1, y1, x2, y2))
                        
                        # 바운딩 박스 그리기 (빨간색)
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 3)
                        
                        # 클래스 이름과 신뢰도 표시
                        label = f'{class_name}: {confidence:.2f}'
                        cv2.putText(frame, label, (x1, y1-10), 
                                  cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
                        
                        # 중심점 표시
                        center_x, center_y = self.calculate_box_center(x1, y1, x2, y2)
                        cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1)
                    
                    # else:
                    #     # 다른 객체들 (초록색 바운딩 박스)
                    #     cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    #     label = f'{class_name}: {confidence:.2f}'
                    #     cv2.putText(frame, label, (x1, y1-10), 
                    #               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        # 낙상 연속성 판단
        falldown_continuous = False
        
        if current_falldown_boxes:
            if not self.falldown_detected:
                # 새로운 낙상 감지 시작
                falldown_continuous = True
                self.missing_frame_count = 0
            else:
                # 기존 낙상이 진행 중인 경우, 위치 연속성 확인
                if self.is_similar_position(current_falldown_boxes, self.previous_falldown_boxes):
                    falldown_continuous = True
                    self.missing_frame_count = 0
                else:
                    # 위치가 많이 달라진 경우 - 새로운 낙상으로 간주하고 리셋
                    falldown_continuous = True
                    self.falldown_detected = False
                    self.missing_frame_count = 0
        else:
            # 현재 프레임에서 낙상이 감지되지 않음
            if self.falldown_detected:
                self.missing_frame_count += 1
                # 허용 프레임 수 내에서는 연속성 유지
                if self.missing_frame_count <= self.max_missing_frames:
                    falldown_continuous = True
                else:
                    falldown_continuous = False
            else:
                falldown_continuous = False
        
        # 낙상 상태 업데이트
        if falldown_continuous:
            if not self.falldown_detected:
                # 새로운 낙상 감지 시작
                self.falldown_detected = True
                self.falldown_start_time = current_time_seconds
                self.falldown_start_frame = self.current_frame
                self.warning_triggered = False
                print(f"낙상 감지 시작! (시간: {self.format_time(current_time_seconds)})")
            
            # 낙상 지속 시간 계산
            self.current_falldown_duration = current_time_seconds - self.falldown_start_time
            
            # 10초 이상 지속시 경고
            if self.current_falldown_duration >= 10.0 and not self.warning_triggered:
                self.warning_triggered = True
                print(f"경고! 낙상이 10초 이상 지속되었습니다! (시간: {self.format_time(current_time_seconds)})")
                
        else:
            # 낙상이 더 이상 감지되지 않음
            if self.falldown_detected:
                end_time = current_time_seconds
                duration = self.current_falldown_duration
                
                # 낙상 이벤트 기록
                event = {
                    'start_time': self.falldown_start_time,
                    'end_time': end_time,
                    'duration': duration,
                    'start_frame': self.falldown_start_frame,
                    'end_frame': self.current_frame,
                    'warning_triggered': self.warning_triggered
                }
                self.falldown_events.append(event)
                
                print(f"낙상 감지 종료 (시작: {self.format_time(self.falldown_start_time)}, "
                      f"종료: {self.format_time(end_time)}, 지속시간: {duration:.1f}초)")
            
            self.falldown_detected = False
            self.falldown_start_time = None
            self.falldown_start_frame = None
            self.current_falldown_duration = 0
            self.warning_triggered = False
            self.missing_frame_count = 0
        
        # 현재 프레임의 낙상 박스들을 다음 프레임 비교용으로 저장
        self.previous_falldown_boxes = current_falldown_boxes.copy()
        
    return frame 

In [None]:
def draw_status(self, frame):
        """
        화면에 상태 정보 표시
        
        Args:
            frame: 프레임
            
        Returns:
            frame: 상태 정보가 추가된 프레임
        """
        height, width = frame.shape[:2]
        current_time_seconds = self.get_current_time_seconds()
        
        # 프레임 정보
        frame_text = f"Frame: {self.current_frame}/{self.total_frames}"
        cv2.putText(frame, frame_text, (20, 40),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (200, 200, 200), 2)
        
        
        
        # 낙상 감지 상태 표시
        if self.falldown_detected:
            # status_text = "FALLDOWN DETECTED!"
            status_color = (0, 0, 255)  # 빨간색
            
            # 지속 시간 표시 (큰 글씨로)
            duration_text = f"Duration: {self.current_falldown_duration:.1f}s"
            cv2.putText(frame, duration_text, (20, 60),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
            
            # 시작 시간 표시
            start_time_text = f"Started at: {self.format_time(self.falldown_start_time)}"
            cv2.putText(frame, start_time_text, (20, 100),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
            
            # 경고 상태 표시
            if self.warning_triggered:
                warning_text = "*** EMERGENCY ALERT ***"
                cv2.putText(frame, warning_text, (20, 170),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                
                # 화면 테두리 깜빡임 효과
                if int(current_time_seconds * 2) % 2:  # 0.5초마다 깜빡임
                    cv2.rectangle(frame, (0, 0), (width-1, height-1), (0, 0, 255), 10)
        
        return frame

In [None]:
def print_summary(self):
        #처리 완료 후 요약 정보 출력
        print("\n" + "="*60)
        print("낙상 감지 결과 요약")
        print("="*60)
        
        if not self.falldown_events:
            print("감지된 낙상 이벤트가 없습니다.")
        else:
            print(f"총 낙상 이벤트 수: {len(self.falldown_events)}")
            print()
            
            for i, event in enumerate(self.falldown_events, 1):
                print(f"[낙상 이벤트 #{i}]")
                print(f"  시작 시간: {self.format_time(event['start_time'])}")
                print(f"  종료 시간: {self.format_time(event['end_time'])}")
                print(f"  지속 시간: {event['duration']:.1f}초")
                print(f"  프레임 범위: {event['start_frame']} - {event['end_frame']}")
                print(f"  경고 발생: {'예' if event['warning_triggered'] else '아니오'}")
                print()
        
        print("="*60)

In [None]:
def run(self):
        """메인 실행 루프"""
        print("비디오 파일 기반 낙상 감지 시스템을 시작합니다...")
        
        # 비디오 초기화
        if not self.initialize_video():
            return
        
        print(f"입력 파일: {self.input_video_path}")
        print(f"출력 파일: {self.output_video_path}")
        print("비디오 처리를 시작합니다...")
        
        start_time = time.time()
        
        try:
            while True:
                ret, frame = self.cap.read()
                
                if not ret:
                    print("비디오 처리가 완료되었습니다.")
                    break
                
                self.current_frame += 1
                
                # 객체 탐지
                results = self.detect_objects(frame)
                
                # 탐지 결과 처리
                frame = self.process_detections(results, frame)
                
                # 상태 정보 표시
                frame = self.draw_status(frame)
                
                # 출력 비디오에 프레임 쓰기
                self.out.write(frame)
                
                # 진행률 출력 (매 100프레임마다)
                if self.current_frame % 100 == 0:
                    progress = (self.current_frame / self.total_frames) * 100
                    current_time = self.get_current_time_seconds()
                    print(f"처리 진행률: {progress:.1f}% (시간: {self.format_time(current_time)})")
                
        except KeyboardInterrupt:
            print("\n사용자에 의해 중단되었습니다.")
        
        except Exception as e:
            print(f"오류 발생: {e}")
        
        finally:
            processing_time = time.time() - start_time
            print(f"총 처리 시간: {processing_time:.2f}초")
            
            self.cleanup()
            self.print_summary()

In [None]:
def cleanup(self):
        """리소스 정리"""
        print("시스템을 종료합니다...")
        if self.cap:
            self.cap.release()
        if self.out:
            self.out.release()
        cv2.destroyAllWindows()

In [None]:
# 실행 부분
if __name__ == "__main__":
    # 파일 경로 설정
    input_file = 'HK_Falldown.mp4'
    output_file = 'output.mp4'
    model_file = 'trained_yolov8m.pt'
    
    # 입력 파일 존재 확인
    if not os.path.exists(input_file):
        print(f"오류: 입력 파일을 찾을 수 없습니다: {input_file}")
        print("현재 디렉토리에 'input.mp4' 파일을 넣어주세요.")
    elif not os.path.exists(model_file):
        print(f"오류: 모델 파일을 찾을 수 없습니다: {model_file}")
        print("현재 디렉토리에 'trained_yolov8m.pt' 파일을 넣어주세요.")
    else:
        # 시스템 초기화 및 실행
        detector = VideoFalldownDetectionSystem(model_file, input_file, output_file)
        detector.run()
        
        print(f"\n출력 파일이 생성되었습니다: {output_file}")