In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os, cv2
import xml.etree.ElementTree as ET
import tensorflow as tf
from tensorflow import keras
from keras import Sequential, Model
import time
from tensorflow.keras.losses import Loss
from tensorflow.keras import backend as K

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [2]:
from itertools import product
from math import sqrt

cfg = {
    'feature_maps':[38, 19, 10, 5, 3, 1],
    'min_dim': 300, #
    'steps': [8, 16, 32, 32, 100, 300], #
    'min_sizes': [30, 60, 111, 162, 213, 232], #
    'max_sizes': [60, 111, 162, 213, 232, 315], #
    'aspect_ratios': [[2], [2, 3], [2, 3], [2, 3], [2], [2]], #
    'variance': [0.1, 0.2], #
    'clip': True, #
}

class PriorBox(tf.Module):
    def __init__(self, cfg):
        super().__init__()
        self.image_size = cfg['min_dim']
        self.num_priors = len(cfg['aspect_ratios'])
        self.variance = cfg['variance'] or [0.1]
        self.feature_maps = cfg['feature_maps']
        self.min_sizes = cfg['min_sizes']
        self.max_sizes = cfg['max_sizes']
        self.steps = cfg['steps']
        self.aspect_ratios = cfg['aspect_ratios']
        self.clip = cfg['clip']
        for v in self.variance:
            if v <= 0:
                raise ValueError('Variances must be greater than 0')

    def __call__(self):
        mean = []
        for k, f in enumerate(self.feature_maps):
            for i, j in product(range(f), repeat=2):
                f_k = self.image_size / self.steps[k]
                cx = (j + 0.5) / f_k
                cy = (i + 0.5) / f_k

                s_k = self.min_sizes[k] / self.image_size
                mean.append([cx, cy, s_k, s_k])

                s_k_prime = sqrt(s_k * (self.max_sizes[k] / self.image_size))
                mean.append([cx, cy, s_k_prime, s_k_prime])

                for ar in self.aspect_ratios[k]:
                    mean.append([cx, cy, s_k * sqrt(ar), s_k / sqrt(ar)])
                    mean.append([cx, cy, s_k / sqrt(ar), s_k * sqrt(ar)])

        output = tf.constant(mean, dtype=tf.float32)
        if self.clip:
            output = tf.clip_by_value(output, clip_value_min=0, clip_value_max=1)
        return output
    
def convert_scale(matrix,scale,hImage,wImage):
    if scale == 'abs':
        return tf.stack([matrix[:,0]*wImage,
        matrix[:,1]*hImage,
        matrix[:,2]*wImage,
        matrix[:,3]*hImage],axis=-1)

    elif scale == 'rel':
        return tf.stack([matrix[:,0]/wImage,
        matrix[:,1]/hImage,
        matrix[:,2]/wImage,
        matrix[:,3]/hImage],axis=-1)    

    
def convert_format(out,format):
    if format == 'x1y1x2y2':
        return tf.stack([out[...,0]-out[...,2]/2.0,
        out[...,1]-out[...,3]/2.0,
        out[...,0]+out[...,2]/2.0,
        out[...,1]+out[...,3]/2.0]
        ,axis=-1)

    elif format == 'xywh':
        return tf.stack([(out[...,0]+out[...,2])/2.0,
        (out[...,1]+out[...,3])/2.0,
        out[...,2]-out[...,0],
        out[...,3]-out[...,1],
        out[...,4]],axis=-1)
    


def get_priors_featureboxes(cfg):
    
    priorbox = PriorBox(cfg)
    priors = priorbox()

    # Our generated prior boxes are from 0 to 1 in range. We need to multiply with the scale of our image i.e 300

    feature_box = convert_scale(priors,'abs',300,300)
    feature_box_conv = convert_format(feature_box,'x1y1x2y2')

    feature_box_conv = tf.clip_by_value(feature_box_conv, clip_value_min=0, clip_value_max=300)

    return priors,feature_box,feature_box_conv


# priors,feature_box,feature_box_conv = get_priors_featureboxes(cfg)

In [3]:
def iou(box1, box2):
    """
    Calculate the Intersection over Union (IoU) of two sets of boxes.
    The IoU is a measure of the overlap between two bounding boxes.

    Args:
    box1 -- tensor of shape (N, 4), ground truth bounding boxes
    box2 -- tensor of shape (M, 4), proposed anchor boxes

    Returns:
    iou -- tensor of shape (N, M), IoU values
    """
    # Convert boxes to float32 for precision during division
    box1 = tf.cast(box1, dtype=tf.float32)
    box2 = tf.cast(box2, dtype=tf.float32)

    # Calculate intersection coordinates
    x1 = tf.math.maximum(box1[:, None, 0], box2[:, 0])
    y1 = tf.math.maximum(box1[:, None, 1], box2[:, 1])
    x2 = tf.math.minimum(box1[:, None, 2], box2[:, 2])
    y2 = tf.math.minimum(box1[:, None, 3], box2[:, 3])

    # Compute area of intersection rectangle
    intersectionArea = tf.math.maximum(0.0, x2 - x1) * tf.math.maximum(0.0, y2 - y1)

    # Compute area of both bounding boxes
    box1Area = (box1[:, 2] - box1[:, 0]) * (box1[:, 3] - box1[:, 1])
    box2Area = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1])

    # Compute union area by using inclusion-exclusion principle
    unionArea = tf.math.maximum(1e-10, box1Area[:, None] + box2Area - intersectionArea)

    # Compute IoU by dividing intersection area by union area
    iou = intersectionArea / unionArea

    # Clip the values to be between 0 and 1 as IoU cannot exceed 1
    return tf.clip_by_value(iou, 0.0, 1.0)


def df_match(labels, iou_matrix):
    """
    Match each proposed anchor box with the ground truth box that has the highest IoU.

    Args:
    labels -- tensor of shape (K, 4), ground truth labels
    iou_matrix -- tensor of shape (N, K), IoU values for each anchor against each ground truth

    Returns:
    gt_box -- tensor of shape (N, 4), matched ground truth boxes for each anchor
    matched -- tensor of shape (N,), indicator whether matching IoU is above threshold
    """
    
    # Find the maximum IoU value for each anchor box and its corresponding ground truth box index
    max_values = tf.reduce_max(iou_matrix,axis=1)
    max_idx = tf.math.argmax(iou_matrix,axis=1) 

    # Determine if the maximum IoU value for each anchor is above the threshold (0.5 in this case)
    matched = tf.cast(tf.math.greater_equal(max_values,0.5),dtype=tf.float32) 

    # Gather the matched ground truth boxes based on the indices from max_idx
    gt_box = tf.gather(labels,max_idx) 

    return gt_box, matched


def normalised_ground_truth(matched_boxes, feature_box, return_format):
    
    """Normalizes ground truth boxes based on the anchor box dimensions.

    This function encodes or decodes ground truth boxes relative to the provided 
    anchor boxes. The encoding and decoding processes are essential for training
    the SSD model.

    Args:
    matched_boxes: A tensor of shape [N, 4] representing the ground truth boxes 
      that have been matched to the anchor boxes.
    feature_box: A tensor of shape [1, 4] representing the anchor box 
      dimensions.
    return_format: A string indicating the desired output format. 
      "encode": Returns encoded boxes.
      "decode": Returns decoded boxes.

    Returns:
    A tensor of shape [N, 4] representing the encoded or decoded ground truth 
    boxes.
    """

    # Convert data types to float32
    matched_boxes = tf.cast(matched_boxes,tf.float32)
    feature_box = tf.cast(feature_box,tf.float32)

    if return_format == "encode":
        
        """
        Encoding ground truth boxes:

        This process transforms the ground truth boxes into relative offsets and 
        scale factors with respect to the anchor box. The offsets are normalized 
        by dividing them by the anchor box's width and height. The scale factors are 
        represented as the logarithm of the ratio between the ground truth box's 
        width/height and the anchor box's width/height.

        The normalized values are then further scaled by constants (0.1 for 
        offsets and 0.2 for scale factors) to ensure stability during training.
        """
        x_offset = (matched_boxes[:, 0] - feature_box[:, 0]) / feature_box[:, 2]
        y_offset = (matched_boxes[:, 1] - feature_box[:, 1]) / feature_box[:, 3]
        w_scale = tf.math.log(matched_boxes[:, 2] / feature_box[:, 2])
        h_scale = tf.math.log(matched_boxes[:, 3] / feature_box[:, 3])

        encoded_boxes = tf.stack([x_offset, y_offset, w_scale, h_scale], axis=-1)
        return encoded_boxes / [0.1, 0.1, 0.2, 0.2]

    elif return_format == "decode":
        
        """
        Decoding ground truth boxes:

        This process reverses the encoding process to obtain the original ground
        truth boxes from the encoded values. It involves:
        - Reversing the scaling applied during encoding.
        - Using the encoded values to calculate the center coordinates (x_center,
        y_center) and width (w) and height (h) of the boxes relative to the anchor 
        box.
        - Applying the exponential function to the scale factors to recover the 
        original width and height ratios.
        - Finally, multiplying the calculated dimensions by the anchor box's width 
        and height to obtain the absolute dimensions of the ground truth box.
        """
        encoded_boxes = matched_boxes * [0.1, 0.1, 0.2, 0.2]
        x_center = encoded_boxes[:, 0] * feature_box[:, 2] + feature_box[:, 0]
        y_center = encoded_boxes[:, 1] * feature_box[:, 3] + feature_box[:, 1]
        w = tf.math.exp(encoded_boxes[:, 2]) * feature_box[:, 2]
        h = tf.math.exp(encoded_boxes[:, 3]) * feature_box[:, 3]

        decoded_boxes = tf.stack([x_center, y_center, w, h], axis=-1)
        return decoded_boxes

In [4]:
import json 
# File path to save and read the data
file_path = "train_image_annotations.json"

# Read the data back from the file
with open(file_path, "r") as f:
    train_image = json.load(f)

len(train_image),train_image[0]

(14256,
 {'object': [{'name': 'objects',
    'xmin': 104,
    'xmax': 154,
    'ymin': 218,
    'ymax': 237}],
  'filename': 'JPEGImages/1001_jpg.rf.dfc0798279d416e17e5b66b7732b0d02.jpg',
  'width': 300,
  'height': 300})

In [5]:
import tensorflow as tf
import cv2
import numpy as np

# ----- Your helper functions -----
def fun(x, H, W):
    w = x['width']
    h = x['height']
    
    attr = []
    for i in x['object']:
        t = {}
        xmin = i['xmin'] * W / w
        xmax = i['xmax'] * W / w
        ymin = i['ymin'] * H / h
        ymax = i['ymax'] * H / h
        name = i['name']
        t['name'] = name
        t['xmin'] = xmin
        t['ymin'] = ymin
        t['xmax'] = xmax
        t['ymax'] = ymax
        attr.append(t)
    
    image = cv2.imread(x['filename'])
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (W, H))
    image = image / 255.0
    return image, attr

def fun_binary(x):  
    # Note: update this logic as needed because here x is a string.
    # This dummy example converts non-'0' labels to 1 and '0' to 0.
    return 0 if x != '0' else 1

# Assuming iou, df_match, convert_format, and normalised_ground_truth are defined elsewhere.
def main(label, feature_box, feature_box_conv):
    iou_matrix = iou(feature_box_conv, label)
    gt_box, matched = df_match(convert_format(label, 'xywh'), iou_matrix)
    boxes = gt_box[:, :4]
    classes = gt_box[:, 4]
    
    classes = tf.cast(classes, dtype=tf.int32)
    matched = tf.cast(matched, dtype=tf.int32)
    classes = tf.cast(classes * matched, dtype=tf.int32)
    classes = tf.one_hot(classes, depth=1, dtype=tf.float32)
    normalised_gtbox = normalised_ground_truth(boxes, feature_box, 'encode')
    df_box = tf.concat((normalised_gtbox, classes), axis=-1)
    df_box.set_shape([feature_box.shape[0], 4 + 1])
    return df_box

# ----- Define a generator function -----
def data_generator(train_image, feature_box, feature_box_conv):
    for annotation in train_image:
        # Process image and extract object attributes.
        image, attr = fun(annotation, 300, 300)  # returns image (300x300x3) and a list of object dicts
        
        # Build coordinate array from the attributes.
        coords = []
        for a in attr:
            xmin, ymin, xmax, ymax, name = a['xmin'], a['ymin'], a['xmax'], a['ymax'], a['name']
            # Convert the label to a binary value.
            # (Adjust the fun_binary logic as needed for your labeling.)
            label = 1 - fun_binary(name)
            coords.append([xmin, ymin, xmax, ymax, label])
        
        coords = np.array(coords, dtype=np.float32)
        
        # Compute the final coordinates using your main function.
        final_coord = main(coords, feature_box, feature_box_conv)
        # If main returns a tf.Tensor, convert it to a NumPy array.
        if isinstance(final_coord, tf.Tensor):
            final_coord = final_coord.numpy()
            
        # Yield the image and corresponding target coordinates.
        yield image.astype(np.float32), final_coord.astype(np.float32)

# ----- Prepare the tf.data.Dataset -----
# Assume that `train_image` is your list of annotation dictionaries.
# Assume get_priors_featureboxes(cfg) returns feature_box and feature_box_conv.
priors, feature_box, feature_box_conv = get_priors_featureboxes(cfg)
num_priors = feature_box.shape[0]  # final_coord shape will be (num_priors, 5)

# Define the output signature for the Dataset.
output_signature = (
    tf.TensorSpec(shape=(300, 300, 3), dtype=tf.float32),
    tf.TensorSpec(shape=(num_priors, 5), dtype=tf.float32)
)

In [6]:
from sklearn.model_selection import train_test_split

# Split your annotation list (train_image) into train and validation sets.
train_imgs, val_imgs = train_test_split(train_image, test_size=0.1, random_state=42)

# Create training dataset
train_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(train_imgs, feature_box, feature_box_conv),
    output_signature=(
        tf.TensorSpec(shape=(300, 300, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(feature_box.shape[0], 5), dtype=tf.float32)
    )
)
train_dataset = train_dataset.batch(32).prefetch(tf.data.AUTOTUNE)

# Create validation dataset
val_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(val_imgs, feature_box, feature_box_conv),
    output_signature=(
        tf.TensorSpec(shape=(300, 300, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(feature_box.shape[0], 5), dtype=tf.float32)
    )
)
val_dataset = val_dataset.batch(32).prefetch(tf.data.AUTOTUNE)

In [7]:
def total_loss(y_true, y_pred):
    y_true = tf.cast(y_true, dtype=tf.float32)
    y_pred = tf.cast(y_pred, dtype=tf.float32)

    # Create a mask for positive samples
    pos_mask = tf.cast(tf.equal(tf.squeeze(y_true[:, :, 4:5], axis=-1), 0.0), tf.float32)
    num_pos = tf.maximum(1.0, tf.cast(tf.math.count_nonzero(pos_mask, axis=-1), tf.float32))

    # Calculate localization loss
    loc_loss = tf.compat.v1.losses.huber_loss(labels=y_true[:, :, :4],
                                              predictions=y_pred[:, :, :4],
                                              reduction="none")
    
    loc_loss = tf.reduce_sum(loc_loss, axis=-1)
    loc_loss = tf.where(tf.equal(pos_mask, 1.0), loc_loss, 0.0)
    loc_loss = tf.reduce_sum(loc_loss, axis=-1)
    loc_loss = loc_loss / num_pos

    # Calculate classification loss
    cce = tf.losses.BinaryCrossentropy(from_logits=True, reduction=tf.losses.Reduction.NONE)
    cross_entropy = cce(y_true[:, :, 4:], y_pred[:, :, 4:])

    # Set negative to positive ratio
    num_neg =  num_pos

    # Perform hard negative mining
    neg_cross_entropy = tf.where(tf.equal(pos_mask, 0.0), cross_entropy, 0.0)
    sorted_dfidx = tf.cast(tf.argsort(neg_cross_entropy, direction='DESCENDING', axis=-1), tf.int32)
    rank = tf.cast(tf.argsort(sorted_dfidx, axis=-1), tf.int32)
    num_neg = tf.cast(num_neg, dtype=tf.int32)
    neg_loss = tf.where(rank < tf.expand_dims(num_neg, axis=1), neg_cross_entropy, 0.0)

    # Calculate positive loss
    pos_loss = tf.where(tf.equal(pos_mask, 1.0), cross_entropy, 0.0)
    clas_loss = tf.reduce_sum(pos_loss + neg_loss, axis=-1)
    clas_loss = clas_loss / num_pos

    # Combine losses
    total_loss = loc_loss + clas_loss
    return total_loss

import tensorflow as tf
from tensorflow.keras.layers import Layer, Input, Conv2D, MaxPool2D, Reshape, Concatenate, MaxPooling2D, ZeroPadding2D, Activation
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2

l2_reg = l2_regularization = 0.0005

class L2Normalization(Layer):
    """Normalizing different scale features for fusion.
    paper: https://arxiv.org/abs/1506.04579
    inputs:
        feature_map = (batch_size, feature_map_height, feature_map_width, depth)
    outputs:
        normalized_feature_map = (batch_size, feature_map_height, feature_map_width, depth)
    """
    def __init__(self, scale_factor, **kwargs):
        super(L2Normalization, self).__init__(**kwargs)
        self.scale_factor = scale_factor

    def get_config(self):
        config = super(L2Normalization, self).get_config()
        config.update({"scale_factor": self.scale_factor})
        return config

    def build(self, input_shape):
        # Network need to learn scale factor for each channel
        init_scale_factor = tf.fill((input_shape[-1],), float(self.scale_factor))
        self.scale = tf.Variable(init_scale_factor, trainable=True)

    def call(self, inputs):
        return tf.nn.l2_normalize(inputs, axis=-1) * self.scale

def buildSSD_VGG16(n_classes, n_boxes = [4, 6, 6, 6, 4, 4]):

    x = Input(shape=(300, 300, 3))

    conv1_1 = Conv2D(32, (3, 3), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv1_1')(x)
    pool1 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same', name='pool1')(conv1_1)

    conv2_1 = Conv2D(64, (3, 3), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv2_1')(pool1)
    pool2 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same', name='pool2')(conv2_1)

    conv3_1 = Conv2D(128, (3, 3), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv3_1')(pool2)
    pool3 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same', name='pool3')(conv3_1)

    conv4_1 = Conv2D(256, (3, 3), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv4_1')(pool3)
    pool4 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same', name='pool4')(conv4_1)

    conv5_1 = Conv2D(256, (3, 3), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv5_1')(pool4)
    pool5 = MaxPooling2D(pool_size=(3, 3), strides=(1, 1), padding='same', name='pool5')(conv5_1)

    fc6 = Conv2D(512, (3, 3), dilation_rate=(6, 6), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='fc6')(pool5)

    fc7 = Conv2D(512, (1, 1), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='fc7')(fc6)

    conv6_1 = Conv2D(128, (1, 1), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv6_1')(fc7)
    conv6_1 = ZeroPadding2D(padding=((1, 1), (1, 1)), name='conv6_padding')(conv6_1)
    conv6_2 = Conv2D(256, (3, 3), strides=(2, 2), activation='relu', padding='valid', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv6_2')(conv6_1)

    conv7_1 = Conv2D(64, (1, 1), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv7_1')(conv6_2)
    conv7_1 = ZeroPadding2D(padding=((1, 1), (1, 1)), name='conv7_padding')(conv7_1)
    conv7_2 = Conv2D(128, (3, 3), strides=(2, 2), activation='relu', padding='valid', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv7_2')(conv7_1)

    conv8_1 = Conv2D(64, (1, 1), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv8_1')(conv7_2)
    conv8_2 = Conv2D(128, (3, 3), strides=(1, 1), activation='relu', padding='valid', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv8_2')(conv8_1)

    conv9_1 = Conv2D(64, (1, 1), activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv9_1')(conv8_2)
    conv9_2 = Conv2D(128, (3, 3), strides=(1, 1), activation='relu', padding='valid', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv9_2')(conv9_1)

    # Feed conv4_3 into the L2 normalization layer
    conv4_3_norm = L2Normalization(scale_factor=20, name='conv4_3_norm')(conv4_1)

    ### Build the convolutional predictor layers on top of the base network

    # We precidt `n_classes` confidence values for each box, hence the confidence predictors have depth `n_boxes * n_classes`
    # Output shape of the confidence layers: `(batch, height, width, n_boxes * n_classes)`
    conv4_3_norm_mbox_conf = Conv2D(n_boxes[0] * n_classes, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv4_3_norm_mbox_conf')(conv4_3_norm)
    fc7_mbox_conf = Conv2D(n_boxes[1] * n_classes, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='fc7_mbox_conf')(fc7)
    conv6_2_mbox_conf = Conv2D(n_boxes[2] * n_classes, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv6_2_mbox_conf')(conv6_2)
    conv7_2_mbox_conf = Conv2D(n_boxes[3] * n_classes, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv7_2_mbox_conf')(conv7_2)
    conv8_2_mbox_conf = Conv2D(n_boxes[4] * n_classes, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv8_2_mbox_conf')(conv8_2)
    conv9_2_mbox_conf = Conv2D(n_boxes[5] * n_classes, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv9_2_mbox_conf')(conv9_2)
    # We predict 4 box coordinates for each box, hence the localization predictors have depth `n_boxes * 4`
    # Output shape of the localization layers: `(batch, height, width, n_boxes * 4)`
    conv4_3_norm_mbox_loc = Conv2D(n_boxes[0] * 4, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv4_3_norm_mbox_loc')(conv4_3_norm)
    fc7_mbox_loc = Conv2D(n_boxes[1] * 4, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='fc7_mbox_loc')(fc7)
    conv6_2_mbox_loc = Conv2D(n_boxes[2] * 4, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv6_2_mbox_loc')(conv6_2)
    conv7_2_mbox_loc = Conv2D(n_boxes[3] * 4, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv7_2_mbox_loc')(conv7_2)
    conv8_2_mbox_loc = Conv2D(n_boxes[4] * 4, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv8_2_mbox_loc')(conv8_2)
    conv9_2_mbox_loc = Conv2D(n_boxes[5] * 4, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(l2_reg), name='conv9_2_mbox_loc')(conv9_2)

    conv4_3_norm_mbox_conf_reshape = Reshape((-1, n_classes), name='conv4_3_norm_mbox_conf_reshape')(conv4_3_norm_mbox_conf)
    fc7_mbox_conf_reshape = Reshape((-1, n_classes), name='fc7_mbox_conf_reshape')(fc7_mbox_conf)
    conv6_2_mbox_conf_reshape = Reshape((-1, n_classes), name='conv6_2_mbox_conf_reshape')(conv6_2_mbox_conf)
    conv7_2_mbox_conf_reshape = Reshape((-1, n_classes), name='conv7_2_mbox_conf_reshape')(conv7_2_mbox_conf)
    conv8_2_mbox_conf_reshape = Reshape((-1, n_classes), name='conv8_2_mbox_conf_reshape')(conv8_2_mbox_conf)
    conv9_2_mbox_conf_reshape = Reshape((-1, n_classes), name='conv9_2_mbox_conf_reshape')(conv9_2_mbox_conf)

    conv4_3_norm_mbox_loc_reshape = Reshape((-1, 4), name='conv4_3_norm_mbox_loc_reshape')(conv4_3_norm_mbox_loc)
    fc7_mbox_loc_reshape = Reshape((-1, 4), name='fc7_mbox_loc_reshape')(fc7_mbox_loc)
    conv6_2_mbox_loc_reshape = Reshape((-1, 4), name='conv6_2_mbox_loc_reshape')(conv6_2_mbox_loc)
    conv7_2_mbox_loc_reshape = Reshape((-1, 4), name='conv7_2_mbox_loc_reshape')(conv7_2_mbox_loc)
    conv8_2_mbox_loc_reshape = Reshape((-1, 4), name='conv8_2_mbox_loc_reshape')(conv8_2_mbox_loc)
    conv9_2_mbox_loc_reshape = Reshape((-1, 4), name='conv9_2_mbox_loc_reshape')(conv9_2_mbox_loc)

    mbox_conf = Concatenate(axis=1, name='mbox_conf')([conv4_3_norm_mbox_conf_reshape,
                                                        fc7_mbox_conf_reshape,
                                                        conv6_2_mbox_conf_reshape,
                                                        conv7_2_mbox_conf_reshape,
                                                        conv8_2_mbox_conf_reshape,
                                                        conv9_2_mbox_conf_reshape])

    mbox_loc = Concatenate(axis=1, name='mbox_loc')([conv4_3_norm_mbox_loc_reshape,
                                                        fc7_mbox_loc_reshape,
                                                        conv6_2_mbox_loc_reshape,
                                                        conv7_2_mbox_loc_reshape,
                                                        conv8_2_mbox_loc_reshape,
                                                        conv9_2_mbox_loc_reshape])

    mbox_conf_sigmoid = Activation('sigmoid', name='mbox_conf_sigmoid')(mbox_conf)
    ssd_VGG16 = Model(inputs=x, outputs=[mbox_loc, mbox_conf_sigmoid])

    return ssd_VGG16

def SSD():
    
    inputs = Input((300,300,3))
    
    model = buildSSD_VGG16(n_classes=1)
    
    loc,clas = model(inputs)
    
    pred = Concatenate(axis=-1)([loc, clas])
    
    return Model(inputs=[inputs], outputs=[pred])

model = SSD()

model.compile(optimizer='adam',loss=total_loss)
model.summary()

In [8]:
# ----- Define your callbacks (as before) -----
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.8,
    patience=2,
    min_lr=1e-6,
    verbose=1
)

early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1
)

# Fit the model using both training and validation datasets.
model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=50,
    callbacks=[reduce_lr, early_stop],
    verbose=1
)

Epoch 1/50


Expected: ['keras_tensor']
Received: inputs=Tensor(shape=(None, 300, 300, 3))


    401/Unknown [1m401s[0m 986ms/step - loss: 5.1533



[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m432s[0m 1s/step - loss: 5.1505 - val_loss: 2.8594 - learning_rate: 0.0010
Epoch 2/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m471s[0m 1s/step - loss: 2.6946 - val_loss: 2.3654 - learning_rate: 0.0010
Epoch 3/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m428s[0m 1s/step - loss: 2.3086 - val_loss: 2.2242 - learning_rate: 0.0010
Epoch 4/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m425s[0m 1s/step - loss: 2.1632 - val_loss: 2.1301 - learning_rate: 0.0010
Epoch 5/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m427s[0m 1s/step - loss: 2.0775 - val_loss: 2.1065 - learning_rate: 0.0010
Epoch 6/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m427s[0m 1s/step - loss: 2.0185 - val_loss: 2.0796 - learning_rate: 0.0010
Epoch 7/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m423s[0m 1s/step - loss: 1.9767 - val_loss: 2.0533 - learning_rate: 0.0

<keras.src.callbacks.history.History at 0x30b63b680>

In [9]:
# Save the model
model.save('SSD300.keras')

In [10]:

# Save numpy array
np.save('priors.npy', priors)
# Load numpy array
loaded_priors = np.load('priors.npy')

In [12]:
import tensorflow as tf
import tf2onnx

# Define the input signature based on your model's input shape.
spec = (tf.TensorSpec(model.inputs[0].shape, tf.float32, name="input"),)

# Convert the model to ONNX format and save it.
model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec, output_path="SSD300 fp32.onnx")
print("Model converted to ONNX and saved as SSD300 fp32.onnx")

Model converted to ONNX and saved as SSD300 fp32.onnx


In [18]:
import glob
from onnxruntime.quantization import (
    quantize_static,
    CalibrationDataReader,
    QuantFormat,
    QuantType,
)


class FileCalibrationDataReader(CalibrationDataReader):
    def __init__(self, image_paths, input_name="input", preprocess_fn=None):
        self.image_paths = image_paths
        self.input_name = input_name
        self.preprocess = preprocess_fn or self._default_preprocess
        self.index = 0

    def _default_preprocess(self, path):
        import cv2, numpy as np
        img = cv2.imread(path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (300, 300))
        return img.astype(np.float32) / 255.0

    def get_next(self):
        if self.index >= len(self.image_paths):
            return None

        # 1) load & preprocess a single image: shape (H, W, C)
        img = self.preprocess(self.image_paths[self.index])
        self.index += 1

        # 2) add batch dimension → shape (1, H, W, C)
        batched = img[np.newaxis, ...]

        # 3) return dict with correct rank
        return {self.input_name: batched}


# 1. Point at all the images in your folder:
calib_folder = "JPEGImages_rotated"
# match .jpg, .jpeg, .png (adjust extensions as needed)
image_paths = glob.glob(f"{calib_folder}/*.[jp][pn]g")

# 2. Create the reader (streams one file at a time)
reader = FileCalibrationDataReader(image_paths, input_name="input")

# 3. Run static INT8 quantization
quantize_static(
    model_input="SSD300 fp32.onnx",
    model_output="SSD300 int8.onnx",
    calibration_data_reader=reader,
    quant_format=QuantFormat.QDQ,    # or QuantFormat.QOperator
    activation_type=QuantType.QInt8,
    weight_type=QuantType.QInt8
)

print("✅ Static INT8 quantization complete: SSD300_int8.onnx")




✅ Static INT8 quantization complete: SSD300_int8.onnx
