Imports

In [23]:
from ultralytics import YOLO
import cv2
import numpy as np

Configuration

In [35]:
# User-supplied paths: fill these in before running the rest of the notebook.
SOURCE_PATH = "" # Complete path of the video you want to process
DETECTION_MODEL_PATH = "" # Complete path of the file "antDetection.pt"
CLASSIFICATION_MODEL_PATH = "" # Complete path of the file "loadClassification.pt"

output_filename = "filename.mp4"  # Filename of the annotated output video


In [25]:
# Extra margin (in pixels) added on every side of a detected bounding box
# before the crop is handed to the load classifier.
BBOX_PADDING = 15
# Per-frame classifier confidence that must be exceeded for that frame to
# count as a "loaded" observation of the ant.
LOAD_THRESHOLD = 0.95

Video Configuration

In [26]:
# Open the input video. cv2.VideoCapture does not raise on a bad path, so
# check explicitly; otherwise the notebook would silently produce an empty
# output file.
cap = cv2.VideoCapture(SOURCE_PATH)
if not cap.isOpened():
    raise IOError("Could not open video source: " + repr(SOURCE_PATH))

# Load the two YOLO models: one detects/tracks ants, the other classifies a
# cropped ant image as loaded or not.
detection_model = YOLO(DETECTION_MODEL_PATH)
classification_model = YOLO(CLASSIFICATION_MODEL_PATH)

In [27]:
# Define the video writer parameters.
# The output keeps the frame size of the source video.
output_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
output_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Use the frame rate of the source so the output plays at the original speed.
# Fall back to DEFAULT_FPS when the container does not report a usable value.
DEFAULT_FPS = 30
source_fps = cap.get(cv2.CAP_PROP_FPS)
fps = source_fps if source_fps > 0 else DEFAULT_FPS

# Define the video writer object
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
video_writer = cv2.VideoWriter(output_filename, fourcc, fps, (output_width, output_height))

Code

In [28]:
def getNumberOfFrames(video):
    """Return the frame count reported by an OpenCV capture, as an int."""
    return int(video.get(cv2.CAP_PROP_FRAME_COUNT))


# Total frame count of the input video; the main loop shows it in the
# "Processed Frames: x/y" progress message.
numberOfFrames = getNumberOfFrames(cap)

In [29]:
# This function returns the region of the frame covered by the bbox after
# growing it by `padding` pixels on every side.
def extract_object(frame, bbox, padding):
    """Crop a padded bounding box out of a frame.

    Args:
        frame: Image array indexed as frame[row, column, ...].
        bbox: Four values (x, y, x2, y2) giving the top-left and
            bottom-right corners of the box.
        padding: Number of pixels to grow the box by on each side.

    Returns:
        The slice of `frame` covering the padded box. The padded box is
        clamped to the frame borders, so the crop never reaches outside
        the image.
    """
    frame_height, frame_width = frame.shape[0], frame.shape[1]
    left, top, right, bottom = bbox

    # Clamp each padded edge to the image bounds.
    rows = slice(max(0, top - padding), min(frame_height, bottom + padding))
    cols = slice(max(0, left - padding), min(frame_width, right + padding))
    return frame[rows, cols]

In [30]:
# This function runs the classification model on a cropped ant image and
# returns the model's confidence that the ant is loaded.
def getLoadProb(frame):
    """Return the load confidence the classifier assigns to `frame`.

    Args:
        frame: Cropped image of a single ant.

    Returns:
        The first entry of the classifier's probability vector.
        NOTE(review): this assumes class index 0 is the "loaded" class;
        confirm against the class order of the classification model.

    Raises:
        ValueError: If the model returns no results for the input.
    """
    results = classification_model(frame, verbose=False)
    if not results:
        # Fail with a clear message instead of an unbound-variable error.
        raise ValueError("Classification model returned no results")

    # One crop in, one result out. Taking the last entry keeps the behaviour
    # of the previous loop, which kept only the final result.
    return np.array(results[-1].probs.data.cpu())[0]


In [31]:
# This function records one load probability for an ant in the dictionary
# that stores the probability history of every ant.
def addLoadProb(antID, prob, dictionary):
    """Append `prob` to the list stored under `antID`, creating it if needed.

    Args:
        antID: Key identifying the ant.
        prob: Load probability to record.
        dictionary: Mapping from ant id to a list of probabilities; it is
            modified in place.
    """
    dictionary.setdefault(antID, []).append(prob)

In [32]:
# This function counts how many of an ant's recorded load probabilities
# are above LOAD_THRESHOLD.
def calcLoadProb(probsArray):
    """Return the number of entries in `probsArray` greater than LOAD_THRESHOLD.

    Despite the name, the result is a count of frames, not a probability.
    """
    above_threshold = np.array(probsArray) > LOAD_THRESHOLD
    return np.sum(above_threshold)

In [33]:
# Paints a rectangle with rounded corners. This is just for aesthetics.
def rounded_rectangle(img, start, end, color, thickness, r):
    """Draw a rounded rectangle outline onto `img` in place.

    Args:
        img: Image to draw on.
        start: (x, y) of the top-left corner.
        end: (x, y) of the bottom-right corner.
        color: Color passed through to the OpenCV drawing calls.
        thickness: Line thickness in pixels.
        r: Corner radius in pixels.
    """
    left, top = start
    right, bottom = end

    # Straight edges, each shortened by the corner radius at both ends.
    edges = (
        ((left + r, top), (right - r, top)),        # top
        ((left, top + r), (left, bottom - r)),      # left
        ((left + r, bottom), (right - r, bottom)),  # bottom
        ((right, top + r), (right, bottom - r)),    # right
    )
    for first_point, second_point in edges:
        cv2.line(img, first_point, second_point, color, thickness)

    # Quarter-circle arcs: (arc center, rotation angle of the 0-90 degree arc).
    corners = (
        ((left + r, top + r), 180),      # top-left
        ((right - r, top + r), 270),     # top-right
        ((left + r, bottom - r), 90),    # bottom-left
        ((right - r, bottom - r), 0),    # bottom-right
    )
    for arc_center, rotation in corners:
        cv2.ellipse(img, arc_center, (r, r), rotation, 0, 90, color, thickness)

In [34]:
ant_paths = {}  # ant id -> list of bounding-box centers, i.e. the path traveled by each ant
ant_loadProbs = {}  # ant id (as str) -> list of per-frame load probabilities

# An ant is labelled "Ant + Load" once MORE than this many of its frames had a
# load probability above LOAD_THRESHOLD.
LOADED_FRAMES_THRESHOLD = 5

# Drawing colors, passed to OpenCV unchanged.
PATH_COLOR = (30, 240, 240)
LOADED_COLOR = (240, 240, 30)
UNLOADED_COLOR = (255, 69, 0)

# Main loop
processedFrames = 0
try:
    while True:
        print("Processed Frames: " + str(processedFrames) + "/" + str(numberOfFrames), end="\r")
        ret, frame = cap.read()
        if not ret:
            # No frame could be read: end of the video.
            break

        # Errors raised by the tracker are no longer swallowed; they propagate
        # after the finally block below has released the video resources.
        results = detection_model.track(frame, persist=True, verbose=False)
        result = results[0]

        paintedFrame = frame.copy()

        # boxes.id is None when nothing is tracked in this frame. The frame is
        # still written (unannotated) so the output keeps every input frame.
        if result.boxes.id is not None:
            bboxes = np.array(result.boxes.xyxy.cpu(), dtype="int")
            ids = np.array(result.boxes.id.cpu(), dtype="int")

            for bbox, ant_id in zip(bboxes, ids):
                # Plain Python ints for dictionary keys and OpenCV coordinates.
                ant_id = int(ant_id)
                x, y, x2, y2 = bbox.tolist()

                # Classify the padded crop taken from the unpainted frame.
                objectFrame = extract_object(frame, bbox, BBOX_PADDING)
                addLoadProb(str(ant_id), getLoadProb(objectFrame), ant_loadProbs)
                antLoadProb = calcLoadProb(ant_loadProbs[str(ant_id)])

                # Extend this ant's path with the center of its current box.
                center = (int((x + x2) / 2), int((y + y2) / 2))
                path = ant_paths.setdefault(ant_id, [])
                path.append(center)

                # Redraw the whole path, since every frame starts from a fresh copy.
                for segment_start, segment_end in zip(path, path[1:]):
                    cv2.line(paintedFrame, segment_start, segment_end, PATH_COLOR, 2)

                if antLoadProb > LOADED_FRAMES_THRESHOLD:
                    rounded_rectangle(paintedFrame, (x, y), (x2, y2), LOADED_COLOR, 3, 2)
                    cv2.putText(paintedFrame, "Ant + Load " + str(ant_id), (x, y - 5), cv2.FONT_HERSHEY_PLAIN, 1, LOADED_COLOR, 2)
                else:
                    rounded_rectangle(paintedFrame, (x, y), (x2, y2), UNLOADED_COLOR, 2, 2)
                    cv2.putText(paintedFrame, "Ant " + str(ant_id), (x, y - 5), cv2.FONT_HERSHEY_PLAIN, 1, UNLOADED_COLOR, 1)

        video_writer.write(paintedFrame)
        processedFrames += 1
finally:
    # Always finalize the output file and free the capture, even on errors.
    video_writer.release()
    cap.release()
    cv2.destroyAllWindows()

Processed Frames: 907/908



Processed Frames: 908/908