# Traffic light classification 
This Notebook consists of 3 Parts

* Set up environment 
  * load modules 
  * load paths from .env
  * convert Yolo V8 data format to this from the lecture
* Dataset generation
  * Detecting traffic lights with yolo
  * Results postprocessing
* CNN 
  * define CNN
  * Training 


The dataset generation works in this notebook version with a "original" dataset which already obtain pictures which are labeled 
with the parent folder name.
The classes of the original dataset should be "red", "green", "yellow".

The resulting dataset folder consists of 3 folder:
* original_data
  consists of images with the labels from above 
* custom_data
  The dataset where the CNN is trained on.
* drop_outs
  consists of images which are to similar to other images in the same classes to make sure that data is not to much correlated


> If the custom dataset with the classes "green", "yellow", "red" and "none" already exists, then it is possible to define and train the CNN after the 
> [CNN](#cnn) Markdown cell.


# Set up environment
* Load modules
* Load paths from .env

In [3]:
import custom_utils as cu
from ultralytics import YOLO
import numpy as np
import matplotlib.pyplot as plt

import cv2

from skimage.metrics import structural_similarity as ssim

from sklearn.model_selection import train_test_split


In [11]:
# prepare environment
import os
from dotenv import load_dotenv
# Load .env files
load_dotenv()

# Get environment variables
dataset_dir = os.getenv('DATASET_DIR')
ori_data_dir = os.getenv('TRAFFIC_LIGHT_ORIGINAL_DATA')
custom_data_dir = os.getenv('TRAFFIC_LIGHT_CUSTOM_DATA')
drop_data_dir = os.getenv('TRAFFIC_LIGHT_DROP_DATA')

# check if folders exists or create them
if not os.path.exists(ori_data_dir):
    print("Error: No original data set")

if not os.path.exists(custom_data_dir):
    os.mkdir(custom_data_dir)

if not os.path.exists(drop_data_dir):
    os.mkdir(drop_data_dir)



### convert yolo data format to lecture format
1. download dataset from [Roboflow](https://universe.roboflow.com/wawan-pradana/cinta_v2/dataset/1)
2. extract it and copy its content to aai-selfdriving-cars/dataset/traffic_light/original_data
3. execute the cell below

In [35]:
# convert the yolo v8 data set and extract its labels 
labels = cu.convert_dataset(ori_data_dir, ori_data_dir)

In [6]:
# create label folders 
for l in labels:
    custom_label_dir = os.path.join(custom_data_dir, l)
    drop_label_dir = os.path.join(drop_data_dir, l)

    if not os.path.exists(custom_label_dir):
        os.mkdir(custom_label_dir)
    if not os.path.exists(drop_label_dir):
        os.mkdir(drop_label_dir)

none_path = os.path.join(custom_data_dir, "none")
if not os.path.exists(none_path):
    os.mkdir(none_path)

## Dataset generation
### load images from original dataset

In [7]:
img_dic = {}
for l in labels: 
    paths, images = cu.get_images(os.path.join(ori_data_dir, l), n=100) # this loads  images from each class label in an array
    # label array pair get stored in dictionary 
    img_dic.update({l: images })

In [6]:
# plot images for debug not necessary 
for c in img_dic:
    i = 0
    while i < len(img_dic[c]):
        cv2.imshow("Hi", img_dic[c][i])
        key = cv2.waitKey(0)

        if key == ord("0"):
            break
        elif key == ord("1"):
            i += 1

        cv2.destroyAllWindows()
    cv2.destroyAllWindows()

### Detecting traffic lights with YOLO

In [7]:
# load pretrained YOLO medium size model
model = YOLO("yolov8m.pt") # test with sample images shows that, size m is a good mid way between accuracy and run time

### analyze images and extract boxes with YOLO 

Images of every class are passed to yolo.
    
Its output get parsed in extract_boxes which identify all traffic light boxes (boxes with number 9) and return just the box as an new image.
    
res_dic = {"label1", [array of extracted boxes of folder from label1], "label2" : [boxes array from label2]}


In [8]:
res_dict = {}
for l in labels:
    # analyze all images from one class at the same time 
    res = model.predict(img_dic[l], conf=0.3)
    # extract all traffic light boxes (9 is the class number for traffic lights in yolov8)
    boxes = cu.extract_boxes(res,9)
    res_dict.update({l : boxes})


0: 640x640 1 traffic light, 1: 640x640 1 traffic light, 2: 640x640 1 traffic light, 3: 640x640 1 traffic light, 4: 640x640 1 traffic light, 5: 640x640 1 traffic light, 6: 640x640 1 traffic light, 7: 640x640 1 traffic light, 8: 640x640 1 traffic light, 9: 640x640 1 traffic light, 10: 640x640 1 traffic light, 1 bench, 11: 640x640 1 traffic light, 12: 640x640 1 traffic light, 13: 640x640 1 traffic light, 14: 640x640 1 traffic light, 15: 640x640 1 traffic light, 16: 640x640 1 traffic light, 17: 640x640 1 traffic light, 18: 640x640 1 traffic light, 19: 640x640 1 traffic light, 20: 640x640 1 traffic light, 21: 640x640 1 traffic light, 22: 640x640 1 traffic light, 23: 640x640 1 traffic light, 24: 640x640 1 traffic light, 25: 640x640 1 traffic light, 26: 640x640 1 traffic light, 27: 640x640 1 traffic light, 28: 640x640 1 traffic light, 29: 640x640 1 traffic light, 30: 640x640 1 traffic light, 31: 640x640 1 traffic light, 32: 640x640 1 traffic light, 33: 640x640 1 traffic light, 34: 640x640 1 

### resize the extracted images
Because the images has different sizes it necessary to resize them.


In [30]:
def average_resize(res_dict):
    hights = []
    widths = []
    for l in res_dict:
        hights += [i.shape[0] for i in res_dict[l]]
        widths += [i.shape[1] for i in res_dict[l]]
    avr_x = int(np.average(widths))
    avr_y = int(np.average(hights))
    for l in res_dict:
        for idx, i in enumerate(res_dict[l]):
            res_dict[l][idx] = cv2.resize(i, (avr_x,avr_y))
    return res_dict, (avr_x, avr_y) 

In [31]:
# resize all images to the same average hight and width 
res_dict, target_size = average_resize(res_dict) 


NameError: name 'res_dict' is not defined

In [22]:
# save resized images in custom_data
for l in labels:
    path = os.path.join(custom_data_dir, l)
    cu.write_images(res_dict[l], path, l)

### manuel label check
YOLO detects also traffic lights which not facing the camera. This Traffic lights cant have a class of green yellow red. 
So it is necessary to go through the dataset and set the label of all images with this properties to "none".
The following function helps.

In [12]:
def show_images(cus_dic):
    # program to iterate over all images and decide if they are in class "none"
    # the other images are already in right class directory 
    for l in cus_dic:
        curr_img = 0
        imgs = cus_dic[l][0]
        imgs_paths = cus_dic[l][1]
        
        none_img = []
        none_paths = []

        while curr_img < len(cus_dic[l][0]):
            basename = os.path.basename(imgs_paths[curr_img])
            cv2.imshow("Label: " + l + " " + basename, imgs[curr_img])
            key = cv2.waitKey(0)

            if key == ord("0"):
                break
            elif key == ord("w"):
                curr_img += 1
            elif key == ord("n"):
                none_path = os.path.join(custom_data_dir, "none", basename)
                os.rename(imgs_paths[curr_img], none_path)
                none_img.append(imgs[curr_img])
                none_paths.append(imgs_paths[curr_img])
                imgs.pop(curr_img)
                imgs_paths.pop(curr_img)

                curr_img += 1
            cv2.destroyAllWindows()
        cv2.destroyAllWindows()

    cus_dic.update({"none" : [imgs, imgs_paths]})
    return cus_dic

In [40]:
# just if you want to try show_images without running all above once again
res_dict = {}
labels = os.listdir(custom_data_dir)
for l in labels:
    custom_cls_path = os.path.join(custom_data_dir, l)
    paths , custom_images = cu.get_images(custom_cls_path)
    res_dict.update({l : [custom_images, paths]})

In [39]:
show_images(res_dict)

{'yellow': [[array([[[ 84, 165, 110],
           [100, 181, 126],
           [122, 203, 148],
           ...,
           [110, 127, 114],
           [114, 128, 117],
           [115, 129, 118]],
   
          [[ 92, 175, 120],
           [101, 185, 127],
           [114, 197, 142],
           ...,
           [ 59,  82,  67],
           [ 76,  97,  82],
           [ 82, 103,  88]],
   
          [[108, 198, 139],
           [109, 199, 139],
           [103, 193, 134],
           ...,
           [ 39,  80,  53],
           [ 37,  71,  47],
           [ 40,  74,  50]],
   
          ...,
   
          [[ 99, 101,  95],
           [100, 102,  96],
           [102, 104,  98],
           ...,
           [ 94, 118, 100],
           [ 94, 118, 100],
           [ 94, 119,  99]],
   
          [[ 85,  82,  78],
           [ 89,  86,  82],
           [ 96,  93,  89],
           ...,
           [ 92, 118, 100],
           [ 92, 118, 100],
           [ 92, 118, 100]],
   
          [[ 34,  29,  26]

In [42]:
def drop_similar(custom_dict=None, threshold=0.6):
    # find similar pictures 
    for l in custom_dict:
        #similar_images = []

        images = custom_dict[l][0]
        paths = custom_dict[l][1]

        # compare each picture
        i = 0
        while i < len(images):
            for j in range(i + 1, len(images)):
                if j == len(images):
                    break
                # convert images into grayscale (ssim works only with that)
                gray_image1 = cv2.cvtColor(images[i], cv2.COLOR_BGR2GRAY)
                gray_image2 = cv2.cvtColor(images[j], cv2.COLOR_BGR2GRAY)

                # calculate ssim value
                similarity_score, _ = ssim(gray_image1, gray_image2, full=True)

                # check if ssim is bigger than threshold to find most similar pictures
                if similarity_score > threshold:

                    # drop images[i] to other folder:
                    pa = os.path.join(drop_data_dir, l,
                                      os.path.basename(paths[i]))
                    os.rename(paths[i], pa)
                    paths.pop(i)
                    images.pop(i)
            i += 1

        custom_dict[l][0] = images
        custom_dict[l][1] = paths

    return custom_dict 

In [43]:

cus_dic = {}
for l in labels:
    custom_cls_path = os.path.join(custom_data_dir, l)
    paths , custom_images = cu.get_images(custom_cls_path)
    cus_dic.update({l : [custom_images, paths]})
    cus_dic = drop_similar(cus_dic, threshold=0.8)

## CNN
### preprocess the data
The following code is copied from the lecture.


In [14]:
# Specify the path to the dataset directory
dataset_dir = custom_data_dir

# Initialize lists to store the images and labels
images = []
labels = []

# Iterate over the subdirectories in the dataset directory
for subdir in os.listdir(dataset_dir):
    subdir_path = os.path.join(dataset_dir, subdir)
    if os.path.isdir(subdir_path):
        # Extract the class label from the subdirectory name
        label = subdir
        # Iterate over the image files in the subdirectory
        for file_name in os.listdir(subdir_path):
            # Read the image file
            image_path = os.path.join(subdir_path, file_name)
            image = cv2.imread(image_path)
            # Preprocess the image (resize, normalize, etc.) - already done
            # Add the preprocessed image and label to the lists
            images.append(image)
            labels.append(label)

# Convert the lists to NumPy arrays
images = np.array(images)
labels = np.array(labels)
images_sh = images.shape
print(images_sh)
print(labels.shape)

(396, 147, 74, 3)
(396,)


In [15]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder


# Split the dataset into training and testing sets
train_images, test_images, train_labels0, test_labels0 = train_test_split(images, labels, test_size=0.2, random_state=42)

# Convert the labels to one-hot encoded vectors
classes = np.unique(labels)
num_classes = len(classes)
print(classes)
print(num_classes)

label_encoder = LabelEncoder()
train_labels1 = label_encoder.fit_transform(train_labels0)
test_labels1  = label_encoder.fit_transform(test_labels0)


from tensorflow.keras.utils import to_categorical

train_labels = to_categorical(train_labels1, num_classes)
test_labels = to_categorical(test_labels1, num_classes)

['green' 'none' 'red' 'yellow']
4


### define the CNN 
We oriented us at the VGG16 model and the Car Classification model from the lecture.
We increased the number of convolutional layers and experiments a bit with the layers. The following model seams to be really good but they might exists much better architectures for this purpose.

In [16]:
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

In [24]:
model = Sequential()

image_height = images_sh[1] 
image_width = images_sh[2]

# Add convolutional and pooling layers
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Add more convolutional and pooling layers if desired
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Add convolutional and pooling layers
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(image_height, image_width, 3)))

# Flatten the output from the previous layer
model.add(Flatten())

# Add fully connected layers
model.add(Dense(128, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])




print(train_images.shape)
print(train_labels.shape)
print(test_images.shape)
print(test_labels.shape)

(316, 147, 74, 3)
(316, 4)
(80, 147, 74, 3)
(80, 4)


### train the CNN

In [25]:
num_epochs = 3
batch_size = 10

model.fit(train_images, train_labels, epochs=num_epochs, batch_size=batch_size, validation_data=(test_images, test_labels))

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.src.callbacks.History at 0x7ff9a32cfaf0>

In [26]:
loss, accuracy = model.evaluate(test_images, test_labels)
print(f'Test Loss: {loss:.4f}')
print(f'Test Accuracy: {accuracy:.4f}')

Test Loss: 0.5079
Test Accuracy: 0.8000


### save the trained model
The model needs to be saved to be reused in the main file.

In [44]:
# save model for reuse in main
model.save('./traffic_light_model/traffic_light_model_cpu.keras')