# Traffic Light Detection and Classification
Using a pre-trained model to detect objects in an image.

In [4]:
import numpy as np
import os
import sys
import tensorflow as tf
import time

from collections import defaultdict
from io import StringIO
from matplotlib import pyplot as plt
import cv2
from PIL import Image
%matplotlib inline

# imports from the object detection module
sys.path.append('/nfs/private/models/research/object_detection')
from utils import label_map_util
from utils import visualization_utils as vis_util

## Model preparation

In [27]:
# Dataset locations: the folder of input frames that gets globbed for *.jpg,
# and the sibling folder where annotated copies of those frames are saved.
dataset_dir = 'lara'
PATH_TO_IMAGES_DIR = 'lara/Lara3D_UrbanSeq1_JPG'
EVAL_IMAGES_DIR = '{}_INFERRED'.format(PATH_TO_IMAGES_DIR)

# Frozen inference graph to load and the label map file that goes with it.
model_name = 'ssd_mobilenet'
model_path = 'models/frozen_{}/frozen_inference_graph.pb'.format(model_name)
PATH_TO_LABELS = 'label_map.pbtxt'

# Upper bound on the number of classes used when converting the label map.
NUM_CLASSES = 3

## Loading label map

Label maps map indices to category names, so that when our convolutional network predicts 2, we know that this corresponds to Red. Here we use internal utility functions, but anything that returns a dictionary mapping integers to appropriate string labels would be fine.

In [6]:
# Parse the label map file, convert it to a list of category entries, and
# build the lookup (printed below) that is keyed by class id.
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(
    label_map,
    max_num_classes=NUM_CLASSES,
    use_display_name=True)
category_index = label_map_util.create_category_index(categories)
print(category_index)

{1: {'id': 1, 'name': u'Green'}, 2: {'id': 2, 'name': u'Red'}, 3: {'id': 3, 'name': u'Yellow'}}


In [7]:
def load_image_into_numpy_array(image):
  """Return the pixels of `image` as a uint8 array of shape (height, width, 3).

  `image` must expose `size` as a (width, height) pair and `getdata()`
  returning one 3-value pixel per position; the reshape fails if the data
  does not contain exactly height * width * 3 values.
  """
  width, height = image.size
  pixels = np.array(image.getdata())
  pixels = pixels.reshape((height, width, 3))
  return pixels.astype(np.uint8)

from glob import glob

## Detection

## Testing Models

In [10]:
detection_graph = tf.Graph()

# Read the serialized frozen graph from disk.
with tf.gfile.GFile(model_path, 'rb') as fid:
  serialized_graph = fid.read()

# Parse the bytes into a GraphDef message.
od_graph_def = tf.GraphDef()
od_graph_def.ParseFromString(serialized_graph)

# Import the definition into the dedicated graph. name='' is passed so that
# tensors can later be fetched by plain names such as 'image_tensor:0'.
with detection_graph.as_default():
  tf.import_graph_def(od_graph_def, name='')

In [29]:
# Collect every .jpg frame of the dataset folder in sorted filename order.
print(os.path.join(PATH_TO_IMAGES_DIR, '*.jpg'))
TEST_IMAGE_PATHS = sorted(glob(os.path.join(PATH_TO_IMAGES_DIR, '*.jpg')))
print("Length of test images:", len(TEST_IMAGE_PATHS))

# test_idxs = [890, 961, 3738]
# TEST_IMAGE_PATHS = ['%s/frame_%06d.jpg'% (PATH_TO_IMAGES_DIR, idx) for idx in test_idxs]

lara/Lara3D_UrbanSeq1_JPG/*.jpg
('Length of test images:', 11179)


In [30]:
# When True, each annotated frame is shown with matplotlib and its
# detections are printed.
is_plot_result = False
# When True, each annotated frame is saved under EVAL_IMAGES_DIR.
is_make_video = True
# Score threshold handed to the visualization helper and used to filter
# which detections are printed.
min_score_thresh = 0.30

def draw_a_detection_result(boxes, scores, classes, num, image_np, t_elapsed):
    """Annotate `image_np` with the detections and optionally report them.

    The raw outputs are squeezed to drop singleton dimensions and passed,
    together with `image_np` and the module-level `category_index`, to
    `vis_util.visualize_boxes_and_labels_on_image_array`.

    When the module-level `is_plot_result` flag is set, the image is also
    displayed and every detection scoring above `min_score_thresh` is printed
    with its class name, its score and `t_elapsed` converted to milliseconds.

    `num` is accepted for call-site symmetry but is not used.
    """
    squeezed_boxes = np.squeeze(boxes)
    squeezed_scores = np.squeeze(scores)
    class_ids = np.squeeze(classes).astype(np.int32)

    # Visualization of the results of a detection.
    vis_util.visualize_boxes_and_labels_on_image_array(
        image_np,
        squeezed_boxes,
        class_ids,
        squeezed_scores,
        category_index,
        min_score_thresh=min_score_thresh,
        use_normalized_coordinates=True,
        line_thickness=3)

    if not is_plot_result:
        return

    plt.figure(figsize=(12, 8))
    plt.imshow(image_np)
    plt.show()

    for idx in range(squeezed_boxes.shape[0]):
        keep = (squeezed_scores is None
                or squeezed_scores[idx] > min_score_thresh)
        if not keep:
            continue
        class_name = category_index[class_ids[idx]]['name']
        print('{}'.format(class_name), squeezed_scores[idx])
        print("Time in milliseconds", t_elapsed * 1000, "\n")

                
def make_video(images, outvid='output.avi', fps=5, size=None,
               is_color=True, format="XVID"):
    """
    Create a video from a list of images.

    @param      images      iterable of image file paths, written in order
    @param      outvid      output video name
    @param      fps         frame per second
    @param      size        (width, height) of each frame; when None the
                            size of the first image is used
    @param      is_color    color
    @param      format      see http://www.fourcc.org/codecs.php
    @return                 the (already released) VideoWriter, or None when
                            ``images`` is empty; see http://opencv-python-tutroals.readthedocs.org/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html
    @raise      IOError     when a path does not exist (this is a
                            FileNotFoundError on Python 3) or when a file
                            cannot be decoded as an image

    The function relies on http://opencv-python-tutroals.readthedocs.org/en/latest/.
    By default, the video will have the size of the first image.
    It will resize every image to this size before adding them to the video.

    from: http://www.xavierdupre.fr/blog/2016-03-30_nojs.html
    """
    import errno
    from cv2 import VideoWriter, VideoWriter_fourcc, imread, resize

    fourcc = VideoWriter_fourcc(*format)
    vid = None
    try:
        for image in images:
            if not os.path.exists(image):
                # Built from errno so that Python 3 raises FileNotFoundError
                # while Python 2, which lacks that name, raises a plain
                # IOError instead of a NameError.
                raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), image)
            img = imread(image)
            if img is None:
                # imread signals an unreadable/undecodable file with None.
                raise IOError("could not read image: %s" % image)
            if vid is None:
                if size is None:
                    size = img.shape[1], img.shape[0]
                vid = VideoWriter(outvid, fourcc, float(fps), size, is_color)
            # Resize when EITHER dimension differs from the target size;
            # img.shape is (height, width, ...) while size is (width, height).
            if size[0] != img.shape[1] or size[1] != img.shape[0]:
                img = resize(img, size)
            vid.write(img)
    finally:
        # Release the writer even when a frame fails part-way through; no
        # writer exists at all when `images` is empty.
        if vid is not None:
            vid.release()
    return vid

In [31]:
with detection_graph.as_default():
    with tf.Session(graph=detection_graph) as sess:
        # Input and output tensors of detection_graph, looked up by name.
        image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')

        # Each box represents a part of the image where a particular object was detected.
        detection_boxes = detection_graph.get_tensor_by_name('detection_boxes:0')

        # Each score represents the level of confidence for one of the objects.
        # Score is shown on the result image, together with the class label.
        detection_scores = detection_graph.get_tensor_by_name('detection_scores:0')
        detection_classes = detection_graph.get_tensor_by_name('detection_classes:0')
        num_detections = detection_graph.get_tensor_by_name('num_detections:0')

        def eval_an_image(image_path):
            """Run the detector on one image file.

            Returns a tuple ``(outputs, image_np, seconds)``: ``outputs`` is
            the list sess.run returns for [boxes, scores, classes,
            num_detections], ``image_np`` is the image as a numpy array, and
            ``seconds`` is the wall-clock time spent in sess.run only (image
            loading is not included).
            """
            image = Image.open(image_path)
            # the array based representation of the image will be used later in order to prepare the
            # result image with boxes and labels on it.
            image_np = load_image_into_numpy_array(image)
            # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
            image_np_expanded = np.expand_dims(image_np, axis=0)

            time0 = time.time()

            # Actual detection.
            ret = sess.run(
              [detection_boxes, detection_scores, detection_classes, num_detections],
              feed_dict={image_tensor: image_np_expanded})

            time1 = time.time()
            return ret, image_np, time1 - time0

        # The output folder is the same for every frame, so create it once
        # up front instead of re-checking it on each loop iteration.
        if is_make_video and not os.path.exists(EVAL_IMAGES_DIR):
            os.makedirs(EVAL_IMAGES_DIR)

        tot_time_elapsed = 0.
        for image_path in TEST_IMAGE_PATHS:
            (boxes, scores, classes, num), image_np, time_elapsed = eval_an_image(image_path)
            tot_time_elapsed += time_elapsed

            draw_a_detection_result(boxes, scores, classes, num, image_np, time_elapsed)
            if is_make_video:
                # image_np is saved after draw_a_detection_result has
                # processed it, under the same file name as the input frame.
                Image.fromarray(image_np).save(
                    os.path.join(EVAL_IMAGES_DIR, os.path.basename(image_path)))

In [32]:
# print() with one pre-formatted string: valid syntax on Python 3 and, unlike
# a multi-argument print() call, not rendered as a tuple on Python 2.
print('tot_time_elapsed: {}'.format(tot_time_elapsed))

tot_time_elapsed: 100.490507126


In [33]:
# Assemble the saved annotated frames, in sorted filename order, into a video.
eval_images = glob(os.path.join(EVAL_IMAGES_DIR, '*.jpg'))
eval_images.sort()
print("Length of inferred images:", len(eval_images))
make_video(eval_images, outvid='{}/inferred.avi'.format(dataset_dir), fps=25)

('Length of inferred images:', 11179)


<VideoWriter 0x7f71ab70a790>