In [9]:
import os
import numpy as np
import sklearn
from sklearn.model_selection import train_test_split
from pathlib import Path
import preprocessing


current_folder = Path()
dataset_folder = current_folder / "dataset"
images_folder = dataset_folder / "images"
models_folder = current_folder / "models"
logs_folder = current_folder / "logs"

X_train, X_test, y_train, y_test = preprocessing.get_dataset()
X_train = np.asarray(X_train)
X_test = np.asarray(X_test)
y_train = np.asarray(y_train)
y_test = np.asarray(y_test)

INFO: modified dataset already created


In [10]:
np.unique(y_train, return_counts=True)

(array(['crosswalk', 'speedlimit', 'stop', 'trafficlight'], dtype='<U12'),
 array([160, 626,  74, 135], dtype=int64))

In [11]:
from imblearn.over_sampling import SMOTE

X_flat = np.reshape(X_train, (X_train.shape[0], int(np.product(X_train.shape) / X_train.shape[0])))

sm = SMOTE(n_jobs=-1, random_state=42)
X_train_os, y_train_os = sm.fit_resample(X_flat, y_train)

X_train_os_rs = np.reshape(X_train_os, tuple([X_train_os.shape[0]]) + X_train.shape[1:])

X_train = X_train_os_rs
y_train = y_train_os

In [12]:
np.unique(y_train, return_counts=True)

(array(['crosswalk', 'speedlimit', 'stop', 'trafficlight'], dtype='<U12'),
 array([626, 626, 626, 626], dtype=int64))

In [13]:
# One hot encoding
label_enc = sklearn.preprocessing.LabelEncoder()
y_train = label_enc.fit_transform(y_train)
y_test = label_enc.transform(y_test)
one_hot = sklearn.preprocessing.OneHotEncoder(sparse=False)
y_train = one_hot.fit_transform(y_train.reshape(-1, 1))
y_test = one_hot.transform(y_test.reshape(-1, 1))

X_train, X_valid, y_train, y_valid = train_test_split(
    X_train, y_train, test_size=0.25, random_state=54
)

In [14]:
print(np.sum(y_train, axis=0) / len(y_train))
print(np.sum(y_test, axis=0) / len(y_train))

[0.25292865 0.25079872 0.24440895 0.25186368]
[0.02129925 0.08359957 0.00905218 0.01863685]


In [15]:
import tensorflow as tf
from tensorflow import keras
from functools import partial

# Keras model
DefaultConv2D = partial(
    keras.layers.Conv2D, kernel_size=3, activation="relu", padding="SAME"
)

model = keras.Sequential(
    [
        DefaultConv2D(
            filters=64, kernel_size=7, input_shape=list(X_train[0].shape)
        ),  # was 28, 28, 1
        keras.layers.MaxPooling2D(pool_size=2),
        DefaultConv2D(filters=128),
        DefaultConv2D(filters=128),
        keras.layers.MaxPooling2D(pool_size=2),
        DefaultConv2D(filters=256),
        DefaultConv2D(filters=256),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=128, activation="relu"),
        keras.layers.Dropout(0.2),  # lower less regularization
        keras.layers.Dense(units=64, activation="relu"),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(units=4, activation="softmax"),
    ]
)

model.compile(
    loss="categorical_crossentropy",
    optimizer="sgd",
    metrics=["accuracy"],
)

# loss
# sparse_categorical_crossentropy used for sparse labels (target class index)
# categorial_cross_entropy would yield a one-hot vector (only one positive label)
# mean_squared_error for regression

tensorboard_cb = tf.keras.callbacks.TensorBoard(
    log_dir=logs_folder,
    histogram_freq=1,
)
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=3,
    restore_best_weights=False,
)

print("Start training")
history = model.fit(
    X_train,
    y_train,
    epochs=500,
    validation_data=(X_valid, y_valid),
    callbacks=[tensorboard_cb, early_stopping_cb],
)

# Save model
os.makedirs(models_folder, exist_ok=True)
model.save(models_folder / "model1.h5")

Start training
Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500


In [16]:
# Evaluate per class
y_pred = model.predict(X_test)
Y_pred = np.argmax(y_pred, axis=1)  # one-hot to index
Y_test = np.argmax(y_test, axis=1)
print(
    sklearn.metrics.classification_report(
        Y_test, Y_pred, target_names=label_enc.classes_
    )
)

# Evaluate general
test_results = model.evaluate(X_test, y_test)  # loss and metrics
print(f"Test Data - Loss: {test_results[0]:.3f}, Metrics: {test_results[1:]}")

              precision    recall  f1-score   support

   crosswalk       0.93      0.93      0.93        40
  speedlimit       0.98      1.00      0.99       157
        stop       0.94      0.94      0.94        17
trafficlight       0.97      0.86      0.91        35

    accuracy                           0.96       249
   macro avg       0.95      0.93      0.94       249
weighted avg       0.96      0.96      0.96       249

Test Data - Loss: 0.232, Metrics: [0.9638554453849792]
