## TENSORFLOW INPUT PIPELINE

FOR THE INPUT DATA PIPELINE WE USE THE `tf.data` FRAMEWORK. ITS MAIN CLASS IS `tf.data.Dataset`.

TO FILTER OUT BLURRY IMAGES, WE USE `tf_dataset.filter`.

TO SCALE PIXEL VALUES INTO THE RANGE [0, 1], WE USE `tf_dataset.map(lambda x: x/255)`.

FINALLY, THE BATCHED DATASET IS PASSED TO THE MODEL: `model.fit(tf_dataset)`

BUILDING DATA PIPELINE USING TF

`tf_dataset = tf.data.Dataset.list_files('images/*').map(process_img).filter(filter_func).map(lambda x: x/255)`
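
A minimal sketch of the same filter-then-scale chain on synthetic tensors, so it runs without an `images/` folder. `is_sharp` and its variance threshold are hypothetical stand-ins for `filter_func`. They only separate flat images from noisy ones and are not a real blur detector.

```python
import tensorflow as tf

# Four synthetic 8x8 RGB "images" with pixel values in [0, 255).
# Two are flat (zero variance) and stand in for blurry images.
noisy = tf.random.uniform([2, 8, 8, 3], minval=0.0, maxval=255.0, seed=1)
flat = tf.fill([2, 8, 8, 3], 128.0)
images = tf.concat([noisy, flat], axis=0)

def is_sharp(img):
    # Hypothetical stand-in for filter_func: keep images whose pixel
    # variance exceeds an arbitrary threshold.
    return tf.math.reduce_variance(img) > 1.0

pipeline = (
    tf.data.Dataset.from_tensor_slices(images)
    .filter(is_sharp)            # drop the flat images
    .map(lambda x: x / 255.0)    # rescale pixels to [0, 1]
)

kept = list(pipeline.as_numpy_iterator())
print(len(kept), float(kept[0].min()), float(kept[0].max()))
```

Nothing is computed until the dataset is iterated. The chain only describes the steps.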

TF_DATASET ANALYSIS

In [1]:
import tensorflow as tf

In [2]:
daily_sales_numbers = [23, 21, -19, 45, 98, 34, -45, 0, 56]

In [3]:
tf_dataset = tf.data.Dataset.from_tensor_slices(daily_sales_numbers)


In [4]:
for sales in tf_dataset:
    print(sales)

tf.Tensor(23, shape=(), dtype=int32)
tf.Tensor(21, shape=(), dtype=int32)
tf.Tensor(-19, shape=(), dtype=int32)
tf.Tensor(45, shape=(), dtype=int32)
tf.Tensor(98, shape=(), dtype=int32)
tf.Tensor(34, shape=(), dtype=int32)
tf.Tensor(-45, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(56, shape=(), dtype=int32)


ALL VALUES IN THE LIST HAVE BEEN CONVERTED INTO TENSORS.

In [5]:
for sales in tf_dataset:
    print(sales.numpy())

23
21
-19
45
98
34
-45
0
56


In [6]:
for sales in tf_dataset.as_numpy_iterator():
    print(sales)

23
21
-19
45
98
34
-45
0
56


In [7]:
for sales in tf_dataset.take(3):
    print(sales)

tf.Tensor(23, shape=(), dtype=int32)
tf.Tensor(21, shape=(), dtype=int32)
tf.Tensor(-19, shape=(), dtype=int32)


In [8]:
for sales in tf_dataset.take(3):
    print(sales.numpy())

23
21
-19


FILTER OPERATION

In [9]:
tf_dataset = tf_dataset.filter(lambda x: x>0)
for sales in tf_dataset.as_numpy_iterator():
    print(sales)

23
21
45
98
34
56




MAPPING

In [10]:
tf_dataset = tf_dataset.map(lambda x: x*72)
for sales in tf_dataset.as_numpy_iterator():
    print(sales)

1656
1512
3240
7056
2448
4032


SHUFFLE

In [11]:
tf_dataset = tf_dataset.shuffle(3)
for sales in tf_dataset.as_numpy_iterator():
    print(sales)

1656
3240
7056
1512
2448
4032
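
`shuffle(buffer_size)` keeps a buffer of that many elements, emits a random one from it, and refills the slot from the input. The pure-Python sketch below imitates that behaviour. It illustrates the idea and is not TensorFlow's implementation; `buffered_shuffle` is a hypothetical helper. It shows why a small buffer only shuffles locally: the element emitted at position `i` can never come from further ahead than input index `i + buffer_size - 1`.

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Imitate a shuffle buffer: emit a random buffered element, then refill."""
    rng = random.Random(seed)
    source = iter(items)
    buffer = []
    # Fill the buffer with the first `buffer_size` elements.
    for item in source:
        buffer.append(item)
        if len(buffer) == buffer_size:
            break
    out = []
    # For every further input element, emit a random buffered one
    # and put the new element in its place.
    for item in source:
        idx = rng.randrange(len(buffer))
        out.append(buffer[idx])
        buffer[idx] = item
    # Input exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    out.extend(buffer)
    return out

values = [1656, 1512, 3240, 7056, 2448, 4032]
shuffled = buffered_shuffle(values, buffer_size=3)
print(shuffled)
```

For a uniform shuffle, the buffer size should be at least the number of elements in the dataset.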


BATCHING

In [12]:
for sales_batch in tf_dataset.batch(3):
    print(sales_batch.numpy())

[3240 1656 7056]
[2448 4032 1512]
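
The six elements above divide evenly into batches of three. When they do not, the last batch is smaller unless `drop_remainder=True` is passed. A short sketch on `tf.data.Dataset.range(7)`, chosen here only for illustration:

```python
import tensorflow as tf

ds = tf.data.Dataset.range(7)

# Default: the last batch holds the leftover element.
full = [b.tolist() for b in ds.batch(3).as_numpy_iterator()]
print(full)      # [[0, 1, 2], [3, 4, 5], [6]]

# drop_remainder=True discards the incomplete final batch.
trimmed = [b.tolist() for b in ds.batch(3, drop_remainder=True).as_numpy_iterator()]
print(trimmed)   # [[0, 1, 2], [3, 4, 5]]
```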


PERFORMING ALL THE ABOVE OPERATIONS IN ONE LINE USING TF

## ETL USING TF INPUT PIPELINE

In [13]:
tf_dataset = tf.data.Dataset.from_tensor_slices(daily_sales_numbers)

tf_dataset_yearly = tf_dataset.filter(lambda x: x > 0).map(lambda y: y*365).shuffle(2).batch(2)

for sales in tf_dataset_yearly.as_numpy_iterator():
    print(sales)

[7665 8395]
[16425 35770]
[20440 12410]


In [14]:
images_ds = tf.data.Dataset.list_files('images/*/*', shuffle=False)

In [15]:
for file in images_ds.take(5):
    print(file.numpy())

b'images/cats/cat.2000.jpg'
b'images/cats/cat.2001.jpg'
b'images/cats/cat.2002.jpg'
b'images/cats/cat.2003.jpg'
b'images/cats/cat.2004.jpg'


In [16]:
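# reshuffle_each_iteration=False keeps one fixed order, so a later take/skip split does not overlap
image_ds = images_ds.shuffle(2500, reshuffle_each_iteration=False)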

for file in image_ds.take(5):
    print(file.numpy())

b'images/dogs/dog.2195.jpg'
b'images/cats/cat.2363.jpg'
b'images/dogs/dog.2335.jpg'
b'images/dogs/dog.2164.jpg'
b'images/dogs/dog.2292.jpg'


In [17]:
class_names = ['cats', 'dogs']

In [18]:
image_count = len(image_ds)
image_count

1000

TRAIN-TEST SPLIT IN TF

In [19]:
train_size = int(image_count * 0.85)

train_ds = image_ds.take(train_size)
test_ds = image_ds.skip(train_size)

In [20]:
print(len(train_ds))
print(len(test_ds))

850
150


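`take(n)` returns a dataset with only the first `n` file paths.

`skip(n)` returns a dataset with everything after the first `n` file paths, i.e. the ones `take(n)` left out. Because `shuffle` reshuffles on every iteration by default, the two splits can overlap unless the shuffle order is fixed with `reshuffle_each_iteration=False`.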
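
A small sketch of a non-overlapping split, using `tf.data.Dataset.range(10)` in place of the file paths. The `seed` value is arbitrary and is only there to make the run repeatable.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

# Fix the shuffled order once so take() and skip() see the same sequence.
ds = ds.shuffle(10, seed=42, reshuffle_each_iteration=False)

train = ds.take(8)   # first 8 elements of the fixed order
test = ds.skip(8)    # the remaining 2 elements

train_items = {int(x) for x in train.as_numpy_iterator()}
test_items = {int(x) for x in test.as_numpy_iterator()}
print(sorted(train_items), sorted(test_items))
```

With the default `reshuffle_each_iteration=True`, `train` and `test` are each drawn from a different random order every time they are iterated, so an element can end up in both.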

EXTRACTING THE LABEL FROM THE IMAGE PATH

In [21]:
s = 'images/cats/cat.2000.jpg'
s.split('/')[-2]

'cats'

In [22]:
import os
def get_label(file_path):
    return tf.strings.split(file_path, os.path.sep)[-2]
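
`get_label` returns the folder name as a byte string. Most loss functions expect an integer class index instead. One possible conversion is sketched below. `CLASS_DIRS` and `get_label_index` are hypothetical names, and the folder names are assumed to be `cats` and `dogs` as in the file listing above. The path is split on `/` here to keep the example platform-independent.

```python
import tensorflow as tf

# Assumed to match the folder names under images/ (cats, dogs).
CLASS_DIRS = tf.constant(['cats', 'dogs'])

def get_label_index(file_path):
    # The parent folder name is the second-to-last path component.
    folder = tf.strings.split(file_path, '/')[-2]
    # Compare against every class name and take the position of the match.
    matches = tf.cast(tf.equal(folder, CLASS_DIRS), tf.int32)
    return tf.argmax(matches)

cat_idx = int(get_label_index(tf.constant('images/cats/cat.2000.jpg')))
dog_idx = int(get_label_index(tf.constant('images/dogs/dog.2195.jpg')))
print(cat_idx, dog_idx)  # 0 1
```

A folder name that is not in `CLASS_DIRS` also yields index 0, so unexpected folders should be filtered out beforehand.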

PROCESSING THE INPUT TRAINING DATASET

In [23]:
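def process_image(file_path):
    label = get_label(file_path)          # folder name, e.g. b'cats'
    img = tf.io.read_file(file_path)      # raw JPEG bytes
    # channels=3 forces RGB so every image has the same number of channels
    img = tf.image.decode_jpeg(img, channels=3)
    # resize returns float32 pixels, still in the 0-255 range
    img = tf.image.resize(img, [128, 128])

    return img, label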
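
The read, decode and resize steps can be tried on a synthetic JPEG written to a temporary folder, so the check does not depend on the `images/` directory. `load_image` is a hypothetical helper that mirrors those three steps. Passing `channels=3` is an addition: without it `decode_jpeg` keeps the channel count of the file, so a grayscale JPEG would come out with one channel.

```python
import os
import tempfile
import tensorflow as tf

# Write a small synthetic JPEG so the example does not depend on images/.
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, 'cats', 'cat.0.jpg')
os.makedirs(os.path.dirname(path))
pixels = tf.cast(tf.random.uniform([40, 60, 3], maxval=256, dtype=tf.int32), tf.uint8)
tf.io.write_file(path, tf.io.encode_jpeg(pixels))

def load_image(file_path, size=(128, 128)):
    raw = tf.io.read_file(file_path)
    # channels=3 forces RGB, so grayscale files also decode to 3 channels.
    img = tf.image.decode_jpeg(raw, channels=3)
    # resize returns float32 values still in the 0-255 range.
    return tf.image.resize(img, size)

img = load_image(path)
print(img.shape, img.dtype)
```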

In [24]:
for t in train_ds.take(3):
    print(t.numpy())

b'images/cats/cat.2290.jpg'
b'images/cats/cat.2024.jpg'
b'images/dogs/dog.2183.jpg'


In [25]:
train_ds = train_ds.map(process_image)
for img, label in train_ds.take(5):
    print("Image: ", img)
    print("Label: ", label)

Image:  tf.Tensor(
[[[ 16.453125  17.453125  11.453125]
  [ 18.359375  19.359375  13.359375]
  [ 19.        20.        14.      ]
  ...
  [ 33.265625  34.265625  28.265625]
  [ 28.359375  29.359375  23.359375]
  [ 78.24219   79.24219   73.24219 ]]

 [[ 16.453125  17.453125  11.453125]
  [ 18.359375  19.359375  13.359375]
  [ 19.        20.        14.      ]
  ...
  [ 33.265625  34.265625  28.265625]
  [ 28.359375  29.359375  23.359375]
  [ 82.85156   83.85156   77.85156 ]]

 [[ 16.453125  17.453125  11.453125]
  [ 18.359375  19.359375  13.359375]
  [ 19.        20.        14.      ]
  ...
  [ 33.265625  34.265625  28.265625]
  [ 28.359375  29.359375  23.359375]
  [ 84.39844   85.39844   79.39844 ]]

 ...

 [[ 80.06775   80.06775   70.24475 ]
  [ 76.41406   76.41406   66.55444 ]
  [ 77.734375  77.734375  67.838135]
  ...
  [ 39.872803  35.984985  20.797485]
  [ 32.859497  29.094238  15.445435]
  [ 40.171875  31.651001  22.278687]]

 [[ 77.546875  78.546875  72.546875]
  [ 73.75781   74.

SCALING

In [26]:
def scale(image, label):
    return image/255, label

In [27]:
train_ds = train_ds.map(scale)
for image, label in train_ds.take(5):
    print("Image: ", image.numpy()[0][0])
    print("Label: ", label.numpy())

Image:  [0.1797325  0.18365407 0.16404623]
Label:  b'dogs'
Image:  [0.00420544 0.00420544 0.01204858]
Label:  b'dogs'
Image:  [0. 0. 0.]
Label:  b'cats'
Image:  [0.48600367 0.2637151  0.15783273]
Label:  b'cats'
Image:  [0.63715124 0.5077395  0.29989636]
Label:  b'cats'


OPTIMIZE THE TENSORFLOW PIPELINE PERFORMANCE

OPTIMIZING PERFORMANCE HERE MEANS REDUCING THE TIME IT TAKES TO TRAIN THE MODEL BY KEEPING THE CPU AND THE GPU BUSY AT THE SAME TIME: WHILE THE GPU TRAINS ON THE CURRENT BATCH, THE CPU PREPARES THE NEXT ONE.

FOR THIS OPERATION WE USE:

## PREFETCH

`dataset.prefetch(buffer_size)`, WHERE `buffer_size` IS THE NUMBER OF ELEMENTS (BATCHES, IF THE DATASET IS ALREADY BATCHED) TO PREPARE AHEAD OF TIME

WE WANT TO MAKE OPTIMAL USE OF OUR HARDWARE. WE CAN LET TF CHOOSE THE BUFFER SIZE BY PASSING `tf.data.AUTOTUNE`:

`dataset.prefetch(tf.data.AUTOTUNE)`

## CACHING

DATA PREPROCESSING IS A SERIES OF OPERATIONS (MAP, FILTER, ETC.). WITHOUT CACHING, ALL OF THEM RUN AGAIN IN EVERY TRAINING EPOCH. `cache()` STORES THE RESULT OF THE FIRST PASS SO THAT LATER EPOCHS REUSE IT, WHICH SAVES TIME AND RESOURCES.

LET'S SEE HOW TRAINING NORMALLY HAPPENS

In [28]:
import time

In [29]:
class FileDataset(tf.data.Dataset):
    def read_files_in_batches(num_samples):
        # Simulate the cost of opening the file
        time.sleep(0.03)
        for sample_idx in range(num_samples):
            # Simulate the cost of reading one sample from the file
            time.sleep(0.015)
            yield (sample_idx,)

    def __new__(cls, num_samples=5):
        # Return a plain tf.data.Dataset backed by the generator above
        return tf.data.Dataset.from_generator(
            cls.read_files_in_batches,
            output_signature=tf.TensorSpec(shape=(1,), dtype=tf.int64),
            args=(num_samples,))

In [30]:
def benchmark(dataset, num_epochs=100):
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Simulate one training step
            time.sleep(0.01)

In [31]:
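%%time
benchmark(FileDataset())

`%%time` MUST BE THE FIRST LINE OF THE CELL TO TIME THE WHOLE CELL. A BARE `%time` LINE TIMES AN EMPTY STATEMENT AND REPORTS ONLY MICROSECONDS, AS IN THIS OUTPUT RECORDED WITH `%time`:

CPU times: user 1 µs, sys: 1 µs, total: 2 µs
Wall time: 2.86 µs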


NOW WE WILL USE PREFETCH

In [32]:
%%time
benchmark(FileDataset().prefetch(tf.data.AUTOTUNE))

OUTPUT RECORDED WITH A BARE `%time` LINE, WHICH TIMES AN EMPTY STATEMENT RATHER THAN THE CELL:

CPU times: user 3 µs, sys: 3 µs, total: 6 µs
Wall time: 13.1 µs


In [33]:
%%time
benchmark(FileDataset().prefetch(1))

OUTPUT RECORDED WITH A BARE `%time` LINE, WHICH TIMES AN EMPTY STATEMENT RATHER THAN THE CELL:

CPU times: user 3 µs, sys: 3 µs, total: 6 µs
Wall time: 11 µs
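
The `time.sleep` calls in `FileDataset` and `benchmark` give a lower bound on how long each run should take: 0.03 s to open the file, 0.015 s per sample read and 0.01 s per training step. The sketch below works that out in plain Python under an idealised model in which prefetching overlaps reading perfectly with training. `naive_epoch` and `prefetch_epoch` are hypothetical helpers. Real runs will be slower because of TensorFlow overhead.

```python
def naive_epoch(open_s, read_s, train_s, n):
    # Without prefetch, every read and every training step run back to back.
    return open_s + n * (read_s + train_s)

def prefetch_epoch(open_s, read_s, train_s, n):
    # Idealised overlap: the producer reads the next sample while the
    # consumer trains on the current one.
    ready = open_s   # time at which the producer has finished its latest sample
    done = 0.0       # time at which the consumer has finished its latest step
    for _ in range(n):
        ready += read_s                     # producer finishes the next sample
        done = max(done, ready) + train_s   # consumer waits for it, then trains
    return done

epochs = 100
naive_total = epochs * naive_epoch(0.03, 0.015, 0.01, 5)
prefetch_total = epochs * prefetch_epoch(0.03, 0.015, 0.01, 5)
print(round(naive_total, 2), round(prefetch_total, 2))
```

Under these assumptions, 100 epochs take at least about 15.5 s without prefetch and about 11.5 s with it. Timings far below that, such as a few microseconds, mean the benchmark itself was not timed.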


LET'S USE CACHE

In [38]:
dataset = tf.data.Dataset.range(5)
for d in dataset:
    print(d.numpy())

0
1
2
3
4


In [39]:
dataset = dataset.map(lambda x: x ** 2)
for d in dataset:
    print(d.numpy())

0
1
4
9
16


In [42]:
dataset = dataset.cache()

for d in dataset.as_numpy_iterator():
    print(d)

0
1
4
9
16


In [43]:
list(dataset.as_numpy_iterator())

[0, 1, 4, 9, 16]

The first full pass over the dataset fills the cache. Later passes, like this one, read from the cache instead of recomputing the `map`.
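
One way to confirm that a second pass is served from the cache is to count how often the source is actually read. The sketch below does this with a Python generator. `slow_source` and `calls` are hypothetical names introduced for the illustration.

```python
import tensorflow as tf

calls = []  # one entry per time the generator is started

def slow_source():
    calls.append(1)
    for i in range(5):
        yield i

ds = tf.data.Dataset.from_generator(
    slow_source,
    output_signature=tf.TensorSpec(shape=(), dtype=tf.int64),
).map(lambda x: x ** 2).cache()

first = [int(x) for x in ds.as_numpy_iterator()]   # runs the generator, fills the cache
second = [int(x) for x in ds.as_numpy_iterator()]  # served from the cache
print(first, second, len(calls))
```

The cache is only complete after the dataset has been iterated to the end once. A pass that stops early, for example through `take`, does not leave a usable cache.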

In [44]:
def mapped_function(s):
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

In [48]:
%%time
benchmark(FileDataset().map(mapped_function), 5)

OUTPUT RECORDED WITH A BARE `%time` LINE, WHICH TIMES AN EMPTY STATEMENT RATHER THAN THE CELL:

CPU times: user 3 µs, sys: 2 µs, total: 5 µs
Wall time: 10 µs


PERFORMANCE WITH CACHE

In [49]:
%%time
benchmark(FileDataset().map(mapped_function).cache(), 5)

OUTPUT RECORDED WITH A BARE `%time` LINE, WHICH TIMES AN EMPTY STATEMENT RATHER THAN THE CELL:

CPU times: user 5 µs, sys: 1 µs, total: 6 µs
Wall time: 11 µs
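
Prefetching and caching are usually combined in one chain. The sketch below shows one common ordering on a toy dataset. It is an assumption about a typical setup, not the only valid arrangement: expensive `map` work comes before `cache()` so it runs once, `shuffle` comes after `cache()` so each epoch sees a new order, and `prefetch` comes last.

```python
import tensorflow as tf

ds = (
    tf.data.Dataset.range(10)
    .map(lambda x: x * 2, num_parallel_calls=tf.data.AUTOTUNE)  # preprocessing
    .cache()                       # keep the mapped elements after the first epoch
    .shuffle(10)                   # reshuffle the cached elements every epoch
    .batch(4)
    .prefetch(tf.data.AUTOTUNE)    # overlap batch preparation with training
)

batches = [b.tolist() for b in ds.as_numpy_iterator()]
print([len(b) for b in batches])
```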
