# **TensorFlow Input Pipeline**

The tf.data API lets us build an input pipeline. With a pipeline we can chain several preprocessing steps (scaling, resizing, etc.) over our data in a single expression. For example:

tf_dataset = tf.data.Dataset.list_files('images/*').map(process_img).filter(filter_func).map(lambda x: x/255)
where : 

- list_files : list the image file paths
- map(process_img) : read each image and convert its content to an array; extract the label from the folder name
- filter(filter_func) : filter out blurred images
- map(lambda x: x/255) : scaling

So, in effect, all of this preprocessing is expressed in a single line. The next step would be to train the model : model.fit(tf_dataset)
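
The same chaining can be tried without any image files. The sketch below is a minimal stand-in that assumes in-memory tensors instead of files: `fake_pixels` and `is_bright_enough` are hypothetical names, and the brightness check merely takes the place of the blur filter.

```python
import tensorflow as tf

# Hypothetical stand-in for image data: four "images" with three values each.
fake_pixels = tf.constant(
    [[0.0, 0.0, 0.0],
     [255.0, 255.0, 255.0],
     [51.0, 102.0, 153.0],
     [10.0, 20.0, 30.0]]
)

def is_bright_enough(x):
    # Hypothetical filter: keep samples whose mean value is above 15.
    return tf.reduce_mean(x) > 15.0

demo_ds = (
    tf.data.Dataset.from_tensor_slices(fake_pixels)
    .filter(is_bright_enough)   # drop unwanted samples
    .map(lambda x: x / 255)     # scale to [0, 1]
)

demo_result = [x.numpy().tolist() for x in demo_ds]
print(demo_result)
```

Only the all-zero sample is dropped here; the remaining three are scaled into the range [0, 1].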

Benefits : 

1) Handle huge datasets by streaming them from disk in batches
2) Apply transformations to make the dataset ready for model training
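
As a small illustration of the first benefit, `batch` groups elements as they are streamed, so the whole dataset never has to be materialized at once. A minimal sketch on a toy range dataset, where the numbers stand in for samples read from disk:

```python
import tensorflow as tf

# Ten toy "samples", streamed in batches of four.
batched_ds = tf.data.Dataset.range(10).batch(4)

batches = [b.numpy().tolist() for b in batched_ds]
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

The last batch is smaller because ten is not a multiple of four.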
            

In the first part I build a simple input pipeline for my dogs-cats data.
In the second part I optimize the TensorFlow pipeline with prefetch and cache.

## **First Part**

In [1]:
import tensorflow as tf
import numpy as np 
import pandas as pd
from tensorflow.keras import datasets

You can download the images from : https://www.kaggle.com/datasets/samuelcortinhas/cats-and-dogs-image-classification

In [2]:
# Read the images 
images_ds = tf.data.Dataset.list_files('images/*/*', shuffle=False)


In [3]:
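# Shuffle the images once. reshuffle_each_iteration=False keeps the order fixed,
# so the take/skip split further down does not mix train and test files.
images_ds = images_ds.shuffle(200, reshuffle_each_iteration=False)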

for file in images_ds.take(13):
    print(file.numpy())

b'images\\cats\\cat_345.jpg'
b'images\\cats\\cat_152.jpg'
b'images\\cats\\cat_103.jpg'
b'images\\cats\\cat_240.jpg'
b'images\\cats\\cat_109.jpg'
b'images\\cats\\cat_16.jpg'
b'images\\cats\\cat_263.jpg'
b'images\\cats\\cat_180.jpg'
b'images\\cats\\cat_223.jpg'
b'images\\cats\\cat_224.jpg'
b'images\\cats\\cat_361.jpg'
b'images\\cats\\cat_246.jpg'
b'images\\cats\\cat_281.jpg'


In [4]:
# Split data into train - test without using scikit-learn lib
class_names = ["cats", "dogs"]
image_count = len(images_ds)

train_size = int(image_count * 0.8)
train_ds = images_ds.take(train_size)
test_ds = images_ds.skip(train_size)
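
The `take`/`skip` split only yields disjoint subsets if the shuffled order stays the same every time the dataset is iterated. The sketch below shows this on a toy range dataset; the `toy_` names are illustrative, and it assumes the shuffle is created with `reshuffle_each_iteration=False`.

```python
import tensorflow as tf

# Shuffle once and keep that order for every later iteration.
toy_ds = tf.data.Dataset.range(10).shuffle(10, reshuffle_each_iteration=False)

toy_train_size = int(len(toy_ds) * 0.8)
toy_train = toy_ds.take(toy_train_size)
toy_test = toy_ds.skip(toy_train_size)

train_items = {int(x) for x in toy_train.as_numpy_iterator()}
test_items = {int(x) for x in toy_test.as_numpy_iterator()}

print(len(train_items), len(test_items))
print(train_items.isdisjoint(test_items))
```

With the default `reshuffle_each_iteration=True`, each pass over the dataset would draw a new order, so the same element could land in both subsets.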

In [5]:
import os

# Function for label extraction: the label is the name of the parent folder
def get_label(file_path):
    return tf.strings.split(file_path, os.path.sep)[-2]
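
`get_label` returns the folder name as a byte string, while most Keras losses expect numeric labels. One possible way to turn the folder name into a class index is sketched below. `get_label_index` and `demo_class_names` are hypothetical names, the class order is assumed to match `class_names` above, and the separator is passed in explicitly so the example does not depend on the operating system.

```python
import tensorflow as tf

demo_class_names = ["cats", "dogs"]  # assumed class order

def get_label_index(file_path, sep="/"):
    # Hypothetical helper: split the path and take the parent folder name.
    parts = tf.strings.split(file_path, sep)
    # Compare the folder name with every class name, e.g. [True, False].
    matches = tf.equal(parts[-2], tf.constant(demo_class_names))
    # The position of the match is the integer label.
    return tf.argmax(tf.cast(matches, tf.int32))

cat_idx = int(get_label_index("images/cats/cat_1.jpg"))
dog_idx = int(get_label_index("images/dogs/dog_7.jpg"))
print(cat_idx, dog_idx)  # 0 1
```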

In [6]:
# Read an image file, decode it, resize it and return it together with its label
def process_image(file_path):
    label = get_label(file_path)
    img = tf.io.read_file(file_path)
    img = tf.image.decode_jpeg(img, channels=3) # Decode the JPEG bytes into an RGB tensor
    img = tf.image.resize(img, [128, 128]) # Resize the image
    
    return img, label

In [7]:
# Inspect a few file paths, then map each path to an (image, label) pair
for t in train_ds.take(4):
    print(t.numpy())

train_ds = train_ds.map(process_image)
for img, label in train_ds.take(4):
    print("Image : " ,img)
    print("Label:", label)

b'images\\cats\\cat_276.jpg'
b'images\\cats\\cat_395.jpg'
b'images\\cats\\cat_271.jpg'
b'images\\cats\\cat_35.jpg'
Image :  tf.Tensor(
[[[29.    33.    32.   ]
  [29.    33.    32.   ]
  [29.    33.    32.   ]
  ...
  [31.875 47.875 11.875]
  [31.875 47.75  12.125]
  [29.875 44.875 14.875]]

 [[29.    33.    32.   ]
  [29.    33.    32.   ]
  [29.    33.    32.   ]
  ...
  [33.5   49.5   12.5  ]
  [33.5   49.5   12.5  ]
  [31.5   46.5   14.5  ]]

 [[29.    33.    32.   ]
  [29.    33.    32.   ]
  [29.    33.    32.   ]
  ...
  [35.    51.    12.   ]
  [34.    50.    11.   ]
  [30.75  46.75  10.75 ]]

 ...

 [[32.    32.    32.   ]
  [32.    32.    32.   ]
  [33.    34.    29.   ]
  ...
  [29.    33.    32.   ]
  [31.    33.    30.   ]
  [35.25  34.    29.   ]]

 [[32.    32.    32.   ]
  [32.    32.    32.   ]
  [33.    34.    29.   ]
  ...
  [29.    33.    32.   ]
  [30.5   32.    27.   ]
  [43.75  34.25  29.5  ]]

 [[32.    32.    32.   ]
  [32.    32.    32.   ]
  [33.    34.    29

In [8]:
# Function that scales pixel values to the range [0, 1]
def scale(image, label):
    return image/255, label

In [9]:
# Scale data
train_ds = train_ds.map(scale)
for image, label in train_ds.take(5):
    print("****Image: ", image.numpy()[0][0])
    print("****Label: ", label.numpy())

****Image:  [1. 1. 1.]
****Label:  b'cats'
****Image:  [0.9098039  0.94509804 0.972549  ]
****Label:  b'cats'
****Image:  [0.66926646 0.59867823 0.34769782]
****Label:  b'cats'
****Image:  [0.00392157 0.00392157 0.00392157]
****Label:  b'cats'
****Image:  [0.98579186 0.98971343 0.99755657]
****Label:  b'cats'


## **Second Part**

## Optimize TensorFlow Pipeline Performance

Using the pipeline's prefetch function, the CPU can prepare the next batch of data while the GPU is training on the current one, so data loading and training overlap. 
A simple structure, for example, would be : 

- tf.data.Dataset.list_files('images/*').map(process_img).filter(filter_func).map(lambda x: x/255).prefetch(AUTOTUNE)

where : 

- list_files : Load images from images folder
- map : Convert image content to numpy array. Extract label from folder
- filter : Filter blurred images
- map(lambda x: x/255) : Scaling
- prefetch(AUTOTUNE) : prefetching

Before we train the model, it is good practice to remove redundant work. We don't need to open each file and scale each image again on every epoch. To avoid that we can use "tf.data.Dataset.cache()": the first epoch fills the cache, and later epochs read the already processed elements from it instead of repeating those steps.

Following the above structure we can then fit the model. 
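
Put together, a commonly used ordering is map, cache, shuffle, batch and finally prefetch. The sketch below is only an illustration on made-up tensors: `toy_images` and `toy_labels` are hypothetical stand-ins for the decoded images and numeric labels, and the batch size is arbitrary.

```python
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE

# Hypothetical stand-in for decoded images: 8 samples of shape (2, 2, 3).
toy_images = tf.fill([8, 2, 2, 3], 255.0)
toy_labels = tf.constant([0, 1, 0, 1, 0, 1, 0, 1])

pipeline_ds = (
    tf.data.Dataset.from_tensor_slices((toy_images, toy_labels))
    .map(lambda img, lbl: (img / 255, lbl), num_parallel_calls=AUTOTUNE)
    .cache()             # keep the scaled samples after the first epoch
    .shuffle(8)          # reshuffle the cached samples on every epoch
    .batch(4)
    .prefetch(AUTOTUNE)  # prepare the next batch while the current one is used
)

first_images, first_labels = next(iter(pipeline_ds))
print(first_images.shape, first_labels.shape)
```

Placing `cache` before `shuffle` means the expensive steps run once, while the sample order can still change from epoch to epoch.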

In [10]:
import tensorflow as tf
import time

In [37]:
# Measure the difference between using and not using prefetch 

class FileDataset(tf.data.Dataset):
    
    def read_files_in_batches(num_samples):
        # open file
        time.sleep(0.03) # Just mimic the delay on opening the file
        for sample_idx in range(num_samples):
            time.sleep(0.015)
            
            yield (sample_idx,) # Yield one sample at a time (this makes the function a generator)
            
    def __new__(cls, num_samples = 3):
        
        return tf.data.Dataset.from_generator(
            cls.read_files_in_batches,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args = (num_samples,)
        )

def benchmark(dataset, num_epochs = 2):
    for epoch_num in range(num_epochs):
        for sample in dataset:
            time.sleep(0.01) # Mimic the time of a training step (e.g. GPU computation)

    

In [38]:
%%timeit
benchmark(FileDataset())            

319 ms ± 39.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [40]:
%%timeit
benchmark(FileDataset().prefetch(tf.data.AUTOTUNE))

303 ms ± 18.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Comparing the two runs above, the **prefetch** version is slightly faster (303 ms vs. 319 ms), although with such a tiny dataset the difference lies within the measured standard deviation.
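
A rough back-of-the-envelope estimate helps explain why the gain is small here. The sketch below uses only the sleep values from `FileDataset` and `benchmark`, and assumes an idealized model in which prefetching fully overlaps reading with training and TensorFlow itself adds no overhead (the measured times above show that it does).

```python
# Simulated delays from FileDataset and benchmark, in seconds.
open_delay = 0.03
read_delay = 0.015
train_delay = 0.01
num_samples = 3
num_epochs = 2

# Without prefetch: every read is followed by a training step.
sequential = num_epochs * (open_delay + num_samples * (read_delay + train_delay))

# Idealized prefetch: reading and training overlap, so the slower of the two
# sets the pace and the faster one is paid only once per epoch.
overlapped = num_epochs * (
    open_delay
    + num_samples * max(read_delay, train_delay)
    + min(read_delay, train_delay)
)

print(f"sequential: {sequential:.3f} s, overlapped: {overlapped:.3f} s")
```

Under these assumptions the best-case saving is about 0.04 s for two epochs, so a difference of a few tens of milliseconds is all that prefetching can deliver in this benchmark.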

In [None]:
dataset = tf.data.Dataset.range(15)


dataset = dataset.map(lambda x: x**2)

for d in dataset:
    print(d.numpy())

In [45]:
# Use cache now : 

dataset  = dataset.cache()
list(dataset.as_numpy_iterator())
# With cache, the mapped lambda function is not applied again on later iterations

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]

In [52]:
def mapped_function(s):
    tf.py_function(lambda : time.sleep(0.03), [], ())
    return s

In [56]:
%%timeit -n1 -r1
benchmark(FileDataset().map(mapped_function), 15)

5.27 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [57]:
%%timeit -n1 -r1
benchmark(FileDataset().map(mapped_function).cache(), 15)

1.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


Using **cache**, the required time (1.3 s) is roughly a quarter of the time needed without cache (5.27 s).
The reason for this behaviour is that the mapped function runs only during the first epoch. For the remaining epochs the data is served from the cache. This means that for a higher number of epochs the difference in time is going to be even larger.
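
One way to check that cached epochs really skip the mapped function is to map a random value and compare two passes over the dataset. This is a minimal sketch with illustrative names; it relies on the first pass reading the dataset to the end, which is what fills the cache.

```python
import tensorflow as tf

# Each element is replaced by a fresh random number when the map function runs.
random_ds = tf.data.Dataset.range(5).map(lambda x: tf.random.uniform([]))
cached_ds = random_ds.cache()

first_epoch = list(cached_ds.as_numpy_iterator())   # runs the map and fills the cache
second_epoch = list(cached_ds.as_numpy_iterator())  # served from the cache

print(first_epoch == second_epoch)
```

If the map function ran again on the second pass, new random numbers would be drawn and the two lists would differ.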