Skip to content

Commit

Permalink
Merge pull request #1079 from alexander-held/feat/preprocessing-tree-…
Browse files Browse the repository at this point in the history
…reduction

feat: use tree reduction to aggregate files in preprocessing
  • Loading branch information
lgray committed Apr 15, 2024
2 parents b8d296c + 7bfdcc1 commit 92adea6
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion src/coffea/dataset_tools/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,20 @@ def preprocess(
dak_norm_files = dask_awkward.from_awkward(
ak_norm_files, math.ceil(len(ak_norm_files) / files_per_batch)
)
files_to_preprocess[name] = dask_awkward.map_partitions(

concat_fn = partial(
awkward.concatenate,
axis=0,
)

split_every = 8

files_trl_label = f"preprocess-{name}"
files_trl_token = dask.base.tokenize(dak_norm_files, concat_fn, split_every)
files_trl_name = f"{files_trl_label}-{files_trl_token}"
files_trl_tree_node_name = f"{files_trl_label}-tree-node-{files_trl_token}"

files_part = dask_awkward.map_partitions(
get_steps,
dak_norm_files,
step_size=step_size,
Expand All @@ -325,6 +338,29 @@ def preprocess(
save_form=save_form,
step_size_safety_factor=step_size_safety_factor,
uproot_options=uproot_options,
meta=dask_awkward.lib.core.empty_typetracer(),
)

files_trl = dask_awkward.layers.layers.AwkwardTreeReductionLayer(
name=files_trl_name,
name_input=files_part.name,
npartitions_input=files_part.npartitions,
concat_func=concat_fn,
tree_node_func=lambda x: x,
finalize_func=lambda x: x,
split_every=split_every,
tree_node_name=files_trl_tree_node_name,
)

files_graph = dask.highlevelgraph.HighLevelGraph.from_collections(
files_trl_name, files_trl, dependencies=[files_part]
)

files_to_preprocess[name] = dask_awkward.lib.core.new_array_object(
files_graph,
files_trl_name,
meta=dask_awkward.lib.core.empty_typetracer(),
npartitions=len(files_trl.output_partitions),
)

(all_processed_files,) = dask.compute(files_to_preprocess, scheduler=scheduler)
Expand Down

0 comments on commit 92adea6

Please sign in to comment.