In [13]:
from ultralytics import YOLO
import numpy as np
import cv2
import time

In [14]:
def enhance_visible_with_infrared(visible_img, infrared_img):
    """Blend an infrared image into the lightness channel of a visible image.

    The visible image is converted from BGR to LAB, its L channel is replaced
    by a 50/50 blend of itself and the infrared image (resized to the visible
    image's width and height), and the result is converted back to BGR.

    Side effect: the enhanced image is also written to
    'images/enhanced_visible.jpg'.

    Args:
        visible_img: 3-channel BGR image, e.g. as returned by cv2.imread.
        infrared_img: infrared image. A single-channel image is used as is;
            a 3- or 4-channel image is converted to grayscale first.
            NOTE(review): assumed to have the same bit depth as visible_img
            (addWeighted requires matching depth) - confirm for 16-bit sensors.

    Returns:
        The enhanced image in BGR colour space.

    Raises:
        ValueError: if either input is None (cv2.imread returns None instead
            of raising when a file cannot be read).
    """
    if visible_img is None:
        raise ValueError("visible_img is None (was the image file read correctly?)")
    if infrared_img is None:
        raise ValueError("infrared_img is None (was the image file read correctly?)")

    # addWeighted needs both operands to have the same size and channel count,
    # so reduce the infrared image to a single channel before blending.
    if infrared_img.ndim == 3:
        channels = infrared_img.shape[2]
        if channels == 1:
            infrared_img = np.ascontiguousarray(infrared_img[:, :, 0])
        elif channels == 4:
            infrared_img = cv2.cvtColor(infrared_img, cv2.COLOR_BGRA2GRAY)
        else:
            infrared_img = cv2.cvtColor(infrared_img, cv2.COLOR_BGR2GRAY)

    visible_lab = cv2.cvtColor(visible_img, cv2.COLOR_BGR2LAB)

    # cv2.resize takes (width, height), i.e. the reverse of numpy's shape order.
    height, width = visible_img.shape[:2]
    infrared_img = cv2.resize(infrared_img, (width, height))

    # Extract the L channel from the LAB color space
    l_channel = visible_lab[:, :, 0]

    # Enhance the L channel using the infrared image (equal weights, no offset)
    enhanced_l_channel = cv2.addWeighted(l_channel, 0.5, infrared_img, 0.5, 0)

    # Replace the original L channel with the enhanced one
    visible_lab[:, :, 0] = enhanced_l_channel

    # Convert the enhanced LAB image back to BGR color space
    enhanced_visible_img = cv2.cvtColor(visible_lab, cv2.COLOR_LAB2BGR)
    cv2.imwrite('images/enhanced_visible.jpg', enhanced_visible_img)

    return enhanced_visible_img

In [15]:
def preprocess_img(img):
    """Denoise a BGR image and boost local contrast on its lightness channel.

    Pipeline: non-local-means colour denoising -> BGR to Lab -> CLAHE on the
    L channel only (A and B are left untouched) -> Lab back to BGR.

    Side effect: the result is also written to 'images/processed_image.jpg'.

    Args:
        img: BGR image to process.

    Returns:
        The processed BGR image.
    """
    # Positional arguments after dst=None are h, hColor, templateWindowSize
    # and searchWindowSize.
    denoised = cv2.fastNlMeansDenoisingColored(img, None, 1, 1, 5, 21)

    # Work in Lab so that contrast equalisation only touches lightness.
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(denoised, cv2.COLOR_BGR2Lab))

    equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    lab_equalized = cv2.merge((equalizer.apply(lightness), chan_a, chan_b))

    processed = cv2.cvtColor(lab_equalized, cv2.COLOR_Lab2BGR)

    cv2.imwrite('images/processed_image.jpg', processed)
    return processed

In [16]:
# Load one detector per modality from the local weights files.
visible_model = YOLO('models/visible.pt')
thermal_model = YOLO('models/thermal.pt')

# The infrared frame is read as a single channel because it is blended into
# the L channel of the visible frame.
infrared_img = cv2.imread("images/infrared.jpg", cv2.IMREAD_GRAYSCALE)
visible_img = cv2.imread("images/visible.jpg")
thermal_img = cv2.imread("images/thermal.jpg")

# cv2.imread returns None instead of raising when a file is missing or
# unreadable; fail here with the offending path rather than deep inside OpenCV.
unreadable_images = [
    path
    for path, img in (
        ("images/infrared.jpg", infrared_img),
        ("images/visible.jpg", visible_img),
        ("images/thermal.jpg", thermal_img),
    )
    if img is None
]
if unreadable_images:
    raise FileNotFoundError(f"Could not read image(s): {', '.join(unreadable_images)}")

# From here on `visible_img` is the infrared-enhanced, denoised and
# contrast-equalised frame that is fed to the visible detector.
visible_img = enhance_visible_with_infrared(visible_img,infrared_img)
visible_img = preprocess_img(visible_img)

In [17]:
# Run each detector on its own modality: the visible model gets the enhanced
# and preprocessed frame, the thermal model gets the thermal frame as loaded.
# save=True also writes the results to disk (see the "Results saved to" lines
# in the output below).
results_visible = visible_model.predict(source=visible_img, save=True)
results_thermal = thermal_model.predict(source=thermal_img, save=True)


0: 480x640 12 persons, 2 bicycles, 1 motorcycle, 1 bench, 4 umbrellas, 1 chair, 352.1ms
Speed: 6.0ms preprocess, 352.1ms inference, 1573.7ms postprocess per image at shape (1, 3, 480, 640)
Results saved to [1mD:\softwares\vscode\python\ensembledmodel\runs\detect\predict55[0m

0: 480x640 (no detections), 336.6ms
Speed: 2.0ms preprocess, 336.6ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)
Results saved to [1mD:\softwares\vscode\python\ensembledmodel\runs\detect\predict56[0m


In [18]:
# Raw detection counts per modality, taken from the first result of each call.
print(f"number of detections in thermal = {len(results_thermal[0].boxes)}")
print(f"number of detections in visible = {len(results_visible[0].boxes)}")

number of detections in thermal = 0
number of detections in visible = 21


In [19]:
# Module-level accumulators filled by combine_result(): each call appends one
# inner list per accumulator, so position i holds the detections of the i-th
# model passed to combine_result().
boxes_list = []
scores_list = []
labels_list = []

# Class-id remapping performed by relabel() so that all models share one
# label space (the target ids presumably follow the visible model's class
# ids - TODO confirm).
#
# infrared model:
#   bicycle: 0 -> 1
#   car:     1 -> 2
#   person:  2 -> 0
#
# thermal model:
#   bicycle: 0 -> 1
#   car:     1 -> 2
#   person:  3 -> 0
#   dog:     2 -> 17

def relabel(model,c):
    """Map a model-specific class id onto the shared label space.

    Args:
        model: name of the source model. 'visible' and 'infrared' have their
            own tables; any other name is treated as the thermal model, as
            before.
        c: class id, as an int or anything int() accepts (e.g. a
            single-element tensor such as `box.cls`).

    Returns:
        The remapped class id as a plain int. Ids that have no entry in the
        model's table are returned unchanged, so the function never returns
        None and an unmapped infrared id no longer falls through into the
        thermal table.
    """
    remap_by_model = {
        # NOTE(review): 7 -> 2 presumably folds "truck" into "car" - confirm.
        'visible': {7: 2},
        # bicycle 0 -> 1, car 1 -> 2, person 2 -> 0
        'infrared': {0: 1, 1: 2, 2: 0},
        # bicycle 0 -> 1, car 1 -> 2, person 3 -> 0, dog 2 -> 17
        'thermal': {0: 1, 1: 2, 3: 0, 2: 17},
    }

    class_id = int(c)
    remap = remap_by_model.get(model, remap_by_model['thermal'])
    return remap.get(class_id, class_id)
   
def combine_result(result,model = 'visible'):
    """Append one model's confident detections to the global accumulators.

    Every box whose confidence is strictly greater than 0.3 is kept. For each
    kept box the xyxy coordinates, the confidence and the relabelled class id
    (via relabel(model, cls)) are collected, and the three resulting lists
    are appended to the module-level boxes_list, scores_list and labels_list
    respectively. One entry is appended per call, even when nothing is kept.

    Args:
        result: iterable of prediction results, each exposing `.boxes`.
        model: model name forwarded to relabel().
    """
    kept = [
        (detection.xyxy[0], detection.conf, relabel(model, detection.cls))
        for prediction in result
        for detection in prediction.boxes
        if detection.conf > 0.3
    ]

    boxes_list.append([coords for coords, _, _ in kept])
    scores_list.append([confidence for _, confidence, _ in kept])
    labels_list.append([class_id for _, _, class_id in kept])
    
# Call order defines the model index used downstream: position 0 of each
# accumulator is the thermal model, position 1 is the visible model.
combine_result(results_thermal,'thermal')
combine_result(results_visible)        

In [20]:
# Inspect the boxes kept for the visible model (index 1; index 0 is thermal).
print(boxes_list[1])

[tensor([7.1144e-02, 1.6232e+02, 3.6668e+01, 2.3969e+02]), tensor([193.6515, 112.7302, 213.1233, 176.4601]), tensor([271.1494, 112.9939, 308.6399, 201.9917]), tensor([2.6978e-02, 1.3226e+02, 3.0321e+01, 1.7920e+02]), tensor([121.3488, 137.1816, 151.6475, 198.5643]), tensor([ 83.4306, 134.1086, 110.8255, 191.0008]), tensor([220.6516, 115.1308, 233.8808, 156.9962]), tensor([156.2414, 108.9907, 177.3053, 116.3447]), tensor([176.8253, 111.7766, 191.1967, 117.5332]), tensor([ 56.1848, 127.5705,  88.3339, 166.0059]), tensor([166.1029, 115.8994, 174.7498, 129.5180]), tensor([177.8506, 115.2158, 186.7646, 139.9413]), tensor([108.0314, 102.6780, 149.1077, 114.6645]), tensor([ 86.0397, 113.3627,  99.1617, 134.6866]), tensor([120.6128, 114.8236, 156.2406, 182.9259]), tensor([ 50.2323, 140.8474,  86.4843, 192.4600]), tensor([157.6383, 116.0931, 166.6913, 135.9849])]


In [21]:
def prefilter_boxes(boxes, scores, labels, weights, thr):
    """Group detections by label, drop low scores and sort each group by score.

    Args:
        boxes: one sequence of boxes per model; each box is indexable as
            [x1, y1, x2, y2].
        scores: one sequence of confidences per model, parallel to `boxes`.
        labels: one sequence of class ids per model, parallel to `boxes`.
        weights: per-model weight, indexable by model position.
        thr: detections whose raw (unweighted) score is below `thr` are
            discarded.

    Returns:
        dict mapping int label -> np.ndarray with one row per kept detection,
        laid out as [label, score * weight, weight, model_index, x1, y1, x2, y2]
        and sorted by the weighted score in descending order.

    Raises:
        ValueError: if, for any model, the number of boxes differs from the
            number of scores or labels. (Previously this called exit(), which
            terminates the interpreter / notebook kernel.)
    """
    new_boxes = dict()

    for t in range(len(boxes)):

        if len(boxes[t]) != len(scores[t]):
            raise ValueError(
                'Error. Length of boxes arrays not equal to length of scores array: {} != {}'.format(
                    len(boxes[t]), len(scores[t])))

        if len(boxes[t]) != len(labels[t]):
            raise ValueError(
                'Error. Length of boxes arrays not equal to length of labels array: {} != {}'.format(
                    len(boxes[t]), len(labels[t])))

        for j in range(len(boxes[t])):
            score = scores[t][j]
            # The threshold is applied to the raw score, before model weighting.
            if score < thr:
                continue
            label = int(labels[t][j])
            box_part = boxes[t][j]
            x1 = float(box_part[0])
            y1 = float(box_part[1])
            x2 = float(box_part[2])
            y2 = float(box_part[3])

            row = [label, float(score) * weights[t], weights[t], t, x1, y1, x2, y2]
            new_boxes.setdefault(label, []).append(row)

    # Convert each group to an array ordered by weighted score, best first.
    for k in new_boxes:
        current_boxes = np.array(new_boxes[k])
        new_boxes[k] = current_boxes[current_boxes[:, 1].argsort()[::-1]]

    return new_boxes


def get_weighted_box(boxes, conf_type):
    """Fuse the member rows of one cluster into a single 8-element row.

    Each member is laid out as
    [label, score, weight, model_index, x1, y1, x2, y2].

    The returned float32 row contains:
        [0]   label of the first member
        [1]   maximum member score
        [2]   sum of member weights
        [3]   -1 (the fused row belongs to no single model)
        [4:]  score-weighted average of the member coordinates

    Args:
        boxes: non-empty sequence of member rows (numpy arrays).
        conf_type: accepted for interface compatibility; not used here.

    Returns:
        np.ndarray of shape (8,) and dtype float32.
    """
    fused = np.zeros(8, dtype=np.float32)
    member_scores = [member[1] for member in boxes]

    # Accumulate score-weighted coordinates member by member.
    for member in boxes:
        fused[4:] += member[1] * member[4:]

    score_total = sum(member_scores)
    weight_total = sum(member[2] for member in boxes)

    fused[0] = boxes[0][0]
    fused[1] = np.array(member_scores).max()
    fused[2] = weight_total
    fused[3] = -1
    fused[4:] /= score_total
    return fused


def find_matching_box_fast(boxes_list, new_box, match_iou):
    """Find the row of `boxes_list` that best overlaps `new_box`.

    Rows are laid out as [label, score, weight, model_index, x1, y1, x2, y2];
    only the label (column 0) and the coordinates (columns 4-7) are used.

    Args:
        boxes_list: 2-D np.ndarray of candidate rows (may have zero rows).
        new_box: 1-D row with the same layout.
        match_iou: IoU threshold; a match requires IoU strictly above it.

    Returns:
        (best_idx, best_iou): index and IoU of the best same-label row, or
        (-1, match_iou) when no row exceeds the threshold.
    """

    def bb_iou_array(boxes, new_box):
        # Intersection over union of every row of `boxes` against `new_box`.
        xA = np.maximum(boxes[:, 0], new_box[0])
        yA = np.maximum(boxes[:, 1], new_box[1])
        xB = np.minimum(boxes[:, 2], new_box[2])
        yB = np.minimum(boxes[:, 3], new_box[3])

        interArea = np.maximum(xB - xA, 0) * np.maximum(yB - yA, 0)

        # Areas of the candidate rows and of the new box.
        boxAArea = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        boxBArea = (new_box[2] - new_box[0]) * (new_box[3] - new_box[1])

        union = boxAArea + boxBArea - interArea

        # Two zero-area boxes give a zero union; 0/0 would be NaN, and NaN is
        # never "<= match_iou", so it used to be reported as a match. Treat a
        # zero union as "no overlap" instead.
        iou = np.zeros_like(union, dtype=float)
        np.divide(interArea, union, out=iou, where=union > 0)

        return iou

    if boxes_list.shape[0] == 0:
        return -1, match_iou

    boxes = boxes_list

    ious = bb_iou_array(boxes[:, 4:], new_box[4:])

    # Rows with a different label can never match.
    ious[boxes[:, 0] != new_box[0]] = -1

    best_idx = np.argmax(ious)
    best_iou = ious[best_idx]

    if best_iou <= match_iou:
        best_iou = match_iou
        best_idx = -1

    return best_idx, best_iou


def weighted_boxes_fusion(
        boxes_list,
        scores_list,
        labels_list,
        weights=None,
        iou_thr=0.1,
        skip_box_thr=0.3,
        conf_type='max',
        allows_overflow=False
):
    """Merge overlapping same-label detections from several models.

    Detections are filtered and grouped per label by prefilter_boxes(), then
    processed best-score-first. A detection that overlaps an existing cluster
    (IoU above `iou_thr`, via find_matching_box_fast()) is counted as an
    agreement: the cluster's confidence is multiplied by 1.15 times the
    weight of the agreeing model and capped at 0.99. A detection that matches
    no cluster starts a new one.

    Note: as in the previous revision, the coordinates of the agreeing
    detection are not blended into the cluster; the cluster keeps the
    coordinates of its highest-scoring (seed) detection.

    Args:
        boxes_list: per-model sequences of [x1, y1, x2, y2] boxes.
        scores_list: per-model sequences of confidences.
        labels_list: per-model sequences of class ids.
        weights: per-model weights; defaults to 1 for every model.
        iou_thr: IoU above which a detection joins an existing cluster.
        skip_box_thr: detections scoring below this are ignored.
        conf_type: forwarded to get_weighted_box().
        allows_overflow: accepted for interface compatibility; unused.

    Returns:
        (boxes, scores, labels): arrays of shape (n, 4), (n,) and (n,),
        sorted by score in descending order. All three are empty when no
        detection survives filtering.
    """

    if weights is None:
        weights = np.ones(len(boxes_list))

    weights = np.array(weights)

    filtered_boxes = prefilter_boxes(boxes_list, scores_list, labels_list, weights, skip_box_thr)
    if len(filtered_boxes) == 0:
        return np.zeros((0, 4)), np.zeros((0,)), np.zeros((0,))

    overall_boxes = []
    for label in filtered_boxes:
        boxes = filtered_boxes[label]
        # new_boxes[i] holds the member rows of cluster i; weighted_boxes[i]
        # is that cluster's current fused row.
        new_boxes = []
        weighted_boxes = np.empty((0, 8))

        # Clusterize boxes
        for j in range(0, len(boxes)):
            index, best_iou = find_matching_box_fast(weighted_boxes, boxes[j], iou_thr)
            if index != -1:
                # `index` identifies a CLUSTER (a row of weighted_boxes), not a
                # row of `boxes`. The two only coincide until the first merge,
                # so indexing `boxes` with it boosted and merged an unrelated
                # detection. Boost the cluster's own latest member instead.
                cluster = new_boxes[index]
                boosted = cluster[-1].copy()
                boosted[1] = np.minimum(boosted[1] * 1.15 * boxes[j][2], 0.99)
                cluster.append(boosted)
                weighted_boxes[index] = get_weighted_box(cluster, conf_type)
            else:
                new_boxes.append([boxes[j].copy()])
                weighted_boxes = np.vstack((weighted_boxes, boxes[j].copy()))

        # Rescale confidences by the largest model weight.
        weighted_boxes[:, 1] = weighted_boxes[:, 1] / weights.max()

        overall_boxes.append(weighted_boxes)

    overall_boxes = np.concatenate(overall_boxes, axis=0)
    overall_boxes = overall_boxes[overall_boxes[:, 1].argsort()[::-1]]
    boxes = overall_boxes[:, 4:]
    scores = overall_boxes[:, 1]
    labels = overall_boxes[:, 0]
    return boxes, scores, labels

In [22]:
# Time a single fusion call. perf_counter() is a monotonic, high-resolution
# clock meant for measuring short intervals; time.time() is wall-clock time
# and can be coarse or jump when the system clock is adjusted.
start_time = time.perf_counter()
boxes,scores,labels = weighted_boxes_fusion(boxes_list,scores_list,labels_list)
end_time = time.perf_counter()
execution_time = end_time - start_time  # seconds

In [23]:
# Fusion time in milliseconds (execution_time is in seconds).
print(execution_time * 1000)
# Number of boxes returned by the fusion step.
print(len(boxes))

2.9926300048828125
17


In [24]:
# Drawing colours in OpenCV's BGR channel order, cycled by class id.
colors = [(0, 255, 0), (0, 0, 255), (0, 255, 255)]  # Green, Red, yellow
final_image_choice = "visible"
image_path = 'images/'+ final_image_choice + '.jpg'
image = cv2.imread(image_path)
# cv2.imread returns None instead of raising when the file cannot be read.
if image is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")

for i in range(scores.size):
    # OpenCV drawing functions expect plain Python ints for point coordinates.
    x1, y1, x2, y2 = (int(v) for v in boxes[i])
    # Labels come back from the fusion step as floats; show them as class ids.
    label = int(labels[i])
    score = scores[i]
    color = colors[label % len(colors)]

    cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
    label_text = f'{label}: {score:.2f}'
    # Keep the caption inside the image for boxes that touch the top edge.
    text_y = max(y1 - 10, 15)
    cv2.putText(image, label_text, (x1, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

# Save before opening the window so the result is written even when no GUI is
# available or the window is never dismissed.
cv2.imwrite('final_image.jpg', image)
cv2.imshow('Image', image)
cv2.waitKey(0)
cv2.destroyAllWindows()