# Extracting cropped images

In [None]:
import numpy as np
import tensorflow as tf
from scipy import misc
from random import shuffle, random
import cv2
import os
import six.moves.urllib as urllib
import tarfile
from PIL import Image
from tqdm import tqdm
from time import gmtime, strftime
import yaml
from PIL import Image

In [None]:
# Initializing Tensorflow detection API
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

In [None]:
class DetectionObj(object):
    """
    DetectionObj is a class suitable to leverage Google Tensorflow
    detection API for image annotation from different sources:
    files, images acquired by one's own webcam, videos.

    On instantiation it loads a frozen detection graph (downloading it
    first when loading from disk fails), reads the MS COCO label map and
    opens a Tensorflow session bound to that graph.
    """

    def __init__(self, model='ssd_mobilenet_v1_coco_11_06_2017'):
        """
        The instructions to be run when the class is instantiated

        :param model: name of one of the pre-trained models listed in
            self.MODELS; any other value falls back to self.MODELS[0]
        """

        # Path where the Python script is being run
        self.CURRENT_PATH = os.getcwd()

        # Path where to save the annotations (it can be modified)
        self.TARGET_PATH = self.CURRENT_PATH

        # Selection of pre-trained detection models
        # from the Tensorflow Model Zoo
        self.MODELS = ["ssd_mobilenet_v1_coco_11_06_2017",
                       "ssd_inception_v2_coco_11_06_2017",
                       "rfcn_resnet101_coco_11_06_2017",
                       "faster_rcnn_resnet101_coco_11_06_2017",
                       "faster_rcnn_inception_resnet_v2_atrous_coco_11_06_2017"
                       ]

        # Setting a threshold for detecting an object by the models
        self.THRESHOLD = 0.25 # Most used threshold in practice

        # Checking if the desired pre-trained detection model is available
        if model in self.MODELS:
            self.MODEL_NAME = model
        else:
            # Otherwise revert to a default model
            print("Model not available, reverted to default", self.MODELS[0])
            self.MODEL_NAME = self.MODELS[0]

        # The file name of the Tensorflow frozen model
        self.CKPT_FILE = os.path.join(self.CURRENT_PATH, 'object_detection',
                                      self.MODEL_NAME, 'frozen_inference_graph.pb')

        # Attempting loading the detection model, if not available on disk,
        # it will be downloaded from Internet (an Internet connection is required).
        # `except Exception` instead of a bare `except` so that Ctrl-C or
        # SystemExit during loading does not trigger a download.
        try:
            self.DETECTION_GRAPH = self.load_frozen_model()
        except Exception:
            print ('Couldn\'t find', self.MODEL_NAME)
            self.download_frozen_model()
            self.DETECTION_GRAPH = self.load_frozen_model()

        # Loading the labels of the classes recognized by the detection model
        self.NUM_CLASSES = 90
        path_to_labels = os.path.join(self.CURRENT_PATH,
                                      'object_detection', 'data', 'mscoco_label_map.pbtxt')
        label_mapping = label_map_util.load_labelmap(path_to_labels)
        extracted_categories = label_map_util.convert_label_map_to_categories(label_mapping,
                                                                    max_num_classes=self.NUM_CLASSES,
                                                                    use_display_name=True)
        self.LABELS = {item['id']: item['name'] for item in extracted_categories}
        self.CATEGORY_INDEX = label_map_util.create_category_index(extracted_categories)

        # Starting the tensorflow session
        self.TF_SESSION = tf.Session(graph=self.DETECTION_GRAPH)

    def load_frozen_model(self):
        """
        Loading frozen detection model from the file self.CKPT_FILE
        on disk to memory

        :return: a tf.Graph with the frozen graph definition imported
        """
        detection_graph = tf.Graph()
        with detection_graph.as_default():
            od_graph_def = tf.GraphDef()
            with tf.gfile.GFile(self.CKPT_FILE, 'rb') as fid:
                serialized_graph = fid.read()
                od_graph_def.ParseFromString(serialized_graph)
                tf.import_graph_def(od_graph_def, name='')
        return detection_graph

    def download_frozen_model(self):
        """
        Downloading frozen detection model from Internet
        when not available on disk

        The archive <MODEL_NAME>.tar.gz is saved in the working directory
        and its frozen_inference_graph.pb member is extracted below
        <CURRENT_PATH>/object_detection.
        """
        def my_hook(t):
            """
            Wrapping tqdm instance in order to monitor URLopener
            """
            # One-element list so that the closure can update the value
            last_b = [0]

            def inner(b=1, bsize=1, tsize=None):
                # b: blocks transferred so far, bsize: block size,
                # tsize: total size (when known)
                if tsize is not None:
                    t.total = tsize
                t.update((b - last_b[0]) * bsize)
                last_b[0] = b

            return inner

        # Opening the url where to find the model
        model_filename = self.MODEL_NAME + '.tar.gz'
        download_url = 'http://download.tensorflow.org/models/object_detection/'
        opener = urllib.request.URLopener()

        # Downloading the model with tqdm estimations of completion
        print('Downloading ...')
        with tqdm() as t:
            opener.retrieve(download_url + model_filename,
                            model_filename, reporthook=my_hook(t))

        # Extracting the model from the downloaded tar file; the context
        # manager guarantees the archive is closed even if extraction fails
        print ('Extracting ...')
        target_dir = os.path.join(self.CURRENT_PATH, 'object_detection')
        with tarfile.open(model_filename) as tar_file:
            for member in tar_file.getmembers():
                member_name = os.path.basename(member.name)
                if 'frozen_inference_graph.pb' in member_name:
                    tar_file.extract(member, target_dir)

    def load_image_from_disk(self, image_path):
        """
        Loading an image from disk

        :param image_path: path of the image file
        :return: the object returned by PIL's Image.open
        """
        return Image.open(image_path)

    def load_image_into_numpy_array(self, image):
        """
        Turning an image into a Numpy ndarray

        :param image: an object exposing `size` and `getdata()` (such as a
            PIL image holding 3 channels), or an already converted ndarray
        :return: an uint8 ndarray of shape (height, width, 3); when the
            conversion is not possible the input is returned unchanged
        """
        # Arrays are passed through untouched (same object, no copy)
        if isinstance(image, np.ndarray):
            return image
        try:
            (im_width, im_height) = image.size
            return np.array(image.getdata()).reshape(
                (im_height, im_width, 3)).astype(np.uint8)
        except (AttributeError, TypeError, ValueError):
            # If the previous procedure fails, we expect the
            # image is already in an array-like form
            return image

    def detect(self, images, annotate_on_image=True):
        """
        Processing a list of images, feeding it into the detection
        model and getting from it scores, bounding boxes and predicted
        classes present in the images

        :param images: a single image or a list/tuple of images
        :param annotate_on_image: if True, boxes and labels are drawn on
            the image array (in place) before it is returned
        :return: a list with one tuple per image:
            (image array, boxes, scores, classes, num_detections)
        """
        if not isinstance(images, (list, tuple)):
            images = [images]

        # The graph endpoints are the same for every image: look them up once
        graph = self.DETECTION_GRAPH
        image_tensor = graph.get_tensor_by_name('image_tensor:0')
        fetches = [
            # Each box represents a part of the image where a particular object was detected.
            graph.get_tensor_by_name('detection_boxes:0'),
            # Each score represents the level of confidence for each of the objects.
            # Score could be shown on the result image, together with the class label.
            graph.get_tensor_by_name('detection_scores:0'),
            graph.get_tensor_by_name('detection_classes:0'),
            graph.get_tensor_by_name('num_detections:0'),
        ]

        results = list()
        for image in images:
            # the array based representation of the image will be used later in order to prepare the
            # result image with boxes and labels on it.
            image_np = self.load_image_into_numpy_array(image)
            # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
            image_np_expanded = np.expand_dims(image_np, axis=0)
            # Actual detection happens here
            (boxes, scores, classes, num_detections) = self.TF_SESSION.run(
                fetches, feed_dict={image_tensor: image_np_expanded})
            if annotate_on_image:
                image_np = self.detection_on_image(image_np, boxes, scores, classes)
            results.append((image_np, boxes, scores, classes, num_detections))
        return results

    def detection_on_image(self, image_np, boxes, scores, classes):
        """
        Superimposing detection boxes and class labels on the image

        :param image_np: image array, passed to the drawing utility and
            returned afterwards
        :param boxes: boxes as returned by the session run
        :param scores: scores as returned by the session run
        :param classes: classes as returned by the session run
        :return: the same image_np object that was passed in
        """
        vis_util.visualize_boxes_and_labels_on_image_array(
            image_np,
            np.squeeze(boxes),
            np.squeeze(classes).astype(np.int32),
            np.squeeze(scores),
            self.CATEGORY_INDEX,
            use_normalized_coordinates=True,
            line_thickness=8)
        return image_np

In [None]:
def visualize(data, saving=False):
    """Show an array as an RGB image, optionally saving it beforehand.

    :param data: array handed to Image.fromarray with mode 'RGB'
    :param saving: when True the image is also written to 'my.png'
    """
    picture = Image.fromarray(data, 'RGB')
    if saving:
        # Fixed output name, relative to the working directory
        picture.save('my.png')
    picture.show()

In [None]:
def read_image_from_disk(filename):
    """
    Load the image stored at `filename` through misc.imread.

    :param filename: path of the image file
    :return: whatever misc.imread returns for that file
    """
    return misc.imread(filename)

In [None]:
def get_simulation_training():
    """Collect the simulator's example files together with their labels.

    Every regular file found in <cwd>/simulator_images/<light> is labelled
    with the name of its folder (red, yellow, green or none).

    :return: (filenames, labels) as two Numpy arrays of equal length
    """
    base_dir = os.path.join(os.getcwd(), 'simulator_images')
    filenames = []
    labels = []
    for light in ('red', 'yellow', 'green', 'none'):
        folder = os.path.join(base_dir, light)
        for entry in os.listdir(folder):
            candidate = os.path.join(folder, entry)
            # sub-directories are ignored
            if os.path.isfile(candidate):
                filenames.append(candidate)
                labels.append(light)
    return np.array(filenames), np.array(labels)

In [None]:
def get_rosbag_training():
    """Arranging examples from rosbag's images

    Every regular file found in <cwd>/ros_bag_images/<light> is labelled
    with the name of its folder (red or green).

    :return: (filenames, labels) as two Numpy arrays of equal length
    """
    labels, filenames = (list(), list())
    for light in ['red', 'green']:
        path = os.path.join(os.getcwd(), 'ros_bag_images', light)
        # sub-directories are ignored
        examples = [os.path.join(path, f) for f in os.listdir(path)
                    if os.path.isfile(os.path.join(path, f))]
        labels += [light] * len(examples)
        filenames += examples
    return np.array(filenames), np.array(labels)

In [None]:
def get_all_bosch_labels(input_yaml, riib=False):
    """ Gets all labels within label file
    Note that RGB images are 1280x720 and RIIB images are 1280x736.
    :param input_yaml: Path to yaml file
    :param riib: If True, change path to labeled pictures
    :return: images: Labels for traffic lights (a list of dicts whose
        'path' entry has been made absolute, relative to the yaml file)
    """
    # NOTE(review): yaml.load without a Loader can build arbitrary Python
    # objects and is rejected by recent PyYAML releases; the label file is
    # expected to hold only plain lists/dicts/scalars, so safe_load is used.
    with open(input_yaml, 'rb') as label_file:
        images = yaml.safe_load(label_file) or []

    base_dir = os.path.dirname(input_yaml)
    for image in images:
        image['path'] = os.path.abspath(os.path.join(base_dir, image['path']))
        if riib:
            image['path'] = image['path'].replace('.png', '.pgm')
            image['path'] = image['path'].replace('rgb/train', 'riib/train')
            image['path'] = image['path'].replace('rgb/test', 'riib/test')
            # Boxes are shifted down by 8 pixels for the RIIB variant
            # (presumably the 720 vs 736 height difference noted above)
            for box in image['boxes']:
                box['y_max'] = box['y_max'] + 8
                box['y_min'] = box['y_min'] + 8
    return images

def process_bosch_training_data(filename):
    """Arranging examples from Bosch's images

    An image is kept only when all its boxes (ignoring those labelled
    'off') share the same first letter; that letter decides the label.
    Images without any box are treated as 'Void'.

    :param filename: path to the Bosch yaml label file
    :return: (filenames, labels) as two Numpy arrays of equal length
    """
    train_labels = get_all_bosch_labels(filename)
    # First letter of the box label -> label name used for the example
    numeric_label = {'R':'red', 'Y':'yellow', 'G':'green', 'V':'null'}
    labels = list()
    filenames = list()
    for label in train_labels:
        box_labels = [box['label'] for box in label['boxes']]
        if not box_labels:
            box_labels.append('Void')
        color = {item[0] for item in box_labels if item != 'off'}
        # Mixed colors (or only 'off' lights) make the image ambiguous: skip it
        if len(color) == 1:
            filenames.append(label['path'])
            labels.append(numeric_label[color.pop()])
    filenames = np.array(filenames)
    labels = np.array(labels)
    return filenames, labels

In [None]:
def extract_bounding_boxes_bosch(target_label='Yellow'):
    """Arranging examples from Bosch's images based on provided bounding boxes

    Every box of './bosch/train.yaml' whose label equals `target_label` is
    cropped with a margin of 20% of the box size on each side, resized
    with misc.imresize(..., (32, 64)) and saved as a jpg in the
    'small_lights' folder below detector.CURRENT_PATH. Crops covering
    less than 10 * 10 pixels are discarded.

    :param target_label: box label to extract (e.g. 'Yellow', 'Red', 'Green')
    """
    sequence = get_all_bosch_labels('./bosch/train.yaml')
    CURRENT_PATH = detector.CURRENT_PATH
    source = 'bosch'
    for item in sequence:
        # The image is read lazily, only if a matching box is found
        image = None
        for box in item['boxes']:
            if box['label'] != target_label:
                continue
            x1, y1, x2, y2 = (box['x_max'], box['y_max'], box['x_min'], box['y_min'])
            width  = int((x1 - x2) * 0.2)
            height = int((y1 - y2) * 0.2)
            if image is None:
                image = read_image_from_disk(item['path'])
            # Clamp at 0: a negative start index would be interpreted as
            # counting from the end and yield an empty or wrong crop
            top = max(int(y2) - height, 0)
            left = max(int(x2) - width, 0)
            cropped_image = image[top:int(y1)+height, left:int(x1)+width, :]
            if cropped_image.shape[0] * cropped_image.shape[1] >= 10 * 10:
                resized = misc.imresize(cropped_image, (32, 64))
                saving_path = os.path.join(CURRENT_PATH, 'small_lights', target_label.lower()+'_'+source+'_'+str(int(random()*10**6))+".jpg")
                # Swapping the first and last channel before writing with OpenCV
                cv2.imwrite(saving_path, cv2.cvtColor(resized, cv2.COLOR_BGR2RGB))

In [None]:
def _save_small_light(crop, name_prefix):
    """Resize a crop with misc.imresize(..., (32, 64)) and save it as jpg in 'small_lights'."""
    resized = misc.imresize(crop, (32, 64))
    saving_path = os.path.join(detector.CURRENT_PATH, 'small_lights',
                               name_prefix+'_'+str(int(random()*10**6))+".jpg")
    # Swapping the first and last channel before writing with OpenCV
    cv2.imwrite(saving_path, cv2.cvtColor(resized, cv2.COLOR_BGR2RGB))


def crop_traffic_lights(images, labels, source=''):
    """Detecting traffic lights in images and cropping them. Providing an equivalent number of false examples

    For each image the detections of class 10 with a score above 0.70 are
    cropped and saved as '<label>_<source>_<random>.jpg'. For every saved
    detection a crop of the same size is taken at a random point lying
    outside that detection's box and saved as 'none_<source>_<random>.jpg';
    when nothing is detected, three random 96x192 crops are saved instead.

    :param images: images to be fed to detector.detect
    :param labels: one label per image, used as file name prefix
    :param source: tag inserted in the file names (e.g. 'sim', 'rosbag')
    """
    traffic_light_class = 10  # class id treated as 'traffic light' (MS COCO label map)
    min_score = 0.70
    max_attempts = 100        # upper bound for the rejection sampling below

    results = detector.detect(images, annotate_on_image=False)
    for result, label in zip(results, labels):
        image_np, boxes, scores, classes, _ = result
        # x is the number of rows (height), y the number of columns (width)
        x, y, _ = image_np.shape

        cropped_items = list()
        cropped_areas = list()
        for k, item in enumerate(classes[0]):
            if item == traffic_light_class and scores[0][k] > min_score:
                box = boxes[0][k]
                # box is used as (row_min, col_min, row_max, col_max), normalized to [0, 1]
                crop = image_np[int(box[0]*x):int(box[2]*x), int(box[1]*y):int(box[3]*y), :]
                # A zero-area crop cannot be resized: skip it
                if crop.size:
                    cropped_items.append(crop)
                    cropped_areas.append(box)

        empty_examples = []
        if cropped_items:
            for item, box in zip(cropped_items, cropped_areas):
                px, py, _ = item.shape
                # The box is normalized while the random point is in pixels:
                # bring the box to pixel units before comparing them
                row_lo, row_hi = sorted((box[0]*x, box[2]*x))
                col_lo, col_hi = sorted((box[1]*y, box[3]*y))
                for _attempt in range(max_attempts):
                    random_point = (int(x*random()), int(y*random()))
                    inside = (row_lo <= random_point[0] <= row_hi) and (col_lo <= random_point[1] <= col_hi)
                    if not inside:
                        break
                else:
                    # The box covers (almost) the whole image: no negative example
                    continue
                empty_examples.append(image_np[random_point[0]:(random_point[0]+px), random_point[1]:(random_point[1]+py), :])
        else:
            for j in range(3):
                px, py = (32*3, 64*3)
                random_point = (int(x*random()), int(y*random()))
                empty_examples.append(image_np[random_point[0]:(random_point[0]+px), random_point[1]:(random_point[1]+py), :])

        for item in cropped_items:
            _save_small_light(item, label+'_'+source)
        for item in empty_examples:
            _save_small_light(item, 'none_'+source)

In [None]:
def chunks(l, n):
    """Generate consecutive slices of l holding n items each (the last one may be shorter)."""
    total = len(l)
    for start in range(0, total, n):
        stop = start + n
        yield l[start:stop]

In [None]:
# initializing detector
# Module-level instance: crop_traffic_lights and extract_bounding_boxes_bosch
# read this global (detector.detect, detector.CURRENT_PATH). Instantiation
# loads the frozen graph, downloading the model when loading from disk fails.
detector = DetectionObj('faster_rcnn_inception_resnet_v2_atrous_coco_11_06_2017')

In [None]:
# processing simulator images
# File names and labels come from the sub-folders of ./simulator_images
sim_filenames, sim_labels = get_simulation_training()

# All the simulator images are read into memory and processed in a single call
images = [read_image_from_disk(item) for item in sim_filenames]
labels = sim_labels
crop_traffic_lights(images, labels, source='sim')

In [None]:
# processing rosbag images
# File names and labels come from the sub-folders of ./ros_bag_images
rosbag_filenames, rosbag_labels = get_rosbag_training()

# Images are read and processed 5 at a time; file names and labels are
# chunked in parallel so that they stay aligned
for rb_files, rb_labels in zip(chunks(rosbag_filenames, 5), chunks(rosbag_labels, 5)):
    images = [read_image_from_disk(item) for item in rb_files]
    labels = rb_labels
    crop_traffic_lights(images, labels, source='rosbag')

In [None]:
# processing Bosch dataset
bosch_filenames, bosch_labels = process_bosch_training_data('./bosch/train.yaml')

# Images are taken 5 at a time; a chunk is processed only when at least one
# of its labels is 'yellow', and in that case all the images of the chunk
# (whatever their label) are read and passed to the detector
for rb_files, rb_labels in zip(chunks(bosch_filenames, 5), chunks(bosch_labels, 5)):
    if 'yellow' in rb_labels:
        images = [read_image_from_disk(item) for item in rb_files]
        labels = rb_labels
        crop_traffic_lights(images, labels, source='bosch')

In [None]:
# extracting data from Bosch dataset based on the provided bounding boxes
# One pass over ./bosch/train.yaml per label; each call re-reads the label file
extract_bounding_boxes_bosch(target_label='Yellow')
extract_bounding_boxes_bosch(target_label='Red')
extract_bounding_boxes_bosch(target_label='Green')