# Independent Tests for Bounding Box Validation on YOLO Model

Using the Thermal Model for testing the Bounding Box Validation (BBV) Pipeline & Model on a live YOLO model, using manually counted clips.

In [1]:
''' Dependencies '''

# Core libraries
import cv2
import supervision as sv
import numpy as np
from ultralytics import YOLO
from utils.thermal_frame_to_temp import result_to_temp_frame
from utils.group_bounding_boxes import group_and_merge_bounding_boxes
from validate_bounding_box import get_box_count
from tkinter.filedialog import askopenfilename, askdirectory
import joblib
from collections import Counter

# SVM and model training
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV

# Random Forest
from sklearn.ensemble import RandomForestClassifier

# Synthesizing Data
from imblearn.over_sampling import SMOTE

# ANN
from tensorflow import keras
from tensorflow.keras import layers

# Better exception handling and helpers
import traceback
import pprint
import datetime
import os
import time



## Deriving Ground Truth Video & Labeling

Given a clip with a known count, save the known count for comparison.

In [2]:
TRUE_COUNT = 122  # True count for the given clip
FILE_NAME:str = "3-Part-Mid-Belt-Testing-Clip-TRUE_COUNT-122" # Global variable to store the selected file name if applicable

## Pulling in the saved model & declaring pipeline

In [11]:
BBV_MODEL = joblib.load('thermal_chick_counting_rf_model_fit.pkl')  # Load the pre-trained BBV model
BBV_STANDARD_SCALER = joblib.load('thermal_chick_counting_rf_scaler.pkl')  # Load the pre-fitted StandardScaler
YOLO_MODEL = YOLO('./models/new_iron.pt')  # Load the pre-trained YOLO model

# Declare the pipeline
BBV_PIPELINE = Pipeline([
    ('scaler', BBV_STANDARD_SCALER),
    ('classifier', BBV_MODEL)
])

# Validate that the model properly loaded
BBV_PIPELINE

0,1,2
,steps,"[('scaler', ...), ('classifier', ...)]"
,transform_input,
,memory,
,verbose,False

0,1,2
,copy,True
,with_mean,True
,with_std,True

0,1,2
,n_estimators,100
,criterion,'gini'
,max_depth,10
,min_samples_split,4
,min_samples_leaf,1
,min_weight_fraction_leaf,0.0
,max_features,0.8
,max_leaf_nodes,
,min_impurity_decrease,0.0
,bootstrap,True


## Run the YOLO Model with Thermal Pipeline 

In [None]:
def get_line_from_video_frame(frame):
    frame_height, frame_width = frame.shape[:2]
    # Draw a horizontal line across the middle of the frame
    line_start = (frame_width, frame_height // 2)
    line_end = (0, frame_height // 2)
    return [line_start, line_end]

def chick_counting(video_path, output_path, line_points, verbose = False):

    # Grab a sample frame so we know video size
    generator = sv.get_video_frames_generator(video_path)
    frame = next(generator)

    # Set up video writer with same FPS/size as input
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame.shape[1], frame.shape[0]))
    if not out.isOpened():
        print("Error: Could not open video writer")
        return

    # Init tracker and helpers
    byte_tracker = sv.ByteTrack()
    trace_annotator = sv.TraceAnnotator(thickness=4, trace_length=50)

    # Create the counting line
    line_zone = sv.LineZone(start=sv.Point(*line_points[0]), end=sv.Point(*line_points[1]))

    # Load custom YOLO model
    model = YOLO_MODEL
    
    # Annotators for boxes + labels
    BOUNDING_BOX_ANNOTATOR = sv.BoxAnnotator(thickness=2, color=sv.Color(0, 255, 0))
    LABEL_ANNOTATOR = sv.LabelAnnotator(text_scale=1)

    # Counters
    frame_count = 0
    total_count = 0
    total_count_bbv = 0
    all_counted_ids = set()  # keep track of already-counted trackers
    all_counted_ids_bbv = set()  # Seperate list for the bounding box validation
    
    # Constants to hold high and low thermal temperatures for denormalization
    prev_hi = None
    prev_lo = None

    try:
        generator = sv.get_video_frames_generator(video_path)

        for frame in generator:
            frame_count += 1
            if verbose:
                print(f"Processing frame {frame_count}")

            # Run YOLO on frame
            results = model(frame)[0]
            
            # Get the frame image as denormalized numpy array
            try:
                temp_arr, prev_hi, prev_lo = result_to_temp_frame(
                    results,
                    frame_idx = frame_count,
                    prev_hi_val = prev_hi,
                    prev_lo_val = prev_lo
                )
            except Exception as e:
                if verbose:
                    print(f"Warning: Could not convert frame {frame_count} to temperature array. Skipping BBV for this frame.")
                if prev_hi is not None and prev_lo is not None:
                    temp_arr = np.zeros_like(frame[..., 0], dtype=np.float32)  # reuse last temp_arr shape
                else:
                    temp_arr = None

            # Convert results to supervision Detections
            detections = sv.Detections.from_ultralytics(results)

            # Sensitivity for declaring a box as "nested" (e.g. 0.9 means inner must have at least 90% of its area inside outer)
            NESTED_THRESHOLD = 0.9  

            # Get indicies of all boxes
            contained_indices = set()
            boxes = detections.xyxy

            for i, outer in enumerate(boxes):
                x1o, y1o, x2o, y2o = outer
                outer_area = max(0, (x2o - x1o)) * max(0, (y2o - y1o))

                for j, inner in enumerate(boxes):
                    if i == j:
                        continue
                    x1i, y1i, x2i, y2i = inner
                    inner_area = max(0, (x2i - x1i)) * max(0, (y2i - y1i))

                    # Intersection box
                    inter_x1 = max(x1o, x1i)
                    inter_y1 = max(y1o, y1i)
                    inter_x2 = min(x2o, x2i)
                    inter_y2 = min(y2o, y2i)

                    inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)

                    # Ratio of inner covered by outer
                    if inner_area > 0 and (inter_area / inner_area) >= NESTED_THRESHOLD:
                        contained_indices.add(j)


            # Update tracker with detections
            detections = byte_tracker.update_with_detections(detections)
            if verbose:
                print("Tracker IDs this frame:", detections.tracker_id)

            # See if any trackers crossed the line
            crossed_in_flags, crossed_out_flags = line_zone.trigger(detections)

            # Only count new IDs that cross "in"
            for i, crossed in enumerate(crossed_in_flags):
                if crossed:
                    tracker_id = detections.tracker_id[i]
                    
                    # YOLO tracker
                    if tracker_id is not None and tracker_id not in all_counted_ids:
                        total_count += 1
                        all_counted_ids.add(tracker_id)
                        if verbose:
                            print(f"New Chick crossed the line! ID {tracker_id}, Total count: {total_count}")
                    
                    # Bounding Box Validation tracker
                    if tracker_id is not None and tracker_id not in all_counted_ids_bbv:
                        # Get the merged bounding boxes for overlaps in this frame
                        group_and_merge_bounding_boxes_result = group_and_merge_bounding_boxes(
                            xyxy = detections.xyxy, 
                            tracker_ids = detections.tracker_id.tolist(), 
                            target_tracker_id = tracker_id,
                            iou_thresh = 0.01,  # Low threshold to catch even slight overlaps
                        )
                        if group_and_merge_bounding_boxes_result is None: continue  # Skip if no valid group found
                        merged_box_group, grouped_tracker_ids = group_and_merge_bounding_boxes_result
                        all_counted_ids_bbv.update(grouped_tracker_ids)  # Add all grouped IDs to counted list
                        # Using the merged group, add to the total BBV count
                        total_count_bbv += get_box_count(
                            pipeline=BBV_PIPELINE,
                            temperature_frame=temp_arr,
                            box=merged_box_group
                        )
                        if verbose:
                            print(f"New BBV Chick(s) crossed the line! Grouped IDs {grouped_tracker_ids}, BBV Total count: {total_count_bbv}")

            # Assign labels + colors depending on nesting
            labels = []
            colors = []
            for i, tracker_id in enumerate(detections.tracker_id):
                if i in contained_indices:
                    labels.append(f"#{tracker_id} nested")
                    colors.append(sv.Color.RED)
                else:
                    labels.append(f"#{tracker_id} chick")
                    colors.append(sv.Color.GREEN)

            # Draw tracker trails
            annotated_frame = trace_annotator.annotate(scene=frame.copy(), detections=detections)

            # Draw bounding boxes manually with chosen colors
            for i, box in enumerate(detections.xyxy):
                color = colors[i] if i < len(colors) else sv.Color.GREEN
                x1, y1, x2, y2 = map(int, box)
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color.as_bgr(), 2)

            # Draw labels
            annotated_frame = LABEL_ANNOTATOR.annotate(annotated_frame, detections, labels=labels)

            # Draw the counting line
            cv2.line(annotated_frame, line_points[0], line_points[1], (0, 0, 255), 2)

            # Overlay YOLO total count
            cv2.putText(
                annotated_frame,
                f'Total Count: {total_count}',
                (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2,
                cv2.LINE_AA
            )
            
            # Overlay BBV total count
            cv2.putText(
                annotated_frame,
                f'BBV Total Count: {total_count_bbv}',
                (10, 80),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (255, 0, 0),
                2,
                cv2.LINE_AA
            )
            
            # Overlay True total count
            cv2.putText(
                annotated_frame,
                f'True Total Count: {TRUE_COUNT}',
                (10, 110),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 0, 255),
                2,
                cv2.LINE_AA
            )

            # Write out annotated frame
            out.write(annotated_frame)
            
    except Exception as e:
        # Detailed exception logging
        print("=== Exception while processing video frames ===")
        print("Time:", datetime.datetime.now().isoformat())
        print("Exception type:", type(e).__name__)
        print("Exception message:", str(e))
        print("Full traceback:")
        print(traceback.format_exc())

    finally:
        # Clean up writer and windows
        out.release()
        cv2.destroyAllWindows()
        print(f"Total Frames Processed: {frame_count}\nYOLO Count = {total_count}, BBV Total = {total_count_bbv}, True Count = {TRUE_COUNT}")
        if verbose:
            print(f"LineZone internal count (for reference): in={line_zone.in_count}, out={line_zone.out_count}")


if __name__ == "__main__":
    import tkinter as tk
    from tkinter.filedialog import askopenfilename, askdirectory
    tk.Tk().withdraw()

    # Pick input video + output folder with file dialogs
    SOURCE_VIDEO_PATH = askopenfilename()
    print("User chose:", SOURCE_VIDEO_PATH)

    folder_path = askdirectory()
    print("Output folder:", folder_path)

    # Build output filename
    filename_no_ext = SOURCE_VIDEO_PATH.split('/')[-1].rsplit('.', 1)[0]
    OUTPUT_PATH = f"{folder_path}/{filename_no_ext}-outputfile(colored).mp4"
    print("Output path:", OUTPUT_PATH)

    # Grab a frame to define the line
    cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)
    ret, frame = cap.read()
    if not ret:
        print("Failed to read the video")
        exit()
    cap.release()

    line_points = get_line_from_video_frame(frame)
    print("Line points:", line_points)

    # Only run if line points are valid
    if len(line_points) == 2:
        chick_counting(SOURCE_VIDEO_PATH, OUTPUT_PATH, line_points, verbose=True)
    else:
        print("Error: Not enough points to define the counting line.")

User chose: 
Output folder: 
Output path: /-outputfile(colored).mp4
Failed to read the video


[ WARN:0@1578.833] global cap.cpp:175 open VIDEOIO(CV_IMAGES): raised OpenCV exception:

OpenCV(4.12.0) /io/opencv/modules/videoio/src/cap_images.cpp:293: error: (-215:Assertion failed) !_filename.empty() in function 'open'



KeyboardInterrupt

