# N06 - TFRecordsDataset Creation

- To meet the memory demand of point cloud dataset for large number of samples, the dataset has to be loaded lazily
- The `TFRecord` format is a simple format for storing a sequence of binary records with native support from tensorflow

In [1]:
import os
import re
import glob

from aliad.data import PointCloudDataset

input_dir = "/pscratch/sd/c/chlcheng/dataset/anomaly_detection/LHC_Olympics_2020/LHCO_RnD_qq/col_based"
regex = re.compile("point_cloud_W_qq_(?P<m1>[0-9]*)_(?P<m2>[0-9]*).parquet")
files = glob.glob(os.path.join(input_dir, "SR_*.parquet"))
filenames = {}
class_labels = {0: [], 1: []}
aux_features = {}
for file in files:
    if 'official' in file:
        continue
    match = regex.search(file)
    if not match:
        continue
    m1, m2 = match.group('m1'), match.group('m2')
    key = f"W_{m1}_{m2}"
    filenames[key] = file
    aux_features[key] = {"m1": float(m1), "m2": float(m2)}
    class_labels[1].append(key)
filenames['QCD'] = os.path.join(input_dir, "SR_point_cloud_QCD_qq.parquet")
filenames['extra_QCD'] = os.path.join(input_dir, "SR_point_cloud_extra_QCD_qq.parquet")
aux_features['QCD']       = {"m1": 0, "m2": 0}
aux_features['extra_QCD'] = {"m1": 0, "m2": 0}
class_labels[0] = ['QCD', 'extra_QCD']

2023-11-15 12:35:33.409533: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
import awkward as ak
import tensorflow as tf

print(f'awkward version     : {ak.__version__}')
print(f'tensorflow version  : {tf.__version__}')

awkward version     : 2.4.6
tensorflow version  : 2.12.1


In [3]:
physical_devices = tf.config.experimental.list_physical_devices('GPU')
#if len(physical_devices) > 0:
#    tf.config.experimental.set_memory_growth(physical_devices[0], True)
physical_devices

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [4]:
!nvidia-smi

Wed Nov 15 12:35:41 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA A100-PCI...  On   | 00000000:C3:00.0 Off |                    0 |
| N/A   37C    P0    43W / 250W |  37522MiB / 40960MiB |     99%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [5]:
from aliad.data import PointCloudDataset

In [6]:
feature_dict = {
        "part_coords"   : ["part_delta_eta", "part_delta_phi"],
        "part_features" : ["part_pt", "part_delta_eta", "part_delta_phi", "part_delta_R"],
        "jet_features"  : ["jet_pt", "jet_eta", "jet_phi", "jet_m", "N", "tau12", "tau23"] 
}
dataset = PointCloudDataset(class_labels=class_labels, feature_dict=feature_dict,
                            num_jets=2, pad_size=300, shuffle=False)

In [7]:
output_dir = "/pscratch/sd/c/chlcheng/dataset/anomaly_detection/LHC_Olympics_2020/LHCO_RnD_qq/tfrecords"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
outname = os.path.join(output_dir, 'SR_point_cloud_train_features_all_signals_all_backgrounds.tfrec')

In [8]:
dataset.convert_to_tfrecords(filenames, outname=outname, aux_features=aux_features)

[INFO] Created TFRecordWriter for the output "/pscratch/sd/c/chlcheng/dataset/anomaly_detection/LHC_Olympics_2020/LHCO_RnD_qq/tfrecords/SR_point_cloud_train_features_all_signals_all_backgrounds_new.tfrec"
[INFO] Writing to tfrecord for the sample "QCD"
[INFO] Loading dataset from "/pscratch/sd/c/chlcheng/dataset/anomaly_detection/LHC_Olympics_2020/LHCO_RnD_qq/col_based/SR_point_cloud_QCD_qq.parquet"
[INFO] Preparing data for the sample "QCD" (class = 0)
[INFO] Size of sample data: 121351
[INFO] Working on feature type "part_coords"
[INFO] Jet index: 1
[INFO] Loading data for the feature "part_delta_eta"
[INFO] Loading data for the feature "part_delta_phi"
[INFO] Jet index: 2
[INFO] Loading data for the feature "part_delta_eta"
[INFO] Loading data for the feature "part_delta_phi"
[INFO] Working on feature type "part_features"
[INFO] Jet index: 1
[INFO] Loading data for the feature "part_pt"
[INFO] Loading data for the feature "part_delta_eta"
[INFO] Loading data for the feature "part_de

## Load TFRecord Dataset

First we check the (custom made) metadata of the tfrecord dataset

In [9]:
import json
from aliad.interface.tensorflow.dataset import get_ndarray_tfrecord_example_parser

metadata_filename = f'{os.path.splitext(outname)[0]}_metadata.json'
metadata = json.load(open(metadata_filename))

In [10]:
metadata['features']

{'part_coords': {'shape': [2, 300, 2], 'dtype': 'float64'},
 'part_features': {'shape': [2, 300, 4], 'dtype': 'float64'},
 'jet_features': {'shape': [2, 7], 'dtype': 'float64'},
 'part_masks': {'shape': [2, 300], 'dtype': 'bool'},
 'label': {'shape': [1], 'dtype': 'int64'},
 'weight': {'shape': [1], 'dtype': 'float64'}}

In [11]:
metadata['sample_size']

{'QCD': 121351,
 'extra_QCD': 612853,
 'W_50_450': 72728,
 'W_400_50': 44016,
 'W_500_600': 59319,
 'W_500_250': 68873,
 'W_350_500': 69686,
 'W_250_450': 72477,
 'W_450_350': 70912,
 'W_600_100': 48551,
 'W_50_100': 76327,
 'W_450_500': 67605,
 'W_100_400': 74261,
 'W_450_200': 70539,
 'W_600_300': 62038,
 'W_50_150': 76482,
 'W_500_500': 65754,
 'W_300_250': 74743,
 'W_450_600': 61071,
 'W_50_300': 75557,
 'W_200_500': 71231,
 'W_150_200': 76421,
 'W_500_50': 33154,
 'W_50_50': 75286,
 'W_150_300': 75953,
 'W_100_300': 75777,
 'W_400_450': 70527,
 'W_400_150': 70112,
 'W_100_500': 71493,
 'W_350_50': 49591,
 'W_250_150': 74473,
 'W_100_450': 73157,
 'W_350_250': 73947,
 'W_600_450': 60922,
 'W_150_400': 74239,
 'W_150_50': 70476,
 'W_50_500': 70855,
 'W_200_550': 68501,
 'W_250_600': 64278,
 'W_100_100': 76373,
 'W_400_550': 66384,
 'W_200_200': 75929,
 'W_150_600': 64707,
 'W_150_450': 72899,
 'W_300_500': 70313,
 'W_500_100': 58176,
 'W_550_50': 27970,
 'W_300_400': 73486,
 'W_100_

In [15]:
from tensorflow.keras import Input
Inputs = {}
for label, feature_metadata  in metadata['features'].items():
    if feature_metadata['dtype'] == 'float64':
        feature_metadata['dtype'] = 'float32'
    Inputs[label] = Input(name=label, **feature_metadata)

In [16]:
from aliad.interface.tensorflow.dataset import get_ndarray_tfrecord_example_parser
parse_tfrecord_fn = get_ndarray_tfrecord_example_parser(**Inputs)

In [18]:
# load back the dataset
def prepare_sample(features):
    return (features['part_coords'], features['part_features'], features['jet_features'], features['part_masks']), features['label']
batchsize = 32    
ds = (tf.data.TFRecordDataset([outname], num_parallel_reads=tf.data.AUTOTUNE)
      .map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
      .map(prepare_sample, num_parallel_calls=tf.data.AUTOTUNE)
      .shuffle(batchsize * 10)
      .batch(batchsize)
      .prefetch(tf.data.AUTOTUNE)
     )

In [19]:
ds_test = [d for d in ds.take(10)]

2023-11-15 12:49:27.583462: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
2023-11-15 12:49:27.584042: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]


In [23]:
# first event
ds_test[0]

((<tf.Tensor: shape=(32, 2, 300, 2), dtype=float32, numpy=
  array([[[[-0.896394  , -0.32757178],
           [-0.86130214, -0.2166814 ],
           [ 0.10729363, -0.82305145],
           ...,
           [ 0.        ,  0.        ],
           [ 0.        ,  0.        ],
           [ 0.        ,  0.        ]],
  
          [[ 0.12015291,  0.8678042 ],
           [ 0.2174759 ,  0.82853776],
           [ 0.7607871 , -0.33233893],
           ...,
           [ 0.        ,  0.        ],
           [ 0.        ,  0.        ],
           [ 0.        ,  0.        ]]],
  
  
         [[[-0.49549302, -0.8282139 ],
           [-0.5323343 , -0.5155043 ],
           [ 0.09060492,  0.7136734 ],
           ...,
           [ 0.        ,  0.        ],
           [ 0.        ,  0.        ],
           [ 0.        ,  0.        ]],
  
          [[ 0.30184928, -0.12075301],
           [ 0.3024808 ,  0.11285213],
           [ 0.30596673, -0.05456193],
           ...,
           [ 0.        ,  0.        ],
   

## Sharding TFRecordDataset

- For best I/O performance, we split the dataset into multiple files (shards)

In [36]:
# notice that the data are not well shuffled, we need to first shuffle it manually
ds = (tf.data.TFRecordDataset([outname], num_parallel_reads=tf.data.AUTOTUNE)
      .map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
      .map(prepare_sample, num_parallel_calls=tf.data.AUTOTUNE))

In [None]:
y_targets = [target.numpy() for _, target in iter(ds)]
#X_indices = np.arange(len(y_targets))

2023-11-15 13:00:09.685507: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
2023-11-15 13:00:09.685711: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]


In [35]:
y_targets.shape

(200000, 1)

In [25]:
import numpy as np
y_targets = np.array([target.numpy() for _, target in iter(ds)])
X_indices = np.arange(len(y_targets))

2023-11-15 12:54:03.019107: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
2023-11-15 12:54:03.019398: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (328387,) + inhomogeneous part.

In [None]:
def print_target_counts(y):
    _, y_counts = np.unique(y, return_counts=True)
    y_total = len(y)
    y_0_count = y_counts[0]
    y_1_count = y_counts[1]
    y_1_percent = y_1_count / y_total * 100.0
    print("Total={1:5d}, 0={2:5d}, 1={3:3d}, ratio of 1={4:.2f}%".format(
        label, y_total, y_0_count, y_1_count, y_1_percent))
print_target_counts(y_targets)

In [None]:
num_shards = 200
signal_size = 

In [None]:
from sklearn.model_selection import train_test_split

# Split the generated indices and target values by train_test_split().
# The ratio of target values should be kept in the splitted datasets.
X_train_indices, X_val_indices, y_train_targets, y_val_targets = train_test_split(
    X_indices, y_targets, test_size=0.1, stratify=y_targets, random_state=53)

print_target_counts(y_train_targets, "Training")
print_target_counts(y_val_targets, "Validation")

In [None]:
def get_selected_dataset(ds, X_indices_np):
    # Make a tensor of type tf.int64 to match the one by Dataset.enumerate(). 
    X_indices_ts = tf.constant(X_indices_np, dtype=tf.int64)
    
    def is_index_in(index, rest):
        # Returns True if the specified index value is included in X_indices_ts.
        #
        # '==' compares the specified index value with each values in X_indices_ts.
        # The result is a boolean tensor, looks like [ False, True, ..., False ].
        # reduce_any() returns Ture if True is included in the specified tensor.
        return tf.math.reduce_any(index == X_indices_ts)
    
    def drop_index(index, rest):
        return rest

    # Dataset.enumerate() is similter to Python's enumerate().
    # The method adds indices to each elements. Then, the elements are filtered
    # by using the specified indices. Finally unnecessary indices are dropped.
    selected_ds = ds \
        .enumerate() \
        .filter(is_index_in) \
        .map(drop_index)
    return selected_ds

In [None]:
splitted_train_ds = get_selected_dataset(vanilla_training_ds, X_train_indices)
splitted_val_ds = get_selected_dataset(vanilla_training_ds, X_val_indices)

print(splitted_train_ds)
print(splitted_val_ds)