**IF THE SCRIPT IS RUN IN COLAB, WE NEED TO INSTALL TWO PACKAGES**

---



In [1]:
!pip install supervision

Collecting supervision
  Downloading supervision-0.25.0-py3-none-any.whl.metadata (14 kB)
Downloading supervision-0.25.0-py3-none-any.whl (181 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m181.5/181.5 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: supervision
Successfully installed supervision-0.25.0


In [2]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.40-py3-none-any.whl.metadata (35 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.12-py3-none-any.whl.metadata (9.4 kB)
Downloading ultralytics-8.3.40-py3-none-any.whl (898 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m898.5/898.5 kB[0m [31m23.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.12-py3-none-any.whl (26 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.40 ultralytics-thop-2.0.12


**IMPORTS**

In [None]:
from pathlib import Path
import numpy as np
import math
import os
import pandas as pd
import json
from enum import Enum
import time
from typing import List, Generator, Dict, Any, Tuple, Union, Set
from types import SimpleNamespace
from dataclasses import dataclass
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
import sys
import itertools

from IPython.display import display, Image
from shapely.geometry import box
from shapely.ops import unary_union

In [6]:
import humanize
import psutil
import GPUtil
import supervision as sv
import torch
from ultralytics import YOLO
from ultralytics.engine.results import Results
from supervision.detection.core import Detections
from supervision.tracker.byte_tracker.core import ByteTrack

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [7]:
class Labels(Enum):
    """Detection classes handled by the notebook.

    The integer values are used as class filters for the YOLO calls
    (``classes=[...]``) and the member names are used as the label text
    drawn on annotated frames.
    """
    # White hen ("gallina bianca").
    gallina_bianca = 0
    # Red hen ("gallina rossa").
    gallina_rossa = 1

In [None]:
# Local file name and public Dropbox share link of the sample video.
file_name = "ExternDisk0_ch4_20240207130000_20240207140000.mp4"
# `dl=1` asks Dropbox for the raw file content; with `dl=0` the share link may
# answer with the HTML preview page instead of the video.
url = "https://www.dropbox.com/scl/fi/rwz8irhljs0dm08yy2zku/ExternDisk0_ch4_20240207130000_20240207140000.mp4?rlkey=fi02g6j9hlet481hw9kk4inru&st=08kvj8ln&dl=1"

# Download the file using wget. os.system returns the exit status of the
# command, so success is only reported when wget actually succeeded.
# Both arguments are single-quoted so the shell does not interpret `&` or `?`.
exit_status = os.system(f"wget -O '{file_name}' '{url}'")

if exit_status == 0:
    print(f"File '{file_name}' scaricato nella directory corrente.")
else:
    print(f"Error: wget exited with status {exit_status}; '{file_name}' was not downloaded correctly.")

**FUNCTIONS FOR TRACKING AND COMPUTATION OF KINETIC INDICES**

In [None]:
def annotate_frame(
    detections: Detections,
    frame: np.ndarray) -> np.ndarray:
    """Draw bounding boxes and text labels for ``detections`` on ``frame``.

    Label text is built as follows:

    * ``"<class name>#<tracker id>"`` when both ``class_id`` and
      ``tracker_id`` are present and have one entry per detection;
    * ``"<class name>"`` when only ``class_id`` is available;
    * otherwise no custom labels are supplied and the label annotator falls
      back to its own default text.

    Args:
        detections: supervision ``Detections`` to draw.
        frame: image to annotate.
            NOTE(review): the supervision annotators may draw directly on the
            array they receive -- pass a copy if the original frame must stay
            untouched (TODO confirm against the supervision version in use).

    Returns:
        The image returned by the label annotator (boxes plus labels).

    Raises:
        ValueError: if a class id has no matching member in ``Labels``.
    """
    n = len(detections.xyxy)
    class_ids = detections.class_id
    tracker_ids = detections.tracker_id

    # `None` (rather than an empty list) means "no custom labels": an empty
    # list would not match the number of detections when class ids are
    # missing but boxes are present.
    labels = None
    if class_ids is not None and n > 0:
        has_track_ids = (
            tracker_ids is not None
            and len(class_ids) == len(tracker_ids) == n
        )
        if has_track_ids:
            labels = [
                f"{Labels(class_ids[i]).name}#{tracker_ids[i]}"
                for i in range(n)
            ]
        else:
            labels = [
                f"{Labels(class_ids[i]).name}"
                for i in range(n)
            ]

    box_annotator = sv.BoxAnnotator()
    label_annotator = sv.LabelAnnotator(text_position=sv.Position.CENTER)

    annotated_frame = box_annotator.annotate(
        scene=frame,
        detections=detections
    )
    annotated_frame = label_annotator.annotate(
        scene=annotated_frame,
        detections=detections,
        labels=labels
    )

    return annotated_frame

##############################################################################

def main(
        input_video_path: str,
        start_second: int,
        end_second: int,
        output_video_path: str,
        weight_file: str,
        tracker_file: str,
        device: torch.device,
        class_of_interest: int,
        FRAME_BUFFER_DIM: int = 1,
        selective_calculation: bool = False
    ) -> Generator[
          Tuple[Union[Dict[str, Any], None], List[Dict[str, Any]], List[Dict[str, Any]]],
          None,
          None
        ]:
    """Run YOLO over a time window of a video and yield per-frame measurements.

    Two processing modes are selected by ``FRAME_BUFFER_DIM``:

    * ``FRAME_BUFFER_DIM == 1``: every frame is passed to ``model.track``
      (sequential inference with the tracker configured by ``tracker_file``)
      and post-processed by ``process_current_frame_tracking_results``.
    * ``FRAME_BUFFER_DIM > 1``: frames are collected in batches of
      ``FRAME_BUFFER_DIM`` frames, each batch is passed to ``model.predict``
      and post-processed, together with a ByteTrack instance, by
      ``process_current_batch_detection_results``. A final, possibly shorter,
      batch is processed when the frame range ends.

    Both post-processing helpers are defined elsewhere in the notebook.

    Args:
        input_video_path: path of the video to analyse.
        start_second: first second of the window of interest. When the window
            is not valid (``None``, negative, or ``end_second`` smaller than
            ``start_second``) the whole video is processed.
        end_second: last second of the window of interest.
        output_video_path: when not ``None`` a ``cv2.VideoWriter`` is opened
            on this path. NOTE: writing annotated frames is currently
            disabled, so no frame is written to it.
        weight_file: weights file loaded with ``YOLO(weight_file)``.
        tracker_file: tracker configuration passed to ``model.track``
            (used only when ``FRAME_BUFFER_DIM == 1``).
        device: torch device the model is moved to.
        class_of_interest: class id used to filter the detections.
        FRAME_BUFFER_DIM: number of frames per inference batch (>= 1).
        selective_calculation: flag forwarded unchanged to
            ``process_current_batch_detection_results``.

    Yields:
        ``(batch_aggregate_measurements, frame_aggregate_measurements_list,
        bbox_predictions_list)``. The first element is ``None`` in per-frame
        mode; in batch mode it holds ``batch_id`` and ``yolo_detection_time``.

    Raises:
        ValueError: if ``FRAME_BUFFER_DIM`` is smaller than 1 (raised when the
            generator is first advanced).
    """
    if FRAME_BUFFER_DIM < 1:
        raise ValueError(
            f"FRAME_BUFFER_DIM must be >= 1, got {FRAME_BUFFER_DIM}")

    print(f"Video: {input_video_path}")
    cap = cv2.VideoCapture(input_video_path)
    output_video = None
    # try/finally guarantees that the capture and the writer are released even
    # if the consumer stops iterating early or an exception is raised.
    try:
        if not cap.isOpened():
            print(f"Error: unable to open the video.")
            # Nothing can be processed: end the generator without yielding.
            return

        # Get the resolution of frames
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        number_of_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if output_video_path is not None:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            output_video = cv2.VideoWriter(
                output_video_path, fourcc, fps, (frame_width, frame_height))

        # The explicit ByteTrack instance is only used in batch mode; in
        # per-frame mode tracking is delegated to `model.track`.
        tracker = None
        if FRAME_BUFFER_DIM > 1:
            tracker = ByteTrack(track_activation_threshold=0.25,
                                lost_track_buffer=30,
                                minimum_matching_threshold=0.8,
                                frame_rate=fps,
                                minimum_consecutive_frames=1)
            tracker.reset()

        #################################################
        first_frame_of_interest = 0
        last_frame_of_interest = number_of_frames
        if start_second is not None and start_second >= 0 and end_second is not None and end_second >= start_second:
            first_frame_of_interest = int(start_second*fps)
            last_frame_of_interest = int(min(end_second*fps, number_of_frames))
        cap.set(cv2.CAP_PROP_POS_FRAMES, first_frame_of_interest)

        #################################################
        model = YOLO(weight_file)
        model.to(device)

        #################################################
        frame_buffer_list = []
        frame_id_list = []
        batch_id = 0
        previous_result = None
        previous_detections = None

        def process_buffered_batch():
            """Run detection on the buffered frames and empty the buffers."""
            nonlocal batch_id, previous_detections

            start = time.perf_counter()
            current_batch_of_results = model.predict(  # parallel inference on the entire batch without tracking
                frame_buffer_list,
                verbose=False,
                classes=[class_of_interest])
            end = time.perf_counter()
            batch_aggregate_measurements = {
                'batch_id': int(batch_id),
                'yolo_detection_time': float(end-start)
            }

            # frame_aggregate_measurements_list: List[Dict[str, Any]]
            # bbox_predictions_list: List[Dict[str, Any]]
            # current_batch_of_detections: List[Detections]
            frame_aggregate_measurements_list, bbox_predictions_list, current_batch_of_detections = process_current_batch_detection_results(
                batch_id,
                frame_id_list,
                previous_detections,
                current_batch_of_results,
                tracker,
                device,
                frame_width,
                frame_height,
                class_of_interest,
                selective_calculation)

            # The last detections of this batch are handed to the next batch.
            if current_batch_of_detections is not None and len(current_batch_of_detections) > 0:
                previous_detections = current_batch_of_detections[-1]

            batch_id += 1
            # NOTE: writing annotated frames to `output_video` is disabled. If
            # it is re-enabled it must happen here, before the buffers are
            # cleared, e.g. with
            # `output_video.write(annotate_frame(detections, frame_hwc.copy()))`
            # for each (frame, detections) pair of the batch.
            frame_buffer_list.clear()
            frame_id_list.clear()
            return batch_aggregate_measurements, frame_aggregate_measurements_list, bbox_predictions_list

        for frame_id in tqdm(range(first_frame_of_interest, last_frame_of_interest)):
            if not cap.isOpened():
                break

            success, frame_hwc = cap.read()
            if not success:
                continue

            if FRAME_BUFFER_DIM > 1:
                # Frames are buffered only in batch mode, so per-frame mode
                # does not keep the whole clip in memory.
                frame_buffer_list.append(frame_hwc)
                frame_id_list.append(frame_id)
                # Flushing on the buffer length keeps every full batch at
                # exactly FRAME_BUFFER_DIM frames.
                if len(frame_buffer_list) >= FRAME_BUFFER_DIM:
                    yield process_buffered_batch()
            else:
                start = time.perf_counter()
                current_result = model.track(  # sequential inference with tracking
                        source=frame_hwc,
                        stream=False,
                        persist=True,
                        tracker=tracker_file,
                        verbose=False,
                        show=False,
                        classes=[class_of_interest])
                end = time.perf_counter()
                # frame_aggregate_measurements: Dict[str, Any]
                # bbox_predictions_list: List[Dict[str, Any]]
                frame_aggregate_measurements, bbox_predictions_list = process_current_frame_tracking_results(
                    frame_id,
                    previous_result,
                    current_result[0],
                    device,
                    frame_width,
                    frame_height,
                    class_of_interest)
                frame_aggregate_measurements['yolo_tracking_time'] = (end-start)

                previous_result = current_result[0]

                yield None, [frame_aggregate_measurements], bbox_predictions_list

        # Process the frames left in the buffer (last, possibly partial batch),
        # also when the final reads of the range failed.
        if frame_buffer_list:
            yield process_buffered_batch()
    finally:
        # Release the video capture object and the (optional) video writer.
        cap.release()
        if output_video is not None:
            output_video.release()
    # end of main

In [None]:
# Location of the trained weights file, relative to the working directory.
best_weights = os.sep.join(
    ['runs', 'detect', 'train_2_yolo11n', 'weights', 'best.pt'])

# Top-level folder used as the root when the video paths are built.
base_folder = 'raw_video_source'

**FOR WHITE HENS**

In [11]:
# Clip selection for the white-hen cage: the recording to analyse, the folder
# names used to locate it, and the time window (seconds from the start of the
# video) that is processed.
input_video_name = "ExternDisk0_ch4_20240207130000_20240207140000.mp4"
cage = "mod_1_galline_bianche"
day = "07-02-2024"
start_second = 46 + 60 * 52  # 52 min 46 s
end_second = 56 + 60 * 52    # 52 min 56 s
class_of_interest = Labels["gallina_bianca"].value

**CALCULATION OF PREDICTED KINETIC INDICES FOR THE SELECTED VIDEO**

In [None]:
####
# Run configuration for the prediction sweep.
input_video_path  = os.path.join(base_folder, 'raw_video_source', cage,
                                 day, input_video_name)
save_csv_to_drive = True
tracker_file = 'bytetrack.yaml'
#tracker_file = 'botsort.yaml'
selective_calculation = False

# (FRAME_BUFFER_DIM, selective_calculation) combinations to evaluate:
# per-frame tracking first, then every batch size with both flag values.
parameter_pairs = [(1, False)] + list(itertools.product([4, 8, 16, 32, 64, 128], [False, True]))

# Measurement columns that are linearly interpolated when selective
# calculation is used with batches.
INTERPOLATED_COLUMNS = ('displacement', 'cluster_index',
                        'average_distance_index', 'unrest_index')


def _save_csv_if_absent(dataframe, csv_path):
    """Write ``dataframe`` to ``csv_path`` (no index, no header) unless the file already exists."""
    if not os.path.exists(csv_path):
        dataframe.to_csv(csv_path, index=False, header=False)
        print(f"Saved file {csv_path}")
    else:
        print(f"File {csv_path} already exists.")


####
for FRAME_BUFFER_DIM, selective_calculation in parameter_pairs:

    #output_video_path = os.path.join(base_folder, 'raw_video_source', cage,
    #                                day, f'{Path(input_video_name).stem}_secs_{start_second}-to-{end_second}_FRAME_BUFFER_DIM-{FRAME_BUFFER_DIM}.mp4')
    output_video_path = None
    ####
    #output_images_dir = os.path.join(base_folder, 'raw_video_source', cage,
    #                               day, f"{Path(input_video_name).stem}_secs_{start_second}-to-{end_second}_images_FRAME_BUFFER_DIM-{FRAME_BUFFER_DIM}")
    #if not os.path.exists(output_images_dir):
    #    os.makedirs(output_images_dir)
    output_images_dir = None
    ####

    perform_tracking = True

    if perform_tracking:

        # Use the GPU when available, otherwise fall back to the CPU.
        device = torch.device("cpu")
        if torch.cuda.is_available():
            device = torch.device("cuda")
            # Release cached GPU memory left over from the previous run.
            torch.cuda.empty_cache()
            print('Using device:', device)
            # Additional info when using cuda
            print(torch.cuda.get_device_name(0))
            print('Memory Usage:')
            print('Allocated:', round(torch.cuda.memory_allocated(0)/1024**3,1), 'GB')
            print('Cached:   ', round(torch.cuda.memory_reserved(0)/1024**3,1), 'GB')

            # `mem_report` is defined elsewhere in the notebook; its result is
            # printed as JSON.
            mem_rep_record = mem_report()
            json_str = json.dumps(mem_rep_record, indent=4)
            print(json_str)

        frame_aggregate_measurements_list_tot = []
        bbox_predictions_list_tot = []
        batch_aggregate_measurements_list = []

        # Collect everything the generator yields for this parameter pair.
        for batch_aggregate_measurements, frame_aggregate_measurements_list, bbox_predictions_list in main(input_video_path=input_video_path,
                                        start_second=start_second,
                                        end_second=end_second,
                                        output_video_path=output_video_path,
                                        weight_file=best_weights,
                                        tracker_file=tracker_file,
                                        FRAME_BUFFER_DIM=FRAME_BUFFER_DIM,
                                        device=device,
                                        class_of_interest=class_of_interest,
                                        selective_calculation=selective_calculation):

            frame_aggregate_measurements_list_tot.extend(frame_aggregate_measurements_list)
            bbox_predictions_list_tot.extend(bbox_predictions_list)
            if batch_aggregate_measurements is not None:
                batch_aggregate_measurements_list.append(batch_aggregate_measurements)
        ### end for loop #################################################
        # One output folder per parameter pair, next to the source video
        # (built from `cage`, like `input_video_path`).
        output_labels_and_measurements_dir = os.path.join(base_folder, 'raw_video_source', cage,
                                     day, f"{Path(input_video_name).stem}_secs_{start_second}-to-{end_second}_pred_({FRAME_BUFFER_DIM}-{selective_calculation})")
        os.makedirs(output_labels_and_measurements_dir, exist_ok=True)
        ######### SAVE PREDICTED BOUNDING BOXES ##########################
        bbox_predictions_df = pd.DataFrame(bbox_predictions_list_tot)
        bbox_predictions_csv_file = os.path.join(output_labels_and_measurements_dir, "labels.csv")
        # bounding box coordinates are normalized, just as the ground truth bounding boxes
        if save_csv_to_drive:
            # Existence must be checked BEFORE writing, otherwise the file
            # always exists and "overwritten" is always reported.
            labels_file_existed = os.path.exists(bbox_predictions_csv_file)
            bbox_predictions_df.to_csv(bbox_predictions_csv_file, index=False, header=False)
            if not labels_file_existed:
                print(f"Saved file {bbox_predictions_csv_file}.")
            else:
                print(f"File {bbox_predictions_csv_file} overwritten.")

        ######### SAVE PREDICTED MEASUREMENTS ############################
        frame_aggregate_measurements_df = pd.DataFrame(frame_aggregate_measurements_list_tot)
        if FRAME_BUFFER_DIM > 1 and selective_calculation:
            # Per the original author's note, these fields are NaN in part of
            # the records in this mode; the gaps are filled by linear
            # interpolation. Columns missing from the frame (e.g. when no
            # frame was processed) are skipped instead of raising KeyError.
            for column_name in INTERPOLATED_COLUMNS:
                if column_name in frame_aggregate_measurements_df.columns:
                    frame_aggregate_measurements_df[column_name] = frame_aggregate_measurements_df[column_name].interpolate(method='linear', limit_direction='both')
        measurements_csv_file = os.path.join(output_labels_and_measurements_dir, f"frame_aggregate_measurements.csv")
        if save_csv_to_drive:
            _save_csv_if_absent(frame_aggregate_measurements_df, measurements_csv_file)

        ######### SAVE BATCH TIMINGS #####################################
        batch_aggregate_measurements_df = pd.DataFrame(batch_aggregate_measurements_list)
        out_csv_file = os.path.join(output_labels_and_measurements_dir, f"batch_aggregate_measurements.csv")
        if save_csv_to_drive:
            _save_csv_if_absent(batch_aggregate_measurements_df, out_csv_file)
        ####################################################
    ## end of "if perform_tracking:"