-
Notifications
You must be signed in to change notification settings - Fork 120
/
base.py
133 lines (119 loc) · 4.77 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from coffea.nanoevents import transforms
from coffea.nanoevents.util import concat, quote
def listarray_form(content, offsets):
    """Wrap ``content`` in a ``ListOffsetArray`` form driven by ``offsets``.

    Args:
        content: Form (dict) stored under the ``"content"`` key of the result.
        offsets: Form (dict) describing the offsets buffer. It must have
            ``"class" == "NumpyArray"``, a ``"primitive"`` of ``"int32"`` or
            ``"int64"``, and a ``"form_key"``.

    Returns:
        A dict with ``"class": "ListOffsetArray"``, ``"offsets"`` set to
        ``"i32"`` or ``"i64"`` to match the primitive, the given ``content``,
        and a ``"form_key"`` made by ``concat(offsets["form_key"], "!skip")``.

    Raises:
        ValueError: If ``offsets`` is not a ``NumpyArray`` form, or if its
            primitive is neither ``int32`` nor ``int64``.
    """
    offsets_class = offsets["class"]
    if offsets_class != "NumpyArray":
        raise ValueError(
            f"Expected offsets form of class 'NumpyArray', got {offsets_class!r}"
        )

    primitive = offsets["primitive"]
    if primitive == "int32":
        offsetstype = "i32"
    elif primitive == "int64":
        offsetstype = "i64"
    else:
        raise ValueError(f"Unrecognized offsets data type: {primitive!r}")

    return {
        # Both supported primitives map onto the same array class; only the
        # offsets width code differs.
        "class": "ListOffsetArray",
        "offsets": offsetstype,
        "content": content,
        "form_key": concat(offsets["form_key"], "!skip"),
    }
def _record_form(fields, contents, name, record_name):
    """Assemble a RecordArray form, tagging it with ``record_name`` if given."""
    record = {
        "class": "RecordArray",
        "fields": fields,
        "contents": contents,
        "form_key": quote("!invalid," + name),
    }
    if record_name is not None:
        record["parameters"] = {"__record__": record_name}
    return record


def zip_forms(forms, name, record_name=None, offsets=None, bypass=False):
    """Zip a dictionary of forms into a single record form.

    Args:
        forms: Non-empty dict mapping field name to form (dict).
        name: Name used to build the record's ``"form_key"``
            (``quote("!invalid," + name)``) and reported in error messages.
        record_name: If not ``None``, stored as the record's ``"__record__"``
            parameter.
        offsets: Only used when every form is a list form. If given, the
            zipped record is wrapped with ``listarray_form(record, offsets)``
            instead of reusing the first form's class, offsets and form key.
        bypass: When true, disables the generic fallback that zips any set of
            forms that all carry a ``"class"`` key.

    Returns:
        * If every form's class starts with ``"ListOffsetArray"``: a list form
          whose content is a record of the individual ``"content"`` forms.
        * If every form is a ``NumpyArray``, or (unless ``bypass``) every form
          has a ``"class"`` key: a record of the forms themselves.

    Raises:
        ValueError: If ``forms`` is not a dict, is empty, or the list forms
            disagree on ``"class"`` or ``"offsets"``.
        NotImplementedError: If the forms match none of the supported cases.
    """
    if not isinstance(forms, dict):
        raise ValueError("Expected a dictionary")
    if not forms:
        # all() over an empty dict is True, which would otherwise send us into
        # the list branch and leak StopIteration from next(iter(...)).
        raise ValueError(f"Cannot zip an empty dictionary of forms for {name!r}")

    if all(form["class"].startswith("ListOffsetArray") for form in forms.values()):
        first = next(iter(forms.values()))
        if not all(form["class"] == first["class"] for form in forms.values()):
            classes = {key: form["class"] for key, form in forms.items()}
            raise ValueError(
                f"Cannot zip {name!r}: list forms have mismatched classes "
                f"{classes}, expected {first['class']!r}"
            )
        if not all(form["offsets"] == first["offsets"] for form in forms.values()):
            all_offsets = {key: form.get("offsets") for key, form in forms.items()}
            raise ValueError(
                f"Cannot zip {name!r}: list forms have mismatched offsets "
                f"{all_offsets}, expected {first['offsets']!r}"
            )
        record = _record_form(
            list(forms),
            [form["content"] for form in forms.values()],
            name,
            record_name,
        )
        if offsets is None:
            return {
                "class": first["class"],
                "offsets": first["offsets"],
                "content": record,
                "form_key": first["form_key"],
            }
        return listarray_form(record, offsets)

    # Flat case: NumpyArray-only inputs are always zipped; any other mix of
    # forms carrying a "class" key is zipped unless the caller set bypass.
    if all(form["class"] == "NumpyArray" for form in forms.values()) or (
        all("class" in form for form in forms.values()) and not bypass
    ):
        return _record_form(list(forms), list(forms.values()), name, record_name)

    raise NotImplementedError("Cannot zip forms")
def nest_jagged_forms(parent, child, counts_name, name):
    """Place child listarray inside parent listarray as a double-jagged array.

    The parent's record field ``counts_name`` is converted to an offsets form
    with ``transforms.counts2offsets_form``; the child's content is wrapped
    with those offsets and appended to the parent's record as field ``name``.

    Args:
        parent: List form (class starting with ``"ListOffsetArray"``) whose
            content is a ``RecordArray`` form. Modified in place.
        child: List form whose ``"content"`` becomes the inner list's content.
        counts_name: Name of the parent record field holding the counts.
        name: Field name under which the nested list is added to the parent.

    Returns:
        None. ``parent["content"]["fields"]`` and
        ``parent["content"]["contents"]`` each gain one entry.

    Raises:
        ValueError: If ``parent`` or ``child`` is not a list form, if the
            parent's content is not a ``RecordArray``, or if ``counts_name``
            is not one of the parent's fields.
    """
    if not parent["class"].startswith("ListOffsetArray"):
        raise ValueError(
            f"Expected parent to be a ListOffsetArray form, got {parent['class']!r}"
        )
    if parent["content"]["class"] != "RecordArray":
        raise ValueError(
            "Expected parent content to be a RecordArray form, "
            f"got {parent['content']['class']!r}"
        )
    if not child["class"].startswith("ListOffsetArray"):
        raise ValueError(
            f"Expected child to be a ListOffsetArray form, got {child['class']!r}"
        )

    record = parent["content"]
    try:
        counts_idx = record["fields"].index(counts_name)
    except ValueError as err:
        # Same exception type as before, but naming the missing field.
        raise ValueError(
            f"Counts field {counts_name!r} not found in parent fields"
        ) from err

    counts = record["contents"][counts_idx]
    offsets = transforms.counts2offsets_form(counts)
    inner = listarray_form(child["content"], offsets)
    record["fields"].append(name)
    record["contents"].append(inner)
class BaseSchema:
    """Minimal schema that exposes the input form essentially as-is.

    The fields and contents of the supplied base form are reused unchanged
    inside a top-level ``RecordArray`` form whose ``__record__`` parameter is
    ``"NanoEvents"``, so every original branch form stays a direct child of
    the top-level record.
    """

    __dask_capable__ = True

    def __init__(self, base_form, *args, **kwargs):
        # Work on a copy so the caller's parameters dict is left untouched.
        parameters = dict(base_form.get("parameters", {}))
        parameters["__record__"] = "NanoEvents"

        # An explicit ``None`` metadata entry is removed first so that the
        # default below replaces it with an empty dict.
        if parameters.get("metadata", False) is None:
            del parameters["metadata"]
        parameters.setdefault("metadata", {})

        form = {"class": "RecordArray"}
        form["fields"] = base_form["fields"]
        form["contents"] = base_form["contents"]
        form["parameters"] = parameters
        form["form_key"] = None
        self._form = form

    @property
    def form(self):
        """Awkward form dict built by this schema."""
        return self._form

    @classmethod
    def behavior(cls):
        """Return the behaviors needed to interpret arrays of this schema."""
        # Imported lazily, as in the original implementation.
        from coffea.nanoevents.methods import base as base_methods

        return base_methods.behavior