# Single Input Multiple Output Preprocessing Layers

<img src="preprocessing.png" alt="Drawing" style="width: 500px;"/>

*Image taken from https://blog.tensorflow.org/2021/11/an-introduction-to-keras-preprocessing.html*

In this example we will showcase how to apply different transformations and preprocessing steps to the same feature. What we have here is an example of a **Single Input Multiple Output** feature transformation scenario.

This is what the feature transformation `Pipeline` looks like:

                                            Feature
                                              /  \
                                             /    \
                                            /      \
                                           /        \
                                      Transform1 Transform2
                                             \    /
                                              \  /
                                               \/
                                     Concat into a Single Layer

We will be utilizing a library called `easyflow`, which provides feature transformation pipelines implemented natively in Keras (https://pypi.org/project/easy-tensorflow/).

In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds

from easyflow.preprocessing.pipeline import FeatureUnion
from easyflow.preprocessing import (FeatureInputLayer,
                                    PreprocessorChain,
                                    MultiOutputTransformer
                                   )

For our example we will use the IMDB reviews dataset. The steps here are similar to the preprocessing example on the TensorFlow blog: https://blog.tensorflow.org/2021/11/an-introduction-to-keras-preprocessing.html. We will, however, make use of the feature preprocessing and transformation pipelines from the `easyflow` library.

In [2]:
train_ds = tfds.load('imdb_reviews', split='train', as_supervised=True).batch(32)
train_ds = train_ds.map(lambda x, y: ({'review': x}, y))

# Create Feature transformation Pipeline

Let's create our feature transformation pipeline. For this example we only have one raw feature, `review`. The transformations that we will be applying are:

1) TextVectorization as one step and;\
2) Another step that transforms the text to the length of the review and then normalizes it.

These steps will be concatenated in our final output layer. The transformation layer we will be using is `MultiOutputTransformer`, a custom layer implemented in `easyflow`. This layer takes as input a list of independent preprocessing or transformation steps (composed of layers) that will be applied to the given feature. `MultiOutputTransformer` is used as a step in `FeaturePreprocessor` and `FeatureUnion`.
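To make the two branches concrete, here is a minimal plain-Python sketch of the same idea: one input, two independent transforms, and a concatenated result. It is only an illustration and not how `easyflow` or Keras implement it. The helper names (`fit_vocabulary`, `multi_hot`, `fit_length_stats`, `normalized_length`, `transform`) are hypothetical, tokenization is a simple lowercase whitespace split, and the length is counted in characters rather than bytes.

```python
import math
from collections import Counter


def fit_vocabulary(texts, max_tokens):
    """Map the most frequent tokens to indices 1..max_tokens-1 (0 = unknown)."""
    counts = Counter(tok for text in texts for tok in text.lower().split())
    frequent = [tok for tok, _ in counts.most_common(max_tokens - 1)]
    return {tok: index + 1 for index, tok in enumerate(frequent)}


def multi_hot(text, vocab, max_tokens):
    """Transform 1: mark which vocabulary tokens occur in the text."""
    vector = [0.0] * max_tokens
    for tok in text.lower().split():
        vector[vocab.get(tok, 0)] = 1.0  # unknown tokens share index 0
    return vector


def fit_length_stats(texts):
    """Learn the mean and variance of the text lengths."""
    lengths = [len(text) for text in texts]
    mean = sum(lengths) / len(lengths)
    variance = sum((n - mean) ** 2 for n in lengths) / len(lengths)
    return mean, variance


def normalized_length(text, mean, variance):
    """Transform 2: text length, shifted and scaled (assumes variance > 0)."""
    return [(len(text) - mean) / math.sqrt(variance)]


def transform(text, vocab, max_tokens, mean, variance):
    """Apply both transforms to the same input and concatenate the results."""
    return multi_hot(text, vocab, max_tokens) + normalized_length(text, mean, variance)


# Toy data, made up for this illustration.
reviews = ["great movie", "bad movie", "great great fun"]
MAX_TOKENS = 5

# "Adapt" step: learn the vocabulary and the length statistics from the data.
vocab = fit_vocabulary(reviews, MAX_TOKENS)
mean, variance = fit_length_stats(reviews)

# Each review becomes one vector: MAX_TOKENS multi-hot slots plus one length slot.
features = [transform(r, vocab, MAX_TOKENS, mean, variance) for r in reviews]
```

Each resulting vector has `MAX_TOKENS + 1` entries. In the same way, the pipeline in this example concatenates the multi-hot encoding with the normalized review length.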

In [3]:
def TextLengthPipeline():
    """Create a sequential chain that computes the text length and then normalises it."""
    return PreprocessorChain([
        tf.keras.layers.Lambda(lambda x: tf.strings.length(x)),
        tf.keras.layers.Normalization()
    ])

steps = MultiOutputTransformer([
    # transform 1: create multi hot encoder
    tf.keras.layers.TextVectorization(output_mode='multi_hot', max_tokens=2500),
    # transform 2: get the length of the review
    TextLengthPipeline()
])

pipeline = FeatureUnion([
    ('review', steps, ['review'])
])

pipeline.adapt(train_ds)

In [4]:
feature_layer_inputs = FeatureInputLayer(
    {'review': tf.string}
)

preprocessed_inputs = pipeline(feature_layer_inputs)

Next we will use a common training pattern: split the work into a preprocessing model and a training model. When we start from raw data, as in our example, all string operations have to run on the CPU before the result is fed to the GPU. Preprocessing is not something we train, and it is independent of the forward pass. If preprocessing and training run one after the other, throughput drops because the GPU sits idle while it waits for data. To speed things up we will prefetch batches of preprocessed data, so that while the GPU is processing one batch the CPU is already preparing the next.

<img src="gpu_cpu_gaps.png" alt="Drawing" style="width: 500px;"/>

*Image taken from https://www.tensorflow.org*
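The prefetching idea described above can be sketched in plain Python with a background thread and a bounded queue. This is a conceptual illustration only, not how `tf.data` implements prefetching. The `prefetch` and `preprocess` helpers and the toy batches are hypothetical names introduced for this sketch.

```python
import queue
import threading


def prefetch(iterable, buffer_size=1):
    """Yield items while a background thread prepares the next ones."""
    buffer = queue.Queue(maxsize=buffer_size)
    sentinel = object()  # marks the end of the stream

    def producer():
        try:
            for item in iterable:
                buffer.put(item)  # blocks while the buffer is full
        finally:
            buffer.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()  # blocks until the next item is ready
        if item is sentinel:
            return
        yield item


def preprocess(batch):
    """Stand-in for CPU-side preprocessing: text lengths for one batch."""
    return [len(text) for text in batch]


raw_batches = [["a", "bb"], ["ccc"], ["dddd", "e"]]

# `map` is lazy, so `preprocess` runs inside the producer thread while the
# consumer (the training loop in a real setup) works on the previous batch.
prepared = list(prefetch(map(preprocess, raw_batches), buffer_size=2))
```

The bounded queue plays the role of the prefetch buffer: the producer runs ahead by at most `buffer_size` batches, and batches arrive in their original order.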

In [5]:
# create our preprocessing model
preprocessing_model = tf.keras.Model(feature_layer_inputs, preprocessed_inputs)

# create training model that will be applied on the forward pass
outputs = tf.keras.layers.Dense(1, activation='sigmoid')(preprocessed_inputs)
training_model = tf.keras.Model(preprocessed_inputs, outputs)
training_model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy'), tf.keras.metrics.AUC(name='auc')])

In [6]:
preprocessed_ds = train_ds.map(
    lambda x, y: (preprocessing_model(x), y),
    num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)

In [7]:
training_model.fit(preprocessed_ds, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x1753c84f0>

This gives us a nice speed improvement. Our utilization graph will look something like this:

<img src="full_utilization.png" alt="Drawing" style="width: 500px;"/>

*Image taken from https://www.tensorflow.org*