In [1]:
from src.consts import CLEARED_DATASET
from src.consts import TF_RECORD_DATASET

# Packages Imports
import os

import numpy as np
import pandas as pd
import tensorflow as tf
from tqdm import tqdm
from PIL import Image

# Types
from typing import Dict, List

In [2]:
# Load Data
# Resolve every dataset location first, then read the two CSV tables.
data_entry_path = f"{CLEARED_DATASET}/Data_Entry_2017.csv"
bounding_box_path = f"{CLEARED_DATASET}/BBox_List_2017.csv"
images_path = f"{CLEARED_DATASET}/images"

# Metadata table; later cells read its "Image Index" and "Finding Labels" columns.
data_entry_df = pd.read_csv(data_entry_path, sep=",")
# Bounding-box table; it is only passed through to create_tf_example below.
bounding_box_df = pd.read_csv(bounding_box_path, sep=",")

In [3]:
def build_labels_map(data_entry_df: pd.DataFrame) -> pd.DataFrame:
    """
    Derive a label -> integer index table from the "Finding Labels" column.

    Every cell of "Finding Labels" is split on "|". The distinct non-null pieces
    are sorted, and each one is numbered from 0 in that sorted order.

    Args:
        data_entry_df: Frame that must contain a "Finding Labels" string column.

    Returns:
        A DataFrame with the columns "Label" and "Index", one row per distinct label.

    Raises:
        KeyError: If "Finding Labels" is not a column of ``data_entry_df``.
    """
    column = "Finding Labels"
    if column not in data_entry_df.columns:
        raise KeyError("The 'Finding Labels' column is missing from the input DataFrame.")

    # One list of labels per row, flattened to one label per row, without nulls.
    per_row_labels = data_entry_df[column].str.split("|")
    distinct_labels = per_row_labels.explode().dropna().unique()

    rows = [(label, position) for position, label in enumerate(sorted(distinct_labels))]
    return pd.DataFrame(rows, columns=["Label", "Index"])

In [4]:
# Store Labels Mapping
from src.labels_utils import get_labels_dict

# Single location for the mapping file: it is written here and read straight back.
label_mappings_path = f"{TF_RECORD_DATASET}/label_mappings.csv"

# DataFrame.to_csv does not create missing parent directories, so make sure the
# output directory exists before writing (a no-op when it is already there).
os.makedirs(TF_RECORD_DATASET, exist_ok=True)

label_df = build_labels_map(data_entry_df)
label_df.to_csv(label_mappings_path, index=False)

# Reload the persisted file through the project helper; create_tf_example reads
# the resulting `labels_dict` as a notebook-level global.
labels_dict = get_labels_dict(label_mappings_path)
print(labels_dict)

{'Atelectasis': 0, 'Cardiomegaly': 1, 'Consolidation': 2, 'Edema': 3, 'Effusion': 4, 'Emphysema': 5, 'Fibrosis': 6, 'Hernia': 7, 'Infiltration': 8, 'Mass': 9, 'No Finding': 10, 'Nodule': 11, 'Pleural_Thickening': 12, 'Pneumonia': 13, 'Pneumothorax': 14}


In [5]:
from src.data_entry_utils import extract_patient_info, extract_image_info
from src.image_utils import get_image_bytes
from src.labels_utils import encode_labels
from src.tensorflow_utils import to_bytes_feature, to_int64_feature, to_float_feature

def create_tf_example(image_path, data_entry_df, bounding_boxes_df):
    """
    Build a ``tf.train.Example`` describing a single image file.

    The image is matched to ``data_entry_df`` by file name (the "Image Index"
    column); when several rows match, only the first one is used. Labels are
    encoded with the notebook-level ``labels_dict``.

    Args:
        image_path: Path to the image file; its base name is the lookup key.
        data_entry_df: Metadata frame with an "Image Index" column.
        bounding_boxes_df: Accepted for interface compatibility; currently unused
            because the bounding-box features are disabled.

    Returns:
        The populated ``tf.train.Example``, or ``None`` (after printing a
        "Skipping" message) when the image has no metadata row or
        ``get_image_bytes`` returns ``None``.
    """
    image_index = os.path.basename(image_path)
    matches = data_entry_df[data_entry_df["Image Index"] == image_index]
    if matches.empty:
        print(f"Skipping: {image_index} not found in DataEntry.csv")
        return None

    image_bytes = get_image_bytes(image_path)
    if image_bytes is None:
        print(f"Skipping missing file: {image_path}")
        return None

    image_info = extract_image_info(matches.iloc[0])
    patient_info = extract_patient_info(matches.iloc[0])
    finding_labels = image_info["finding_labels"]
    encoded_labels = encode_labels(finding_labels, labels_dict)

    # Bounding-box features (bbox_finding_labels, bbox_finding_labels_array,
    # x, y, w, h) are currently disabled, so bounding_boxes_df is not read.
    features_by_name = {}

    # Image payload and its identifier.
    features_by_name["image"] = to_bytes_feature(image_bytes)
    features_by_name["image_index"] = to_bytes_feature(image_index.encode())

    # Labels, stored both as encoded integers and as the raw label strings.
    features_by_name["finding_labels"] = to_int64_feature(encoded_labels)
    features_by_name["finding_labels_array"] = tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[name.encode() for name in finding_labels])
    )

    # Patient attributes.
    features_by_name["patient_id"] = to_int64_feature([patient_info["patient_id"]])
    features_by_name["patient_age"] = to_int64_feature([patient_info["patient_age"]])
    features_by_name["patient_gender"] = to_bytes_feature(patient_info["patient_gender"].encode())

    # Acquisition attributes.
    features_by_name["view_position"] = to_bytes_feature(image_info["view_position"].encode())
    features_by_name["image_width"] = to_float_feature([image_info["image_width"]])
    features_by_name["image_height"] = to_float_feature([image_info["image_height"]])

    return tf.train.Example(features=tf.train.Features(feature=features_by_name))

# def extract_bbox_data(bounding_boxes_df, image_index):
#     bbox = bounding_boxes_df[bounding_boxes_df["Image Index"] == image_index]
#     if not bbox.empty:
#         bbox_finding_labels = bbox["Finding Label"].apply(process_finding_labels)
#         bbox_finding_labels = sorted(set(label for sublist in bbox_finding_labels for label in sublist))
#         return {
#             "encoded_bbox_labels": encode_labels(bbox_finding_labels),
#             "bbox_finding_labels": bbox_finding_labels,
#             "x_coords": bbox["x"].tolist(),
#             "y_coords": bbox["y"].tolist(),
#             "widths": bbox["w"].tolist(),
#             "heights": bbox["h"].tolist()
#         }
#     return {
#         "encoded_bbox_labels": encode_labels(["No Finding"]),
#         "bbox_finding_labels": ["No Finding"],
#         "x_coords": [], "y_coords": [], "widths": [], "heights": []
#     }

# # Define TFRecord output file
# tfrecord_filename = "chest_xray_data.tfrecord"

# with tf.io.TFRecordWriter(tfrecord_filename) as writer:
#     for image_path, pixel_intensity, ssim_value in tqdm(zip(image_paths, pixel_intensities, ssim_results), total=len(image_paths)):
#         example = create_tf_example(image_path, pixel_intensity, ssim_value, data_entry_df, bounding_boxes_df)
#         if example is not None:
#             writer.write(example.SerializeToString())

# print(f"TFRecord saved as {tfrecord_filename}")

In [6]:
# Full path of every entry found in the images directory, collected as a set.
image_files = {
    os.path.join(images_path, entry_name) for entry_name in os.listdir(images_path)
}

In [7]:
# Smoke test: build and print the tf.train.Example for one image, then stop.
# image_files is a set, so the image that gets picked is arbitrary.
for image_path in tqdm(image_files):
    tf_example = create_tf_example(image_path, data_entry_df, bounding_box_df)
    # Prints None when create_tf_example skipped the image.
    print(tf_example)
    # Only the first element is inspected; the remaining files are not processed here.
    break

  0%|                                                                                                                                                                                      | 0/102697 [00:00<?, ?it/s]

features {
  feature {
    key: "view_position"
    value {
      bytes_list {
        value: "PA"
      }
    }
  }
  feature {
    key: "patient_id"
    value {
      int64_list {
        value: 26771
      }
    }
  }
  feature {
    key: "patient_gender"
    value {
      bytes_list {
        value: "F"
      }
    }
  }
  feature {
    key: "patient_age"
    value {
      int64_list {
        value: 64
      }
    }
  }
  feature {
    key: "image"
    value {
      bytes_list {
        value: "\211PNG\r\n\032\n\000\000\000\rIHDR\000\000\004\000\000\000\004\000\010\000\000\000\000Zvt_\000\000\000\007tIME\007\340\t\005\000\000)\242\354\273\r\000\000 \000IDATx\234\}\333\222$I\256\033\000FV\365\314\356\221\354\230L/z\320\377\377\237L\322ng8\241\007\220\221\325\352\335\351KUef\204;\t\202 \235\301\377i\303m\000\006a6\t\300\004\346o\000\001 \177\020\006=\377k\022t\213\006dc^H\230\350\371q\034\201tk\336\206\266\320\000M\3004\010\3020-\023\340\201\0006H\360P\260A\013\246iSp.\306\240A\200&


