# YoloV7 Tracking and Counting using Bytetrack

## Import Libraries and Modules

In [None]:
import logging
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import torch

# YOLOv7 specific imports
from models.experimental import attempt_load
from utils.general import check_img_size, non_max_suppression, scale_coords
from utils.torch_utils import select_device

# Bytetrack specific imports
from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch

- cv2: OpenCV library for computer vision tasks.
- torch: PyTorch library for machine learning.
- np: NumPy library for numerical operations.
- typing (Dict, Tuple, List): type hints used in function signatures.
- time: standard-library module used to measure the inference time.
- logging: Python's logging module for logging events.
- dataclass: To define configuration as a data class.
- YOLOv7 specific imports for model loading and processing.
- ByteTrack specific imports (BYTETracker, STrack) for tracking, plus box_iou_batch for computing the IoU between tracker boxes and detection boxes.

## Data Classes

In [None]:
@dataclass
class Config:
    """Detection and display settings shared by the camera and the detector.

    Attributes:
        MODEL_PATH: Path to the model weights file.
        FRAME_WIDTH: Requested capture width of the frame.
        FRAME_HEIGHT: Requested capture height of the frame.
        CONFIDENCE_THRESHOLD: Confidence threshold for detection.
        IOU_THRESHOLD: Intersection-over-Union threshold for non-max
            suppression.
        FONT_SCALE: Font scale for drawing text on frames.
        FONT_THICKNESS: Font thickness for drawing text on frames.
    """

    MODEL_PATH: str = "best_v7.pt"
    FRAME_WIDTH: int = 1280
    FRAME_HEIGHT: int = 720
    CONFIDENCE_THRESHOLD: float = 0.5
    IOU_THRESHOLD: float = 0.3
    FONT_SCALE: float = 1
    FONT_THICKNESS: int = 2


# Module-level settings instance built from the defaults above.
config = Config()

@dataclass
class BYTETrackerArgs:
    """Argument bundle handed to the ByteTrack tracker.

    Attributes:
        track_thresh: Threshold for initiating new tracks.
        track_buffer: Buffer size for track management.
        match_thresh: Threshold for matching detections to existing tracks.
        aspect_ratio_thresh: Aspect ratio threshold for filtering tracks.
        min_box_area: Minimum bounding box area for tracks.
        mot20: Flag for MOT20 dataset compatibility.

    NOTE(review): the descriptions above are the author's; how each value is
    actually interpreted depends on the installed ByteTrack version - confirm
    there before tuning.
    """

    track_thresh: float = 0.25
    track_buffer: int = 30
    match_thresh: float = 0.8
    aspect_ratio_thresh: float = 3.0
    min_box_area: float = 1.0
    mot20: bool = False

These classes define the parameters for object detection and tracking in our system. The Config class contains values used for object detection with YOLOv7, while the BYTETrackerArgs class holds parameters for the ByteTrack tracker. We've encapsulated these parameters within classes to enhance readability and facilitate easy modifications.


## Camera Initialization

Initializes the video capture from a camera.

In [None]:
def initialize_camera(config: Config, camera_index: int = 0) -> cv2.VideoCapture:
    """Open a camera and request the frame size given in ``config``.

    Args:
        config: Settings object; ``FRAME_WIDTH`` and ``FRAME_HEIGHT`` are read.
        camera_index: Device index passed to ``cv2.VideoCapture``. Defaults to
            0, the value that used to be hard-coded.

    Returns:
        The opened ``cv2.VideoCapture``.

    Raises:
        RuntimeError: If the capture device could not be opened.
    """
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        # Release the handle before bailing out so nothing is left dangling.
        cap.release()
        raise RuntimeError("Error: Could not open video capture.")

    # NOTE(review): the results of set() are not checked, so the frames that
    # are actually delivered may not have exactly this size.
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, config.FRAME_WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, config.FRAME_HEIGHT)
    return cap


## Detections Class

The Detections class is a custom data structure designed to store and manage the results of object detection.

In [None]:
class Detections:
    """Container for the detections of one frame.

    The four fields are parallel: entry ``i`` of each one describes the same
    detection.

    Attributes:
        xyxy: Box coordinates, one entry per detection.
        confidence: Confidence value of each detection.
        class_id: Class id of each detection.
        tracker_id: Tracker id of each detection, or ``None`` while no
            tracker ids have been assigned yet.
    """

    def __init__(self, xyxy, confidence, class_id):
        self.xyxy = xyxy
        self.confidence = confidence
        self.class_id = class_id
        self.tracker_id = None

    def filter(self, mask, inplace=False):
        """Keep only the detections selected by ``mask``.

        Args:
            mask: Index (typically a boolean array) applied to every field.
            inplace: When true, this object is modified and ``None`` is
                returned; otherwise a new ``Detections`` is returned and this
                object is left untouched.

        Returns:
            ``None`` when ``inplace`` is true, else the filtered copy. The
            copy keeps the tracker ids of the surviving detections.
        """
        tracker_id = None if self.tracker_id is None else self.tracker_id[mask]

        if inplace:
            self.xyxy = self.xyxy[mask]
            self.confidence = self.confidence[mask]
            self.class_id = self.class_id[mask]
            self.tracker_id = tracker_id
            return None

        kept = Detections(
            self.xyxy[mask],
            self.confidence[mask],
            self.class_id[mask],
        )
        # The constructor always starts with tracker_id=None, so carry the
        # filtered ids over explicitly instead of losing them.
        kept.tracker_id = tracker_id
        return kept

    def __iter__(self):
        """Yield ``(xyxy, confidence, class_id, tracker_id)`` per detection.

        ``tracker_id`` is ``None`` for every detection when no tracker ids
        have been assigned (attribute is ``None`` or empty).
        """
        tracker_ids = self.tracker_id
        if tracker_ids is None or np.size(tracker_ids) == 0:
            tracker_ids = [None] * len(self.xyxy)
        yield from zip(self.xyxy, self.confidence, self.class_id, tracker_ids)


## Object Detector Class


Next, we create the ObjectDetector class. Its constructor initializes the instance variables that are used by the other methods of the class.


In [None]:
class ObjectDetector:
    """YOLOv7 detector combined with a ByteTrack tracker."""

    def __init__(self, config: Config):
        """Load the model, create the tracker and resolve the class names.

        Args:
            config: Settings object; stored on the instance and read by the
                other methods (model path, thresholds, font settings).
        """
        self.config = config
        self.device = select_device('')  # Select the device (CPU or GPU) for computation
        self.model = self._load_model()  # Load the YOLOv7 model
        self.byte_tracker = BYTETracker(BYTETrackerArgs())  # Tracker with the default arguments
        # Make the input size compatible with the model's largest stride.
        self.imgsz = check_img_size(640, s=self.model.stride.max())

        # The class names live on ``.module`` when the model is wrapped.
        names = self.model.module.names if hasattr(self.model, 'module') else self.model.names
        # Normalise to a dict keyed by class index so lookups are uniform.
        if isinstance(names, list):
            names = dict(enumerate(names))
        self.names = names

        # No module-level ``logger`` exists in this file, so fetch one here.
        logging.getLogger(__name__).info("Input image size: %s", self.imgsz)

### Loading the Model 

The first method of the ObjectDetector class is _load_model(). It is used to load the best model that was trained on the dataset.

In [None]:
    def _load_model(self) -> torch.nn.Module:
        """Load the weights at ``config.MODEL_PATH`` onto ``self.device``.

        Returns:
            The model object returned by ``attempt_load``.
        """
        # No module-level ``logger`` exists in this file, so fetch one here.
        log = logging.getLogger(__name__)
        model = attempt_load(self.config.MODEL_PATH, map_location=self.device)
        # Same unwrapping rule as in __init__: names may live on ``.module``.
        names = model.module.names if hasattr(model, 'module') else model.names
        log.info("Model loaded. Number of classes: %d", len(names))
        log.info("Class names: %s", names)
        return model

## Processing Each Frame

This function will detect and track objects

In [None]:
ef process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, Dict[str, int], List[str]]:
        # First, initialize class_counts dictionary that we will be use later for counting of the detected objects
        class_counts = {}
        chicken_parts = []
        # dict mapping class_id to class_name
        CLASS_NAMES_DICT = self.names
        # class_ids of interest 
        CLASS_ID = list(CLASS_NAMES_DICT.keys())

        # pre-process the frame so that it will be fit to the requirement of the model
        img = cv2.resize(frame, (self.imgsz, self.imgsz)) # resize the frame
        img = img.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        img = np.ascontiguousarray(img) #convert to contiguous array
        img = torch.from_numpy(img).to(self.device) # convert array to PyTorch tensor then move it to device
        img = img.float() / 255.0 # normalize the tensor values to (1, 0)
        if img.ndimension() == 3: # check the dimesion and add one to the first index
            img = img.unsqueeze(0)

        # initialize the starting time of detections
        start_time = time.time()

        # disables gradient calculation during inference so that it will save memory and computation time
        with torch.no_grad():
            pred = self.model(img, augment=False)[0] # predict using the model 

        # initialize the end time of the detections
        end_time = time.time()
        # get the inference time and convert it to milliseconds
        inference_time = (end_time - start_time) * 1000

        # filter out overlapping boxes
        pred = non_max_suppression(pred, self.config.CONFIDENCE_THRESHOLD, self.config.IOU_THRESHOLD)

        # Ensure all GPU operations are complete before proceeding
        if torch.cuda.is_available():
            torch.cuda.synchronize()

        if len(pred[0]) > 0: #check if there is detected
            det = pred[0] # Extract the first element containing detections
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], frame.shape).round() 
            #adjust the dimensions of the images to the frames and round it up so that it will ensures that the bounding box is in proper place 
            
            # creating an instance of Detections class named detections
            detections = Detections(
                xyxy=det[:, :4].cpu().numpy(), # coordinates
                confidence=det[:, 4].cpu().numpy(), # confidence value of the detected object
                class_id=det[:, 5].cpu().numpy().astype(int) # class id of the detected object
            )

            # filtering out detections with unwanted classes
            mask = np.array([class_id in CLASS_ID for class_id in detections.class_id], dtype=bool)
            detections.filter(mask=mask, inplace=True)

            """
            # A mask array created to check if there is new detected object 
            mask = np.array([tracker_id is not None for tracker_id in detections.tracker_id], dtype=bool)
            
            This can be constructed as shown below for easier readability: 
                mask_list = [] # Initialize an empty list to store the mask values
                for class_id in detections.class_id: # Iterate over each class_id in detections.class_id
                    if class_id in CLASS_ID: # Check if the current class_id is in CLASS_ID
                        mask_list.append(True) # If it is, append True to the mask_list
                    else:
                        mask_list.append(False) # If it is not, append False to the mask_list
                # Convert the mask_list to a NumPy array of boolean type
                mask = np.array(mask_list, dtype=bool)
            """

            # tracking detections
            tracks = self.byte_tracker.update(
                output_results=self.detections2boxes(detections=detections),
                img_info=frame.shape,
                img_size=frame.shape
            )
            #obtaining track id by using the method match_detections_with_tracks
            tracker_id = self.match_detections_with_tracks(detections=detections, tracks=tracks)
            detections.tracker_id = np.array(tracker_id)


            # Annotating the frame. Format custom labels and draw bounding boxes
            for xyxy, confidence, class_id, tracker_id in detections:
                x1, y1, x2, y2 = xyxy.astype(int) # coordinates of the bounding box
                class_name = CLASS_NAMES_DICT[class_id] # class name of the detected object 
                
                class_counts[class_name] = class_counts.get(class_name, 0) + 1 # update the count of class_counts dict
                chicken_parts.append(class_name) # append the class_name to chicken_parts list

                color = self.get_color_for_class(class_id) # getting random color using the function get_color_for_class
                label = f"#{tracker_id} {class_name} {confidence:.2f}" # a string for label (#1 Thigh 70)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) # create a rectangle using the coordinates of the detected object for bounding box
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) # put the label in the frame

            # put text for the objects per class that is detected
            for i, (class_name, count) in enumerate(class_counts.items()):
                cv2.putText(frame, f"{class_name}: {count}", (10, 30 + i * 30), 
                            cv2.FONT_HERSHEY_SIMPLEX, self.config.FONT_SCALE, (255, 255, 255), self.config.FONT_THICKNESS)

        return frame, class_counts, chicken_parts, inference_time

### Generating Colors for Classes

This function gives each class its own color, which is used when drawing the bounding box around a detected object of that class.

In [None]:
        @staticmethod 
        def get_color_for_class(class_id: int) -> Tuple[int, int, int]:
            np.random.seed(class_id)
            return tuple(np.random.randint(0, 255, size=3).tolist()) # randomized 3 numbers for color (RGB) then convert it array to list and list to tuple

## Tracking Utils


We need to manually match the bounding boxes generated by our model with those produced by the tracker.

In [None]:
    @staticmethod
    def detections2boxes(detections: Detections) -> np.ndarray:
        """Pack boxes and scores into the array handed to the tracker.

        Returns:
            One row per detection: the ``xyxy`` columns followed by a single
            confidence column.
        """
        score_column = detections.confidence[:, None]  # 1-D scores -> column
        return np.concatenate((detections.xyxy, score_column), axis=1)

    @staticmethod
    def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
        """Collect the ``tlbr`` box of every track into one float array.

        Args:
            tracks: Tracks as returned by the tracker.

        Returns:
            Array with one row of four values per track; shape ``(0, 4)``
            when ``tracks`` is empty.
        """
        boxes = np.array([track.tlbr for track in tracks], dtype=float)
        # An empty list would otherwise give shape (0,), which cannot be
        # used as a box array.
        return boxes.reshape(-1, 4)

    @staticmethod
    def match_detections_with_tracks(
        detections: Detections,
        tracks: List[STrack]
    ) -> List[Optional[int]]:
        """Assign a tracker id to each detection by box overlap.

        Every track picks the detection it overlaps most. When several tracks
        pick the same detection, the track with the highest IoU keeps it.

        Args:
            detections: Detections of the current frame; only ``xyxy`` is read.
            tracks: Tracks returned by the tracker for the same frame.

        Returns:
            A list with one entry per detection: the ``track_id`` of the
            matched track, or ``None`` when no track overlaps the detection.
        """
        detection_count = len(detections.xyxy)
        tracker_ids: List[Optional[int]] = [None] * detection_count

        # Nothing to match when either side is empty.
        if detection_count == 0 or len(tracks) == 0:
            return tracker_ids

        tracks_boxes = ObjectDetector.tracks2boxes(tracks=tracks)
        iou = box_iou_batch(tracks_boxes, detections.xyxy)  # indexed [track, detection]
        track2detection = np.argmax(iou, axis=1)  # best detection for each track

        # Best overlap seen so far per detection; starts at 0 so that only a
        # non-zero IoU can claim a detection.
        best_iou = np.zeros(detection_count)
        for tracker_index, detection_index in enumerate(track2detection):
            overlap = iou[tracker_index, detection_index]
            if overlap > best_iou[detection_index]:
                best_iou[detection_index] = overlap
                tracker_ids[detection_index] = tracks[tracker_index].track_id

        return tracker_ids

## Main Function

The main function ties together everything we have built, from detection to tracking and counting of the detected objects.

In [None]:
def main():
    """Run live detection, tracking and counting until Esc is pressed.

    Frames are read from the camera, processed by ``ObjectDetector`` and
    shown in a window; the inference time of each frame is printed. The
    camera, the windows and the CUDA cache are released on exit, whether the
    loop ends normally or through an exception.
    """
    # Without a handler the info-level messages would not be shown.
    logging.basicConfig(level=logging.INFO)
    # No module-level ``logger`` exists in this file, so fetch one here.
    log = logging.getLogger(__name__)

    detector = ObjectDetector(config)
    cap = initialize_camera(config)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                log.error("Could not read frame.")
                break

            # process_frame returns four values; the per-class counts and the
            # list of class names are not needed here.
            processed_frame, _, _, inference_time = detector.process_frame(frame)

            cv2.imshow("YOLOv7 Live", processed_frame)
            print(f"{inference_time:.4f} ms to do a forward inference time per image.")

            # Wait up to 30 ms for a key press; Esc (ASCII 27) ends the loop.
            if cv2.waitKey(30) == 27:
                break
    except Exception as e:
        # Top-level boundary: log the traceback, then fall through to cleanup.
        log.exception(f"An error occurred: {e}")
    finally:
        cap.release()
        cv2.destroyAllWindows()
        torch.cuda.empty_cache()


if __name__ == "__main__":
    main()
