In [1]:
import os
import random
import sys
from pathlib import Path

import cv2
import numpy as np
import tensorflow as tf
import ujson
from tqdm import tqdm
from collections import deque
import copy

In [2]:
def __init__(self, root, config):
    """Copy the dataset settings from ``config`` onto ``self`` and reset the sample stores.

    Args:
        root: Accepted for interface compatibility; not read by this function.
        config: Object exposing ``seq_len``, ``img_seq_len``, ``pred_len``,
            ``img_resolution``, ``img_width``, ``scale``, ``augment``,
            ``aug_max_rotation`` and ``inv_augment_prob``.

    Raises:
        AssertionError: If ``config.img_seq_len`` is not 1.
    """
    self.seq_len = np.array(config.seq_len)
    # The check sits right after seq_len, so a failing config leaves only that attribute set.
    assert config.img_seq_len == 1

    # Every remaining setting is mirrored one-to-one, wrapped in a numpy array.
    mirrored_fields = (
        "pred_len",
        "img_resolution",
        "img_width",
        "scale",
        "augment",
        "aug_max_rotation",
        "inv_augment_prob",
    )
    for field in mirrored_fields:
        setattr(self, field, np.array(getattr(config, field)))

    # Default element shapes.
    self.image_shape, self.measurements_shape = (160, 704, 3), (4,)

    # Sample stores start out empty; each gets its own list object.
    self.images, self.labels, self.measurements, self.num_samples = [], [], [], []

In [None]:
def file_generator(self, root):
    """Yield the frame paths of every route folder found under the given roots.

    Args:
        root: Iterable of directories. Each sub-directory of an entry is
            treated as one route and must contain an ``rgb`` folder, whose
            number of entries is taken as the route's frame count.

    Yields:
        Tuple ``(images, measurements, num_samples)`` per route:
        ``images`` holds ``<route>/rgb/NNNN.png`` paths and ``measurements``
        the matching ``<route>/measurements/NNNN.json`` paths (built from the
        frame index, not checked for existence); ``num_samples`` is a
        one-element list with the number of yielded frames.
    """
    for sub_root in tqdm(root, file=sys.stdout):
        sub_root = Path(sub_root)

        # Routes are the sub-directories of sub_root. Sorting makes the
        # iteration order independent of the filesystem's listing order.
        routes = sorted(entry.name for entry in sub_root.iterdir() if entry.is_dir())
        for route in routes:
            route_dir = sub_root / route
            num_seq = len(os.listdir(route_dir / "rgb"))

            # Ignore the first two and the last two frames. For routes with
            # fewer than four frames the range is empty, so the reported
            # count is 0 rather than a negative number.
            frame_ids = range(2, num_seq - 2)
            images = [route_dir / "rgb" / ("%04d.png" % seq) for seq in frame_ids]
            measurements = [
                route_dir / "measurements" / ("%04d.json" % seq) for seq in frame_ids
            ]
            yield images, measurements, [len(images)]

In [None]:

def element_spec(self):
    """Return the structure of the elements produced by the dataset.

    Both shapes are read from ``self`` (``image_shape`` and
    ``measurements_shape``) so the spec follows whatever the instance
    currently records instead of a separately hard-coded measurement size.

    Returns:
        dict with a float32 ``tf.TensorSpec`` under ``"rgb"`` and under
        ``"measurements"``.
    """
    return {
        "rgb": tf.TensorSpec(shape=self.image_shape, dtype=tf.float32),
        "measurements": tf.TensorSpec(shape=self.measurements_shape, dtype=tf.float32),
    }

def create_dataset(self):
    """Build a ``tf.data.Dataset`` that pulls its elements from ``self.data_generator``.

    Returns:
        The dataset returned by ``tf.data.Dataset.from_generator``.
    """
    signature = self.element_spec
    # ``output_signature`` must be the spec structure itself. ``element_spec``
    # is a plain method here, so it has to be called; the callable check keeps
    # this working if it is ever exposed as a property/attribute instead.
    if callable(signature):
        signature = signature()
    return tf.data.Dataset.from_generator(
        self.data_generator, output_signature=signature
    )


def crop_image(image, crop=(128, 640), crop_shift=0):
    """
    Centre-crop a PIL-style image and return it as a channels-first numpy array.

    No scaling is performed; the image is only cropped.

    Args:
        image: Object exposing ``width`` and ``height`` that ``np.asarray``
            converts to a (rows, cols, channels) array.
        crop: ``(crop_h, crop_w)`` size of the window in pixels.
        crop_shift: Horizontal offset of the window from the centre, in
            pixels (truncated to an int). The vertical position is never
            shifted.

    Returns:
        Array of shape (channels, crop_h, crop_w). If the image is smaller
        than ``crop`` along an axis, that axis keeps the full image extent.
    """
    width = image.width
    height = image.height
    crop_h, crop_w = crop
    start_y = height // 2 - crop_h // 2
    # only shift for x direction
    start_x = width // 2 - crop_w // 2 + int(crop_shift)

    # Keep the window inside the image. A negative start would otherwise be
    # read by numpy as "count from the end" and silently give an empty or
    # wrong crop; overshooting on the right would give a narrower one.
    start_y = max(0, min(start_y, height - crop_h))
    start_x = max(0, min(start_x, width - crop_w))

    image = np.asarray(image)
    cropped_image = image[start_y : start_y + crop_h, start_x : start_x + crop_w]
    cropped_image = np.transpose(cropped_image, (2, 0, 1))
    return cropped_image


def crop_image_cv2(image, crop=(128, 640), crop_shift=0):
    """
    Centre-crop an image array, keeping its axis order.

    Args:
        image: Array whose first two axes are rows and columns.
        crop: ``(crop_h, crop_w)`` size of the window in pixels.
        crop_shift: Horizontal offset of the window from the centre, in
            pixels (truncated to an int). The vertical position is never
            shifted.

    Returns:
        The ``image[rows, cols]`` slice of the window (a numpy view, not a
        copy). If the image is smaller than ``crop`` along an axis, that axis
        keeps the full image extent.
    """
    height, width = image.shape[0], image.shape[1]
    crop_h, crop_w = crop
    start_y = height // 2 - crop_h // 2
    # only shift for x direction
    start_x = width // 2 - crop_w // 2 + int(crop_shift)

    # Keep the window inside the image. A negative start would otherwise be
    # read by numpy as "count from the end" and silently give an empty or
    # wrong crop; overshooting on the right would give a narrower one.
    start_y = max(0, min(start_y, height - crop_h))
    start_x = max(0, min(start_x, width - crop_w))

    cropped_image = image[start_y : start_y + crop_h, start_x : start_x + crop_w]
    return cropped_image


def get_waypoints(labels, len_labels):
    """Collect, for every object in the first frame, its ego matrix at each timestep.

    Args:
        labels: Indexable sequence of frames; each frame is an iterable of
            dicts carrying at least ``"id"`` and ``"ego_matrix"``.
        len_labels: Expected number of frames; must equal ``len(labels)``.

    Returns:
        dict mapping each id found in ``labels[0]`` to a list of exactly
        ``len_labels`` ``[ego_matrix, valid]`` pairs. Entry ``i`` belongs to
        frame ``i``; frames in which the id does not appear hold a fresh 4x4
        identity matrix (nested lists) with ``valid`` set to ``False``.
        Objects that first appear after frame 0 are not tracked.

    Raises:
        AssertionError: If ``len(labels)`` differs from ``len_labels``.
    """
    assert len(labels) == len_labels
    num = len_labels
    waypoints = {}
    if num == 0:
        return waypoints

    # Index every later frame by id once. setdefault keeps the first entry
    # when an id is duplicated inside a frame, so a track never grows past num.
    later_frames = []
    for i in range(1, num):
        by_id = {}
        for entry in labels[i]:
            by_id.setdefault(entry["id"], entry)
        later_frames.append(by_id)

    for result in labels[0]:
        car_id = result["id"]
        track = [[result["ego_matrix"], True]]
        for by_id in later_frames:
            match = by_id.get(car_id)
            if match is not None:
                track.append([match["ego_matrix"], True])
            else:
                # Pad at the timestep where the object is missing so later
                # poses stay aligned with their frame index. Each pad gets its
                # own identity matrix to avoid aliasing between entries.
                track.append([np.eye(4).tolist(), False])
        waypoints[car_id] = track
    return waypoints

In [None]:
def data_generator(self):
    """Yield one sample per stored frame.

    For every index ``i`` the image ``self.images[i]`` is read with OpenCV and
    the JSON file ``self.measurements[i]`` is parsed. Frames whose image
    cannot be read are reported on stdout and skipped.

    Yields:
        dict with
            ``"rgb"``: float32 RGB image divided by 255 and cropped to
            ``self.img_resolution`` by ``crop_image_cv2``;
            ``"measurements"``: float32 array
            ``[steer, throttle, brake, light_hazard]``.

    Side effects:
        ``self.image_shape`` and ``self.measurements_shape`` are overwritten
        with the shapes of the sample about to be yielded.
    """
    images = self.images
    measurements = self.measurements

    for i in range(len(images)):
        image = cv2.imread(str(images[i]), cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread returns None instead of raising when a file is
            # missing or unreadable. Report the path and move on; converting
            # a None image below would abort the whole generator.
            print("Error loading file: ", str(images[i]))
            continue

        with open(str(measurements[i]), "r", encoding="utf-8") as f1:
            measurement = ujson.load(f1)

        # IMAGE: BGR -> RGB, scale by 1/255, store as float32.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image / 255.0
        image = np.array(image, dtype=np.float32)

        # Augmentation: with probability inv_augment_prob (when enabled) draw
        # a rotation in [-aug_max_rotation, aug_max_rotation] and turn it into
        # a horizontal crop offset.
        crop_shift = 0
        do_augment = self.augment and random.random() <= self.inv_augment_prob
        if do_augment:
            degree = (random.random() * 2.0 - 1.0) * self.aug_max_rotation
            crop_shift = degree / 60 * self.img_width / self.scale  # we scale first

        image = crop_image_cv2(image, crop=self.img_resolution, crop_shift=crop_shift)

        # Only these four values are part of a sample; other keys in the
        # measurement file are not read.
        sample_measurements = np.array(
            [
                measurement["steer"],
                measurement["throttle"],
                measurement["brake"],
                measurement["light_hazard"],
            ],
            dtype=np.float32,
        )

        self.image_shape = image.shape
        self.measurements_shape = sample_measurements.shape

        yield {
            "rgb": image,
            "measurements": sample_measurements,
        }