In [1]:
import pandas as pd
from typing import Iterable, Literal, overload
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import cv2 as cv
import numpy as np
import math
import os

In [2]:
# Set the TF_GPU_ALLOCATOR environment variable to 'cuda_malloc_async'.
# This cell is placed before the cell that imports tensorflow.
# NOTE(review): presumably TensorFlow only reads this variable at start-up, so
# the ordering matters — confirm before merging this into the import cell.
os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'

In [3]:
import tensorflow as tf

2025-11-18 19:17:00.797325: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-11-18 19:17:00.896311: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-11-18 19:17:00.925494: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-11-18 19:17:01.111791: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [53]:
class LocalizationHead(tf.keras.Model):
    """Regression head that predicts 4 values per anchor over a list of feature maps.

    For every feature map the following stages are applied in order: an
    optional initial normalization (first feature map only), an optional 1x1
    squeeze conv, an optional intermediate conv, and a prediction conv with
    ``num_anchors * 4`` output channels. Each per-map result is reshaped to
    ``[B, H * W * num_anchors, 4]`` and all results are concatenated on axis 1.

    Args:
        name: Model name; also used as the prefix of every conv layer name.
        num_anchors_per_location: Number of anchors per spatial location, one
            entry per feature map.
        **kwargs: Configuration options.
            head_type (required): ``"conv3x3"`` or ``"depthwise"``.
            initial_norm_strategy (optional): ``"BatchNorm"``, ``"Norm"`` or
                ``None``; the layer is applied to the first feature map only.
            squeeze_ratio (optional, default 1.0): when different from 1.0, a
                1x1 conv with ``int(channels * squeeze_ratio)`` filters is
                applied to every feature map. Requires ``in_channels``.
            in_channels: Input channel count per feature map. Only read when
                squeezing is enabled.
            intermediate_conv (optional): Filter count of an extra conv of the
                same ``head_type`` inserted before the prediction conv.

    Raises:
        KeyError: If ``head_type`` is missing, or squeezing is enabled without
            ``in_channels``.
        ValueError: For an unsupported ``head_type`` or normalization strategy,
            a squeeze ratio that yields zero filters, or fewer ``in_channels``
            entries than prediction heads.
    """

    def __init__(self, name: str, num_anchors_per_location: list[int], **kwargs):
        super().__init__(name=name)

        # Prediction heads are created first so that the order of tracked
        # layers (pred, then squeeze, then intermediate) stays stable.
        self.head_type = kwargs['head_type']
        self.heads = self.make_heads(num_anchors_per_location)
        # Copy so that later mutation of the caller's list cannot desync the
        # reshape in call() from the already-built heads.
        self.num_anchors_per_layer = list(num_anchors_per_location)

        self.initial_norm = None
        if 'initial_norm_strategy' in kwargs:
            self.initial_norm = self.make_normalization(kwargs['initial_norm_strategy'])

        # Squeeze heads are only built when they will actually be used; a
        # ratio of exactly 1.0 means "no squeeze" and creates no layers.
        self.squeeze_ratio = kwargs.get('squeeze_ratio', 1.0)
        self.squeeze_heads = None
        if self.squeeze_ratio != 1.0:
            self.squeeze_heads = self.make_squeeze_heads(kwargs['in_channels'])
            if len(self.squeeze_heads) < len(self.heads):
                raise ValueError(
                    f"in_channels has {len(self.squeeze_heads)} entries but "
                    f"{len(self.heads)} feature maps are configured."
                )

        # NOTE(review): no activation is inserted between the intermediate and
        # the prediction conv — confirm this is intended.
        self.intermediate_heads = None
        if 'intermediate_conv' in kwargs:
            self.intermediate_channels = kwargs['intermediate_conv']
            self.intermediate_heads = self.make_intermediate_heads(num_anchors_per_location)

    def call(self, feature_maps, training=False):
        """Run every feature map through its head and concatenate the results.

        Args:
            feature_maps: Sequence of 4-D tensors, one per configured head.
                Assumed channels-last (``[B, H, W, C]``), the Conv2D default.
            training: Forwarded to every sub-layer.

        Returns:
            Tensor of shape ``[B, sum(H_i * W_i * num_anchors_i), 4]``.

        Raises:
            ValueError: If the number of feature maps differs from the number
                of prediction heads.
        """
        feature_maps = list(feature_maps)
        if len(feature_maps) != len(self.heads):
            raise ValueError(
                f"Expected {len(self.heads)} feature maps, got {len(feature_maps)}."
            )

        outputs = []
        for layer, feature_map in enumerate(feature_maps):
            num_anchors = self.num_anchors_per_layer[layer]
            x = feature_map

            # Initial norm is applied to the first feature map only.
            if self.initial_norm is not None and layer == 0:
                x = self.initial_norm(x, training=training)

            # Squeeze layer (present only when squeeze_ratio != 1.0).
            if self.squeeze_heads is not None:
                x = self.squeeze_heads[layer](x, training=training)

            # Intermediate conv.
            if self.intermediate_heads is not None:
                x = self.intermediate_heads[layer](x, training=training)

            # Prediction conv: num_anchors * 4 channels per location.
            x = self.heads[layer](x, training=training)

            # Flatten [B, H, W, A * 4] -> [B, H * W * A, 4] in one reshape.
            shape = tf.shape(x)
            x = tf.reshape(x, [shape[0], shape[1] * shape[2] * num_anchors, 4])
            outputs.append(x)

        return tf.concat(outputs, axis=1)

    def make_head(self, head_type: str, out_channels: int, index: int, role: str):
        """Build one conv head named ``{self.name}_loc_{role}_{index}``.

        Args:
            head_type: ``"conv3x3"`` for a single 3x3 conv, ``"depthwise"`` for
                a 3x3 depthwise conv followed by a 1x1 pointwise conv.
            out_channels: Number of output filters.
            index: Feature-map index, used in the layer name.
            role: Name fragment describing the head's role (e.g. ``"pred"``).

        Raises:
            ValueError: If ``head_type`` is not one of the supported values.
        """
        base = f"{self.name}_loc_{role}_{index}"
        if head_type == "conv3x3":
            return tf.keras.layers.Conv2D(filters=out_channels, kernel_size=3, padding="same", name=base)
        if head_type == "depthwise":
            dw_name = f"{base}_dw"
            pw_name = f"{base}_pw"
            return tf.keras.Sequential([
                tf.keras.layers.DepthwiseConv2D(kernel_size=3, padding="same", name=dw_name),
                tf.keras.layers.Conv2D(filters=out_channels, kernel_size=1, padding="same", name=pw_name)
            ], name=base)
        # Previously an unknown type fell through and returned None, which only
        # failed later (and obscurely) inside call().
        raise ValueError(
            f"Unsupported head_type {head_type!r}; expected 'conv3x3' or 'depthwise'."
        )

    def make_heads(self, anchors_per_location: list[int]):
        """Build one prediction head per feature map with ``anchors * 4`` filters."""
        return [
            self.make_head(self.head_type, anchors * 4, layer, role="pred")
            for layer, anchors in enumerate(anchors_per_location)
        ]

    def make_squeeze_head(self, out_channels: int, index: int):
        """Build the 1x1 squeeze conv for feature map ``index``."""
        base = f"{self.name}_loc_squeeze_{index}"
        return tf.keras.layers.Conv2D(filters=out_channels, kernel_size=1, padding="same", name=base)

    def make_intermediate_heads(self, anchors_per_location: list[int]):
        """Build one intermediate head per feature map.

        Only the length of ``anchors_per_location`` is used; every head gets
        ``self.intermediate_channels`` filters.
        """
        return [
            self.make_head(self.head_type, self.intermediate_channels, layer, role="inter")
            for layer in range(len(anchors_per_location))
        ]

    def make_squeeze_heads(self, channels_per_location):
        """Build one squeeze conv per feature map.

        Each conv has ``int(channels * self.squeeze_ratio)`` filters.

        Raises:
            ValueError: If the computed filter count is below 1.
        """
        heads = []
        for layer, channels in enumerate(channels_per_location):
            output_channel = int(channels * self.squeeze_ratio)
            if output_channel < 1:
                raise ValueError(
                    f"squeeze_ratio={self.squeeze_ratio} gives {output_channel} "
                    f"filters for feature map {layer} ({channels} channels)."
                )
            heads.append(self.make_squeeze_head(output_channel, layer))

        return heads

    def make_normalization(self, normalization_type):
        """Build the initial normalization layer.

        Args:
            normalization_type: ``"BatchNorm"``, ``"Norm"``, or ``None`` for no
                normalization.

        Raises:
            ValueError: If ``normalization_type`` is any other value.
        """
        if normalization_type is None:
            return None
        if normalization_type == "BatchNorm":
            return tf.keras.layers.BatchNormalization(name="loc_initial_normalization")
        if normalization_type == "Norm":
            # NOTE(review): a Normalization layer usually needs adapt() or an
            # explicit mean/variance before use — confirm this is handled.
            return tf.keras.layers.Normalization(name="initial_normalization")
        # Previously an unknown strategy silently disabled normalization.
        raise ValueError(
            f"Unsupported initial_norm_strategy {normalization_type!r}; "
            "expected 'BatchNorm', 'Norm' or None."
        )
        


In [54]:
# Synthetic inputs for a shape check: three random feature maps at 38x38,
# 19x19 and 10x10 resolution, paired with 4 / 6 / 6 anchors per location.
num_anchors_per_location = [4, 6, 6]
B = 2
P0, P1, P2 = (
    tf.random.normal([B, size, size, depth])
    for size, depth in ((38, 256), (19, 512), (10, 512))
)

feature_maps = [P0, P1, P2]

In [55]:
# Baseline head: 3x3 prediction convs only (no squeeze, no intermediate conv).
loc_head = LocalizationHead(
    name="loc_head",
    num_anchors_per_location=num_anchors_per_location,
    head_type="conv3x3",
)

In [56]:
# Forward pass of the baseline head over the three feature maps (training=False).
pred_loc = loc_head(feature_maps, training=False)

In [57]:
# Display the shape of the predictions: [batch, total anchors, 4].
tf.shape(pred_loc)

<tf.Tensor: shape=(3,), dtype=int32, numpy=array([   2, 8542,    4], dtype=int32)>

In [58]:
# Expected number of predictions: sum over feature maps of H * W * anchors.
H0 = W0 = 38
H1 = W1 = 19
H2 = W2 = 10

A0, A1, A2 = num_anchors_per_location
N_expected = sum(
    height * width * anchors
    for height, width, anchors in ((H0, W0, A0), (H1, W1, A1), (H2, W2, A2))
)
print("N_expected:", N_expected)

N_expected: 8542


In [59]:
# The baseline output must be [B, N_expected, 4]:
# batch size preserved, 4 values per anchor, one row per anchor over all maps.
assert pred_loc.shape[0] == B
assert pred_loc.shape[2] == 4
assert pred_loc.shape[1] == N_expected

In [60]:
# Variant with a 1x1 squeeze conv that halves the channels of each feature map
# (in_channels mirrors the channel counts of P0 / P1 / P2).
loc_head_squeezed = LocalizationHead(
    name="loc_head_squeezed",
    num_anchors_per_location=num_anchors_per_location,
    head_type="conv3x3",
    squeeze_ratio=0.5,
    in_channels=[256, 512, 512],
)

In [61]:
# Squeezing changes the internal channel count only; the output shape must
# match the baseline head's output.
pred_loc_sq = loc_head_squeezed(feature_maps, training=False)
assert pred_loc_sq.shape == pred_loc.shape

In [62]:
# Variant with an extra 128-filter conv in front of every prediction conv.
loc_head_intermediate = LocalizationHead(
    name="loc_head_intermediate",
    num_anchors_per_location=num_anchors_per_location,
    head_type="conv3x3",
    intermediate_conv=128,
)

In [63]:
# The intermediate conv must not alter the output shape relative to the baseline.
pred_loc_int = loc_head_intermediate(feature_maps, training=False)
assert pred_loc_int.shape == pred_loc.shape

In [64]:
# Print the Keras summary of the baseline head.
loc_head.summary()

In [65]:
# Print the Keras summary of the squeezed variant.
loc_head_squeezed.summary()

In [66]:
# Print the Keras summary of the intermediate-conv variant.
loc_head_intermediate.summary()

In [67]:
# List the tracked sub-layers of the baseline head (one prediction conv per feature map).
loc_head.layers

[<Conv2D name=loc_head_loc_pred_0, built=True>,
 <Conv2D name=loc_head_loc_pred_1, built=True>,
 <Conv2D name=loc_head_loc_pred_2, built=True>]

In [68]:
# List the tracked sub-layers of the squeezed variant (prediction convs, then squeeze convs).
loc_head_squeezed.layers

[<Conv2D name=loc_head_squeezed_loc_pred_0, built=True>,
 <Conv2D name=loc_head_squeezed_loc_pred_1, built=True>,
 <Conv2D name=loc_head_squeezed_loc_pred_2, built=True>,
 <Conv2D name=loc_head_squeezed_loc_squeeze_0, built=True>,
 <Conv2D name=loc_head_squeezed_loc_squeeze_1, built=True>,
 <Conv2D name=loc_head_squeezed_loc_squeeze_2, built=True>]

In [69]:
# List the tracked sub-layers of the intermediate variant (prediction convs, then intermediate convs).
loc_head_intermediate.layers

[<Conv2D name=loc_head_intermediate_loc_pred_0, built=True>,
 <Conv2D name=loc_head_intermediate_loc_pred_1, built=True>,
 <Conv2D name=loc_head_intermediate_loc_pred_2, built=True>,
 <Conv2D name=loc_head_intermediate_loc_inter_0, built=True>,
 <Conv2D name=loc_head_intermediate_loc_inter_1, built=True>,
 <Conv2D name=loc_head_intermediate_loc_inter_2, built=True>]