In [86]:
import os
import sys
from collections import defaultdict
import json
import typing as t

import numpy as np
import cv2
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

from pydantic import BaseModel

In [87]:
class DetectionResult(BaseModel):
    # Имя класса
    detection_class: str
    # Порядковый идентификатор класса в списке классов
    detection_class_id: int
    # Порядковый идентификатор объекта в кадре
    # (для модели мячей - номер кадра из входной последовательности
    object_number: int
    # Cырой идентификатор объекта после базового трекера (например, bytetrack)
    track_id_raw: int
    # Финальный идентификатор объекта после дополнительной фильтрации
    track_id: int
    # Уверенность в найденном объекте
    score: float
    # Абсолютный bbox найденного объекта
    box: t.Optional[t.Tuple[int, int, int, int]] = None
    # Относительный bbox найденного объекта
    box_rel: t.Optional[t.Tuple[float, float, float, float]] = None
    # Абсолютная x-координата центра/низа найденного объекта в кадре
    x_frame: t.Optional[float] = None
    # Абсолютная y-координата центра/низа найденного объекта в кадре
    y_frame: t.Optional[float] = None
    # Абсолютная x-координата объекта в координатах корта/поля
    x_court: t.Optional[float] = None
    # Абсолютная y-координата объекта в координатах корта/поля
    y_court: t.Optional[float] = None
    # Абсолютная z-координата объекта в координатах корта/поля
    z_court: t.Optional[float] = None
    # Статус видимости объекта
    visible_status: t.Optional[int] = None
    # Мета информация
    meta: t.Optional[t.Dict] = None
    # Номер кадра из демо-скриптов
    frame_id: t.Optional[int] = None
    # Номер камеры из демо-скриптов
    camera_id: t.Optional[str] = None
    # Таймштамп
    timestamp: t.Optional[float] = None

In [6]:
def remove_ext(fname):
    return '.'.join(fname.split('.')[:-1])

## Визуализация детекций мяча

In [7]:
video_path = '/home/ubuntu/data/videos/025958_6m47s_7m13s.mp4'
ball_json = '/home/ubuntu/data/jsons/ball/unet_mbnv2_norelu6_576x1024_11ch_our_data_blob_det/025958_6m47s_7m13s.json'

camera_id = 'camera1'

with open(ball_json) as f:
    json_data = json.load(f)

ball_results_by_frame_id = defaultdict(list)
for dets_per_frame in json_data:
    for cur_det_per_frame in dets_per_frame:
        if cur_det_per_frame and cur_det_per_frame['camera_id'] == camera_id and cur_det_per_frame['detection_class'] == 'm_ball':
            obj = DetectionResult(**cur_det_per_frame)
            frame_id = obj.frame_id
            ball_results_by_frame_id[frame_id].append(obj)

In [8]:
list(ball_results_by_frame_id.values())[:3]

[[DetectionResult(detection_class='m_ball', detection_class_id=0, object_number=0, track_id_raw=0, track_id=0, score=0.5, box=None, box_rel=None, x_frame=998.6472778320312, y_frame=753.5253295898438, x_court=None, y_court=None, z_court=None, visible_status=None, meta=None, frame_id=0, camera_id='camera1', timestamp=0.0)],
 [DetectionResult(detection_class='m_ball', detection_class_id=0, object_number=0, track_id_raw=0, track_id=0, score=0.5, box=None, box_rel=None, x_frame=1008.6041870117188, y_frame=736.38134765625, x_court=None, y_court=None, z_court=None, visible_status=None, meta=None, frame_id=1, camera_id='camera1', timestamp=0.03333333333333333)],
 [DetectionResult(detection_class='m_ball', detection_class_id=0, object_number=2, track_id_raw=0, track_id=0, score=1.0, box=None, box_rel=None, x_frame=1016.9336547851562, y_frame=712.2769165039062, x_court=None, y_court=None, z_court=None, visible_status=None, meta=None, frame_id=2, camera_id='camera1', timestamp=0.06666666666666667

In [9]:
cap = cv2.VideoCapture(video_path)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

_, video_ext = os.path.splitext(video_path)

if video_ext == '.avi':
    fourcc = cv2.VideoWriter_fourcc(*'DIVX')
elif video_ext == '.mp4':
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # fourcc = cv2.VideoWriter_fourcc(*'H264')
else:
    raise ValueError('Invalid video format.')

video_writer = cv2.VideoWriter(video_path.replace(video_ext, f'preds{video_ext}'),
                               fourcc, fps, (frame_width, frame_height))

pbar = tqdm(total=total_frames, desc=f"Processing {os.path.basename(video_path)}", unit="frame")

frame_id = -1

# Обработка кадров из видео
while cap.isOpened():

    ret, frame = cap.read()
    if not ret:
        # print('bad frame or end')
        break

    frame_id += 1
        
    # Отрисовка мяча
    if frame_id in ball_results_by_frame_id.keys():
        ball_results = ball_results_by_frame_id[frame_id]

        for ball_result in ball_results:
            ball_x = ball_result.x_frame
            ball_y = ball_result.y_frame
        
            cv2.circle(frame, (int(ball_x), int(ball_y)),
                       radius=10, color=(0, 255, 255), thickness=2)

    video_writer.write(frame)

    pbar.update(1)

    # plt.imshow(frame[:,:,::-1])
    # plt.show()
    # break

pbar.close()
cap.release()
video_writer.release()

Processing 025958_6m47s_7m13s.mp4:   0%|          | 0/917 [00:00<?, ?frame/s]