# **CHAPTER 13**
# **Loading and Preprocessing Data with TensorFlow**

**The Data API**

This subchapter introduces the TensorFlow Data API, which is designed to build efficient, scalable, and flexible input pipelines. The Data API allows datasets to be streamed from disk, transformed, shuffled, batched, and prefetched, enabling models to train efficiently even with very large datasets that do not fit into memory.
A tf.data.Dataset represents a sequence of elements, where each element can be a tensor, tuple, or dictionary of tensors. The API emphasizes lazy evaluation, meaning transformations are only executed when data is actually consumed.
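
As a rough illustration of lazy evaluation, the following plain-Python sketch uses a generator as a stand-in for a dataset pipeline. It is an analogy only, not how tf.data is implemented, and the names `lazy_map`, `double`, and `calls` are made up for this example.

```python
calls = []  # records every time the transformation actually runs


def lazy_map(fn, source):
    """Yield fn(item) for each item, computing values only on demand."""
    for item in source:
        yield fn(item)


def double(x):
    calls.append(x)
    return x * 2


# Building the pipeline does not run `double` at all.
pipeline = lazy_map(double, range(10))
calls_before = len(calls)

# Consuming three elements triggers exactly three calls.
first_three = [next(pipeline) for _ in range(3)]
calls_after = len(calls)

print(calls_before, first_three, calls_after)
```

The same idea applies to a dataset: defining `map()` or `batch()` only describes the work, and the work happens when the dataset is iterated.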


**Creating Datasets from Tensors**

This section explains how to create datasets directly from tensors or NumPy arrays. This approach is suitable for small to medium-sized datasets that fit entirely in memory.


In [2]:
import tensorflow as tf

X = tf.range(10)  # create a data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)
print(dataset)


<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>


In [3]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


**Chaining Transformations**

The Data API supports chaining multiple transformations to build complex pipelines. Common transformations include:
•	repeat()
•	batch()
•	map()
•	shuffle()
•	prefetch()


In [4]:
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)
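
The output above shows batches that straddle the boundary between repetitions. A minimal plain-Python sketch of the same repeat-then-batch logic, assuming a hypothetical helper named `repeat_then_batch` (it imitates the behavior, not the tf.data implementation):

```python
from itertools import chain, islice, repeat


def repeat_then_batch(items, count, batch_size):
    """Concatenate `count` copies of `items`, then cut fixed-size batches."""
    stream = chain.from_iterable(repeat(list(items), count))
    while True:
        batch = list(islice(stream, batch_size))
        if not batch:
            return
        yield batch


batches = list(repeat_then_batch(range(10), count=3, batch_size=7))
for batch in batches:
    print(batch)
```

Because the repetition happens before batching, the 30 elements form one continuous stream, which is why the last batch holds only two elements. In tf.data, `batch()` accepts `drop_remainder=True` if that short final batch should be discarded.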


In [5]:
dataset = dataset.map(lambda x: x * 2)

In [6]:
dataset = dataset.unbatch()


In [7]:
dataset = dataset.filter(lambda x: x < 10)

In [8]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)


**Shuffling the Data**

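Shuffling is critical for training neural networks to avoid learning artifacts from data ordering. TensorFlow shuffles data using a fixed-size buffer: it fills the buffer with the first elements, then repeatedly draws one element at random from the buffer and replaces it with the next element from the source. This avoids loading the entire dataset into memory, but the buffer must be large enough relative to the dataset, or the shuffling will be weak.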


In [9]:
dataset = tf.data.Dataset.range(10).repeat(3) # 0 to 9, three times
dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)
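
To make the buffer mechanism concrete, here is a plain-Python sketch of buffer-based shuffling. The function name `buffered_shuffle` is hypothetical, and the code imitates the general technique rather than TensorFlow's exact implementation, so it will not reproduce the specific order shown above.

```python
import random


def buffered_shuffle(items, buffer_size, seed=None):
    """Shuffle a stream using a fixed-size buffer of pending elements."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        # Buffer is full: emit a random element and put the new one in its slot.
        index = rng.randrange(buffer_size)
        yield buffer[index]
        buffer[index] = item
    # Source exhausted: emit whatever is left in random order.
    rng.shuffle(buffer)
    yield from buffer


data = list(range(30))
shuffled = list(buffered_shuffle(data, buffer_size=5, seed=42))
print(shuffled)
```

An element can only be emitted once it has entered the buffer, so with `buffer_size=5` the element at output position `i` always comes from the first `i + 5` inputs. This is why a small buffer only shuffles locally, while a buffer at least as large as the dataset gives a full shuffle.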


**Preprocessing the Data**

This subchapter focuses on applying preprocessing operations directly inside the input pipeline using the map() method. Preprocessing may include scaling, normalization, feature engineering, or data augmentation.

In [13]:
import tensorflow as tf

# Example for 8 features
X_mean = tf.constant([0.5, 0.4, 0.3, 0.2, 0.1, 0.0, -0.1, -0.2], dtype=tf.float32)
X_std  = tf.constant([1.0, 1.1, 0.9, 1.2, 1.0, 1.3, 0.8, 1.0], dtype=tf.float32)

n_inputs = 8

def preprocess(line):
    defs = [0.] * n_inputs + [0.]  # default values for the CSV fields
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y


In [14]:
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 3.7083001e+00,  3.9636360e+01,  5.5813336e+00,  5.9758335e-01,
         8.4590002e+02,  1.7976923e+00,  4.6962498e+01, -1.2200000e+02],
       dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)
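
The scaled values above can be cross-checked by hand. The following plain-Python sketch repeats the same arithmetic, (x - mean) / std per feature, using the constants from the cell above; it is only a sanity check and does not replace the TensorFlow version.

```python
X_mean = [0.5, 0.4, 0.3, 0.2, 0.1, 0.0, -0.1, -0.2]
X_std = [1.0, 1.1, 0.9, 1.2, 1.0, 1.3, 0.8, 1.0]

line = "4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782"

# Split the CSV line: the first eight fields are features, the last is the target.
fields = [float(value) for value in line.split(",")]
features, target = fields[:-1], fields[-1:]

# Standardize each feature with its own mean and standard deviation.
scaled = [(x - m) / s for x, m, s in zip(features, X_mean, X_std)]

print([round(value, 4) for value in scaled])
print(target)
```

For instance, the second feature becomes (44.0 - 0.4) / 1.1, which is about 39.636 and matches the tensor printed above.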

**Putting It All Together**

This section combines multiple transformations into a single pipeline. The order of operations is important for efficiency.
The pipeline built below applies these steps in order:
1.	List files and interleave their lines
2.	Map (preprocess)
3.	Shuffle
4.	Repeat
5.	Batch
6.	Prefetch


In [15]:
def csv_reader_dataset(
    filepaths,
    repeat=1,
    n_readers=5,
    n_read_threads=None,
    shuffle_buffer_size=10000,
    n_parse_threads=5,
    batch_size=32
):
    # Create a dataset from the list of CSV files
    dataset = tf.data.Dataset.list_files(filepaths)

    # Interleave lines from several files (in parallel if n_read_threads is set)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),  # skip header
        cycle_length=n_readers,
        num_parallel_calls=n_read_threads
    )

    # Parse each CSV line
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)

    # Shuffle, then repeat the dataset `repeat` times
    dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)

    # Batch and prefetch for efficiency
    dataset = dataset.batch(batch_size).prefetch(1)

    return dataset


**Prefetching the Data**

Prefetching overlaps data preprocessing and model execution, significantly improving performance. While the model is training on one batch, the next batch is prepared in parallel.
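
The overlap can be pictured with a background thread that fills a small bounded queue while the consumer works. The sketch below is a plain-Python analogy, assuming a hypothetical `prefetch` generator; it is not how tf.data implements `prefetch()`.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def prefetch(source, buffer_size=1):
    """Yield items from `source`, preparing up to `buffer_size` items ahead."""
    pending = queue.Queue(maxsize=buffer_size)

    def producer():
        try:
            for item in source:
                pending.put(item)  # blocks while the buffer is full
        finally:
            pending.put(_DONE)  # always signal the end, even on error

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = pending.get()
        if item is _DONE:
            return
        yield item


def make_batches(n):
    """Stand-in for an expensive input pipeline."""
    for i in range(n):
        yield i


batches = list(prefetch(make_batches(5), buffer_size=1))
print(batches)
```

While the consumer processes one item, the producer thread is already preparing the next, which is the effect `prefetch(1)` aims for in the pipeline. Recent TensorFlow versions also accept `tf.data.AUTOTUNE` as the buffer size so that the runtime picks a value. This sketch does not propagate producer errors to the consumer.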


**Using the Data API with Keras**

This subchapter shows how tf.data.Dataset integrates with Keras models: datasets can be passed directly to model.fit(), model.evaluate(), and model.predict().

In [21]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

In [22]:
train_filepaths = ["data/train1.csv", "data/train2.csv"]  # replace with your training file paths
valid_filepaths = ["data/valid.csv"]                       # replace with your validation file path
test_filepaths  = ["data/test.csv"]                        # replace with your test file path


In [23]:
n_inputs = 8   # number of features
# per-feature mean and std from the training set; these can be computed with NumPy
X_mean = tf.constant([0.5]*n_inputs, dtype=tf.float32)
X_std  = tf.constant([0.2]*n_inputs, dtype=tf.float32)


In [24]:
def preprocess(line):
    # default value per column: all 0.0
    defs = [0.0] * n_inputs + [0.0]  # the last entry is the label
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    # standardize the features
    x = (x - X_mean) / X_std
    return x, y

In [25]:
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),  # skip header
        cycle_length=n_readers,
        num_parallel_calls=n_read_threads
    )
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
    return dataset.batch(batch_size).prefetch(1)
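
The cells above stop before any training takes place. As a minimal sketch of the Keras side, the example below passes datasets to `fit()`, `evaluate()`, and `predict()`. It assumes synthetic in-memory data instead of the CSV files, which are not included here, and the tiny model is an arbitrary choice for illustration.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Synthetic stand-in data (assumption): 64 instances with 8 features each.
rng = np.random.default_rng(42)
X = rng.random((64, 8)).astype("float32")
y = rng.random((64, 1)).astype("float32")

# Small training and validation pipelines built from in-memory arrays.
train_set = (tf.data.Dataset.from_tensor_slices((X[:48], y[:48]))
             .shuffle(48).batch(16).prefetch(1))
valid_set = tf.data.Dataset.from_tensor_slices((X[48:], y[48:])).batch(16)

model = keras.Sequential([
    keras.layers.Dense(16, activation="relu"),
    keras.layers.Dense(1),
])
model.compile(loss="mse", optimizer="sgd")

# The datasets already yield (features, labels) batches, so neither
# a separate y argument nor batch_size is passed to Keras.
history = model.fit(train_set, validation_data=valid_set, epochs=2, verbose=0)
val_loss = model.evaluate(valid_set, verbose=0)
predictions = model.predict(valid_set, verbose=0)

print(predictions.shape)
```

With the CSV files in place, the same three calls should work on the datasets returned by `csv_reader_dataset(train_filepaths)` and `csv_reader_dataset(valid_filepaths)`.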


**Loading Data from Files**

TensorFlow supports loading data from various file formats, including:
•	CSV
•	Binary files
•	TFRecord (recommended for large-scale training)
This section emphasizes the importance of efficient file formats for performance and portability.

**The TFRecord Format**

TFRecord is TensorFlow's preferred binary format for storing large datasets. It is optimized for sequential reading and works well with distributed systems.


In [27]:
import tensorflow as tf

with tf.io.TFRecordWriter("my_data.tfrecord") as f:
    f.write(b"This is the first record")
    f.write(b"And this is the second record")


In [28]:
import tensorflow as tf

filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)

for item in dataset:
    print(item)


tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'And this is the second record', shape=(), dtype=string)


**Compressed TFRecord Files**

It can sometimes be useful to compress your TFRecord files, especially if they need to
be loaded via a network connection. You can create a compressed TFRecord file by
setting the options argument:

In [29]:
import tensorflow as tf

# Options for writing a TFRecord file with GZIP compression
options = tf.io.TFRecordOptions(compression_type="GZIP")

with tf.io.TFRecordWriter("my_compressed.tfrecord", options=options) as f:
    f.write(b"This is the first record")
    f.write(b"And this is the second record")


In [30]:
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
                                  compression_type="GZIP")

**A Brief Introduction to Protocol Buffers**

Protocol buffers (protobufs) are a portable, extensible, and efficient binary format
developed at Google back in 2001 and made open source in 2008; protobufs are now
widely used, in particular in gRPC, Google’s remote procedure call system.

**TensorFlow Protobufs**

The main protobuf typically used in a TFRecord file is the Example protobuf, which
represents one instance in a dataset. It contains a list of named features, where each
feature can either be a list of byte strings, a list of floats, or a list of integers.

In [39]:
import tensorflow as tf
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example

In [40]:
person_example = Example(
    features=Features(
        feature={
            "name": Feature(bytes_list=BytesList(value=[b"Alice"])),
            "id": Feature(int64_list=Int64List(value=[123])),
            "emails": Feature(bytes_list=BytesList(value=[b"a@b.com", b"c@d.com"]))
        }
    )
)

In [41]:
tfrecord_filename = "my_contacts.tfrecord"
with tf.io.TFRecordWriter(tfrecord_filename) as f:
    f.write(person_example.SerializeToString())

print(f"TFRecord file '{tfrecord_filename}' created successfully!")

TFRecord file 'my_contacts.tfrecord' created successfully!


In [42]:
dataset = tf.data.TFRecordDataset([tfrecord_filename])

In [43]:
def parse_example(serialized_example):
    # Describe the structure of the data
    feature_description = {
        "name": tf.io.FixedLenFeature([], tf.string),
        "id": tf.io.FixedLenFeature([], tf.int64),
        "emails": tf.io.VarLenFeature(tf.string)
    }
    parsed = tf.io.parse_single_example(serialized_example, feature_description)
    # Convert the VarLenFeature (a sparse tensor) to a dense tensor
    parsed["emails"] = tf.sparse.to_dense(parsed["emails"])
    return parsed

parsed_dataset = dataset.map(parse_example)

In [44]:
for record in parsed_dataset:
    print(record)

{'emails': <tf.Tensor: shape=(2,), dtype=string, numpy=array([b'a@b.com', b'c@d.com'], dtype=object)>, 'id': <tf.Tensor: shape=(), dtype=int64, numpy=123>, 'name': <tf.Tensor: shape=(), dtype=string, numpy=b'Alice'>}


**Loading and Parsing Examples**

To load the serialized Example protobufs, we will use a tf.data.TFRecordDataset
once again, and we will parse each Example using tf.io.parse_single_example().
This is a TensorFlow operation, so it can be included in a TF Function. It requires at
least two arguments: a string scalar tensor containing the serialized data, and a
description of each feature.

In [45]:
import tensorflow as tf

In [46]:
feature_description = {
    "name": tf.io.FixedLenFeature([], tf.string, default_value=""),
    "id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "emails": tf.io.VarLenFeature(tf.string),  # length may vary per example
}

In [47]:
tfrecord_file = "my_contacts.tfrecord"
dataset = tf.data.TFRecordDataset([tfrecord_file])

for serialized_example in dataset:
    parsed_example = tf.io.parse_single_example(serialized_example, feature_description)

    # For a VarLenFeature (emails), access .values directly or convert to dense
    emails_dense = tf.sparse.to_dense(parsed_example["emails"], default_value=b"")

    print("Name:", parsed_example["name"].numpy())
    print("ID:", parsed_example["id"].numpy())
    print("Emails:", emails_dense.numpy())
    print("---")

Name: b'Alice'
ID: 123
Emails: [b'a@b.com' b'c@d.com']
---


In [48]:
batch_size = 10
batched_dataset = tf.data.TFRecordDataset([tfrecord_file]).batch(batch_size)

for serialized_examples in batched_dataset:
    parsed_examples = tf.io.parse_example(serialized_examples, feature_description)

    # Convert VarLenFeature to dense tensors
    emails_dense_batch = tf.sparse.to_dense(parsed_examples["emails"], default_value=b"")

    print("Batch Names:", parsed_examples["name"].numpy())
    print("Batch IDs:", parsed_examples["id"].numpy())
    print("Batch Emails:", emails_dense_batch.numpy())
    print("===")

Batch Names: [b'Alice']
Batch IDs: [123]
Batch Emails: [[b'a@b.com' b'c@d.com']]
===
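
With a single record in the file, the batch above shows no padding. The plain-Python sketch below illustrates what a dense conversion does when examples have different numbers of values: shorter rows are filled with the default value. The helper `pad_to_dense` and the extra email addresses are hypothetical; the code mimics the effect of `tf.sparse.to_dense()` on a batch, not its implementation.

```python
def pad_to_dense(rows, default_value=b""):
    """Pad variable-length rows to a rectangle using default_value."""
    width = max((len(row) for row in rows), default=0)
    return [list(row) + [default_value] * (width - len(row)) for row in rows]


# Three hypothetical examples with two, one, and zero email addresses.
emails = [[b"a@b.com", b"c@d.com"], [b"e@f.com"], []]
dense = pad_to_dense(emails)

for row in dense:
    print(row)
```

This is why `default_value=b""` is passed in the cells above: it decides what fills the gaps once all rows must share the same length.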


**Handling Lists of Lists Using the SequenceExample Protobuf**

In [49]:
import tensorflow as tf
from tensorflow.train import BytesList, FloatList, Int64List, Feature, Features, FeatureList, FeatureLists, SequenceExample

In [50]:
context_features = Features(feature={
    "user_id": Feature(int64_list=Int64List(value=[123])),
    "username": Feature(bytes_list=BytesList(value=[b"Alice"]))
})


In [51]:
content_feature_list = FeatureList(feature=[
    Feature(bytes_list=BytesList(value=[b"Hello"])),
    Feature(bytes_list=BytesList(value=[b"World"]))
])
comments_feature_list = FeatureList(feature=[
    Feature(bytes_list=BytesList(value=[b"Nice"])),
    Feature(bytes_list=BytesList(value=[b"Post"]))
])

sequence_features = FeatureLists(feature_list={
    "content": content_feature_list,
    "comments": comments_feature_list
})

sequence_example = SequenceExample(
    context=context_features,
    feature_lists=sequence_features
)

In [52]:
with tf.io.TFRecordWriter("my_sequence.tfrecord") as f:
    f.write(sequence_example.SerializeToString())

In [54]:
context_feature_description = {
    "user_id": tf.io.FixedLenFeature([], tf.int64),
    "username": tf.io.FixedLenFeature([], tf.string)
}

In [55]:
sequence_feature_description = {
    "content": tf.io.VarLenFeature(tf.string),
    "comments": tf.io.VarLenFeature(tf.string)
}

dataset = tf.data.TFRecordDataset(["my_sequence.tfrecord"])

for serialized_seq_example in dataset:
    parsed_context, parsed_sequence = tf.io.parse_single_sequence_example(
        serialized_seq_example,
        context_features=context_feature_description,
        sequence_features=sequence_feature_description
    )

    # Convert the sparse sequence features to RaggedTensors
    parsed_content = tf.RaggedTensor.from_sparse(parsed_sequence["content"])
    parsed_comments = tf.RaggedTensor.from_sparse(parsed_sequence["comments"])

    # Display the results
    print("Context:", {k: v.numpy() for k, v in parsed_context.items()})
    print("Content:", parsed_content.numpy())
    print("Comments:", parsed_comments.numpy())
    print("---")

Context: {'user_id': np.int64(123), 'username': b'Alice'}
Content: [[b'Hello']
 [b'World']]
Comments: [[b'Nice']
 [b'Post']]
---


**Preprocessing the Input Features**

Preparing your data for a neural network requires converting all features into numerical features, generally normalizing them, and more. In particular, if your data contains categorical features or text features, they need to be converted to numbers. This
can be done ahead of time when preparing your data files, using any tool you like.
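
As a small illustration of turning a categorical feature into numbers, the sketch below builds a vocabulary from training values and one-hot encodes new values, reserving one extra slot for categories not seen during training. The helper names and the category strings are hypothetical, and this is plain Python rather than a TensorFlow or Keras API.

```python
def build_vocab(values):
    """Map each distinct category to an integer index, in first-seen order."""
    vocab = {}
    for value in values:
        vocab.setdefault(value, len(vocab))
    return vocab


def encode(value, vocab):
    """Return the category index, or len(vocab) for unseen categories."""
    return vocab.get(value, len(vocab))


def one_hot(index, depth):
    """Return a list of `depth` floats with a single 1.0 at `index`."""
    return [1.0 if i == index else 0.0 for i in range(depth)]


train_categories = ["INLAND", "NEAR BAY", "INLAND", "NEAR OCEAN"]
vocab = build_vocab(train_categories)
depth = len(vocab) + 1  # one extra slot for out-of-vocabulary values

encoded = [one_hot(encode(c, vocab), depth) for c in ["NEAR BAY", "ISLAND"]]
print(vocab)
print(encoded)
```

The vocabulary must be built from the training data only and then reused unchanged for validation, test, and production inputs, so that each index keeps the same meaning.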

In [56]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

In [58]:
import numpy as np

X_train = np.random.rand(100, 8)
y_train = np.random.rand(100, 1)


In [59]:
means = np.mean(X_train, axis=0, keepdims=True)
stds = np.std(X_train, axis=0, keepdims=True)
eps = keras.backend.epsilon()

model = keras.models.Sequential([
    keras.layers.Lambda(lambda inputs: (inputs - means) / (stds + eps)),
    keras.layers.Dense(32, activation="relu"),
    keras.layers.Dense(1)
])

model.compile(optimizer="adam", loss="mse", metrics=["mae"])
model.fit(X_train, y_train, epochs=5, batch_size=16)

Epoch 1/5
7/7 ━━━━━━━━━━━━━━━━━━━━ 2s 11ms/step - loss: 0.2211 - mae: 0.3891
Epoch 2/5
7/7 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step - loss: 0.1889 - mae: 0.3515
Epoch 3/5
7/7 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step - loss: 0.1711 - mae: 0.3312
Epoch 4/5
7/7 ━━━━━━━━━━━━━━━━━━━━ 0s 11ms/step - loss: 0.1554 - mae: 0.3123
Epoch 5/5
7/7 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step - loss: 0.1522 - mae: 0.3182


<keras.src.callbacks.history.History at 0x7db534607260>

In [60]:
model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.2)

Epoch 1/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 198ms/step - loss: 0.1581 - mae: 0.3254 - val_loss: 0.1068 - val_mae: 0.2790
Epoch 2/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 69ms/step - loss: 0.1588 - mae: 0.3334 - val_loss: 0.1061 - val_mae: 0.2795
Epoch 3/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 154ms/step - loss: 0.1420 - mae: 0.3061 - val_loss: 0.1065 - val_mae: 0.2809
Epoch 4/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 54ms/step - loss: 0.1304 - mae: 0.2979 - val_loss: 0.1078 - val_mae: 0.2829
Epoch 5/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 38ms/step - loss: 0.1189 - mae: 0.2849 - val_loss: 0.1093 - val_mae: 0.2846
Epoch 6/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 44ms/step - loss: 0.1266 - mae: 0.2897 - val_loss: 0.1109 - val_mae: 0.2859
Epoch 7/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 70ms/step - loss: 0.1180 - mae:

<keras.src.callbacks.history.History at 0x7db534770950>

In [61]:
class Standardization(keras.layers.Layer):
    def adapt(self, data_sample):
        self.means_ = np.mean(data_sample, axis=0, keepdims=True)
        self.stds_ = np.std(data_sample, axis=0, keepdims=True)

    def call(self, inputs):
        eps = keras.backend.epsilon()
        return (inputs - self.means_) / (self.stds_ + eps)

In [62]:
std_layer = Standardization()
std_layer.adapt(X_train[:500])
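
The `adapt()` step only computes per-feature statistics from a data sample; the layer then reuses them on every input. A plain-Python sketch of the same two steps, assuming hypothetical helpers `adapt` and `standardize` and a toy sample with two features:

```python
from statistics import fmean, pstdev


def adapt(rows):
    """Return per-column means and population standard deviations."""
    columns = list(zip(*rows))
    return [fmean(c) for c in columns], [pstdev(c) for c in columns]


def standardize(rows, means, stds, eps=1e-7):
    """Scale each column to zero mean and unit variance (eps avoids division by zero)."""
    return [[(v - m) / (s + eps) for v, m, s in zip(row, means, stds)]
            for row in rows]


sample = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
means, stds = adapt(sample)
scaled = standardize(sample, means, stds)

print(means, stds)
print(scaled)
```

`pstdev` is used because `np.std` computes the population standard deviation by default, and `eps=1e-7` mirrors the default value of `keras.backend.epsilon()`. Both columns end up with the same scaled values, even though their original scales differ by a factor of ten.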

In [63]:
model2 = keras.Sequential([
    std_layer,
    keras.layers.Dense(32, activation="relu"),
    keras.layers.Dense(1)
])

model2.compile(
    optimizer="adam",
    loss="mse",
    metrics=["mae"]
)

model2.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.2)

Epoch 1/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 2s 199ms/step - loss: 0.8099 - mae: 0.7506 - val_loss: 0.6761 - val_mae: 0.6992
Epoch 2/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 75ms/step - loss: 0.6896 - mae: 0.7006 - val_loss: 0.6259 - val_mae: 0.6748
Epoch 3/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 59ms/step - loss: 0.6823 - mae: 0.6887 - val_loss: 0.5805 - val_mae: 0.6511
Epoch 4/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 69ms/step - loss: 0.6084 - mae: 0.6537 - val_loss: 0.5399 - val_mae: 0.6283
Epoch 5/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 57ms/step - loss: 0.5744 - mae: 0.6328 - val_loss: 0.5029 - val_mae: 0.6056
Epoch 6/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 37ms/step - loss: 0.4737 - mae: 0.5552 - val_loss: 0.4698 - val_mae: 0.5846
Epoch 7/10
3/3 ━━━━━━━━━━━━━━━━━━━━ 0s 35ms/step - loss: 0.3986 - mae:

<keras.src.callbacks.history.History at 0x7db53478e990>

**Summary**

Chapter 13 focuses on building robust, efficient, and scalable data pipelines using TensorFlow. Key takeaways include:
•	Using tf.data.Dataset for streaming data
•	Applying transformations lazily
•	Leveraging TFRecord for performance
•	Integrating preprocessing directly into models
This chapter is essential for training deep learning models on real-world, large-scale datasets.
