<a href="https://colab.research.google.com/github/felipe-aveiro/MMHD-project/blob/main/MMHD_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🛠️🗃️ **Install dependencies, import libraries, and remove "sample_data" folder**

In [None]:
print("START!\n")
# Silently install Ultralytics and remove the "sample_data" folder.
# Shell output is discarded, so a failed install is not reported here; the
# "installed" message below is printed regardless of the pip result.
!pip install ultralytics > /dev/null 2>&1

print("🛠️ Ultralytics library installed!\n")

!rm -r sample_data > /dev/null 2>&1

# Silently import everything: stdout and stderr are redirected to None while the
# imports run so that import-time messages do not clutter the cell output.
import contextlib
with contextlib.redirect_stdout(None), contextlib.redirect_stderr(None):
    import ultralytics
    from ultralytics import YOLO
    import logging
    from ultralytics.utils import LOGGER
    from google.colab import files
    import matplotlib.pyplot as plt
    from PIL import Image
    import os
    import random
    import shutil
    import time
    from tqdm import tqdm
    import torch
    from torch import cat
    import torchvision
    from torchvision.ops import box_iou
    import cv2
    import numpy as np

# Report whether CUDA is available and, if so, the name of device 0
print("📢 CUDA available:", torch.cuda.is_available())
print("🔍 Device:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU")


print("\n✅ Installations and imports done!")

# 💾📥 **Download MID-3K dataset**

In [None]:
# Clone the project repository into the current working directory.
# NOTE: the success message below is printed even if the clone fails.
!git clone https://github.com/felipe-aveiro/MMHD-project.git

print("\n✅ Repository was successfully cloned!")

# ⚙️🛠️ **Split dataset into train (65%), validation (15%), and test (20%)**

In [None]:
# Base directory of the MID-3K dataset inside the cloned repository
BASE_DIR = "/content/MMHD-project/MID-3K"

def process_dataset(BASE_DIR, modality, seed=None):
    """Split one modality of the dataset into train/val/test folders.

    PNG images are read from
    ``<BASE_DIR>/dataset/<modality>/whole-dataset/<modality>/images`` and their
    same-named ``.txt`` labels from the sibling ``labels`` folder. The image list
    is shuffled and copied into ``train`` (65%), ``val`` (15%) and ``test``
    (everything that is left, ~20%) folders under ``<BASE_DIR>/dataset/<modality>``.
    Existing split folders are deleted and recreated on every call.

    Args:
        BASE_DIR (str): Root folder of the dataset.
        modality (str): Modality sub-folder name (e.g. "rgb", "thermal", "depth").
        seed (int | None): Optional seed for the shuffle. With the default ``None``
            the module-level ``random`` state is used, as before. Passing the same
            seed for every modality gives identical splits whenever the modalities
            contain the same filenames.
    """
    print(f"\n🧰 Processing modality: {modality.upper()}")

    # Paths to the images and labels of the selected modality
    modality_root = os.path.join(BASE_DIR, "dataset", modality)
    images_dir = os.path.join(modality_root, "whole-dataset", modality, "images")
    labels_dir = os.path.join(modality_root, "whole-dataset", modality, "labels")

    # Build output directories for a given split (train, val, test)
    def build_output_path(split):
        return {
            "images": os.path.join(modality_root, split, modality, "images"),
            "labels": os.path.join(modality_root, split, modality, "labels"),
        }

    # Collect all image files. os.listdir returns entries in arbitrary order, so
    # the list is sorted first to make a seeded shuffle reproducible.
    image_files = sorted(f for f in os.listdir(images_dir) if f.endswith('.png'))
    rng = random if seed is None else random.Random(seed)
    rng.shuffle(image_files)

    # Split ratios (the test split receives whatever remains, ~20%)
    train_percentage = 0.65
    val_percentage = 0.15

    # Calculate the split sizes
    total = len(image_files)
    train_split = int(total * train_percentage)
    val_split = int(total * val_percentage)

    # Lists for each split; slicing to the end guarantees total coverage
    train_files = image_files[:train_split]
    val_files = image_files[train_split:train_split + val_split]
    test_files = image_files[train_split + val_split:]

    print(f"📊 Total {modality} images: {total}\n")

    print(f"🟢 Train: {len(train_files)}")
    print(f"🔵 Validation: {len(val_files)}")
    print(f"🟣 Test: {len(test_files)}")

    # Delete and recreate output folders (tuple default: no shared mutable state)
    def prepare_output_folders(splits=("train", "val", "test")):
        for split in splits:
            for subpath in build_output_path(split).values():
                if os.path.exists(subpath):
                    shutil.rmtree(subpath)
                os.makedirs(subpath, exist_ok=True)

    prepare_output_folders()

    # Copy images and corresponding labels to each split folder
    def copy_files(file_list, split):
        paths = build_output_path(split)
        print(f"\n📦 Copying {split} set ({len(file_list)} files):")
        for img_file in tqdm(file_list, desc=f"→ {split.capitalize()} progress", unit=" images"):
            label_file = os.path.splitext(img_file)[0] + ".txt"
            src_img = os.path.join(images_dir, img_file)
            src_lbl = os.path.join(labels_dir, label_file)
            dst_img = os.path.join(paths["images"], img_file)
            dst_lbl = os.path.join(paths["labels"], label_file)

            # An image is only copied together with its label file
            if os.path.exists(src_img) and os.path.exists(src_lbl):
                shutil.copy2(src_img, dst_img)
                shutil.copy2(src_lbl, dst_lbl)
            else:
                print(f"⚠️ Skipped (missing image or label): {img_file}")

    # Run the file copying process and measure time
    start_time = time.perf_counter()
    copy_files(train_files, "train")
    copy_files(val_files, "val")
    copy_files(test_files, "test")
    elapsed = time.perf_counter() - start_time
    print(f"\n\n🔔 {modality.upper()} dataset split completed in {elapsed:.2f} seconds!")

# Process the RGB, thermal, and depth datasets.
# NOTE(review): each call shuffles its own file list, so the same filename can
# end up in different splits for different modalities.
process_dataset(BASE_DIR, "rgb")
process_dataset(BASE_DIR, "thermal")
process_dataset(BASE_DIR, "depth")

# ⚙️🧰 **Generate dataset configuration files (rgb.yaml, thermal.yaml, and depth.yaml)**

In [None]:
# Write one dataset configuration file per modality into <BASE_DIR>/dataset.
# Each file holds the modality root ("path"), the train/val/test sub-folders,
# and the single class name (0: human).

# RGB YAML
rgb_yaml_path = os.path.join(BASE_DIR, "dataset", "rgb.yaml")
rgb_yaml_content = f"""
path: {BASE_DIR}/dataset/rgb
train: train/rgb
val: val/rgb
test: test/rgb
names:
  0: human
"""

# THERMAL YAML
thermal_yaml_path = os.path.join(BASE_DIR, "dataset", "thermal.yaml")
thermal_yaml_content = f"""
path: {BASE_DIR}/dataset/thermal
train: train/thermal
val: val/thermal
test: test/thermal
names:
  0: human
"""

# DEPTH YAML
depth_yaml_path = os.path.join(BASE_DIR, "dataset", "depth.yaml")
depth_yaml_content = f"""
path: {BASE_DIR}/dataset/depth
train: train/depth
val: val/depth
test: test/depth
names:
  0: human
"""

# Save (strip() drops the leading/trailing newline of the triple-quoted templates)
with open(rgb_yaml_path, "w") as f:
    f.write(rgb_yaml_content.strip())

with open(thermal_yaml_path, "w") as f:
    f.write(thermal_yaml_content.strip())

with open(depth_yaml_path, "w") as f:
    f.write(depth_yaml_content.strip())

# List the files that were just written
print("✅ Checkpoint:\n")
print("\t📄 ", rgb_yaml_path)
print("\t📄 ", thermal_yaml_path)
print("\t📄 ", depth_yaml_path)

# 🚀🧠🌈🖼️ **Create YOLO RGB model, train and download**

In [None]:
# REMOVE ANY PREVIOUS RGB RUN FOLDER
# (presumably so the new run is written to "RML-project-MMHD/rgb" again — TODO confirm)
if os.path.exists("RML-project-MMHD/rgb"):
    shutil.rmtree("RML-project-MMHD/rgb")

# Training parameters
# NUMBER OF EPOCHS TO TRAIN
num_epochs_rgb = 50

# LOAD YOLOv8 SMALL MODEL FOR RGB MODALITY (nano, small, medium...)
model_rgb = YOLO('yolov8s.pt') # LOAD YOLO MODEL FOR TRAINING, e.g. yolov8n.pt, yolov8s.pt, yolov8m.pt ...

# Same path as the one written by the YAML cell; rebuilt so this cell only needs BASE_DIR
rgb_yaml_path = os.path.join(BASE_DIR, "dataset", "rgb.yaml")

print("🏃‍♂️💨 Using GPU..." if torch.cuda.is_available() else "🐌 Using CPU...")

# Start training
# https://docs.ultralytics.com/usage/cfg/#train-settings
# NOTE(review): the model above is created from the 'yolov8s.pt' checkpoint;
# confirm that pretrained = False has the intended effect in that case.
model_rgb.train(
    pretrained = False, # WHETHER TO USE PRETRAINED WEIGHTS
    data = rgb_yaml_path, # DATASET CONFIG FILE
    epochs = num_epochs_rgb, # NUMBER OF EPOCHS
    device = 0 if torch.cuda.is_available() else 'cpu', # USE GPU 0 IF AVAILABLE, OTHERWISE CPU
    patience = num_epochs_rgb, # SET patience = num_epochs_rgb TO DISABLE EARLY STOP
    imgsz = 640, # IMAGE SIZE TO RESIZE TO, DEFAULT 640
    save = True, # TO SAVE CHECKPOINTS AND FINAL MODEL WEIGHTS
    project='RML-project-MMHD', # NAME OF PROJECT
    name = 'rgb', # SUB-NAME OF PROJECT or MODALITY
    plots = True # TO SHOW PLOTS OF TRAINING AND VALIDATION METRICS
)

# 🚀🧠🔥🌡️ **Create YOLO thermal model, train and download**

In [None]:
# REMOVE ANY PREVIOUS THERMAL RUN FOLDER
# (presumably so the new run is written to "RML-project-MMHD/thermal" again — TODO confirm)
if os.path.exists("RML-project-MMHD/thermal"):
    shutil.rmtree("RML-project-MMHD/thermal")

# Training parameters
# NUMBER OF EPOCHS TO TRAIN
num_epochs_thermal = 50

# LOAD YOLOv8 SMALL MODEL FOR THERMAL MODALITY (nano, small, medium...)
model_thermal = YOLO('yolov8s.pt') # LOAD YOLO MODEL FOR TRAINING, e.g. yolov8n.pt, yolov8s.pt, yolov8m.pt ...

# Same path as the one written by the YAML cell; rebuilt so this cell only needs BASE_DIR
thermal_yaml_path = os.path.join(BASE_DIR, "dataset", "thermal.yaml")

print("🏃‍♂️💨 Using GPU..." if torch.cuda.is_available() else "🐌 Using CPU...")

# Start training
# https://docs.ultralytics.com/usage/cfg/#train-settings
# NOTE(review): the model above is created from the 'yolov8s.pt' checkpoint;
# confirm that pretrained = False has the intended effect in that case.
model_thermal.train(
    pretrained = False, # WHETHER TO USE PRETRAINED WEIGHTS
    data = thermal_yaml_path, # DATASET CONFIG FILE
    epochs = num_epochs_thermal, # NUMBER OF EPOCHS
    device = 0 if torch.cuda.is_available() else 'cpu', # USE GPU 0 IF AVAILABLE, OTHERWISE CPU
    patience = num_epochs_thermal, # SET patience = num_epochs_thermal TO DISABLE EARLY STOP
    imgsz = 640, # IMAGE SIZE TO RESIZE TO, DEFAULT 640
    save = True, # TO SAVE CHECKPOINTS AND FINAL MODEL WEIGHTS
    project='RML-project-MMHD', # NAME OF PROJECT
    name = 'thermal', # SUB-NAME OF PROJECT or MODALITY
    plots = True # TO SHOW PLOTS OF TRAINING AND VALIDATION METRICS
)

# 🚀🧠🕳️📡 **Create YOLO depth model, train and download**

In [None]:
# REMOVE ANY PREVIOUS DEPTH RUN FOLDER
# (presumably so the new run is written to "RML-project-MMHD/depth" again — TODO confirm)
if os.path.exists("RML-project-MMHD/depth"):
    shutil.rmtree("RML-project-MMHD/depth")

# Training parameters
# NUMBER OF EPOCHS TO TRAIN
num_epochs_depth = 50

# LOAD YOLOv8 SMALL MODEL FOR DEPTH MODALITY (nano, small, medium...)
model_depth = YOLO('yolov8s.pt') # LOAD YOLO MODEL FOR TRAINING, e.g. yolov8n.pt, yolov8s.pt, yolov8m.pt ...

# Same path as the one written by the YAML cell; rebuilt so this cell only needs BASE_DIR
depth_yaml_path = os.path.join(BASE_DIR, "dataset", "depth.yaml")

print("🏃‍♂️💨 Using GPU..." if torch.cuda.is_available() else "🐌 Using CPU...")

# Start training
# https://docs.ultralytics.com/usage/cfg/#train-settings
# NOTE(review): the model above is created from the 'yolov8s.pt' checkpoint;
# confirm that pretrained = False has the intended effect in that case.
model_depth.train(
    pretrained = False, # WHETHER TO USE PRETRAINED WEIGHTS
    data = depth_yaml_path, # DATASET CONFIG FILE
    epochs = num_epochs_depth, # NUMBER OF EPOCHS
    device = 0 if torch.cuda.is_available() else 'cpu', # USE GPU 0 IF AVAILABLE, OTHERWISE CPU
    patience = num_epochs_depth, # SET patience = num_epochs_depth TO DISABLE EARLY STOP
    imgsz = 640, # IMAGE SIZE TO RESIZE TO, DEFAULT 640
    save = True, # TO SAVE CHECKPOINTS AND FINAL MODEL WEIGHTS
    project='RML-project-MMHD', # NAME OF PROJECT
    name = 'depth', # SUB-NAME OF PROJECT or MODALITY
    plots = True # TO SHOW PLOTS OF TRAINING AND VALIDATION METRICS
)

# 🧪🎯🌈🖼️ **RGB model validation with test set**

In [None]:
# https://docs.ultralytics.com/modes/val/#arguments-for-yolo-model-validation
# Evaluate the RGB model on the test split declared in rgb.yaml.
# Requires model_rgb and rgb_yaml_path from the cells above.
results_test_rgb = model_rgb.val(
    data=rgb_yaml_path,
    split='test',
    project='RML-project-MMHD',
    name='test_eval_rgb',
    plots=True
)

# 🧪🎯🔥🌡️ **Thermal model validation with test set**

In [None]:
# https://docs.ultralytics.com/modes/val/#arguments-for-yolo-model-validation
# Evaluate the thermal model on the test split declared in thermal.yaml.
# Requires model_thermal and thermal_yaml_path from the cells above.
results_test_thermal = model_thermal.val(
    data=thermal_yaml_path,
    split='test',
    project='RML-project-MMHD',
    name='test_eval_thermal',
    plots=True
)

# 🧪🎯🕳️📡 **Depth model validation with test set**

In [None]:
# https://docs.ultralytics.com/modes/val/#arguments-for-yolo-model-validation
# Evaluate the depth model on the test split declared in depth.yaml.
# Requires model_depth and depth_yaml_path from the cells above.
results_test_depth = model_depth.val(
    data=depth_yaml_path,
    split='test',
    project='RML-project-MMHD',
    name='test_eval_depth',
    plots=True
)

# 🔁🤖 **Alternatively, load previous models**

In [None]:
# ⚙️ OPTIONAL: Load previously trained models from Google Drive if session was restarted
from google.colab import drive
drive.mount('/content/drive')

# Paths for saved best weights
drive_rgb_weights = "/content/drive/MyDrive/RML-2025/RML-project-MMHD/rgb/weights/best.pt"
drive_thermal_weights = "/content/drive/MyDrive/RML-2025/RML-project-MMHD/thermal/weights/best.pt"
drive_depth_weights = "/content/drive/MyDrive/RML-2025/RML-project-MMHD/depth/weights/best.pt"

# Check if files exist and load them.
# A missing weights file is skipped silently: the corresponding model_* variable
# is left untouched (or undefined if no training cell was run in this session).
if os.path.exists(drive_rgb_weights):
    model_rgb = YOLO(drive_rgb_weights)
    print("✅ RGB model loaded from Drive!")

if os.path.exists(drive_thermal_weights):
    model_thermal = YOLO(drive_thermal_weights)
    print("✅ Thermal model loaded from Drive!")

if os.path.exists(drive_depth_weights):
    model_depth = YOLO(drive_depth_weights)
    print("✅ Depth model loaded from Drive!")

# 👁️‍🗨️🕵️‍♂️ **Predict images (local files or URL image)**

In [None]:
# Running counters used to number the saved prediction files of each modality.
# The prediction cell that follows recomputes them from the files already on disk.
id_rgb = 0

id_thermal = 0

id_depth = 0

In [None]:
# IPython's Image is aliased to IPyImage so it does not shadow PIL's Image
from IPython.display import Image as IPyImage, display
import glob
# Only let CRITICAL messages from the Ultralytics logger through
LOGGER.setLevel(logging.CRITICAL)

# Base image folder paths (the complete, unsplit dataset of each modality)
image_folder_rgb = "/content/MMHD-project/MID-3K/dataset/rgb/whole-dataset/rgb/images"
image_folder_thermal = "/content/MMHD-project/MID-3K/dataset/thermal/whole-dataset/thermal/images"
image_folder_depth = "/content/MMHD-project/MID-3K/dataset/depth/whole-dataset/depth/images"

def get_random_image(image_folder):
    """Pick one PNG file at random from ``image_folder``.

    The extension check is case-insensitive.

    Args:
        image_folder (str): Folder to choose from.

    Returns:
        tuple: (full path of the chosen image, its filename)

    Raises:
        RuntimeError: If the folder contains no PNG file.
    """
    candidates = []
    for entry in os.listdir(image_folder):
        if entry.lower().endswith('.png'):
            candidates.append(entry)

    if len(candidates) == 0:
        raise RuntimeError(f"No PNG images found in {image_folder}")

    chosen_name = random.choice(candidates)
    return os.path.join(image_folder, chosen_name), chosen_name


# Pick one random image per modality. The three picks are independent, so they
# are not guaranteed to show the same scene.
random_image_path_rgb, random_image_name_rgb = get_random_image(image_folder_rgb)
random_image_path_thermal, random_image_name_thermal = get_random_image(image_folder_thermal)
random_image_path_depth, random_image_name_depth = get_random_image(image_folder_depth)

print(f"🔍 Selected random RGB image: {random_image_name_rgb}")
print(f"🔍 Selected random thermal image: {random_image_name_thermal}")
print(f"🔍 Selected random depth image: {random_image_name_depth}")
print()

# Run prediction
# Tutorial: https://docs.ultralytics.com/modes/predict/
pred_rgb = model_rgb(random_image_path_rgb)
pred_thermal = model_thermal(random_image_path_thermal)
pred_depth = model_depth(random_image_path_depth)

# Create folder for saving predictions
os.makedirs("predictions", exist_ok=True)


def _save_and_show_predictions(predictions, modality_key, modality_label):
    """Save every annotated prediction as a numbered JPG and display it.

    Files are written to ``predictions/<modality_key>_result_<id>.jpg``. Numbering
    continues after the highest id already on disk, so an existing file is never
    overwritten even when the numbering has gaps.

    Args:
        predictions: Iterable of results returned by a YOLO model call.
        modality_key (str): Filename prefix ("rgb", "thermal", "depth").
        modality_label (str): Name shown in the progress message.

    Returns:
        int: The next free id after saving.
    """
    name_prefix = f"{modality_key}_result_"
    used_ids = []
    for existing_path in glob.glob(f"predictions/{name_prefix}*.jpg"):
        id_text = os.path.basename(existing_path)[len(name_prefix):-len(".jpg")]
        if id_text.isdigit():
            used_ids.append(int(id_text))
    next_id = max(used_ids) + 1 if used_ids else 0

    for prediction in predictions:
        print(f"\n💾 Saving {modality_label} prediction results...\n")
        out_file = f"predictions/{name_prefix}{next_id}.jpg"
        prediction.save(filename=out_file)
        next_id += 1
        display(IPyImage(filename=out_file))
    return next_id


# Save and show the predictions of each modality, keeping the id counters up to date
id_rgb = _save_and_show_predictions(pred_rgb, "rgb", "RGB")
id_thermal = _save_and_show_predictions(pred_thermal, "thermal", "thermal")
id_depth = _save_and_show_predictions(pred_depth, "depth", "depth")

# 🔀🎥 **Apply late fusion with Non-Maximum Suppression (RGB, Thermal, and Depth)**

In [None]:
# Only let CRITICAL messages from the Ultralytics logger through
LOGGER.setLevel(logging.CRITICAL)

# Class id -> display name used when labelling the drawn boxes
CLASS_NAMES = {
    0: 'human'
}

# Define root directories for each modality (the complete, unsplit dataset)
image_folder_rgb = "/content/MMHD-project/MID-3K/dataset/rgb/whole-dataset/rgb/images"
image_folder_thermal = "/content/MMHD-project/MID-3K/dataset/thermal/whole-dataset/thermal/images"
image_folder_depth = "/content/MMHD-project/MID-3K/dataset/depth/whole-dataset/depth/images"

def get_random_existing_triplet(max_attempts=20):
    """
    Selects a random image filename in one modality and checks if the same file exists
    in the other two modalities. This avoids scanning all three directories entirely

    Args:
        max_attempts (int): Number of random candidates to try before giving up

    Returns:
        tuple or None: (filename, full_rgb_path, full_thermal_path, full_depth_path),
        or None when the base folder cannot be listed, holds no '.png' file, or no
        candidate was found in all three folders within max_attempts tries
    """
    # Randomly choose one base directory to sample from
    modality_dirs = {
        "rgb": image_folder_rgb,
        "thermal": image_folder_thermal,
        "depth": image_folder_depth
    }

    base_modality = random.choice(list(modality_dirs))
    base_folder = modality_dirs[base_modality]

    # Listing the folder is the only step that can fail with an I/O error
    try:
        all_files = [f for f in os.listdir(base_folder) if f.endswith('.png')]
    except OSError as e:
        print(f"❌ Error during selection: {e}")
        return None

    if not all_files:
        print(f"⚠️ No images found in base modality folder: {base_modality}")
        return None

    for _ in range(max_attempts):
        candidate = random.choice(all_files)

        # Build full paths
        rgb_path = os.path.join(image_folder_rgb, candidate)
        thermal_path = os.path.join(image_folder_thermal, candidate)
        depth_path = os.path.join(image_folder_depth, candidate)

        if all(os.path.exists(p) for p in (rgb_path, thermal_path, depth_path)):
            return candidate, rgb_path, thermal_path, depth_path

    print(f"❌ Could not find a matching triplet after {max_attempts} attempts.")
    return None

def late_fusion_predictions_nms(filename, rgb_img_path, thermal_img_path, depth_img_path, save_name=None, show=True):
    """
    Performs late fusion using NMS on RGB, thermal, and depth predictions for the same image

    Each modality's model is run on its own image, the detections of all three are
    pooled, and torchvision's NMS (IoU threshold 0.5) keeps the surviving boxes.
    The fused boxes are then drawn on all three images.

    NOTE(review): the pooled boxes are pixel coordinates from three different
    images, so this assumes the modalities are pixel-aligned and share one
    resolution — TODO confirm.

    Args:
        filename (str): Image filename (e.g., '000123_...png'); only used in the plot title
        rgb_img_path (str): Path of the RGB image
        thermal_img_path (str): Path of the thermal image
        depth_img_path (str): Path of the depth image
        save_name (str): Optional name of the output sub-folder created under
            /content/predictions/late_fusion/nms; its stem is also the file prefix.
            Nothing is saved when it is None or empty
        show (bool): Whether to display the image with detections inside the notebook

    Returns:
        dict: "boxes", "scores" and "classes" of the fused detections, and "images",
        a dict mapping 'rgb'/'thermal'/'depth' to the annotated PIL images

    Raises:
        FileNotFoundError: If one of the images cannot be read by OpenCV
    """

    # Run predictions for each model
    results_rgb = model_rgb(rgb_img_path)
    results_thermal = model_thermal(thermal_img_path)
    results_depth = model_depth(depth_img_path)

    # Extract boxes, scores, and classes of the first (only) result
    def extract_info(result):
        detections = result[0].boxes
        return detections.xyxy, detections.conf, detections.cls

    boxes_rgb, scores_rgb, cls_rgb = extract_info(results_rgb)
    boxes_thermal, scores_thermal, cls_thermal = extract_info(results_thermal)
    boxes_depth, scores_depth, cls_depth = extract_info(results_depth)

    # Concatenate detections from all modalities
    all_boxes = torch.cat([boxes_rgb, boxes_thermal, boxes_depth])
    all_scores = torch.cat([scores_rgb, scores_thermal, scores_depth])
    all_cls = torch.cat([cls_rgb, cls_thermal, cls_depth])

    # Apply Non-Maximum Suppression (NMS)
    nms_indices = torchvision.ops.nms(all_boxes, all_scores, iou_threshold=0.5)

    fused_boxes = all_boxes[nms_indices]
    fused_scores = all_scores[nms_indices]
    fused_cls = all_cls[nms_indices]

    # Text style shared by every label
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    font_thickness = 1

    # Draw the fused boxes and their labels on one image
    def draw_boxes_on_image(img_path):
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals an unreadable file by returning None
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        for box, score, cls_id in zip(fused_boxes, fused_scores, fused_cls):
            x1, y1, x2, y2 = map(int, box.tolist())
            class_idx = int(cls_id.item())
            class_name = CLASS_NAMES.get(class_idx, str(class_idx))
            label = f"{class_name} ({score.item():.2f})"
            cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)

            (text_width, text_height), _baseline = cv2.getTextSize(label, font, font_scale, font_thickness)

            # Filled background behind the label, placed just above the box
            cv2.rectangle(
                img,
                (x1, y1 - text_height - 6),
                (x1 + text_width, y1),
                (255, 0, 0),
                -1
            )

            cv2.putText(
                img,
                label,
                (x1, y1 - 3),
                font,
                font_scale,
                (255, 255, 255),
                font_thickness,
                lineType=cv2.LINE_AA
            )
        return Image.fromarray(img)

    # One consistent "save?" decision for the folder, the images and the plot
    should_save = bool(save_name)
    if should_save:
        LATE_FUSION_DIR = os.path.join('/content', 'predictions', 'late_fusion', 'nms')
        folder_path = os.path.join(LATE_FUSION_DIR, save_name)
        os.makedirs(folder_path, exist_ok=True)
        basename = os.path.splitext(save_name)[0]

    modality_paths = {
        'rgb': rgb_img_path,
        'thermal': thermal_img_path,
        'depth': depth_img_path,
    }

    # Process all 3 modalities (each image is rendered exactly once)
    images = {}
    for modality, path in modality_paths.items():
        img_pil = draw_boxes_on_image(path)
        images[modality] = img_pil
        if should_save:
            output_path = os.path.join(folder_path, f"{basename}_{modality}.png")
            img_pil.save(output_path)
            print(f"💾 Saved {basename}_{modality} in {output_path}")

    print()

    # Show all three modalities side-by-side
    if show:
        plt.figure(figsize=(15, 5))
        for idx, modality in enumerate(modality_paths):
            plt.subplot(1, 3, idx + 1)
            plt.imshow(images[modality])
            plt.axis("off")
            plt.title(modality.upper(),  fontsize=20, fontweight='bold')
        plt.suptitle(f"NMS Fused Detection: {filename}", fontsize=24, fontweight='bold')
        plt.subplots_adjust(hspace=0.25, top=0.90)

        if should_save:
            # Save before plt.show(), which is the last call made on this figure
            figure_path = os.path.join(folder_path, f"{basename}_plot.png")
            plt.savefig(figure_path)
            print(f"📊 Saved plot at: {figure_path}\n")

        plt.show()

    return {
        "boxes": fused_boxes,
        "scores": fused_scores,
        "classes": fused_cls,
        "images": images
    }

# Demo: fuse one random triplet with NMS, save the outputs under 'nms_test' and show them.
# Nothing is run when no triplet was found (result is None).
result = get_random_existing_triplet()
if result:
    filename, rgb_path, thermal_path, depth_path = result
    late_fusion_predictions_nms(filename, rgb_path, thermal_path, depth_path, save_name='nms_test', show=True)


# 🔀🎥 **Apply late fusion with Voting (RGB, Thermal, and Depth)**

In [None]:
# Only let CRITICAL messages from the Ultralytics logger through
LOGGER.setLevel(logging.CRITICAL)

# Class id -> display name used when labelling the drawn boxes
CLASS_NAMES = {0: 'human'}

# Paths (the complete, unsplit dataset of each modality)
image_folder_rgb = "/content/MMHD-project/MID-3K/dataset/rgb/whole-dataset/rgb/images"
image_folder_thermal = "/content/MMHD-project/MID-3K/dataset/thermal/whole-dataset/thermal/images"
image_folder_depth = "/content/MMHD-project/MID-3K/dataset/depth/whole-dataset/depth/images"

def get_random_existing_triplet(max_attempts=20):
    """
    Selects a random image filename in one modality and checks if the same file exists
    in the other two modalities. This avoids scanning all three directories entirely

    Args:
        max_attempts (int): Number of random candidates to try before giving up

    Returns:
        tuple or None: (filename, full_rgb_path, full_thermal_path, full_depth_path),
        or None when the base folder cannot be listed, holds no '.png' file, or no
        candidate was found in all three folders within max_attempts tries
    """
    # Randomly choose one base directory to sample from
    modality_dirs = {
        "rgb": image_folder_rgb,
        "thermal": image_folder_thermal,
        "depth": image_folder_depth
    }

    base_modality = random.choice(list(modality_dirs))
    base_folder = modality_dirs[base_modality]

    # Listing the folder is the only step that can fail with an I/O error
    try:
        all_files = [f for f in os.listdir(base_folder) if f.endswith('.png')]
    except OSError as e:
        print(f"❌ Error during selection: {e}")
        return None

    if not all_files:
        print(f"⚠️ No images found in base modality folder: {base_modality}")
        return None

    for _ in range(max_attempts):
        candidate = random.choice(all_files)

        # Build full paths
        rgb_path = os.path.join(image_folder_rgb, candidate)
        thermal_path = os.path.join(image_folder_thermal, candidate)
        depth_path = os.path.join(image_folder_depth, candidate)

        if all(os.path.exists(p) for p in (rgb_path, thermal_path, depth_path)):
            return candidate, rgb_path, thermal_path, depth_path

    print(f"❌ Could not find a matching triplet after {max_attempts} attempts.")
    return None

def voting_fusion(boxes, scores, iou_thresh=0.5):
    """Fuse overlapping boxes by confidence-weighted averaging.

    Boxes are visited in input order. Every box that is not yet part of a cluster
    seeds a new one, made of the seed and all other unassigned boxes whose IoU
    with the seed exceeds ``iou_thresh``. Each cluster becomes one fused box:
    coordinates are the score-weighted average of its members and the score is
    the mean of the member scores. Each input box contributes to exactly one
    fused box.

    Args:
        boxes (torch.Tensor): Boxes passed to ``box_iou``; indexed as (N, 4).
        scores (torch.Tensor): One confidence per box, shape (N,).
        iou_thresh (float): IoU above which a box joins the seed's cluster.

    Returns:
        tuple: (fused_boxes of shape (M, 4), fused_scores of shape (M,),
        fused_classes of shape (M,)). The class is always 0 (single class).
        For empty input the three tensors are empty.
    """
    num_boxes = len(boxes)
    if num_boxes == 0:
        return torch.empty((0, 4)), torch.tensor([]), torch.tensor([])

    fused_boxes, fused_scores = [], []
    # Keep the bookkeeping mask on the same device as the boxes so it can be
    # combined with masks derived from them (e.g. when predictions are on GPU).
    used = torch.zeros(num_boxes, dtype=torch.bool, device=boxes.device)

    for i in range(num_boxes):
        if used[i]:
            continue
        ious = box_iou(boxes[i].unsqueeze(0), boxes)[0]
        # Only unassigned boxes may join, otherwise a box could be counted in
        # several clusters.
        members = (ious > iou_thresh) & ~used
        # The seed always belongs to its own cluster, even when its IoU with
        # itself is not above the threshold (e.g. a zero-area box).
        members[i] = True

        cluster_boxes = boxes[members]
        cluster_scores = scores[members]

        # Weighted average of box coordinates (uniform weights if all scores are 0)
        score_total = cluster_scores.sum()
        if score_total > 0:
            weights = cluster_scores / score_total
        else:
            weights = torch.ones_like(cluster_scores) / len(cluster_scores)

        fused_boxes.append((cluster_boxes * weights.unsqueeze(1)).sum(0))
        fused_scores.append(cluster_scores.mean())  # Could also use max

        used |= members

    # Assuming single class: every fused box gets class 0
    fused_classes = torch.zeros(len(fused_boxes), dtype=torch.long, device=boxes.device)
    return torch.stack(fused_boxes), torch.stack(fused_scores), fused_classes

def late_fusion_predictions_voting(filename, rgb_img_path, thermal_img_path, depth_img_path, save_name=None, show=True):
    """
    Performs late fusion using voting on RGB, thermal, and depth predictions for the same image

    Each modality's model is run on its own image, the detections of all three are
    pooled, and voting_fusion (IoU threshold 0.5) merges overlapping boxes. The
    fused boxes are then drawn on all three images.

    NOTE(review): the pooled boxes are pixel coordinates from three different
    images, so this assumes the modalities are pixel-aligned and share one
    resolution — TODO confirm.

    Args:
        filename (str): Image filename; only used in the plot title
        rgb_img_path (str): Path of the RGB image
        thermal_img_path (str): Path of the thermal image
        depth_img_path (str): Path of the depth image
        save_name (str): Optional name of the output sub-folder created under
            /content/predictions/late_fusion/voting; its stem is also the file
            prefix. Nothing is saved when it is None or empty
        show (bool): Whether to display the image with detections inside the notebook

    Returns:
        dict: "boxes", "scores" and "classes" of the fused detections, and "images",
        a dict mapping 'rgb'/'thermal'/'depth' to the annotated PIL images

    Raises:
        FileNotFoundError: If one of the images cannot be read by OpenCV
    """
    # Run predictions for each model
    results_rgb = model_rgb(rgb_img_path)
    results_thermal = model_thermal(thermal_img_path)
    results_depth = model_depth(depth_img_path)

    # Extract boxes and scores of the first (only) result; classes are not
    # needed because voting_fusion assigns the single class itself
    def extract_info(result):
        detections = result[0].boxes
        return detections.xyxy, detections.conf

    boxes_rgb, scores_rgb = extract_info(results_rgb)
    boxes_thermal, scores_thermal = extract_info(results_thermal)
    boxes_depth, scores_depth = extract_info(results_depth)

    # Concatenate detections from all modalities
    all_boxes = torch.cat([boxes_rgb, boxes_thermal, boxes_depth])
    all_scores = torch.cat([scores_rgb, scores_thermal, scores_depth])

    # Apply voting fusion
    fused_boxes, fused_scores, fused_cls = voting_fusion(all_boxes, all_scores, iou_thresh=0.5)

    # Text style shared by every label
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    font_thickness = 1

    # Draw the fused boxes and their labels on one image
    def draw_boxes_on_image(img_path):
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals an unreadable file by returning None
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        for box, score, cls_id in zip(fused_boxes, fused_scores, fused_cls):
            x1, y1, x2, y2 = map(int, box.tolist())
            class_idx = int(cls_id.item())
            class_name = CLASS_NAMES.get(class_idx, str(class_idx))
            label = f"{class_name} ({score.item():.2f})"
            cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)

            (text_width, text_height), _baseline = cv2.getTextSize(label, font, font_scale, font_thickness)

            # Filled background behind the label, placed just above the box
            cv2.rectangle(
                img,
                (x1, y1 - text_height - 6),
                (x1 + text_width, y1),
                (255, 0, 0),
                -1
            )

            cv2.putText(
                img,
                label,
                (x1, y1 - 3),
                font,
                font_scale,
                (255, 255, 255),
                font_thickness,
                lineType=cv2.LINE_AA
            )
        return Image.fromarray(img)

    # One consistent "save?" decision for the folder, the images and the plot
    should_save = bool(save_name)
    if should_save:
        VOTE_DIR = os.path.join('/content', 'predictions', 'late_fusion', 'voting')
        folder_path = os.path.join(VOTE_DIR, save_name)
        os.makedirs(folder_path, exist_ok=True)
        basename = os.path.splitext(save_name)[0]

    modality_paths = {
        'rgb': rgb_img_path,
        'thermal': thermal_img_path,
        'depth': depth_img_path,
    }

    # Process all 3 modalities (each image is rendered and saved exactly once)
    images = {}
    for modality, path in modality_paths.items():
        img_pil = draw_boxes_on_image(path)
        images[modality] = img_pil
        if should_save:
            output_path = os.path.join(folder_path, f"{basename}_{modality}.png")
            img_pil.save(output_path)
            print(f"\n💾 Saved {basename}_{modality} in {output_path}\n")

    # Show all three modalities side-by-side
    if show:
        plt.figure(figsize=(15, 5))
        for idx, modality in enumerate(modality_paths):
            plt.subplot(1, 3, idx + 1)
            plt.imshow(images[modality])
            plt.axis("off")
            plt.title(modality.upper(),  fontsize=20, fontweight='bold')
        plt.suptitle(f"Voting Fused Detection: {filename}", fontsize=24, fontweight='bold')
        plt.subplots_adjust(hspace=0.25, top=0.90)

        if should_save:
            # Save before plt.show(), which is the last call made on this figure
            figure_path = os.path.join(folder_path, f"{basename}_plot.png")
            plt.savefig(figure_path)
            print(f"📊 Saved plot at: {figure_path}\n")

        plt.show()

    return {
        "boxes": fused_boxes,
        "scores": fused_scores,
        "classes": fused_cls,
        "images": images
    }

# Run it: fuse one random triplet with voting, save the outputs under
# 'voting_test' and show them. Nothing is run when no triplet was found.
result = get_random_existing_triplet()
if result:
    filename, rgb_path, thermal_path, depth_path = result
    late_fusion_predictions_voting(filename, rgb_path, thermal_path, depth_path, save_name='voting_test', show=True)

# 🎯 **Comparison between the two late fusion techniques**

In [None]:
# Get a random triplet of RGB, thermal and depth
# (requires the functions defined in the two late-fusion cells above)
result = get_random_existing_triplet()
if result:
    filename, rgb_path, thermal_path, depth_path = result
    base_name = os.path.splitext(filename)[0]

    # Run NMS fusion (show=False: only the returned images are needed here)
    nms_output = late_fusion_predictions_nms(
        filename,
        rgb_path,
        thermal_path,
        depth_path,
        #save_name='fusion_nms',
        show=False
    )

    # Run Voting fusion on the same triplet
    voting_output = late_fusion_predictions_voting(
        filename,
        rgb_path,
        thermal_path,
        depth_path,
        #save_name='fusion_voting',
        show=False
    )

    # Create figure with 2 rows (NMS, Voting) and 3 columns (RGB, Thermal, Depth)
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    modalities = ['rgb', 'thermal', 'depth']

    for col, modality in enumerate(modalities):
        # Row 0: NMS
        axes[0, col].imshow(nms_output['images'][modality])
        axes[0, col].set_title(f"{modality.upper()} – NMS", fontsize=20, fontweight='bold')
        axes[0, col].axis("off")

        # Row 1: Voting
        axes[1, col].imshow(voting_output['images'][modality])
        axes[1, col].set_title(f"{modality.upper()} – Voting", fontsize=20, fontweight='bold')
        axes[1, col].axis("off")

    plt.suptitle(f"Late Fusion Comparison: {filename}", fontsize=24, fontweight='bold')
    plt.subplots_adjust(hspace=0.25, top=0.90)

    # Save the figure (named after the image file, without its extension)
    comparison_dir = "/content/predictions/late_fusion/comparison"
    os.makedirs(comparison_dir, exist_ok=True)
    comparison_path = os.path.join(comparison_dir, f"{base_name}_comparison.png")
    plt.savefig(comparison_path)
    print(f"\n💾 Comparison figure saved at: {comparison_path}\n")

    plt.show()


# 📁🧾 **ZIP RML-project-MMHD folder and create RML-project-MMHD.zip**

In [None]:
# Folder holding the training and evaluation runs
folder_to_zip = '/content/RML-project-MMHD'

# Full name of the archive to create
output_filename = '/content/RML-project-MMHD.zip'

# The '.zip' suffix is removed from the base name passed to make_archive;
# the archive format is given separately as 'zip'
shutil.make_archive(output_filename.replace('.zip', ''), 'zip', folder_to_zip)

print(f'Folder {folder_to_zip} was compacted as {output_filename}')

# 💾🖨️ **Save project to Drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

OUTPUT_DIR = "/content/drive/MyDrive/RML-2025"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Files and folders under /content to back up to Drive
paths_to_save = [
    "/content/RML-project-MMHD",
    "/content/MMHD-project/MID-3K/dataset/rgb/train",
    "/content/MMHD-project/MID-3K/dataset/rgb/val",
    "/content/MMHD-project/MID-3K/dataset/rgb/test",
    "/content/MMHD-project/MID-3K/dataset/thermal/train",
    "/content/MMHD-project/MID-3K/dataset/thermal/val",
    "/content/MMHD-project/MID-3K/dataset/thermal/test",
    "/content/MMHD-project/MID-3K/dataset/depth/train",
    "/content/MMHD-project/MID-3K/dataset/depth/val",
    "/content/MMHD-project/MID-3K/dataset/depth/test",
    "/content/predictions",
    "/content/RML-project-MMHD.zip",
    "/content/yolov8s.pt",
    "/content/yolo11n.pt"
]

print(f"📦 Copying {len(paths_to_save)} files to Drive...\n")
for src_path in tqdm(paths_to_save, desc="🔄 Copying", unit=" file"):
    # Keep the path relative to /content in the destination. Using only the
    # basename would map every modality's "train"/"val"/"test" folder to the
    # same Drive folder, so each copy would overwrite the previous one.
    # Items directly under /content keep the same destination as before.
    dst_path = os.path.join(OUTPUT_DIR, os.path.relpath(src_path, "/content"))

    # Report missing sources instead of claiming they were copied
    if not os.path.exists(src_path):
        print(f"⚠️ Skipped (not found): {src_path}")
        continue

    try:
        # Nested destinations need their parent folders to exist
        os.makedirs(os.path.dirname(dst_path), exist_ok=True)
        if os.path.isdir(src_path):
            if os.path.exists(dst_path):
                print(f"⚠️ Overwriting folder: {dst_path}")
                shutil.rmtree(dst_path)
            shutil.copytree(src_path, dst_path)
        else:
            if os.path.exists(dst_path):
                print(f"⚠️ Overwriting file: {dst_path}")
            shutil.copy2(src_path, dst_path)
        print(f"✔️ Copied: {src_path}")
    except Exception as e:
        # Best effort: one failed item must not stop the remaining copies
        print(f"❌ Failed to copy {src_path}: {e}")

print(f"\n✅ Content in /content/ was saved in: {OUTPUT_DIR}")

# 💾🖨️ **Save late fusion files to Drive**

In [None]:
# Imported here as well (like the other Drive cells) so this cell also works
# when it is run on its own in a fresh session
from google.colab import drive
drive.mount('/content/drive')

source_folder = '/content/predictions/late_fusion'

destination_folder = '/content/drive/MyDrive/RML-2025/RML-project-MMHD/predictions/late_fusion'

os.makedirs(destination_folder, exist_ok=True)

print()

# Mirror the folder tree to Drive, copying only files that are not there yet
for root, _dirs, files in os.walk(source_folder):
    # Skip notebook checkpoint folders
    if '.ipynb_checkpoints' in root:
        continue

    relative_path = os.path.relpath(root, source_folder)
    dest_dir = os.path.join(destination_folder, relative_path)
    os.makedirs(dest_dir, exist_ok=True)

    for file in files:
        src_file = os.path.join(root, file)
        dest_file = os.path.join(dest_dir, file)

        # Existing files are never overwritten, even if the source has changed
        if not os.path.exists(dest_file):
            shutil.copy2(src_file, dest_file)
            print(f'Copied file: {os.path.relpath(dest_file, destination_folder)}')
        else:
            print(f'File already exists, skipping: {os.path.relpath(dest_file, destination_folder)}')

# 📌🎉✅ **ALL DONE!**