## 🌟 Smart Modality Selection in Camel Dataset 🐫

This notebook applies a decision tree algorithm to the Camel dataset, which provides two modalities (visible and infrared video). When tracking of an object is initiated, the algorithm selects the modality deemed better for that object and keeps using it until the end of tracking.

If the tracker loses the object for more than 3 consecutive frames, it intelligently switches to utilizing frames from the alternative modality.

Let's optimize your object tracking with smart modality selection! 🚀


In [None]:
import warnings

# Suppress the UserWarning about missing feature names.
# NOTE(review): presumably triggered because eval_sample passes plain lists to
# the fitted imputer/scaler/model — confirm against how they were trained.
warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names*")


In [None]:
import joblib
import pandas as pd
import numpy as np
import cv2

from tqdm import tqdm

from sklearn.feature_selection import RFE
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, recall_score, roc_auc_score, f1_score
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
from sklearn.neural_network import MLPClassifier

## 📝 Setting Annotation Parameters 📁

Set the variable `name` to the desired name of the result annotation file, and `path` to where the Camel dataset is located:

```python
name = 'TrackerCSRT_random_forest_dynamic'  # 🎯 Desired name for the result annotation file
path = '/content/drive/MyDrive/Camel'  # 🐪 Path to the Camel dataset
```


In [None]:
# Prefix used for the result annotation files written into each sequence folder.
name = 'TrackerCSRT_random_forest_dynamic'
# Root folder of the Camel dataset; also holds scaler.pkl, imputer.pkl and model.pkl.
path = '/content/drive/MyDrive/Camel'

## 🛠️ Loading Pre-Trained Model Components!

Unleash the might of your pre-trained scaler, imputer, and model! 💥


In [None]:
# Load the fitted scaler, imputer and model from files in the dataset folder.
# NOTE: joblib.load unpickles its input, which can execute arbitrary code —
# only load files that come from a trusted source.
scaler = joblib.load(f'{path}/scaler.pkl')
imputer = joblib.load(f'{path}/imputer.pkl')
model = joblib.load(f'{path}/model.pkl')

In [None]:
def read_bbox_data_from_dataframe(dataframe):
    """Index ground-truth boxes by frame and record each track's last frame.

    Args:
        dataframe: Table with the columns ``frame_id``, ``track_id``,
            ``absolute_gt_x``, ``absolute_gt_y``, ``absolute_gt_width`` and
            ``absolute_gt_height``. Any other column is ignored.

    Returns:
        A ``(bbox_data, latest_frames)`` tuple. ``bbox_data`` maps a frame
        number to the list of ``(track_id, [x, y, width, height])`` entries
        found for that frame, in row order. ``latest_frames`` maps each track
        id to the highest frame number in which it occurs.

    Raises:
        KeyError: If one of the required columns is missing.
        ValueError: If a required value is NaN and cannot be converted to int.
    """
    bbox_data = {}
    latest_frames = {}
    for _, row in dataframe.iterrows():
        frame = int(row['frame_id'])
        track_id = int(row['track_id'])
        # int() truncates the float coordinates towards zero.
        bbox = [
            int(row['absolute_gt_x']),
            int(row['absolute_gt_y']),
            int(row['absolute_gt_width']),
            int(row['absolute_gt_height']),
        ]
        bbox_data.setdefault(frame, []).append((track_id, bbox))
        # Keep the highest frame number seen so far for this track.
        latest_frames[track_id] = max(frame, latest_frames.get(track_id, 0))
    return bbox_data, latest_frames

def generate_absolute_truth(vis_file, ir_file):
    """Merge visible and IR annotations into one enclosing box per track and frame.

    Both files are read as header-less, tab-delimited tables with the columns
    ``frame_id, track_id, class, x, y, width, height`` and joined with a full
    outer join on ``(frame_id, track_id)``. When a track is annotated in both
    modalities for a frame, the result is the smallest box that contains both
    boxes; otherwise the single available box is used unchanged.

    Args:
        vis_file: Path to the visible-modality annotation file.
        ir_file: Path to the IR-modality annotation file.

    Returns:
        A DataFrame with the columns ``frame_id``, ``track_id``, ``class_id``
        (always ``-1``), ``absolute_gt_x``, ``absolute_gt_y``,
        ``absolute_gt_width`` and ``absolute_gt_height``.
    """
    columns = ['frame_id', 'track_id', 'class', 'x', 'y', 'width', 'height']
    dtypes = {'frame_id': int, 'track_id': int, 'class': int,
              'x': float, 'y': float, 'width': float, 'height': float}

    def _read_annotations(annotation_file):
        # on_bad_lines='skip' drops lines pandas considers malformed.
        return pd.read_csv(annotation_file, delimiter='\t', header=None,
                           names=columns, dtype=dtypes, on_bad_lines='skip')

    # Full outer join: a track may be annotated in only one modality.
    merged_df = pd.merge(_read_annotations(vis_file), _read_annotations(ir_file),
                         on=['frame_id', 'track_id'], how='outer',
                         suffixes=('_vis', '_ir'))

    # Column-wise computation instead of a row-wise apply: same values, but it
    # also works when the merged frame has no rows.
    for origin, extent in (('x', 'width'), ('y', 'height')):
        origin_vis = merged_df[f'{origin}_vis']
        origin_ir = merged_df[f'{origin}_ir']
        extent_vis = merged_df[f'{extent}_vis']
        extent_ir = merged_df[f'{extent}_ir']

        both_present = origin_vis.notna() & origin_ir.notna()
        # fmin ignores NaN, so a one-sided annotation keeps its own origin.
        low = np.fmin(origin_vis, origin_ir)
        high = np.maximum(origin_vis + extent_vis, origin_ir + extent_ir)

        merged_df[f'absolute_gt_{origin}'] = low
        # The union extent only applies when both boxes exist; otherwise keep
        # the available extent untouched (no add/subtract round-off).
        merged_df[f'absolute_gt_{extent}'] = (high - low).where(
            both_present, extent_vis.fillna(extent_ir))

    merged_df['class_id'] = -1

    # Keep only the identifiers and the merged box.
    return merged_df[['frame_id', 'track_id', 'class_id', 'absolute_gt_x',
                      'absolute_gt_y', 'absolute_gt_width', 'absolute_gt_height']]

def adjust_bounding_box(frame, roi):
    """Fit a box inside the frame while keeping it at least 5 pixels per side.

    Args:
        frame: Image array whose ``shape`` is ``(height, width, channels)``.
        roi: Box as ``(x, y, w, h)``.

    Returns:
        The adjusted box as an ``(x, y, w, h)`` tuple.
    """
    frame_h, frame_w, _ = frame.shape

    def _fit_axis(start, size, limit):
        # A negative start is moved to 0 and the size shrinks by the same amount.
        if start < 0:
            size += start
            start = 0
        # Trim whatever sticks out past the limit.
        if start + size > limit:
            size = limit - start
        # Enforce the minimum size, then slide back if that pushed it past the limit.
        size = max(5, size)
        if start + size > limit:
            start = limit - size
        return start, size

    left, top, box_w, box_h = roi
    # The largest valid coordinate on each axis is the frame size minus one.
    left, box_w = _fit_axis(left, box_w, frame_w - 1)
    top, box_h = _fit_axis(top, box_h, frame_h - 1)
    return (left, top, box_w, box_h)

In [None]:
def eval_sample(features, scaler, imputer, model):
    """Run one feature vector through imputation, scaling and the model.

    Args:
        features: Feature values of a single sample.
        scaler: Object exposing ``transform``; applied after imputation.
        imputer: Object exposing ``transform``; applied first.
        model: Object exposing ``predict``.

    Returns:
        Whatever ``model.predict`` returns for the one-sample batch.
    """
    # Wrap the sample in a list so the pipeline sees a batch of one row.
    single_row_batch = [features]
    prepared = scaler.transform(imputer.transform(single_row_batch))
    return model.predict(prepared)

In [None]:
def extract_image_features(frame, frame2, x, y, width, height):
    """Compute intensity statistics for both frames and for the ROI in each.

    Four regions are described, in this order: the full visible frame, the
    full IR frame, the visible ROI and the IR ROI. Each region contributes 17
    values: per-channel (B, G, R) mean, median, mode and standard deviation,
    then the grayscale mean, variance, skewness and excess kurtosis, and
    finally ``width * height``. The ROI area is appended to every group,
    including the two full-frame ones.

    Args:
        frame: Visible-modality image (converted with ``COLOR_BGR2GRAY``).
        frame2: IR-modality image (converted with ``COLOR_BGR2GRAY``).
        x: Left column of the ROI.
        y: Top row of the ROI.
        width: ROI width in pixels.
        height: ROI height in pixels.

    Returns:
        A flat list of 68 feature values (4 regions x 17 values).
    """
    # Nothing below writes to the images, so the inputs are sliced directly
    # instead of being copied first.
    rows = slice(y, y + height)
    cols = slice(x, x + width)

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    frame2_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    regions = [
        (frame_gray, frame),
        (frame2_gray, frame2),
        (frame_gray[rows, cols], frame[rows, cols]),
        (frame2_gray[rows, cols], frame2[rows, cols]),
    ]

    # Same value for every region, so compute it once.
    area = width * height

    features = []
    for gray_region, color_region in regions:
        b, g, r = cv2.split(color_region)
        channels = (b, g, r)

        channel_means = [np.mean(channel) for channel in channels]
        channel_medians = [np.median(channel) for channel in channels]
        # Mode = most frequent pixel value in the channel.
        channel_modes = [np.argmax(np.bincount(channel.flatten())) for channel in channels]
        channel_stds = [np.std(channel) for channel in channels]

        # Grayscale moments. A region with zero variance divides by zero here
        # and produces NaN values.
        mean_intensity = np.mean(gray_region)
        variance_intensity = np.var(gray_region)
        centered = gray_region - mean_intensity
        skewness_intensity = np.mean(centered ** 3) / np.power(variance_intensity, 1.5)
        kurtosis_intensity = np.mean(centered ** 4) / np.power(variance_intensity, 2) - 3

        features += channel_means + channel_medians + channel_modes + channel_stds
        features += [mean_intensity, variance_intensity,
                     skewness_intensity, kurtosis_intensity, area]

    return features

In [None]:
def annotate(vid_in_vis, vid_in_ir, annotation_in_vis, annotation_in_ir, annotation_file_out, annotation_file_out2):
    """Track every annotated object through a visible/IR video pair and write the boxes.

    A CSRT tracker is created for each track id at the first frame in which
    the track appears in the merged ground truth. The module-level ``scaler``,
    ``imputer`` and ``model`` decide, from image features of the initial box,
    whether the tracker is fed IR or visible frames. For every frame and
    every live tracker one line ``"<frame> <track_id> -1 <x> <y> <w> <h>"`` is
    written to both output files. A tracker is dropped after the last frame
    in which its track is annotated. Processing stops when either video ends
    or when 'q' is pressed.

    Args:
        vid_in_vis: Path to the visible video.
        vid_in_ir: Path to the IR video.
        annotation_in_vis: Path to the visible ground-truth annotation file.
        annotation_in_ir: Path to the IR ground-truth annotation file.
        annotation_file_out: First output annotation file.
        annotation_file_out2: Second output annotation file (same content).
    """
    trackers = {}
    cap = cv2.VideoCapture(vid_in_vis)
    cap2 = cv2.VideoCapture(vid_in_ir)

    # try/finally so the captures are released even if processing fails.
    try:
        annotation_df = generate_absolute_truth(annotation_in_vis, annotation_in_ir)
        bbox_data, latest_frames = read_bbox_data_from_dataframe(annotation_df)

        with open(annotation_file_out, 'w') as file, open(annotation_file_out2, 'w') as file2:
            while cap.isOpened() and cap2.isOpened():
                ret, frame = cap.read()
                ret2, frame2 = cap2.read()
                if not ret or not ret2:
                    break

                # Read after cap.read(), so this is the capture position
                # following the frame just decoded.
                frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES))

                # Initialize trackers at the earliest frame where track_id appears
                for track_id, bbox in bbox_data.get(frame_number, ()):
                    if track_id in trackers:
                        continue
                    bbox = adjust_bounding_box(frame2, bbox)
                    if bbox[2] <= 0 or bbox[3] <= 0:
                        continue
                    x, y, w, h = bbox
                    features = extract_image_features(frame, frame2, x, y, w, h)
                    ir_is_better = eval_sample(features, scaler, imputer, model)

                    # Create the tracker only once the box is known to be usable.
                    tracker3 = cv2.TrackerCSRT_create()
                    tracker3.init(frame2 if ir_is_better else frame, tuple(bbox))

                    trackers[track_id] = {'tracker_dual': tracker3, 'last_bbox': bbox, 'lost_count': 0, 'ir_better': ir_is_better}

                # Update trackers and write their boxes.
                for track_id, data in trackers.items():
                    # Re-evaluating the modality on every frame was tried and
                    # only confused the tracker, so the choice is revisited
                    # only after the tracker has been lost (below).
                    if data['ir_better']:
                        frame3, frame4 = frame2, frame
                    else:
                        frame3, frame4 = frame, frame2

                    success3, bbox3 = data['tracker_dual'].update(frame3)

                    if success3:
                        data['lost_count'] = 0
                        data['last_bbox'] = bbox3
                    else:
                        data['lost_count'] += 1
                        if data['lost_count'] > 3:
                            # Re-evaluate the modality on the last known box,
                            # provided it has a non-zero size and a
                            # non-negative origin.
                            if 0 not in data['last_bbox'][2:] and not any(num < 0 for num in data['last_bbox'][:2]):
                                x, y, w, h = data['last_bbox']
                                features = extract_image_features(frame, frame2, x, y, w, h)
                                data['ir_better'] = eval_sample(features, scaler, imputer, model)
                                # The retry frame is the opposite of the newly
                                # predicted modality.
                                frame4 = frame if data['ir_better'] else frame2

                            # NOTE(review): the result of this retry is
                            # discarded — success3 stays False, so the last
                            # known box is written below. Confirm whether the
                            # retry's box was meant to be used.
                            data['tracker_dual'].update(frame4)

                        # While lost, keep reporting the last known box.
                        bbox3 = data['last_bbox']

                    x1, y1, w, h = bbox3
                    file.write(f"{frame_number} {track_id} {-1} {x1} {y1} {w} {h}\n")
                    file2.write(f"{frame_number} {track_id} {-1} {x1} {y1} {w} {h}\n")

                # Drop trackers whose track has reached its last annotated frame.
                for track_id, value in latest_frames.items():
                    if value == frame_number:
                        # pop() tolerates a track that never got a tracker.
                        trackers.pop(track_id, None)

                # Press 'q' to quit
                if cv2.waitKey(25) & 0xFF == ord('q'):
                    break
    finally:
        # Release resources
        cap.release()
        cap2.release()
        cv2.destroyAllWindows()

## 🚫 Skipped Files Due to Missing Videos/Annotations

Unfortunately, during processing, the following files had to be skipped due to missing videos or annotations:

- File 12
- File 14
- File 16
- File 22
- File 24

These files are excluded from the analysis as their corresponding videos or annotations are not available. 😞


In [None]:
# Process sequences 1-30; sequences with missing videos or annotations are skipped.
for i in tqdm(range(1, 31)):
    if i not in (12, 14, 16, 22, 24):
        annotate(
            f'{path}/seq-{i}/Visual-seq{i}.mp4',
            f'{path}/seq-{i}/IR-seq{i}.mp4',
            f'{path}/seq-{i}/Seq{i}-Vis.txt',
            f'{path}/seq-{i}/Seq{i}-IR.txt',
            f'{path}/seq-{i}/{name}-Out_Seq{i}-Vis.txt',
            f'{path}/seq-{i}/{name}-Out_Seq{i}-IR.txt',
        )