# Лабораторная работа 6. Обработка видео

## Задача 1

Локально на своем компьютере выполните детекцию движения, движущиеся объекты на видео либо потока с камеры должны выделяться контуром. Добейтесь, чтобы неподвижные предметы контуром не выделялись.

1. Получите поток видео
2. Используйте cv2.absdiff(frame1, frame2), чтобы выявить разницу между кадрами.
3. Переведите результат в полутоновой вид
4. Проведите пороговую бинаризацию
5. Найдите контуры
6. Наложите контуры поверх frame1
7. Выведите frame1

In [None]:
import cv2
import numpy as np

In [8]:
# Инициализация захвата видео (из камеры или файла)
cap = cv2.VideoCapture('car_flow.mp4') 
ret, frame1 = cap.read()
frame1_gray = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

while True:
    ret, frame2 = cap.read()
    if not ret:
        break

    frame2_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    diff = cv2.absdiff(frame1_gray, frame2_gray)

    # пороговая бинаризация
    _, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY)

    # Находим контуры
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    for contour in contours:
        if cv2.contourArea(contour) > 120:
            cv2.drawContours(frame1, [contour], -1, (0, 255, 0), 2)

    cv2.imshow('Motion Detection', frame1)

    frame1_gray = frame2_gray.copy()
    frame1 = frame2.copy()

    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


## Задача 2

Реализуйте на примере какого-нибудь видео трекинг на основе Deep-Sort

In [None]:
from ikomia.dataprocess.workflow import Workflow
from ikomia.utils.displayIO import display # в локальной версии для вывода
import matplotlib.pyplot as plt
import cv2


# Replace 'your_video_path.mp4' with the actual video file path
input_video_path = 'car_flow.mp4'
output_video_path = 'deepsort_output_video.avi'

# Init your workflow
wf = Workflow()

# Add object detection algorithm
detector = wf.add_task(name="infer_yolo_v7", auto_connect=True)

# Add deepsort tracking algorithm
tracking = wf.add_task(name="infer_deepsort", auto_connect=True)

tracking.set_parameters({
    "categories": "all",
    "conf_thres": "0.5",
})

# Open the video file
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    print("Error: Could not open video.")
    exit()

# Get video properties for the output
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_rate = cap.get(cv2.CAP_PROP_FPS)

# Define the codec and create VideoWriter object
# The 'XVID' codec is widely supported and provides good quality
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter(output_video_path, fourcc, frame_rate, (frame_width, frame_height))

while True:
    end, frame = cap.read()

    # Test if the video has ended or there is an error
    if not end:
        print("Info: End of video or error.")
        break

    # Run the workflow on current frame
    wf.run_on(array=frame)

    # Get results
    image_out = tracking.get_output(0)
    obj_detect_out = tracking.get_output(1)

    # Convert the result to BGR color space for displaying
    img_out = image_out.get_image_with_graphics(obj_detect_out)
    img_res = cv2.cvtColor(img_out, cv2.COLOR_RGB2BGR)

    # Save the resulting frame
    out.write(img_out)

    #  Display
    cv2.imshow('video', img_res)

    # Press 'q' to quit the video processing
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# After the loop release everything
cap.release()
out.release()
cv2.destroyAllWindows()

## Задача 3

Реализуйте на примере какого-нибудь видео из встроенных в VideoAssets трекинг на основе ByteTrack с подсчетом объектов, пересекающих заданную вами линию.

In [None]:
import os
import numpy as np
HOME = os.getcwd()
print(HOME)

!pip install ultralytics==8.3.19

from IPython import display
display.clear_output()

import ultralytics
ultralytics.checks()

!pip install supervision[assets]==0.24.0
import supervision as sv
print("supervision.__version__:", sv.__version__)

from ultralytics import YOLO

model = YOLO("yolov8x.pt")

SOURCE_VIDEO_PATH = "car_flow.mp4"

# получаем словарь соответствия class_id и class_name
CLASS_NAMES_DICT = model.model.names

# выбираем классы
SELECTED_CLASS_NAMES = ['car', 'motorcycle', 'bus', 'truck']

# id выбранных классов
SELECTED_CLASS_IDS = [
    {value: key for key, value in CLASS_NAMES_DICT.items()}[class_name]
    for class_name
    in SELECTED_CLASS_NAMES
]

# генератор фреймов
generator = sv.get_video_frames_generator(SOURCE_VIDEO_PATH)
# определяем BoxAnnotator and LabelAnnotator
box_annotator = sv.BoxAnnotator(thickness=1)
label_annotator = sv.LabelAnnotator(text_thickness=1, text_scale=0.5, text_color=sv.Color.BLACK, text_padding=2)

# получаем первый видеокадр
iterator = iter(generator)
frame = next(iterator)

# прогнозирование модели по одному кадру
results = model(frame, verbose=False)[0]

# преобразование в Detections
detections = sv.Detections.from_ultralytics(results)
# учитываем только идентификатор класса из выбранных_классов, определенных выше
detections = detections[np.isin(detections.class_id, SELECTED_CLASS_IDS)]

# форматирование custom labels, т.е. определяем как будут выглядеть custom labels
labels = [
    f"{CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
    for confidence, class_id in zip(detections.confidence, detections.class_id)
]

# аннотирование и отображение рамки
annotated_frame = frame.copy()
annotated_frame = box_annotator.annotate(
    scene=annotated_frame, detections=detections)
annotated_frame = label_annotator.annotate(
    scene=annotated_frame, detections=detections, labels=labels)

%matplotlib inline
sv.plot_image(annotated_frame, (15, 10))

# настройки линии
LINE_START = sv.Point(0 + 50, 1500)
LINE_END = sv.Point(3840 - 50, 1500)

TARGET_VIDEO_PATH = f"{HOME}/result.mp4"

sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# создать экземпляр BYTETracker
byte_tracker = sv.ByteTrack(
    track_activation_threshold=0.25, # Порог достоверности обнаружения для активации трека
    lost_track_buffer=30, # Количество кадров для буферизации при потере трека
    minimum_matching_threshold=0.8, # Порог для сопоставления треков с обнаружениями.
    frame_rate=30,
    minimum_consecutive_frames=3) # Количество последовательных кадров, в течение которых объект должен отслеживаться, прежде чем он будет считаться «действительным» треком.

byte_tracker.reset()

# создать экземпляр VideoInfo
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# создать frame generator
generator = sv.get_video_frames_generator(SOURCE_VIDEO_PATH)

# создайте экземпляр LineZone, ранее он назывался классом LineCounter
line_zone = sv.LineZone(start=LINE_START, end=LINE_END)

# создание BoxAnnotator, LabelAnnotator, и TraceAnnotator
box_annotator = sv.BoxAnnotator(thickness=2)
label_annotator = sv.LabelAnnotator(text_thickness=1, text_scale=0.5, text_color=sv.Color.BLACK, text_padding=2)
trace_annotator = sv.TraceAnnotator(thickness=2, trace_length=50)

# создание экземпляра LineZoneAnnotator, ранее он назывался классом LineCounterAnnotator
line_zone_annotator = sv.LineZoneAnnotator(thickness=4, text_thickness=4, text_scale=2)

# define call back function to be used in video processing
def callback(frame: np.ndarray, index: int) -> np.ndarray:
    # model prediction on single frame and conversion to supervision Detections
    results = model(frame, verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)
    # only consider class id from selected_classes define above
    detections = detections[np.isin(detections.class_id, SELECTED_CLASS_IDS)]
    # tracking detections
    detections = byte_tracker.update_with_detections(detections)
    labels = [
        f"#{tracker_id} {model.model.names[class_id]} {confidence:0.2f}"
        for confidence, class_id, tracker_id
        in zip(detections.confidence, detections.class_id, detections.tracker_id)
    ]
    annotated_frame = frame.copy()
    annotated_frame = trace_annotator.annotate(
        scene=annotated_frame, detections=detections)
    annotated_frame = box_annotator.annotate(
        scene=annotated_frame, detections=detections)
    annotated_frame = label_annotator.annotate(
        scene=annotated_frame, detections=detections, labels=labels)

    # update line counter
    line_zone.trigger(detections)
    # return frame with box and line annotated result
    return line_zone_annotator.annotate(annotated_frame, line_counter=line_zone)

# process the whole video
sv.process_video(
    source_path = SOURCE_VIDEO_PATH,
    target_path = TARGET_VIDEO_PATH,
    callback=callback
)

