-
Notifications
You must be signed in to change notification settings - Fork 120
/
test_taskvine.py
executable file
·80 lines (64 loc) · 2.15 KB
/
test_taskvine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import hist.dask as hda
import pytest
from coffea.nanoevents import NanoAODSchema, NanoEventsFactory
def histogram_common():
    """Build the MET-pt histogram shared by the TaskVine tests.

    Opens the ``Events`` tree of ``tests/samples/nano_dy.root`` (path resolved
    against the current working directory) and fills a ``hist.dask``
    histogram with ``events.MET.pt``.

    Returns:
        The filled ``hist.dask`` histogram: 100 regular bins from 0 to 200 on
        an axis named ``"met"``, with double storage. Callers run
        ``.compute()`` on it to obtain the result.
    """
    # The opendata files are non-standard NanoAOD and miss some optional data
    # columns, so turn off the missing cross-reference warnings.
    NanoAODSchema.warn_missing_crossrefs = False

    # (commented-out alternative input: "file:/tmp/Run2012B_SingleMu.root")
    sample_path = os.path.abspath("tests/samples/nano_dy.root")
    factory = NanoEventsFactory.from_root(
        {sample_path: "Events"},
        steps_per_file=4,
        metadata={"dataset": "SingleMu"},
    )
    events = factory.events()

    met_builder = hda.Hist.new.Reg(
        100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]"
    )
    met_hist = met_builder.Double()
    return met_hist.fill(events.MET.pt)
def test_taskvine_local_env():
    """Compute the shared MET histogram through TaskVine with one local worker.

    A ``DaskVine`` manager is created together with a ``Factory`` of local
    workers pinned to exactly one worker with one core. The histogram from
    ``histogram_common()`` is computed using the manager's ``get`` as the
    scheduler, and the sum of the result is checked.

    The test is reported as skipped when ``ndcctools.taskvine`` cannot be
    imported.
    """
    try:
        from ndcctools.taskvine import DaskVine, Factory
    except ImportError:
        # Skip explicitly: a plain ``return`` would make pytest report the
        # test as passed even though nothing was exercised.
        pytest.skip("taskvine is not installed. Omitting test.")

    m = DaskVine(port=0)

    workers = Factory(manager=m, batch_type="local")
    workers.min_workers = 1
    workers.max_workers = 1
    workers.cores = 1

    q1_hist = histogram_common()
    # The factory is used as a context manager around the computation.
    with workers:
        result = q1_hist.compute(
            scheduler=m.get, resources={"cores": 1}, resources_mode=None
        )
        assert result.sum() == 40.0
@pytest.mark.skipif(
    "CONDA_PREFIX" not in os.environ,
    reason="test needs a conda environment with coffea and ndcctools",
)
def test_taskvine_remote_env():
    """Compute the shared MET histogram through TaskVine with a packed env.

    The conda environment named by ``CONDA_PREFIX`` is packed with poncho into
    ``vine-env.tar.gz`` (relative to the current working directory), declared
    to the ``DaskVine`` manager, and passed as the ``environment`` of the
    computation. One local worker with one core is used, and the sum of the
    resulting histogram is checked.

    The test is skipped when ``CONDA_PREFIX`` is not set, and also when
    ``ndcctools`` cannot be imported.
    """
    try:
        from ndcctools.poncho import package_create
        from ndcctools.taskvine import DaskVine, Factory
    except ImportError:
        # Skip explicitly: a plain ``return`` would make pytest report the
        # test as passed even though nothing was exercised.
        pytest.skip("taskvine is not installed. Omitting test.")

    env_filename = "vine-env.tar.gz"
    package_create.pack_env(os.environ["CONDA_PREFIX"], env_filename)

    m = DaskVine(port=0)
    env = m.declare_poncho(env_filename, cache=True)

    workers = Factory(manager=m, batch_type="local")
    workers.min_workers = 1
    workers.max_workers = 1
    workers.cores = 1

    q1_hist = histogram_common()
    # The factory is used as a context manager around the computation.
    with workers:
        result = q1_hist.compute(
            scheduler=m.get,
            resources={"cores": 1},
            resources_mode=None,
            environment=env,
        )
        assert result.sum() == 40.0