generated from Hochfrequenz/python_template_repository
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mig_ahb_matching.py
223 lines (212 loc) · 11.6 KB
/
mig_ahb_matching.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""
MAUS is the MIG AHB Utility stack.
This module contains methods to merge data from Message Implementation Guide and Anwendungshandbuch
"""
from itertools import groupby
from typing import List, Optional, Sequence, Set, Tuple
from more_itertools import first, first_true, last
from maus.models.anwendungshandbuch import _VERSION, AhbLine, DeepAnwendungshandbuch, FlatAnwendungshandbuch
from maus.models.edifact_components import (
DataElement,
DataElementFreeText,
DataElementValuePool,
EdifactStack,
Segment,
SegmentGroup,
ValuePoolEntry,
derive_data_type_from_segment_code,
)
from maus.models.message_implementation_guide import SegmentGroupHierarchy
from maus.navigation import AhbLocation, calculate_distance, determine_locations
from maus.reader.mig_reader import MigReader
def merge_lines_with_same_data_element(
    ahb_lines: Sequence[AhbLine], first_stack: Optional[EdifactStack]
) -> DataElement:
    """
    Merges lines that have the same data element into a single data element instance which is returned.

    :param ahb_lines: the lines to merge; all of them must share the same ``data_element``
    :param first_stack: if not None, its JSON path is used as discriminator of the returned data element
    :return: a DataElementValuePool if the first line has a value pool entry, otherwise a DataElementFreeText
    :raises ValueError: if the lines do not share exactly one distinct data element (also for empty ``ahb_lines``)
    """
    distinct_data_element_keys = {ahb_line.data_element for ahb_line in ahb_lines}
    if len(distinct_data_element_keys) != 1:
        raise ValueError(
            "You must only use this function with lines that share the same data element but the "
            f"parameter ahb_lines contains: {', '.join([x or '' for x in distinct_data_element_keys])} "
        )
    result: DataElement
    discriminator = None
    if first_stack is not None:
        discriminator = first_stack.to_json_path()
    # The check above guarantees that there is at least one line and that all lines share the same data element.
    # Hence the first line is representative for the data element id in both branches below.
    first_line = ahb_lines[0]
    if first_line.value_pool_entry is not None:
        result = DataElementValuePool(
            discriminator=discriminator,
            value_pool=[],
            data_element_id=first_line.data_element,  # type:ignore[arg-type]
            entered_input=None,
        )
        for data_element_value_entry in ahb_lines:
            if not data_element_value_entry.ahb_expression:
                # value pool entries with empty/None AHB expression shall not be included
                # https://github.com/Hochfrequenz/mig_ahb_utility_stack/issues/38
                continue
            value_pool_entry = ValuePoolEntry(
                qualifier=data_element_value_entry.value_pool_entry,  # type:ignore[arg-type]
                meaning=data_element_value_entry.name.strip(),  # type:ignore[assignment,union-attr]
                ahb_expression=data_element_value_entry.ahb_expression,
            )
            result.value_pool.append(value_pool_entry)  # type:ignore[index]
    else:
        result = DataElementFreeText(
            entered_input=None,
            ahb_expression=first_line.ahb_expression,  # type:ignore[arg-type]
            discriminator=discriminator,
            # Formerly `first(ahb_lines, <lambda>)`: the second positional argument of more_itertools.first is the
            # default value, not a predicate. So that call always returned the first line, too.
            data_element_id=first_line.data_element,  # type:ignore[arg-type]
        )
        # a free text field never spans more than 1 line
        data_type = derive_data_type_from_segment_code(first_line.segment_code)  # type:ignore[arg-type]
        if data_type is not None:
            result.value_type = data_type
    return result
def _remove_qualifier(location: AhbLocation) -> AhbLocation:
    """
    Creates a new AhbLocation that takes over the layers, the data element id and the segment code
    of the given location but whose qualifier is None.
    The given location itself is not modified.
    """
    return AhbLocation(
        segment_code=location.segment_code,
        data_element_id=location.data_element_id,
        layers=location.layers,
        qualifier=None,
    )
# I'm aware the function is too long; Let's first make it work, then split up into separate functions.
# pylint:disable=too-many-locals, too-many-branches, too-many-statements
# https://github.com/Hochfrequenz/mig_ahb_utility_stack/issues/205
def to_deep_ahb(
    flat_ahb: FlatAnwendungshandbuch, segment_group_hierarchy: SegmentGroupHierarchy, mig_reader: MigReader
) -> DeepAnwendungshandbuch:
    """
    Converts a flat ahb into a nested ahb using the provided segment hierarchy.

    The lines of the flat AHB are located using ``determine_locations``. Consecutive lines whose locations are
    equal after the qualifier has been removed are processed together as one group:
    * groups in which no line has a segment code are skipped (section heading only)
    * groups in which a line has a data element are merged into one data element, which is appended to the
      segment that has been created most recently
    * all other groups may open a new segment group and, if one of their lines has an AHB expression,
      create a new segment

    :param flat_ahb: the flat AHB; its meta information is passed on to the result
    :param segment_group_hierarchy: the hierarchy that is handed to ``determine_locations``
    :param mig_reader: is asked for the edifact stack of the locations
    :return: the nested AHB
    :raises ValueError: if a data element is found before any segment has been created or if moving from the
        previous to the current position requires more parent groups than have been stored
    """
    result = DeepAnwendungshandbuch(meta=flat_ahb.meta, lines=[])
    result.meta.maus_version = _VERSION
    parent_group_lists: List[List[SegmentGroup]] = []
    used_stacks: Set[str] = set()
    # The following lists are _not_ a view into the lines that are going to follow (hence no name starting with "next")
    # Instead the lists are a reference to the respective attribute _inside_ the upper hierarchy element.
    # So whenever we e.g. run into a data element, we can "remember" to where the data element shall be appended.
    # The lists are our "reference" so that each element deeper into the hierarchy (e.g. data elements) knows to which
    # parent (e.g. segment) it belongs.
    # Note that the lists are never assigned to anything.
    # + they're always instantiated empty
    # + then we store a reference to the list in `append_next_foo_here`
    # + when ever we find a foo downstream we append it to this list (and thereby assign it to its parent)
    # + when ever we find a new parent element, this variable that holds the reference to the old (now previous) parent
    #   is replaced with a reference to the new parent to all the downstream elements to come.
    append_next_segments_here: List[Segment]  #: is instantiated/replaced whenever a new segment group is created
    append_next_sg_here: List[SegmentGroup]  #: is instantiated/replaced whenever a new segment group is created
    append_next_data_elements_here: List[DataElement]  #: is instantiated/replaced whenever a new segment is created
    previous_position: AhbLocation
    for position, layer_group in groupby(
        determine_locations(segment_group_hierarchy, flat_ahb.lines),
        key=lambda line_and_position: _remove_qualifier(line_and_position[1]),
    ):
        # materialize the group because it is iterated more than once below
        layer_group: List[Tuple[AhbLine, AhbLocation]] = list(layer_group)  # type:ignore[no-redef]
        data_element_lines = [x[0] for x in layer_group]  # index 1 is the position
        if not any(line.segment_code is not None for line in data_element_lines):
            continue  # section heading only
        stack: EdifactStack
        try:
            if len(data_element_lines) == 1:
                # the group key has no qualifier; for a single line we can use its original location instead
                position = layer_group[0][1]  # type:ignore[index]
            stack = mig_reader.get_edifact_stack(position)
        except ValueError:
            # if the AHB/MIG matching does not work as expected, set your breakpoints here
            stack = None  # type:ignore[assignment]
            if len(data_element_lines) > 1:
                # The lookup with the qualifier-less group position failed.
                # Try the original location of each line of the group and use the first stack that is found.
                # (Formerly the location of the first line was looked up in every iteration.)
                for candidate_position in (x[1] for x in layer_group):
                    try:
                        stack = mig_reader.get_edifact_stack(candidate_position)
                        break
                    except ValueError:
                        pass
        if any(line.data_element is not None for line in data_element_lines):
            if not any(line.ahb_expression is not None for line in data_element_lines):
                # if none of the items is marked with an ahb expression it's probably not required in this AHB
                continue
            data_element = merge_lines_with_same_data_element(data_element_lines, first_stack=stack)
            try:
                append_next_data_elements_here.append(data_element)  # pylint:disable=used-before-assignment
            except UnboundLocalError as unbound_local_error:
                raise ValueError(f"No segment has been created for {stack}") from unbound_local_error
        else:
            first_line = first(data_element_lines)
            last_line = last(data_element_lines)
            if (
                first_line.segment_group_key == last(position.layers).segment_group_key
                and last(position.layers).opening_segment_code == last_line.segment_code
                and stack is not None
                and stack.to_json_path() not in used_stacks
                and first_line.ahb_expression is not None
            ):
                # a new segment group has been opened
                segment_group = SegmentGroup(
                    discriminator=stack.to_json_path(),
                    # type:ignore[arg-type] # might be None now, will be replaced later
                    ahb_expression=first_line.ahb_expression.strip(),
                    segments=[],
                    segment_groups=[],
                    ahb_line_index=first_line.index,
                )
                used_stacks.add(stack.to_json_path())
                append_next_segments_here = segment_group.segments  # type:ignore[assignment]
                if segment_group.discriminator == '$["Dokument"][0]["Nachricht"][0]':
                    # the group with this discriminator is appended directly to the lines of the result
                    result.lines.append(segment_group)
                    append_next_sg_here = segment_group.segment_groups  # type:ignore[assignment]
                    parent_group_lists.append(result.lines)
                elif position.is_sub_location_of(previous_position):  # pylint:disable=used-before-assignment
                    # the new group is nested into the group that has been opened most recently
                    append_next_sg_here.append(segment_group)
                    parent_group_lists.append(append_next_sg_here)
                    append_next_sg_here = segment_group.segment_groups  # type:ignore[assignment]
                else:
                    distance = calculate_distance(previous_position, position)
                    # first go up to the list of the common parent ...
                    for _ in range(0, distance.layers_up):
                        try:
                            append_next_sg_here = parent_group_lists.pop()
                        except IndexError as index_error:
                            raise ValueError(f"Couldn't move from {previous_position} to {position}") from index_error
                    # ... then go down again (always following the last segment group of the respective list)
                    for _ in range(0, distance.layers_down - 1):
                        parent_group_lists.append(append_next_sg_here)
                        append_next_sg_here = last(append_next_sg_here).segment_groups  # type:ignore[assignment]
                    append_next_sg_here.append(segment_group)
                    parent_group_lists.append(append_next_sg_here)
                    append_next_sg_here = segment_group.segment_groups  # type:ignore[assignment]
            assert last_line.data_element is None
            assert last_line.segment_code is not None
            # these assertion are because we assume that the lines always come like this:
            # Section Heading
            # SGx Foo <-- a line with only the segment code but no actual content; this is where we're right now
            first_expression_line: Optional[AhbLine] = first_true(
                data_element_lines, default=None, pred=lambda line: line is not None and line.ahb_expression
            )
            if first_expression_line is None:
                # no line of the group has a (non-empty) AHB expression => no segment is created
                continue
            first_expression: str = first_expression_line.ahb_expression  # type:ignore[assignment]
            discriminator: str
            if stack is None:
                # this is a pretty ugly hack. when we cannot find a position in the MIG, then we just stringify the
                # position and by doing so hope, that the reader of the maus can use it somehow
                discriminator = str(position)
            else:
                # this should be the default path
                discriminator = stack.to_json_path()
            segment = Segment(
                discriminator=discriminator,  # todo: sometimes the discriminator is not as sharp as it could have been
                data_elements=[],
                ahb_expression=first_expression,
                section_name=first_line.section_name,
                ahb_line_index=first_line.index,
            )
            # data elements that follow are appended to this segment
            append_next_data_elements_here = segment.data_elements
            append_next_segments_here.append(segment)
        previous_position = position
    return result