In [1]:
import os
import numpy as np
import sklearn
from sklearn.model_selection import train_test_split
from pathlib import Path
import preprocessing


# Directory layout of the project, all relative to the current working directory.
current_folder = Path()
dataset_folder = current_folder.joinpath("dataset")
images_folder = dataset_folder.joinpath("images")
models_folder, logs_folder = (
    current_folder.joinpath(name) for name in ("models", "logs")
)

# Fetch the four splits from the project's preprocessing module and make sure
# each of them is a NumPy array.
X_train, X_test, y_train, y_test = (
    np.asarray(split) for split in preprocessing.get_dataset()
)

INFO: modified dataset already created


In [2]:
# Distinct training labels together with the number of samples carrying each one.
np.unique(y_train, return_counts=True)

(array(['crosswalk', 'speedlimit', 'stop', 'trafficlight'], dtype='<U12'),
 array([114, 554,  67,  91]))

In [4]:
! pip install imblearn

Collecting imblearn
  Downloading imblearn-0.0-py2.py3-none-any.whl (1.9 kB)
Collecting imbalanced-learn
  Downloading imbalanced_learn-0.10.1-py3-none-any.whl (226 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m226.0/226.0 kB[0m [31m458.6 kB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m0:01[0m:01[0m
Installing collected packages: imbalanced-learn, imblearn
Successfully installed imbalanced-learn-0.10.1 imblearn-0.0


In [5]:
from imblearn.over_sampling import SMOTE

# Balance the training classes with SMOTE.
#
# NOTE(review): the train/validation split is performed later in the notebook,
# i.e. after this resampling, so the validation set will contain synthetic
# samples interpolated from training samples. Consider splitting first and
# oversampling only the training part.

# SMOTE expects 2-D input (n_samples, n_features): flatten every sample to a
# vector. `-1` lets NumPy infer the feature count (np.product is a deprecated
# alias that no longer exists in NumPy 2.0).
sample_shape = X_train.shape[1:]
X_flat = X_train.reshape(X_train.shape[0], -1)

# `n_jobs` is deprecated on SMOTE (and removed in later imbalanced-learn
# releases); it only affected parallelism, not the resampled result.
sm = SMOTE(random_state=42)
X_train_os, y_train_os = sm.fit_resample(X_flat, y_train)

# Restore the original per-sample shape for the (now larger) sample set.
X_train_os_rs = np.reshape(X_train_os, (X_train_os.shape[0], *sample_shape))

X_train = X_train_os_rs
y_train = y_train_os



In [None]:
# Label counts of y_train again, now that the oversampling cell above has replaced it.
np.unique(y_train, return_counts=True)

In [None]:
# One hot encoding
# Step 1: map the string labels to integer class indices (fitted on train only).
label_enc = sklearn.preprocessing.LabelEncoder()
y_train = label_enc.fit_transform(y_train)
y_test = label_enc.transform(y_test)

# Step 2: integer indices -> one-hot rows.
# The keyword that turns off sparse output was renamed between scikit-learn
# releases (`sparse` -> `sparse_output`, with `sparse` later removed), so keep
# the default sparse output and densify explicitly; this works on every version.
one_hot = sklearn.preprocessing.OneHotEncoder()
y_train = one_hot.fit_transform(y_train.reshape(-1, 1)).toarray()
y_test = one_hot.transform(y_test.reshape(-1, 1)).toarray()

# Hold out 25% of the training data for validation.
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train, y_train, test_size=0.25, random_state=54
)

In [None]:
# Class proportions per split: column sums of the one-hot labels divided by the
# number of samples in that same split (the test line previously divided by the
# training-set size, which produced wrong fractions).
print(np.sum(y_train, axis=0) / len(y_train))
print(np.sum(y_test, axis=0) / len(y_test))

In [None]:
import tensorflow as tf
from tensorflow import keras
from functools import partial

# Keras model
# Convolution layer with the defaults shared by every conv layer below.
DefaultConv2D = partial(
    keras.layers.Conv2D, kernel_size=3, activation="relu", padding="SAME"
)

# Take the network's input and output sizes from the data instead of
# hard-coding them: one output unit per column of the one-hot labels.
input_shape = tuple(X_train.shape[1:])
n_classes = y_train.shape[1]

model = keras.Sequential(
    [
        # Wider 7x7 kernel on the first layer only
        DefaultConv2D(filters=64, kernel_size=7, input_shape=input_shape),
        keras.layers.MaxPooling2D(pool_size=2),
        DefaultConv2D(filters=128),
        DefaultConv2D(filters=128),
        keras.layers.MaxPooling2D(pool_size=2),
        DefaultConv2D(filters=256),
        DefaultConv2D(filters=256),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=128, activation="relu"),
        keras.layers.Dropout(0.2),  # a lower rate means less regularization
        keras.layers.Dense(units=64, activation="relu"),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(units=n_classes, activation="softmax"),
    ]
)

# Loss choice:
# - categorical_crossentropy expects one-hot encoded targets (used here)
# - sparse_categorical_crossentropy expects integer class indices instead
# - mean_squared_error is meant for regression
model.compile(
    loss="categorical_crossentropy",
    optimizer="sgd",
    metrics=["accuracy"],
)

tensorboard_cb = tf.keras.callbacks.TensorBoard(
    log_dir=logs_folder,
    histogram_freq=1,
)
# Stop once val_loss has not improved for 3 epochs and roll the model back to
# the best epoch; otherwise the weights that get saved and evaluated below
# would be the ones from the last (non-improving) epoch.
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=3,
    restore_best_weights=True,
)

print("Start training")
history = model.fit(
    X_train,
    y_train,
    epochs=500,  # upper bound only; early stopping normally ends training sooner
    validation_data=(X_valid, y_valid),
    callbacks=[tensorboard_cb, early_stopping_cb],
)

# Save model
os.makedirs(models_folder, exist_ok=True)
model.save(models_folder / "model1.h5")

In [None]:
# Evaluate per class
y_pred = model.predict(X_test)
Y_pred = np.argmax(y_pred, axis=1)  # predicted class probabilities -> class index
Y_test = np.argmax(y_test, axis=1)  # one-hot -> class index
print(
    sklearn.metrics.classification_report(
        Y_test,
        Y_pred,
        # List every encoded class explicitly so the report still lines up with
        # target_names when a class is missing from both the test labels and
        # the predictions (otherwise scikit-learn raises a ValueError).
        labels=np.arange(len(label_enc.classes_)),
        target_names=label_enc.classes_,
    )
)

# Evaluate general
test_results = model.evaluate(X_test, y_test)  # loss and metrics
print(f"Test Data - Loss: {test_results[0]:.3f}, Metrics: {test_results[1:]}")