In [16]:
import pandas as pd
import numpy as np
from KalmanFilter import KalmanFilter

## 1 - Load dataset Yolov5l

In [17]:
detect_Yolov5l_df = pd.read_csv('ADL-Rundle-6\\det\\Yolov5l\\det.txt', sep = ' ', names = ["frame", "id", "bb_left", "bb_top", "bb_width",
"bb_height", "conf", "x", "y", "z"])

In [18]:
detect_Yolov5l_df.head()

Unnamed: 0,frame,id,bb_left,bb_top,bb_width,bb_height,conf,x,y,z
0,1,-1,1700,391,156,337,0.91455,-1,-1,-1
1,1,-1,250,456,107,248,0.883148,-1,-1,-1
2,1,-1,1255,539,60,118,0.826354,-1,-1,-1
3,1,-1,1288,459,73,199,0.745969,-1,-1,-1
4,1,-1,120,504,93,239,0.740778,-1,-1,-1


In [19]:
detect_Yolov5l_df.drop(columns=["x", "y", "z"], inplace=True)

In [20]:
detect_Yolov5l_df.dtypes

frame          int64
id             int64
bb_left        int64
bb_top         int64
bb_width       int64
bb_height      int64
conf         float64
dtype: object

## Clean the dataframe
To clean it we will get rid of NaN and all the conf bellow 0.5.

## 2 - Create similarity matrix

### Define the lists or arrays to store the current tracked bounding boxes and the new detections for the current frame.

In [21]:
bounding_boxes = detect_Yolov5l_df.groupby("frame")

In [22]:
bounding_boxes.first()

Unnamed: 0_level_0,id,bb_left,bb_top,bb_width,bb_height,conf
frame,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,-1,1700,391,156,337,0.914550
2,-1,1689,390,163,338,0.900352
3,-1,1686,389,163,340,0.908388
4,-1,1684,390,162,341,0.894863
5,-1,1686,389,159,344,0.909074
...,...,...,...,...,...,...
521,-1,695,284,219,586,0.873641
522,-1,695,294,250,580,0.873710
523,-1,691,296,274,577,0.923026
524,-1,693,299,277,574,0.931775


# 3 - Kalman Filter / ONNX model

**In this part:**
- Initialize the Track class -> ID tracking of object.
- Load the ONNX model ready to use.
- Implements all the different useful functions for the global logic of tracking (IOU etc...).

## Use the kalman Filter

In [23]:
kalman_params = {
    "dt": 0.1,
    "u_x": 0,
    "u_y": 0,
    "std_acc": 1,
    "x_sdt_meas": 0.1,
    "y_sdt_meas": 0.1
}

class Track:
    def __init__(self, track_id, bb_left, bb_top, bb_width, bb_height, conf, kalman_params, image):
        self.id = track_id
        self.bb = [bb_left, bb_top, bb_width, bb_height]
        self.bb_left = bb_left
        self.bb_top = bb_top
        self.bb_width = bb_width
        self.bb_height = bb_height
        self.conf = conf
        self.missed_frames = 0
        self.image = image

        # Initialize the Kalman Filter
        self.kf = KalmanFilter(**kalman_params)

        # Compute the initial centroid to set the Kalman filter state
        cx = self.bb_left + self.bb_width / 2.0
        cy = self.bb_top + self.bb_height / 2.0
        # Assuming state vector: [cx, cy, vx, vy]^T
        self.kf.x_k = np.array([[cx],
                                [cy],
                                [0],   # initial vx
                                [0]],  # initial vy
                                dtype=float)

    def predict(self):
        predicted_state, _ = self.kf.predict()
        cx, cy = predicted_state[0, 0], predicted_state[1, 0]

        # Update bounding box using predicted centroid, keep width & height unchanged
        self.bb_left = cx - (self.bb_width / 2.0)
        self.bb_top = cy - (self.bb_height / 2.0)
        self.bb[0] = self.bb_left
        self.bb[1] = self.bb_top
        self.bb[2] = self.bb_width
        self.bb[3] = self.bb_height

    def update(self, bb_left, bb_top, bb_width, bb_height, conf):
        """
        Update the track with a new bounding box measurement and confidence.

        Parameters:
            bb_left (float): Left coordinate of the detected bounding box.
            bb_top (float): Top coordinate of the detected bounding box.
            bb_width (float): Width of the detected bounding box.
            bb_height (float): Height of the detected bounding box.
            conf (float): Detection confidence.
        """
        # Compute centroid of the detected bounding box
        cx = bb_left + bb_width / 2.0
        cy = bb_top + bb_height / 2.0
        z_k = np.array([[cx], [cy]], dtype=float)

        # Update the Kalman filter with the new measurement
        self.kf.update(z_k)

        # Retrieve the updated state from the Kalman filter
        cx_est = self.kf.x_k[0, 0]
        cy_est = self.kf.x_k[1, 0]

        # Update the track's bounding box with the estimated centroid
        self.bb_left = cx_est - (bb_width / 2.0)
        self.bb_top = cy_est - (bb_height / 2.0)
        self.bb_width = bb_width
        self.bb_height = bb_height
        
        self.bb[0] = self.bb_left
        self.bb[1] = self.bb_top
        self.bb[2] = self.bb_width
        self.bb[3] = self.bb_height
        
        self.conf = conf

        # Reset missed frames since we got a match
        self.missed_frames = 0



## Load the oonx model

In [24]:
import onnxruntime as ort
import cv2

class ReIDFeatureExtractor:
    def __init__(self, model_path, input_size=(64, 128), means=(0.485, 0.456, 0.406), stds=(0.229, 0.224, 0.225)):
        self.input_size = input_size
        self.means = np.array(means, dtype=np.float32).reshape(1, 1, 3)
        self.stds = np.array(stds, dtype=np.float32).reshape(1, 1, 3)
        self.session = ort.InferenceSession(model_path)

    def preprocess_patch(self, im_crop):
        patch = cv2.resize(im_crop, self.input_size)
        patch = cv2.cvtColor(patch, cv2.COLOR_BGR2RGB)
        patch = (patch.astype(np.float32) / 255.0 - self.means) / self.stds
        patch = np.transpose(patch, (2, 0, 1))  # HWC to CHW
        patch = np.expand_dims(patch, axis=0)  # Add batch dimension
        return patch.astype(np.float32)

    def extract_features(self, im_crop):
        patch = self.preprocess_patch(im_crop)
        outputs = self.session.run(None, {self.session.get_inputs()[0].name: patch})
        return outputs[0].flatten()  # Flatten to 1D feature vector


def normalized_similarity(distance):
    return 1 / (1 + distance)


def crop_image_by_bbox(image, bbox):
    x, y, w, h = [int(coord) for coord in bbox]
    img_h, img_w = image.shape[:2]

    # Ensure bounding box is within image bounds
    x = max(0, x)
    y = max(0, y)
    w = min(w, img_w - x)
    h = min(h, img_h - y)

    # Check if the bounding box is valid
    if w <= 0 or h <= 0:
        return None  # Invalid bounding box

    return image[y:y+h, x:x+w]


In [25]:
def process_detections_by_frame(detections_df):
    grouped_detections = (
        detections_df.groupby('frame')[['id', 'bb_left', 'bb_top', 'bb_width', 'bb_height', 'conf']]
        .apply(lambda x: x.values.tolist())
    )
    detections_with_missing_frames = []
    
    for frame_detections in grouped_detections:
        track_object = [
            Track(
                track_id=detection[0],
                bb_left=detection[1],
                bb_top=detection[2],
                bb_width=detection[3],
                bb_height=detection[4],
                conf=detection[5],
                kalman_params=kalman_params,
                image=None
            ) 
            for detection in frame_detections
        ]
        detections_with_missing_frames.append(track_object)
    
    return detections_with_missing_frames


detections_by_frame = process_detections_by_frame(detect_Yolov5l_df)

print("Detections by Frame:", detections_by_frame)

Detections by Frame: [[<__main__.Track object at 0x000002B8E9DB2600>, <__main__.Track object at 0x000002B8E8BBC440>, <__main__.Track object at 0x000002B8E8B4B020>, <__main__.Track object at 0x000002B8D637D5E0>, <__main__.Track object at 0x000002B8D637DC40>, <__main__.Track object at 0x000002B8D5C8B620>, <__main__.Track object at 0x000002B8E9EA6390>], [<__main__.Track object at 0x000002B8E9EA67B0>, <__main__.Track object at 0x000002B8E9EA6240>, <__main__.Track object at 0x000002B8EA10D670>, <__main__.Track object at 0x000002B8EA10D6D0>, <__main__.Track object at 0x000002B8EA10D820>, <__main__.Track object at 0x000002B8EA10D880>, <__main__.Track object at 0x000002B8EA10D8E0>], [<__main__.Track object at 0x000002B8EA10D940>, <__main__.Track object at 0x000002B8EA10D9A0>, <__main__.Track object at 0x000002B8EA10DA00>, <__main__.Track object at 0x000002B8EA10DA60>, <__main__.Track object at 0x000002B8EA10DAC0>, <__main__.Track object at 0x000002B8EA10DB20>, <__main__.Track object at 0x00000

In [26]:
import math

def compute_iou(box1, box2):
    # Convert to corner coordinates
    x1_min, y1_min = box1[0], box1[1]
    x1_max, y1_max = x1_min + box1[2], y1_min + box1[3]

    x2_min, y2_min = box2[0], box2[1]
    x2_max, y2_max = x2_min + box2[2], y2_min + box2[3]

    # Calculate the intersection coordinates
    inter_x_min = max(x1_min, x2_min)
    inter_y_min = max(y1_min, y2_min)
    inter_x_max = min(x1_max, x2_max)
    inter_y_max = min(y1_max, y2_max)

    # Compute intersection area
    inter_width = max(0, inter_x_max - inter_x_min)
    inter_height = max(0, inter_y_max - inter_y_min)
    inter_area = inter_width * inter_height

    # Compute areas of the individual boxes
    box1_area = (x1_max - x1_min) * (y1_max - y1_min)
    box2_area = (x2_max - x2_min) * (y2_max - y2_min)

    # Compute union area
    union_area = box1_area + box2_area - inter_area

    # Compute IoU
    iou = inter_area / union_area if union_area > 0 else 0

    return iou
  


def create_combined_similarity_matrix(last_frame, current_frame, feature_extractor, alpha=0.7, beta=0.3):
    if not last_frame or not current_frame:
        return np.zeros((len(last_frame), len(current_frame)))

    similarity_matrix = np.zeros((len(last_frame), len(current_frame)))

    for i, tracked_box in enumerate(last_frame):
        for j, new_box in enumerate(current_frame):
            iou = compute_iou(tracked_box.bb, new_box.bb)

            # Extract features for appearance similarity
            cropped_patch1 = crop_image_by_bbox(tracked_box.image, tracked_box.bb)
            cropped_patch2 = crop_image_by_bbox(new_box.image, new_box.bb)

            if cropped_patch1 is None or cropped_patch2 is None:
                appearance_similarity = 0
            else:
                feature1 = feature_extractor.extract_features(cropped_patch1)
                feature2 = feature_extractor.extract_features(cropped_patch2)
                euclidean_distance = np.linalg.norm(feature1 - feature2)
                appearance_similarity = normalized_similarity(euclidean_distance)

            # Weighted combination
            similarity_matrix[i, j] = alpha * iou + beta * appearance_similarity

    return similarity_matrix


In [27]:
from scipy.optimize import linear_sum_assignment

def get_mapping_boxes(similarity_matrix, threshold=0.2):

    cost_matrix = 1 - similarity_matrix
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    
    mappings = []
    
    for r, c in zip(row_ind, col_ind):
        similarity = similarity_matrix[r, c]
        if similarity > threshold:
            mappings.append((r, c, similarity))
    
    return mappings

In [28]:
def snapshot_tracks(tracks):
    frame_snapshot = []
    for trk in tracks:
        onframe = (trk.missed_frames == 0)
        frame_snapshot.append((
            trk.id,
            trk.bb_left,
            trk.bb_top,
            trk.bb_width,
            trk.bb_height,
            trk.conf,
            onframe
        ))
    return frame_snapshot

# 4 - Global logic tracking ID

In [29]:
import os
import cv2

def store_bbx_id(detections_by_frame, feature_extractor, image_folder, number_of_missing_frame=60):
    
    image_files = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if img.endswith(('.jpg'))])
    
    if detections_by_frame is None or len(detections_by_frame) == 0:
        print("No detection in the frames")
        return None
    
    if not image_files:
        print("Error: No images found in the folder.")
        return None
    
    id_counter = 0
    frame = detections_by_frame[0]
    for i in range(len(frame)):
        frame[i].id = id_counter
        frame[i].image = cv2.imread(image_files[0])
        frame[i].predict()
        id_counter += 1
    
    current_tracks = frame
    
    all_tracked_frames = [snapshot_tracks(current_tracks)]

    
    for i in range(1, len(detections_by_frame)):
        current_image = cv2.imread(image_files[i])

        new_detections = detections_by_frame[i]
        
        for detection in new_detections:
            detection.image = current_image

        similarity_matrix = create_combined_similarity_matrix(current_tracks, new_detections, feature_extractor)
        mapping = get_mapping_boxes(similarity_matrix)
        # plot_similarity_matrix(similarity_matrix, i)
    
        matched_tracks = set()
        matched_detections = set()


        for old_t_idx, new_d_idx, _ in mapping:
            matched_tracks.add(old_t_idx)
            matched_detections.add(new_d_idx)
            
            det = new_detections[new_d_idx]
            current_tracks[old_t_idx].update(det.bb_left, det.bb_top, det.bb_width, det.bb_height, det.conf)
            current_tracks[old_t_idx].predict()

        unmatched_tracks = set(range(len(current_tracks))) - matched_tracks
        for unmatch in unmatched_tracks:
            current_tracks[unmatch].missed_frames += 1
            current_tracks[unmatch].predict()

        current_tracks = [t for t in current_tracks if t.missed_frames <= number_of_missing_frame]


        unmatched_detections = set(range(len(new_detections))) - matched_detections
        for ud in unmatched_detections:
            det = new_detections[ud]
            det.id = id_counter
            id_counter += 1
            
            det.predict()  # predict once for initialization
            current_tracks.append(det)

        
        all_tracked_frames.append(snapshot_tracks(current_tracks))
        
        if i % 10 == 0:
            print("step number: ", i)
    return all_tracked_frames
            

In [30]:
feature_extractor = ReIDFeatureExtractor('reid_osnet_x025_market1501.onnx')
image_folder = "ADL-Rundle-6\\img1"
current_tracked = store_bbx_id(detections_by_frame=detections_by_frame, feature_extractor=feature_extractor, image_folder=image_folder)

FileNotFoundError: [WinError 3] Le chemin d’accès spécifié est introuvable: 'ADL-Rundle-6\\img1'

# 5 - Save video

In [241]:
import os
import cv2

def display_and_save_tracking_from_images(image_folder, tracked_data, output_video_path):
    """
    Display image sequence with overlaid bounding boxes and save the tracking results as a video.
    
    Parameters:
        image_folder (str): Path to the folder containing image sequence.
        tracked_data (list): List of tracked bounding boxes for each frame.
        output_video_path (str): Path to save the output video.
    """
    # Get sorted list of image files
    image_files = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if img.endswith(('.jpg'))])
    if not image_files:
        print("Error: No images found in the folder.")
        return
    
    # Read the first image to get frame dimensions
    first_image = cv2.imread(image_files[0])
    frame_height, frame_width = first_image.shape[:2]
    fps = 60

    # Define codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Use 'XVID' for .avi files
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
    
    # Iterate through image files and overlay bounding boxes
    for frame_idx, image_path in enumerate(image_files):
        frame = cv2.imread(image_path)
        if frame_idx >= len(tracked_data):
            break
        
        # Get the bounding boxes for the current frame
        current_frame_boxes = tracked_data[frame_idx]
        
        # Draw bounding boxes and IDs on the frame
        for box in current_frame_boxes:
            track_id, x, y, w, h, conf, onframe = box
            if not onframe:
                continue
            # Draw rectangle
            cv2.rectangle(frame, (int(x), int(y)), (int(x + w), int(y + h)), (0, 255, 0), 2)
            # Add track ID label
            cv2.putText(frame, f"ID: {track_id}       conf: {conf:.2f}", (int(x), int(y - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
        
        # Write frame to output video
        out.write(frame)
        
        # Display the frame
        cv2.imshow('Tracking', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    # Release video resources
    out.release()
    cv2.destroyAllWindows()

# Example usage
image_folder = "ADL-Rundle-6\\img1"
output_video_path = "tracked_output_video.mp4"
display_and_save_tracking_from_images(image_folder, current_tracked, output_video_path)


## 6 - Save into file

In [242]:
def save_tracking_results_txt_images(tracked_data, image_folder, output_dir):
    """
    Save tracking results to a text file in the required format for an image sequence.
    
    Parameters:
        tracked_data (list): List of tracked bounding boxes for each frame.
        image_folder (str): Path to the folder containing image sequence.
        output_dir (str): Directory to save the tracking results.
    """
    sequence_name = os.path.basename(os.path.normpath(image_folder))
    output_file = os.path.join(output_dir, f"{sequence_name}.txt")
    
    with open(output_file, 'w') as file:
        for frame_idx, frame_data in enumerate(tracked_data):
            for box in frame_data:
                track_id, x, y, w, h, conf, _ = box
                file.write(f"{frame_idx + 1},{track_id},{x},{y},{w},{h},{conf},1,-1,-1,-1\n")
    
    print(f"Tracking results saved to {output_file}")


output_dir = "tracking_results"
os.makedirs(output_dir, exist_ok=True)
save_tracking_results_txt_images(current_tracked, image_folder, output_dir)

Tracking results saved to tracking_results\img1.txt
