In [None]:
import torch
def IOU(box1, box2):
    """
    Intersection over union of boxes stored as (_, x_mid, y_mid, width, height).

    args:
        box1: tensor (or nested list) whose last dimension has 5 entries; the
              first entry is ignored. Intended as the golden label box.
        box2: tensor (or nested list) with the same last-dimension layout,
              broadcastable against box1. Intended as all boxes of that cell.

    returns:
        Tensor of IOU values with the broadcast shape of the two inputs minus
        the last dimension.
    """
    # Plain Python lists are promoted to double-precision tensors
    box1 = torch.tensor(box1, dtype=float) if isinstance(box1, list) else box1
    box2 = torch.tensor(box2, dtype=float) if isinstance(box2, list) else box2

    # Split the last dimension into its five columns; the leading column is unused here
    _, cx_a, cy_a, w_a, h_a = box1.unbind(dim=-1)
    _, cx_b, cy_b, w_b, h_b = box2.unbind(dim=-1)

    # Corner coordinates of both boxes
    x_lo_a, y_lo_a, x_hi_a, y_hi_a = get_x1y1x2y2(cx_a, cy_a, w_a, h_a)
    x_lo_b, y_lo_b, x_hi_b, y_hi_b = get_x1y1x2y2(cx_b, cy_b, w_b, h_b)

    # Signed overlap along each axis; a non-positive value means no overlap on that axis
    overlap_w = torch.minimum(x_hi_a, x_hi_b) - torch.maximum(x_lo_a, x_lo_b)
    overlap_h = torch.minimum(y_hi_a, y_hi_b) - torch.maximum(y_lo_a, y_lo_b)
    overlap_w = overlap_w * (overlap_w > 0)
    overlap_h = overlap_h * (overlap_h > 0)

    intersection = overlap_w * overlap_h
    union = w_a * h_a + w_b * h_b - intersection

    # Lower-bound the denominator so degenerate boxes do not divide by zero
    return intersection / torch.clamp(union, min=1e-6)

def get_x1y1x2y2(x_mid, y_mid, width, height):
    """
    Convert a centre/size box description into its corner coordinates.

    Returns the tuple (x_mid - width/2, y_mid - height/2,
                       x_mid + width/2, y_mid + height/2).
    Works on anything supporting +, - and true division (scalars or tensors).
    """
    dx = width / 2
    dy = height / 2

    x_lo, x_hi = x_mid - dx, x_mid + dx
    y_lo, y_hi = y_mid - dy, y_mid + dy

    return x_lo, y_lo, x_hi, y_hi

In [2]:
import torch
import torch.nn as nn
class YOLO(torch.nn.Module):
    """
    YOLO-style detector: four convolutional blocks followed by two dense layers.

    `forward` maps an image batch to a tensor of shape (batch_size, S, S, B*5 + C).
    Per grid cell the first 5*B values are B boxes stored as
    (conf, x_mid, y_mid, width, height) and the remaining C values are class logits.
    `predict` turns that tensor into per-image box lists using confidence sorting,
    non-maximum suppression and a confidence threshold.
    """

    def __init__(self, S=7, B=2, C=200, slope=0.1, dropout=0.5):
        """
        Args:
            S (int): Dimensions for the final grid SxS
            B (int): Number of boxes per position on the grid
            C (int): Number of classes
            slope (float): Negative slope in LeakyRELU
            dropout (float): Dropout Probability
        """
        super().__init__()

        # Initializing attributes
        self.S = S
        self.B = B
        self.C = C

        # Block 1
        conv1 = self.conv_block(3, [7], [64], [2], [2], [2], slope)

        # Block 2
        conv2 = self.conv_block(64, [3], [192], [1], [2], [2], slope)

        # Block 3
        conv3 = self.conv_block(192, [1, 3, 1, 3], [128, 256, 256, 512], [1] * 4, [2], [2], slope)

        # Block 4
        conv4 = self.conv_block(512, [1, 3] * 5, [256, 512] * 4 + [512, 1024], [1] * 10, [2], [2], slope)

        # Blocks 5 and 6 are disabled and are NOT part of the network:
        #conv5 = self.conv_block(1024, [1, 3] * 2 + [3, 3], [512, 1024] * 2 + [1024] * 2, [1] * 5 + [2], [], [], slope)
        #conv6 = self.conv_block(1024, [3, 3], [1024, 1024], [1, 1], [], [], slope)

        # The four active blocks downsample by 2 five times (stride-2 conv + four
        # 2x2 poolings = factor 32), so the 7*7*1024 input of the first dense layer
        # corresponds to 224x224x3 images (448x448 would yield a 14x14 map).
        # The layer order below fixes the state_dict keys; do not reorder.
        self.layers = nn.Sequential(*conv1,
                                    *conv2,
                                    *conv3,
                                    *conv4,
                                    nn.Flatten(),                        # Flatten layer
                                    nn.Linear(7*7*1024, 4096),           # First Dense Layer
                                    nn.LeakyReLU(slope, inplace=True),   # Activation after Linear Layer
                                    nn.Dropout(dropout),                 # Dropout
                                    nn.Linear(4096, S*S*(B*5 + C))       # Last Dense Layer
                                    )

    def conv_block(self, start_channels, size_conv, out_channels, stride_conv, size_pool, stride_pool, slope):
        """
        Args:
            start_channels (int): Number of channels of first input.
            size_conv (List[int]): Filter sizes for each convolutional layer.
            out_channels (List[int]): Number of kernels for each convolutional layer
            stride_conv (List[int]): Stride values for each convolutional layer.
            size_pool (List[int]): Size for the single pooling layer (if exists)
            stride_pool (List[int]): Stride for the single pooling layer (if exists)
            slope (float): Negative slope in LeakyRELU

        Output:
            layers (List[nn.Module]): List with all the layers of the block
        """
        layers = []
        # Each convolution consumes the output channels of the previous one
        in_channels = [start_channels] + out_channels[:-1]
        for inp, out, size, stride in zip(in_channels, out_channels, size_conv, stride_conv):
            # Padding of size//2 keeps the spatial size for odd kernels with stride 1
            layers.append(nn.Conv2d(inp, out, size, stride, size//2))
            layers.append(nn.LeakyReLU(slope))

        for size, stride in zip(size_pool, stride_pool):
            layers.append(nn.MaxPool2d(size, stride))

        return layers

    def forward(self, x):
        """
        Args:
            x (batch_size, 3, H, W): image batch; H and W must produce the
                7*7*1024 features expected by the first dense layer.

        Output:
            (batch_size, S, S, 5*B + C)
        """
        batch_size = x.shape[0]
        x = self.layers(x)

        return torch.reshape(x, (batch_size, self.S, self.S, self.B*5 + self.C))

    @torch.no_grad()  # post-processing only; no autograd graph is needed
    def predict(self, output, IOU_threshold=0.8, conf_threshold=0.0):
        """
        Decode the raw network output into final boxes.

        Args:
            output (batch_size, S, S, 5*B + C): tensor produced by `forward`
            IOU_threshold (float): a lower-scored box is suppressed when its IOU
                with a kept, higher-scored box is strictly above this value
            conf_threshold (float): boxes scoring below this value are dropped

        Output:
            result (List[List[tuple]]): one list per image, ordered by descending
                score. Each tuple is (conf, x_mid, y_mid, width, height, class_idx)
                where conf is box confidence times the best class probability,
                the four geometry values are clamped to [0, 1] and class_idx is
                stored as a float. Suppression ignores the class index, i.e.
                boxes of different classes can suppress each other.
        """
        # Get device
        device = output.device

        # First Step - Get the relevant information from the boxes
        num_grid = self.S * self.S
        num_boxes = num_grid * self.B

        batch_size = output.shape[0]

        boxes = output[:, :, :, :5*self.B].reshape((batch_size, num_boxes, 5)) # (batch_size, S*S*B, 5)
        logits = output[:, :, :, 5*self.B:].reshape((batch_size, num_grid, self.C)) # (batch_size, S*S, C)
        classes = torch.softmax(logits, dim=-1)

        conf, x_mid, y_mid, width, height = boxes.unbind(dim=-1) # (batch_size, S*S*B)
        x_mid, y_mid = self.grid2img(x_mid, y_mid) # Convert relative to grid to relative to image

        class_prob, class_idx = torch.max(classes, dim=-1) # (batch_size, S*S)

        # Every box of a cell shares that cell's best class: (batch_size, S*S) -> (batch_size, S*S*B)
        class_prob_repeated = class_prob.unsqueeze(-1).expand(batch_size, num_grid, self.B)\
            .reshape((batch_size, num_boxes))
        class_idx_repeated = class_idx.unsqueeze(-1).expand(batch_size, num_grid, self.B)\
            .reshape((batch_size, num_boxes))

        new_conf = torch.mul(conf, class_prob_repeated)  # (batch_size, S*S*B)

        x_mid, y_mid, width, height = (torch.clamp(t, min=0, max=1) for t in (x_mid, y_mid, width, height))

        # The integer class index is cast explicitly so all columns share one dtype
        output_boxes = torch.stack(
            (new_conf, x_mid, y_mid, width, height, class_idx_repeated.to(new_conf.dtype)),
            dim=-1) # (batch_size, S*S*B, 6)

        # Second step - Sort by conf
        sorted_conf, sorted_idx = torch.sort(new_conf, dim=-1, descending=True) # (batch_size, num_boxes)
        batch_idx = torch.arange(batch_size, device=device).unsqueeze(1).expand(batch_size, num_boxes)
        sorted_boxes = output_boxes[batch_idx, sorted_idx, :]

        # Third Step - Non Maximum Supression (NMS)
        # all_mask[b, j] == 1 while box j of image b is still kept
        all_mask = torch.ones((batch_size, num_boxes), device=device)
        for i in range(num_boxes-1):
            curr_boxes = sorted_boxes[:, i, :-1] # (batch_size, 5)
            other_boxes = sorted_boxes[:, i+1:, :-1] # (batch_size, num_boxes-i-1, 5)
            IOU_results = IOU(curr_boxes.unsqueeze(1), other_boxes)
            mask = IOU_results <= IOU_threshold
            is_kept = all_mask[:, i].unsqueeze(-1)
            # A box that was itself suppressed (is_kept == 0) must not suppress others:
            # the factor is 1 in that case and `mask` otherwise.
            all_mask[:, i+1:] = (1 - is_kept + is_kept * mask) * all_mask[:, i+1:]

        # Fourth Step - Threshold for confidence and get Final Boxes
        # Move everything to the host once instead of synchronising per box.
        above_threshold = (sorted_conf >= conf_threshold).cpu().tolist()
        kept = all_mask.cpu().tolist()
        box_rows = sorted_boxes.cpu().tolist()

        result = [[] for _ in range(batch_size)]
        for batch in range(batch_size):
            for box in range(num_boxes):
                if not above_threshold[batch][box]:
                    break  # scores are sorted, so every later box fails as well
                if kept[batch][box]:
                    result[batch].append(tuple(box_rows[batch][box]))

        return result

    def grid2img(self, x, y):
        """
        Convert box centres from cell-relative offsets to image-relative
        coordinates by adding the cell's column/row index and dividing by S.

        Args:
            x (batch_size, S*S*B)
            y (batch_size, S*S*B)

        Output:
            Tuple (x_img, y_img), each of shape (batch_size, S*S*B)
        """
        batch_size = x.shape[0]
        device = x.device
        new_dim = self.S * self.S * self.B
        cell_index = torch.arange(self.S, device=device)
        grid_shape = (batch_size, self.S, self.S, self.B)
        # dx varies along the third axis (column), dy along the second axis (row)
        dx = cell_index.view(1, 1, self.S, 1).expand(grid_shape).reshape(batch_size, new_dim)
        dy = cell_index.view(1, self.S, 1, 1).expand(grid_shape).reshape(batch_size, new_dim)
        inv_S = 1/self.S

        return (x + dx) * inv_S, (y + dy) * inv_S

In [None]:
import os
import cv2
import torch
import matplotlib.pyplot as plt
import torchvision.transforms as T
import numpy as np
from PIL import Image

# Cuda
device = "cuda" if torch.cuda.is_available() else "cpu"

# Get the weights
weights_path = "best_model.pth"
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
weights = torch.load(weights_path, map_location=device)

# Load model
model = YOLO().to(device)
model.load_state_dict(weights)
# Switch to inference mode: without this the Dropout layer stays active and
# predictions change randomly from call to call.
model.eval()

# Transformation
# 224x224 is the input size that yields the 7*7*1024 features the model's
# first dense layer expects.
transform = T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
])


def get_images_from_folder(folder_path, extensions=(".jpeg",)):
    """
    List the files of a folder whose name ends with one of the given extensions.

    Args:
        folder_path (str): Directory to scan (not recursive).
        extensions (str | Iterable[str]): One extension or several; matching is
            case-insensitive on both the file name and the extension.

    Returns:
        List[str]: Matching paths (folder_path joined with the file name),
        sorted so that the order is reproducible across runs and platforms.

    Raises:
        OSError: propagated from os.listdir when the folder cannot be read.
    """
    # A bare string is a single extension; str.endswith needs a str or a tuple,
    # so lists and other iterables are normalised to a tuple here.
    if isinstance(extensions, str):
        extensions = (extensions,)
    suffixes = tuple(ext.lower() for ext in extensions)

    return sorted(
        os.path.join(folder_path, f)
        for f in os.listdir(folder_path)
        if f.lower().endswith(suffixes)
    )


def predict_and_show_inline(image_path):
    """
    Run the detector on one image and display it inline with its boxes drawn.

    Uses the notebook-level `model`, `transform` and `device`. The figure is
    shown only when at least one box is returned by `model.predict`.

    Args:
        image_path (str): Path of the image file to process.
    """
    pil_img = Image.open(image_path).convert("RGB")
    np_img = np.array(pil_img)
    img_h, img_w = np_img.shape[:2]
    image_tensor = transform(pil_img).to(device).unsqueeze(0)

    # Inference only: no autograd graph is needed
    with torch.no_grad():
        output = model(image_tensor)
        predictions = model.predict(output)[0]

    count = 0

    for pred in predictions:
        conf, xm, ym, w, h, cls_id = pred

        # model.predict returns geometry clamped to [0, 1] (fractions of the
        # image), so scale to pixel coordinates of the original image before
        # drawing. NOTE(review): assumes width/height are fractions of the full
        # image like the centre coordinates - confirm against training targets.
        xmin = int((xm - w / 2) * img_w)
        ymin = int((ym - h / 2) * img_h)
        xmax = int((xm + w / 2) * img_w)
        ymax = int((ym + h / 2) * img_h)

        label = f"{int(cls_id)} {conf:.2f}"

        # Draw bounding box & label
        cv2.rectangle(np_img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
        cv2.putText(np_img, label, (xmin, ymin - 5), cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (0, 255, 0), 2)
        count += 1

    if count > 0:
        # Show in notebook
        plt.figure(figsize=(8, 6))
        plt.imshow(np_img)
        plt.axis("off")
        plt.show()


# Folder scanned for images (path is relative to the notebook's working directory)
folder_path = "tiny-imagenet-200/test/images"
images = get_images_from_folder(folder_path)

# Run the detector on every image found and display the annotated result inline
for img_path in images:
    predict_and_show_inline(img_path)
