
# Data Transformation and TFRecord Generation

This notebook performs the data conversion step: it takes the consolidated index of file paths (`all_image_mask_pairs.csv`) and serializes the referenced images and masks into the **TFRecord** binary format. TFRecord is TensorFlow's native record format; it supports efficient sequential reads, large-scale data loading, and distribution of input data across multiple workers or devices during model training.

---

## Key Functions and Methodology

### 1. Data Serialization and Sharding
The main function, `create_tfrecord`, reads the master CSV index and the multi-class label mapping. It performs the following steps:
* **Shuffling:** The dataset is globally shuffled before sharding, so each shard is a random sample of records and its class mix approximates that of the full dataset. Shuffling does not equalize class counts.
* **Byte Conversion:** It iterates through each record, reads the raw image data (and mask data, if present) directly into byte strings, and converts the class name into its corresponding integer index.
* **Sharding:** The entire dataset is partitioned into multiple (`num_shards=10` by default) distinct `.tfrecord` files. This strategy facilitates parallel data ingestion, which is essential for reducing I/O bottlenecks and improving GPU utilization during training.
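
The parallel ingestion mentioned above could look roughly like the following sketch. It is not part of this notebook: the temporary directory, the toy records that carry only a `class` feature, and `cycle_length=3` are assumptions chosen for illustration. `tf.data.Dataset.list_files` and `interleave` are standard `tf.data` calls.

```python
import os
import tempfile

import tensorflow as tf

# Write three tiny shards into a temporary directory (stand-in for ./data/tfrecords).
tmp_dir = tempfile.mkdtemp()
for shard_id in range(3):
    path = os.path.join(tmp_dir, f"data_{shard_id:02d}.tfrecord")
    with tf.io.TFRecordWriter(path) as writer:
        for value in range(2):
            example = tf.train.Example(features=tf.train.Features(feature={
                "class": tf.train.Feature(int64_list=tf.train.Int64List(value=[value])),
            }))
            writer.write(example.SerializeToString())

# List the shard files, then read several of them concurrently.
files = tf.data.Dataset.list_files(os.path.join(tmp_dir, "data_*.tfrecord"), shuffle=True)
dataset = files.interleave(
    tf.data.TFRecordDataset,              # opens one shard per input file name
    cycle_length=3,                       # number of shards read at the same time
    num_parallel_calls=tf.data.AUTOTUNE,  # let tf.data choose the parallelism
)

# Every record from every shard should appear exactly once.
num_records = sum(1 for _ in dataset)
print(num_records)
```

With a single large file, this kind of interleaving is not available, which is one practical reason to shard.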

### 2. Robust Data Parsing (`_parse_function`)
To verify that the TFRecords were written correctly, the notebook also prototypes `_parse_function`, which defines how the `tf.data` pipeline reads serialized records back into memory. The function is written to serve both segmentation and classification models:
* **Feature Mapping:** It maps the raw byte strings and integers back to the original data structure (`image`, `mask`, `class`).
* **Handling Missing Masks:** It uses conditional logic (`tf.cond`) to detect records whose segmentation mask is absent (encoded as an empty byte string). Instead of failing, it substitutes a dummy all-zero mask, so models that do not need the mask (such as classifiers) can still consume the record.

This notebook successfully converts the entire dataset into a high-performance, sharded format ready for the segmentation and classification training tasks.

---

In [1]:
# Import necessary libraries
from collections import Counter
import json
import math
import numpy as np
import pandas as pd
import tensorflow as tf

2025-11-11 18:48:10.924239: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-11-11 18:48:11.802126: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-11-11 18:48:14.665364: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


## Section 1: TFRecord Serialization and Sharding Logic

This section defines the helper functions and the primary function, `create_tfrecord`, that serialize the raw image data and corresponding labels into the **TFRecord** binary format. Serialization reads the file at each path into raw bytes, maps each class name to an integer feature, and packages the results into sharded files for efficient dataset loading in TensorFlow.

---

### 1.1 TF Feature Encoding Utilities

This group of helper functions prepares individual data elements (images, masks, and labels) for serialization into the `tf.train.Example` protocol buffer format.

* **`_bytes_feature(value)`**: Encodes raw byte strings (used for images and masks) as a `tf.train.Feature`. If the value is an eager tensor (the type returned by `tf.constant`), it is first converted with `.numpy()`, because `tf.train.BytesList` expects plain bytes rather than a tensor.
* **`_int64_feature(value)`**: Encodes scalar integer values (used for the class label) as a `tf.train.Feature`.
* **`image_to_bytes(path)`**: A simple file utility that opens a file in binary read mode (`'rb'`) and returns its entire content as a raw byte string. The bytes are stored still encoded (decoding happens at parse time), so the image data is embedded directly in the TFRecord file and training no longer needs to open the individual image files.

In [7]:
def _bytes_feature(value):
    '''
    Converts a value (bytes or Tensor) to a tf.train.Feature containing a bytes_list.
    This is used for serializing raw image and mask data.
    '''
    # Check if the value is a TensorFlow constant, and if so, convert it to a NumPy array (raw bytes)
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()
    # Create and return the TF Feature object
    return tf.train.Feature(bytes_list= tf.train.BytesList(value= [value]))

def _int64_feature(value):
    '''
    Converts a scalar integer value to a tf.train.Feature containing an int64_list.
    This is used for serializing class labels.
    '''
    # Create and return the TF Feature object
    return tf.train.Feature(int64_list= tf.train.Int64List(value= [value]))

def image_to_bytes(path):
    '''
    Reads the content of a file path (typically an image or mask) into raw bytes.
    '''
    # Open the file in binary read ('rb') mode
    with open(path, 'rb') as f:
        # Read the entire file content
        return f.read()
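
As a quick illustration of what these helpers produce, the sketch below builds one `tf.train.Example`, serializes it, and reads the fields back with the protocol buffer's `FromString` method. The payloads are placeholder byte strings rather than real PNG data, and the label value `2` is arbitrary; both are assumptions for illustration only.

```python
import tensorflow as tf

# Placeholder payloads standing in for real PNG bytes (assumption for illustration).
fake_image = b"image-bytes"
fake_mask = b""  # an empty byte string marks a missing mask

# Bundle the three features into a single Example, as the helpers above would.
example = tf.train.Example(features=tf.train.Features(feature={
    "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[fake_image])),
    "mask": tf.train.Feature(bytes_list=tf.train.BytesList(value=[fake_mask])),
    "class": tf.train.Feature(int64_list=tf.train.Int64List(value=[2])),
}))
serialized = example.SerializeToString()

# Deserialize without tf.data to inspect the stored values directly.
restored = tf.train.Example.FromString(serialized)
restored_image = restored.features.feature["image"].bytes_list.value[0]
restored_mask = restored.features.feature["mask"].bytes_list.value[0]
restored_label = restored.features.feature["class"].int64_list.value[0]

print(restored_image, restored_mask, restored_label)
```

The empty mask survives the round trip as `b''`, which is the marker the parsing step later relies on.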

### 1.2 `create_tfrecord` Function (Sharding and Writing)

This is the core function that orchestrates the entire TFRecord creation process, taking the master CSV index and converting it into multiple sharded binary files.

#### Execution Steps:
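1.  **Load and Shuffle:** It reads `all_image_mask_pairs.csv` with pandas, then performs a global **random shuffle** (`df.sample(frac= 1)`) on the entire dataset so that each shard is a random sample whose class mix approximates that of the full dataset. No `random_state` is passed, so the assignment of records to shards differs from run to run.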
2.  **Sharding Calculation:** It calculates the `shard_size` by dividing the total number of records by the requested `num_shards` (defaulting to 10) and using `math.ceil` to ensure all records are included.
3.  **Iteration and Serialization:** The function loops through the defined number of shards. For each shard:
    * It slices the shuffled DataFrame (`shard_df`) based on `start_idx` and `end_idx`.
    * It initializes a `tf.io.TFRecordWriter` for the shard's output path (`./data/tfrecords/data_XX.tfrecord`).
    * It iterates over the rows in `shard_df`, converts the image and mask paths to byte strings, and maps the class name to its integer ID (`label_int`).
    * **Mask Handling:** It explicitly checks if a `mask_path` is present (`pd.notna`). If not, an empty byte string (`b''`) is used for the mask feature, which will be handled during the parsing step.
    * **Writing Example:** The features are bundled into a `tf.train.Example` and serialized before being written to the TFRecord file.
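
The shard arithmetic in step 2 can be checked in isolation. The sketch below uses a hypothetical helper, `shard_bounds`, and a made-up record count of 25 to show how `math.ceil` distributes rows; it mirrors the `start_idx` and `end_idx` computation described above but is not taken from the notebook.

```python
import math


def shard_bounds(num_records, num_shards):
    """Return (start, end) index pairs using the ceil-based shard size."""
    shard_size = math.ceil(num_records / num_shards)
    bounds = []
    for shard_id in range(num_shards):
        start = shard_id * shard_size
        end = min((shard_id + 1) * shard_size, num_records)
        bounds.append((start, end))
    return bounds


# 25 records over 10 shards: shard_size is ceil(2.5) = 3.
bounds = shard_bounds(num_records=25, num_shards=10)
sizes = [max(0, end - start) for start, end in bounds]
print(sizes)  # [3, 3, 3, 3, 3, 3, 3, 3, 1, 0]
```

Rounding up guarantees that no record is dropped, but the last shards can be smaller than the others, or even empty when the record count is small relative to `num_shards`.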

In [8]:
def create_tfrecord(csv_path, output_dir, class_mapping, num_shards= 10):
    '''
    Converts the dataset index (from CSV) and raw image data into sharded TFRecord files.
    
    This function reads the image paths, loads the image/mask bytes, converts labels 
    to integers, and writes the serialized data to a specified number of TFRecord shards.

    Args:
        csv_path (str): Path to the CSV index file containing image/mask paths and classes.
        output_dir (str): Directory where the TFRecord shards will be saved.
        class_mapping (dict): Dictionary mapping class names to integer labels.
        num_shards (int, optional): The number of output TFRecord files to create. Defaults to 10.
    '''
    # Read the master CSV file containing all image/mask paths
    df = pd.read_csv(csv_path)
    # Globally shuffle the DataFrame randomly and reset the index for balanced sharding
    df = df.sample(frac= 1).reset_index(drop= True)
    
    # Calculate the size of each shard (using ceiling to ensure all rows are included)
    shard_size = math.ceil(len(df) / num_shards)
    
    # Iterate through the desired number of shards
    for shard_id in range(num_shards):
        # Determine the start and end indices for the current shard
        start_idx = shard_id * shard_size
        end_idx = min((shard_id + 1) * shard_size, len(df))
        # Extract the subset of the DataFrame for the current shard
        shard_df = df.iloc[start_idx:end_idx]
        
        # Construct the output file path for the current shard (e.g., data_00.tfrecord)
        tfrecord_path = f'{output_dir}/data_{shard_id:02d}.tfrecord'
        
        # Initialize the TFRecord writer
        with tf.io.TFRecordWriter(tfrecord_path) as writer:
            # Iterate over each row (image record) in the current shard
            for _, row in shard_df.iterrows():
                # Convert image path to raw bytes
                image_bytes = image_to_bytes(row['image_path'])
                
                # Convert mask path to raw bytes, or set to empty bytes if mask is missing (NaN)
                mask_bytes = image_to_bytes(row['mask_path']) if pd.notna(row['mask_path']) else b''
                
                # Look up the integer label using the class mapping
                label_int = class_mapping[row['class']]
                
                # Assemble the features dictionary using the helper functions
                feature = {
                    'image': _bytes_feature(image_bytes),
                    'mask': _bytes_feature(mask_bytes),
                    'class': _int64_feature(label_int)
                }
                
                # Create a tf.train.Example protocol buffer
                example = tf.train.Example(features= tf.train.Features(feature= feature))
                
                # Serialize the Example and write it to the TFRecord file
                writer.write(example.SerializeToString())
                
            # Print a confirmation message once the shard is saved
            print(f"Saved shard {shard_id+1}/{num_shards} to {tfrecord_path}")

### 1.3 Execution of TFRecord Generation

This block executes the `create_tfrecord` function, which runs the entire data transformation. It defines the input and output paths and loads the class mapping required for serialization.

* **Input Data:** The script specifies that the paths should be read from the master index file: `csv_path = './data/all_image_mask_pairs.csv'`.
* **Output Directory:** The resulting sharded TFRecord files are directed to `output_dir = './data/tfrecords'`.
* **Mapping Loading:** It loads the four-class index mapping (COVID, Normal, Viral Pneumonia, Lung Opacity) from the `class_mapping.json` file created in the previous notebook.
* **Function Call:** The `create_tfrecord` function is invoked with the specified paths, the loaded mapping, and the sharding parameter `num_shards=10`. The resulting output confirms the successful creation of all 10 sharded TFRecord files.
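
Before a long serialization run, a few cheap checks can catch configuration problems early. The following is a minimal sketch under stated assumptions: the temporary paths stand in for the notebook's `./data` locations, and the JSON keys are guesses at how the four class names might be spelled, since the contents of `class_mapping.json` are not shown here. Creating the output directory up front is a precaution, because the TFRecord writer is generally not expected to create missing parent directories.

```python
import json
import os
import tempfile

# Hypothetical stand-ins for the notebook's ./data paths so the sketch runs anywhere.
base_dir = tempfile.mkdtemp()
mapping_path = os.path.join(base_dir, "class_mapping.json")
output_dir = os.path.join(base_dir, "tfrecords")

# Assumed file contents: the real keys in class_mapping.json may be spelled differently.
with open(mapping_path, "w") as f:
    json.dump({"COVID": 0, "Normal": 1, "Viral Pneumonia": 2, "Lung_Opacity": 3}, f)

# Load the mapping the same way the notebook does.
with open(mapping_path, "r") as f:
    class_mapping = json.load(f)

# Labels should be the integers 0..n-1 with no gaps or duplicates.
labels = sorted(class_mapping.values())
if labels != list(range(len(class_mapping))):
    raise ValueError(f"Class labels are not contiguous: {labels}")

# Make sure the shard directory exists before any writer is opened.
os.makedirs(output_dir, exist_ok=True)
print(labels, os.path.isdir(output_dir))
```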

In [9]:
# --- TFRecord Generation Execution ---

# Define the input path for the master index CSV file
csv_path = './data/all_image_mask_pairs.csv'
# Define the output directory where the sharded TFRecord files will be saved
output_dir = './data/tfrecords'

# Load the class-to-index mapping from the JSON file created earlier
with open('./data/class_mapping.json', 'r') as f:
    class_mapping = json.load(f)
    
# Execute the main function to read the CSV, serialize the data, and write 10 sharded TFRecord files
create_tfrecord(csv_path, output_dir, class_mapping, num_shards= 10)

Saved shard 1/10 to ./data/tfrecords/data_00.tfrecord
Saved shard 2/10 to ./data/tfrecords/data_01.tfrecord
Saved shard 3/10 to ./data/tfrecords/data_02.tfrecord
Saved shard 4/10 to ./data/tfrecords/data_03.tfrecord
Saved shard 5/10 to ./data/tfrecords/data_04.tfrecord
Saved shard 6/10 to ./data/tfrecords/data_05.tfrecord
Saved shard 7/10 to ./data/tfrecords/data_06.tfrecord
Saved shard 8/10 to ./data/tfrecords/data_07.tfrecord
Saved shard 9/10 to ./data/tfrecords/data_08.tfrecord
Saved shard 10/10 to ./data/tfrecords/data_09.tfrecord


---
## Section 2: TFRecord Validation and Data Parsing

This section validates the structure and class mix of the newly created TFRecord files. It defines the deserialization logic required by the `tf.data` pipeline, checks that records parse into tensors of the expected shape, and inspects one shard to confirm that the earlier global shuffle left it with a mix of all classes.

---

### 2.1 Data Parsing Function (`_parse_function`) and Structural Check

The `_parse_function` is the fundamental component for reading the binary TFRecord files into usable tensors within the TensorFlow environment.

* **Deserialization:** It defines the expected schema of the serialized data using `tf.io.FixedLenFeature` (`image` as string/bytes, `mask` as string/bytes, and `class` as `int64`) and uses `tf.io.parse_single_example` to extract the raw features.
* **Image Decoding:** The raw byte features for the image are decoded into a 3-channel tensor using `tf.image.decode_png(..., channels=3)`.
* **Critical Mask Handling:** It uses **`tf.cond`** to handle records where a segmentation mask was not present (encoded as an empty byte string).
    * If the mask byte string length is zero, it returns a single-channel tensor of zeros (`tf.zeros_like(image[..., :1])`) to act as a dummy mask. The dummy mask therefore takes the image's height and width, not the size of the real masks.
    * Otherwise, it decodes the mask into a 1-channel tensor (`tf.image.decode_png(..., channels=1)`).

---

#### Structural Verification
A test is performed on the first shard (`data_00.tfrecord`) to confirm the function correctly returns the expected tensor shapes and integer labels.

| Element | Example Output Shape / Value | Comment |
| :--- | :--- | :--- |
| **Image** | `(299, 299, 3)` | Image size and 3 color channels |
| **Mask** | `(256, 256, 1)` | Mask size and 1 channel |
| **Label** | `1`, `3`, `0`, etc. | Integer class index confirmed |

In [2]:
def _parse_function(proto):
    '''
    Deserializes a single serialized tf.train.Example (protocol buffer) 
    from a TFRecord file back into image, mask, and label tensors.

    This function is the core of the tf.data input pipeline.
    '''
    # Define the expected features (schema) serialized in the TFRecord file
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string), # Raw image bytes
        'mask': tf.io.FixedLenFeature([], tf.string),  # Raw mask bytes (can be empty)
        'class': tf.io.FixedLenFeature([], tf.int64),  # Integer class label
    }
    
    # Parse the single serialized example using the defined schema
    parsed_features = tf.io.parse_single_example(proto, feature_description)
    
    # Decode the raw image bytes (PNG format) into a 3-channel tensor
    image = tf.image.decode_png(parsed_features['image'], channels=3)
    
    # Extract the raw mask bytes
    mask_raw = parsed_features['mask']
    
    # Conditional logic (tf.cond) to handle records with missing segmentation masks
    # Check if the mask byte string length is zero (b'')
    mask = tf.cond(
        tf.equal(tf.strings.length(mask_raw), 0),
        # True branch: Create a dummy mask of zeros with the same height/width as the image, but 1 channel
        lambda: tf.zeros_like(image[..., :1]),  # dummy mask with 1 channel zeros
        # False branch: Decode the raw mask bytes (PNG format) into a 1-channel tensor
        lambda: tf.image.decode_png(mask_raw, channels=1)
    )
    
    # Extract the integer class label
    label = parsed_features['class']
    
    # Return the processed tensors
    return image, mask, label

In [15]:
# --- TFRecord Validation: Structural Check ---

# Load the first TFRecord shard into a tf.data.Dataset
dataset1 = tf.data.TFRecordDataset(['./data/tfrecords/data_00.tfrecord'])

# Apply the parsing function to map the raw serialized data into tensors (image, mask, label)
dataset1 = dataset1.map(_parse_function)

# Iterate through the first 5 records to verify successful parsing and tensor shapes
for (img, mask, label) in dataset1.take(5):
    # Print the shape of the image and mask tensors, and the numpy value of the label
    print(img.shape, mask.shape, label.numpy())

(299, 299, 3) (256, 256, 1) 1
(299, 299, 3) (256, 256, 1) 3
(299, 299, 3) (256, 256, 1) 1
(299, 299, 3) (256, 256, 1) 0
(299, 299, 3) (256, 256, 1) 3
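
All five records above carry a real mask, so this check does not exercise the empty-mask branch. The sketch below feeds one synthetic record with an empty mask through the same parsing logic. The 4x4 black image, the label `0`, and the function name `parse_example` are assumptions for illustration; the parsing steps are repeated from `_parse_function` only so that the block is self-contained.

```python
import tensorflow as tf


def parse_example(proto):
    """Same parsing steps as `_parse_function`, repeated to keep the sketch standalone."""
    schema = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "mask": tf.io.FixedLenFeature([], tf.string),
        "class": tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(proto, schema)
    image = tf.image.decode_png(parsed["image"], channels=3)
    mask_raw = parsed["mask"]
    mask = tf.cond(
        tf.equal(tf.strings.length(mask_raw), 0),
        lambda: tf.zeros_like(image[..., :1]),              # missing mask: zeros
        lambda: tf.image.decode_png(mask_raw, channels=1),  # real mask: decode
    )
    return image, mask, parsed["class"]


# Synthetic 4x4 RGB image encoded as PNG (assumption for illustration).
png_bytes = tf.io.encode_png(tf.zeros([4, 4, 3], dtype=tf.uint8)).numpy()

# One record whose mask feature is the empty byte string.
example = tf.train.Example(features=tf.train.Features(feature={
    "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[png_bytes])),
    "mask": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b""])),
    "class": tf.train.Feature(int64_list=tf.train.Int64List(value=[0])),
}))

image, mask, label = parse_example(example.SerializeToString())
print(image.shape, mask.shape, int(label))
```

The dummy mask comes back with the image's spatial size and a single channel. For this dataset that would mean a 299x299 dummy mask next to 256x256 real masks, so a pipeline that batches masks would likely need a resize step first.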


---
### 2.2 Randomness and Class Distribution Check

To check that the global shuffle before sharding took effect, the class distribution of a single shard is inspected (here the last one, `data_09.tfrecord`).

* **`class_counts_stream` Function:** This helper function iterates through the dataset and uses Python's `collections.Counter` to tally the occurrence of each class index.
* **Result:** The shard contains all four class indices, which is consistent with the data having been shuffled before sharding. The counts are not equal across classes: a random shard mirrors the class mix of the dataset it was drawn from. Only one shard is inspected here, so this is a spot check, not a full validation of every shard.

| Class index | Class name | Count in `data_09.tfrecord` |
| :--- | :--- | :--- |
| 1 | **Normal** | 1061 |
| 3 | **Lung_Opacity** | 583 |
| 0 | **COVID** | 354 |
| 2 | **Viral Pneumonia** | 114 |

In [None]:
def class_counts_stream(dataset):
    '''
    Iterates through a dataset and calculates the count of each unique class label.
    This is used to verify the effectiveness of the global shuffling before sharding.
    '''
    # Initialize a Counter object
    cnt = Counter()
    # Iterate through the dataset, ignoring the image and mask tensors
    for _, _, label in dataset:
        # label is a scalar tf.Tensor; convert it to a Python int before incrementing the count
        cnt[int(label.numpy())] += 1
    # Return the Counter object containing the class counts
    return cnt

In [8]:
# Load the last TFRecord shard and parse it (map dataset2 itself, not dataset1)
dataset2 = tf.data.TFRecordDataset(['./data/tfrecords/data_09.tfrecord'])
dataset2 = dataset2.map(_parse_function)

counts = class_counts_stream(dataset2)
print(counts)

Counter({1: 1061, 3: 583, 0: 354, 2: 114})
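
Raw counts are easier to compare across shards when expressed as shares. The sketch below converts the counts printed above into percentages. The index-to-name dictionary follows the legend given in this section, and the variable names are illustrative only.

```python
from collections import Counter

# Counts copied from the output above for data_09.tfrecord.
counts = Counter({1: 1061, 3: 583, 0: 354, 2: 114})

# Index-to-name legend as stated in this section.
class_names = {0: "COVID", 1: "Normal", 2: "Viral Pneumonia", 3: "Lung_Opacity"}

# Convert each count into a percentage of the shard.
total = sum(counts.values())
shares = {class_names[idx]: 100 * counts[idx] / total for idx in sorted(counts)}

for name, share in shares.items():
    print(f"{name}: {share:.1f}%")
```

Running the same calculation on each shard and comparing the percentages would show how closely the shards agree with one another.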
