In [1]:
import sys, time
import numpy as np
import torch
import cv2
from ultralytics import YOLO
from collections import deque

In [2]:
def euclidean_dist(x1, y1, x2, y2):
    return ((x1-x2)**2 + (y1-y2)**2)**0.5

In [3]:
def extend_line(img, forklift_deque, color, thickness):
    """
    forklift 의 최근 n개 프레임 정보를 사용해서 진행 방향을 구하고, 사진 상에서의 양 끝점을 구하는 함수
    n개 프레임 정보가 저장된 deque 내의 가장 첫 값과 끝 값을 사용해서 두 점을 잇는 직선을 구한다.
    - forklift_deque : forklift의 바운딩 박스 좌표를 저장하는 deque 객체 
    """
    
    # deque 내의 값이 충분하지 않을 경우 종료
    deque_len = len(forklift_deque)
    if deque_len <= 1:
        return
    
    # 대상 사진의 높이, 너비
    height, width, _ = img.shape
    
    x1, y1, _, _ = forklift_deque[0]
    x2, y2, _, _ = forklift_deque[-1]
    
    dx = x2 - x1
    dy = y2 - y1
    grad = dy / dx
    
    if dx == 0: # 세로선
        cv2.line(img, (x1, 0), (x1, height), color, thickness)
    elif dy == 0:   # 가로선
        cv2.line(img, (0, y1), (width, y1), color, thickness)
    else:
        points = []
        
        # left border (x=0)
        y = y1 - x1 * grad
        if 0 <= y <= height:
            points.append((0, int(y)))
        
        # Right border (x=width)
        y = y1 + (width - x1) * grad
        if 0 <= y <= height:
            points.append((width, int(y)))
        
        # Top border (y=0)
        x = x1 - y1 / grad
        if 0 <= x <= width:
            points.append((int(x), 0))
        
        # Bottom border (y=height)
        x = x1 + (height - y1) / grad
        if 0 <= x <= width:
            points.append((int(x), height))
        
        if len(points) == 2:
            cv2.line(img, points[0], points[1], color, thickness)

In [4]:
def extend_route_wrong(forklift_deque, img_width, img_height):
    """ [오류 있음] 쓰지 말 것 """
    deque_len = len(forklift_deque)
    if deque_len == 0:
        return
    
    x1, y1, _, _ = forklift_deque[0]
    x2, y2, _, _ = forklift_deque[-1]
    
    grad = (y2 - y1) / (x2 - x1)
    
    x_intercept = x1 - (y1 / grad)
    y_intercept = y1 - x1 * grad
    
    if x_intercept > img_width:
        x3, y3 = (img_width, grad * (img_width - x1) + y1)
    else:
        x3, y3 = (x_intercept, 0)
    
    if y_intercept > img_height:
        x4, y4 = ((img_height - y1) / grad + x1, 0)
    else:
        x4, y4 = (0, y_intercept)
    
    return x3, y3, x4, y4   # 사진에서 그려지는 직선의 양 끝점 반환

In [5]:
def calculate_route_coefs(forklift_deque):
    """
    forklift 의 최근 n개 프레임 정보를 사용해서 진행 방향의 음함수 계수를 구하는 함수
    deque 내의 가장 첫 값과 끝 값을 사용해서 두 점을 잇는 직선을 구한다. (음함수 식 ax+by+c=0)
    - forklift_deque : forklift의 바운딩 박스 좌표를 저장하는 deque 객체
    """
    
    # deque 내의 값이 충분하지 않을 경우 종료
    deque_len = len(forklift_deque)
    if deque_len <= 1:
        return
    
    x1, y1, _, _ = forklift_deque[0]
    x2, y2, _, _ = forklift_deque[-1]
    
    dx = x2 - x1
    dy = y2 - y1
    grad = dy / dx
    
    # 음함수 식 ax+by+c=0
    if dx == 0:
        a, b, c = 1, 0, -x1
    elif dy == 0:
        a, b, c = 0, 1, -y1
    else:
        a, b, c = grad, -1, y1 - (a * x1)
    
    return a, b, c

In [6]:
def detect_danger_between_forklift_and_person(forklift_deque, person_bbox):
    """ [여러 사람을 대상으로 작동할 수 있도록 수정 필요]
    forklift의 예상 진행 경로를 계산하고, 어떤 한 사람이 그 경로로부터 충분히 떨어져 있는지 판단하는 함수
    - forklift_deque : forklift의 바운딩 박스 좌표 여러 개를 저장하는 deque 객체
    - person_bbox : person의 바운딩 박스 좌표를 저장하는 리스트 객체
    """
    
    coefs = calculate_route_coefs(forklift_deque)
    if not coefs: 
        return
    
    a, b, c = coefs
    x1, y1, w1, h1 = person_bbox
    dist = abs(a * x1 + b * y1 + c) / (a**2 + b**2)**0.5
    
    _, _, w2, h2 = forklift_deque[-1]
    forklift_len = (w2**2 + h2**2)**0.5
    person_len = (w1**2 + h2**2)**0.5
    
    danger_flag = True if (forklift_len + person_len) * 0.5 >= dist else False
    return danger_flag

In [7]:
# 커스텀 모델 불러오기
model = YOLO(r'./runs/detect/yolov8n_custom_1280x7204/weights/best.pt')

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# GPU 설정 (predict)
# model.to(DEVICE)

In [8]:
# 비디오 파일 로드
video_file = "./datasets/short.mp4"
cap = cv2.VideoCapture(video_file)

# 비디오 객체가 열렸는지 확인
if not cap.isOpened():
    print("Video open failed!")
    sys.exit()

### 비디오 생성 준비
fourcc = cv2.VideoWriter_fourcc(*'DIVX')
w = round(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = round(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

# 비디오의 너비, 높이, fps
w, h, fps

(1920, 1080, 29.733460762450886)

In [9]:
# 전체 프레임 detect 데이터를 저장할 리스트 (테스트용)
detect_results = []

In [10]:
# 최근 n개 프레임을 저장할 데크(deque) 객체 생성
recent_frames = deque(maxlen=3)

# 프레임 간격 설정 (가변적)
frame_interval = 5

# forklift, person valid flag
forklift_valid, person_valid = False, False # 초기값 : False

# forklift, person count (일정 프레임 이상 존재할 경우 객체 탐지로 인정)
# forklift_cnt, person_cnt = 0, 0   # 아직 threshold 설정 안 함

# 1프레임씩 읽으며 위험상황 처리
while True:
    # 트래킹 중인 지게차가 없다면 데크 초기화
    if not forklift_valid:
        recent_frames.clear()
    
    # 비디오에서 현재 프레임의 위치
    current_frame_pos = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
    
    # 카메라의 ret, frame 값 가져오기
    # - ret : boolean (success or not)
    # - frame : image array vector
    ret, frame = cap.read()
    
    if not ret: 
        break
    
    if current_frame_pos % frame_interval == 0:
        # YOLOv8 모델 적용
        # results[0] : ultralytics.engine.results.Results
        results = model.track(frame, conf=0.7)
        
        # Visualize the results on the frame
        annotated_frame = results[0].plot()
        detect_results.append(results[0])
        
        # 지게차가 있는지 확인
        if 2 in results[0].boxes.cls:
            forklift_valid = True
            idx = results[0].boxes.cls.tolist().index(2)
            recent_frames.append(results[0].boxes.xywh.tolist()[idx])
        else:
            forklift_valid = False
        
        # 사람이 있는지 확인
        if 0 in results[0].boxes.cls:
            person_valid = True
            idx = results[0].boxes.cls.tolist().index(0)
            person_frame = results[0].boxes.xywh.tolist()[idx]
            # 지게차 예상 진행 루트와의 직선 거리를 계산해서 위험여부를 알려줌
            if detect_danger_between_forklift_and_person(recent_frames, person_frame):
                # [위험상황 발생 시각 저장 기능] => 구현 예정
                cv2.putText(annotated_frame, 'collision risk occurred', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0))
                print('collision risk occurred')
        else:
            person_valid = False
        
        # 예상 진행 루트 표시 (직선)
        if len(recent_frames) >= 2:
            x1, y1, _, _ = recent_frames[0]
            x2, y2, _, _ = recent_frames[-1]
            extend_line(annotated_frame, recent_frames, (0, 0, 255), 5)
            # cv2.line(annotated_frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 10)
            dist = euclidean_dist(x1, y1, x2, y2)
            cv2.putText(annotated_frame, f'Dist : {dist}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0))
        
        # 이미지 크기 조정
        scale_factor = 0.5
        resized_frame = cv2.resize(annotated_frame, None, fx=scale_factor, fy=scale_factor)
        
        # 어노테이션된 프레임을 표시
        cv2.imshow('RESULT IMAGE', resized_frame)
    
    # 'q'가 눌리면 루프를 중단
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 비디오 캡처 객체를 해제하고 표시 창 닫기
cap.release()
cv2.destroyAllWindows()


0: 384x640 (no detections), 143.8ms
Speed: 0.0ms preprocess, 143.8ms inference, 2633.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 94.4ms
Speed: 14.5ms preprocess, 94.4ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Forklift, 87.1ms
Speed: 0.0ms preprocess, 87.1ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Forklift, 124.5ms
Speed: 0.0ms preprocess, 124.5ms inference, 8.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Forklift, 135.7ms
Speed: 15.8ms preprocess, 135.7ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Forklift, 147.6ms
Speed: 0.0ms preprocess, 147.6ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Forklift, 129.0ms
Speed: 0.0ms preprocess, 129.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Forklift, 142.4ms
Speed: 0.0ms preprocess, 142.4ms inference, 0.0

In [8]:
len(detect_results), len(recent_frames)

(45, 0)

In [28]:
recent_frames[2]

[938.2415771484375, 266.6779479980469, 554.625, 533.3558959960938]

In [4]:
results[0].boxes.xywh.tolist()[0]

[610.776611328125, 150.93067932128906, 411.310791015625, 301.8613586425781]

In [15]:
import inspect

def get_attributes_info(obj):
    """ 객체가 갖고 있는 속성들의 종류를 리스트로 반환하는 함수 """
    if obj is None: return

    return [attr for attr, value in inspect.getmembers(obj)
            if not callable(value) and not attr.startswith('__')]

In [16]:
### ultralytics.engine.results.Results 속성들
temp = get_attributes_info(results[0])
for t in temp:
    print(t)

# https://docs.ultralytics.com/reference/engine/results/#ultralytics.engine.results.BaseTensor.__len__
# - boxes : Object containing detection bounding boxes.
# - names : Dictionary of class names.
# - orig_img : Original image as a numpy array.
# - orig_shape : Original image shape in (height, width) format.
# - speed : Dictionary of preprocess, inference, and postprocess speeds (ms/image).

_keys
boxes
keypoints
masks
names
obb
orig_img
orig_shape
path
probs
save_dir
speed


In [17]:
### ultralytics.engine.results.Boxes 속성들
temp = get_attributes_info(results[0].boxes)
for t in temp:
    print(t)

# https://docs.ultralytics.com/reference/engine/results/#ultralytics.engine.results.Results.summary
# - cls : the class values of the boxes.
# - conf : the confidence values of the boxes.
# - data : the raw tensor containing detection boxes and their associated data.
# - id : the track IDs of the boxes (if available).
# - is_track : 	Indicates whether tracking IDs are included in the box data.
# - orig_shape : The original image size as a tuple (height, width), used for normalization.
# - xywh : the boxes in xywh format.
# - xywhn : the boxes in xywh format normalized by original image size.
# - xyxy : the boxes in xyxy format.
# - xyxyn : the boxes in xyxy format normalized by original image size.

cls
conf
data
id
is_track
orig_shape
shape
xywh
xywhn
xyxy
xyxyn


In [13]:
detect_results[27].boxes.xywh

tensor([[1107.2329,  278.1606,  590.8399,  552.2476]])

In [83]:
detect_results[27].boxes.id

tensor([1.])

In [3]:
### 사진 파일 로드
# img_file = "./datasets/multiple_classes.jpg"
# img = cv2.imread(img_file)

### 사진 예측
# results = model.predict(img, conf=0.5)


0: 384x640 2 Persons, 1 Forklift, 88.5ms
Speed: 4.0ms preprocess, 88.5ms inference, 1578.0ms postprocess per image at shape (1, 3, 384, 640)


In [7]:
### 비디오 예측
# video_file = "./datasets/short.mp4"
# results = model.predict(source= video_file, save=True, conf=0.7, device=DEVICE)



errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

video 1/1 (frame 1/224) c:\Users\kdp\PycharmProjects\KDT_MNVISION\BJY\yolo\v9\datasets\short.mp4: 640x640 (no detections), 54.4ms
video 1/1 (frame 2/224) c:\Users\kdp\PycharmProjects\KDT_MNVISION\BJY\yolo\v9\datasets\short.mp4: 640x640 (no detections), 51.9ms
video 1/1 (frame 3/224) c:\Users\kdp\PycharmProjects\KDT_MNVISION\BJY\yolo\v9\datasets\short.mp4: 640x640 1 Forklift(D), 52.6ms
video 1/1 (frame 4/224) c:\Users\kdp\PycharmProjects\KDT_MNVISION\BJY\yolo\v9\datasets\short.mp4: 640x640 1 Forklift(D), 61.4ms
video 1/1 (frame 5/224) c:\Users\kdp\PycharmProjects\KDT