# ETL Pipeline

For more information, please refer to the [TFDS-PIPELINE Examples](https://github.com/SB-Jr/tldr_tensorflow/tree/master/v2/TFDS_PIPELINE).

The main goal of the ETL (extract, transform, load) pipeline is to keep the CPU and the GPU working in sync, so that neither device sits idle and we get the most out of the hardware we have. The CPU's job is to run the whole ETL process, and the GPU's job is to take the data from the ETL pipeline and train the model.

Because the preprocessing of the data is done by the CPU, it can often become the bottleneck. An efficient pipeline avoids this.

## Caching

One way to build an efficient ETL pipeline is data caching. Datasets/tensors can be cached in 2 ways:
- Caching in memory
- Caching on disk (storage)

### Caching in memory

Here the caching takes place in RAM.
We use `tf.data.Dataset.cache()` to cache the dataset so that the preprocessing does not have to be repeated for each epoch.
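A minimal sketch of this effect, assuming a toy dataset and a hypothetical `expensive_preprocess` function that stands in for real preprocessing. The `calls` list is only there to record how often the preprocessing actually runs across two passes over the data.

```python
import tensorflow as tf

calls = []  # records every time the "expensive" step actually runs

def expensive_preprocess(x):
    # Hypothetical stand-in for costly preprocessing; py_function lets us
    # observe each real invocation from Python.
    def _record(v):
        calls.append(int(v))
        return v * 2
    return tf.py_function(_record, [x], tf.int64)

dataset = tf.data.Dataset.range(5).map(expensive_preprocess).cache()

first_epoch = [int(v) for v in dataset]   # runs the preprocessing and fills the cache
second_epoch = [int(v) for v in dataset]  # served from the in-memory cache

print(len(calls), first_epoch, second_epoch)
```

The cache is only complete once the dataset has been iterated from start to end, so the first epoch still pays the full preprocessing cost.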

### Caching on disk

Here we use `tf.data.Dataset.cache(filename='<file name here>')` so that the cache is stored on disk. This helps when the data is too big to be stored in memory (RAM).
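As a rough illustration, the sketch below caches a toy dataset to a temporary directory. The directory and the `demo_cache` file prefix are assumptions made for this example only; in a real pipeline you would point `filename` at a location with enough free space.

```python
import os
import tempfile
import tensorflow as tf

cache_dir = tempfile.mkdtemp()
cache_prefix = os.path.join(cache_dir, "demo_cache")  # hypothetical file prefix

dataset = (
    tf.data.Dataset.range(1000)
    .map(lambda x: x * 2)
    .cache(filename=cache_prefix)  # cache files are written under this prefix
)

# The cache is only finalized after one complete pass over the dataset.
total = sum(int(v) for v in dataset)

cache_files = sorted(os.listdir(cache_dir))
print(total, cache_files)
```

Later runs that reuse the same `filename` read from these files, so they should be deleted whenever the preprocessing logic changes.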

## Parallel APIs


The main APIs in `tf.data.Dataset` that let us take advantage of parallelism and get the most out of our hardware are:
- map
- prefetch
- interleave

### AUTOTUNE

Most of the parallel APIs need parameter values that depend on our system configuration. These values can be hardcoded in most cases, but in cloud architectures, where systems are constantly scaled either horizontally or vertically and the configuration keeps changing, we can't keep changing the code to supply new hardcoded values.

This is where the AUTOTUNE API helps us. We can set almost all of these parameters to AUTOTUNE so that TensorFlow itself picks the values that give the maximum utilization of the resources.

```python
from tensorflow.data.experimental import AUTOTUNE
```
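A small sketch of how the constant is typically passed around, assuming a toy dataset and a hypothetical `square` function. It reads the value through the `tf.data.experimental` attribute. Newer TensorFlow releases (2.4 and later) also expose it as `tf.data.AUTOTUNE`.

```python
import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE

def square(x):
    # Hypothetical stand-in for a preprocessing function.
    return x * x

dataset = (
    tf.data.Dataset.range(8)
    .map(square, num_parallel_calls=AUTOTUNE)  # parallelism chosen at runtime
    .prefetch(AUTOTUNE)                        # buffer size chosen at runtime
)

squares = [int(v) for v in dataset]
print(squares)
```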

### Map

Most often we need to perform preprocessing, such as augmentation, on the data before it is passed on to the model. This can be a very expensive task if we don't use all the cores of the CPU to parallelize the work.
Example:
```python
import tensorflow as tf
import tensorflow_datasets as tfds

def augmentation(features):
    # random flips and colour jitter on the raw image
    x = tf.image.random_flip_left_right(features['image'])
    x = tf.image.random_flip_up_down(x)
    x = tf.image.random_brightness(x, max_delta=0.1)
    x = tf.image.random_saturation(x, lower=0.75, upper=1.5)
    x = tf.image.random_hue(x, max_delta=0.15)
    x = tf.image.random_contrast(x, lower=0.75, upper=1.5)
    # resize to the model input size and scale pixel values
    x = tf.image.resize(x, (224, 224))
    image = x / 255.0
    return image, features['label']

# load dataset
dataset = tfds.load('cats_vs_dogs', split=tfds.Split.TRAIN)
# how many CPU cores do you have?
cores = 8
augmented_dataset = dataset.map(augmentation, num_parallel_calls=cores - 1)
```

### Prefetch

We can use prefetch so that the CPU prepares the next batch of data while the GPU is still training on the current one. This way the CPU and GPU are used at the same time, which increases the system throughput.

Example:
```python
prepped_dataset = dataset.map(augmentation).prefetch(AUTOTUNE)
```
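The overlap can be made visible with a toy experiment. In the sketch below, `slow_source` and `consume` are hypothetical stand-ins for CPU preprocessing and a training loop, each simulated with a short sleep. The measured times depend on the machine, so treat them as indicative only.

```python
import time
import tensorflow as tf

def slow_source():
    # Stand-in for CPU-side preprocessing: each element takes ~10 ms to produce.
    for i in range(20):
        time.sleep(0.01)
        yield i

def consume(dataset):
    # Stand-in for a training loop: each step takes ~10 ms.
    start = time.perf_counter()
    seen = []
    for item in dataset:
        time.sleep(0.01)
        seen.append(int(item))
    return seen, time.perf_counter() - start

base = tf.data.Dataset.from_generator(
    slow_source,
    output_signature=tf.TensorSpec(shape=(), dtype=tf.int32),
)

plain_items, plain_time = consume(base)
prefetched_items, prefetched_time = consume(
    base.prefetch(tf.data.experimental.AUTOTUNE))

print(f"without prefetch: {plain_time:.2f}s, with prefetch: {prefetched_time:.2f}s")
```

Prefetching does not change which elements are produced or their order. It only lets the producer work ahead while the consumer is busy.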

### Interleave

We can also optimize the data extraction, so that data that has already been extracted (loaded into memory and ready for further use) can be preprocessed while more data is still being read. This keeps the CPU resources properly used. In short, here we are trying to parallelize the I/O and the preprocessing (map) operations.

Example:
```python
files = tf.data.Dataset.list_files('glob pattern to cover all files')

num_parallel_reads = 4

dataset = files.interleave(
    tf.data.TFRecordDataset,  # map function
    cycle_length=num_parallel_reads, 
    num_parallel_calls=tf.data.experimental.AUTOTUNE)

```
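To try this without a real dataset, the sketch below writes three tiny TFRecord shards into a temporary directory and reads them back with `interleave`. The shard names and record contents are made up for the example.

```python
import os
import tempfile
import tensorflow as tf

# Write three small shards with four records each (hypothetical file names).
shard_dir = tempfile.mkdtemp()
for shard in range(3):
    path = os.path.join(shard_dir, f"demo-{shard}.tfrecord")
    with tf.io.TFRecordWriter(path) as writer:
        for i in range(4):
            writer.write(f"shard{shard}-record{i}".encode())

files = tf.data.Dataset.list_files(
    os.path.join(shard_dir, "demo-*.tfrecord"), shuffle=False)

records = files.interleave(
    tf.data.TFRecordDataset,  # opens one reader per file
    cycle_length=3,           # read from all three shards at once
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)

values = [r.numpy().decode() for r in records]
print(values)
```

With `cycle_length=3` the output alternates between the shards instead of exhausting one file before opening the next.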

## Example

```python
import tensorflow as tf
import multiprocessing
import tensorflow_datasets as tfds
import os
```

```python
def create_model():
    input_layer = tf.keras.layers.Input(shape=(224, 224, 3))
    base_model = tf.keras.applications.MobileNetV2(input_tensor=input_layer,
                                                   weights='imagenet',
                                                   include_top=False)
    base_model.trainable = False
    x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
    x = tf.keras.layers.Dense(2, activation='softmax')(x)

    model = tf.keras.models.Model(inputs=input_layer, outputs=x)
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['acc'])
    return model
```

### Naive Approach

```python
dataset_name = 'cats_vs_dogs'
filePath = f'{os.getcwd()}/dataset'
dataset, info = tfds.load(name=dataset_name, split=tfds.Split.TRAIN, with_info=True, data_dir=filePath)
```

Downloading and preparing dataset cats_vs_dogs/4.0.0 (download: 786.68 MiB, generated: Unknown size, total: 786.68 MiB) to /home/sbjr/my_workspace/tldr_tensorflow/v2/TFDS_PIPELINE/dataset/cats_vs_dogs/4.0.0...

Shuffling and writing examples to /home/sbjr/my_workspace/tldr_tensorflow/v2/TFDS_PIPELINE/dataset/cats_vs_dogs/4.0.0.incompleteQ6RU67/cats_vs_dogs-train.tfrecord

Dataset cats_vs_dogs downloaded and prepared to /home/sbjr/my_workspace/tldr_tensorflow/v2/TFDS_PIPELINE/dataset/cats_vs_dogs/4.0.0. Subsequent calls will reuse this data.


```python
def preprocess(features):
    image = features['image']
    image = tf.image.resize(image, (224, 224))
    image = image / 255.0
    return image, features['label']

# no parallel calls, no caching, no prefetching
train_dataset = dataset.map(preprocess).batch(32)
```

```python
model = create_model()
model.fit(train_dataset, epochs=5)
```


### Parallel Process - Pipeline Creation

#### Interleave

As we have already downloaded the dataset, we will now read the data in parallel so that the resources are used efficiently.

```python
file_pattern = f'{os.getcwd()}/dataset/{dataset_name}/{info.version}/{dataset_name}-train.tfrecord*'
files = tf.data.Dataset.list_files(file_pattern)
```

```python
files
# <ShuffleDataset shapes: (), types: tf.string>
```

```python
train_dataset_parallel = files.interleave(
    tf.data.TFRecordDataset,
    cycle_length=4,
    num_parallel_calls=tf.data.experimental.AUTOTUNE
)
```

#### Parse and Decode


The dataset files are stored in the TFRecord format, in which each record is serialized. We need to parse the data to load it properly as images and labels. For this:
- we need to define the structure of each record
- iterate over the dataset in parallel to generate the parsed data, i.e. the images and their corresponding labels

```python
format_description = {
    'image': tf.io.FixedLenFeature((), tf.string, ''),  # '' is the default value assigned if the field is empty
    'label': tf.io.FixedLenFeature((), tf.int64, -1)  # the label is read as an int (not a string), with default value -1
}
```

```python
def parse_tfrecord(record):
    row = tf.io.parse_single_example(record, format_description)

    image = tf.io.decode_jpeg(row['image'], channels=3)
    label = row['label']

    # preprocessing the image
    image = tf.cast(image, tf.float32)
    image = tf.image.resize(image, (224, 224))
    image = image / 255.0

    return image, label
```
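The parsing step can be checked on a single hand-made record before running it over the whole dataset. The sketch below builds one serialized `tf.train.Example` from a blank 8x8 image. The names `demo_format` and `demo_parse` are hypothetical and simply mirror the feature description and parse function above, so that the block runs on its own.

```python
import numpy as np
import tensorflow as tf

# Build one serialized example with the same two keys: 'image' and 'label'.
pixels = np.zeros((8, 8, 3), dtype=np.uint8)
jpeg_bytes = tf.io.encode_jpeg(pixels).numpy()
example = tf.train.Example(features=tf.train.Features(feature={
    "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[jpeg_bytes])),
    "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
}))
serialized = example.SerializeToString()

demo_format = {
    "image": tf.io.FixedLenFeature((), tf.string, ""),
    "label": tf.io.FixedLenFeature((), tf.int64, -1),
}

def demo_parse(record):
    # Parse the serialized record, decode the JPEG, then resize and rescale.
    row = tf.io.parse_single_example(record, demo_format)
    image = tf.io.decode_jpeg(row["image"], channels=3)
    image = tf.image.resize(tf.cast(image, tf.float32), (224, 224)) / 255.0
    return image, row["label"]

image, label = demo_parse(serialized)
print(image.shape, image.dtype, int(label))
```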

```python
cores = multiprocessing.cpu_count()
print(cores)
```

8


```python
train_dataset_parallel = train_dataset_parallel.map(parse_tfrecord, num_parallel_calls=cores)  # here we used cores as num_parallel_calls in place of AUTOTUNE
```

#### Cache the dataset

This improves performance. It is mostly useful when we have a big model and multiple epochs that go over the same dataset again and again.

```python
train_dataset_parallel = train_dataset_parallel.cache()
```

### Parallel Process - Actual loading of the dataset

```python
train_dataset_parallel = train_dataset_parallel.shuffle(1024).batch(32)
train_dataset_parallel = train_dataset_parallel.prefetch(tf.data.experimental.AUTOTUNE)
```

```python
model_parallel = create_model()
model_parallel.fit(train_dataset_parallel, epochs=5)
```
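To compare input pipelines independently of model training, one option is to time a few full passes over each dataset. The sketch below is an assumption-laden toy: `time_epochs` and `slow_preprocess` are hypothetical helpers, and the 5 ms sleep merely imitates costly preprocessing. The same helper could be pointed at the naive and the parallel dataset from this example.

```python
import time
import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE

def time_epochs(dataset, epochs=2):
    """Return the wall-clock seconds taken by each full pass over `dataset`."""
    durations = []
    for _ in range(epochs):
        start = time.perf_counter()
        for _ in dataset:
            pass
        durations.append(time.perf_counter() - start)
    return durations

def slow_preprocess(x):
    # Hypothetical stand-in for costly preprocessing (~5 ms per element).
    def _work(v):
        time.sleep(0.005)
        return v * 2
    return tf.py_function(_work, [x], tf.int64)

naive = tf.data.Dataset.range(50).map(slow_preprocess)
tuned = (
    tf.data.Dataset.range(50)
    .map(slow_preprocess, num_parallel_calls=AUTOTUNE)
    .cache()
    .prefetch(AUTOTUNE)
)

naive_times = time_epochs(naive)
tuned_times = time_epochs(tuned)
print("naive:", naive_times)
print("tuned:", tuned_times)
```

The second pass over the tuned pipeline is the interesting number, since by then the cache has been filled and the preprocessing is skipped entirely.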




