# How to TFRecord: A Comprehensive Guide

**TFRecord** is a versatile binary file format ideal for storing data. Its quick read speeds and streaming capabilities make it perfect for handling large datasets, especially those that cannot be entirely loaded into memory. TFRecords can store a variety of complex data types ranging from images and lists of floats to serialized tensors, thus aligning with the diverse data types encountered in machine learning workflows.

Dive deep into the world of TFRecord and understand foundational concepts like:

* Serialization: Converting data structures or object states into a storable format.

* BytesList: A list of byte strings used in TensorFlow's TFRecords format, essential when storing byte strings or raw content from image files.


**Serialization** is the process of converting data structures or object state into a format that can be stored and reconstructed later in the same or another computer environment. In the context of TensorFlow, serialization is often used to convert tensors into a binary string format that can be written to disk or sent over a network.

**BytesList** is a type of list used in TensorFlow's TFRecords format. A `BytesList` is a list of byte strings. When your data is a byte string (such as an image file's raw contents) or a string, you must first convert it into a BytesList in order to store it in a TFRecord.



In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.train import BytesList, Int64List, Feature, Features, Example

# Example Data

In [2]:
# Dummy stand-ins for the real data: X holds the features, Y the labels and
# Rec_norm an extra per-sample input that is fed to the functional-API model.

X = np.ones(shape=(10, 10, 10, 10, 1))
Y = np.ones(shape=(10, 2))
Rec_norm = np.ones(shape=(10, 1))

# Convert the NumPy arrays into TensorFlow tensors while pinned to the CPU.
with tf.device('CPU'):
    X, Y, Rec_norm = (
        tf.convert_to_tensor(array) for array in (X, Y, Rec_norm)
    )

# Build one dataset per tensor; each is sliced along its first axis.
features_dataset = tf.data.Dataset.from_tensor_slices(X)
rec_norm_dataset = tf.data.Dataset.from_tensor_slices(Rec_norm)
labels_dataset = tf.data.Dataset.from_tensor_slices(Y)

# Zip the three datasets so every element is (feature, label, rec_norm).
dataset = tf.data.Dataset.zip(
    (features_dataset, labels_dataset, rec_norm_dataset)
)

In [11]:
# Round-trip the whole label tensor Y through serialization to show that the
# dtype has to be supplied again when parsing.
print("Label data type before serialization:", Y.dtype)
# serialize_tensor yields a scalar string tensor, so the dtype printed next is 'string'.
serialized_label = tf.io.serialize_tensor(Y)
print("Serialized label data type:", serialized_label.dtype)

# Deserialize and check the data type.
# out_type is passed explicitly and must match the dtype that was serialized.
deserialized_label = tf.io.parse_tensor(serialized_label, out_type=Y.dtype)
print("Deserialized label data type:", deserialized_label.dtype)


Label data type before serialization: <dtype: 'float64'>
Serialized label data type: <dtype: 'string'>
Deserialized label data type: <dtype: 'float64'>


# Functions

In [3]:
# Helper function to create a byte feature for your tf.Example. 
# This is used when your data is a byte or a string.

def _bytes_feature(value):
    """Wrap a single string / bytes value in a ``tf.train.Feature``.

    Eager tensors are first converted with ``.numpy()`` because ``BytesList``
    won't unpack a string from an EagerTensor.
    """
    eager_tensor_type = type(tf.constant(0))
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value
    payload = tf.train.BytesList(value=[raw])
    return tf.train.Feature(bytes_list=payload)


In [4]:
# Helper function to create an int64 feature for your tf.Example. 
# This is used when your data is a boolean or integer.

def _int64_feature(value):
    """Wrap a single bool / enum / int / uint value in a ``tf.train.Feature``."""
    int_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int_list)


In [5]:
def combined_serialize_example(f, l, r):
    """Serialize one (feature, label, rec_norm) element into a scalar string tensor.

    The serialization itself is plain Python (``serialize_content``); it is
    wrapped in ``tf.py_function`` so it can be used as a TensorFlow operation,
    e.g. inside ``Dataset.map``.
    """

    def serialize_content(feature, label, rec_norm):
        # Each tensor is serialized to bytes and stored under its own key.
        tensors = {'feature': feature, 'label': label, 'rec_norm': rec_norm}
        feature_dict = {
            key: _bytes_feature(tf.io.serialize_tensor(tensor))
            for key, tensor in tensors.items()
        }

        # Wrap the features in a tf.train.Example and return its serialized string.
        features = tf.train.Features(feature=feature_dict)
        return tf.train.Example(features=features).SerializeToString()

    tf_string = tf.py_function(
        func=serialize_content,
        inp=(f, l, r),    # arguments handed to serialize_content
        Tout=tf.string,   # the wrapped function returns a string
    )

    # Reshape to () so the result is a scalar.
    return tf.reshape(tf_string, ())


## Now using the functions defined above

In [6]:
# Map combined_serialize_example over the dataset: every zipped
# (feature, label, rec_norm) element is passed to it as three arguments.
serialized_features_dataset = dataset.map(combined_serialize_example)

## Save

In [7]:
# Location where the TFRecord file will be saved (adjust to your system).
# The same variable is reused by the loading cells below.
filename_save = "E:\\"

# The context manager closes the writer even if serialization raises
# part-way through the loop, so the file handle is never leaked.
with tf.io.TFRecordWriter(filename_save + "Test" + ".tfrecord") as writer:
    # Iterate over the serialized dataset and write each example into the file.
    for serialized_example in serialized_features_dataset:
        # .numpy() extracts the raw bytes from the scalar string tensor.
        writer.write(serialized_example.numpy())

## Load

### Preparation

In [31]:
def _parse_function(example_proto):
    """Decode one serialized ``tf.train.Example`` into three float64 tensors.

    Returns the tuple ``(feature, label, rec_norm)``.
    """
    # Every entry was stored as a single serialized-tensor string.
    feature_description = {
        key: tf.io.FixedLenFeature([], tf.string)
        for key in ('feature', 'label', 'rec_norm')
    }
    parsed_example = tf.io.parse_single_example(example_proto, feature_description)

    def decode(key):
        # Turn the stored byte string back into a float64 tensor.
        return tf.io.parse_tensor(parsed_example[key], out_type=tf.float64)

    return decode('feature'), decode('label'), decode('rec_norm')

def set_shapes(x, y, r):
    """Attach the known static shapes and regroup the element as ``((x, r), y)``.

    ``x`` gets shape (10, 10, 10, 1), ``y`` gets (2,) and ``r`` gets (1,).
    """
    static_shapes = (
        (x, (10, 10, 10, 1)),
        (y, (2,)),
        (r, (1,)),
    )
    for tensor, shape in static_shapes:
        tensor.set_shape(shape)

    # x and r are returned together as the inputs for the functional API.
    return (x, r), y

### Loading

In [32]:
# Open the TFRecord file written in the save step as a dataset of serialized records.
train_dataset_class = tf.data.TFRecordDataset(filename_save + "Test" + ".tfrecord")

# Decode every record back into its (feature, label, rec_norm) tensors.
class_dataset = train_dataset_class.map(_parse_function)

# Only one source dataset is sampled from here; then the static shapes are
# attached and each element is regrouped as ((x, r), y).
dataset = tf.data.Dataset.sample_from_datasets([class_dataset]).map(set_shapes)


In [33]:
# Group consecutive elements into batches of 2.
dataset = dataset.batch(2)  # Replace with your desired batch size.

### Additional transformations:

#### repeat()

The dataset.repeat() method is used to repeat the dataset indefinitely. 

Without any arguments, this method will cause the dataset to be repeated in an endless cycle, meaning that it will never run out of data. 

This is particularly useful when training models for several epochs, as it ensures that the model can continue to receive data from the dataset. 

If you need to repeat the dataset a specific number of times, you can pass an integer argument to repeat(n), where n is the number of repetitions.

In [35]:
# No count is passed, so the (already batched) dataset repeats indefinitely.
dataset = dataset.repeat()

#### prefetch(tf.data.AUTOTUNE)
The dataset.prefetch(tf.data.AUTOTUNE) method is used to optimize the pipeline's performance. 

It prefetches a certain number of batches or elements from the dataset, allowing subsequent steps to be processed while the current step is still executing. 

The tf.data.AUTOTUNE argument allows TensorFlow to automatically determine the optimal number of batches to prefetch, which can significantly improve the efficiency of data feeding, especially when dealing with large datasets or complex transformations. 

This is crucial for keeping the data pipeline running smoothly and avoiding bottlenecks, thereby improving training speed.

In [36]:
# Let TensorFlow choose how many elements to prefetch.
dataset = dataset.prefetch(tf.data.AUTOTUNE) 

#### Simple Check

In [34]:
# Pull a single batch to confirm that the pipeline yields data.
# Each element has the ((features, rec_norm), label) structure returned by
# set_shapes, so x is a pair and y is the label batch.
for x, y in dataset.take(1):
    print("works")
    print(x[0].shape,x[1].shape, y.shape)

works
(2, 10, 10, 10, 1) (2, 1) (2, 2)


# Usage in a model

In [24]:
from tensorflow.keras.layers import Input, Conv3D, MaxPooling3D, BatchNormalization, Dropout, Flatten, Dense, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint

In [25]:
# Two-input functional-API model: a 3D-convolutional branch for the feature
# volume plus a scalar rec_norm input that is concatenated after the first Dense layer.

# Empty placeholder array; only X.shape[1:] is used below to define the input shape.
# Note that this rebinds the name X.
X = np.zeros((0, 10,10, 10, 1))
pad = 'same'
kernels = 8

# Convolutional branch on the feature input.
input_img = Input(shape=X.shape[1:])
x = Conv3D(kernels, (3,3,3), activation="relu", padding=pad)(input_img)
x = Conv3D(kernels, (3,3,3), activation='relu', padding=pad)(x)
x = MaxPooling3D(pool_size=(2,2,2))(x)
x = BatchNormalization()(x)
x = Dropout(0.20)(x)

x = Flatten()(x)

# Second model input: one value per sample.
input_rec_norm = Input(shape=(1,))

# The rec_norm value is appended to the 32 Dense features before normalization.
x = Dense(32, activation="relu")(x)
x = Concatenate()([x, input_rec_norm])
x = BatchNormalization()(x)
x = Dropout(0.15)(x)

x = Dense(32, activation="relu")(x)
x = BatchNormalization()(x)
x = Dropout(0.15)(x)

# Two-unit softmax output.
outputs = Dense(2, activation='softmax', name='visualized_layer')(x)

# Input order must match the (x, r) tuple produced by set_shapes.
model = Model(inputs=[input_img, input_rec_norm], outputs=outputs)

# NOTE(review): a 2-unit softmax is paired with binary_crossentropy here;
# categorical_crossentropy is the usual pairing for one-hot labels — confirm this is intended.
model.compile(loss="binary_crossentropy",
              optimizer=tf.keras.optimizers.Adam(),
              metrics=['accuracy'])

# Checkpoint callback keeping only the model with the best val_accuracy.
# It is only constructed in this cell, not attached to anything here.
filepath="models/Testmode_{val_accuracy:.2f}.model"  
checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 10, 10, 10,  0           []                               
                                 1)]                                                              
                                                                                                  
 conv3d (Conv3D)                (None, 10, 10, 10,   224         ['input_1[0][0]']                
                                8)                                                                
                                                                                                  
 conv3d_1 (Conv3D)              (None, 10, 10, 10,   1736        ['conv3d[0][0]']                 
                                8)                                                            

In [38]:
# `dataset` is the batched pipeline built above (batch size 2), so take/skip
# operate on batches, not on individual records.

# Split the dataset manually
train_dataset = dataset.take(4)  # First 4 batches (8 records at batch size 2) for training
val_dataset = dataset.skip(4)    # Skip those 4 batches; whatever follows is used for validation
# NOTE(review): if `dataset` has already been repeated (see the repeat() cell),
# val_dataset is unbounded and wraps around into the training records after the
# first leftover batch — confirm validation_steps keeps it inside the held-out part.

# Then use train_dataset and val_dataset in model.fit


In [39]:
# Train on the training split only. The original call passed the full
# `dataset`, which also contains the batches reserved for validation, so the
# validation metrics were computed on data the model had been trained on.
history = model.fit(
    # train_dataset holds a fixed number of batches; repeat() lets Keras draw
    # steps_per_epoch batches in every epoch instead of running dry after the first.
    train_dataset.repeat(),
    epochs=10,  # Number of epochs to train
    steps_per_epoch=4,  # Number of batches to consider as one epoch
    validation_data=val_dataset,  # Validation dataset
    validation_steps=1,  # Number of validation batches to consider for validation metrics
    #callbacks=[checkpoint]
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


# Additional Material

# How many data-entries do I have stored?

Counting can take a very long time, depending on how many events you have. Use tqdm to monitor the progress.

In [37]:
import tensorflow as tf

def count_records(tfrecord_filename):
    """Return the number of records in the given TFRecord file by iterating over it."""
    total = 0
    for _ in tf.data.TFRecordDataset(tfrecord_filename):
        total += 1
    return total

from tqdm.notebook import tqdm


# Files whose records should be counted (here: the single file written above).
train_filenames = [filename_save + "Test" + ".tfrecord"]

tfrecord_files = train_filenames

# tqdm advances once per file while the per-file counts are summed.
total_records = sum(map(count_records, tqdm(tfrecord_files)))
print(f"Total number of records: {total_records}")


  0%|          | 0/1 [00:00<?, ?it/s]

Total number of records: 10


In [None]:
# Build training and validation pipelines from two directories of TFRecord
# files (one directory per class), using an 85 % / 15 % split per class.

import os  # os.listdir is used below; os is not imported in the cells above

# Raw strings keep the Windows backslash literal: "\D" and "\A" are invalid
# escape sequences in a normal string literal.
dataset_class1 = r"E:\DSNB/"  # All
dataset_class2 = r"E:\ATMO/"  # All

# os.listdir returns entries in arbitrary order; sorting makes the
# train/validation split below reproducible between runs.
dataset_class1_names = sorted(os.listdir(dataset_class1))
dataset_class2_names = sorted(os.listdir(dataset_class2))

class1_files = [dataset_class1 + name for name in dataset_class1_names]
class2_files = [dataset_class2 + name for name in dataset_class2_names]

# The first 85 % of each class's files go to training, the rest to validation.
train_split_index_class1 = int(0.85 * len(class1_files))
train_split_index_class2 = int(0.85 * len(class2_files))

train_filenames_class1 = class1_files[:train_split_index_class1]
train_filenames_class2 = class2_files[:train_split_index_class2]
validation_filenames_class1 = class1_files[train_split_index_class1:]
validation_filenames_class2 = class2_files[train_split_index_class2:]

train_dataset_class1 = tf.data.TFRecordDataset(train_filenames_class1)
train_dataset_class2 = tf.data.TFRecordDataset(train_filenames_class2)
validation_dataset_class1 = tf.data.TFRecordDataset(validation_filenames_class1)
validation_dataset_class2 = tf.data.TFRecordDataset(validation_filenames_class2)

# Parse the training datasets.
class1_dataset = train_dataset_class1.map(_parse_function)
class2_dataset = train_dataset_class2.map(_parse_function)

# Parse the validation datasets.
class1_dataset_val = validation_dataset_class1.map(_parse_function)
class2_dataset_val = validation_dataset_class2.map(_parse_function)

# Draw elements from both classes to form one mixed dataset per split.
dataset = tf.data.Dataset.sample_from_datasets([class1_dataset, class2_dataset])
val_dataset = tf.data.Dataset.sample_from_datasets([class1_dataset_val, class2_dataset_val])

# Attach static shapes and regroup each element as ((x, r), y).
dataset = dataset.map(set_shapes)
val_dataset = val_dataset.map(set_shapes)

# Repeat indefinitely; model.fit then needs steps_per_epoch / validation_steps.
dataset = dataset.repeat()
val_dataset = val_dataset.repeat()

# Add any additional transformations you need.
dataset = dataset.batch(128)  # Replace with your desired batch size.
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Add any additional transformations you need.
val_dataset = val_dataset.batch(128)  # Replace with your desired batch size.
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)