In [1]:
import os
import time
import argparse
from datetime import datetime
from collections import defaultdict, deque

import cv2
import numpy as np
import requests
import json

from detector import YOLOv8
from utils import compute_polygon_intersection

In [2]:
class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points

        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(
                reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

In [3]:
SOURCE = np.array([
    [1252, 787], 
    [2298, 803], 
    [5039, 2159], 
    [-550, 2159]
])

TARGET_WIDTH = 25
TARGET_HEIGHT = 250

TARGET = np.array([
    [0, 0],
    [24, 0],
    [24, 249],
    [0, 249],
])
view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

In [3]:
response = requests.get(r'http://localhost:5000/api/events/1711485879.618846-sdaj55').text
response = json.loads(response)
response['end_time'] is None

False

In [22]:
response = requests.get(r'http://localhost:5000/api/config').text
response = json.loads(response)
address = response["cameras"]['test']['ffmpeg']['inputs'][0]['path']
address.replace('*', 'login', 1).replace('*', 'password', 1)

'rtsp://login:password@192.168.0.39:1935'

In [1]:
!pwd

/home/pc/frigate_test/mqtt/speed_estimation


In [4]:
import cv2
import numpy as np
from collections import defaultdict, deque
import sys
sys.path.append('../')

from request_utils import *
from detector import YOLOv8
from view_transformer import view_transformer


def speed_estimation(address: str):
    """
    Speed estimation process

    :camera: str - camera name
    :event_id: str - id of the event
    :permitted_speed: int - permitted speed to move
    """
    model_path = r'./forklift_human_new.onnx'
    model_path = r'./yolov9c.onnx'

    yolov8_detector = YOLOv8(path=model_path,
                             conf_thres=0.3,
                             iou_thres=0.5)

    # Get camera address
    # address = f'rtsp://localhost:8554/{camera}'
    # address = get_camera_address(camera, login, password)
    # address = get_camera_address_from_config(camera)

    # Videocapturing
    cv2.namedWindow('stream', cv2.WINDOW_NORMAL)
    cap = cv2.VideoCapture(address)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    print(f'{fps} {width} {height}')

    # Videowriting
    # directory = '/storage/' + datetime.now().strftime(r'%d.%m.%Y/')
    # if not os.path.isdir(directory):
    #     os.mkdir(directory)
    # filename = directory + camera + \
    #     datetime.now().strftime(r'_%H:%M:%S_') + event_id + '.mp4'
    # out = cv2.VideoWriter(filename, fourcc, fps, (width, height))
    # print(filename)

    # Get end time of the event
    # end_time = get_end_time(event_id)

    # For affine transforms
    # SOURCE, TARGET = get_transform_points(camera='test')
    SOURCE = np.array([
        [10, 10],
        [630, 10],
        [630, 470],
        [10, 470]
    ])
    TARGET_WIDTH = 200
    TARGET_HEIGHT = 200
    TARGET = np.array([
        [0, 0],
        [TARGET_WIDTH - 1, 0],
        [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
        [0, TARGET_HEIGHT - 1],
    ])

    # fps = 5
    print(f'get_transform_points {SOURCE} {TARGET}')
    coordinates = defaultdict(lambda: deque(maxlen=fps))
    transformer = view_transformer(source=SOURCE, target=TARGET)

    # Maximal detected speed
    max_detected_speed = 0

    while cap.isOpened(): # and end_time is None:
        # Кадр с камеры
        ret, frame = cap.read()
        if not ret:
            break

        # Detecting
        detected_img = frame.copy()
        bounding_boxes, scores, class_ids = yolov8_detector(detected_img)
        # print(bounding_boxes)
        bounding_boxes = np.array(bounding_boxes)[class_ids == 7]
        detected_img = yolov8_detector.draw_detections(detected_img)
        if detected_img is None:
            continue

        # iou fix if len == 1
        if len(bounding_boxes) == 1 or bounding_boxes.shape[0] == 1:
            # bounding_boxes = np.array([bounding_boxes])
            bounding_boxes = np.array(bounding_boxes).reshape(1, -1)

        # Bottom center anchors
        points = np.array([[x_1 + x_2 / 2, y]
                          for [x_1, _, x_2, y] in bounding_boxes])
        points = transformer.transform_points(points=points).astype(int)

        # for class_id, [_, y] in zip(class_ids, points):
        for class_id, point in enumerate(points):
            coordinates[class_id].append(point)

        print(coordinates)
        
        # for class_id, bounding_box in zip(class_ids, bounding_boxes):
        for class_id, bounding_box in enumerate(bounding_boxes):
            # wait to have enough data
            if len(coordinates[class_id]) > fps / 2:
                # calculate the speed
                x_start = coordinates[class_id][-1][0]
                x_end = coordinates[class_id][0][0]
                y_start = coordinates[class_id][-1][1]
                y_end = coordinates[class_id][0][1]
                distance = np.sqrt((x_end - x_start)**2 + (y_end - y_start)**2) / 10
                print(f'{x_start} {x_end}   {y_start} {y_end}')
                # distance = abs(coordinate_start - coordinate_end)
                time = len(coordinates[class_id]) / fps
                print(f'{distance}   {time}')
                speed = round(distance / time * 3.6, 2)

                max_detected_speed = speed if speed > max_detected_speed else max_detected_speed

                # Caption on the frame
                caption = f'{speed} km/h'  # caption
                font = cv2.FONT_HERSHEY_SIMPLEX  # font
                fontScale = 1  # fontScale
                thickness = 2  # Line thickness of 2 px
                x_1 = bounding_box[0]
                y_1 = bounding_box[1]
                x_2 = bounding_box[2]
                y_2 = bounding_box[3]
                # Using cv2.putText() method
                cv2.putText(detected_img, caption, (int(x_1 + 2), int(y_1 + (y_2 - y_1) / 2)),
                            font, fontScale, (255, 0, 0), thickness, cv2.LINE_AA)

        cv2.imshow('stream', detected_img)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        # Writing frame to file
        # out.write(detected_img)  # frame

    cv2.destroyAllWindows()
    cap.release()
    # out.release()

# gen(r'rtsp://admin:admin@192.168.0.39:1935')
# speed_estimation(r'rtsp://localhost:8854/stream0.sdp')
speed_estimation(0)

30 640 480
get_transform_points [[ 10  10]
 [630  10]
 [630 470]
 [ 10 470]] [[  0   0]
 [199   0]
 [199 199]
 [  0 199]]
defaultdict(<function speed_estimation.<locals>.<lambda> at 0x7f0efa7bfbe0>, {0: deque([array([101, 128])], maxlen=30)})
defaultdict(<function speed_estimation.<locals>.<lambda> at 0x7f0efa7bfbe0>, {0: deque([array([101, 128]), array([101, 128])], maxlen=30)})
defaultdict(<function speed_estimation.<locals>.<lambda> at 0x7f0efa7bfbe0>, {0: deque([array([101, 128]), array([101, 128]), array([101, 128])], maxlen=30)})
defaultdict(<function speed_estimation.<locals>.<lambda> at 0x7f0efa7bfbe0>, {0: deque([array([101, 128]), array([101, 128]), array([101, 128]), array([101, 128])], maxlen=30)})
defaultdict(<function speed_estimation.<locals>.<lambda> at 0x7f0efa7bfbe0>, {0: deque([array([101, 128]), array([101, 128]), array([101, 128]), array([101, 128]), array([101, 128])], maxlen=30)})
defaultdict(<function speed_estimation.<locals>.<lambda> at 0x7f0efa7bfbe0>, {0: de

In [2]:
import cv2


def stream(address):

    cv2.namedWindow('stream', cv2.WINDOW_NORMAL)
    cap = cv2.VideoCapture(address)
    # fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    # height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # fps = int(cap.get(cv2.CAP_PROP_FPS))

    while cap.isOpened():
        # Кадр с камеры
        ret, frame = cap.read()
        if not ret:
            break

        cv2.imshow('stream', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            

    cv2.destroyAllWindows()
    cap.release()

# stream(r'rtsp://localhost:8554/test')
# stream(r'rtsp://localhost:8854/cam1')
stream(r'rtsp://admin:admin@192.168.0.39:1935/asdasdas')
# stream(r'http://@192.168.0.39:8080/video')

In [64]:
import yaml

with open('../.././config/config.yml', 'r') as file:
    config = yaml.safe_load(file)

config['cameras']['test']['ffmpeg']['inputs'][0]['path']

'rtsp://mediamtx-server:8554/stream0.sdp'

In [1]:
from detector import YOLOv8
import cv2

def gen(address):#, camera, event_id):
    """
    Запуск модели
    """    
    model_path = r'./yolov9c.onnx'
    yolov8_detector = YOLOv8(path=model_path,
                             conf_thres=0.3,
                             iou_thres=0.5)
    
    cv2.namedWindow('stream', cv2.WINDOW_NORMAL)
    cap = cv2.VideoCapture(address)


    while cap.isOpened():# and end_time is None:
        # Кадр с камеры
        ret, frame = cap.read()
        if not ret:
            break

        # Детектирование
        detected_img = frame.copy()
        bounding_boxes, scores, class_ids = yolov8_detector(detected_img)
        # print(class_ids)
        # bounding_boxes = np.array(bounding_boxes)
        detected_img = yolov8_detector.draw_detections(detected_img)

        if detected_img is None:
            continue


        cv2.imshow('stream', detected_img)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break


    cv2.destroyAllWindows()
    cap.release()

gen(0)

In [5]:
classes = {
    0: 'person',
    1: 'bicycle',
    2: 'car',
    3: 'motorcycle',
    4: 'airplane',
    5: 'bus',
    6: 'train',
    7: 'truck',
    8: 'boat',
    9: 'traffic light',
    10: 'fire hydrant',
    11: 'stop sign',
    12: 'parking meter',
    13: 'bench',
    14: 'bird',
    15: 'cat',
    16: 'dog',
    17: 'horse',
    18: 'sheep',
    19: 'cow',
    20: 'elephant',
    21: 'bear',
    22: 'zebra',
    23: 'giraffe',
    24: 'backpack',
    25: 'umbrella',
    26: 'handbag',
    27: 'tie',
    28: 'suitcase',
    29: 'frisbee',
    30: 'skis',
    31: 'snowboard',
    32: 'sports ball',
    33: 'kite',
    34: 'baseball bat',
    35: 'baseball glove',
    36: 'skateboard',
    37: 'surfboard',
    38: 'tennis racket',
    39: 'bottle',
    40: 'wine glass',
    41: 'cup',
    42: 'fork',
    43: 'knife',
    44: 'spoon',
    45: 'bowl',
    46: 'banana',
    47: 'apple',
    48: 'sandwich',
    49: 'orange',
    50: 'broccoli',
    51: 'carrot',
    52: 'hot dog',
    53: 'pizza',
    54: 'donut',
    55: 'cake',
    56: 'chair',
    57: 'couch',
    58: 'potted plant',
    59: 'bed',
    60: 'dining table',
    61: 'toilet',
    62: 'tv',
    63: 'laptop',
    64: 'mouse',
    65: 'remote',
    66: 'keyboard',
    67: 'cell phone',
    68: 'microwave',
    69: 'oven',
    70: 'toaster',
    71: 'sink',
    72: 'refrigerator',
    73: 'book',
    74: 'clock',
    75: 'vase',
    76: 'scissors',
    77: 'teddy bear',
    78: 'hair drier',
    79: 'toothbrush'
}

In [10]:
for id, value in classes:
    print(f'{id}: {value}')

person
bicycle
car
motorcycle
airplane
bus
train
truck
boat
traffic light
fire hydrant
stop sign
parking meter
bench
bird
cat
dog
horse
sheep
cow
elephant
bear
zebra
giraffe
backpack
umbrella
handbag
tie
suitcase
frisbee
skis
snowboard
sports ball
kite
baseball bat
baseball glove
skateboard
surfboard
tennis racket
bottle
wine glass
cup
fork
knife
spoon
bowl
banana
apple
sandwich
orange
broccoli
carrot
hot dog
pizza
donut
cake
chair
couch
potted plant
bed
dining table
toilet
tv
laptop
mouse
remote
keyboard
cell phone
microwave
oven
toaster
sink
refrigerator
book
clock
vase
scissors
teddy bear
hair drier
toothbrush
