# Notebook for Preprocessing Images Using TFRecords and the Dataset API in TensorFlow. Notebook (2/6) in the End-to-End Scalable Deep Learning Pipeline on Hops.

This notebook reads the TFRecords written by notebook number 1 ([Notebook number one](./Step1_Convert_To_TFRecords.ipynb)) and runs them through a preprocessing pipeline that includes:

- Random shuffling of the train/val/test datasets
- Data augmentation of the train dataset, including:
  - Random flipping of an image (left-to-right, not upside-down)
  - Random adjustment of the brightness in the image
  - Random adjustment of the color saturation of the image

The notebook also includes simple data exploration/validation to inspect the processed images and verify them.

This notebook reads TFRecords from:

- hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_raw

And output TFRecords to:

- hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_clean

Running this computation separately from training can make training faster, especially when CPU-bound preprocessing is the bottleneck.

![step2.png](./../images/step2.png)


## Imports

Tested with versions:

- numpy: 1.14.5
- hops: 2.6.4
- pydoop: 2.0a3
- tensorboard: 1.8.0
- tensorflow: 1.8.0
- tensorflow-gpu: 1.8.0
- tfspark: 1.3.5

In [1]:
import tensorflow as tf
import pydoop.hdfs as py_hdfs
from hops import hdfs
import numpy as np
import os
import matplotlib.pyplot as plt # used by the plotting code in the data validation sections


## Constants

In [2]:
PROJECT_DIR = hdfs.project_path()
DATASET_BASE_DIR = PROJECT_DIR + "tiny-imagenet/tiny-imagenet-200/"
TFR_DIR = DATASET_BASE_DIR + "tfrecords_raw/"
OUTPUT_DIR = DATASET_BASE_DIR + "tfrecords_clean/"
TRAIN_TFR_DIR = TFR_DIR + "train/"
VAL_TFR_DIR = TFR_DIR + "val/"
TRAIN_DIR = DATASET_BASE_DIR + "train"
TEST_TFR_DIR = TFR_DIR + "test/"
TF_FILE_PATTERN = "*.tfrecords"
ID_TO_CLASS_FILE = DATASET_BASE_DIR + "words.txt"
VAL_LABELS_FILE = DATASET_BASE_DIR + "val/val_annotations.txt"
# This file is written by the first notebook when converting JPEGs to TFRecords;
# it contains the number of records in train/val/test, which is required to shuffle the datasets correctly
SIZES_FILE = DATASET_BASE_DIR + "sizes.txt"

## Parse Metadata about the Dataset

The dataset includes several .txt files with annotations and other metadata that need to be parsed.

In [3]:
def parse_metadata():
    """ 
    Parses the words.txt file into a map of label -> words and a list of ordered nids (index of nid = integer label).
    Also parses the val_annotations.txt file into a map of (validation_file_name --> nid)
    """
    # list all directories in the train set, the directory name is the "nid" and identifies the label
    train_dirs = py_hdfs.ls(TRAIN_DIR)
    
    # keep only the last path component (the nid), regardless of how the listed paths are prefixed
    train_nid_list = list(map(lambda x: os.path.basename(x.rstrip("/")), train_dirs))
    
    # the number of nids equals the number of unique classes/labels
    num_classes = len(train_nid_list)
    
    # read the words.txt file that contains lines of the form "nid\twords"
    with py_hdfs.open(ID_TO_CLASS_FILE, 'r') as f:
        file_lines = f.read().decode("utf-8").split("\n")
    label_to_word = {}
    
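    for l in file_lines:
        # skip empty lines (e.g. after a trailing newline); they have no tab separator
        if '\t' not in l:
            continue
        # parse each line
        wnid, word = l.split('\t', 1)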
        if wnid in train_nid_list:
            # convert the nids into integer labels by using the position in the index
            label = train_nid_list.index(wnid)
            word = str(label) + ": " + word
            # save the mapping of integer label --> words
            label_to_word[label] = word
    
    # read the val_annotations.txt file that contains lines of the form: 
    # "validation_image\tnid\tx_pos\ty_pos\tw_pos\th_pos"
    with py_hdfs.open(VAL_LABELS_FILE, 'r') as f:
        file_lines = f.read().decode("utf-8").split("\n")
    validation_file_to_nid = {}
    for l in file_lines:
        # parse each line
        tokens = l.split('\t')
        # skip empty or malformed lines
        if len(tokens) > 2:
            validation_img = tokens[0]
            wnid = tokens[1]
            # we only care about classification in this tutorial, not localization 
            if wnid in train_nid_list:
                validation_file_to_nid[validation_img] = wnid
    
    return train_nid_list, label_to_word, validation_file_to_nid
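To make the file formats concrete, here is a minimal pure-Python sketch of the same parsing logic applied to in-memory strings instead of HopsFS files. The helper name `parse_metadata_lines` and the sample lines are hypothetical, chosen only to mirror the `nid\twords` and `validation_image\tnid\tx_pos\ty_pos\tw_pos\th_pos` formats described in the comments of `parse_metadata`; as in the notebook, the integer label of a class is the position of its nid in the list of train directories.

```python
def parse_metadata_lines(train_nids, words_text, val_annotations_text):
    """Hypothetical helper: the logic of parse_metadata(), but on strings instead of files."""
    # integer label = position of the nid in the list of train directories
    nid_to_label = {nid: i for i, nid in enumerate(train_nids)}

    label_to_word = {}
    for line in words_text.split("\n"):
        if "\t" not in line:  # skip blank lines such as a trailing newline
            continue
        wnid, word = line.split("\t", 1)
        if wnid in nid_to_label:
            label = nid_to_label[wnid]
            label_to_word[label] = "{}: {}".format(label, word)

    validation_file_to_nid = {}
    for line in val_annotations_text.split("\n"):
        tokens = line.split("\t")
        if len(tokens) > 2 and tokens[1] in nid_to_label:
            # keep only the class; the bounding-box columns are ignored
            validation_file_to_nid[tokens[0]] = tokens[1]

    return label_to_word, validation_file_to_nid


# Illustrative input in the documented formats (not the real file contents)
train_nids = ["n01443537", "n01629819"]
words = ("n01443537\tgoldfish, Carassius auratus\n"
         "n01629819\tEuropean fire salamander\n"
         "n99999999\tnot in the train set\n")
val = "val_0.JPEG\tn01629819\t0\t32\t44\t62\nval_1.JPEG\tn01443537\t52\t55\t57\t59\n"

label_to_word, validation_file_to_nid = parse_metadata_lines(train_nids, words, val)
print(label_to_word)           # {0: '0: goldfish, Carassius auratus', 1: '1: European fire salamander'}
print(validation_file_to_nid)  # {'val_0.JPEG': 'n01629819', 'val_1.JPEG': 'n01443537'}
```

Using a dict for the nid-to-label lookup avoids calling `list.index` for every line, although with only 200 classes either approach is fast enough.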

## Collect File Names and Dataset Sizes from HopsFS

In the first notebook, we recorded the number of records in each dataset (train/val/test) and wrote these statistics to a file in HopsFS. Here we read and parse that file. We need the number of records in each dataset to shuffle the data correctly: `dataset.shuffle(buffer_size)` only produces a full, uniform shuffle when the buffer can hold the entire dataset. Because datasets are lazy, TensorFlow does not know how many records a `TFRecordDataset` contains without reading through all of the files, so we cannot simply ask the dataset for its size.
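Why the record count matters for shuffling: `tf.data.Dataset.shuffle` fills a buffer with `buffer_size` elements, emits a uniformly random element from the buffer, and refills the freed slot with the next input element. The following pure-Python sketch of that buffering scheme illustrates the semantics only; it is not TensorFlow's implementation, and the name `buffered_shuffle` is hypothetical.

```python
import random

def buffered_shuffle(elements, buffer_size, seed=None):
    """Sketch of buffer-based shuffling in the style of tf.data.Dataset.shuffle."""
    rng = random.Random(seed)
    buffer = []
    for element in elements:
        buffer.append(element)
        if len(buffer) == buffer_size:
            # buffer is full: emit a random element, freeing a slot for the next input
            index = rng.randrange(len(buffer))
            buffer[index], buffer[-1] = buffer[-1], buffer[index]
            yield buffer.pop()
    # input exhausted: drain what is left of the buffer in random order
    while buffer:
        index = rng.randrange(len(buffer))
        buffer[index], buffer[-1] = buffer[-1], buffer[index]
        yield buffer.pop()


data = list(range(10))
print(list(buffered_shuffle(data, buffer_size=1, seed=42)))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(list(buffered_shuffle(data, buffer_size=len(data), seed=42)))  # a random permutation
```

With `buffer_size=1` nothing is shuffled, and with a small buffer the first output can only be one of the first `buffer_size` inputs. A buffer at least as large as the dataset can produce any permutation, which is why the notebook passes the full dataset size, at the cost of holding that many elements in memory.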

In [4]:
def get_filenames_and_size():
    """
    A function for obtaining all the TFRecord files to be parsed and the number of records in each dataset.
    We need to merge the TFRecords to be able to shuffle and batch the dataset.
    """
    # Expand the glob file pattern into a list of files (tfrecords files)
    train_tfr_files = tf.gfile.Glob(TRAIN_TFR_DIR + TF_FILE_PATTERN)
    val_tfr_files = tf.gfile.Glob(VAL_TFR_DIR + TF_FILE_PATTERN)
    test_tfr_files = tf.gfile.Glob(TEST_TFR_DIR + TF_FILE_PATTERN)
    
    # Read sizes.txt to get the size of each dataset (used for correct shuffling)
    with py_hdfs.open(SIZES_FILE, 'r') as f:
        file_lines = f.read().decode("utf-8").split("\n")    
    train_size = int(file_lines[0].split(",")[1])
    val_size = int(file_lines[1].split(",")[1])
    test_size = int(file_lines[2].split(",")[1])
    
    return train_tfr_files, val_tfr_files, test_tfr_files, train_size, val_size, test_size
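`get_filenames_and_size` relies on the order of the lines in `sizes.txt` (train, then val, then test) and only uses the second comma-separated field. A slightly more defensive variant keys the sizes by the first field instead. This is a sketch under an assumption: that each line has the form `<split name>,<count>` with the split names `train`, `val`, and `test`. The helper `parse_sizes` and the sample text are hypothetical.

```python
def parse_sizes(text):
    """Hypothetical helper: parse lines of the assumed form "<split name>,<count>" into a dict."""
    sizes = {}
    for line in text.split("\n"):
        line = line.strip()
        if not line:  # ignore blank lines, e.g. a trailing newline
            continue
        name, count = line.split(",", 1)
        sizes[name.strip()] = int(count)
    return sizes


# Sample text in the assumed format
sizes = parse_sizes("train,100000\nval,10000\ntest,10000\n")
train_size, val_size, test_size = sizes["train"], sizes["val"], sizes["test"]
```

Keying by name means a reordered or extended `sizes.txt` fails loudly with a `KeyError` instead of silently assigning the wrong count to a split.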

## Reading TFRecords into TensorFlow Datasets
`tf.data` is TensorFlow's newest API for building input pipelines for deep learning models, and it is what we will use.

A `tf.data.Dataset` represents a sequence of elements, in which each element contains one or more Tensor objects. In our case we read the TFRecords stored in HopsFS into three datasets: one for train, one for val, and one for test. Initially each element is a serialized record; after parsing, each element consists of one tensor for the image and one tensor for the one-hot encoded label.

The datasets are "lazy": data is only read from the TFRecords at rest when it is required for a computation or explicitly requested through an iterator.
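The idea of laziness can be illustrated with a plain-Python analogy built from generators. This is not how `tf.data` is implemented, only the same principle: defining the pipeline does no work, and each stage runs only when an element is pulled through it, much like each `sess.run` on an iterator's `get_next()` op pulls one element through the dataset graph. The function names and the `stats` counter are hypothetical stand-ins for reading and parsing records.

```python
import itertools

stats = {"reads": 0}  # counts how many records have actually been "read"

def read_serialized(n):
    """Hypothetical stand-in for reading n serialized records from storage."""
    for i in range(n):
        stats["reads"] += 1
        yield "record-{}".format(i)

def parse(records):
    """Hypothetical stand-in for a parsing/decoding stage."""
    for record in records:
        yield record.upper()


# Defining the pipeline does not read anything yet
pipeline = parse(read_serialized(1000))
print(stats["reads"])  # 0

# Pulling two elements through the pipeline reads exactly two records
first_two = list(itertools.islice(pipeline, 2))
print(first_two, stats["reads"])  # ['RECORD-0', 'RECORD-1'] 2
```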

In [5]:
def create_dataset(files):
    """
    A function for creating a TF dataset from TFRecord files
    """
    # Read the serialized records from a list of TFRecords files
    dataset = tf.data.TFRecordDataset(files,
        compression_type=None,    # the files are not compressed
        buffer_size=100240, # size of the read buffer, in bytes
        num_parallel_reads=os.cpu_count() # Parallel read from HopsFS
    )
    return dataset

In [6]:
def create_datasets(train_tfr_files, val_tfr_files, test_tfr_files):
    """
    A function for creating TensorFlow datasets for
    train, val, and test from TFRecord files
    """
    # Parse train dataset from a list of TFRecords files
    train_dataset = create_dataset(train_tfr_files)

    # Parse validation dataset from a list of TFRecords files
    val_dataset = create_dataset(val_tfr_files)

    # Parse test dataset from a list of TFRecords files
    test_dataset = create_dataset(test_tfr_files)

    return train_dataset, val_dataset, test_dataset

## Image Parsing and Preprocessing

Since images are serialized into a binary TFRecords format, they need to be parsed into in-memory tensors to perform image preprocessing.

Image preprocessing can include resizing, augmentation, normalization, etc.
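For reference, a `.tfrecords` file is a sequence of framed records. Each record is stored as an 8-byte little-endian length, a 4-byte masked CRC-32C of those length bytes, the record bytes themselves, and a 4-byte masked CRC-32C of the record bytes. In this notebook the record bytes are serialized `tf.train.Example` messages, which `parse_tfr` decodes. The standard-library sketch below only shows what that framing looks like on disk; in the notebook, `tf.python_io.TFRecordWriter` and `tf.data.TFRecordDataset` handle it. The function names are hypothetical.

```python
import io
import struct

def _crc32c_table():
    # Lookup table for CRC-32C (Castagnoli), reflected polynomial 0x82F63B78
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table

_TABLE = _crc32c_table()

def crc32c(data):
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF

def masked_crc(data):
    # TFRecord stores a "masked" CRC: rotate right by 15 bits, then add a constant (mod 2^32)
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF

def write_tfrecord(stream, data):
    length = struct.pack("<Q", len(data))
    stream.write(length)
    stream.write(struct.pack("<I", masked_crc(length)))
    stream.write(data)
    stream.write(struct.pack("<I", masked_crc(data)))

def read_tfrecords(stream):
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of file
        (length,) = struct.unpack("<Q", header)
        (length_crc,) = struct.unpack("<I", stream.read(4))
        if length_crc != masked_crc(header):
            raise ValueError("corrupted length field")
        data = stream.read(length)
        (data_crc,) = struct.unpack("<I", stream.read(4))
        if data_crc != masked_crc(data):
            raise ValueError("corrupted record")
        yield data


buf = io.BytesIO()
for payload in (b"first record", b"second record"):
    write_tfrecord(buf, payload)
buf.seek(0)
print(list(read_tfrecords(buf)))  # [b'first record', b'second record']
```

The per-record checksums are what let a reader detect a truncated or corrupted file instead of silently decoding garbage.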

In [7]:
def parse_tfr(example_proto):
    """
    Parses an example protocol buffer (TFRecord) into a dict of
    feature names and tensors
    """
    features = {
        'height': tf.FixedLenFeature((), tf.int64, default_value=0),
        'width': tf.FixedLenFeature((), tf.int64, default_value=0),
        'channel': tf.FixedLenFeature((), tf.int64, default_value=0),
        'label': tf.FixedLenFeature((), tf.int64, default_value=0),
        'label_one_hot': tf.FixedLenFeature((), tf.string, default_value=""),
        'image_raw': tf.FixedLenFeature((), tf.string, default_value="")
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features["image_raw"], parsed_features["label_one_hot"]

In [8]:
def decode_bytes(image, label):
    """
    Decodes the bytes that were serialized in the TFRecords in HopsFS into tensors so that we can apply
    image preprocessing
    """
    image_tensor = tf.decode_raw(image, tf.uint8)
    # The label is deliberately kept in a 1-tuple: the code that consumes this dataset reads it as label[0]
    label_tensor = (tf.decode_raw(label, tf.uint8),)
    image_tensor = tf.reshape(image_tensor, [64,64,3]) # dimension information was lost when serializing to disk
    return image_tensor, label_tensor
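`tf.decode_raw` turns a byte string into a flat vector, which is why the `[64, 64, 3]` shape has to be restored with `tf.reshape`. The same round trip in NumPy (an illustration of the idea, not the TensorFlow ops themselves) also shows how the one-hot label bytes map back to an integer class. The image values and the class index are made up; the 200-class label length is an assumption matching Tiny ImageNet.

```python
import numpy as np

# A made-up 64x64 RGB uint8 image and a one-hot label for class 3 out of 200
image = (np.arange(64 * 64 * 3) % 256).astype(np.uint8).reshape(64, 64, 3)
label_one_hot = np.zeros(200, dtype=np.uint8)
label_one_hot[3] = 1

# Serializing keeps only the raw bytes, so the shape information is lost
image_bytes = image.tobytes()
label_bytes = label_one_hot.tobytes()

# Analogue of tf.decode_raw(..., tf.uint8): a flat uint8 vector
flat = np.frombuffer(image_bytes, dtype=np.uint8)
print(flat.shape)  # (12288,)

# Analogue of tf.reshape(..., [64, 64, 3]): restore the known dimensions
restored = flat.reshape(64, 64, 3)

# The one-hot label decodes the same way; argmax recovers the integer class
decoded_label = np.frombuffer(label_bytes, dtype=np.uint8)
print(int(np.argmax(decoded_label)))  # 3
```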

In [9]:
def resize_image(image, label):
    """
    Function for resizing an image to 64x64 pixels
    """
    resized_image = tf.image.resize_images(image, [64, 64])
    return resized_image, label

In [10]:
def image_data_aug(image, label):
    """
    Function for performing image augmentation 
    (a technique that can improve classifier performance and reduce overfitting)
    """
    # Randomly flip an image horizontally (left to right).
    # With a 1 in 2 chance, outputs the contents of image flipped 
    # along the second dimension, which is width. 
    # Otherwise output the image as-is.
    image = tf.image.random_flip_left_right(image)

    # Adjust the brightness of images by a random factor.
    image = tf.image.random_brightness(image, max_delta=32.0 / 255.0)
    
    # Adjust the saturation of an RGB image by a random factor.
    image = tf.image.random_saturation(image, lower=0.5, upper=1.5)

    return image, label
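For intuition, here is what the flip and brightness steps do to the pixel array, written in NumPy. This is a simplified sketch, not TensorFlow's implementation: `tf.image.random_brightness` works on a float representation and converts back to `uint8` with saturation, which the clipping below only approximates, and saturation is left out because it requires an RGB-to-HSV conversion. The function names and the `rng` argument are hypothetical.

```python
import numpy as np

def flip_left_right(image):
    # Reverse the width axis (axis 1 of a [height, width, channels] array)
    return image[:, ::-1, :]

def adjust_brightness(image, delta):
    # delta is on the [0, 1] scale used by tf.image; apply it on the [0, 255] scale and clip
    shifted = image.astype(np.float32) + delta * 255.0
    return np.clip(np.round(shifted), 0, 255).astype(np.uint8)

def random_augment(image, rng, max_delta=32.0 / 255.0):
    # Flip with probability 1/2, then shift brightness by a uniform random delta
    if rng.random_sample() < 0.5:
        image = flip_left_right(image)
    delta = rng.uniform(-max_delta, max_delta)
    return adjust_brightness(image, delta)


image = np.zeros((64, 64, 3), dtype=np.uint8)
image[:, :32] = 200  # bright left half, dark right half
augmented = random_augment(image, np.random.RandomState(0))
```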

## Defining the Image Processing Pipeline

With the Dataset API, different processing stages can easily be chained together into a pipeline, much as in functional programming or in a framework like Spark.

In [11]:
def image_pre_process_pipeline(dataset, dataset_size, test_or_val_set = False):
    """
    Pipeline for parsing, decoding, shuffling, and (for the train set only) augmenting a dataset of images and labels
    """
    # Parse the binary TFRecord format into (image bytes, label bytes) pairs
    dataset = dataset.map(parse_tfr)
    # Decode the bytestrings into tensors
    dataset = dataset.map(decode_bytes)
    # Randomly shuffle the elements in the dataset
    dataset = dataset.shuffle(dataset_size)
    # Perform data augmentation on the tensors
    if not test_or_val_set:
        dataset = dataset.map(image_data_aug, num_parallel_calls=os.cpu_count())
    return dataset

In [12]:
def iterate_over_dataset(dataset):
    """
    Creates a one-shot iterator over the dataset and returns the op that yields the next element
    """
    iterator = dataset.make_one_shot_iterator()
    next_element_op = iterator.get_next()
    return next_element_op

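## Use the TensorFlow Library to Save the Preprocessed Images and Labels into TFRecords

Each preprocessed example is stored as a `tf.train.Example` with two byte-string features:

```python
{
        'label_one_hot': _bytes_feature(label_one_hot_raw),
        'image_raw': _bytes_feature(img_raw)
}
```

The `height`, `width`, `channel`, and `label` features of the raw records are not written again. `parse_tfr` can still read the output files because it only returns `image_raw` and `label_one_hot` and falls back to default values for the missing features.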

In [13]:
def _bytes_feature(value):
    """
    Wrapper for inserting bytes features into Example proto.
    """
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

In [14]:
def create_tfr_example(image, label):
    """ 
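    Creates a TFRecord Example protobuf from a uint8 image array and a one-hot label
    """
    # Serialize the image array into raw bytes (its shape is not stored)
    image_raw = image.tostring()
    example = tf.train.Example(features=tf.train.Features(
        feature={
        # label is a 1-tuple holding the one-hot uint8 array; bytes() copies the array's raw buffer
        'label_one_hot': _bytes_feature(bytes(label[0])),
        'image_raw': _bytes_feature(image_raw)
    }))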
    return example

## Simple Data Validation

Let's first parse some sample TFRecords written by notebook number 1 ([Notebook number one](./TinyImageNet_Convert_To_TFRecords.ipynb)) to verify that the parsing works correctly and that the data looks as expected.

In [15]:
# Parse metadata so we can transform numeric label into description
train_nid_list, label_to_word, validation_file_to_nid = parse_metadata()

Parsing some sample train images and their labels:
```python
def one_hot_to_int(label):
    return list(label).index(1)

dataset = create_dataset("hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_raw/train/45.tfrecords")
dataset = dataset.map(parse_tfr)
dataset = dataset.map(decode_bytes)
dataset_iterate_op = iterate_over_dataset(dataset)
sample_images = []
sample_labels= []
with tf.Session() as sess:
    for i in range(6):
        element = sess.run(dataset_iterate_op)
        image = element[0]
        label = element[1]
        sample_images.append(image)
        sample_labels.append(label[0])
        
plt.rcParams["figure.figsize"] = (14,10)
count = 0
for i in range(len(sample_images)):
    count += 1
    plt.subplot(3,3,count)
    plt.imshow(sample_images[i])
    plt.title('label: {}'.format(label_to_word[one_hot_to_int(sample_labels[i])]))
    plt.axis("off")
plt.savefig("sample_images_from_tfr.png")
plt.show()
```
![sample_images_from_tfr.png](./../images/sample_images_from_tfr.png)

Parsing some sample validation images and their labels:
```python
def one_hot_to_int(label):
    return list(label).index(1)

dataset = create_dataset("hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_raw/val/val.tfrecords")
dataset = dataset.map(parse_tfr)
dataset = dataset.map(decode_bytes)
dataset_iterate_op = iterate_over_dataset(dataset)
sample_images = []
sample_labels= []
with tf.Session() as sess:
    for i in range(24):
        element = sess.run(dataset_iterate_op)
        image = element[0]
        label = element[1]
        sample_images.append(image)
        sample_labels.append(label[0])
        
plt.rcParams["figure.figsize"] = (14,50)
count = 0
for i in range(len(sample_images)):
    count += 1
    plt.subplot(8,3,count) # 24 images need a grid with at least 24 cells
    plt.imshow(sample_images[i])
    plt.title('label: {}'.format(label_to_word[one_hot_to_int(sample_labels[i])]))
    plt.axis("off")
plt.savefig("sample_images_from_tfr_val.png")
plt.show()
```
![sample_images_from_tfr_val.png](./../images/sample_images_from_tfr_val.png)

As we can see, the images look as they should, and we were able to parse them correctly from the binary strings stored in the Protobuf messages on HopsFS. Moreover, each example is correctly paired with its label.

## Running the Pipeline

The datasets and the operations on them are nodes in the TensorFlow graph, so we need to run them in a TensorFlow session to get any results. Each example that we read and preprocess is written straight back out to a new directory in HopsFS, so the writing loop itself never accumulates examples in memory. Note, however, that `dataset.shuffle(dataset_size)` in the pipeline buffers up to `dataset_size` decoded examples, so the shuffle step still needs enough RAM to hold an entire dataset split.
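Because `image_pre_process_pipeline` shuffles decoded images with a buffer as large as the dataset, a rough lower bound on the memory used by the shuffle buffer is the number of examples times the size of one decoded image. A back-of-the-envelope sketch, assuming 100,000 train images (500 per class for the 200 Tiny ImageNet classes) of 64x64x3 `uint8` pixels and ignoring labels and per-element overhead; `shuffle_buffer_bytes` is a hypothetical helper:

```python
def shuffle_buffer_bytes(num_examples, height=64, width=64, channels=3, bytes_per_value=1):
    # Decoded uint8 pixels take one byte per value
    return num_examples * height * width * channels * bytes_per_value


train_bytes = shuffle_buffer_bytes(100000)
print(train_bytes)                        # 1228800000
print(round(train_bytes / 1e9, 2), "GB")  # 1.23 GB
```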

In [16]:
def write_tfrecords(sess, iterate_op, num_examples, filename):
    """
    Pulls num_examples preprocessed examples through iterate_op and writes them to a TFRecords file
    """
    # The context manager closes the writer (flushing it to HopsFS) when the loop is done
    with tf.python_io.TFRecordWriter(filename) as writer:
        for i in range(num_examples):
            image, label = sess.run(iterate_op)
            tfr_example = create_tfr_example(image, label)
            writer.write(tfr_example.SerializeToString())

def run(train_filename, val_filename, test_filename):
    """
    Orchestrates the pipeline and runs the steps in order:
    1. Get files
    2. Define datasets
    3. Define computational graph over datasets for preprocessing
    4. Iterate over the datasets, preprocessing each example and saving the results to TFRecords again
    """
    # Get files
    train_tfr_files, val_tfr_files, test_tfr_files, train_size, val_size, test_size = get_filenames_and_size()
    # Get datasets (lazy)
    train_dataset, val_dataset, test_dataset = create_datasets(train_tfr_files, val_tfr_files, test_tfr_files)
    
    # train (with data augmentation)
    train_dataset = image_pre_process_pipeline(train_dataset, train_size)
    train_dataset_iterate_op = iterate_over_dataset(train_dataset)
    
    # val (no data augmentation)
    val_dataset = image_pre_process_pipeline(val_dataset, val_size, test_or_val_set = True)
    val_dataset_iterate_op = iterate_over_dataset(val_dataset)
    
    # test (no data augmentation)
    test_dataset = image_pre_process_pipeline(test_dataset, test_size, test_or_val_set = True)
    test_dataset_iterate_op = iterate_over_dataset(test_dataset)
    
    # Run the graph and save output to TFRecords
    with tf.Session() as sess:
        write_tfrecords(sess, train_dataset_iterate_op, train_size, train_filename)
        write_tfrecords(sess, val_dataset_iterate_op, val_size, val_filename)
        write_tfrecords(sess, test_dataset_iterate_op, test_size, test_filename)

In [17]:
# Notebook entrypoint, kicking off the pipeline
run(OUTPUT_DIR + "train.tfrecords", OUTPUT_DIR + "val.tfrecords", OUTPUT_DIR + "test.tfrecords")

## Data Validation Step Two
If everything went well, the data should now be written to HopsFS in three different files:

- hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_clean/train.tfrecords
- hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_clean/val.tfrecords
- hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_clean/test.tfrecords

These files contain preprocessed data that is ready to be fed into a machine learning model for training or evaluation. All three files are shuffled, and only `train.tfrecords` contains data that has been perturbed by random data augmentation:

- Random flipping of an image (Left-to-Right, not upside-down)
- Random adjustment of the brightness in the image
- Random adjustment of the color saturation of the image

Let's have a look at a few sample images from the train dataset to see that everything worked correctly:

```python
dataset = create_dataset("hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_clean/train.tfrecords")
dataset = dataset.map(parse_tfr)
dataset = dataset.map(decode_bytes)
dataset_iterate_op = iterate_over_dataset(dataset)
sample_images = []
sample_labels= []
with tf.Session() as sess:
    for i in range(24):
        element = sess.run(dataset_iterate_op)
        image = element[0]
        label = element[1]
        sample_images.append(image)
        sample_labels.append(label[0])
        
plt.rcParams["figure.figsize"] = (14,50)
count = 0
for i in range(len(sample_images)):
    count += 1
    plt.subplot(12,3,count)
    plt.imshow(sample_images[i])
    plt.title('label: {}'.format(label_to_word[one_hot_to_int(sample_labels[i])]))
    plt.axis("off")
plt.savefig("sample_images_preprocessed_from_tfr.png")
plt.show()
```
![sample_images_preprocessed_from_tfr.png](./../images/sample_images_preprocessed_from_tfr.png)

And some samples from the validation set: 

```python
dataset = create_dataset("hdfs:///Projects/ImageNet_EndToEnd_MLPipeline/tiny-imagenet/tiny-imagenet-200/tfrecords_clean/val.tfrecords")
dataset = dataset.map(parse_tfr)
dataset = dataset.map(decode_bytes)
dataset_iterate_op = iterate_over_dataset(dataset)
sample_images = []
sample_labels= []
with tf.Session() as sess:
    for i in range(24):
        element = sess.run(dataset_iterate_op)
        image = element[0]
        label = element[1]
        sample_images.append(image)
        sample_labels.append(label[0])
        
plt.rcParams["figure.figsize"] = (14,50)
count = 0
for i in range(len(sample_images)):
    count += 1
    plt.subplot(12,3,count)
    plt.imshow(sample_images[i])
    plt.title('label: {}'.format(label_to_word[one_hot_to_int(sample_labels[i])]))
    plt.axis("off")
plt.savefig("sample_images_preprocessed_from_tfr_val.png")
plt.show()
```
![sample_images_preprocessed_from_tfr_val.png](./../images/sample_images_preprocessed_from_tfr_val.png)