## Workspace setup

In [1]:
from multiprocessing import Process, Queue
import tensorflow as tf
import numpy as np
from os.path import isfile
import io_functions as io


from tensorflow.data import Dataset, TFRecordDataset
from tensorflow.io import TFRecordWriter, TFRecordOptions
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Example, Features, Feature

from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

2023-11-21 15:39:56.562711: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## TFRecord creation

In [2]:
def _bytes_feature(value):
  """Wrap a single string / bytes value in a ``tf.train.Feature``.

  Eager tensors are converted with ``.numpy()`` first, because ``BytesList``
  won't unpack a string from an ``EagerTensor``.
  """
  eager_tensor_cls = type(tf.constant(0))
  payload = value.numpy() if isinstance(value, eager_tensor_cls) else value
  bytes_list = tf.train.BytesList(value=[payload])
  return tf.train.Feature(bytes_list=bytes_list)

def serialize(charge_array, target):
  """Serialize one ``(charge_array, target)`` pair into a ``tf.train.Example`` string.

  Each input is passed through ``tf.io.serialize_tensor`` and stored as a bytes
  feature, under the keys ``'myChargeArray'`` and ``'target'`` respectively.
  """
  named_tensors = (('myChargeArray', charge_array), ('target', target))
  feature_map = {
      key: _bytes_feature(tf.io.serialize_tensor(tensor))
      for key, tensor in named_tensors
  }
  features = tf.train.Features(feature=feature_map)
  return tf.train.Example(features=features).SerializeToString()

def conversion(filename, queue):
    """Drain ``queue`` into a GZIP-compressed TFRecord file.

    Items are taken from ``queue`` until a ``None`` sentinel arrives. Every
    other item is unpacked as a ``(charge_array, target)`` pair; the charge
    array is run through ``io.proc_features`` (project helper, not shown here)
    and the pair is written as one serialized example.

    Args:
        filename: Path of the TFRecord file to create.
        queue: Queue delivering ``(charge_array, target)`` pairs, terminated
            by a single ``None``.
    """
    options = TFRecordOptions(compression_type='GZIP')
    # The context manager closes (and thereby flushes) the writer on every exit
    # path, including an exception while processing an item, instead of
    # relying on garbage collection to finalise the file.
    with TFRecordWriter(filename, options=options) as writer:
        while True:
            item = queue.get()
            # Identity test: `== None` is not a reliable sentinel check for
            # objects that overload equality (e.g. arrays).
            if item is None:
                break
            charge_array, target = item
            charge_array = io.proc_features(charge_array)

            writer.write(serialize(charge_array, target))

In [3]:
# Base directory used for the input file and for the TFRecord paths below.
dataPath = '/scratch/szslaw/'
input_files = [f"{dataPath}out_random_sigma2k2mm.root:TPCData"]

batchSize = 5
# Number of output files, equal to the number of writer processes.
nFiles = 5

datasetGenerator = io.minimal_generator(batchSize=batchSize, files=input_files)

In [5]:
%%time

# One output shard per writer process.
output_files = [f"{dataPath}test/out_random_sigma2k2mm-part-{i}.tfrecord" for i in range(nFiles)]

# Refuse to overwrite the results of a previous run.
for path in output_files:
    if isfile(path):
        raise FileExistsError('output file already exists')

if __name__ == '__main__':
    processes = []
    # Bounded queue: `put` blocks once 2*nFiles items are waiting, so the
    # reader cannot run arbitrarily far ahead of the writers.
    q = Queue(2*nFiles)

    for name in output_files:
        p = Process(target=conversion, args=(name, q))
        processes.append(p)
        p.start()
        print(p.name + ' started')

    counter = 0
    try:
        for counter, item in enumerate(datasetGenerator, start=1):
            q.put(item)
            if counter % 100 == 0:
                print(f'read {counter} batches')
    finally:
        # Always send one sentinel per worker and wait for them, even when the
        # generator raises; otherwise the workers would block forever on
        # `queue.get()` and their files would never be finalised.
        for _ in processes:
            q.put(None)

        for p in processes:
            p.join()
            print(p.name + ' done')

Process-1 started
Process-2 started
Process-3 started
Process-4 started
Process-5 started


2023-11-21 15:41:24.592641: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:268] failed call to cuInit: UNKNOWN ERROR (34)
2023-11-21 15:41:24.748360: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:268] failed call to cuInit: UNKNOWN ERROR (34)
2023-11-21 15:41:24.898134: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:268] failed call to cuInit: UNKNOWN ERROR (34)
2023-11-21 15:41:24.964491: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:268] failed call to cuInit: UNKNOWN ERROR (34)
2023-11-21 15:41:25.081353: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:268] failed call to cuInit: UNKNOWN ERROR (34)


read 100 batches
read 200 batches
read 300 batches
read 400 batches
Process-1 done
Process-2 done
Process-3 done
Process-4 done
Process-5 done
CPU times: user 34 s, sys: 19.8 s, total: 53.9 s
Wall time: 45.3 s


### Read TFRecord

In [7]:
# NOTE(review): these paths ('folder/', 'out_random_sigma-001-part-*') differ from
# the ones written by the conversion cell ('test/', 'out_random_sigma2k2mm-part-*')
# - confirm this is intended.
filenames = [f"{dataPath}folder/out_random_sigma-001-part-{i}.tfrecord" for i in range(5)]
train_dataset = tf.data.TFRecordDataset(
    filenames,
    compression_type='GZIP',
    num_parallel_reads=len(filenames),
)
test_dataset = tf.data.TFRecordDataset(
    f"{dataPath}folder/out_random_sigma2k2mm.tfrecord",
    compression_type='GZIP',
)

# Description of the stored features: each one is a scalar byte string
# (decoded later with `tf.io.parse_tensor`).
feature_description = {
    key: tf.io.FixedLenFeature([], tf.string)
    for key in ('myChargeArray', 'target')
}

def _parse_function(example_proto):
    """Decode one serialized ``tf.train.Example`` into a ``(charge, target)`` pair.

    The proto is parsed with the module-level ``feature_description``; both
    byte-string features are then deserialized as ``tf.float64`` tensors.
    """
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    # Turn the raw bytes back into tensors.
    charge = tf.io.parse_tensor(parsed['myChargeArray'], out_type=tf.float64)
    target = tf.io.parse_tensor(parsed['target'], out_type=tf.float64)
    return charge, target

def shape_items(charge, target):
    """Attach static shape information to a ``(charge, target)`` batch.

    Reads the module-level ``charge_shape`` and ``target_shape``, which must be
    defined before this function is mapped over a dataset.

    Only the trailing dimensions of those shapes are pinned; the leading
    (batch) dimension is left unknown. Inside ``Dataset.map`` ``set_shape``
    performs no runtime check, so pinning the batch size observed on one batch
    would mislabel a smaller final batch.

    Returns:
        The same ``(charge, target)`` tensors, with their static shapes updated.
    """
    charge.set_shape(tf.TensorShape([None]).concatenate(charge_shape[1:]))
    target.set_shape(tf.TensorShape([None]).concatenate(target_shape[1:]))
    return charge, target

# Decode every record, flatten the stored tensors along their first axis and
# regroup the elements into batches of `batchSize`.
train_dataset = (
    train_dataset
    .map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
    .unbatch()
    .batch(batchSize)
)
test_dataset = (
    test_dataset
    .map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
    .unbatch()
    .batch(batchSize)
)

# Peek at the first training batch to record its shapes for `shape_items`.
# The mapping stays inside the loop body, so it is only applied when such a
# batch exists.
for charge, target in train_dataset.take(1):
    charge_shape, target_shape = charge.shape, target.shape

    train_dataset = train_dataset.map(shape_items)
    test_dataset = test_dataset.map(shape_items)