In [1]:
import cv2
import tensorflow as tf
import numpy as np
from tensorflow.image import resize_bilinear
from tensorflow.keras.layers import Conv2D, LeakyReLU, MaxPool2D, BatchNormalization, LocallyConnected2D, Flatten, Dropout, Dense

In [12]:
class Conv2D_FixedPad:
    """Convolution with explicit zero padding of ``kernel_size // 2`` per spatial side.

    Calling an instance pads the input, applies a Keras ``Conv2D`` (left at its
    default padding mode) and then a ``BatchNormalization`` with default
    arguments. Fresh layers are created on every call, so each call adds new
    variables to the graph.
    """

    def __init__(self, filters, kernel_size, strides=1, activation=None):
        """
        filters: Number of output channels of the convolution
        kernel_size: Int, or (height, width) tuple/list, size of the convolution window
        strides: Stride of the convolution
        activation: Activation passed to Conv2D (applied before batch normalization)
        """
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.activation = activation

    def _paddings(self):
        """ Build the tf.pad spec: kernel // 2 on each spatial side, none elsewhere """
        if isinstance(self.kernel_size, (tuple, list)):
            # A (height, width) kernel pads each spatial axis by its own half-size
            kernel_h, kernel_w = self.kernel_size
        else:
            kernel_h = kernel_w = self.kernel_size
        pad_h, pad_w = kernel_h // 2, kernel_w // 2
        # Don't pad in batch and channel dimensions (first and last axes)
        return [[0, 0], [pad_h, pad_h], [pad_w, pad_w], [0, 0]]

    def __call__(self, inputs):
        """ All convolution layers of YOLO use a padding size of kernel_size // 2 """
        outputs = tf.pad(inputs, self._paddings())
        outputs = Conv2D(filters=self.filters,
                         kernel_size=self.kernel_size,
                         strides=self.strides,
                         activation=self.activation)(outputs)

        # Batch normalization runs on the already-activated convolution output
        return BatchNormalization()(outputs)

class NetWork:
    """YOLOv1-style network assembled from Keras layers inside a TF1 graph.

    `_build` creates the input placeholder and the layer stack; `train`
    validates the input array, rebuilds the graph and initializes variables.
    The loss and the actual optimisation loop are not implemented yet.
    """

    def __init__(self, img_h=448, 
                 img_w=448, 
                 img_ch=3, 
                 data_format="channels_last", 
                 alpha=0.1, 
                 classes=20, 
                 sides=7,
                 momentum=0.9,
                 decay=0.0005,
                 learning_rate=0.0005,
                 saturation=1.5,
                 exposure=1.5):
        """
        img_h: Height of input image
        img_w: Width of input image
        img_ch: Channels of input image (3: RGB, 1: Gray-scale)
        data_format: "channels_last" >> (batch, height, width, channels)
                     "channels_first" >> (batch, channels, height, width)
        alpha: Negative slope coefficient for leaky relu
        classes: Numbers of predicted classes
        sides: Numbers of grids NxN on input image
        momentum: Momentum for the moving average for batch normalization
                  (stored only; the layers built in this class do not receive it)
        decay, learning_rate, saturation, exposure: Stored on the instance;
                  not read by any method of this class yet

        Raises AssertionError when an argument has the wrong type or
        data_format is not one of the two supported strings.
        """
        assert isinstance(img_h, int), "Expect img_h's type to be integer: {}".format(img_h)
        assert isinstance(img_w, int), "Expect img_w's type to be integer: {}".format(img_w)
        assert isinstance(img_ch, int), "Expect img_ch's type to be integer: {}".format(img_ch)
        assert isinstance(classes, int), "Expect classes's type to be integer: {}".format(classes)
        assert isinstance(sides, int), "Expect sides's type to be integer: {}".format(sides)
        assert isinstance(alpha, float), "Expect alpha's type to be float: {}".format(alpha)
        assert isinstance(momentum, float), "Expect momentum's type to be float: {}".format(momentum)
        assert data_format in ("channels_last", "channels_first"), \
            "Expect data_format to be 'channels_last' or 'channels_first': {}".format(data_format)

        self.img_h = img_h
        self.img_w = img_w
        self.img_ch = img_ch
        self.data_format = data_format
        self.alpha = alpha
        self.classes = classes
        self.sides = sides

        self.momentum = momentum
        self.decay = decay
        self.learning_rate = learning_rate

        self.saturation = saturation
        self.exposure = exposure

        # Model's input placeholder
        self.inputs = None
        # Model's outputs
        self.Y_hat = None

    def _conv(self, inputs, filters, kernel_size, strides=1):
        """ Apply one fixed-padding convolution activated by this model's leaky relu """
        return Conv2D_FixedPad(filters=filters,
                               kernel_size=kernel_size,
                               strides=strides,
                               activation=LeakyReLU(self.alpha))(inputs)

    def _build(self):
        """
        Create the input placeholder and the layer stack in the default graph.

        The placeholder is stored in self.inputs; the final tensor is stored in
        self.Y_hat and also returned. Shape comments below assume the default
        448x448x3 input.
        """
        # Create placeholder for input image (always channels-last)
        self.inputs = tf.placeholder(dtype=tf.float32, shape=[None, self.img_h, self.img_w, self.img_ch])

        outputs = self.inputs

        # Construct YOLOv1 model
        with tf.variable_scope("YOLOv1"):
            # Convolutional layer1
            # (?, 448, 448, 3) >> (?, 224, 224, 64)
            outputs = self._conv(outputs, filters=64, kernel_size=7, strides=2)

            # (?, 224, 224, 64) >> (?, 112, 112, 64)
            outputs = MaxPool2D(pool_size=2, strides=2)(outputs)

            # Convolutional layer2
            # (?, 112, 112, 64) >> (?, 112, 112, 192)
            outputs = self._conv(outputs, filters=192, kernel_size=3)

            # (?, 112, 112, 192) >> (?, 56, 56, 192)
            outputs = MaxPool2D(pool_size=2, strides=2)(outputs)

            # Convolutional layer3
            # (?, 56, 56, 192) >> (?, 56, 56, 128)
            outputs = self._conv(outputs, filters=128, kernel_size=1)

            # (?, 56, 56, 128) >> (?, 56, 56, 256)
            outputs = self._conv(outputs, filters=256, kernel_size=3)

            # (?, 56, 56, 256) >> (?, 56, 56, 256)
            outputs = self._conv(outputs, filters=256, kernel_size=1)

            # (?, 56, 56, 256) >> (?, 56, 56, 512)
            outputs = self._conv(outputs, filters=512, kernel_size=3)

            # (?, 56, 56, 512) >> (?, 28, 28, 512)
            outputs = MaxPool2D(pool_size=2, strides=2)(outputs)

            # Convolutional layer4
            # (?, 28, 28, 512) >> (?, 28, 28, 256) >> (?, 28, 28, 512)
            for _ in range(4):
                outputs = self._conv(outputs, filters=256, kernel_size=1)
                outputs = self._conv(outputs, filters=512, kernel_size=3)

            # (?, 28, 28, 512) >> (?, 28, 28, 512)
            outputs = self._conv(outputs, filters=512, kernel_size=1)

            # (?, 28, 28, 512) >> (?, 28, 28, 1024)
            outputs = self._conv(outputs, filters=1024, kernel_size=3)

            # (?, 28, 28, 1024) >> (?, 14, 14, 1024)
            outputs = MaxPool2D(pool_size=2, strides=2)(outputs)

            # Convolutional layer5
            # (?, 14, 14, 1024) >> (?, 14, 14, 512) >> (?, 14, 14, 1024)
            for _ in range(2):
                outputs = self._conv(outputs, filters=512, kernel_size=1)
                outputs = self._conv(outputs, filters=1024, kernel_size=3)

            # (?, 14, 14, 1024) >> (?, 14, 14, 1024)
            outputs = self._conv(outputs, filters=1024, kernel_size=3)

            # (?, 14, 14, 1024) >> (?, 7, 7, 1024)
            outputs = self._conv(outputs, filters=1024, kernel_size=3, strides=2)

            # Convolutional layer6
            # (?, 7, 7, 1024) >> (?, 7, 7, 1024)
            outputs = self._conv(outputs, filters=1024, kernel_size=3)

            # (?, 7, 7, 1024) >> (?, 7, 7, 1024)
            outputs = self._conv(outputs, filters=1024, kernel_size=3)

            # Locally connected layer1
            # No padding is applied here, so the 3x3 window shrinks the grid:
            # (?, 7, 7, 1024) >> (?, 5, 5, 256)
            outputs = LocallyConnected2D(filters=256,
                                         kernel_size=3,
                                         activation=LeakyReLU(alpha=self.alpha))(outputs)

            # Flatten
            # (?, 5, 5, 256) >> (?, 6400)
            outputs = Flatten()(outputs)

            # Dropout
            outputs = Dropout(0.5, seed=42)(outputs)

            # Fully connected layer1
            # (?, 6400) >> (?, 1715)
            # NOTE(review): 1715 == 7 * 7 * 35, which looks tied to the default
            # grid size; confirm before changing `sides` or `classes`.
            outputs = Dense(1715, activation=LeakyReLU(alpha=self.alpha))(outputs)

            # Detection
            # (?, 1715) >> (?, 5 + classes)   (25 with the default 20 classes)
            # First 5 values are [confidence, x, y, width, height]
            # Remaining values are class probabilities
            outputs = Dense(5 + self.classes)(outputs)

        # Expose the prediction tensor; train() assigns the return value to Y_hat
        self.Y_hat = outputs
        return outputs

    @staticmethod
    def _loss_func(Y_hat, Y):
        """
        Placeholder for the detection loss: not implemented, always returns None.

        Declared static because it takes no instance argument, so it can be
        called both on the class and on an instance.
        """
        return None

    def train(self, X, Y):
        """
        X: Numpy array of images of 4 dimensions (batch, height, width, channels) or (batch, channels, height, width)
        Y: Training targets (not used yet)

        Resets the default graph, rebuilds the model and runs the variable
        initializer in a fresh session. No optimisation step is performed yet.
        Raises AssertionError when X is not a 4-dimensional numpy array.
        """
        assert isinstance(X, np.ndarray), "Expect inputs's type to be numpy array: {}".format(type(X))
        assert len(X.shape) == 4, "Expect inputs's shape to be 4 dimensions (batch, height, width, channels) or (batch, channels, height, width): {}".format(X.shape)

        # If data_format is "channels first", it shall be transform to "channels last" first
        if self.data_format == "channels_first":
            X = np.moveaxis(X, 1, -1)

        # Reset graph (discards anything previously built in the default graph)
        tf.reset_default_graph()

        # Build model
        self.Y_hat = self._build()

        # Train model
        init = tf.global_variables_initializer()
        with tf.Session() as sess:
            sess.run(init)
        
        

In [13]:
# Reset graph
# Start from an empty default graph so the model built below is the only thing in it.
tf.reset_default_graph()

# Instantiate the network with its default hyper-parameters (448x448x3 input, 20 classes).
model = NetWork()
# Build the layers directly, without going through train(), to inspect the resulting graph.
model._build()

In [14]:
# List every variable created by the model build above.
# GLOBAL_VARIABLES is the current name of the deprecated GraphKeys.VARIABLES alias.
tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)

[<tf.Variable 'YOLOv1/conv2d/kernel:0' shape=(7, 7, 3, 64) dtype=float32>,
 <tf.Variable 'YOLOv1/conv2d/bias:0' shape=(64,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1/gamma:0' shape=(64,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1/beta:0' shape=(64,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1/moving_mean:0' shape=(64,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1/moving_variance:0' shape=(64,) dtype=float32>,
 <tf.Variable 'YOLOv1/conv2d_1/kernel:0' shape=(3, 3, 64, 192) dtype=float32>,
 <tf.Variable 'YOLOv1/conv2d_1/bias:0' shape=(192,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1_1/gamma:0' shape=(192,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1_1/beta:0' shape=(192,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1_1/moving_mean:0' shape=(192,) dtype=float32>,
 <tf.Variable 'YOLOv1/batch_normalization_v1_1/moving_variance:0' shape=(192,) dtype=float32>,
 <tf.Variable '

In [None]:
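# LocallyConnected2D, 3x3 kernel without padding: (?, 7, 7, 1024) >> (?, 5, 5, 256)
# Kernel shape is (25, 9216, 256): 25 = 5x5 output positions, 9216 = 3*3*1024 inputs per position, 256 filters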