In [6]:
import tensorflow as tf
import time
import os

In [15]:
DATA_DIR = 'data'
BATCH_SIZE = 64
IMAGE_SIZE = (128, 128)
AUTOTUNE = tf.data.AUTOTUNE # Key for performance
AUTOTUNE

-1

In [8]:

def build_tf_data_pipeline(data_dir, batch_size):
    """Builds an optimized tf.data pipeline."""
    # 1. Create a dataset of file paths (fast)
    ds = tf.data.Dataset.list_files(os.path.join(data_dir, '*/*.jpg'), shuffle=True)

    def process_path(file_path):
        # Decode label from file path
        parts = tf.strings.split(file_path, os.path.sep)
        label_str = parts[-2]
        label = tf.where(tf.equal(label_str, 'cats'), 0, 1)

        # 2. Read and decode image in parallel
        img = tf.io.read_file(file_path)
        img = tf.io.decode_jpeg(img, channels=3)
        img = tf.image.resize(img, IMAGE_SIZE)
        return img, label

    # 3. Define augmentation as a Keras layer for efficiency
    augmentation_layer = tf.keras.Sequential([
        tf.keras.layers.RandomFlip("horizontal"),
        tf.keras.layers.RandomBrightness(factor=0.2),
    ])

    # 4. Build the full, parallelized pipeline
    ds = ds.map(process_path, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size)

    # Apply augmentations on the batch (can be GPU-accelerated)
    ds = ds.map(lambda x, y: (augmentation_layer(x), y), num_parallel_calls=AUTOTUNE)
    ds = ds.map(lambda x, y: (x / 255.0, y), num_parallel_calls=AUTOTUNE) # Normalize
    
    # 5. Overlap CPU preprocessing with GPU training
    ds = ds.prefetch(buffer_size=AUTOTUNE)

    return ds


In [13]:
root_path = "/Users/tharhtet/Documents/github/ML-in-Prod-batch-3/6_deep_learning/tf_best_practices/cats_and_dogs_filtered"
DATA_DIR = root_path+"/train"
BATCH_SIZE = 32

print(f"Data directory: {DATA_DIR}")

dataset = build_tf_data_pipeline(DATA_DIR, BATCH_SIZE)


Data directory: /Users/tharhtet/Documents/github/ML-in-Prod-batch-3/6_deep_learning/tf_best_practices/cats_and_dogs_filtered/train


In [14]:
print("Starting tf.data Loader benchmark...")
start_time = time.time()
for i, (images, labels) in enumerate(dataset):
    #print(images.shape, labels.shape)
    if i >= 500: 
        break
end_time = time.time()

print(f"tf.data Loader: Processed 500 batches in {end_time - start_time:.4f} seconds.")

Starting tf.data Loader benchmark...


2025-09-12 22:26:27.901043: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


tf.data Loader: Processed 500 batches in 0.6170 seconds.


2025-09-12 22:26:28.384295: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
