In [None]:
# Imports
import os
import tempfile

import tensorflow as tf
# import cupy as cp
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import *

# Report the framework versions this notebook is running against.
print(f"Reg Train Keras version:  {keras.__version__}")
print(f"Reg Train TF version:  {tf.__version__}")

from ml_library.model import *
from ml_library.utils import *
from ml_library.config import *
from ml_library.w_utils import *

import warnings
# Ignore every warning raised from this point on.
warnings.filterwarnings("ignore")

# NOTE(review): `model_dir` is not referenced by any cell visible here (models
# are written under `tmpdir` instead) — confirm whether it is still needed.
model_dir = '/models/'
# Fresh temporary directory; the model-saving cells join their paths onto it.
tmpdir = tempfile.mkdtemp()


# Import Dataset

In [None]:


importDatasets()


# Load data
training_file = "./Datasets/GTSRB_Final_Training_Images.zip"
testing_file = "./Datasets/GTSRB_Final_Test_Images.zip"

train = generateTensor(training_file)
test = generateTensor(testing_file)

x_train, y_train = split(train)
x_test, y_test = split(test)

label_map = map_labels('signnames.csv')

# Train/validation split
# NOTE(review): no `random_state` is passed, so the split differs on every
# run — consider seeding it if reproducible results are required.
x_train, x_valid, y_train, y_valid = train_test_split(
    np.asarray(x_train), np.asarray(y_train), test_size=VALIDATION_SIZE)

# Fraction of the original training pool that ended up in the validation set.
valid_ratio = len(x_valid) / (len(x_train) + len(x_valid))
# `.format` must be applied to the string, not to the return value of print().
print("Train size: {} Validation size: {} ({:0.3f})".format(
    len(x_train),
    len(x_valid),
    valid_ratio))

# Class histogram over train + validation labels.  The label arrays are
# ndarrays here, so they have to be concatenated; `+` would add element-wise.
classes, dist = np.unique(
    np.concatenate([y_train, y_valid]), return_counts=True)
NUM_CLASSES = len(classes)
print("No classes:{}".format(NUM_CLASSES))

plt.bar(classes, dist, align='center', alpha=0.5)
plt.show()


# Print the array shape of every split, one line per array.
for _split_name, _split_array in (
    ("x_train", x_train),
    ("y_train", y_train),
    ("x_valid", x_valid),
    ("y_valid", y_valid),
    ("x_test", x_test),
    ("y_test", y_test),
):
    print(f"{_split_name} shape:", _split_array.shape)


In [None]:
def visualize(original, augmented):
    """Show `original` and `augmented` side by side in a new figure.

    Both arguments are handed to `plt.imshow` unchanged: the original goes in
    the left panel and the augmented version in the right panel.  The right
    panel is drawn last, so it is what pyplot treats as current afterwards.
    """
    plt.figure()
    panels = (('Original image', original), ('Augmented image', augmented))
    for position, (caption, picture) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.title(caption)
        plt.imshow(picture)




In [None]:
# Take one (image, label) example and show it with its class name as the title.
# NOTE(review): `train_ds` and `get_label_name` are not defined in any cell
# visible here — confirm where they come from before a fresh-kernel run.
image, label = next(iter(train_ds))
_ = plt.imshow(image)
_ = plt.title(get_label_name(label))

# Build every augmented variant first ...
flipped = tf.image.flip_left_right(image)
grayscaled = tf.image.rgb_to_grayscale(image)
saturated = tf.image.adjust_saturation(image, 3)
bright = tf.image.adjust_brightness(image, 0.4)
cropped = tf.image.central_crop(image, central_fraction=0.5)
rotated = tf.image.rot90(image)

# ... then draw one original-vs-augmented figure per variant, in that order.
visualize(image, flipped)

# The grayscale result is squeezed before plotting; the colorbar call must
# directly follow this figure so that it attaches to the grayscale panel.
visualize(image, tf.squeeze(grayscaled))
_ = plt.colorbar()

visualize(image, saturated)
visualize(image, bright)
visualize(image, cropped)
visualize(image, rotated)


In [None]:
original_dim = IMG_SIZE*IMG_SIZE

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

# Running mean of scalar values (e.g. a per-step loss in a hand-written
# training loop).  It is not a (y_true, y_pred) metric, so it is not handed
# to `compile()` below.
loss_metric = tf.keras.metrics.Mean()

AUTOTUNE = tf.data.AUTOTUNE

# Training pipeline over the GTSRB training split.
# Order matters: cache the raw examples first, then shuffle, then batch.
# Caching *after* shuffle+batch would freeze the first epoch's batch
# composition for every later epoch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .cache()
    .shuffle(buffer_size=1024)
    .batch(64)
    .prefetch(buffer_size=AUTOTUNE)
)

# Validation pipeline: deterministic order, cached and prefetched.
val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_valid, y_valid))
    .batch(64)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)

# Test pipeline: batched only.
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_dataset = test_dataset.batch(64)


# Auto-encoder training: the images are both the input and the target.
vae = VariationalAutoEncoder(original_dim, IMG_SIZE*2, IMG_SIZE, REIN=False)
# `metrics` must be a list of (y_true, y_pred) metrics; track the plain
# reconstruction MSE alongside the reported loss.
vae.compile(optimizer, loss=mse_loss_fn,
            metrics=[tf.keras.metrics.MeanSquaredError()])
vae.fit(x_train, x_train, epochs=NUM_EPOCH,  batch_size=64, verbose=1)
vae.evaluate(x_valid, x_valid, verbose=1)


# epochs = NUM_EPOCH

# # Iterate over epochs.
# for epoch in range(epochs):
#     print("Start of epoch %d" % (epoch,))

#     # Iterate over the batches of the dataset.
#     for step, x_batch_train in enumerate(train_dataset):
#         with tf.GradientTape() as tape:
#             reconstructed = vae(x_batch_train)
#             # Compute reconstruction loss
#             loss = mse_loss_fn(x_batch_train, reconstructed)
#             loss += sum(vae.losses)  # Add KLD regularization loss

#         grads = tape.gradient(loss, vae.trainable_weights)
#         optimizer.apply_gradients(zip(grads, vae.trainable_weights))

#         loss_metric(loss)

#         if step % 100 == 0:
#             print("step %d: mean loss = %.4f" % (step, loss_metric.result()))


# Persist the trained non-REIN VAE under the temporary working directory.
model_path = os.path.join(tmpdir, 'normal_vae')
print('Saving model...')
vae.save(model_path)
# tf.saved_model.save(vae, model_path)


## Train Normal Model

In [None]:

normal_model = get_compiled_model(REIN=False)


# Reconstruct the training images with the VAE.  Predicting from the in-order
# array (not the shuffled dataset) keeps row i aligned with y_train[i], and
# the labels must be passed to fit() explicitly because predict() returns
# images only.
x_train_reconstructed = vae.predict(x_train, batch_size=64)

# NOTE(review): validation and test batches are fed as raw images, not VAE
# reconstructions — confirm that this asymmetry is intended.
hist_norm = normal_model.fit(x_train_reconstructed, y_train, batch_size=64,
                             validation_data=val_dataset, validation_steps=10,
                             epochs=NUM_EPOCH, verbose=1)

# Evaluate the model
print("Evaluate")
result_normal = normal_model.evaluate(test_dataset)
# Printed explicitly: a bare expression in the middle of a cell is discarded.
print(dict(zip(normal_model.metrics_names, result_normal)))


# Persist the trained classifier under the temporary working directory.
model_path = os.path.join(tmpdir, 'normal_model')
print('Saving model...')
normal_model.save(model_path)
# tf.saved_model.save(normal_model, model_path)


## Train REIN Model

In [None]:

vae = VariationalAutoEncoder(original_dim, IMG_SIZE*2, IMG_SIZE, REIN=True)
# Use a fresh optimizer: an optimizer instance that has already been built
# for another model's variables cannot be reused for this model's variables.
rein_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
# `metrics` must be a list of (y_true, y_pred) metrics; track the plain
# reconstruction MSE alongside the reported loss.
vae.compile(rein_optimizer, loss=mse_loss_fn,
            metrics=[tf.keras.metrics.MeanSquaredError()])
# Auto-encoder training: the images are both the input and the target.
vae.fit(x_train, x_train, epochs=NUM_EPOCH,  batch_size=64, verbose=1)
vae.evaluate(x_valid, x_valid, verbose=1)

# Persist the trained REIN VAE under the temporary working directory.
model_path = os.path.join(tmpdir, 'rein_vae')
print('Saving model...')
vae.save(model_path)
# tf.saved_model.save(vae, model_path)


In [None]:

rein_model = get_compiled_model(REIN=True)


# Reconstruct the training images with the VAE currently bound to `vae`
# (the REIN VAE when the cells are run top to bottom).  Predicting from the
# in-order array (not the shuffled dataset) keeps row i aligned with
# y_train[i], and the labels must be passed to fit() explicitly because
# predict() returns images only.
x_train_reconstructed = vae.predict(x_train, batch_size=64)

# NOTE(review): validation and test batches are fed as raw images, not VAE
# reconstructions — confirm that this asymmetry is intended.
hist_rein = rein_model.fit(x_train_reconstructed, y_train, batch_size=64,
                           validation_data=val_dataset, validation_steps=10,
                           epochs=NUM_EPOCH, verbose=1)

# Evaluate the model
print("Evaluate")
result_rein = rein_model.evaluate(test_dataset)
# Printed explicitly: a bare expression in the middle of a cell is discarded.
print(dict(zip(rein_model.metrics_names, result_rein)))


# Persist the trained classifier under the temporary working directory.
model_path = os.path.join(tmpdir, 'rein_model')
print('Saving model...')
rein_model.save(model_path)
# tf.saved_model.save(rein_model, rein_model_path)


## Testing

In [None]:

testing_file = './Datasets/GTSRB_Final_Test_Images.zip'
test = generateTensor(testing_file)
# NOTE(review): the training cells derive their arrays with `split(...)`;
# confirm `preprocess_data` applies the preprocessing the models were
# trained with.
x_test, y_test = preprocess_data(test)
print("x_test shape:", x_test.shape)
print("y_test shape:", y_test.shape)


def evaluate_accuracy(model, images, labels):
    """Evaluate `model` on (images, labels) and return its accuracy as a float.

    The accuracy is looked up by name in the metrics dictionary returned by
    `model.evaluate(..., return_dict=True)`: the first metric whose name
    contains "acc" is used.

    Raises:
        ValueError: if the model reports no accuracy-like metric.
    """
    results = model.evaluate(images, labels, verbose=0, return_dict=True)
    for metric_name, metric_value in results.items():
        if "acc" in metric_name:
            return float(metric_value)
    raise ValueError(
        "Model reports no accuracy metric; got: {}".format(sorted(results)))


# Load the saved classifier as a Keras model so that `.evaluate` is available.
# To test the baseline instead, use os.path.join(tmpdir, 'normal_model').
model_path = os.path.join(tmpdir, 'rein_model')
model = tf.keras.models.load_model(model_path)

acc = evaluate_accuracy(model, x_test, y_test)
print("Test Set Accuracy = {:.3f}".format(acc))

# (printed label, corruption function) pairs; each corruption is applied at
# 20 levels, 0..19, and compared against the clean accuracy.
corruptions = (
    ("Rain Drops", rain),
    ("fog Drops", fog),
    ("light Drops", light),
    ("dark Drops", dark),
    ("Blur deviation", blur),
)

for corruption_name, corrupt in corruptions:
    for i in range(20):
        corrupted_images = corrupt(x_test, i)
        adv_acc = evaluate_accuracy(model, corrupted_images, y_test)
        print("%s %.2d  vs. Accuracy vs Adv_Accuracy：" % (corruption_name, i),
              acc, adv_acc)
