# Ensemble Model - Combining U-Net and Mask R-CNN

This notebook implements an ensemble approach that combines predictions from both U-Net and Mask R-CNN models for improved traffic sign detection.

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
import cv2
import matplotlib.pyplot as plt
import pandas as pd
import json
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score

# Seed TensorFlow's and NumPy's random number generators with the same fixed value.
tf.random.set_seed(42)
np.random.seed(42)

# Report the TensorFlow version and whether at least one GPU device is listed.
print(f"TensorFlow version: {tf.__version__}")
print(f"GPU available: {len(tf.config.list_physical_devices('GPU')) > 0}")


In [None]:
# Locations of the dataset, the two saved models, and the results directory.
# All paths are relative, so they resolve against the current working directory.
DATA_PATH = "../data/car"
UNET_MODEL_PATH = "../data/models/full_dataset_model_traffic_sign_unet.h5"
MASKRCNN_MODEL_PATH = "final_maskrcnn_model.h5"
RESULTS_PATH = "../data/results"

# Sizes handed to cv2.resize when preparing inputs for each model.
UNET_IMG_SIZE = (256, 256)
MASKRCNN_IMG_SIZE = (128, 128)

# Create the results directory up front; an existing directory is not an error.
os.makedirs(RESULTS_PATH, exist_ok=True)

# Print whether each required input exists so a bad path is visible immediately.
print(f"Data path exists: {os.path.exists(DATA_PATH)}")
print(f"U-Net model exists: {os.path.exists(UNET_MODEL_PATH)}")
print(f"Mask R-CNN model exists: {os.path.exists(MASKRCNN_MODEL_PATH)}")


def load_yolo_annotations(img_path, img_shape):
    """Build a binary mask from the YOLO label file that accompanies an image.

    The label path is derived from ``img_path`` by replacing ``images`` with
    ``labels`` in the directory part only and swapping the file extension for
    ``.txt``. The file name itself is never rewritten, so an image called
    ``images_001.jpg`` maps to ``images_001.txt``.

    Each label line with at least five whitespace-separated fields is read as
    ``class x_center y_center width height``, where the four coordinates are
    fractions of the image width/height. The corresponding rectangle, clipped
    to the image bounds, is filled with 1. Shorter lines are ignored.

    Args:
        img_path: Path to the image file.
        img_shape: Shape of the image; only the first two entries
            (height, width) are used.

    Returns:
        A ``uint8`` array of shape ``img_shape[:2]`` holding 1 inside every
        annotated box and 0 elsewhere. The mask is all zeros when no label
        file exists. If reading or parsing fails, an error is printed and the
        mask filled so far is returned.
    """
    mask = np.zeros(img_shape[:2], dtype=np.uint8)

    # Rewrite only the directory part and replace the real extension, so
    # substrings such as "images" or ".jpg" inside the file name stay intact.
    directory, filename = os.path.split(img_path)
    stem = os.path.splitext(filename)[0]
    annotation_path = os.path.join(directory.replace('images', 'labels'), stem + '.txt')

    if not os.path.exists(annotation_path):
        return mask

    try:
        h, w = img_shape[:2]
        with open(annotation_path, 'r') as f:
            for line in f:
                data = line.strip().split()
                if len(data) < 5:
                    continue
                x_center, y_center, width, height = map(float, data[1:5])
                # Scale the normalised box to pixel coordinates.
                x_center *= w
                y_center *= h
                width *= w
                height *= h
                x1 = max(0, int(x_center - width / 2))
                y1 = max(0, int(y_center - height / 2))
                x2 = min(w, int(x_center + width / 2))
                y2 = min(h, int(y_center + height / 2))
                # Skip boxes that collapse to nothing after clipping.
                if x2 > x1 and y2 > y1:
                    mask[y1:y2, x1:x2] = 1
    except (OSError, ValueError, OverflowError) as e:
        # Best effort: unreadable files, non-numeric fields, and nan/inf
        # coordinates are reported, not raised.
        print(f"Error processing {annotation_path}: {e}")
    return mask

def load_test_data(data_path, img_size):
    """Load the test split as normalised images with matching binary masks.

    Reads every ``.jpg``/``.png`` file in ``<data_path>/test/images`` in
    sorted filename order and skips files OpenCV cannot decode. Each image is
    converted from BGR to RGB, resized to ``img_size`` and scaled to
    ``float32`` values in [0, 1]. Its mask is built by
    ``load_yolo_annotations`` at the original resolution and then resized to
    ``img_size`` with nearest-neighbour interpolation.

    Args:
        data_path: Root directory of the dataset.
        img_size: Target size passed to ``cv2.resize``.

    Returns:
        A tuple ``(images, masks, filenames)``: two NumPy arrays and the list
        of file names that were loaded, all in the same order.
    """
    image_dir = os.path.join(data_path, 'test', 'images')
    candidates = sorted(
        name for name in os.listdir(image_dir)
        if name.endswith(('.jpg', '.png'))
    )

    loaded = []
    for name in tqdm(candidates, desc="Loading test data"):
        full_path = os.path.join(image_dir, name)
        bgr = cv2.imread(full_path)
        if bgr is None:
            # cv2.imread signals an unreadable file by returning None.
            continue

        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        scaled = cv2.resize(rgb, img_size).astype(np.float32) / 255.0

        # Build the mask at full resolution, then shrink it without blending labels.
        full_mask = load_yolo_annotations(full_path, bgr.shape)
        small_mask = cv2.resize(full_mask, img_size, interpolation=cv2.INTER_NEAREST)

        loaded.append((scaled, small_mask, name))

    images = np.array([entry[0] for entry in loaded])
    masks = np.array([entry[1] for entry in loaded])
    filenames = [entry[2] for entry in loaded]
    return images, masks, filenames

# Progress marker: confirms the data-loading definitions above were executed.
print("Data loading functions defined.")



- `calculate_iou()` - Computes Intersection over Union for a single image prediction
- `calculate_metrics()` - Returns precision, recall, F1-score, and accuracy for a single prediction
- `evaluate_predictions()` - Batch evaluation that returns the mean of each metric, plus standard deviations for IoU and F1-score


In [None]:
def calculate_iou(y_true, y_pred, threshold=0.5):
    """Compute Intersection over Union between a mask and a prediction.

    ``y_pred`` is binarised with ``y_pred > threshold``. ``y_true`` is cast
    to ``float32`` and used as given, without thresholding.

    Args:
        y_true: Ground-truth mask array.
        y_pred: Predicted score array of the same shape.
        threshold: Cut-off above which a predicted pixel counts as positive.

    Returns:
        Intersection divided by union. When the union is zero the result is
        1.0 if the ground truth sums to zero and 0.0 otherwise.
    """
    truth = y_true.astype(np.float32)
    predicted = (y_pred > threshold).astype(np.float32)

    overlap = np.sum(truth * predicted)
    truth_total = np.sum(truth)
    combined = truth_total + np.sum(predicted) - overlap

    if combined != 0:
        return overlap / combined
    # Nothing in either mask: treat "correctly empty" as a perfect score.
    if truth_total == 0:
        return 1.0
    return 0.0

def calculate_metrics(y_true, y_pred, threshold=0.5):
    """Compute pixel-wise precision, recall, F1 and accuracy for one image.

    Both inputs are flattened. ``y_true`` is cast to ``int32`` and ``y_pred``
    is binarised with ``y_pred > threshold``. A small epsilon in every
    denominator avoids division by zero.

    Args:
        y_true: Ground-truth mask array.
        y_pred: Predicted score array of the same size.
        threshold: Cut-off above which a predicted pixel counts as positive.

    Returns:
        A tuple ``(precision, recall, f1, accuracy)``. All four are 1.0 when
        both the ground truth and the binarised prediction sum to zero.
    """
    eps = 1e-7
    truth = y_true.flatten().astype(np.int32)
    guess = (y_pred.flatten() > threshold).astype(np.int32)

    # An empty target matched by an empty prediction is scored as perfect.
    if np.sum(truth) == 0 and np.sum(guess) == 0:
        return 1.0, 1.0, 1.0, 1.0

    truth_pos, truth_neg = truth == 1, truth == 0
    guess_pos, guess_neg = guess == 1, guess == 0

    # Confusion-matrix counts.
    tp = np.sum(truth_pos & guess_pos)
    fp = np.sum(truth_neg & guess_pos)
    fn = np.sum(truth_pos & guess_neg)
    tn = np.sum(truth_neg & guess_neg)

    precision = tp / (tp + fp + eps)
    recall = tp / (tp + fn + eps)
    f1 = 2 * precision * recall / (precision + recall + eps)
    accuracy = (tp + tn) / (tp + tn + fp + fn + eps)

    return precision, recall, f1, accuracy

def evaluate_predictions(y_true_batch, y_pred_batch, threshold=0.5):
    """Score a batch of predictions and summarise the per-image metrics.

    Ground-truth and predicted masks are paired with ``zip``. For every pair
    the IoU, precision, recall, F1 and accuracy are computed with
    ``calculate_iou`` and ``calculate_metrics``.

    Args:
        y_true_batch: Iterable of ground-truth masks.
        y_pred_batch: Iterable of predicted score maps, in the same order.
        threshold: Cut-off forwarded to the per-image metric functions.

    Returns:
        A dict with the mean of each metric under ``iou``, ``precision``,
        ``recall``, ``f1_score`` and ``accuracy``, plus the standard
        deviation of IoU and F1 under ``iou_std`` and ``f1_std``.
    """
    per_image = {
        'iou': [],
        'precision': [],
        'recall': [],
        'f1_score': [],
        'accuracy': [],
    }

    for truth, pred in zip(y_true_batch, y_pred_batch):
        per_image['iou'].append(calculate_iou(truth, pred, threshold))
        prec, rec, f1_value, acc = calculate_metrics(truth, pred, threshold)
        per_image['precision'].append(prec)
        per_image['recall'].append(rec)
        per_image['f1_score'].append(f1_value)
        per_image['accuracy'].append(acc)

    # Means first, in the order the keys were declared, then the spreads.
    summary = {name: np.mean(values) for name, values in per_image.items()}
    summary['iou_std'] = np.std(per_image['iou'])
    summary['f1_std'] = np.std(per_image['f1_score'])
    return summary

# Progress marker: confirms the metric definitions above were executed.
print("Metric functions defined.")


Combines U-Net and Mask R-CNN predictions using multiple fusion strategies:

- `weighted_average` - Blends predictions with tunable weights (default 60% U-Net, 40% Mask R-CNN)
- `intersection` - High-confidence regions where both models agree
- `union` - Recall-oriented approach that keeps the higher of the two probabilities at each pixel, so detections from either model are captured
- `confidence_based` - Weights each pixel by how confident each model is
- `voting` - Binary mask of the pixels where at least one model predicts positive (with two models this equals the thresholded union)


class EnsembleModel:
    """Fuse the mask predictions of a U-Net and a Mask R-CNN model.

    Both models are called through ``predict(batch, verbose=0)``. Each model
    gets the input batch resized to its own size, and the Mask R-CNN output
    is resized to the U-Net size before the two predictions are fused.

    The ``intersection`` and ``voting`` strategies return binary masks
    (0.0/1.0). The other strategies return continuous scores that the caller
    is expected to threshold.

    NOTE(review): sizes are compared against ``images.shape[1:3]``
    (height, width) but passed to ``cv2.resize``, which reads them as
    (width, height). The two agree only for square sizes; confirm the
    intended order before using non-square sizes.
    """

    # Fusion strategies accepted by ``predict``, in the order they are reported.
    STRATEGIES = ('weighted_average', 'intersection', 'union', 'confidence_based', 'voting')

    def __init__(self, unet_model, maskrcnn_model, unet_size=(256, 256), maskrcnn_size=(128, 128)):
        """Store the two models and the input size each one expects.

        Args:
            unet_model: Object exposing ``predict`` for the U-Net.
            maskrcnn_model: Object exposing ``predict`` for the Mask R-CNN.
            unet_size: Input size of the U-Net; also the ensemble output size.
            maskrcnn_size: Input size of the Mask R-CNN.
        """
        self.unet = unet_model
        self.maskrcnn = maskrcnn_model
        self.unet_size = unet_size
        self.maskrcnn_size = maskrcnn_size
        self.output_size = unet_size

    @staticmethod
    def _fit_to_size(images, size):
        """Return ``images`` resized to ``size``, or unchanged if they already match."""
        # tuple() on both sides so a list-valued size does not force a needless resize.
        if tuple(images.shape[1:3]) != tuple(size):
            return np.array([cv2.resize(img, size) for img in images])
        return images

    @staticmethod
    def _drop_channel_axis(preds):
        """Strip a trailing single-channel axis from a 4-D prediction batch."""
        if preds.ndim == 4 and preds.shape[-1] == 1:
            return preds[..., 0]
        return preds

    def _prepare_for_unet(self, images):
        """Resize the batch to the U-Net input size when necessary."""
        return self._fit_to_size(images, self.unet_size)

    def _prepare_for_maskrcnn(self, images):
        """Resize the batch to the Mask R-CNN input size when necessary."""
        return self._fit_to_size(images, self.maskrcnn_size)

    def _resize_predictions(self, predictions, target_size):
        """Resize every prediction map in the batch to ``target_size``."""
        return np.array([cv2.resize(pred, target_size) for pred in predictions])

    def predict_unet(self, images):
        """Run the U-Net and return its predictions without the channel axis."""
        prepared = self._prepare_for_unet(images)
        preds = self.unet.predict(prepared, verbose=0)
        return self._drop_channel_axis(preds)

    def predict_maskrcnn(self, images):
        """Run the Mask R-CNN and return its predictions at the output size."""
        prepared = self._prepare_for_maskrcnn(images)
        preds = self.maskrcnn.predict(prepared, verbose=0)
        preds = self._drop_channel_axis(preds)
        return self._resize_predictions(preds, self.output_size)

    @staticmethod
    def _combine(unet_preds, mrcnn_preds, strategy, weights):
        """Fuse two same-shaped prediction arrays with the named strategy.

        Raises:
            ValueError: If ``strategy`` is not a known strategy name.
        """
        if strategy == 'weighted_average':
            return weights[0] * unet_preds + weights[1] * mrcnn_preds

        if strategy == 'intersection':
            unet_binary = (unet_preds > 0.5).astype(np.float32)
            mrcnn_binary = (mrcnn_preds > 0.5).astype(np.float32)
            return unet_binary * mrcnn_binary

        if strategy == 'union':
            return np.maximum(unet_preds, mrcnn_preds)

        if strategy == 'confidence_based':
            # Confidence is the distance of each score from the 0.5 boundary.
            unet_conf = np.abs(unet_preds - 0.5)
            mrcnn_conf = np.abs(mrcnn_preds - 0.5)
            total_conf = unet_conf + mrcnn_conf
            has_conf = total_conf > 0
            weighted = unet_preds * unet_conf + mrcnn_preds * mrcnn_conf
            # Divide by the exact total. An additive epsilon would pull
            # low-confidence pixels toward 0 and could flip two weakly
            # positive scores to a negative fused score.
            blended = weighted / np.where(has_conf, total_conf, 1.0)
            # With zero total confidence both scores are 0.5, so use the plain mean.
            return np.where(has_conf, blended, 0.5 * (unet_preds + mrcnn_preds))

        if strategy == 'voting':
            unet_binary = (unet_preds > 0.5).astype(np.float32)
            mrcnn_binary = (mrcnn_preds > 0.5).astype(np.float32)
            votes = unet_binary + mrcnn_binary
            return (votes >= 1).astype(np.float32)

        raise ValueError(f"Unknown strategy: {strategy}")

    def predict(self, images, strategy='weighted_average', weights=(0.6, 0.4)):
        """Predict with both models and fuse the results.

        Args:
            images: Batch of input images; axes 1 and 2 are compared with the
                model sizes to decide whether resizing is needed.
            strategy: One of the names returned by ``get_all_strategies``.
            weights: ``(unet_weight, maskrcnn_weight)``, used only by the
                ``weighted_average`` strategy.

        Returns:
            The fused prediction array at the U-Net output size.

        Raises:
            ValueError: If ``strategy`` is unknown. This is checked before
                either model is run.
        """
        if strategy not in self.STRATEGIES:
            raise ValueError(f"Unknown strategy: {strategy}")

        unet_preds = self.predict_unet(images)
        mrcnn_preds = self.predict_maskrcnn(images)
        return self._combine(unet_preds, mrcnn_preds, strategy, weights)

    def get_all_strategies(self):
        """Return the list of supported fusion strategy names."""
        return list(self.STRATEGIES)

# Progress marker: confirms the EnsembleModel definition above was executed.
print("EnsembleModel class defined.")


In [None]:
# Load both saved Keras models and print their input/output shapes.
print("Loading models...")
unet_model = keras.models.load_model(UNET_MODEL_PATH)
print(f"U-Net loaded: {unet_model.input_shape} -> {unet_model.output_shape}")

maskrcnn_model = keras.models.load_model(MASKRCNN_MODEL_PATH)
print(f"Mask R-CNN loaded: {maskrcnn_model.input_shape} -> {maskrcnn_model.output_shape}")

# Load the test split at the U-Net size; the ensemble resizes for Mask R-CNN itself.
print("\nLoading test data...")
X_test, y_test, filenames = load_test_data(DATA_PATH, UNET_IMG_SIZE)
print(f"Test data loaded: {X_test.shape[0]} images, shape {X_test.shape[1:]}")

# Wrap the two models so predictions can be fused at the U-Net output size.
ensemble = EnsembleModel(
    unet_model=unet_model,
    maskrcnn_model=maskrcnn_model,
    unet_size=UNET_IMG_SIZE,
    maskrcnn_size=MASKRCNN_IMG_SIZE
)
print("\nEnsemble model initialized.")