In [1]:
# import required classes
import numpy as np
import os
import csv
from skimage.io import imread, imsave
from PIL import Image, ImageDraw, ImageFont
from keras.models import load_model

import tensorflow as tf
from keras.backend.tensorflow_backend import set_session
from keras.layers import GlobalAveragePooling2D, Dense
from keras.models import Model 
from keras.applications.mobilenetv2 import MobileNetV2
import sys

# configure memory allocate for tensorflow backend
config = tf.ConfigProto()
#config.gpu_options.per_process_gpu_memory_fraction = 0.6
config.gpu_options.allow_growth = True  #dynamically grow the memory used on the GPU
set_session(tf.Session(config=config))

nb_classes = 3

def build_MobileNetV2_model():
    net = MobileNetV2(input_shape=(512, 512, 3), include_top=None, weights=None, classes=nb_classes)
    x = net.output

    outputs = []
    multi_output = 4

    for i in range(multi_output):
        pool = GlobalAveragePooling2D()(x)
        dense = Dense(nb_classes, activation='softmax', use_bias=True)(pool)
        outputs.append(dense)

    model = Model(net.inputs, outputs=outputs, name='mobilenetv2')
    return model

# MobileNetV2
voa_model = load_model('C:\\Users\\hoang\\WorkingSpace\\TrainingModels\\keras\\results\\model_mobilenetv2_view-offset-altitude_08.hdf5')
cam_pitch_model = load_model('C:\\Users\\hoang\\WorkingSpace\\TrainingModels\\keras\\results\\model_mobilenetv2_cam-pitch_transfer-learning_09.hdf5')
model = build_MobileNetV2_model()

w1 = voa_model.get_weights()
w2 = cam_pitch_model.get_weights()
w = w1 + w2[-2:]

model.set_weights(w) 

Using TensorFlow backend.


In [2]:
def write_probs_on_image(vv_prob, height_prob, offset_prob, view_prob, image_array, img_path, ground_truth):
    # initialise the drawing context with
    # the image object as background
    img = Image.fromarray(image_array)
    draw = ImageDraw.Draw(img)
    # create font object with the font file and specify
    # desired size
    font = ImageFont.truetype('..\\Roboto-Medium.ttf', size=15)
    
    # OFFSET
    (x, y) = (235, 450)
    color = 'rgb(0, 255, 255)' 
    draw.text((x, y), str("Offset: ({})".format(ground_truth[0])), fill=color, font=font)
    
    (x, y) = (310, 450)
    color = 'rgb(255, 255, 0)' 
    draw.text((x, y), 'L', fill=color, font=font)
    draw.text((x+15, y), str("{0:.3f}".format(offset_prob[0])), fill=color, font=font)

    (x, y) = (375, 450)
    color = 'rgb(0, 255, 0)' #  color
    draw.text((x, y), 'C', fill=color, font=font)
    draw.text((x+15, y), str("{0:.3f}".format(offset_prob[1])), fill=color, font=font)

    (x, y) = (440, 450)
    color = 'rgb(255, 0, 255)' #  color
    draw.text((x, y), 'R', fill=color, font=font)
    draw.text((x+15, y), str("{0:.3f}".format(offset_prob[2])), fill=color, font=font)
    
    # VIEW
    (x, y) = (235, 420)
    color = 'rgb(0, 255, 255)' 
    draw.text((x, y), "View: ({})".format(ground_truth[1]), fill=color, font=font)
    
    (x, y) = (310, 420)
    color = 'rgb(255, 255, 0)' 
    draw.text((x, y), 'L', fill=color, font=font)
    draw.text((x+15, y), str("{0:.3f}".format(view_prob[0])), fill=color, font=font)

    (x, y) = (375, 420)
    color = 'rgb(0, 255, 0)' #  color
    draw.text((x, y), 'S', fill=color, font=font)
    draw.text((x+15, y), str("{0:.3f}".format(view_prob[1])), fill=color, font=font)

    (x, y) = (440, 420)
    color = 'rgb(255, 0, 255)' #  color
    draw.text((x, y), 'R', fill=color, font=font)
    draw.text((x+15, y), str("{0:.3f}".format(view_prob[2])), fill=color, font=font)
    
    # HEIGHT
    # starting position of the message
    (x, y) = (300, 300)
    color = 'rgb(0, 255, 255)' 
    draw.text((x, y), str("Altitude: ({})".format(ground_truth[2])), fill=color, font=font)
    
    (x, y) = (300, 320)
    color = 'rgb(255, 255, 0)' 
    draw.text((x, y), 'H', fill=color, font=font)
    draw.text((x+20, y), str("{0:.3f}".format(height_prob[2])), fill=color, font=font)

    (x, y) = (300, 340)
    color = 'rgb(0, 255, 0)' #  color
    draw.text((x, y), 'M', fill=color, font=font)
    draw.text((x+20, y), str("{0:.3f}".format(height_prob[1])), fill=color, font=font)

    (x, y) = (300, 360)
    color = 'rgb(255, 0, 255)' #  color
    draw.text((x, y), 'L', fill=color, font=font)
    draw.text((x+20, y), str("{0:.3f}".format(height_prob[0])), fill=color, font=font)
    
    # VERTICAL_VIEW
    # starting position of the message
    (x, y) = (400, 300)
    color = 'rgb(0, 255, 255)' 
    draw.text((x, y), 'Vertical-View: ', fill=color, font=font)
    
    (x, y) = (400, 320)
    color = 'rgb(255, 255, 0)' 
    draw.text((x, y), 'U', fill=color, font=font)
    draw.text((x+20, y), str("{0:.3f}".format(vv_prob[2])), fill=color, font=font)

    (x, y) = (400, 340)
    color = 'rgb(0, 255, 0)' #  color
    draw.text((x, y), 'M', fill=color, font=font)
    draw.text((x+20, y), str("{0:.3f}".format(vv_prob[1])), fill=color, font=font)

    (x, y) = (400, 360)
    color = 'rgb(255, 0, 255)' #  color
    draw.text((x, y), 'D', fill=color, font=font)
    draw.text((x+20, y), str("{0:.3f}".format(vv_prob[0])), fill=color, font=font)
    
    
    # save the edited image
    #print(img_path)
    img.save(img_path)

import re
def getGroundTruth(f, pitch=False):
    if pitch==False:
        return getGroundTruth_1(f)
    else:
        return getGroundTruth_2(f)
def getGroundTruth_1 (f):
    p = re.compile('.*(left|center|right)Offset_(left|straight|right)View_(low|medium|high)Altitude.*')
    finds = p.findall(f)
    #TODO assert only finds has only 1 element [(_,_)]
    (offsetStr, viewStr, altitudeStr) =  finds[0]
    ground_truth = [offsetStr[0].upper(), viewStr[0].upper(), altitudeStr[0].upper(), 'M']
    return ground_truth

def getGroundTruth_2(f):
    p = re.compile('.*(down|medium|up).*')
    finds = p.findall(f)
    #TODO assert only finds has only 1 element [(_,_)]
    (pitchStr) =  finds[0]
    ground_truth = ['C', 'S', '', pitchStr[0].upper()]
    return ground_truth

enum_offset = ('L', 'C', 'R')
enum_view = ('L', 'S', 'R')
enum_height = ('L', 'M', 'H')
enum_pitch = ('D', 'M', 'U')

def evaluate (ground_truth, offset_prob, view_prob, height_prob, pitch_prob, pitch, result_writer, f):
    res = [False, False, False, False]
    
    offset_pred = np.argmax(offset_prob)
    offset_truth = enum_offset.index (ground_truth[0])
    if (offset_pred == offset_truth):
        res[0] = True
                                    
    view_pred = np.argmax(view_prob)  
    view_truth = enum_view.index (ground_truth[1])
    if ( view_pred == view_truth):
        res[1] = True
    
    
    height_pred = np.argmax(height_prob)
    if (pitch): #ignore in case pitch
        height_truth = height_pred
    else:
        height_truth = enum_height.index (ground_truth[2])    
    if (height_pred == height_truth):
        res[2] = True
        
    pitch_pred = np.argmax(pitch_prob)
    pitch_truth = enum_pitch.index (ground_truth[3])    
    if ( pitch_pred == pitch_truth):
        res[3] = True
    
    row = [f, offset_pred, offset_truth, view_pred, view_truth, height_pred, height_truth, pitch_pred, pitch_truth, str(res)]
    result_writer.writerow(row)
    return res

In [3]:
import glob


def run_prediction(input_path, output_path, pitch): 
    output_path_fail = output_path + 'fail\\'
    output_path_pass = output_path + 'pass\\'

    if not os.path.exists(output_path):
        os.makedirs(output_path)
    if not os.path.exists(output_path_fail):
        os.makedirs(output_path_fail)
    if not os.path.exists(output_path_pass):
        os.makedirs(output_path_pass)

    files = [f for f in glob.glob(input_path + "**/*.png", recursive=True)]

    result_file_path = os.path.join (output_path, 'result.csv')
    fails = 0
    with open(result_file_path, mode='w') as result_file:
        fieldnames = ['path', 'offset_pred', 'offset_truth', 'view_pred', 'view_truth', 'height_pred', 'height_truth', 'pitch_pred', 'pitch_truth', 'result']
        result_writer = csv.writer(result_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        result_writer.writerow(fieldnames)

        print ("total:", len(files))
        for f in files:
    #         print(f)

            img = imread(f)

            img_rgb = img[:,:,:3]
            img_norm = img_rgb / 255.0
            img_final = np.reshape(img_norm,[1,512,512,3])
            probs = model.predict(img_final)

            offset_prob = probs[0][0]
            view_prob = probs[1][0]
            height_prob = probs[2][0]
            pitch_prob = probs[3][0]

            ground_truth = getGroundTruth (f, pitch)
            res = evaluate(ground_truth, offset_prob, view_prob, height_prob, pitch_prob, pitch, result_writer, f)
            
            
            name = os.path.basename(f)
            if (res[0] and res[1] and res[2] and res [3]):
                img_path = os.path.join(output_path_pass, 'prob_'+name)
            else:
                img_path = os.path.join(output_path_fail, 'prob_'+name)
                fails +=1

    #         print(res)
            write_probs_on_image(pitch_prob, height_prob, offset_prob, view_prob, img_rgb, img_path, ground_truth)

    print ('fails: ', fails)



In [5]:
input_path = 'E:\\UAV_drone\\data\\training\\constant_distance_to_lines\\val\\'
output_path = 'E:\\UAV_drone\\data\\predictions\\MobileNetV2\\Offset_View_Height\\'
run_prediction (input_path, output_path, False)


total: 29802
fails:  12374


In [6]:
input_path = 'E:\\UAV_drone\\data\\training\\vertical_view\\val\\'
output_path = 'E:\\UAV_drone\\data\\predictions\\MobileNetV2\\Pitch\\'
run_prediction (input_path, output_path, True)

total: 12117
fails:  3529
