# Train different Neural Networks on a Traffic Sign Detection Dataset

In [None]:
import matplotlib.pyplot as plt
from copy import deepcopy
import os
from enum import Enum
from abc import ABC, abstractmethod
from typing import Optional
import random
import numpy as np
from PIL import Image
import yaml

In [None]:
class DataType(Enum):
    """Selects one split of the dataset.

    The lower-cased member name is used by ``load_image_data`` as the name of
    the split's sub-directory.
    """
    TRAIN = 1
    TEST = 2
    VALID = 3

class Label:
    """One bounding-box annotation parsed from a single line of a label file.

    The line must hold five whitespace-separated fields:
    ``<category> <center_x> <center_y> <width> <height>``.

    Attributes:
        category: Integer index of the category.
        center_x, center_y, width, height: Box geometry as floats
            (presumably normalized to the image size, since the plotting code
            multiplies them by the image size -- TODO confirm).

    Raises:
        ValueError: If the line does not contain exactly five fields or a
            field cannot be converted to a number.
    """

    def __init__(self, raw_data: str) -> None:
        # split() without an argument tolerates tabs, repeated spaces and
        # leading/trailing whitespace, which split(' ') turned into empty
        # fields that failed to parse.
        fields = raw_data.split()
        if len(fields) != 5:
            raise ValueError(
                'Expected 5 fields (category, center_x, center_y, width, height), '
                f'got {len(fields)}: {raw_data!r}'
            )
        self.category = int(fields[0])
        self.center_x, self.center_y, self.width, self.height = map(float, fields[1:])

class Entry:
    """A loaded image together with its labels and the name of its source file."""

    def __init__(self, image: np.ndarray, labels: list[Label], image_name: str) -> None:
        # np.ndarray is the array type; np.array is only the factory function.
        self.image = image
        self.labels = labels
        self.image_name = image_name


# Root folder of the dataset and the YAML file that describes it.
dataset_path = 'data/traffic-signs-detection'
info_file = os.path.join(dataset_path, 'car/data.yaml')

# Read the category names. The context manager closes the file handle, which a
# bare open() call would leave to the garbage collector.
# NOTE(review): yaml.FullLoader is kept as-is; prefer yaml.safe_load if the
# dataset description can come from an untrusted source.
with open(info_file) as info_stream:
    categories = yaml.load(info_stream, Loader=yaml.FullLoader)['names']

# Images are resized to img_size x img_size pixels.
img_size = 64
# Images with a label narrower or lower than this are dropped.
min_bounding_box_size = 0.3
# Drop images that carry more than one label.
only_single_label = True
# Images whose file name starts with one of these prefixes are dropped.
forbidden_file_prefixes = ['FisheyeCamera', 'road']
# When True, the colour channels are averaged into a single channel.
grayscale = False

def load_image_data(type: DataType):
    """Load the images and labels of one dataset split.

    Images are read from ``<dataset_path>/car/<split>/images`` and their labels
    from the file with the same base name and a ``.txt`` extension in
    ``<dataset_path>/car/<split>/labels``. An image is skipped when

    * its file name starts with one of ``forbidden_file_prefixes``,
    * it has no label, or more than one label while ``only_single_label`` is set,
    * any of its labels is narrower or lower than ``min_bounding_box_size``.

    Kept images are converted to RGB, resized to ``img_size`` x ``img_size``,
    optionally averaged over the colour channels (``grayscale``) and scaled to
    the range 0..1.

    Args:
        type: The split to load; its lower-cased name is the directory name.

    Returns:
        A list of ``Entry`` objects, one per kept image.
    """
    data_path = os.path.join(dataset_path, 'car', type.name.lower())
    images_path = os.path.join(data_path, 'images')
    labels_path = os.path.join(data_path, 'labels')

    entries = []
    files_in_folder = os.listdir(images_path)
    print(f'Scanning {len(files_in_folder)} entries from {images_path} and {labels_path}...')

    for image_name in files_in_folder:
        # Cheapest check first: unwanted files are skipped without any disk access.
        if image_name.startswith(tuple(forbidden_file_prefixes)):
            continue

        # Swap only the extension, so other image formats work and a '.jpg'
        # in the middle of a file name is left alone.
        label_name = os.path.splitext(image_name)[0] + '.txt'
        with open(os.path.join(labels_path, label_name)) as label_file:
            labels = [Label(line) for line in label_file.read().splitlines() if line.strip()]

        if not labels or (only_single_label and len(labels) > 1):
            continue
        if any(
            label.width < min_bounding_box_size or label.height < min_bounding_box_size
            for label in labels
        ):
            continue

        # Decode and resize only the images that survived the filters. Converting
        # to RGB gives every image the same (img_size, img_size, 3) layout, also
        # for grayscale or RGBA sources.
        with Image.open(os.path.join(images_path, image_name)) as source:
            image = np.array(source.convert('RGB').resize((img_size, img_size)))

        if grayscale:
            image = np.mean(image, axis=2)

        image = image / 255.0
        entries.append(Entry(image, labels, image_name))

    return entries

# Load the three dataset splits; each call scans one split directory.
train_data = load_image_data(DataType.TRAIN)
validate_data = load_image_data(DataType.VALID)
test_data = load_image_data(DataType.TEST)

# Report how many images each split contains after filtering.
print(f'Loaded {len(train_data)} training images, {len(validate_data)} validation images and {len(test_data)} test images')

In [None]:
# Show a grid of validation images: one row per selected category, with the
# bounding boxes of each image drawn in red.
num_images = 6
categories_to_display = [0, 3, 13, 14]
n_rows = len(categories_to_display)
fig, axs = plt.subplots(n_rows, num_images, figsize=(num_images * 2, n_rows * 2))
category_names = ", ".join(categories[category] for category in categories_to_display)
print(f'Displaying {num_images} random images from each of the following categories: {category_names}')

for row_axes, category in zip(axs, categories_to_display):
    category_entries = [entry for entry in validate_data if entry.labels[0].category == category]

    # Sample only when there are enough images; otherwise show all of them.
    selected_entries = (
        random.sample(category_entries, num_images)
        if len(category_entries) >= num_images
        else category_entries
    )

    for column, (ax, entry) in enumerate(zip(row_axes, selected_entries)):
        ax.imshow(entry.image, cmap='gray')
        ax.axis('off')
        # Only the first image of a row carries the category name.
        if column == 0:
            ax.set_title(categories[category])

        for label in entry.labels:
            # Scale the label geometry to pixel coordinates of the resized image.
            w = label.width * img_size
            h = label.height * img_size
            corner = (label.center_x * img_size - w / 2, label.center_y * img_size - h / 2)
            ax.add_patch(plt.Rectangle(corner, w, h, fill=False, color='r'))

# Also hide the axes of cells that received no image.
for ax in axs.flat:
    ax.axis('off')

plt.show()


In [None]:
from typing import List, Tuple, TypeVar

# NOTE: `Entry` is deliberately not rebound to a TypeVar here. Doing so shadows
# the Entry class of this notebook, and load_image_data (which instantiates
# Entry) then fails the next time it is run.

def get_data(data: "list[Entry]"):
    """Turn a list of entries into shuffled image and label arrays.

    Args:
        data: Entries to convert. The list is shuffled IN PLACE, so the
            caller's list order changes as a side effect.

    Returns:
        A tuple ``(images, labels)``: ``images`` stacks ``entry.image`` of every
        entry, ``labels`` holds the category of each entry's first label, both
        in the shuffled order.
    """
    np.random.shuffle(data)
    images = np.array([entry.image for entry in data])
    labels = np.array([entry.labels[0].category for entry in data])
    return images, labels

# def print_category_distribution(labels):
#     total = len(labels)
#     distribution = {category: np.sum(labels == category) / total * 100 for category in np.unique(labels)}
#     print(", ".join([f'{categories[cat]}: {dist:.2f}%' for cat, dist in distribution.items()]))

# def get_balanced_data(source_data: List[Entry], target_distribution: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
#     """
#     Resamples data to match a target distribution of categories.
    
#     Args:
#         source_data: List of entries to be resampled
#         target_distribution: Array of target percentages for each category
        
#     Returns:
#         Tuple of (images, labels) with the desired distribution
#     """
#     # Get initial data
#     np.random.shuffle(source_data)
#     images = np.array([entry.image for entry in source_data])
#     labels = np.array([entry.labels[0].category for entry in source_data])
    
#     # Calculate current and target distributions
#     n_categories = len(target_distribution)
#     current_counts = np.array([np.sum(labels == i) for i in range(n_categories)])
#     current_dist = current_counts / len(labels)
    
#     # Calculate number of samples needed for each category
#     target_size = len(labels)
#     target_counts = (target_distribution * target_size).astype(int)
    
#     # Initialize arrays for balanced dataset
#     balanced_images = []
#     balanced_labels = []
    
#     # Resample each category
#     for category in range(n_categories):
#         category_indices = np.where(labels == category)[0]
        
#         # If we need more samples than available, sample with replacement
#         if target_counts[category] > len(category_indices):
#             sampled_indices = np.random.choice(
#                 category_indices,
#                 size=target_counts[category],
#                 replace=True
#             )
#         # If we need fewer samples, sample without replacement
#         else:
#             sampled_indices = np.random.choice(
#                 category_indices,
#                 size=target_counts[category],
#                 replace=False
#             )
            
#         balanced_images.extend(images[sampled_indices])
#         balanced_labels.extend(labels[sampled_indices])
    
#     # Convert to numpy arrays and shuffle
#     balanced_images = np.array(balanced_images)
#     balanced_labels = np.array(balanced_labels)
    
#     # Shuffle the balanced dataset
#     shuffle_idx = np.random.permutation(len(balanced_images))
#     balanced_images = balanced_images[shuffle_idx]
#     balanced_labels = balanced_labels[shuffle_idx]
    
#     return balanced_images, balanced_labels

# # Calculate target distribution from training data
# train_images, train_labels = get_data(train_data)
# target_distribution = np.array([np.sum(train_labels == i) for i in range(len(categories))]) / len(train_labels)

# # Resample validation and test data to match training distribution
# validate_images, validate_labels = get_balanced_data(validate_data, target_distribution)
# test_images, test_labels = get_balanced_data(test_data, target_distribution)

# # Print distributions to verify
# print(f'Category distribution in the training dataset ({len(train_images)} images):')
# print_category_distribution(train_labels)
# print(f'Category distribution in the validation dataset ({len(validate_images)} images):')
# print_category_distribution(validate_labels)
# print(f'Category distribution in the test dataset ({len(test_images)} images):')
# print_category_distribution(test_labels)

# Build the (images, labels) arrays of every split. get_data shuffles the
# entry list it receives in place, so the *_data lists are reordered as well.
test_images, test_labels = get_data(test_data)
validate_images, validate_labels = get_data(validate_data)
train_images, train_labels = get_data(train_data)

In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
import tensorflow as tf
import keras_tuner as kt

class CustomModel(kt.HyperModel):
    """Hypermodel of a fully connected classifier.

    Tuned hyperparameters: number of hidden layers, units and dropout rate per
    hidden layer, learning rate and batch size.
    """

    def build(self, hp):
        """Create and compile one candidate network for the hyperparameters in ``hp``."""
        tf.keras.backend.clear_session()

        network = Sequential()
        network.add(Input(shape=(img_size, img_size, 3)))
        network.add(Flatten())

        # Between one and three hidden blocks, each a Dense layer followed by Dropout.
        hidden_layer_count = hp.Int('num_layers', 1, 3)
        for layer_idx in range(hidden_layer_count):
            units = hp.Int(f'units_{layer_idx}', min_value=32, max_value=512, step=32)
            network.add(Dense(units=units, activation='relu'))
            dropout_rate = hp.Float(f'dropout_{layer_idx}', min_value=0, max_value=0.5, step=0.05)
            network.add(Dropout(rate=dropout_rate))

        # One output unit per category.
        network.add(Dense(len(categories), activation='softmax'))

        # The learning rate is sampled on a logarithmic scale.
        learning_rate = hp.Float('learning_rate', min_value=1e-6, max_value=1e-2, sampling='log')
        network.compile(
            optimizer=Adam(learning_rate=learning_rate),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy'],
        )
        return network

    # thanks to https://github.com/keras-team/keras-tuner/issues/122#issuecomment-544648268
    def fit(self, hp, model, *args, **kwargs):
        """Train ``model`` silently with a batch size chosen from the powers of two 2..1024."""
        batch_size = hp.Choice("batch_size", [2 ** n for n in range(1, 11)])
        return model.fit(*args, batch_size=batch_size, verbose=0, **kwargs)

# Set up the tuner: a Hyperband search over CustomModel with validation accuracy
# as the objective. `directory` and `project_name` tell KerasTuner where to
# keep the search state.
tuner = kt.Hyperband(
    CustomModel(),
    objective='val_accuracy',
    max_epochs=200,
    hyperband_iterations=6,
    directory='models',
    project_name='ann_hyperparam_search'
)

class ClearGPUCallback(tf.keras.callbacks.Callback):
    """Keras callback that calls ``clear_session()`` once training has ended."""

    def on_train_end(self, logs=None):
        # Keras invokes this hook with a `logs` argument; without the parameter
        # the call raises a TypeError as soon as the callback is actually used.
        tf.keras.backend.clear_session()

# Callbacks passed to every trial of the search.
lr_scheduler = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=12, min_lr=1e-6)
early_stop = EarlyStopping(monitor='val_accuracy', patience=20, restore_best_weights=True)
# NOTE(review): created but not passed to the search below -- confirm whether
# that is intended.
gpu_clear_callback = ClearGPUCallback()

# Search for the best hyperparameters
tuner.search(train_images, train_labels,
             validation_data=(validate_images, validate_labels),
             callbacks=[lr_scheduler, early_stop])

print('Search complete!')
# Get the best model
model = tuner.get_best_models()[0]
# summary() prints its table itself and returns None, so wrapping it in print()
# only added a stray "None" line.
model.summary()

# get_best_trials() returns a single trial by default, which made a [:10] slice
# ineffective; ask for the ten best trials explicitly.
trials = tuner.oracle.get_best_trials(num_trials=10)

for trial in trials:
    trial.summary()

In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, Input, AveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau

# Experiment notes:
# - validation accuracy would always stagnate at 75%; the model could reach 99% training
#   accuracy with 4096 - 1024 - 256 but would overfit
# - that's why we need dropout, but more neurons to compensate for it; no l1/l2 and no
#   augmentation on purpose - just a plain ann
# - training + validation accuracy would go to 70% with 6144 - 3072 - 1024 - 512 and
#   0.0001 lr, log curve and 150 epochs
# - now testing with 300 epochs and 0.0002 lr
# - lr 0.0002 didn't work (wonky, plateau at 50%); lr 0.0001 and 300 epochs still stops at 70%
# - may try a different optimizer if the hyperparameter search over 1e-2 - 1e-6 lr,
#   1-3 layers (0 - 0.5 dropout & 32-512 neurons each) doesn't work
# split = int(len(train_data) * 0.8)
# np.random.shuffle(train_data)
# train_images, train_labels = get_data(train_data, None, split)
# validate_images, validate_labels = get_data(train_data, split, None)

# Plain fully connected network: the image is average-pooled, flattened and
# passed through one hidden layer with dropout.
model = Sequential([
    Input(shape=(img_size, img_size, 3)),
    AveragePooling2D((3, 3), strides=3),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.35),
    Dense(len(categories), activation='softmax'),
])
model.compile(
    optimizer=Adam(),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Halve the learning rate when the validation loss has not improved for 12 epochs.
lr_scheduler = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=12, min_lr=1e-6)

history = model.fit(
    train_images,
    train_labels,
    batch_size=32,
    epochs=300,
    validation_data=(validate_images, validate_labels),
    callbacks=[lr_scheduler],
)


In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Input, Dropout, AveragePooling2D, BatchNormalization, LeakyReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l1, l2

# Convolutional network: two blocks of (Conv, Conv, BatchNorm, MaxPool, Dropout)
# followed by a small, L2-regularised dense classifier.
model = Sequential([
    Input(shape=(img_size, img_size, 3)),

    # Block 1: 16 filters of size 5x5
    Conv2D(16, (5, 5), activation='relu'),
    Conv2D(16, (5, 5), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Block 2: 64 filters of size 3x3
    Conv2D(64, (3, 3), activation='relu'),
    Conv2D(64, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Classifier head
    Flatten(),
    Dense(64, activation='relu', kernel_regularizer=l2(1e-4)),
    Dropout(0.4),
    Dense(len(categories), activation='softmax'),
])

model.compile(
    optimizer=Adam(),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

history = model.fit(
    train_images,
    train_labels,
    epochs=20,
    batch_size=32,
    validation_data=(validate_images, validate_labels),
)

In [None]:
# Evaluate the current model on the test split and plot the accuracy curves
# recorded during training.
test_metrics = model.evaluate(test_images, test_labels)
loss, accuracy = test_metrics

accuracy_curves = (
    ('accuracy', 'red', 'Training'),
    ('val_accuracy', 'blue', 'Validierung'),
)
for metric_key, curve_color, curve_label in accuracy_curves:
    plt.plot(history.history[metric_key], color=curve_color, label=curve_label)

plt.legend()
plt.xlabel('Epochen')
plt.ylabel('Genauigkeit')
print(f'Accuracy: {accuracy * 100:.2f}%')

In [None]:
from IPython.display import clear_output

def reshape_to_aspect(arr, aspect_ratio):
    """
    Reshapes values into a 2D grid that best matches the desired aspect ratio.
    Adds padding (zeros) at the end if necessary to fill the grid.

    Parameters:
    arr (array-like): Input values. Anything that is not already 1D is
        flattened in row-major order first.
    aspect_ratio (tuple): Desired aspect ratio as (width, height).

    Returns:
    numpy.ndarray: 2D reshaped array with padding if necessary. An empty input
        yields an array of shape (0, 0).
    """
    # Flatten up front: the padding below is only correct for 1D data, and
    # asarray also lets plain lists through.
    flat = np.asarray(arr).ravel()
    N = flat.size
    if N == 0:
        return np.zeros((0, 0))

    w, h = aspect_ratio
    target_ratio = w / h

    # Ideal (fractional) number of rows for the target ratio; only a small
    # window of integer row counts around it is examined.
    rows_ideal = np.sqrt(N / target_ratio)
    min_row = max(1, int(np.floor(rows_ideal)) - 3)  # never below 1, so no zero rows
    max_row = int(np.ceil(rows_ideal)) + 3

    # Candidates are ranked by (ratio error, padding); tuple comparison makes
    # the padding a tie-breaker for equal ratio errors.
    best_score = (float('inf'), float('inf'))
    best_shape = (1, N)  # Default shape, will be updated

    for row in range(min_row, max_row + 1):
        cols = (N + row - 1) // row  # Ceiling division
        score = (abs(cols / row - target_ratio), row * cols - N)
        if score < best_score:
            best_score = score
            best_shape = (row, cols)

    # Pad the array with zeros to fit the desired shape
    padded = np.pad(flat, (0, best_shape[0] * best_shape[1] - N), mode='constant')

    return padded.reshape(best_shape)

def visualize_model(test_image):
    """Plot the input image and what selected layers of the global ``model`` make of it.

    Convolutional layers get one figure with their filter weights (top row)
    and their outputs (bottom row); pooling layers get one figure with
    their output channels; the last layer is drawn as a horizontal bar chart
    over ``categories``; every other Dense layer is drawn as a grayscale grid.
    Layers of any other type are skipped.

    Parameters:
    test_image: Batch holding a single image; ``test_image[0]`` is displayed and
        the batch is passed to ``predict``.
    """
    clear_output(wait=True)

    # Display input image
    plt.figure(figsize=(6, 6))
    plt.imshow(test_image[0])
    plt.title('Input Bild')
    plt.axis('off')
    plt.show()

    last_index = len(model.layers) - 1
    for layer_index, layer in enumerate(model.layers):
        is_conv = isinstance(layer, Conv2D)
        is_pool = isinstance(layer, (MaxPooling2D, AveragePooling2D))
        is_last = layer_index == last_index
        if not (is_conv or is_pool or is_last or isinstance(layer, Dense)):
            continue

        # Output of the network truncated after the current layer.
        output = Sequential(layers=model.layers[:layer_index + 1]).predict(test_image, verbose=0)

        if is_conv:
            filters = layer.get_weights()[0]
            n_filters = filters.shape[3]

            # squeeze=False keeps axs two-dimensional even for a single filter.
            fig, axs = plt.subplots(2, n_filters, figsize=(n_filters * 2, 4), squeeze=False)

            for k in range(n_filters):
                # Row 1: Filter weights, averaged over axis 2 of the kernel.
                # Index k (not k - 1) so the weights shown match their title
                # and the output plotted below them.
                filter_viz = np.mean(filters[:, :, :, k], axis=2)
                axs[0, k].imshow(filter_viz, cmap='gray')
                axs[0, k].set_title(f'Filter {k + 1} Gewichte')
                axs[0, k].axis('off')

                # Row 2: Filter outputs
                axs[1, k].imshow(output[0, :, :, k], cmap='gray')
                axs[1, k].set_title(f'Filter {k + 1} Output')
                axs[1, k].axis('off')

            plt.suptitle(f'Layer: {layer.name} (Convolutional)', y=1.05)
            plt.tight_layout()
            plt.show()

        elif is_pool:
            n_outputs = output.shape[-1]

            # squeeze=False keeps axs two-dimensional even for a single channel.
            fig, axs = plt.subplots(1, n_outputs, figsize=(n_outputs * 2, 2), squeeze=False)

            for j in range(n_outputs):
                # A three-channel output is shown with one colour map per channel.
                cmap = 'gray' if n_outputs != 3 else ['Reds', 'Greens', 'Blues'][j]
                axs[0, j].imshow(output[0, :, :, j], cmap=cmap)
                axs[0, j].set_title(f'Output {j + 1}')
                axs[0, j].axis('off')

            pool_type = "Max Pooling" if isinstance(layer, MaxPooling2D) else "Average Pooling"
            plt.suptitle(f'Schicht: {layer.name} ({pool_type})', y=1.05)
            plt.tight_layout()
            plt.show()

        elif is_last:
            # Final layer: one bar per category.
            n_classes = len(output[0])
            fig, ax = plt.subplots(1, 1, figsize=(5, 8))
            ax.barh(np.arange(n_classes), output[0])
            ax.set_yticks(np.arange(n_classes))
            ax.set_yticklabels([categories[c] for c in range(n_classes)], rotation=45, ha='right')
            ax.set_title('Output Klassenwahrscheinlichkeiten')
            plt.tight_layout()
            plt.show()

        else:
            # Hidden Dense layer: lay the activations out as a wide grid. It gets
            # its own figure and show() so it appears in layer order instead of
            # being drawn onto whatever figure happens to be current.
            image = reshape_to_aspect(output[0], (5, 1))
            plt.figure()
            plt.title(f'Schicht: {layer.name} (Dense)')
            plt.axis('off')
            plt.imshow(image, cmap='gray')
            plt.show()


# Step through the validation set and visualise how the model processes each
# image. `misclassified` collects only the images the model gets wrong; set
# show_all_predictions to False to restrict the visualisation to those too.
# (The flag replaces an `or True` that made the condition constant and filled
# `misclassified` with every image.)
show_all_predictions = True
misclassified = []
for image, label in zip(validate_images, validate_labels):
    prediction = model.predict(image[np.newaxis, ...])
    predicted_label = np.argmax(prediction)
    is_wrong = predicted_label != label
    if is_wrong:
        misclassified.append((image, label, predicted_label))
    if is_wrong or show_all_predictions:
        visualize_model(np.array([image]))
        input('Weiter mit Enter')
            
    

In [None]:
from IPython.display import clear_output

# Browse the validation images the model gets wrong, one at a time. `skipped`
# counts the correctly classified images since the last one that was shown.
skipped = 0
for entry in validate_data:
    probabilities = model.predict(np.array([entry.image]), verbose=0)[0]
    label = probabilities.argmax()
    actual = entry.labels[0].category

    if label != actual:
        print(f'Skipped: {skipped} images')
        skipped = 0
        plt.imshow(entry.image, cmap='gray')
        plt.axis('off')
        plt.title(f'Predicted: {categories[label]}, Actual: {categories[actual]}')
        plt.show()
        # Wait for the user before replacing the output with the next image.
        input('Press enter to continue...')
        clear_output(wait=True)
    else:
        skipped += 1

In [None]:
def apply_filter(image: np.ndarray, filter: np.ndarray, stride: int = 1) -> np.ndarray:
    """Slide ``filter`` over ``image`` and write the responses back into ``image``.

    For every window position the element-wise product of the window (taken
    from an unmodified copy of the image) and the filter is summed, and the
    WHOLE window in ``image`` is set to that sum. Where windows overlap, the
    window visited last wins.

    NOTE(review): filling the whole window, rather than a single output pixel,
    is kept exactly as before -- confirm that this is the intended result.

    Args:
        image: 2D image. It is modified in place; pass a copy to keep the original.
        filter: Kernel. Image and kernel may be rectangular; a 1D kernel of
            length k is treated as k x k windows, as before.
        stride: Step between window positions in both directions.

    Returns:
        The same array object as ``image``, after modification.
    """
    # Read from a snapshot so earlier writes do not feed into later windows.
    source = image.copy()
    filter_rows = filter.shape[0]
    filter_cols = filter.shape[1] if filter.ndim > 1 else filter_rows
    image_rows, image_cols = image.shape[0], image.shape[1]

    # Rows and columns get their own bounds, so non-square inputs are covered fully.
    for i in range(0, image_rows - filter_rows + 1, stride):
        for j in range(0, image_cols - filter_cols + 1, stride):
            window = source[i:i + filter_rows, j:j + filter_cols]
            image[i:i + filter_rows, j:j + filter_cols] = (window * filter).sum()

    return image


# Validation entries whose first label has category index 14 (presumably the
# stop-sign class, going by the variable name -- confirm against `categories`).
stop_signs = [entry for entry in validate_data if entry.labels[0].category == 14]
# Collapse the last axis by averaging (sum / 3 assumes three colour channels).
grayscale_img = np.array(stop_signs[9].image).sum(axis=2) / 3
# 3x3 kernels: vertical edges, horizontal edges and a Laplace kernel.
vertical_edge_filter = np.array([[1, 0, -1], [1, 0, -1], [1, 0, -1]])
horizontal_edge_filter = np.array([[1, 1, 1], [0, 0, 0], [-1, -1, -1]])
laplace_edge_filter = np.array([[0, 1, 0], [1, -4, 1], [0, 1, 0]])

# Show the unfiltered grayscale image.
plt.axis('off')
plt.imshow(grayscale_img, cmap='gray')
# plt.imsave('Stoppschild_laplace.png', grayscale_img, cmap='gray')

# horizontal = apply_filter(deepcopy(grayscale_img),  horizontal_edge_filter, stride=1)
# laplace = apply_filter(deepcopy(grayscale_img), laplace_edge_filter, stride=1)
# fig, axs = plt.subplots(1, 2, figsize=(10, 5))
# axs[0].imshow(horizontal, cmap='gray')
# axs[0].axis('off')
# axs[0].set_title('Horizontale Kanten')

# axs[1].imshow(laplace, cmap='gray')
# axs[1].axis('off')
# axs[1].set_title('Laplace-Filter')

In [None]:
# get the output value of the first layer of the custom model
# print(model.layers)

# layer_models[k] is a model made of the first k + 1 layers of `model`, so its
# prediction is the output of layer k.
layer_models = [
    Sequential(model.layers[:depth])
    for depth in range(1, len(model.layers) + 1)
]

# layer_models[1].summary()

# image = train_images[:1]
# print(image.shape)
# layer_models[1].predict(image)

# raise

# while True:
#     # keras expects a batch of images, so we need to add an additional dimension to the image
#     input_data = test_images[np.random.randint(0, len(test_images))][np.newaxis]
#     layer_output = [layer_model.predict(input_data) for layer_model in layer_models]

#     # the plot should look like this:
#     #                        Layer 1 Filter 1 weights     Layer 1 Output 1       ...       Layer n Filter 1 weights     Layer n Output 1      
#     # Original Image         ...                          ...                    ...       ...                          ...                     Predicted Category
#     #                        Layer 1 Filter n weights     Layer 1 Output n       ...       Layer n Filter n weights     Layer n Output n
#     #

#     fig, ax, plt.subplots()


#     plt.show()

# Validation entries whose first label has category index 14.
stop_signs = [entry for entry in validate_data if entry.labels[0].category == 14]

# Show stop_signs[9].image three times: as a grayscale image, scaled down to
# size x size (14x14) pixels, and scaled down with the numeric brightness of
# every pixel written on top of it.
size = 14
grayscale_img = np.array(stop_signs[9].image).sum(axis=2) / 3
scaled_img = np.array(Image.fromarray(grayscale_img).resize((size, size)))
fig, axs = plt.subplots(1, 3, figsize=(16, 8))
axs[0].imshow(grayscale_img, cmap='gray')
axs[0].axis('off')
axs[0].set_title('Original (Schwarz-Weiß)')

axs[1].imshow(scaled_img, cmap='gray')
axs[1].axis('off')
axs[1].set_title('Skaliert auf 14x14 Pixel')

axs[2].imshow(scaled_img, cmap='gray')
axs[2].axis('off')
axs[2].set_title('Helligkeitswerte')

# Write each pixel's value at its position; the text colour is chosen to
# contrast with the pixel (black above 0.5, white otherwise).
for i in range(size):
    for j in range(size):
        axs[2].text(j, i, f'{scaled_img[i, j]:.2f}', ha='center', va='center', color='black' if scaled_img[i, j] > 0.5 else 'white', fontsize=7)

plt.show()


# Feed the fixed stop-sign image as a batch of one (keras expects a leading
# batch dimension). Drawing a random test image first would be pointless here:
# it would be overwritten by this assignment before being used.
input_data = stop_signs[9].image[np.newaxis]
layer_output = layer_models[0].predict(input_data)

# plot the original image, the weights of the first kernel and the output of the first layer
fig, axs = plt.subplots(1, 3, figsize=(12, 4))
axs[0].imshow(input_data[0])
axs[0].axis('off')
axs[0].set_title('Original')

# Rescale the weights of the first layer to the range 0..1 for display.
weights = model.layers[0].get_weights()[0]
weights_min = weights.min()
weights_max = weights.max()
weights = (weights - weights_min) / (weights_max - weights_min)
axs[1].imshow(weights[:, :, 0, 0], cmap='gray')
axs[1].axis('off')
axs[1].set_title('First Kernel Weights')

# First output channel of the first layer for the single image in the batch.
output = layer_output[0]
axs[2].imshow(output[:, :, 0], cmap='gray')
axs[2].axis('off')
axs[2].set_title('Output of First Layer')


## What happened?
1. I initially thought I was suffering from exploding gradients with HeInitialization() and MeanSquaredError(). I tried gradient clipping at 1.0, but it turned out that I had vanishing gradients and exploding outputs rather than exploding gradients: the outputs were very high while the gradients were very low, which is weird. Possible reasons are MeanSquaredError(), ReLU() and the batch size of 1 leading to unstable gradients. Another potential reason is a high learning rate.
2. I changed to CrossEntropy() -> still vanishing gradients
3. LeakyReLU() -> still vanishing
4. Used RandomInitialization() and Sigmoid() activation function, works perfectly now

I'm going to try adding batch processing so that it will also work. Maybe something was wrong with my Softmax output function, so I want to try the original architecture (128-64, LeakyReLU, CrossEntropy, Xavier) but with a different output activation function, to see if the code for Softmax is broken.


