# Filter out only relevant pictures

Join BelgianTS with DriveU Traffic Lights Dataset

In [1]:
import os
import numpy as np
import cv2 as cv
import json
from tqdm import tqdm

def format(dtld_labels_path, data_path, mode):
    with open(dtld_labels_path) as file:
        images = json.load(file)

    images_path = data_path + "/images/" + mode
    if not os.path.exists(images_path):
        os.makedirs(images_path)

    labels_path = data_path + "/labels/" + mode
    if not os.path.exists(labels_path):
        os.makedirs(labels_path)

    for image in tqdm(images["images"]):
        image_path = "dtld/" + image["image_path"]
        image_name = os.path.splitext(os.path.basename(image_path))[0]
        if not os.path.isfile(image_path):
            continue

        # write image
        img = cv.imread(image_path, cv.IMREAD_UNCHANGED)
        img = cv.cvtColor(img, cv.COLOR_BAYER_GB2BGR)
        img = np.right_shift(img, 4)
        img = img.astype(np.uint8)
        cv.imwrite(images_path + "/" + image_name + ".jpg", img)

        # write labels
        label_file = open(images_path + "/" + image_name + ".txt", "a")
        for label in image["labels"]:
            attributes = label["attributes"]
            if attributes["relevance"] != "relevant":
                continue

            c = -1
            if attributes["state"] == "green":
                c = 21
            if attributes["state"] == "yellow":
                c = -1
            if attributes["state"] == "red":
                c = 20
            if c == -1:
                continue

            img_width = np.shape(img)[1]
            img_height = np.shape(img)[0]
            x = (label["x"] + (label["w"] / 2)) / img_width
            y = (label["y"] + (label["h"] / 2)) / img_height
            w = label["w"] / img_width
            h = label["h"] / img_height
            label_file.write(str(c) + " " + str(x) + " " + str(y)
                             + " " + str(w) + " " + str(h) + "\n")
        label_file.close()



#format(os.getcwd() + "/DTLD/v2.0/Duesseldorf.json", "./Duesseldorf", "Duesseldorf")
#format(os.getcwd() + "/DTLD/v2.0/Bochum.json", "./Bochum", "Bochum")
format(os.getcwd() + "/DTLD/v2.0/Essen.json", "./Essen", "Essen")


100%|██████████| 2914/2914 [05:48<00:00,  8.37it/s]


Map labels to natural language signs

In [4]:
class_names = {
    'A13': 'Uneven road',
    'A14': 'Uneven road',
    'A15': 'Slippy road',
    'A1A': 'Steep turn left',
    'A1B': 'Steep turn right',
    'A1C': 'Multiple steep turns starting turn left',
    'A1D': 'Multiple steep turns starting turn right',
    'A23': 'Children crossing road',
    'A25': 'Bikes crossing road',
    'A29': 'Farm animals on road',
    'A31': 'Construction works',
    'A33': 'Traffic light ahead',
    'A41': 'Road closure',
    'A51': 'Danger',
    'A7A': 'Obstacle! road narrows on both sides',
    'A7B': 'Obstacle! road narrows on left',
    'A7C': 'Obstacle! road narrows on right',
    'B15A': 'Priority at next intersection',
    'B17': 'Dangerous intersection',
    'B1': 'Yield',
    'B19': 'Oncoming traffic has priority',
    'B5': 'Stop',
    'C1': 'No entry',
    'C11': 'No bikes',
    'C21': 'Maximum weight 5.5T',
    'C23': 'No delivery cars',
    'C27': 'Road max width 2.5m',
    'C29': 'Max height 3.5m',
    'C3': 'Passage restriction',
    'C31LEFT': 'No turn left',
    'C31RIGHT': 'No turn right',
    'C35': 'No overtaking',
    'C43': 'Speed limit 50',
    'D10': 'Bikes and pedestrians only',
    'D1a': 'Straight only',
    'D1b': 'Left only',
    'D3b': 'Straight or turn left',
    'D5': 'Roundabout',
    'D7': 'Bike path',
    'D9': 'Bike path and pedestrians separated',
    'E1': 'No parking',
    'E3': 'No stopping',
    'E5': 'No stopping between 1 and 15',
    'E7': 'No stopping between 16 and 31',
    'B21': 'Priority at this bottleneck',
    'E9a': 'Parking',
    'E9a_miva': 'Parking disability',
    'E9b': 'Parking cars only',
    'E9c': 'Parking trucks only',
    'E9d': 'Parking bus only',
    'E9e': 'Parking on sidewalk',
    'F12a': 'Play street',
    'F12b': 'End of play street',
    'F19': 'One way street',
    'F45': 'Dead end',
    'F47': 'End of construction works',
    'F49': 'Crosswalk',
    'F50': 'Bike path crossing',
    'F59': 'Parking on right',
    'F87': 'Speed bump',
    'B11': 'End of priority road',
    'B9': 'Priority road',
}

Reduce signs and merge

In [17]:
import os
import random
from shutil import copyfile
from PIL import Image
classes = []
reducedTS = ['A13','A14','A15','A1A','A1B','A1C','A1D','A23','A25','A29','A31','A33','A41','A51','A7A','A7B','A7C','B15A','B17','B1','B19','B5','C1','C11','C21','C23','C27','C29','C3','C31LEFT','C31RIGHT','C35','C43','D10','D1a','D1b','D3b','D5','D7','D9','E1','E3','E5','E7','B21','E9a','E9a_miva','E9b','E9c','E9d','E9e','F12a','F12b','F19','F45','F47','F49','F50','F59','F87','B11','B9',]
def convert_to_yolo_format(annotation, image_width, image_height):
    # Split the annotation string by semicolon
    parts = annotation.split(';')
    yolo_format=""
    # Extract the required values
    x1, y1, x2, y2 = map(float, parts[1:5])
    class_id = parts[11]
    print(class_id in reducedTS)
    if class_id in reducedTS:

        if class_names[class_id] not in classes:
            classes.append(class_names[class_id])
        class_id = classes.index(class_names[class_id])
        print(len(classes))
        # Calculate the YOLO format values
        x_center = (x1 + x2) / 2 / image_width
        y_center = (y1 + y2) / 2 / image_height
        width = (x2 - x1) / image_width
        height = (y2 - y1) / image_height
        assert(class_id < 64)
        # Create the YOLO format string
        yolo_format = f"{class_id} {x_center} {y_center} {width} {height}"
    
    return yolo_format

def transform_and_filter_lines(input_file, output_dir, target_directory):
    os.makedirs(output_dir, exist_ok=True)
    
    with open(input_file, 'r') as infile:
        for line in infile:
            parts = line[2:].split(';')
            image_path = parts[0][:-4] + '.jpg'
            image_file = os.path.join(target_directory, os.path.basename(image_path))
            #print(image_file)
            if os.path.exists(image_file):
                if True:
                    # Replace '01' with '00' at the start of the line
                    transformed_line = '00' + line[2:]
                    
                    # Get image dimensions
                    with Image.open(image_file) as img:
                        image_width, image_height = img.size
                    
                    # Convert to YOLO format
                    yolo_format = convert_to_yolo_format(transformed_line, image_width, image_height)
                    print(yolo_format)
                    # Write to YOLO format file
                    yolo_filename = os.path.splitext(os.path.basename(image_path))[0] + '.txt'
                    with open(os.path.join(output_dir, yolo_filename), 'w') as outfile:
                        outfile.write(yolo_format + '\n')
            else:
                continue
                #print(f"File not found: {image_file}")

def generate_yaml_file(output_dir, yaml_filename, c):
    yaml_content = f"""
train: {output_dir}/train/images
val: {output_dir}/val/images
test: {output_dir}/test/images

nc: {len(c)}  # number of classes
names: {c}  # class names
"""
    with open(yaml_filename, 'w') as yaml_file:
        yaml_file.write(yaml_content)

def split_dataset(dataset_dirs, output_dir):
    os.makedirs(os.path.join(output_dir, 'train/images'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'train/labels'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'val/images'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'val/labels'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'test/images'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'test/labels'), exist_ok=True)
    
    images = []
    
    for dataset_dir in dataset_dirs:
        images.extend([os.path.join(dataset_dir, f) for f in os.listdir(dataset_dir) if f.endswith('.jpg') or f.endswith('.jp2')])
    
    random.shuffle(images)
    print(images)
    train_split = int(0.7 * len(images))
    val_split = int(0.85 * len(images))
    
    train_images = images[:train_split]
    val_images = images[train_split:val_split]
    test_images = images[val_split:]
    
    for img in train_images:
        try:
            copyfile(img, os.path.join(output_dir, 'train/images', os.path.basename(img)))
            copyfile(img.replace('.jpg', '.txt'), os.path.join(output_dir, 'train/labels', os.path.basename(img).replace('.jpg', '.txt')))
        except FileNotFoundError as e:
            print(f"FileNotFoundError: {e}")
        except PermissionError as e:
            print(f"PermissionError: {e}")
    
    for img in val_images:
        try:
            copyfile(img, os.path.join(output_dir, 'val/images', os.path.basename(img)))
            copyfile(img.replace('.jpg', '.txt'), os.path.join(output_dir, 'val/labels', os.path.basename(img).replace('.jpg', '.txt')))
        except FileNotFoundError as e:
            print(f"FileNotFoundError: {e}")
        except PermissionError as e:
            print(f"PermissionError: {e}")
    
    for img in test_images:
        try:
            copyfile(img, os.path.join(output_dir, 'test/images', os.path.basename(img)))
            copyfile(img.replace('.jpg', '.txt'), os.path.join(output_dir, 'test/labels', os.path.basename(img).replace('.jpg', '.txt')))
        except FileNotFoundError as e:
            print(f"FileNotFoundError: {e}")
        except PermissionError as e:
            print(f"PermissionError: {e}")

# Define the input and output file names and directories for BelgianTS dataset
belgian_input_file = 'annotations/training_GT_test.txt'
belgian_output_dir = 'yolo_belgian_annotations'
belgian_target_directory = 'camera00'

# Transform and filter the lines in the BelgianTS input file and save to the output directory
transform_and_filter_lines(belgian_input_file, belgian_output_dir, belgian_target_directory)

# Define the directories for DTLD dataset locations
dtld_locations = ['Bochum']

# Combine both datasets into a single directory list
combined_dataset_dirs = [belgian_target_directory]

for location in dtld_locations:
    dtld_images_dir = os.path.normpath(f'{location}/images/{location}')
    #dtld_labels_dir = os.path.normpath(f'{location}/labels/{location}')
    
    combined_dataset_dirs.append(dtld_images_dir)
    #combined_dataset_dirs.append(dtld_labels_dir)

classes.append("Red traffic light")
classes.append("Yellow traffic light")
classes.append("Green traffic light")
# Split the combined dataset into train, validation and test sets
split_dataset(combined_dataset_dirs, 'combined_yolo_annotations')

# Generate the YAML file for the combined dataset
generate_yaml_file('combined_yolo_annotations', 'data.yaml', classes)

print(f"YOLO format annotations have been saved to combined_yolo_annotations.")
print(f"YAML file has been generated as data.yaml.")

True
1
0 0.8995515970515969 0.2957807443365696 0.1445331695331695 0.19227346278317156
True
1
0 0.7644871007371008 0.25699433656957926 0.09791769041769033 0.12897249190938515
True
1
0 0.6984613022113022 0.23648462783171523 0.07347051597051604 0.09677184466019419
True
2
1 0.9502119164619164 0.40934870550161817 0.03828624078624074 0.048406148867313904
True
2
1 0.6681633906633907 0.32855987055016184 0.026461916461916417 0.03718446601941746
True
2
1 0.7798863636363638 0.3635922330097087 0.0301167076167076 0.042135922330097074
False

False

False

True
3
2 0.6547051597051597 0.2108616504854369 0.01878378378378388 0.02406957928802589
True
3
2 0.9613298525798526 0.25439724919093853 0.049993857493857555 0.0645064724919094
True
3
2 0.730193488943489 0.219457928802589 0.026775184275184227 0.03202265372168286
True
4
3 0.9402671990171991 0.25797734627831714 0.05490786240786247 0.06703883495145632
True
4
3 0.8632340294840295 0.24309870550161813 0.03886363636363635 0.047880258899676384
True
4
3 0.833