### **Init**

#### Modules and Globals

**Load modules**

In [1]:
# Libraries
# -- Core
import time
import datetime
import logging

# -- OpenCV
import cv2

# -- Numpy
import numpy as np

# -- Pandas
import pandas as pd

# -- TQDM
from tqdm import tqdm

# -- Tensorflow and Keras
import tensorflow as tf
from keras import Model
from keras.layers import Add, Concatenate, Conv2D, Input, Lambda, LeakyReLU, MaxPool2D, UpSampling2D, ZeroPadding2D, BatchNormalization
from keras.regularizers import l2
from keras.losses import binary_crossentropy
from keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, TensorBoard

2023-12-14 22:09:56.066697: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-12-14 22:09:56.066792: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-12-14 22:09:56.112143: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-12-14 22:09:56.205124: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


**Globals**

In [2]:
# Root directory holding the dataset
DATA_DIR = '../../data'

# CSV file with the per-image annotation records
ALL_DATA_CSV_FILEPATH = f'{DATA_DIR}/all-data.csv'

# Cell class name -> integer label.
# The label of a class is its position in the tuple below.
class_dict = {
    cell_class: label
    for label, cell_class in enumerate((
        'red blood cell',
        'trophozoite',
        'schizont',
        'difficult',
        'ring',
        'leukocyte',
        'gametocyte',
    ))
}

#### Initial Variables

In [3]:
# -- Training settings
batch_size = 8                  # batch size
learning_rate = 1e-4            # learning rate
epochs = 100                    # epochs run to fine tune our model

# -- Image settings
size = 416                      # side length of the (square) resized image

# -- Detection settings
yolo_max_boxes = 223            # maximum number of boxes per image (used as the NMS output cap)
yolo_iou_threshold = 0.5        # IOU threshold passed to non-max suppression
yolo_score_threshold = 0.4      # score threshold passed to non-max suppression

# YOLO anchors
# -- nine (width, height) pairs, divided by the image size
yolo_anchors = np.array(
    [
        (10, 13), (16, 30), (33, 23),
        (30, 61), (62, 45), (59, 119),
        (116, 90), (156, 198), (373, 326),
    ],
    dtype=np.float32,
) / size

# YOLO masks
# -- one row of anchor indices per output scale
yolo_anchor_masks = np.array([[6, 7, 8], [3, 4, 5], [0, 1, 2]])

# categories count
classes_count = len(class_dict)
print(f'Classes count : {classes_count}')

Classes count : 7


### **Darknet**

#### Darknet layers

**Convolution layer**

In [4]:
def DarknetConv(x : tf.Tensor, 
                filters: int, 
                size : int, 
                strides: int = 1, 
                batch_norm: bool = True) -> tf.Tensor:
    """Convolution unit: Conv2D, optionally followed by BatchNorm + LeakyReLU.

    Args:
        x: input tensor.
        filters: number of convolution filters.
        size: convolution kernel size.
        strides: convolution stride; any value other than 1 switches to
            top-left zero padding followed by a 'valid' convolution.
        batch_norm: when True the convolution has no bias and is followed by
            batch normalization and LeakyReLU; when False only the biased
            convolution is applied.

    Returns:
        The output tensor of the unit.
    """
    downsampling = strides != 1

    if downsampling:
        # pad one row on top and one column on the left only
        x = ZeroPadding2D(((1, 0), (1, 0)))(x)

    convolution = Conv2D(
        filters=filters,
        kernel_size=size,
        strides=strides,
        padding='valid' if downsampling else 'same',
        use_bias=not batch_norm,          # the bias is dropped when batch norm follows
        kernel_regularizer=l2(0.0005),
    )
    x = convolution(x)

    if not batch_norm:
        return x

    x = BatchNormalization()(x)

    # negative values are scaled down to 10%
    return LeakyReLU(alpha=0.1)(x)

**Residual layer**

In [5]:
def DarknetResidual(x: tf.Tensor, filters: int) -> tf.Tensor:
    """Residual unit: a 1x1 and a 3x3 convolution added back onto the input.

    Args:
        x: input tensor; it is summed with the 3x3 convolution output.
        filters: filter count of the 3x3 convolution. The 1x1 convolution uses
            half of it when `filters` is even, otherwise the full count.

    Returns:
        The sum of the input tensor and the convolved tensor.
    """
    shortcut = x

    # 1x1 convolution
    squeeze_filters = filters // 2 if filters % 2 == 0 else filters
    x = DarknetConv(x, squeeze_filters, 1)

    # 3x3 convolution
    x = DarknetConv(x, filters, 3)

    # skip connection
    return Add()([shortcut, x])

**Block superlayer**

In [6]:
def DarknetBlock(x, filters: int, repeat: int) -> tf.Tensor:
    """Stride-2 3x3 convolution followed by `repeat` residual units.

    Args:
        x: input tensor.
        filters: filter count used by the convolution and every residual unit.
        repeat: number of residual units appended after the convolution.

    Returns:
        The output tensor of the last residual unit (or of the convolution
        when `repeat` is 0).
    """
    # strided convolution opening the block
    out = DarknetConv(x, filters, size=3, strides=2)

    # chain of residual units
    for _ in range(repeat):
        out = DarknetResidual(out, filters)

    return out

#### Darknet builder

**CNN architecture schematic**

![darknet-architecture-schematic](../../assets/darkent-architecture-schematics.png)

**Implementation**

In [7]:
# Neural Network - Darknet Architecture
def Darknet(name: str | None = None) -> Model:
    """Build the Darknet backbone as a Keras model with three feature outputs.

    The network is a 3x3 stem convolution followed by five `DarknetBlock`s.
    Every block starts with a stride-2 convolution, so the three returned
    feature maps are downsampled 8x, 16x and 32x relative to the input.

    Args:
        name: optional name of the resulting model.

    Returns:
        A `Model` mapping an image tensor of shape (batch, H, W, 3) to the
        tuple `(x_36, x_61, x)`: the outputs of the 256-, 512- and 1024-filter
        blocks respectively.
    """
    # input load
    x = inputs = Input([None, None, 3])

    # initial convolution layer
    # -- 32 filters, 3x3 kernel, stride 1 (`size` is a required argument of
    # -- `DarknetConv`; resolution is reduced only inside the blocks below)
    x = DarknetConv(x, filters=32, size=3)

    # darknet residual blocks
    x = DarknetBlock(x, filters=64, repeat=1)
    x = DarknetBlock(x, filters=128, repeat=2)

    # -- outputs - 1st scale
    x = x_36 = DarknetBlock(x, filters=256, repeat=8)

    # -- outputs - 2nd scale
    x = x_61 = DarknetBlock(x, filters=512, repeat=8)

    # -- outputs - 3rd scale
    x = DarknetBlock(x, filters=1024, repeat=4)

    # group the layers into a model exposing all three feature maps
    return Model(inputs, (x_36, x_61, x), name=name)

### **Yolo v3**

#### Yolo Utils

**Boxes**

In [8]:
def yolo_boxes(pred: tf.Tensor, anchors: np.array, classes: int):
    """Decode one raw YOLO head output into boxes, objectness and class scores.

    Args:
        pred: raw head output. Its last axis is split into
            (2, 2, 1, classes): xy offsets, wh log-scales, objectness and
            class scores. The grid size is read from axis 1 only, so the
            grid is assumed to be square.
        anchors: anchor sizes multiplied onto `exp(wh)`; must broadcast
            against the wh slice of `pred`.
        classes: number of class channels in `pred`.

    Returns:
        A tuple `(bbox, objectness, class_probs, pred_box)`:
        - `bbox`: corner boxes (x1, y1, x2, y2), xy normalised by the grid size;
        - `objectness`: sigmoid of the objectness channel;
        - `class_probs`: sigmoid of the class channels;
        - `pred_box`: sigmoid xy offsets concatenated with the raw wh values
          (the un-decoded form consumed by the loss).
    """
    # grid size
    grid_size = tf.shape(pred)[1]

    # split tensor into a list of subtensors
    # -- `box_xy` : bounding box center offsets tensor (x, y)
    # -- `box_wh` : bounding box dimensions tensor (width, height)
    # -- `objectness`  : objectness score tensor
    # -- `class_probs` : class probabilities tensor
    box_xy, box_wh, objectness, class_probs = tf.split(
        pred, (2, 2, 1, classes), axis=-1)

    # apply sigmoid function to squash outputs into (0, 1)
    box_xy = tf.sigmoid(box_xy)
    objectness = tf.sigmoid(objectness)
    class_probs = tf.sigmoid(class_probs)

    # keep the un-decoded box (cell-relative xy, raw wh) for the loss
    pred_box = tf.concat((box_xy, box_wh), axis=-1)

    # create a `grid` of cell indices: grid[y][x] == (x, y)
    grid = tf.meshgrid(tf.range(grid_size), tf.range(grid_size))

    # stack to (grid, grid, 2) and add an anchor axis -> (grid, grid, 1, 2)
    grid = tf.expand_dims(tf.stack(grid, axis=-1), axis=2)

    # convert `grid` and `grid_size` to `float32` for mathematical operations
    grid = tf.cast(grid, tf.float32)
    grid_size = tf.cast(grid_size, tf.float32)

    # transform the cell-relative offsets into image-relative coordinates:
    # add the cell index first, THEN normalise by the grid size
    # (exact inverse of `true_xy * grid_size - grid` used in the loss)
    box_xy = (box_xy + grid) / grid_size

    # scale the width and height of the boxes using the anchor boxes
    box_wh = tf.exp(box_wh) * anchors

    # calculate the upper left and bottom right corners of the boxes
    box_x1y1 = box_xy - box_wh / 2
    box_x2y2 = box_xy + box_wh / 2

    # concatenate the corner coordinates to form the final bounding boxes
    bbox = tf.concat([box_x1y1, box_x2y2], axis=-1)

    return bbox, objectness, class_probs, pred_box

**Non max suppression**

In [9]:
def yolo_nms(outputs, anchors=None, masks=None, classes=None):
    """Merge the decoded outputs of all scales and run non-max suppression.

    Args:
        outputs: iterable with one entry per output scale; entry[0] holds the
            boxes, entry[1] the objectness and entry[2] the class
            probabilities of that scale.
        anchors, masks, classes: accepted so that the function can also be
            called as `yolo_nms(outputs, anchors, masks, classes)`; they are
            not used by the computation.

    Returns:
        `(boxes, scores, classes, valid_detections)` as produced by
        `tf.image.combined_non_max_suppression`, limited to `yolo_max_boxes`
        detections and filtered with `yolo_iou_threshold` and
        `yolo_score_threshold`.
    """
    def _flatten(tensor):
        # collapse every axis between the batch axis and the last axis
        shape = tf.shape(tensor)
        return tf.reshape(tensor, (shape[0], -1, shape[-1]))

    # flatten each scale and concatenate all scales along the box axis
    bbox = tf.concat([_flatten(out[0]) for out in outputs], axis=1)
    confidence = tf.concat([_flatten(out[1]) for out in outputs], axis=1)
    class_probs = tf.concat([_flatten(out[2]) for out in outputs], axis=1)

    # `scores` : product of confidence and probabilities
    # -- final trust score for each class and box
    scores = confidence * class_probs

    # Non-Maximum Suppression
    # -- eliminates overlapping boxes, keeping the most trusted ones
    # -- boxes get a singleton axis so the same box is shared by all classes
    nms_boxes, nms_scores, nms_classes, valid_detections = \
        tf.image.combined_non_max_suppression(
            boxes=tf.reshape(bbox, (tf.shape(bbox)[0], -1, 1, 4)),
            scores=scores,
            max_output_size_per_class=yolo_max_boxes,
            max_total_size=yolo_max_boxes,
            iou_threshold=yolo_iou_threshold,
            score_threshold=yolo_score_threshold
        )

    # return filtered boxes, class scores, class labels and valid detections count
    return nms_boxes, nms_scores, nms_classes, valid_detections

**Intersection over Union**

In [10]:
def broadcast_iou(box_1: tf.Tensor, box_2: tf.Tensor):
    """Intersection over Union of every box in `box_1` with every box in `box_2`.

    Both tensors hold corner boxes (x1, y1, x2, y2) on their last axis.
    `box_1` gets a new axis before the last one, `box_2` a new leading axis,
    and both are broadcast to a common shape before the IoU is computed.

    Returns:
        Tensor of IoU values with the broadcast shape minus the last axis.
    """
    def _area(boxes):
        # (x2 - x1) * (y2 - y1)
        return (boxes[..., 2] - boxes[..., 0]) * (boxes[..., 3] - boxes[..., 1])

    # add broadcasting axes
    box_1 = tf.expand_dims(box_1, -2)
    box_2 = tf.expand_dims(box_2, 0)

    # bring both tensors to the common broadcast shape
    common_shape = tf.broadcast_dynamic_shape(tf.shape(box_1), tf.shape(box_2))
    box_1 = tf.broadcast_to(box_1, common_shape)
    box_2 = tf.broadcast_to(box_2, common_shape)

    # extent of the overlap, clipped at zero for disjoint boxes
    overlap_w = tf.maximum(
        tf.minimum(box_1[..., 2], box_2[..., 2])
        - tf.maximum(box_1[..., 0], box_2[..., 0]),
        0)
    overlap_h = tf.maximum(
        tf.minimum(box_1[..., 3], box_2[..., 3])
        - tf.maximum(box_1[..., 1], box_2[..., 1]),
        0)
    overlap_area = overlap_w * overlap_h

    # Union(A,B) = A + B - Intersection(A,B)
    union_area = _area(box_1) + _area(box_2) - overlap_area

    return overlap_area / union_area

#### Yolo Convolution Layer

In [11]:
def YoloConv(filters: int, name : str | None = None):
    """Create the convolution stage placed in front of a YOLO output head.

    Args:
        filters: base filter count of the stage.
        name: optional name of the sub-model wrapping the stage.

    Returns:
        A function taking either a single tensor or a tuple
        `(x, x_skip)`. For a tuple, `x` is passed through a 1x1 convolution,
        upsampled by 2 and concatenated with `x_skip` first. Five alternating
        1x1 / 3x3 convolutions follow in both cases.
    """
    # (filter multiplier, kernel size) of the five alternating convolutions
    conv_plan = ((1, 1), (2, 3), (1, 1), (2, 3), (1, 1))

    def yolo_conv(x_in: tf.Tensor | tuple):
        if not isinstance(x_in, tuple):
            # single stream - nothing to merge
            x = inputs = Input(x_in.shape[1:])
        else:
            # two streams - merge the upsampled one with the skip connection
            inputs = Input(x_in[0].shape[1:]), Input(x_in[1].shape[1:])
            x, x_skip = inputs

            x = DarknetConv(x, filters, 1)
            x = UpSampling2D(2)(x)
            x = Concatenate()([x, x_skip])

        for multiplier, kernel in conv_plan:
            x = DarknetConv(x, filters=filters * multiplier, size=kernel)

        return Model(inputs, x, name=name)(x_in)

    return yolo_conv

#### Yolo Output

In [12]:
def YoloOutput(filters: int, 
               anchors: np.array, 
               classes: int, 
               name: str | None = None):
    """Create a YOLO output head.

    Args:
        filters: base filter count; the 3x3 convolution uses twice as many.
        anchors: used as a number in `anchors * (classes + 5)` and as a
            dimension of the final reshape, i.e. the count of anchors
            predicted per grid cell.
        classes: number of object classes.
        name: optional name of the sub-model wrapping the head.

    Returns:
        A function mapping a feature tensor to a tensor reshaped to
        (-1, dim1, dim2, anchors, classes + 5).
    """
    # per anchor: class scores [`classes`], box info [4], objectness [1]
    values_per_anchor = classes + 5

    def yolo_output(x_in: tf.Tensor):
        head_input = Input(x_in.shape[1:])

        # 3x3 convolution with doubled filters
        features = DarknetConv(head_input, filters=filters * 2, size=3)

        # 1x1 convolution producing the raw predictions (bias, no batch norm)
        raw = DarknetConv(features, filters=anchors * values_per_anchor,
                          size=1, batch_norm=False)

        # split the channel axis into (anchors, values per anchor)
        prediction = Lambda(
            lambda t: tf.reshape(
                t, (-1, tf.shape(t)[1], tf.shape(t)[2], anchors, classes + 5))
        )(raw)

        return Model(head_input, prediction, name=name)(x_in)

    return yolo_output

#### Yolo Loss

In [13]:
def YoloLoss(anchors, classes=80, ignore_thresh=0.5):
    """Build the loss function for one YOLO output scale.

    Args:
        anchors: anchor sizes of this scale; passed to `yolo_boxes` and used
            to express the true width/height relative to the anchors.
        classes: number of object classes.
        ignore_thresh: predictions whose best IoU with any true box reaches
            this value are left out of the no-object part of the objectness
            loss.

    Returns:
        `yolo_loss(y_true, y_pred)`. `y_true` carries, on its last axis,
        (x1, y1, x2, y2, objectness, class index); `y_pred` is the raw head
        output. The result is one loss value per batch element: the sum of
        the xy, wh, objectness and class losses.
    """
    def yolo_loss(y_true, y_pred):
        # 1. transform all predicted outputs
        # -- `pred_box`   : corner boxes (x1, y1, x2, y2)
        # -- `pred_obj`   : objectness probability (sigmoid already applied)
        # -- `pred_class` : class probabilities (sigmoid already applied)
        # -- `pred_xywh`  : cell-relative xy offsets and raw wh values
        pred_box, pred_obj, pred_class, pred_xywh = yolo_boxes(
            y_pred, anchors, classes)

        pred_xy = pred_xywh[..., 0:2]
        pred_wh = pred_xywh[..., 2:4]

        # 2. transform all true outputs
        true_box, true_obj, true_class_idx = tf.split(
            y_true, (4, 1, 1), axis=-1)

        true_xy = (true_box[..., 0:2] + true_box[..., 2:4]) / 2  # box center
        true_wh = true_box[..., 2:4] - true_box[..., 0:2]        # width and height

        # weight that grows as the box area shrinks
        box_loss_scale = 2 - true_wh[..., 0] * true_wh[..., 1]

        # 3. invert the box equations of `yolo_boxes`
        grid_size = tf.shape(y_true)[1]
        grid = tf.meshgrid(tf.range(grid_size), tf.range(grid_size))
        grid = tf.expand_dims(tf.stack(grid, axis=-1), axis=2)
        true_xy = true_xy * tf.cast(grid_size, tf.float32) - \
            tf.cast(grid, tf.float32)

        true_wh = tf.math.log(true_wh / anchors)
        # log(0) = -inf for empty (zero-sized) boxes -> replace with 0
        true_wh = tf.where(tf.math.is_inf(true_wh),
                           tf.zeros_like(true_wh), true_wh)

        # 4. calculate all masks
        obj_mask = tf.squeeze(true_obj, -1)

        # per batch element: best IoU of every predicted box against the
        # true boxes that actually contain an object
        best_iou = tf.map_fn(
            lambda x: tf.reduce_max(broadcast_iou(x[0], tf.boolean_mask(
                x[1], tf.cast(x[2], tf.bool))), axis=-1),
            (pred_box, true_box, obj_mask),
            tf.float32)
        ignore_mask = tf.cast(best_iou < ignore_thresh, tf.float32)

        # 5. calculate all losses
        xy_loss = obj_mask * box_loss_scale * \
            tf.reduce_sum(tf.square(true_xy - pred_xy), axis=-1)
        wh_loss = obj_mask * box_loss_scale * \
            tf.reduce_sum(tf.square(true_wh - pred_wh), axis=-1)

        obj_loss = binary_crossentropy(true_obj, pred_obj)

        # focal weighting of the objectness loss
        # -- `pred_obj` is already a probability (see `yolo_boxes`), so it is
        # -- used directly; a second sigmoid would squeeze it into
        # -- [0.5, 0.73] and remove the dependence on the prediction quality
        alpha = 0.85
        conf_focal = tf.pow(obj_mask - tf.squeeze(pred_obj, -1), 2)
        obj_loss = conf_focal * (
            (1 - alpha) * obj_mask * obj_loss
            + alpha * (1 - obj_mask) * ignore_mask * obj_loss)

        # class loss
        # -- binary cross-entropy needs targets in [0, 1]; the class index is
        # -- therefore one-hot encoded to match the per-class probabilities
        true_class = tf.one_hot(
            tf.cast(tf.squeeze(true_class_idx, -1), tf.int32), classes)
        class_loss = obj_mask * binary_crossentropy(true_class, pred_class)

        # 6. sum over (gridx, gridy, anchors) => one value per batch element
        xy_loss = tf.reduce_sum(xy_loss, axis=(1, 2, 3))
        wh_loss = tf.reduce_sum(wh_loss, axis=(1, 2, 3))
        obj_loss = tf.reduce_sum(obj_loss, axis=(1, 2, 3))
        class_loss = tf.reduce_sum(class_loss, axis=(1, 2, 3))

        return xy_loss + wh_loss + obj_loss + class_loss

    return yolo_loss

#### Yolo Model

In [14]:
def Yolo(size: int | None = None, 
         channels: int = 3, 
         anchors : np.array = yolo_anchors,
         masks: np.array = yolo_anchor_masks, 
         classes: int = 80, 
         training: bool = False):
    """Assemble the YOLOv3 model: backbone, three conv stages and three heads.

    Args:
        size: height and width of the (square) input image; None leaves the
            spatial dimensions unspecified.
        channels: number of image channels.
        anchors: anchor sizes, indexed by the rows of `masks`.
        masks: one row of anchor indices per output scale.
        classes: number of object classes.
        training: when True the model returns the three raw head outputs;
            otherwise the outputs are decoded with `yolo_boxes` and filtered
            with `yolo_nms`.

    Returns:
        A `Model` named 'yolov3'.
    """
    # load input data : image pixel matrixes (by each channel)
    x = inputs = Input([size, size, channels], name='input')

    # CNN Backbone
    x_36, x_61, x = Darknet(name='yolo_darknet')(x)

    # YOLO convolution layers at 3 levels
    # -- filters : 512
    x = YoloConv(512, name='yolo_conv_0')(x)
    output_0 = YoloOutput(512, len(masks[0]), classes, name='yolo_output_0')(x)

    # -- filters : 256
    # -- -- up scaled and merged with `x_61`
    x = YoloConv(256, name='yolo_conv_1')((x, x_61))
    output_1 = YoloOutput(256, len(masks[1]), classes, name='yolo_output_1')(x)

    # -- filters : 128
    # -- -- up scaled and merged with `x_36`
    x = YoloConv(128, name='yolo_conv_2')((x, x_36))
    output_2 = YoloOutput(128, len(masks[2]), classes, name='yolo_output_2')(x)

    # if model is expected to be trained, expose the raw head outputs
    if training:
        return Model(inputs, (output_0, output_1, output_2), name='yolov3')

    # decoded boxes of the 3 scale levels
    # -- level : 0
    boxes_0 = Lambda(lambda x: yolo_boxes(x, anchors[masks[0]], classes),
                     name='yolo_boxes_0')(output_0)
    # -- level : 1
    boxes_1 = Lambda(lambda x: yolo_boxes(x, anchors[masks[1]], classes),
                     name='yolo_boxes_1')(output_1)
    # -- level : 2
    boxes_2 = Lambda(lambda x: yolo_boxes(x, anchors[masks[2]], classes),
                     name='yolo_boxes_2')(output_2)

    # prediction outputs
    # -- only (bbox, objectness, class_probs) of each level are needed;
    # -- `yolo_nms` takes the collected outputs as its only required argument
    outputs = Lambda(lambda x: yolo_nms(x),
                     name='yolo_nms')((boxes_0[:3], boxes_1[:3], boxes_2[:3]))

    return Model(inputs, outputs, name='yolov3')

### **Core utils**

**Darknet weights loader**

In [15]:
def load_darknet_weights(model: Model, weights_file: str, tiny: bool = False):
    """Load weights from a binary Darknet weights file into the model.

    The sub-models 'yolo_darknet', 'yolo_conv_{0,1,2}' and
    'yolo_output_{0,1,2}' are visited in file order. For every layer whose
    name starts with 'conv2d' the file provides either a bias vector (no
    batch-norm layer directly after it) or four batch-norm vectors, followed
    by the convolution kernel.

    Args:
        model: model exposing the sub-models listed above via `get_layer`.
        weights_file: path of the weights file.
        tiny: accepted for interface compatibility; not used.

    Raises:
        AssertionError: if unread data remains in the file after all layers
            have been loaded.
    """
    layers = ['yolo_darknet',
              'yolo_conv_0', 'yolo_output_0',
              'yolo_conv_1', 'yolo_output_1',
              'yolo_conv_2', 'yolo_output_2']

    # the context manager closes the file even when loading fails midway
    with open(weights_file, 'rb') as wf:
        # skip the header: five int32 values that are not needed here
        np.fromfile(wf, dtype=np.int32, count=5)

        for layer_name in layers:
            sub_model = model.get_layer(layer_name)
            for i, layer in enumerate(sub_model.layers):
                if not layer.name.startswith('conv2d'):
                    continue

                # a batch-norm layer directly after the convolution means
                # the convolution itself carries no bias
                batch_norm = None
                if i + 1 < len(sub_model.layers) and \
                        sub_model.layers[i + 1].name.startswith('batch_norm'):
                    batch_norm = sub_model.layers[i + 1]

                logging.info("{}/{} {}".format(
                    sub_model.name, layer.name, 'bn' if batch_norm else 'bias'))

                filters = layer.filters
                size = layer.kernel_size[0]
                in_dim = layer.input_shape[-1]

                if batch_norm is None:
                    conv_bias = np.fromfile(wf, dtype=np.float32, count=filters)
                else:
                    # four vectors of length `filters`; the first two rows
                    # are swapped to match the order `set_weights` expects
                    # NOTE(review): presumably darknet stores
                    # [beta, gamma, mean, variance] - confirm
                    bn_weights = np.fromfile(
                        wf, dtype=np.float32, count=4 * filters)
                    bn_weights = bn_weights.reshape((4, filters))[[1, 0, 2, 3]]

                # file layout (out, in, height, width)
                # -> layer layout (height, width, in, out)
                conv_shape = (filters, in_dim, size, size)
                conv_weights = np.fromfile(
                    wf, dtype=np.float32, count=int(np.prod(conv_shape)))
                conv_weights = conv_weights.reshape(
                    conv_shape).transpose([2, 3, 1, 0])

                if batch_norm is None:
                    layer.set_weights([conv_weights, conv_bias])
                else:
                    layer.set_weights([conv_weights])
                    batch_norm.set_weights(bn_weights)

        assert len(wf.read()) == 0, 'failed to read all data'

**Outputs drawer**

In [16]:
def draw_outputs(img, outputs, class_names,no_rbc = True):
    """Draw the detections of the first batch element onto `img`.

    Args:
        img: image array; its first two shape entries are used to scale the
            relative box coordinates to pixels.
        outputs: tuple `(boxes, objectness, classes, nums)`; only element 0
            of each entry is used.
        class_names: sequence indexed by the integer class label.
        no_rbc: when True, detections with class label 0 are not drawn.

    Returns:
        The image with a rectangle and a '<class name> <score>' caption per
        drawn detection.
    """
    # keep only the first batch element of every output
    boxes, objectness, classes, nums = (entry[0] for entry in outputs)

    # (width, height) of the image
    wh = np.flip(img.shape[:2])

    for idx in range(nums):
        if no_rbc and classes[idx] == 0:
            continue

        # relative corners -> integer pixel coordinates
        top_left = tuple((np.array(boxes[idx][0:2]) * wh).astype(np.int32))
        bottom_right = tuple((np.array(boxes[idx][2:4]) * wh).astype(np.int32))

        img = cv2.rectangle(img, top_left, bottom_right, (255, 0, 0), 2)

        caption = '{} {:.4f}'.format(
            class_names[int(classes[idx])], objectness[idx])
        img = cv2.putText(img, caption, top_left,
                          cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 2)

    return img

**Model freezer**

In [17]:
def freeze_all(model: Model, frozen: bool = True):
    """Set the `trainable` flag of a model and, recursively, of all its layers.

    Args:
        model: a model or a single layer.
        frozen: True makes everything non-trainable, False trainable.
    """
    model.trainable = not frozen

    # only containers (models) expose `layers`; plain layers are leaves and
    # end the recursion instead of raising AttributeError
    for layer in getattr(model, 'layers', ()):
        freeze_all(layer, frozen)

**Data loader**

In [18]:
def parse_dataset(data: pd.DataFrame, 
                  class_dict: dict, 
                  size: int,
                  image_path: str, 
                  images_dirpath: str, 
                  yolo_max_boxes: int):
    """Load images and build zero-padded box targets for each of them.

    Args:
        data: annotation table; the columns `img_path`, `min_c`, `max_c`,
            `min_r`, `max_r` and `category` are read.
        class_dict: mapping from the `category` value to the integer label.
        size: side length the images are resized to.
        image_path: iterated to obtain the image file names.
            NOTE(review): annotated as `str` but used as a collection of
            names - confirm the intended type.
        images_dirpath: prefix concatenated (plain `+`) with each file name,
            so it has to end with a path separator.
        yolo_max_boxes: number of rows every per-image target is padded to.

    Returns:
        `(X, Y)`: the array of resized images and the array of per-image
        targets, each target row being [x_min, y_min, x_max, y_max, category]
        with coordinates divided by the original image width/height.

    NOTE(review): `Image` is not imported in the visible import cell
    (presumably `PIL.Image`) - confirm it is imported before this runs.
    """
    X = []
    Y = []

    # for each image in directory
    for img in tqdm(image_path):
        # image load
        x_train = Image.open(images_dirpath + img)
        
        # original image size, needed to normalise the box coordinates
        width, height = x_train.size

        # image resize to specified square resolution
        x_train = x_train.resize((size, size))

        # cast to Numpy array
        x_train = np.array(x_train)

        temp_data = []
        # for each object at the image
        # NOTE(review): the lookup key is `image_path+img`, i.e. the iterated
        # collection itself joined with the current name - verify that this
        # (rather than e.g. `images_dirpath + img`) is what `img_path` holds
        for _,row in data[data['img_path']==image_path+img].iterrows():
            # box corners as fractions of the original width / height
            x_min = row.min_c / width
            x_max = row.max_c / width
            y_min = row.min_r / height
            y_max = row.max_r / height
            
            # get the object category label
            category = class_dict[row.category]
            
            # store single object info as an array 
            temp_data.append([x_min, y_min, x_max, y_max, category])
        
        # pad with all-zero rows up to `yolo_max_boxes`
        # (no truncation happens when there are more objects than that)
        temp_data = temp_data + [[0,0,0,0,0]] * (yolo_max_boxes - len(temp_data))

        # add expected output data
        Y.append(temp_data)
        # add training input data
        X.append(x_train)
    
    # convert both lists to arrays and return them as a pair
    return (np.array(X),np.stack(np.array(Y)))

**Data transformer**

In [19]:
def transform_images(x: tf.Tensor, size: int):
    """Resize images to `size` x `size` and divide the values by 255.

    Args:
        x: image tensor.
        size: target height and width.

    Returns:
        The resized tensor divided by 255.0.
    """
    resized = tf.image.resize(x, (size, size))

    # scale pixel values by 1/255
    return resized / 255.0