# Dotscience Roadsigns Demo

In [None]:
import dotscience as ds
import numpy as np
import pandas as pd
import pickle
import os
import cv2
import random
import skimage.morphology as morp
from skimage.filters import rank
from skimage.transform import resize
import matplotlib.pyplot as plt
import tensorflow as tf
import logging
import scipy
# Disable the 'tensorflow' Python logger (presumably to keep notebook output
# free of TensorFlow log noise).
logging.getLogger('tensorflow').disabled = True

In [None]:
# Start the Dotscience run and record which dataset variant it uses.
ds.start()
DATASET = "databricks"  # interpolated into the ../data/<DATASET>-*.p pickle paths loaded below
ds.parameter("dataset", DATASET)

In [None]:
def transform_pickle(loaded):
    """
    Convert a pandas (image, label) dataset, where image is a pyspark image
    struct, to a {features: array of image data, labels: array of label ids}
    format as expected by the rest of this script.

    Also resize the images to fit the expected 32x32x3 array shape.

    See: https://spark.apache.org/docs/2.3.0/api/python/pyspark.ml.html#pyspark.ml.image._ImageSchema.toNDArray

        Parameters:
            loaded: Object with an "image" column (mappings with "height",
                "width", "nChannels" and "data" entries) and a "label" column.
        Returns:
            A dict with "features", a uint8 array of shape (N, 32, 32, 3)
            holding 0-255 pixel values, and "labels", the label column values.
    """
    labels = loaded["label"].values
    images = np.zeros((len(loaded["image"]), 32, 32, 3), dtype=np.uint8)

    for idx, image in enumerate(loaded["image"]):
        height = image["height"]
        width = image["width"]
        nChannels = image["nChannels"]
        # View the raw byte buffer as a row-major (height, width, channels) array.
        # NOTE(review): Spark documents its image bytes as BGR-ordered in most
        # cases, while gray_scale() converts with COLOR_RGB2GRAY - confirm.
        # NOTE(review): the assignment below expects 3 (or 1, via broadcasting)
        # channels; a 4-channel image would raise - confirm the data is 3-channel.
        ndimg = np.ndarray(
            shape=(height, width, nChannels),
            dtype=np.uint8,
            buffer=image["data"],
            strides=(width * nChannels, nChannels, 1))
        # By default skimage's resize() converts to float and rescales uint8
        # input to [0, 1]; writing that into a uint8 array truncates almost
        # every pixel to 0. preserve_range=True keeps the 0-255 range, and
        # rounding avoids a systematic downward bias from the integer cast.
        resized = resize(ndimg, (32, 32), preserve_range=True)
        images[idx] = np.clip(np.rint(resized), 0, 255)

    return {"features": images, "labels": labels}

In [None]:
def _load_split(split):
    """Load the pickled `split` ("train", "validate" or "test") of DATASET and convert it."""
    path = ds.input("../data/%s-%s.p" % (DATASET, split))
    # Open through a context manager so the file handle is closed right after
    # loading instead of being left to the garbage collector.
    # NOTE: pickle.load can execute arbitrary code; only load trusted files.
    with open(path, "rb") as pickle_file:
        return transform_pickle(pickle.load(pickle_file))


train = _load_split("train")
valid = _load_split("validate")
test = _load_split("test")

In [None]:
# Display the shape of the converted training feature array.
train['features'].shape

In [None]:
# Load the reference pickle through a context manager so the file handle is
# closed once the data has been read.
# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open(ds.input("../test.p"), "rb") as pickle_file:
    orig = pickle.load(pickle_file)

In [None]:
# Display the shape of the reference features, for comparison with the
# converted training features.
orig['features'].shape

In [None]:
import csv

# Mapping ClassID to traffic sign names: signs[i] is the second column of the
# i-th data row (assumes rows are ordered by ClassID - verify against the CSV).
# newline='' is what the csv module asks for on files handed to csv.reader;
# the `with` block closes the file, so no explicit close() is needed.
with open(ds.input('../data/signnames.csv'), 'r', newline='') as csvfile:
    signnames = csv.reader(csvfile, delimiter=',')
    next(signnames, None)  # skip the first row (presumably the header)
    signs = [row[1] for row in signnames]

In [None]:
# Split every dataset dict into its feature array and its label array.
(X_train, y_train), (X_valid, y_valid), (X_test, y_test) = (
    (split['features'], split['labels']) for split in (train, valid, test))

# Number of examples per split (length of the leading axis).
n_train, n_test, n_validation = len(X_train), len(X_test), len(X_valid)

# Shape of a single traffic sign image.
image_shape = X_train.shape[1:]

# Number of distinct labels that occur in the training set.
n_classes = np.unique(y_train).size

# Report the figures in a fixed order.
for caption, value in (
        ("Number of training examples: ", n_train),
        ("Number of testing examples: ", n_test),
        ("Number of validation examples: ", n_validation),
        ("Image data shape =", image_shape),
        ("Number of classes =", n_classes)):
    print(caption, value)

In [None]:
# Display the shape of the first training image.
X_train[0].shape

# Preprocess data

In [None]:
# define helper functions
def list_images(dataset, dataset_y, ylabel="", cmap=None):
    """
    Display six randomly chosen images in a single figure with matplotlib.
        Parameters:
            dataset: A sequence of images, each compatible with plt.imshow.
            dataset_y: Class ids aligned with dataset; each id is looked up in
                the module-level signs list to caption its image.
            ylabel (Default = No label): A string to be used as a label for each image.
            cmap (Default = None): Color map for plt.imshow; two-dimensional
                images are always shown with 'gray'.
    """
    plt.figure(figsize=(15, 16))
    for i in range(6):
        plt.subplot(1, 6, i+1)
        # randrange excludes its upper bound; randint(0, len(dataset)) could
        # return len(dataset) itself and index one past the end.
        indx = random.randrange(len(dataset))
        # Use gray scale color map if there is only one channel. Decide per
        # image so one 2-D image does not override cmap for the rest.
        image_cmap = 'gray' if len(dataset[indx].shape) == 2 else cmap
        plt.imshow(dataset[indx], cmap=image_cmap)
        plt.xlabel(signs[dataset_y[indx]])
        plt.ylabel(ylabel)
        plt.xticks([])
        plt.yticks([])
    plt.tight_layout(pad=0, h_pad=0, w_pad=0)
    plt.show()
    
def histogram_plot(dataset, label):
    """
    Draw a bar chart of how the values of the input data are distributed.
        Parameters:
            dataset: Input data to be plotted as a histogram.
            label: A string to be used as the x-axis label of the histogram.
    """
    # One bin per class; n_classes is the module-level class count.
    counts, edges = np.histogram(dataset, bins=n_classes)
    bin_width = edges[1] - edges[0]
    midpoints = (edges[1:] + edges[:-1]) / 2
    # Bars cover 70% of a bin so neighbouring bars stay visually separate.
    plt.bar(midpoints, counts, align='center', width=0.7 * bin_width)
    plt.xlabel(label)
    plt.ylabel("Image count")
    plt.show()
    
def gray_scale(image):
    """
    Produce a gray scale version of an image via OpenCV's RGB-to-gray conversion.
        Parameters:
            image: An np.array compatible with plt.imshow.
    """
    conversion = cv2.COLOR_RGB2GRAY
    gray = cv2.cvtColor(image, conversion)
    return gray

def local_histo_equalize(image):
    """
    Apply local histogram equalization to grayscale images.
        Parameters:
            image: A grayscale image.
        Returns:
            The equalized image produced by skimage's rank.equalize.
    """
    # Disk-shaped neighbourhood of radius 30 used for the local histograms.
    kernel = morp.disk(30)
    # Pass the neighbourhood positionally: newer scikit-image releases renamed
    # the `selem` keyword to `footprint`, and the positional form works with both.
    return rank.equalize(image, kernel)

def image_normalize(image):
    """
    Divide pixel values by 255, mapping 0-255 intensities onto the [0, 1] scale.
        Parameters:
            image: An np.array compatible with plt.imshow.
    """
    max_intensity = 255
    return np.divide(image, max_intensity)

def preprocess(data):
    """
    Run gray scale conversion, local histogram equalization and normalization
    over every image of the input data.
        Parameters:
            data: An np.array of images, each compatible with plt.imshow.
        Returns:
            A float array with one trailing channel axis appended, i.e. of
            shape data.shape[:3] + (1,).
    """
    n_images, height, width = data.shape[:3]
    result = np.zeros((n_images, height, width))
    for idx, frame in enumerate(data):
        equalized = local_histo_equalize(gray_scale(frame))
        result[idx] = image_normalize(equalized)
    # Add the trailing channel axis expected by the network input.
    return result[..., np.newaxis]

In [None]:
# Run the preprocessing pipeline over the validation, test and training
# features (in that order).
X_valid_preprocessed, X_test_preprocessed, X_train_preprocessed = [
    preprocess(split) for split in (X_valid, X_test, X_train)
]

In [None]:
# A notebook cell only displays its last expression, so show both shapes as
# one tuple; the leading dimensions should match.
X_train_preprocessed.shape, y_train.shape

# Train & test model

## Decoder model

Uses `tf.keras` to decode a base64-encoded JPEG string and resize the decoded image to a tensor of shape (32, 32, 1).

Note that this model _must_ be supplied urlsafe base64. You can convert regular base64 to urlsafe using Python's [`base64` module](https://docs.python.org/3.7/library/base64.html), for example by encoding the raw image bytes with `base64.urlsafe_b64encode`.

In [None]:
def preprocess_and_decode(img_str):
    """
    Decode one base64-encoded JPEG string into a (32, 32, 1) float32 tensor.
        Parameters:
            img_str: A scalar string tensor holding web-safe base64 JPEG data.
        Returns:
            A float32 tensor of shape (32, 32, 1) with values scaled to [0, 1].
    """
    img = tf.io.decode_base64(img_str)
    # channels=1 makes the decoder output a single gray scale channel.
    img = tf.image.decode_jpeg(img, channels=1)
    img = tf.image.resize_images(img, (32, 32))
    img = tf.cast(img, tf.float32)
    # The decoded pixels are in the 0-255 range, but the network is trained on
    # image_normalize() output in [0, 1]; rescale so serving matches training.
    # NOTE(review): the local histogram equalization applied by preprocess()
    # is still not reproduced here.
    img = img / 255.0
    return img
  
# Keras model that turns a batch of base64 strings into decoded image tensors.
# Each input row holds exactly one string (shape=(1,)).
InputLayer = tf.keras.Input(shape = (1,),dtype="string")
# map_fn applies preprocess_and_decode to the single string of every row (im[0]).
OutputLayer = tf.keras.layers.Lambda(lambda img : tf.map_fn(lambda im : preprocess_and_decode(im[0]), img, dtype="float32"))(InputLayer)
base64_model = tf.keras.Model(InputLayer, OutputLayer)


## Convolutional neural net


In [None]:
# Width of the softmax output layer.
# NOTE(review): hard-coded instead of reusing n_classes computed from y_train
# earlier - confirm 43 matches the label set.
num_classes=43
# Two conv + max-pool stages, then a 1000-unit dense layer and a softmax
# classifier, on 32x32 single-channel input.
conv = tf.keras.models.Sequential()
conv.add(tf.keras.layers.Conv2D(32, kernel_size=(5, 5), strides=(1, 1), activation='relu', input_shape=(32, 32, 1)))
conv.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
conv.add(tf.keras.layers.Conv2D(32, (5, 5), activation='relu'))
conv.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))
conv.add(tf.keras.layers.Flatten())
conv.add(tf.keras.layers.Dense(1000, activation='relu'))
conv.add(tf.keras.layers.Dense(units = num_classes, activation='softmax'))

# The value returned by ds.parameter() is used as the optimizer, so the
# hyperparameter is declared to Dotscience in the same call.
# The sparse loss takes integer class ids as labels (no one-hot encoding).
conv.compile(optimizer=ds.parameter("optimizer", 'adam'),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Early-stopping callback watching the metric named 'acc'.
# NOTE(review): 'acc' looks like the training-accuracy key of TF 1.x Keras; a
# validation metric such as 'val_acc' may have been intended - confirm.
es = tf.keras.callbacks.EarlyStopping(monitor='acc')

# Train on the preprocessed training split, validating on the validation
# split; the epoch count is also passed through ds.parameter().
conv.fit(X_train_preprocessed, y_train,
          epochs=ds.parameter("epochs", 3),
          verbose=1,
          validation_data=(X_valid_preprocessed, y_valid),
          callbacks=[es])

In [None]:
# evaluate() returns the loss followed by the compiled metrics, so index 1 is
# the accuracy on the test split; record it as the run's "accuracy" summary.
ds.summary("accuracy", conv.evaluate(X_test_preprocessed, y_test)[1])

## Combined model of models

We wire up the decoder and the convolutional neural net, so we can serve them both as a single model.


In [None]:
# Chain the two models: the decoder's output tensor feeds the trained network,
# giving one model that maps base64 strings to class probabilities.
base64_input = base64_model.input
final_output = conv(base64_model.output)
model = tf.keras.Model(base64_input,final_output)

# Save the `SavedModel` model to Dotscience


In [None]:
# Fetch the Keras session and save the model
# The signature definition is defined by the input and output tensors,
# and stored with the default serving key
import tempfile

MODEL_DIR = "../model"
version = 1
export_path = os.path.join(MODEL_DIR, str(version))
print('export_path = {}\n'.format(export_path))
if os.path.isdir(export_path):
  print('\nAlready saved a model, cleaning up\n')
  !rm -r {export_path}

tf.saved_model.simple_save(
    tf.keras.backend.get_session(),
    export_path,
    inputs={'input_image_bytes': model.input}, 
    outputs={t.name:t for t in model.outputs})

ds.output(MODEL_DIR)

print('\nSaved model:')
!ls -l {export_path}

In [None]:
# Print everything saved_model_cli reports about the export, to check the
# serving signature's inputs and outputs.
!saved_model_cli show --dir {export_path} --all

In [None]:
# Attach model metadata labels to the run.
# NOTE(review): the directory label is "model" while the export above used
# MODEL_DIR = "../model" - confirm which base path the label is relative to.
ds.label("model.directory", "model")
ds.label("model.framework", "tensorflow")
ds.label("model.framework.version", tf.__version__)

In [None]:
# Publish the run to Dotscience with a short description.
ds.publish("trained tensorflow model")