In [1]:
#!pip install coffea

In [2]:
import os
import glob
import pickle
import itertools
import awkward as ak
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import tensorflow as tf

In [3]:
from coffea.nanoevents import NanoEventsFactory, PFNanoAODSchema
PFNanoAODSchema.warn_missing_crossrefs = False
import warnings

In [4]:
# Root directory under which both the raw ROOT inputs and the preprocessed outputs live.
data_dir = '/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn'

In [5]:
# ROOT inputs are read from in_dir; everything this notebook writes goes to out_dir.
in_dir = os.path.join(data_dir, 'raw/dev')
out_dir = os.path.join(data_dir, 'preprocessed/dev')

# glob returns matches in arbitrary (filesystem) order. Sort them so that the
# pairing of input files with positional output names is reproducible across runs.
# Note the sort is lexicographic ('10.root' comes before '2.root').
root_files = sorted(glob.glob(os.path.join(in_dir, '*.root')))
num_files = len(root_files)

In [6]:
# Create the output directory (including missing parents). An already existing
# directory is accepted; a non-directory at that path still raises FileExistsError.
os.makedirs(out_dir, exist_ok=True)

In [7]:
# Open one sample file; its Jet / PF-candidate field names are inspected in the next cell.
sample_path = os.path.join(data_dir, 'raw/dev/1.root')
events = NanoEventsFactory.from_root(sample_path, schemaclass=PFNanoAODSchema).events()

In [8]:
# Field names taken from the sample file, skipping every field whose name contains
# 'IdxG', plus the derived fields that preprocess() adds later.
all_jet_fields = [field for field in events.Jet.fields if 'IdxG' not in field]
all_jet_fields = all_jet_fields + ['log_pt']
all_pf_fields = [field for field in events.Jet.constituents.pf.fields if 'IdxG' not in field]
all_pf_fields = all_pf_fields + ['rel_eta', 'rel_phi', 'rel_pt']

# Prefixed names used as feature keys in the serialized records.
all_jet_keys = ['jet_' + field for field in all_jet_fields]
all_pf_keys = ['pf_' + field for field in all_pf_fields]

In [9]:
def read_nanoaod(path):
    """Read a NanoAOD ROOT file and select the jets used for training.

    Selection steps, in order:
      1. keep events with at least two jets that have a matched generator jet pt;
      2. order each event's jets by matched generator pt, descending, and take
         the first two;
      3. keep jets with matched generator pt > 30 and |matched generator eta| < 5;
      4. drop jets whose matched generator pt is missing;
      5. drop jets with any PF constituent whose dz, dzErr, d0 or d0Err equals +inf.

    Parameters
    ----------
    path : str
        Location of the ROOT file to open with PFNanoAODSchema.

    Returns
    -------
    tuple
        ``(jets, pf)`` where ``pf`` is ``jets.constituents.pf``.
    """
    with warnings.catch_warnings():
        # Only the 'found duplicate branch' warning is silenced while opening the file.
        warnings.filterwarnings('ignore', message='found duplicate branch')
        factory = NanoEventsFactory.from_root(path, schemaclass=PFNanoAODSchema)
        events = factory.events()

    # Per-event number of jets with a non-missing matched generator pt.
    n_matched = ak.count(events.Jet.matched_gen.pt, axis=1)
    jets = events.Jet[n_matched >= 2]

    gen_pt_order = ak.argsort(jets.matched_gen.pt, ascending=False, axis=1)
    ranked = jets[gen_pt_order]

    # Stack the leading and sub-leading jet of every event into one flat jet array.
    top_two = ak.concatenate((ranked[:, 0], ranked[:, 1]), axis=0)

    in_acceptance = (top_two.matched_gen.pt > 30) & (abs(top_two.matched_gen.eta) < 5)
    accepted = top_two[in_acceptance]

    has_gen = ~ak.is_none(accepted.matched_gen.pt)
    kept = accepted[has_gen]

    # NOTE(review): only +inf is rejected here; -inf and NaN would pass - confirm intended.
    for name in ('dz', 'dzErr', 'd0', 'd0Err'):
        no_inf = ak.all(kept.constituents.pf[name] != np.inf, axis=1)
        kept = kept[no_inf]

    return kept, kept.constituents.pf

In [10]:
def preprocess(jet, pf):
    """Attach the regression target and derived input features, in place.

    Fields added to ``jet``:
      * ``target`` - matched generator pt divided by the jet pt
      * ``log_pt`` - natural logarithm of the jet pt

    Fields added to ``pf``:
      * ``rel_eta`` - (candidate eta - jet eta) multiplied by the sign of the jet eta
      * ``rel_pt``  - candidate pt divided by the jet pt
      * ``rel_phi`` - candidate phi - jet phi, wrapped into [-pi, pi)

    Returns the same two (mutated) objects as ``(jet, pf)``.
    """
    jet_pt, jet_eta, jet_phi = jet.pt, jet.eta, jet.phi
    two_pi = 2 * np.pi

    jet['target'] = jet.matched_gen.pt / jet_pt
    jet['log_pt'] = np.log(jet_pt)

    delta_eta = pf.eta - jet_eta
    pf['rel_eta'] = delta_eta * np.sign(jet_eta)
    pf['rel_pt'] = pf.pt / jet_pt
    # Shift by pi before the modulo and back afterwards so the result lands in [-pi, pi).
    delta_phi = pf.phi - jet_phi + np.pi
    pf['rel_phi'] = delta_phi % two_pi - np.pi

    return jet, pf

In [11]:
def create_dataset(root_file, parquet_dir):
    """Convert one NanoAOD ROOT file into a pair of Parquet files.

    The file is read with read_nanoaod(), augmented with preprocess(), and the
    resulting jet and PF-candidate arrays are written to
    ``<parquet_dir>/jet.parquet`` and ``<parquet_dir>/pf.parquet``.

    Parameters
    ----------
    root_file : str
        Path of the input ROOT file.
    parquet_dir : str
        Output directory; created (with parents) when it does not exist yet.

    Raises
    ------
    FileExistsError
        If ``parquet_dir`` exists but is not a directory.
    """
    # Progress marker; the trailing newline is part of the original log format.
    print(parquet_dir + '\n')

    jet, pf = read_nanoaod(root_file)
    jet, pf = preprocess(jet, pf)

    # exist_ok=True tolerates re-runs without hiding a non-directory at that path.
    os.makedirs(parquet_dir, exist_ok=True)

    ak.to_parquet(jet, os.path.join(parquet_dir, 'jet.parquet'))
    ak.to_parquet(pf, os.path.join(parquet_dir, 'pf.parquet'))

In [12]:
with ProcessPoolExecutor(max_workers=None) as executor:
    # One output directory per input file, named by position: <out_dir>/1, <out_dir>/2, ...
    parquet_dirs = [f'{out_dir}/{index}' for index in range(1, num_files + 1)]
    # executor.map only re-raises a worker's exception when its result is pulled
    # from the returned iterator. Materialise it so a failed conversion stops the
    # cell instead of passing unnoticed.
    results = list(executor.map(create_dataset, root_files, parquet_dirs))

/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/1
/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/3
/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/2
/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/4




/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/5



In [13]:
def float_feature(value):
    """Wrap an iterable of floats in a tf.train.Feature holding a FloatList.

    ``value`` is handed to FloatList unchanged, so it has to be a sequence of
    numbers rather than a bare scalar.
    """
    float_list = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=float_list)

In [14]:
def int64_feature(value):
    """Wrap an iterable of integers in a tf.train.Feature holding an Int64List.

    ``value`` is handed to Int64List unchanged, so it has to be a sequence of
    integers rather than a bare scalar.
    """
    int64_list = tf.train.Int64List(value=value)
    return tf.train.Feature(int64_list=int64_list)

In [15]:
def serialize_example(jet, flat_pf, row_lengths):
    """
    Build one tf.train.Example from the given jets and return it serialized.

    The message contains:
      * 'row_lengths' - int64 list taken from ``row_lengths``
      * 'target'      - float list taken from ``jet.target``
      * one float list per name in all_jet_keys, filled from ``jet[all_jet_fields]``
      * one float list per name in all_pf_keys, filled from ``flat_pf[all_pf_fields]``

    Returns the bytes produced by ``SerializeToString()``.
    """
    # Column-wise views of the selected fields, paired positionally with their keys.
    jet_columns = ak.unzip(jet[all_jet_fields])
    jet_dict = dict(zip(all_jet_keys, map(float_feature, jet_columns)))

    pf_columns = ak.unzip(flat_pf[all_pf_fields])
    pf_dict = dict(zip(all_pf_keys, map(float_feature, pf_columns)))

    # Later entries win on a key clash, matching successive dict.update() calls.
    feature = {
        'row_lengths': int64_feature(row_lengths),
        'target': float_feature(jet.target),
        **jet_dict,
        **pf_dict,
    }

    features_message = tf.train.Features(feature=feature)
    example_proto = tf.train.Example(features=features_message)
    return example_proto.SerializeToString()

In [16]:
def create_record(root_file, record_file):
    """Convert one NanoAOD ROOT file into a TFRecord file holding a single example.

    All selected jets of ``root_file`` are packed into one serialized
    tf.train.Example (see serialize_example) which is written to ``record_file``.
    """
    # Progress marker; the trailing newline is part of the original log format.
    print(record_file + '\n')

    jets, candidates = read_nanoaod(root_file)
    jets, candidates = preprocess(jets, candidates)

    # Number of candidates per jet, kept so the flattened list can be re-split later.
    candidates_per_jet = ak.num(candidates, axis=1)
    flat_candidates = ak.flatten(candidates, axis=1)

    serialized = serialize_example(jets, flat_candidates, candidates_per_jet)

    with tf.io.TFRecordWriter(record_file) as writer:
        writer.write(serialized)

In [17]:
# Derive one '<stem>.tfrecords' output path in out_dir for every input ROOT file.
root_names = list(map(os.path.basename, root_files))
record_names = [os.path.splitext(name)[0] + '.tfrecords' for name in root_names]
record_files = [os.path.join(out_dir, name) for name in record_names]

In [18]:
with ProcessPoolExecutor(max_workers=None) as executor:
    # executor.map only re-raises a worker's exception when its result is pulled
    # from the returned iterator. Materialise it so a failed conversion stops the
    # cell instead of passing unnoticed.
    results = list(executor.map(create_record, root_files, record_files))

/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/1.tfrecords
/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/3.tfrecords
/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/4.tfrecords

/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/2.tfrecords



/eos/cms/store/group/phys_jetmet/dholmber/jec-dnn/preprocessed/dev/5.tfrecords



In [42]:
# Parsing spec for the records written above: every entry is variable-length.
# 'row_lengths' is int64, everything else float32.
features = {
    'row_lengths': tf.io.VarLenFeature(dtype=tf.int64),
    'target': tf.io.VarLenFeature(dtype=tf.float32),
}
for key in itertools.chain(all_jet_keys, all_pf_keys):
    features[key] = tf.io.VarLenFeature(dtype=tf.float32)

# Store the spec next to the records.
with open(os.path.join(out_dir, 'metadata.pkl'), 'wb') as f:
    pickle.dump(features, f)

In [43]:
# Pick up every TFRecord file in out_dir. glob order is arbitrary, so sort it
# to give the dataset built from this list a reproducible file order.
record_files = sorted(glob.glob(os.path.join(out_dir, '*.tfrecords')))

In [44]:
# record_files is already a list of paths; pass it directly instead of nesting it
# in a second list (which only worked because the filenames get flattened).
raw_ds = tf.data.TFRecordDataset(filenames=record_files)

In [45]:
def parse_record(example_proto):
    """Decode one serialized tf.train.Example using the module-level ``features`` spec."""
    parsed = tf.io.parse_single_example(example_proto, features=features)
    return parsed

In [46]:
# Decode every serialized example of raw_ds with parse_record.
parsed_ds = raw_ds.map(parse_record)

In [47]:
# Jet-level inputs fed to the model, split into numerical and categorical fields.
jet_numerical = [
    'log_pt', 'eta', 'mass', 'phi', 'area',
    'qgl_axis2', 'qgl_ptD', 'qgl_mult',
]
jet_categorical = ['puId', 'partonFlavour']

# PF-candidate-level inputs, split the same way.
pf_numerical = [
    'rel_pt', 'rel_eta', 'rel_phi',
    'd0', 'dz', 'd0Err', 'dzErr',
    'trkChi2', 'vtxChi2',
    'puppiWeight', 'puppiWeightNoLep',
]
pf_categorical = ['charge', 'lostInnerHits', 'pdgId', 'pvAssocQuality', 'trkQuality']

# Numerical fields come first, categorical ones last.
jet_fields = [*jet_numerical, *jet_categorical]
pf_fields = [*pf_numerical, *pf_categorical]

# Prefixed names, matching the feature keys stored in the records.
jet_keys = ['jet_' + field for field in jet_fields]
pf_keys = ['pf_' + field for field in pf_fields]

In [48]:
def select_features(data):
    """Turn one parsed record into ``((pf_data, jet_data), target)``.

    * ``jet_data`` - the values of every key in jet_keys, one column per key.
    * ``pf_data``  - the values of every key in pf_keys, one column per key,
      split into per-jet rows using the stored 'row_lengths'.
    * ``target``   - ``data['target']`` passed through unchanged.
    """
    def _columns(keys):
        # Each entry's .values is 1-D; stacking on axis 1 yields one column per key.
        return tf.stack([data[key].values for key in keys], axis=1)

    jet_data = _columns(jet_keys)
    flat_pf = _columns(pf_keys)
    pf_data = tf.RaggedTensor.from_row_lengths(flat_pf, row_lengths=data['row_lengths'].values)

    # NOTE(review): the target is returned as parsed (not via .values like the
    # inputs), so it is presumably still a sparse tensor - confirm that is intended.
    return (pf_data, jet_data), data['target']

In [49]:
# Map each parsed record to ((pf_data, jet_data), target) via select_features.
ds = parsed_ds.map(select_features)

In [50]:
# Show the dataset's repr (element shapes and dtypes) as the cell output.
ds

<MapDataset shapes: (((None, None, 16), (None, 10)), (None,)), types: ((tf.float32, tf.float32), tf.float32)>