## Dataset folder structure 

- rear_signal_dataset
   - Footage_name
      - Footage_name_XXX
         - Footage_name_XXX_DDD (sequence of class XXX starting from frame number DDD)
            - light_mask
               - frameDDDDDDDD.png (frames named with an 8-digit number indicating the frame number)
               - ...
         - Footage_name_XXX_DDD
            - light_mask
               - frameDDDDDDDD.png
               - ...
      - Footage_name_XXX
         - Footage_name_XXX_DDD
            - ...
         - Footage_name_XXX_DDD
            - ...
         - Footage_name_XXX_DDD
            - ...
   - Footage_name
      - ...

*XXX indicates the label of the signal

The dataset first divides the data by Footage_name (e.g. "20160809_route8-08-09-2016_09-50-36_idx99"). Within each footage directory, there is a sub-directory for each class of label (brake lights on, lights off, etc.). Within each class directory, there is a sub-directory for each brief burst of shots. Each burst of shots shows a single vehicle, so the images within a burst are nearly identical to each other.

In [8]:
import os
import shutil
import numpy as np
import imghdr # for checking images are valid

def get_immediate_directories(directory_path):
    """
    Lists the names of the entries directly inside `directory_path`.

    Despite the name, entries are not checked for being directories:
    everything `os.listdir` reports (plain files included) is returned,
    except for the names in the exclusion list below.

    The names are returned in sorted order. `os.listdir` gives no ordering
    guarantee, so sorting keeps anything that depends on the traversal order
    (seeded random choices, sequential numbering) stable between runs and
    machines.

    - directory_path: path of the directory to list

    Returns a sorted list of entry names (not full paths).

    Example:
    ```
    >>> print(get_immediate_directories('./data'))
    [...]
    ```
    """
    # remove unwanted files
    excluded = {".DS_Store"}

    return sorted(d for d in os.listdir(directory_path) if d not in excluded)

def get_label_from_name(name):
    """
    Extracts the signal label from a directory (or file) name.

    Everything from the first "." onwards is dropped, and the label is the
    last three characters of what remains.
    Example name: "20160805_g1k17-08-05-2016_16-25-43_idx99_BLO"

    ```
    >>> get_label_from_name("20160805_g1k17-08-05-2016_16-25-43_idx99_BLO")
    'BLO'
    ```
    """
    stem, _, _ = name.partition(".")
    return stem[-3:]

def get_split(test=0.15, val=0.15):
    """
    Randomly picks one of 'train', 'test' or 'val'.

    A single number is drawn from `np.random.rand()` and compared against
    cumulative cut-offs: the first `1.0 - test - val` of the range maps to
    'train', the next `test` maps to 'test', and the rest maps to 'val'.
    - test: fraction assigned to 'test' [0, 1.0]
    - val: fraction assigned to 'val' [0, 1.0]
    """
    draw = np.random.rand()

    train_cutoff = 1.0 - test - val
    if draw < train_cutoff:
        return 'train'

    test_cutoff = train_cutoff + test
    return 'test' if draw < test_cutoff else 'val'

def write_difficulty_levels(output_path, easy, medium, hard):
    """
    Writes the three difficulty lists to text files inside `output_path`.

    Each list goes to its own file, one element per line:
    - easy   -> easy.txt
    - medium -> moderate.txt
    - hard   -> hard.txt

    Existing files with these names are overwritten.
    """
    # (file name, entries) pairs, in the order the files are written
    targets = (
        ('easy.txt', easy),
        ('moderate.txt', medium),
        ('hard.txt', hard),
    )

    for file_name, entries in targets:
        with open(os.path.join(output_path, file_name), 'w') as out_file:
            out_file.writelines("%s\n" % entry for entry in entries)
            
def image_is_valid(path_to_image, extension='png'):
    """
    Returns True if the image is valid. False otherwise.

    "Valid" means the start of the file identifies it as the given image
    type; the rest of the file is not decoded.
    - path_to_image: path of the file to check
    - extension: expected image type, using `imghdr` type names
      ('png', 'jpeg', 'gif', ...)

    A path that cannot be opened (missing file, directory, no permission)
    counts as invalid and gives False instead of raising.
    """
    # 8-byte signature that every PNG file starts with
    png_signature = b'\x89PNG\r\n\x1a\n'

    try:
        if extension == 'png':
            # Check the signature directly so the default case does not rely
            # on `imghdr`, which is deprecated and removed in Python 3.13.
            with open(path_to_image, 'rb') as image_file:
                return image_file.read(len(png_signature)) == png_signature

        # Other formats still go through imghdr; imported lazily so that
        # PNG checks keep working where the module is unavailable.
        import imghdr
        return imghdr.what(path_to_image) == extension
    except OSError:
        return False

## Read in the lists of easy, moderate, and hard sequences

In [9]:
# Load the Easy, Moderate, and Hard sequence lists, one entry per line
def _read_lines(list_path):
    """Returns the lines of the text file at `list_path`, without line endings."""
    with open(list_path) as list_file:
        return list_file.read().splitlines()

easy = _read_lines('./Easy.txt')
moderate = _read_lines('./Moderate.txt')
hard = _read_lines('./Hard.txt')
    
def get_difficulty_level(sequence_dir_name):
    """
    Looks up the difficulty code of a sequence directory name.

    Returns "E", "M" or "H" when the name is listed in `easy`, `moderate`
    or `hard` respectively (checked in that order). When the name is in
    none of the lists, a message is printed and None is returned.
    """
    levels = (("E", easy), ("M", moderate), ("H", hard))

    for code, listed_names in levels:
        if sequence_dir_name in listed_names:
            return code

    print('Unrecognized sequence dir name: {}'.format(sequence_dir_name))
    return None

## Get the paths to the images
The path to each image is collected per class.

In [10]:
from collections import defaultdict
import random

# Seed both random generators so the output is reproducible:
# `random` drives the per-sequence image choice (random.choice), while
# NumPy drives the train/test/val draw made by get_split (np.random.rand).
random.seed(0)
np.random.seed(0)

def get_paths_for_images(one_per_sequence=True, data_dir='./data'):
    """
    Gets the path for the images.
    - one_per_sequence: if this is true, only one image will be used from each sequence for a particular class.
    - data_dir: root directory of the dataset (the folder containing the footage directories).

    The directory tree is walked as
        data_dir / footage dir / class dir / sequence dir / "light_mask" / images
    Files that fail `image_is_valid` are reported and left out. A sequence
    with no valid images at all is reported and skipped.

    If one_per_sequence, it will return a dictionary of form:
        {label: [(image_path, difficulty), (image_path, difficulty), (image_path, difficulty), ...]}

    If not one_per_sequence, it will return a dictionary of form:
        {label: [(image_path, difficulty, split), (image_path, difficulty, split), (image_path, difficulty, split)]}

    `difficulty` is "E", "M", "H", or None when the sequence is in none of
    the difficulty lists. `split` is 'train', 'test' or 'val' and is shared
    by every image of a sequence.
    """

    # Store the path to each of the image paths per class
    # label => [(path, difficulty)] where difficulty is "E", "M", or "H"
    per_class_image_paths = defaultdict(list)

    # Footage directories
    footage_dirs = get_immediate_directories(data_dir)

    for f_dir in footage_dirs:
        # These are folders corresponding to each class
        path_1 = os.path.join(data_dir, f_dir)
        f_class_dirs = get_immediate_directories(path_1)

        # Loop through all class dirs of form Footage_name_XXX
        for f_class_dir in f_class_dirs:
            # The label of the class ex. "BLO"
            class_label = get_label_from_name(f_class_dir)
            path_2 = os.path.join(path_1, f_class_dir)
            footage_sequence_dirs = get_immediate_directories(path_2)

            # Loop through all sequence dirs of form Footage_name_XXX_DDD
            for footage_sequence_dir in footage_sequence_dirs:
                # Drawn once per sequence so that all of its images land in
                # the same split. Only used when keeping the whole sequence.
                split = get_split()

                path_3 = os.path.join(path_2, footage_sequence_dir, "light_mask")

                difficulty_level = get_difficulty_level(footage_sequence_dir)

                image_names = get_immediate_directories(path_3)

                # Remove any corrupted files (there are some)
                valid_image_paths = []
                for image_name in image_names:
                    full_image_path = os.path.join(path_3, image_name)
                    if image_is_valid(full_image_path):
                        valid_image_paths.append(full_image_path)
                    else:
                        print("Invalid image: {}".format(full_image_path))

                if not valid_image_paths:
                    # random.choice raises IndexError on an empty list, so a
                    # sequence without usable images is skipped instead.
                    print("No valid images in sequence: {}".format(path_3))
                    continue

                if one_per_sequence:
                    # Choose one image randomly
                    # All the images are pretty similar, so just need one
                    full_image_path = random.choice(valid_image_paths)

                    per_class_image_paths[class_label].append((full_image_path, difficulty_level))
                else:
                    for full_image_path in valid_image_paths:
                        per_class_image_paths[class_label].append((full_image_path, difficulty_level, split))
    return per_class_image_paths

## Extract single image from each sequence
The following will extract a single image from each sequence. The copied images are given new names, and those names are recorded in new easy, moderate, and hard .txt files.

In [11]:
from shutil import copyfile, rmtree

per_class_image_paths = get_paths_for_images()

# Start from an empty ./output folder so no old contents are left behind
if os.path.exists("./output"):
    rmtree("./output")
os.mkdir("./output")

# One sub-directory per label
for label in per_class_image_paths:
    os.mkdir('./output/{}'.format(label))

# New image names collected per difficulty level
new_easy = []
new_medium = []
new_hard = []
names_by_difficulty = {"E": new_easy, "M": new_medium, "H": new_hard}

# Running counter used to name the copied images; shared across all labels
frame_number = 0

# Copy every selected image into the directory of its label
for label, labelled_images in per_class_image_paths.items():
    output_dir_path = './output/{}'.format(label)

    for image_path, difficulty in labelled_images:
        # Build the new name for the image from the running counter
        output_image_name = 'frame_{:05}.png'.format(frame_number)
        frame_number += 1

        output_path = os.path.join(output_dir_path, output_image_name)
        copyfile(image_path, output_path)

        # Record the new name under its difficulty level; images whose
        # difficulty is not "E", "M" or "H" are copied but not recorded
        bucket = names_by_difficulty.get(difficulty)
        if bucket is not None:
            bucket.append(output_image_name)

Unrecognized sequence dir name: 20161013_demo_test-10-13-2016_15-51-02_OLO_00001274
Unrecognized sequence dir name: 20161013_demo_test-10-13-2016_15-51-02_BOO_00001274
Unrecognized sequence dir name: 20161007_demo_surface-10-07-2016_16-14-04_2_BOR_00005877
Unrecognized sequence dir name: 20161007_demo_surface-10-07-2016_16-14-04_2_BOO_00005877
Unrecognized sequence dir name: 20160915_route_demo2-09-15-2016_18-49-23_OOR_00000215
Invalid image: ./data/test-08-08-2016_20-11-44_idx99/test-08-08-2016_20-11-44_idx99_BOO/test-08-08-2016_20-11-44_idx99_BOO_00003202/light_mask/Thumbs.db
Unrecognized sequence dir name: route-02-23-2016_17-17-51_BOO_9125
Invalid image: ./data/route10-07-11-2016_18-52-33_idx99/route10-07-11-2016_18-52-33_idx99_OLO/route10-07-11-2016_18-52-33_idx99_OLO_00038017/light_mask/Thumbs.db
Unrecognized sequence dir name: 20160920_route_demo-09-20-2016_18-47-39_BLO00001405
Invalid image: ./data/route10-07-12-2016_16-34-38_idx99/route10-07-12-2016_16-34-38_idx99_BOO/route10-

In [12]:
# Save the new image names of each difficulty level to easy.txt, moderate.txt and hard.txt in ./output
write_difficulty_levels('./output', new_easy, new_medium, new_hard)

## Use the entire sequence
The following can be used to make use of the entire image sequence. It ensures that each sequence will be used for either training, validation, or testing. This means the same car can only appear in a single split.

In [13]:
from shutil import copyfile, rmtree

per_class_image_paths = get_paths_for_images(one_per_sequence=False)

splits = ['train', 'val', 'test']

# Start from an empty ./sequence_output folder so no old contents are left behind
if os.path.exists("./sequence_output"):
    rmtree("./sequence_output")
os.mkdir("./sequence_output")

# Layout: ./sequence_output/<split>/<label>
for split_name in splits:
    split_directory = "./sequence_output/{}".format(split_name)
    os.mkdir(split_directory)

    for label in per_class_image_paths:
        os.mkdir(os.path.join(split_directory, label))

# New image names collected per difficulty level
new_easy = []
new_medium = []
new_hard = []
names_by_difficulty = {"E": new_easy, "M": new_medium, "H": new_hard}

# Running counter used to name the copied images; shared across all labels and splits
frame_number = 0

# Copy every image into the directory of its split and label
for label, labelled_images in per_class_image_paths.items():
    for image_path, difficulty, split in labelled_images:
        output_dir_path = './sequence_output/{}/{}'.format(split, label)

        # Build the new name for the image from the running counter
        output_image_name = 'frame_{:05}.png'.format(frame_number)
        frame_number += 1

        output_path = os.path.join(output_dir_path, output_image_name)
        copyfile(image_path, output_path)

        # Record the new name under its difficulty level; images whose
        # difficulty is not "E", "M" or "H" are copied but not recorded
        bucket = names_by_difficulty.get(difficulty)
        if bucket is not None:
            bucket.append(output_image_name)

Unrecognized sequence dir name: 20161013_demo_test-10-13-2016_15-51-02_OLO_00001274
Unrecognized sequence dir name: 20161013_demo_test-10-13-2016_15-51-02_BOO_00001274
Unrecognized sequence dir name: 20161007_demo_surface-10-07-2016_16-14-04_2_BOR_00005877
Unrecognized sequence dir name: 20161007_demo_surface-10-07-2016_16-14-04_2_BOO_00005877
Unrecognized sequence dir name: 20160915_route_demo2-09-15-2016_18-49-23_OOR_00000215
Invalid image: ./data/test-08-08-2016_20-11-44_idx99/test-08-08-2016_20-11-44_idx99_BOO/test-08-08-2016_20-11-44_idx99_BOO_00003202/light_mask/Thumbs.db
Unrecognized sequence dir name: route-02-23-2016_17-17-51_BOO_9125
Invalid image: ./data/route10-07-11-2016_18-52-33_idx99/route10-07-11-2016_18-52-33_idx99_OLO/route10-07-11-2016_18-52-33_idx99_OLO_00038017/light_mask/Thumbs.db
Unrecognized sequence dir name: 20160920_route_demo-09-20-2016_18-47-39_BLO00001405
Invalid image: ./data/route10-07-12-2016_16-34-38_idx99/route10-07-12-2016_16-34-38_idx99_BOO/route10-

In [14]:
# Save the new image names of each difficulty level to easy.txt, moderate.txt and hard.txt in ./sequence_output
write_difficulty_levels('./sequence_output', new_easy, new_medium, new_hard)