# 02 data pipelines tfrecords
**Location: TensorVerseHub/notebooks/01_tensorflow_foundations/02_data_pipelines_tfrecords.ipynb**

In [None]:
import tensorflow as tf
import numpy as np
print(f"TensorFlow version: {tf.__version__}")

# TensorFlow Data Pipelines & TFRecords

Welcome to the essential guide for building efficient data pipelines in TensorFlow! This notebook covers the `tf.data` API, the TFRecord file format, and integration with `tf.keras` preprocessing layers. You'll learn to create scalable, performant data pipelines that handle large datasets efficiently.

## Learning Objectives
- Master the `tf.data` API for building input pipelines
- Understand and create TFRecord files for efficient data storage
- Implement data preprocessing with `tf.keras` preprocessing layers
- Optimize data loading performance with prefetching, caching, and parallelization
- Handle different data types: images, text, and structured data
- Create reusable preprocessing pipelines

---

## 1. Introduction to tf.data API

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
import json
from PIL import Image
import pandas as pd

print(f"TensorFlow version: {tf.__version__}")

# Enable mixed precision for better performance
# NOTE(review): set_global_policy changes the *global* Keras dtype policy, so Keras
# layers created later in this notebook are built under 'mixed_float16' as well —
# confirm that is intended for a notebook that only builds input pipelines.
policy = tf.keras.mixed_precision.Policy('mixed_float16')
tf.keras.mixed_precision.set_global_policy(policy)

In [None]:
# Ways to build a tf.data.Dataset
# 1. Slice a plain Python list into scalar elements
tensor_dataset = tf.data.Dataset.from_tensor_slices([1, 2, 3, 4, 5])
print("Dataset from tensors:")
for element in tensor_dataset:
    value = element.numpy()
    print(f"  {value}")

# 2. Slice a NumPy array along its first axis (one row per element)
numpy_data = np.array([[1, 2], [3, 4], [5, 6]], dtype=np.float32)
numpy_dataset = tf.data.Dataset.from_tensor_slices(numpy_data)
print("\nDataset from numpy:")
for row in numpy_dataset:
    row_values = row.numpy()
    print(f"  {row_values}")

# 3. From generator function
def data_generator():
    """Yield the squares of the integers 0 through 4, one at a time."""
    yield from (n ** 2 for n in range(5))

# Each generated value is a scalar int32, which the signature must spell out.
squares_signature = tf.TensorSpec(shape=(), dtype=tf.int32)
generator_dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=squares_signature,
)
print("\nDataset from generator:")
for element in generator_dataset:
    print(f"  {element.numpy()}")

In [None]:
# Core tf.data transformations, all derived from the same 0..9 source
base_dataset = tf.data.Dataset.range(10)

# map: apply a function to every element
squared_dataset = base_dataset.map(lambda value: value ** 2)
print("Squared values:")
squared_values = list(squared_dataset.as_numpy_iterator())
print(squared_values)

# filter: keep only the elements for which the predicate is true
even_dataset = base_dataset.filter(lambda value: value % 2 == 0)
print("\nEven values:")
even_values = list(even_dataset.as_numpy_iterator())
print(even_values)

# batch: group consecutive elements (the last group may be smaller)
batched_dataset = base_dataset.batch(3)
print("\nBatched data:")
for chunk in batched_dataset:
    print(f"  {chunk.numpy()}")

# shuffle: draw elements at random from a rolling buffer of 5
shuffled_dataset = base_dataset.shuffle(buffer_size=5)
print("\nShuffled data:")
shuffled_values = list(shuffled_dataset.as_numpy_iterator())
print(shuffled_values)

In [None]:
# Chaining several transformations on a (features, labels) dataset
# Synthetic data: 100 samples with 4 float features and a class id in {0, 1, 2}
features = np.random.randn(100, 4).astype(np.float32)
labels = np.random.randint(0, 3, size=(100,))

# Pair every feature row with its label
dataset = tf.data.Dataset.from_tensor_slices((features, labels))

# Build the pipeline one stage at a time
processed_dataset = dataset.shuffle(buffer_size=100)
# Scale each sample's feature vector to unit L2 norm
processed_dataset = processed_dataset.map(lambda x, y: (tf.nn.l2_normalize(x, axis=0), y))
# Drop samples whose absolute feature sum is too small
processed_dataset = processed_dataset.filter(lambda x, y: tf.reduce_sum(tf.abs(x)) > 0.5)
processed_dataset = processed_dataset.batch(8).prefetch(tf.data.AUTOTUNE)

print("Processed dataset structure:")
for batch_features, batch_labels in processed_dataset.take(1):
    batch_mean = tf.reduce_mean(batch_features)
    batch_std = tf.math.reduce_std(batch_features)
    print(f"  Batch features shape: {batch_features.shape}")
    print(f"  Batch labels shape: {batch_labels.shape}")
    print(f"  Feature stats - mean: {batch_mean:.3f}, std: {batch_std:.3f}")

## 2. Performance Optimization Techniques

In [None]:
# Demonstrating performance optimizations
def create_dummy_dataset(num_samples=1000):
    """Build an in-memory dataset of random (features, label) pairs.

    Args:
        num_samples: Number of samples to generate.

    Returns:
        A tf.data.Dataset whose elements are a 100-dimensional normal feature
        vector and an int32 label drawn uniformly from [0, 10).
    """
    feature_matrix = tf.random.normal((num_samples, 100))
    class_ids = tf.random.uniform((num_samples,), maxval=10, dtype=tf.int32)
    sample_pairs = (feature_matrix, class_ids)
    return tf.data.Dataset.from_tensor_slices(sample_pairs)

# Expensive preprocessing function
def expensive_preprocessing(features, labels):
    """Simulate an expensive per-element preprocessing step.

    Args:
        features: Feature tensor whose last axis has 100 entries. May be a
            single unbatched sample of shape (100,) or a batch of shape
            (batch, 100).
        labels: Labels, passed through untouched.

    Returns:
        A (processed_features, labels) tuple. ``processed_features`` has shape
        (-1, 10): the normalized features are viewed as 10x10 blocks and
        averaged over the last axis.
    """
    features = tf.nn.relu(features)
    # Normalize along the *last* axis. The pipelines in this notebook call this
    # function from map() before batch(), so each input is rank-1 and axis=1
    # would be out of range.
    features = tf.nn.l2_normalize(features, axis=-1)
    processed_features = tf.reduce_mean(tf.reshape(features, (-1, 10, 10)), axis=2)
    return processed_features, labels

# Compare different optimization strategies
import time

def benchmark_dataset(dataset, num_batches=50):
    """Measure how long it takes to iterate over a dataset.

    Args:
        dataset: Dataset yielding (features, labels) elements.
        num_batches: Maximum number of elements (batches) to consume.

    Returns:
        Elapsed time in seconds as a float.
    """
    # perf_counter is a monotonic, high-resolution clock; time.time() can jump
    # when the system clock is adjusted, which distorts short benchmarks.
    start_time = time.perf_counter()
    # take() bounds the iteration without fetching an extra element first.
    for features, _labels in dataset.take(num_batches):
        # Simulate model consumption
        _ = tf.reduce_mean(features)
    return time.perf_counter() - start_time

# 1. Baseline (no optimization)
baseline_dataset = (create_dummy_dataset()
                   .map(expensive_preprocessing)
                   .batch(32))

# 2. With caching
# cache() is placed *after* the expensive map so that the preprocessed samples
# are what gets stored. Caching before the map would only store the raw
# in-memory tensors and the expensive step would still run on every pass.
# Note: the cache is filled during the first complete pass over the data, so
# the speed-up shows from the second iteration onward.
cached_dataset = (create_dummy_dataset()
                 .map(expensive_preprocessing)
                 .cache()
                 .batch(32))

# 3. With prefetching
prefetch_dataset = (create_dummy_dataset()
                   .map(expensive_preprocessing)
                   .batch(32)
                   .prefetch(tf.data.AUTOTUNE))

# 4. With parallel mapping
parallel_dataset = (create_dummy_dataset()
                   .map(expensive_preprocessing, num_parallel_calls=tf.data.AUTOTUNE)
                   .batch(32)
                   .prefetch(tf.data.AUTOTUNE))

# 5. Full optimization: parallel map, cache the mapped result, then batch and prefetch
optimized_dataset = (create_dummy_dataset()
                    .map(expensive_preprocessing, num_parallel_calls=tf.data.AUTOTUNE)
                    .cache()
                    .batch(32)
                    .prefetch(tf.data.AUTOTUNE))

print("Performance comparison:")
print(f"Baseline time: {benchmark_dataset(baseline_dataset):.2f} seconds")
print(f"With caching: {benchmark_dataset(cached_dataset):.2f} seconds")
print(f"With prefetching: {benchmark_dataset(prefetch_dataset):.2f} seconds")
print(f"With parallel mapping: {benchmark_dataset(parallel_dataset):.2f} seconds")
print(f"Fully optimized: {benchmark_dataset(optimized_dataset):.2f} seconds")

In [None]:
# Advanced performance patterns
def create_interleaved_dataset():
    """Demonstrate combining several data sources: concatenation vs. weighted sampling.

    Builds three datasets of 100 tagged strings each ("source1_0", "source1_1", ...),
    then prints the first elements of (1) their concatenation and (2) a weighted
    random interleaving of them.
    """

    def _tagged_source(tag):
        """Return a dataset of the strings '<tag>_0' ... '<tag>_99'."""
        prefix = f"{tag}_"
        # The mapped function is traced into a graph, so a Python f-string would
        # format the symbolic tensor once and give every element the same text.
        # Build the string with tf.strings ops so the element value is used.
        return tf.data.Dataset.range(100).map(
            lambda x: tf.strings.join([prefix, tf.strings.as_string(x)])
        )

    # Create multiple data sources
    dataset1 = _tagged_source("source1")
    dataset2 = _tagged_source("source2")
    dataset3 = _tagged_source("source3")

    # Method 1: Simple concatenation (all of source1, then source2, then source3)
    concat_dataset = dataset1.concatenate(dataset2).concatenate(dataset3)

    # Method 2: Interleaved sampling (each element drawn from a source with these weights)
    interleaved_dataset = tf.data.Dataset.sample_from_datasets(
        [dataset1, dataset2, dataset3],
        weights=[0.5, 0.3, 0.2]
    )

    print("Concatenated dataset (first 10):")
    for item in concat_dataset.take(10):
        print(f"  {item.numpy().decode()}")

    print("\nInterleaved dataset (first 20):")
    for item in interleaved_dataset.take(20):
        print(f"  {item.numpy().decode()}")

create_interleaved_dataset()

## 3. Working with TFRecords

In [None]:
# Creating TFRecord files
def create_tfrecord_examples():
    """Generate synthetic samples for the TFRecord demonstration.

    Returns:
        A tuple (images, labels, texts) with ten entries each: uint8 images of
        shape (64, 64, 3), integer labels in [0, 5), and a description string
        per sample.
    """
    sample_count = 10

    # Random pixel data followed by random class ids (draw order matters for seeding)
    images = np.random.randint(0, 255, size=(sample_count, 64, 64, 3), dtype=np.uint8)
    labels = np.random.randint(0, 5, size=(sample_count,))

    texts = []
    for index in range(sample_count):
        texts.append(f"Sample text description {index}")

    return images, labels, texts

def serialize_example(image, label, text):
    """Serialize one (image, label, text) sample into a tf.train.Example byte string.

    Args:
        image: Array-like image with at least three dimensions; its first three
            shape entries are stored as height, width and channels.
        label: Integer class label.
        text: Python string, stored UTF-8 encoded.

    Returns:
        The serialized tf.train.Example as bytes.
    """

    def _as_bytes_feature(raw):
        """Wrap a single bytes value in a tf.train.Feature."""
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[raw]))

    def _as_int64_feature(number):
        """Wrap a single integer value in a tf.train.Feature."""
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[number]))

    # The image is stored as a serialized tensor so that dtype and shape survive
    encoded_image = tf.io.serialize_tensor(image).numpy()
    encoded_text = text.encode('utf-8')
    height, width, channels = image.shape[0], image.shape[1], image.shape[2]

    feature_map = {
        'image': _as_bytes_feature(encoded_image),
        'label': _as_int64_feature(label),
        'text': _as_bytes_feature(encoded_text),
        'height': _as_int64_feature(height),
        'width': _as_int64_feature(width),
        'channels': _as_int64_feature(channels),
    }

    features = tf.train.Features(feature=feature_map)
    return tf.train.Example(features=features).SerializeToString()

# Write TFRecord file
def write_tfrecords(filename, images, labels, texts):
    """Serialize parallel sequences of samples into a single TFRecord file.

    Args:
        filename: Path of the TFRecord file to create.
        images, labels, texts: Parallel sequences; iteration stops at the
            shortest one.
    """
    samples = zip(images, labels, texts)
    with tf.io.TFRecordWriter(filename) as writer:
        for sample_image, sample_label, sample_text in samples:
            record = serialize_example(sample_image, sample_label, sample_text)
            writer.write(record)

    print(f"TFRecord file created: {filename}")

# Generate the synthetic samples and write them to disk
tfrecord_filename = "sample_data.tfrecord"
images, labels, texts = create_tfrecord_examples()
write_tfrecords(tfrecord_filename, images, labels, texts)

# Confirm that the file exists and report its size
if os.path.exists(tfrecord_filename):
    file_size = os.path.getsize(tfrecord_filename)
    print(f"TFRecord file size: {file_size} bytes")

In [None]:
# Reading TFRecord files
def parse_tfrecord_fn(example):
    """Decode one serialized tf.train.Example into model-ready tensors.

    Args:
        example: Scalar string tensor holding a serialized tf.train.Example.

    Returns:
        A ({'image': image, 'text': text}, label) tuple where ``image`` is a
        float32 tensor scaled to [0, 1], ``text`` is the raw string feature and
        ``label`` is an int32 scalar.
    """
    scalar_string = tf.io.FixedLenFeature([], tf.string)
    scalar_int = tf.io.FixedLenFeature([], tf.int64)
    feature_description = {
        'image': scalar_string,
        'label': scalar_int,
        'text': scalar_string,
        'height': scalar_int,
        'width': scalar_int,
        'channels': scalar_int,
    }

    parsed = tf.io.parse_single_example(example, feature_description)

    # Rebuild the image from its serialized tensor and the stored dimensions
    image_shape = [parsed['height'], parsed['width'], parsed['channels']]
    pixels = tf.io.parse_tensor(parsed['image'], out_type=tf.uint8)
    pixels = tf.reshape(pixels, image_shape)
    image = tf.cast(pixels, tf.float32) / 255.0  # Normalize to [0, 1]

    label = tf.cast(parsed['label'], tf.int32)
    inputs = {'image': image, 'text': parsed['text']}
    return inputs, label

# Create dataset from TFRecord
def load_tfrecord_dataset(filename, batch_size=4):
    """Build a parsed, cached, shuffled and batched dataset from a TFRecord file.

    Args:
        filename: Path (or paths) accepted by tf.data.TFRecordDataset.
        batch_size: Number of samples per batch.

    Returns:
        A tf.data.Dataset yielding batches produced by ``parse_tfrecord_fn``.
    """
    records = tf.data.TFRecordDataset(filename)
    parsed = records.map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
    # Cache the parsed samples, then reshuffle them from the cache
    shuffled = parsed.cache().shuffle(buffer_size=100)
    batched = shuffled.batch(batch_size)
    return batched.prefetch(tf.data.AUTOTUNE)

# Load and inspect the dataset
tfrecord_dataset = load_tfrecord_dataset(tfrecord_filename)

print("TFRecord dataset structure:")
# Use batch-specific loop names. Naming these `features, labels` would overwrite
# the module-level `labels` array created with the synthetic samples, and the
# sharded-writing cell further down passes `labels` to write_sharded_tfrecords.
for batch_features, batch_labels in tfrecord_dataset.take(1):
    print(f"  Image batch shape: {batch_features['image'].shape}")
    print(f"  Text batch shape: {batch_features['text'].shape}")
    print(f"  Label batch shape: {batch_labels.shape}")
    print(f"  Sample text: {batch_features['text'][0].numpy().decode()}")

# Visualize some examples
def visualize_tfrecord_data(dataset, num_examples=4):
    """Plot images from the first batch of a parsed TFRecord dataset.

    Args:
        dataset: Batched dataset yielding ({'image', 'text'}, labels) elements.
        num_examples: Maximum number of samples to draw from the first batch.
    """

    plt.figure(figsize=(15, 8))

    for features, labels in dataset.take(1):
        shown = min(num_examples, features['image'].shape[0])
        # Two columns and at least two rows (the original 2x2 layout); add rows
        # as needed so num_examples > 4 no longer overflows the grid.
        n_cols = 2
        n_rows = max(2, -(-shown // n_cols))

        for i in range(shown):
            plt.subplot(n_rows, n_cols, i + 1)
            plt.imshow(features['image'][i].numpy())
            plt.title(f"Label: {labels[i].numpy()}")

            # Only add an ellipsis when the caption was actually shortened
            caption = features['text'][i].numpy().decode()
            if len(caption) > 30:
                caption = caption[:30] + "..."
            plt.text(0.5, -0.1, caption,
                    ha='center', transform=plt.gca().transAxes)
            plt.axis('off')

    plt.tight_layout()
    plt.show()

visualize_tfrecord_data(tfrecord_dataset)

In [None]:
# Advanced TFRecord features and sharding
def write_sharded_tfrecords(base_filename, images, labels, texts, num_shards=3):
    """Split the samples into contiguous chunks and write one TFRecord file per chunk.

    Args:
        base_filename: Prefix of the shard files; each file is named
            ``<base>-<index>-of-<num_shards>.tfrecord`` with zero-padded numbers.
        images, labels, texts: Parallel, sliceable sequences of samples.
        num_shards: Number of shard files to write.
    """
    total = len(images)
    per_shard = total // num_shards
    last_shard = num_shards - 1

    for shard_idx in range(num_shards):
        lower = shard_idx * per_shard
        # The final shard also absorbs the remainder of the integer division
        upper = total if shard_idx == last_shard else lower + per_shard

        shard_filename = f"{base_filename}-{shard_idx:05d}-of-{num_shards:05d}.tfrecord"
        write_tfrecords(
            shard_filename,
            images[lower:upper],
            labels[lower:upper],
            texts[lower:upper],
        )

# Create sharded TFRecords
write_sharded_tfrecords("sharded_data", images, labels, texts)

# Read from multiple sharded files
def load_sharded_tfrecords(file_pattern, batch_size=4):
    """Build a batched dataset that reads from every TFRecord shard matching a pattern.

    Args:
        file_pattern: Glob pattern passed to tf.io.matching_files.
        batch_size: Number of samples per batch.

    Returns:
        A tf.data.Dataset yielding batches produced by ``parse_tfrecord_fn``.
    """
    shard_paths = tf.io.matching_files(file_pattern)
    shard_dataset = tf.data.Dataset.from_tensor_slices(shard_paths)

    # Read the shards concurrently, mixing their records
    records = shard_dataset.interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.AUTOTUNE,
        num_parallel_calls=tf.data.AUTOTUNE,
    )

    parsed = records.map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
    shuffled = parsed.cache().shuffle(buffer_size=100)
    return shuffled.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Load sharded dataset
sharded_dataset = load_sharded_tfrecords("sharded_data-*-of-*.tfrecord")

print("Sharded dataset loaded successfully")
batch_count = len(list(sharded_dataset))
print(f"Number of batches: {batch_count}")

## 4. tf.keras Preprocessing Integration

In [None]:
# Text preprocessing with tf.keras
text_data = [
    "The quick brown fox jumps over the lazy dog",
    "Machine learning is transforming artificial intelligence",
    "TensorFlow makes deep learning accessible to everyone",
    "Data preprocessing is crucial for model performance",
    "Neural networks learn complex patterns from data"
]

# TextVectorization layer
# Preprocessing layers live in tf.keras.layers; tf.keras.utils has no
# TextVectorization attribute, so the original lookup raised AttributeError.
text_vectorizer = tf.keras.layers.TextVectorization(
    max_tokens=1000,
    output_sequence_length=10,
    output_mode='int'
)

# Adapt the layer to the text data (builds the vocabulary)
text_vectorizer.adapt(text_data)

# Create dataset and apply preprocessing
text_dataset = tf.data.Dataset.from_tensor_slices(text_data)
vectorized_dataset = text_dataset.map(text_vectorizer)

print("Text preprocessing results:")
print("Original texts:")
for text in text_data[:3]:
    print(f"  '{text}'")

print("\nVectorized texts:")
for vectorized in vectorized_dataset.take(3):
    print(f"  {vectorized.numpy()}")

print(f"\nVocabulary size: {text_vectorizer.vocabulary_size()}")
print("Sample vocabulary:")
vocab = text_vectorizer.get_vocabulary()[:20]
for i, word in enumerate(vocab):
    print(f"  {i}: '{word}'")

In [None]:
# Image preprocessing with tf.keras
# Five random 100x100 RGB images stored as uint8
sample_images = tf.cast(
    tf.random.uniform((5, 100, 100, 3), maxval=255, dtype=tf.int32),
    tf.uint8,
)

# Rescale, resize, then apply random augmentations
image_preprocessing = tf.keras.Sequential([
    tf.keras.layers.Rescaling(1./255),  # Normalize to [0, 1]
    tf.keras.layers.Resizing(224, 224),  # Resize to standard size
    tf.keras.layers.RandomFlip("horizontal"),  # Data augmentation
    tf.keras.layers.RandomRotation(0.1),  # Random rotation
    tf.keras.layers.RandomZoom(0.1),  # Random zoom
])

def preprocess_images(images):
    """Run ``images`` through the preprocessing stack with augmentation enabled."""
    augmented = image_preprocessing(images, training=True)
    return augmented

# One dataset of raw images, one of their preprocessed counterparts
image_dataset = tf.data.Dataset.from_tensor_slices(sample_images)
preprocessed_dataset = image_dataset.map(preprocess_images)

# Top row: originals; bottom row: preprocessed versions
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

paired_samples = zip(image_dataset.take(3), preprocessed_dataset.take(3))
for i, (original, preprocessed) in enumerate(paired_samples):
    top_ax, bottom_ax = axes[0, i], axes[1, i]

    top_ax.imshow(original.numpy())
    top_ax.set_title(f"Original {i+1}")
    top_ax.axis('off')

    bottom_ax.imshow(preprocessed.numpy())
    bottom_ax.set_title(f"Preprocessed {i+1}")
    bottom_ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Feature engineering with tf.keras preprocessing
# Create structured data
structured_data = {
    'numeric_feature_1': np.random.randn(100),
    'numeric_feature_2': np.random.randn(100) * 10 + 5,
    'categorical_feature_1': np.random.choice(['A', 'B', 'C', 'D'], 100),
    'categorical_feature_2': np.random.choice(['X', 'Y', 'Z'], 100),
    'text_feature': [f"Text sample {i} with random content" for i in range(100)]
}

# Create preprocessing layers for different feature types.
# These classes live in tf.keras.layers (tf.keras.utils does not provide them).
# axis=None makes each normalizer learn one scalar mean/variance, which is what
# a 1-D column of scalar values needs.
numeric_normalizer_1 = tf.keras.layers.Normalization(axis=None)
numeric_normalizer_2 = tf.keras.layers.Normalization(axis=None)

categorical_encoder_1 = tf.keras.layers.StringLookup(output_mode='one_hot')
categorical_encoder_2 = tf.keras.layers.StringLookup(output_mode='one_hot')

# output_sequence_length is only accepted with output_mode='int'; in 'tf_idf'
# mode the output width is determined by the vocabulary, so it is not passed.
text_encoder = tf.keras.layers.TextVectorization(
    max_tokens=100,
    output_mode='tf_idf'
)

# Adapt preprocessing layers
numeric_normalizer_1.adapt(structured_data['numeric_feature_1'])
numeric_normalizer_2.adapt(structured_data['numeric_feature_2'])
categorical_encoder_1.adapt(structured_data['categorical_feature_1'])
categorical_encoder_2.adapt(structured_data['categorical_feature_2'])
text_encoder.adapt(structured_data['text_feature'])

# Create preprocessing function
def preprocess_structured_data(features):
    """Apply the adapted preprocessing layers to one structured sample.

    Args:
        features: Dict with the keys of ``structured_data``; each value is the
            scalar entry of one sample.

    Returns:
        Dict with keys 'norm_num_1', 'norm_num_2', 'cat_1_encoded',
        'cat_2_encoded' and 'text_encoded'.
    """

    processed_features = {}

    # Numeric features: give each scalar a length-1 axis so its rank matches
    # the 1-D data the normalizers were adapted on.
    processed_features['norm_num_1'] = numeric_normalizer_1(
        tf.expand_dims(features['numeric_feature_1'], -1))
    processed_features['norm_num_2'] = numeric_normalizer_2(
        tf.expand_dims(features['numeric_feature_2'], -1))

    # Categorical features
    processed_features['cat_1_encoded'] = categorical_encoder_1(features['categorical_feature_1'])
    processed_features['cat_2_encoded'] = categorical_encoder_2(features['categorical_feature_2'])

    # Text features
    processed_features['text_encoded'] = text_encoder(features['text_feature'])

    return processed_features

# Create dataset from structured data
structured_dataset = tf.data.Dataset.from_tensor_slices(structured_data)
preprocessed_structured = structured_dataset.map(preprocess_structured_data)

# Inspect preprocessed structured data
print("Structured data preprocessing results:")
for sample in preprocessed_structured.take(1):
    for key, value in sample.items():
        print(f"{key}: shape={value.shape}, dtype={value.dtype}")
        if 'encoded' in key:
            print(f"  Sample values: {value.numpy()[:5]}...")
        else:
            print(f"  Sample values: {value.numpy()}")

## 5. Custom Dataset Classes and Complex Pipelines

In [None]:
# Custom dataset class for complex data loading
class CustomImageDataset:
    """Custom dataset class for image data with per-sample metadata.

    Attributes:
        image_paths: Sequence of image path strings.
        labels: Sequence of labels, parallel to ``image_paths``.
        metadata: List with one dict per sample (empty dicts when not supplied).
    """

    def __init__(self, image_paths, labels, metadata=None):
        self.image_paths = image_paths
        self.labels = labels
        # One distinct dict per sample; `[{}] * n` would alias a single dict.
        self.metadata = metadata or [{} for _ in range(len(image_paths))]

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def _serialized_metadata(self):
        """Return each metadata dict encoded as a JSON string.

        tf.data cannot turn a list of Python dicts into a tensor, so every
        dict is carried through the pipeline as one string per sample. Values
        that JSON cannot represent are converted with ``str``.
        """
        import json  # local import keeps the class self-contained
        return [json.dumps(entry, sort_keys=True, default=str) for entry in self.metadata]

    def create_tf_dataset(self, batch_size=32, shuffle=True, augment=True):
        """Create a tf.data.Dataset from the custom dataset.

        Args:
            batch_size: Number of samples per batch.
            shuffle: Whether to shuffle the samples (buffer covers all samples).
            augment: Whether to apply ``_augment_function``.

        Returns:
            A dataset of dict batches with keys 'image', 'label' and 'metadata';
            'metadata' holds one JSON string per sample.
        """

        # Create dataset from paths, labels and JSON-encoded metadata
        dataset = tf.data.Dataset.from_tensor_slices({
            'image_path': self.image_paths,
            'label': self.labels,
            'metadata': self._serialized_metadata()
        })

        if shuffle:
            # shuffle() rejects a buffer size of 0, so guard the empty case
            dataset = dataset.shuffle(buffer_size=max(1, len(self.image_paths)))

        # Apply preprocessing
        dataset = dataset.map(
            self._preprocess_function,
            num_parallel_calls=tf.data.AUTOTUNE
        )

        if augment:
            dataset = dataset.map(
                self._augment_function,
                num_parallel_calls=tf.data.AUTOTUNE
            )

        dataset = (dataset
                  .batch(batch_size)
                  .prefetch(tf.data.AUTOTUNE))

        return dataset

    def _preprocess_function(self, sample):
        """Preprocess a single sample (dict with 'image_path', 'label', 'metadata')."""
        # For demo, create random image instead of loading from path
        image = tf.random.uniform((224, 224, 3), maxval=255, dtype=tf.int32)
        image = tf.cast(image, tf.float32) / 255.0

        return {
            'image': image,
            'label': sample['label'],
            'metadata': sample['metadata']
        }

    def _augment_function(self, sample):
        """Apply random flip and brightness augmentation to the sample's image."""
        image = sample['image']

        # Random augmentations, each applied with probability 0.5
        if tf.random.uniform([]) > 0.5:
            image = tf.image.flip_left_right(image)

        if tf.random.uniform([]) > 0.5:
            image = tf.image.random_brightness(image, 0.2)

        return {
            'image': image,
            'label': sample['label'],
            'metadata': sample['metadata']
        }

# Build 50 dummy samples: paths, random labels and per-sample metadata
dummy_paths = []
for i in range(50):
    dummy_paths.append(f"image_{i}.jpg")

dummy_labels = np.random.randint(0, 5, 50)

dummy_metadata = []
for i in range(50):
    quality = np.random.choice(['high', 'medium', 'low'])
    dummy_metadata.append({'source': f'camera_{i%3}', 'quality': quality})

custom_dataset = CustomImageDataset(dummy_paths, dummy_labels, dummy_metadata)
tf_dataset = custom_dataset.create_tf_dataset(batch_size=8)

print("Custom dataset created successfully")
print("Dataset structure:")
for batch in tf_dataset.take(1):
    batch_metadata = batch['metadata']
    print(f"  Image batch shape: {batch['image'].shape}")
    print(f"  Label batch shape: {batch['label'].shape}")
    print(f"  Metadata batch: {len(batch_metadata)}")
    print(f"  Sample metadata: {batch_metadata[0]}")

In [None]:
# Multi-modal dataset pipeline
class MultiModalDataset:
    """Dataset combining images, text, and structured (numeric) features."""

    def __init__(self):
        # Initialize preprocessing layers.
        # The preprocessing layer classes live in tf.keras.layers;
        # tf.keras.utils does not provide TextVectorization or Normalization.
        self.text_vectorizer = tf.keras.layers.TextVectorization(
            max_tokens=1000,
            output_sequence_length=50
        )

        self.image_preprocessor = tf.keras.Sequential([
            tf.keras.layers.Rescaling(1./255),
            tf.keras.layers.Resizing(224, 224)
        ])

        self.numeric_normalizer = tf.keras.layers.Normalization()

    def prepare_preprocessing_layers(self, text_data, numeric_data):
        """Adapt the text vectorizer and the numeric normalizer to the given data."""
        self.text_vectorizer.adapt(text_data)
        self.numeric_normalizer.adapt(numeric_data)

    def create_multimodal_dataset(self, images, texts, numeric_features, labels, batch_size=16):
        """Create a shuffled, batched, preprocessed multi-modal dataset.

        Args:
            images: Image tensor/array with the sample axis first.
            texts: Sequence of strings, one per sample.
            numeric_features: 2-D array (samples, features).
            labels: Sequence of labels, one per sample.
            batch_size: Number of samples per batch.

        Returns:
            A dataset of dict batches with keys 'image', 'text', 'numeric', 'label'.
        """

        # Prepare preprocessing layers
        self.prepare_preprocessing_layers(texts, numeric_features)

        # Create dataset from all modalities
        dataset = tf.data.Dataset.from_tensor_slices({
            'image': images,
            'text': texts,
            'numeric': numeric_features,
            'label': labels
        })

        # Shuffle and batch *before* preprocessing. The normalizer is adapted on
        # 2-D (samples, features) data, so applying it to an unbatched 1-D sample
        # broadcasts to an extra leading dimension; running the layers on whole
        # batches keeps the shapes as (batch, ...) and is also cheaper per call.
        dataset = (dataset
                  .shuffle(buffer_size=1000)
                  .batch(batch_size)
                  .map(self._preprocess_multimodal,
                       num_parallel_calls=tf.data.AUTOTUNE)
                  .prefetch(tf.data.AUTOTUNE))

        return dataset

    def _preprocess_multimodal(self, sample):
        """Preprocess all modalities of one batch (dict of batched tensors)."""
        return {
            'image': self.image_preprocessor(sample['image']),
            'text': self.text_vectorizer(sample['text']),
            'numeric': self.numeric_normalizer(sample['numeric']),
            'label': sample['label']
        }

# Synthetic multi-modal data: images, captions, numeric vectors and labels
num_samples = 100
mm_images = tf.cast(
    tf.random.uniform((num_samples, 128, 128, 3), maxval=255, dtype=tf.int32),
    tf.uint8,
)

mm_texts = []
for i in range(num_samples):
    mm_texts.append(f"This is sample text description number {i} with content")
mm_numeric = np.random.randn(num_samples, 5).astype(np.float32)
mm_labels = np.random.randint(0, 3, num_samples)

# Adapt the preprocessing layers and assemble the dataset
mm_dataset_creator = MultiModalDataset()
mm_dataset = mm_dataset_creator.create_multimodal_dataset(
    mm_images, mm_texts, mm_numeric, mm_labels
)

print("Multi-modal dataset structure:")
for batch in mm_dataset.take(1):
    for title, key in (("Image", 'image'), ("Text", 'text'),
                       ("Numeric", 'numeric'), ("Label", 'label')):
        print(f"  {title} shape: {batch[key].shape}")

# Visualize multi-modal batch
def visualize_multimodal_batch(dataset):
    """Plot images and numeric features of one batch, then print a few text tokens.

    Args:
        dataset: Batched dataset of dicts with keys 'image', 'label',
            'numeric' and 'text'.
    """
    fig, axes = plt.subplots(2, 4, figsize=(16, 8))
    image_row, numeric_row = axes[0], axes[1]

    for batch in dataset.take(1):
        shown = min(4, batch['image'].shape[0])
        for col in range(shown):
            # Top row: the image with its label
            image_ax = image_row[col]
            image_ax.imshow(batch['image'][col].numpy())
            image_ax.set_title(f"Label: {batch['label'][col].numpy()}")
            image_ax.axis('off')

            # Bottom row: bar chart of the sample's numeric features
            numeric_ax = numeric_row[col]
            feature_positions = range(batch['numeric'].shape[1])
            numeric_ax.bar(feature_positions, batch['numeric'][col].numpy())
            numeric_ax.set_title(f"Numeric Features")
            numeric_ax.set_xlabel("Feature Index")

    plt.tight_layout()
    plt.show()

    # A second pass over one batch for the text modality
    for batch in dataset.take(1):
        print("\nText samples (first 3 tokens of first 3 samples):")
        sample_total = min(3, batch['text'].shape[0])
        for row in range(sample_total):
            leading_tokens = batch['text'][row].numpy()[:3]
            print(f"  Sample {row}: {leading_tokens}")

visualize_multimodal_batch(mm_dataset)

## 6. Advanced Data Pipeline Patterns

In [None]:
# Cross-validation data splitting
def create_cv_datasets(dataset, k_folds=5):
    """Create k-fold cross-validation splits of a dataset.

    Every element of ``dataset`` is treated as one sample, so pass an
    unbatched dataset (call ``.unbatch()`` on a batched one first). The splits
    are built with skip/take, so ``dataset`` must yield its elements in the
    same order on every iteration (for example, no reshuffling per iteration).

    Args:
        dataset: Finite tf.data.Dataset of samples.
        k_folds: Number of folds; must be at least 1.

    Returns:
        A list with one dict per fold: {'train': Dataset, 'val': Dataset,
        'fold': int}. The last fold's validation set also receives the
        remainder when the size is not divisible by ``k_folds``.

    Raises:
        ValueError: If ``k_folds`` is smaller than 1 or the dataset is infinite.
    """
    if k_folds < 1:
        raise ValueError(f"k_folds must be at least 1, got {k_folds}")

    # Prefer the statically known size; fall back to counting when unknown.
    dataset_size = int(dataset.cardinality().numpy())
    if dataset_size == tf.data.INFINITE_CARDINALITY:
        raise ValueError("Cannot create cross-validation folds from an infinite dataset")
    if dataset_size == tf.data.UNKNOWN_CARDINALITY:
        dataset_size = sum(1 for _ in dataset)

    fold_size = dataset_size // k_folds

    cv_datasets = []

    for fold in range(k_folds):
        # Calculate fold boundaries
        start_idx = fold * fold_size
        end_idx = start_idx + fold_size if fold < k_folds - 1 else dataset_size

        # Validation set: the contiguous slice [start_idx, end_idx)
        val_dataset = dataset.skip(start_idx).take(end_idx - start_idx)

        # Training set: everything before the slice followed by everything after.
        # Staying inside tf.data keeps the (possibly nested) element structure
        # and dtypes intact, which a Python list of elements would not.
        train_dataset = dataset.take(start_idx).concatenate(dataset.skip(end_idx))

        cv_datasets.append({
            'train': train_dataset,
            'val': val_dataset,
            'fold': fold
        })

    return cv_datasets

# Try the splitter on 100 integer samples with random labels
simple_data = list(range(100))
simple_labels = np.random.randint(0, 3, 100)
simple_pairs = (simple_data, simple_labels)
simple_dataset = tf.data.Dataset.from_tensor_slices(simple_pairs)

cv_datasets = create_cv_datasets(simple_dataset)

print("Cross-validation datasets created:")
for i, cv_data in enumerate(cv_datasets):
    # Count elements by exhausting each split once
    train_size = sum(1 for _ in cv_data['train'])
    val_size = sum(1 for _ in cv_data['val'])
    print(f"  Fold {i}: Train size={train_size}, Val size={val_size}")

In [None]:
# Data pipeline with error handling and monitoring
class MonitoredDataPipeline:
    """Data pipeline with per-sample monitoring and error handling.

    Attributes:
        metrics: Dict with 'samples_processed' (int), 'errors' (int) and
            'processing_times' (list of seconds, one entry per processed sample).
    """

    def __init__(self):
        import threading  # local import: only needed for the metrics lock

        self.metrics = {
            'samples_processed': 0,
            'errors': 0,
            'processing_times': []
        }
        # The map below runs with parallel calls, so metric updates are guarded.
        self._metrics_lock = threading.Lock()

    def create_monitored_pipeline(self, raw_data, batch_size=32):
        """Create a batched pipeline whose samples go through monitored processing.

        Args:
            raw_data: Array-like data sliced along its first axis.
            batch_size: Number of samples per batch.

        Returns:
            A tf.data.Dataset of batches containing only successfully
            processed samples.
        """

        # Create base dataset
        dataset = tf.data.Dataset.from_tensor_slices(raw_data)

        # Add monitoring wrapper
        dataset = dataset.map(
            self._monitor_and_process,
            num_parallel_calls=tf.data.AUTOTUNE
        )

        # Filter out failed samples (they are represented by empty tensors)
        dataset = dataset.filter(lambda x: tf.not_equal(tf.size(x), 0))

        # Batch and optimize
        dataset = (dataset
                  .batch(batch_size)
                  .prefetch(tf.data.AUTOTUNE))

        return dataset

    def _monitor_and_process(self, sample):
        """Process one sample with monitoring; returns an empty tensor on failure.

        Dataset.map traces its function into a graph once, so plain Python code
        here (timers, counter updates) would run a single time instead of once
        per element. Routing through tf.py_function makes the Python-side
        bookkeeping execute for every sample.
        """
        return tf.py_function(self._process_eagerly, inp=[sample], Tout=tf.float32)

    def _process_eagerly(self, sample):
        """Eagerly process one sample and record metrics (runs once per element)."""
        import time

        start_time = time.perf_counter()
        try:
            # Potential error-prone processing
            if float(tf.random.uniform([])) < 0.05:  # 5% simulated error rate
                raise RuntimeError("simulated processing failure")

            # Normal processing
            processed = tf.cast(sample, tf.float32) * 2.0
        except Exception:
            with self._metrics_lock:
                self.metrics['errors'] += 1
            return tf.constant([], dtype=tf.float32)  # Empty tensor marks a failure

        # Record metrics for the successful sample
        processing_time = time.perf_counter() - start_time
        with self._metrics_lock:
            self.metrics['processing_times'].append(processing_time)
            self.metrics['samples_processed'] += 1

        return processed

    def get_metrics(self):
        """Return a summary of the collected metrics.

        Returns:
            Dict with 'samples_processed', 'errors', 'avg_processing_time',
            'std_processing_time' and 'error_rate' (errors divided by all
            attempted samples; 0 when nothing was attempted).
        """
        with self._metrics_lock:
            processing_times = list(self.metrics['processing_times'])
            samples_processed = self.metrics['samples_processed']
            errors = self.metrics['errors']

        if processing_times:
            avg_time = np.mean(processing_times)
            std_time = np.std(processing_times)
        else:
            avg_time = std_time = 0

        return {
            'samples_processed': samples_processed,
            'errors': errors,
            'avg_processing_time': avg_time,
            'std_processing_time': std_time,
            'error_rate': errors / max(1, samples_processed + errors)
        }

# Exercise the monitored pipeline on 200 random float32 samples
monitor_pipeline = MonitoredDataPipeline()
test_data = np.random.randn(200).astype(np.float32)
monitored_dataset = monitor_pipeline.create_monitored_pipeline(test_data)

# Consume up to five batches, reporting each one
processed_batches = 0
for processed_batches, batch in enumerate(monitored_dataset.take(5), start=1):
    print(f"Processed batch {processed_batches}, batch size: {batch.shape[0]}")

# Report the collected metrics; timing values get six decimals
metrics = monitor_pipeline.get_metrics()
print("\nPipeline metrics:")
for key, value in metrics.items():
    rendered = f"{value:.6f}" if 'time' in key else f"{value}"
    print(f"  {key}: {rendered}")

## 7. Production Pipeline Best Practices

In [None]:
# Configuration-driven pipeline
class ConfigurableDataPipeline:
    """Production-ready configurable data pipeline.

    Builds Keras preprocessing layers and a batched ``tf.data`` input
    pipeline from a nested configuration dictionary.

    Configuration sections read by this class:
        data_source: ``{'type': ...}``. ``'tfrecord'`` and ``'directory'``
            select the matching loader hook; any other value loads the data
            with ``tf.data.Dataset.from_tensor_slices``.
        text_preprocessing (optional): ``max_tokens`` (default 1000) and
            ``sequence_length`` (default 50) for a ``TextVectorization``
            layer stored under ``preprocessing_layers['text']``.
        image_preprocessing (optional): ``target_size`` as
            ``(height, width)`` and ``augment`` (default False); the
            resulting ``Sequential`` is stored under
            ``preprocessing_layers['image']``.
        training: ``batch_size`` (used for every mode), ``shuffle_buffer``
            (required when ``mode == 'train'``) and ``repeat`` (default 1).

    ``_load_tfrecord``, ``_load_directory`` and ``_preprocess_sample`` depend
    on the project's record schema and directory layout, so they are hooks
    that a subclass must implement.
    """

    def __init__(self, config):
        """Store ``config`` and build the preprocessing layers it describes."""
        self.config = config
        self.preprocessing_layers = {}
        self._setup_preprocessing()

    def _setup_preprocessing(self):
        """Setup preprocessing layers based on configuration"""

        if 'text_preprocessing' in self.config:
            text_config = self.config['text_preprocessing']
            # TextVectorization lives in tf.keras.layers; tf.keras.utils has
            # no such attribute and would raise AttributeError here.
            self.preprocessing_layers['text'] = tf.keras.layers.TextVectorization(
                max_tokens=text_config.get('max_tokens', 1000),
                output_sequence_length=text_config.get('sequence_length', 50)
            )

        if 'image_preprocessing' in self.config:
            img_config = self.config['image_preprocessing']
            # Always scale pixel values by 1/255 first
            image_layers = [tf.keras.layers.Rescaling(1./255)]

            if 'target_size' in img_config:
                size = img_config['target_size']
                image_layers.append(tf.keras.layers.Resizing(size[0], size[1]))

            if img_config.get('augment', False):
                image_layers.extend([
                    tf.keras.layers.RandomFlip("horizontal"),
                    tf.keras.layers.RandomRotation(0.1),
                    tf.keras.layers.RandomZoom(0.1)
                ])

            self.preprocessing_layers['image'] = tf.keras.Sequential(image_layers)

    def _load_tensor_slices(self, data_source):
        """Build a dataset that yields one element per slice of ``data_source``.

        ``create_pipeline`` maps a two-argument function over the result, so
        ``data_source`` is expected to be a ``(features, labels)`` pair.
        """
        return tf.data.Dataset.from_tensor_slices(data_source)

    def _load_tfrecord(self, data_source):
        """Hook: return a dataset of ``(x, y)`` pairs parsed from TFRecords.

        Raises:
            NotImplementedError: always; the feature schema needed to parse
                the records is project specific.
        """
        raise NotImplementedError(
            "ConfigurableDataPipeline._load_tfrecord must be implemented by a "
            "subclass: parsing TFRecords requires a feature schema"
        )

    def _load_directory(self, data_source):
        """Hook: return a dataset of ``(x, y)`` pairs read from a directory.

        Raises:
            NotImplementedError: always; the directory layout is project
                specific.
        """
        raise NotImplementedError(
            "ConfigurableDataPipeline._load_directory must be implemented by a "
            "subclass: the directory layout is project specific"
        )

    def _preprocess_sample(self, x, y, mode):
        """Hook: transform one ``(x, y)`` sample for the given ``mode``.

        Implementations would typically apply the layers collected in
        ``self.preprocessing_layers``.

        Raises:
            NotImplementedError: always; which layer applies to which input
                depends on the project's data.
        """
        raise NotImplementedError(
            "ConfigurableDataPipeline._preprocess_sample must be implemented "
            "by a subclass"
        )

    def create_pipeline(self, data_source, mode='train'):
        """Create optimized pipeline based on configuration.

        Args:
            data_source: Passed unchanged to the loader selected by
                ``config['data_source']['type']``.
            mode: ``'train'`` additionally shuffles and repeats the dataset;
                any other value skips both steps.

        Returns:
            A batched ``tf.data.Dataset`` with prefetching enabled.

        Raises:
            KeyError: if a required configuration entry is missing.
            NotImplementedError: if a required hook has not been overridden.
        """

        # Load data based on source type
        source_type = self.config['data_source']['type']
        if source_type == 'tfrecord':
            dataset = self._load_tfrecord(data_source)
        elif source_type == 'directory':
            dataset = self._load_directory(data_source)
        else:
            dataset = self._load_tensor_slices(data_source)

        # Apply preprocessing; elements are expected to be (x, y) pairs
        dataset = dataset.map(
            lambda x, y: self._preprocess_sample(x, y, mode),
            num_parallel_calls=tf.data.AUTOTUNE
        )

        # Configure pipeline based on mode
        if mode == 'train':
            dataset = dataset.shuffle(self.config['training']['shuffle_buffer'])
            dataset = dataset.repeat(self.config['training'].get('repeat', 1))

        # Batch and optimize; the batch size comes from the training section
        # regardless of mode
        batch_size = self.config['training']['batch_size']
        dataset = (dataset
                  .batch(batch_size)
                  .prefetch(tf.data.AUTOTUNE))

        return dataset

# Example production configuration: one section per pipeline concern
production_config = dict(
    # Where the data comes from
    data_source=dict(
        type='tfrecord',
        pattern='data/*.tfrecord',
    ),
    # Settings for the text vectorization layer
    text_preprocessing=dict(
        max_tokens=10000,
        sequence_length=128,
    ),
    # Settings for the image preprocessing stack
    image_preprocessing=dict(
        target_size=[224, 224],
        augment=True,
    ),
    # Batching / shuffling behaviour
    training=dict(
        batch_size=32,
        shuffle_buffer=1000,
        repeat=1,
    ),
)

print("Production pipeline configuration created")
print(f"Configuration keys: {list(production_config)}")

# Example usage would be:
# pipeline = ConfigurableDataPipeline(production_config)
# train_dataset = pipeline.create_pipeline(train_data, mode='train')

## 8. Performance Benchmarking and Optimization

In [None]:
# Comprehensive pipeline benchmarking
class PipelineBenchmark:
    """Benchmark different pipeline configurations.

    Each configuration is a dict of optional flags understood by
    ``_create_dataset_with_config``: ``map_parallel``, ``cache``,
    ``shuffle`` (with ``shuffle_buffer``, default 1000), ``batch_size``
    (default 32) and ``prefetch``. Results are collected in ``self.results``
    keyed by configuration name.
    """

    def __init__(self):
        self.results = {}

    def benchmark_configurations(self, data, configs, num_batches=50, num_runs=3):
        """Benchmark multiple pipeline configurations.

        Args:
            data: Input accepted by ``tf.data.Dataset.from_tensor_slices``,
                either a single array or a tuple such as
                ``(features, labels)``.
            configs: Mapping of configuration name to configuration dict.
            num_batches: Maximum number of batches consumed per timed run.
            num_runs: Number of timed passes per configuration (default 3).

        Raises:
            ValueError: if ``num_runs`` is smaller than 1.

        Side effects:
            Prints a progress line per configuration and stores, for each
            one, ``avg_time`` (mean seconds per run), ``batches_per_sec``,
            ``batches_measured`` (batches consumed in the last run) and the
            ``config`` itself in ``self.results``.

        NOTE(review): when ``num_batches`` is smaller than the dataset, the
        cached configurations never read the dataset to the end, so the cache
        is presumably never finalized - confirm against the
        ``tf.data.Dataset.cache`` documentation.
        """
        if num_runs < 1:
            raise ValueError("num_runs must be at least 1")

        for config_name, config in configs.items():
            print(f"Benchmarking {config_name}...")

            # Create dataset with configuration
            dataset = self._create_dataset_with_config(data, config)

            # Measure performance over several runs for accuracy
            run_times = []
            total_batches = 0
            batches_measured = 0
            for _ in range(num_runs):
                batches_measured = 0
                start_time = time.perf_counter()
                # take() stops after num_batches without fetching an extra
                # batch inside the timed region
                for batch in dataset.take(num_batches):
                    features = batch[0] if isinstance(batch, (tuple, list)) else batch
                    # Simulate model consumption
                    _ = tf.reduce_mean(features)
                    batches_measured += 1
                run_times.append(time.perf_counter() - start_time)
                total_batches += batches_measured

            total_time = sum(run_times)
            self.results[config_name] = {
                'avg_time': np.mean(run_times),
                # Based on the batches actually consumed, so a dataset shorter
                # than num_batches does not inflate the throughput
                'batches_per_sec': (total_batches / total_time
                                    if total_time > 0 else float('inf')),
                'batches_measured': batches_measured,
                'config': config
            }

    @staticmethod
    def _double_features(*components):
        """Double the first component of an element and pass the rest through.

        ``tf.data`` unpacks tuple elements into positional arguments, so this
        accepts both single-tensor elements and ``(features, labels, ...)``
        tuples.
        """
        scaled = components[0] * 2
        if len(components) == 1:
            return scaled
        return (scaled, *components[1:])

    def _create_dataset_with_config(self, data, config):
        """Create dataset with specific configuration.

        Transformations are applied in a fixed order: map, cache, shuffle,
        batch, prefetch. Only the map and batch steps are unconditional.
        """
        dataset = tf.data.Dataset.from_tensor_slices(data)

        # Apply transformations based on config
        if config.get('map_parallel'):
            dataset = dataset.map(
                self._double_features,
                num_parallel_calls=tf.data.AUTOTUNE
            )
        else:
            dataset = dataset.map(self._double_features)

        if config.get('cache'):
            dataset = dataset.cache()

        if config.get('shuffle'):
            dataset = dataset.shuffle(config.get('shuffle_buffer', 1000))

        dataset = dataset.batch(config.get('batch_size', 32))

        if config.get('prefetch'):
            dataset = dataset.prefetch(tf.data.AUTOTUNE)

        return dataset

    def print_results(self):
        """Print benchmark results, fastest configuration first."""
        print("\nBenchmark Results:")
        print("-" * 60)

        sorted_results = sorted(
            self.results.items(),
            key=lambda x: x[1]['batches_per_sec'],
            reverse=True
        )

        for config_name, result in sorted_results:
            print(f"{config_name:20} | {result['batches_per_sec']:8.2f} batches/sec | {result['avg_time']:6.3f}s total")

# Define benchmark configurations: every variant shares the same batch size
# and switches on a different subset of optimizations
_optimization_flags = {
    'baseline': {},
    'with_caching': {'cache': True},
    'with_prefetch': {'prefetch': True},
    'with_parallel': {'map_parallel': True},
    'optimized': {'cache': True, 'prefetch': True, 'map_parallel': True},
}
benchmark_configs = {
    config_name: {'batch_size': 32, **flags}
    for config_name, flags in _optimization_flags.items()
}

# Synthetic benchmark input: 1000 random 100x100x3 float32 arrays with integer
# labels drawn from [0, 10)
benchmark_images = np.random.randn(1000, 100, 100, 3).astype(np.float32)
benchmark_labels = np.random.randint(0, 10, 1000)
benchmark_data = (benchmark_images, benchmark_labels)

# Run benchmarks and report the configurations from fastest to slowest
benchmarker = PipelineBenchmark()
benchmarker.benchmark_configurations(benchmark_data, benchmark_configs, num_batches=30)
benchmarker.print_results()

## Summary

**File Location:** `notebooks/01_tensorflow_foundations/02_data_pipelines_tfrecords.ipynb`

This comprehensive notebook covered the essential aspects of TensorFlow data pipelines:

### Key Concepts Mastered:
1. **tf.data API Fundamentals**: Dataset creation, transformations, and chaining operations
2. **Performance Optimization**: Caching, prefetching, parallel processing, and interleaving
3. **TFRecords**: Creating, writing, reading, and sharding for efficient data storage
4. **tf.keras Preprocessing**: Text vectorization, image preprocessing, and structured data handling
5. **Custom Datasets**: Building flexible, reusable dataset classes
6. **Multi-modal Pipelines**: Combining images, text, and structured data
7. **Production Patterns**: Cross-validation, monitoring, error handling, and configuration-driven pipelines
8. **Performance Benchmarking**: Measuring and optimizing pipeline throughput

### Critical Performance Insights:
- **Prefetching** overlaps data preprocessing with model training
- **Caching** eliminates redundant computations across epochs  
- **Parallel mapping** utilizes multiple CPU cores effectively
- **TFRecords** provide an optimized binary format for large datasets
- **Proper batching** maximizes GPU utilization

### Production-Ready Features:
- Error handling and monitoring capabilities
- Configuration-driven pipeline construction
- Multi-modal data integration
- Cross-validation data splitting
- Comprehensive performance benchmarking

### Next Steps:
- Apply these pipelines in debugging and profiling (Notebook 03)
- Integrate with tf.keras models for end-to-end training
- Scale to distributed training scenarios

This foundation ensures efficient, scalable data handling for any TensorFlow project!