In [1]:
import keras.layers as KL
from keras.models import Model
import keras.backend as K
import tensorflow as tf
from dataset import dataset
import keras
import numpy as np

Using TensorFlow backend.


In [2]:
def building_block(filters, block):
    if block !=0:
        stride = 1
    else:
        stride = 2
    def f(x):
        y = KL.Conv2D(filters,(1,1),strides=stride,train)(x)
        y = KL.BatchNormalization(axis=3)(y)
        y = KL.Activation('relu')(y)
        
        y = KL.Conv2D(filters,(3,3),padding='same')(y)
        y = KL.BatchNormalization(axis=3)(y)
        y = KL.Activation('relu')(y)        
    
        y = KL.Conv2D(4*filters,(1,1))(y)
        y = KL.BatchNormalization(axis=3)(y)
        
        if block == 0:
            shortcut = KL.Conv2D(4*filters,(1,1),strides=stride)(x)
            shortcut = KL.BatchNormalization(axis=3)(shortcut)
        else:
            shortcut = x
        y = KL.Add()([y,shortcut])
        y = KL.Activation('relu')(y)
        return y
    return f

SyntaxError: positional argument follows keyword argument (<ipython-input-2-5bb54f05dd81>, line 7)

In [None]:
def resnet_featureExtractor(inputs):
    x = KL.Conv2D(64,(3,3),padding='same'，)(inputs)
    x = KL.BatchNormalization(axis=3)(x)
    x = KL.Activation('relu')(x)
    
    filters = 32
    blocks = [3,6,4]
    for i,blocknum in enumerate(blocks):
        for block_id in range(blocknum):
            x = building_block(filters, block_id)(x)
        filters *= 2
    return x

In [None]:
def rpn_net(inputs, k):
    shared_map = KL.Conv2D(256,(3,3),padding='same')(inputs)
    shared_map = KL.Activation('linear')(shared_map)
    rpn_class = KL.Conv2D(2*k,(1,1))(shared_map)
    rpn_class = KL.Lambda(lambda x:tf.reshape(x,[tf.shape(x)[0],-1,2]))(rpn_class)
    rpn_class = KL.Activation('linear')(rpn_class)
    rpn_prob = KL.Activation('softmax')(rpn_class)
    
    y = KL.Conv2D(4*k,(1,1))(shared_map)
    y = KL.Activation('linear')(y)
    
    rpn_bbox = KL.Lambda(lambda x:tf.reshape(x,[tf.shape(x)[0],-1,4]))(y)
    return rpn_class, rpn_prob, rpn_bbox

In [None]:
def rpn_class_loss(rpn_match, rpn_class_logits):
    ## rpn_match (None, 576, 1)
    ## rpn_class_logits (None, 576, 2)
    rpn_match = tf.squeeze(rpn_match, -1)
    indices = tf.where(K.not_equal(rpn_match,0))
    
    anchor_class = K.equal(rpn_match, 1)
    anchor_class = K.cast(anchor_class, tf.int32)   
    
    rpn_class_logits = tf.gather_nd(rpn_class_logits,indices)  ###prediction
    anchor_class = tf.gather_nd(anchor_class, indices)###target
    loss = K.sparse_categorical_crossentropy(target=anchor_class, output=rpn_class_logits, from_logits=True)
    loss = K.switch(tf.size(loss) > 0, K.mean(loss), tf.constant(0.0))
    return loss

def rpn_bbox_loss(target_bbox, rpn_match, rpn_bbox):
    rpn_match = tf.squeeze(rpn_match, -1)
    indices = tf.where(K.equal(rpn_match, 1))
    rpn_bbox = tf.gather_nd(rpn_bbox, indices)
    batch_counts = K.equal(rpn_match, 1)
    batch_counts = K.cast(batch_counts, tf.int32)
    batch_counts = K.sum(batch_counts, axis=1)
    target_bbox = tf.gather_nd(target_bbox, indices)
    diff = K.abs(target_bbox - rpn_bbox)
    less_than_one = K.cast(K.less(diff, 1.0), 'float32')
    loss = (less_than_one * 0.5 * diff**2) + (1 - less_than_one) * 0.5
    loss = K.switch(tf.size(loss) > 0,K.mean(loss), tf.constant(0.0))
    return loss
    
def classifier_deltes_loss(target_deltes,Fdeltes):
    t = tf.reshape(target_deltes[0],[1,-1,4])
    F = tf.reshape(Fdeltes[0],[1,-1,4])
    diff = t-F
    lessTone = K.cast(K.less(diff, 1.0), "float32")
    loss = (lessTone *0.5* diff**2) + (1-lessTone)*0.5
    loss = K.mean(loss)
    return loss
    
def classifier_class_loss(ClassID,Fclass):
    loss = K.mean(K.sparse_categorical_crossentropy(target = ClassID, output=Fclass))
    return loss

In [None]:
from layers import target_detector, proposal, classifier

In [None]:
'''
def classifier(feature_map,cropsize):
    roifeature = roipooling((cropsize*2,cropsize*2), tf.shape(feature_map)[1:3])([feature_map,proposal])
    
    roifeature = tf.map_fn(lambda x:KL.pooling.MaxPool2D()(x),roifeature)
    
    roifeature = KL.Conv2D(64,(3,3),padding='valid',strides=2)(roifeature)
    roifeature = KL.Conv2D(64,(3,3),padding='valid',strides=2)(roifeature)
    roifeature = KL.BatchNormalization(axis = 3)(roifeature)
 
    return roifeature
     '''  
    
    

In [None]:
input_image = KL.Input(shape=[None,None,3], dtype=tf.float32)
input_gtCWH = KL.Input(shape=[None,5], dtype=tf.float32)
input_anchorsCWH = KL.Input(shape=[None,4], dtype=tf.float32)
input_rpn_match = KL.Input(shape=[None,1], dtype=tf.int32)
input_rpn_bbox = KL.Input(shape=[None,4], dtype=tf.float32)

feature_map = resnet_featureExtractor(input_image)
rpn_class, rpn_prob, rpn_bbox = rpn_net(feature_map, 9)

proposal = proposal(0.8, 100, tf.shape(input_image)[1:3])([rpn_class,rpn_bbox,input_anchorsCWH])#input[rpn2K,rpn4K,anchorsCWH]
classID, target_deltes = target_detector(tf.shape(input_image)[1:3])([proposal,input_gtCWH])#input[proposal,imgsize,gt_bboxsCWH]
Fclass, Fdeltes = classifier(7,tf.shape(feature_map)[1:3],512,5)([feature_map,proposal])

loss_rpn_match = KL.Lambda(lambda x:rpn_class_loss(*x), name='loss_rpn_match')([input_rpn_match, rpn_class])
loss_rpn_bbox = KL.Lambda(lambda x:rpn_bbox_loss(*x), name='loss_rpn_bbox')([input_rpn_bbox, input_rpn_match, rpn_bbox])
loss_classifier_deltes = KL.Lambda(lambda x:classifier_deltes_loss(*x), name='loss_classifier_deltes')([target_deltes,Fdeltes])
loss_classifier_class = KL.Lambda(lambda x:classifier_class_loss(*x), name='loss_classifier_class')([classID,Fclass])

model = Model([input_image,input_gtCWH,input_anchorsCWH,input_rpn_match,input_rpn_bbox],
              [loss_rpn_match,loss_rpn_bbox,loss_classifier_deltes,loss_classifier_class])

rlc = model.get_layer('loss_rpn_match').output
rld = model.get_layer('loss_rpn_bbox').output
cfc = model.get_layer('loss_classifier_class').output
cfd = model.get_layer('loss_classifier_deltes').output

model.add_loss(rlc)
model.add_loss(rld)
model.add_loss(cfc)
model.add_loss(cfd)
model.compile(loss=[None]*len(model.output), optimizer=keras.optimizers.SGD(lr=0.0005, momentum = 0.9))

In [None]:
Dir = '../data/VOC-kd'
data = dataset(Dir)
getn = data.gen_batch()
'''                yield [ img,
                        bboxCWH,
                        anchorsCWH,
                        input_rpn_match,
                        rpn_bbox,
                        TL[0]
                        ],[]
                        '''
a = next(getn)[0]

In [None]:
train = model.fit_generator(getn,steps_per_epoch=len(data.TrainList), epochs=2,
                       )