-
Notifications
You must be signed in to change notification settings - Fork 120
/
test_dask_pandas.py
52 lines (37 loc) · 1.71 KB
/
test_dask_pandas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from __future__ import print_function, division
from coffea import processor
import pytest
def do_dask_pandas_job(client, filelist):
    """Run the Pandas-based NanoAOD test processor on a Dask cluster.

    Executes ``NanoTestProcessorPandas`` over *filelist* with a
    ``DaskExecutor`` configured to return distributed DataFrames, then
    collects the result and checks the expected per-dataset row counts.

    Parameters
    ----------
    client : distributed.Client
        Live Dask client to submit work to.
    filelist : dict
        Mapping of dataset name to a list of ROOT file paths.
    """
    from coffea.processor.test_items import NanoTestProcessorPandas
    from coffea import nanoevents

    runner = processor.Runner(
        executor=processor.DaskExecutor(client=client, use_dataframes=True),
        schema=nanoevents.NanoAODSchema,
    )
    result = runner(filelist, "Events", processor_instance=NanoTestProcessorPandas())

    # The distributed DataFrame could be written out without collecting, e.g.:
    #
    #   import dask.dataframe as dd
    #   dd.to_parquet(df=result, path=/output/path/)
    #
    # Operations such as column-based splitting also work pre-collection:
    #
    #   dd.to_parquet(df=result[result.dataset=='ZJets'], path=/output/path/ZJets/)
    #   dd.to_parquet(df=result[result.dataset=='Data'], path=/output/path/Data/)
    #
    # Here we instead collect back to a local Pandas DataFrame:
    output = result.compute()

    assert output[output.dataset == "ZJets"].shape[0] == 6
    assert output[output.dataset == "Data"].shape[0] == 18
    # print(output)
def test_dask_pandas_job():
    """End-to-end test of the Dask/Pandas executor path.

    Spins up a local ``distributed`` cluster (skipped if ``distributed``
    >= 2.6.0 is not installed) and runs the Pandas test processor over the
    bundled NanoAOD sample files.
    """
    distributed = pytest.importorskip("distributed", minversion="2.6.0")
    import os
    import os.path as osp

    filelist = {
        "ZJets": [osp.join(os.getcwd(), "tests/samples/nano_dy.root")],
        "Data": [osp.join(os.getcwd(), "tests/samples/nano_dimuon.root")],
    }
    # Use the client as a context manager so it is closed even if the
    # job raises — the original called close() only on the success path,
    # leaking the local cluster on failure.
    with distributed.Client(dashboard_address=None) as client:
        do_dask_pandas_job(client, filelist)