-
Notifications
You must be signed in to change notification settings - Fork 81
/
Copy pathtypes.py
438 lines (355 loc) · 16.4 KB
/
types.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
from collections import OrderedDict
from collections.abc import Iterable
from contextlib import suppress
from .exceptions import FinTSNoResponseError
from .utils import SubclassesMixin
class Field:
def __init__(self, length=None, min_length=None, max_length=None, count=None, min_count=None, max_count=None, required=True, _d=None):
if length is not None and (min_length is not None or max_length is not None):
raise ValueError("May not specify both 'length' AND 'min_length'/'max_length'")
if count is not None and (min_count is not None or max_count is not None):
raise ValueError("May not specify both 'count' AND 'min_count'/'max_count'")
self.length = length
self.min_length = min_length
self.max_length = max_length
self.count = count
self.min_count = min_count
self.max_count = max_count
self.required = required
if not self.count and not self.min_count and not self.max_count:
self.count = 1
self.__doc__ = _d
def _default_value(self):
return None
def __get__(self, instance, owner):
if self not in instance._values:
self.__set__(instance, None)
return instance._values[self]
def __set__(self, instance, value):
if value is None:
if self.count == 1:
instance._values[self] = self._default_value()
else:
instance._values[self] = ValueList(parent=self)
else:
if self.count == 1:
value_ = self._parse_value(value)
self._check_value(value_)
else:
value_ = ValueList(parent=self)
for i, v in enumerate(value):
value_[i] = v
instance._values[self] = value_
def __delete__(self, instance):
self.__set__(instance, None)
def _parse_value(self, value):
raise NotImplementedError('Needs to be implemented in subclass')
def _render_value(self, value):
raise NotImplementedError('Needs to be implemented in subclass')
def _check_value(self, value):
with suppress(NotImplementedError):
self._render_value(value)
def _check_value_length(self, value):
if self.max_length is not None and len(value) > self.max_length:
raise ValueError("Value {!r} cannot be rendered: max_length={} exceeded".format(value, self.max_length))
if self.min_length is not None and len(value) < self.min_length:
raise ValueError("Value {!r} cannot be rendered: min_length={} not reached".format(value, self.min_length))
if self.length is not None and len(value) != self.length:
raise ValueError("Value {!r} cannot be rendered: length={} not satisfied".format(value, self.length))
def render(self, value):
if value is None:
return None
return self._render_value(value)
def _inline_doc_comment(self, value):
if self.__doc__:
d = self.__doc__.splitlines()[0].strip()
if d:
return " # {}".format(d)
return ""
class TypedField(Field, SubclassesMixin):
def __new__(cls, *args, **kwargs):
target_cls = None
fallback_cls = None
for subcls in cls._all_subclasses():
if getattr(subcls, 'type', '') is None:
fallback_cls = subcls
if getattr(subcls, 'type', None) == kwargs.get('type', None):
target_cls = subcls
break
if target_cls is None and fallback_cls is not None and issubclass(fallback_cls, cls):
target_cls = fallback_cls
retval = object.__new__(target_cls or cls)
return retval
def __init__(self, type=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.type = type or getattr(self.__class__, 'type', None)
class ValueList:
def __init__(self, parent):
self._parent = parent
self._data = []
def __getitem__(self, i):
if i >= len(self._data):
self.__setitem__(i, None)
if i < 0:
raise IndexError("Cannot access negative index")
return self._data[i]
def __setitem__(self, i, value):
if i < 0:
raise IndexError("Cannot access negative index")
if self._parent.count is not None:
if i >= self._parent.count:
raise IndexError("Cannot access index {} beyond count {}".format(i, self._parent.count))
elif self._parent.max_count is not None:
if i >= self._parent.max_count:
raise IndexError("Cannot access index {} beyound max_count {}".format(i, self._parent.max_count))
for x in range(len(self._data), i):
self.__setitem__(x, None)
if value is None:
value = self._parent._default_value()
else:
value = self._parent._parse_value(value)
self._parent._check_value(value)
if i == len(self._data):
self._data.append(value)
else:
self._data[i] = value
def __delitem__(self, i):
self.__setitem__(i, None)
def _get_minimal_true_length(self):
retval = 0
for i, val in enumerate(self._data):
if isinstance(val, Container):
if val.is_unset():
continue
elif val is None:
continue
retval = i + 1
return retval
def __len__(self):
if self._parent.count is not None:
return self._parent.count
else:
retval = self._get_minimal_true_length()
if self._parent.min_count is not None:
if self._parent.min_count > retval:
retval = self._parent.min_count
return retval
def __iter__(self):
for i in range(len(self)):
yield self[i]
def __repr__(self):
return "{!r}".format(list(self))
def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer="", print_doc=True, first_line_suffix=""):
import sys
stream = stream or sys.stdout
stream.write(
((prefix + level * indent) if first_level_indent else "")
+ "[{}\n".format(first_line_suffix)
)
min_true_length = self._get_minimal_true_length()
skipped_items = 0
for i, val in enumerate(self):
if i > min_true_length:
skipped_items += 1
continue
if print_doc:
docstring = self._parent._inline_doc_comment(val)
else:
docstring = ""
if not hasattr(getattr(val, 'print_nested', None), '__call__'):
stream.write(
(prefix + (level + 1) * indent) + "{!r},{}\n".format(val, docstring)
)
else:
val.print_nested(stream=stream, level=level + 2, indent=indent, prefix=prefix, trailer=",", print_doc=print_doc, first_line_suffix=docstring)
if skipped_items:
stream.write((prefix + (level + 1) * indent) + "# {} empty items skipped\n".format(skipped_items))
stream.write((prefix + level * indent) + "]{}\n".format(trailer))
class SegmentSequence:
"""A sequence of FinTS3Segment objects"""
def __init__(self, segments=None):
if isinstance(segments, bytes):
from .parser import FinTS3Parser
parser = FinTS3Parser()
data = parser.explode_segments(segments)
segments = [parser.parse_segment(segment) for segment in data]
self.segments = list(segments) if segments else []
def render_bytes(self) -> bytes:
from .parser import FinTS3Serializer
return FinTS3Serializer().serialize_message(self)
def __repr__(self):
return "{}.{}({!r})".format(self.__class__.__module__, self.__class__.__name__, self.segments)
def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer="", print_doc=True, first_line_suffix=""):
import sys
stream = stream or sys.stdout
stream.write(
((prefix + level * indent) if first_level_indent else "")
+ "{}.{}([".format(self.__class__.__module__, self.__class__.__name__)
+ first_line_suffix
+ "\n"
)
for segment in self.segments:
docstring = print_doc and segment.__doc__
if docstring:
docstring = docstring.splitlines()[0].strip()
if docstring:
docstring = " # {}".format(docstring)
else:
docstring = ""
segment.print_nested(stream=stream, level=level + 1, indent=indent, prefix=prefix, first_level_indent=True, trailer=",", print_doc=print_doc,
first_line_suffix=docstring)
stream.write((prefix + level * indent) + "]){}\n".format(trailer))
def find_segments(self, query=None, version=None, callback=None, recurse=True, throw=False):
"""Yields an iterable of all matching segments.
:param query: Either a str or class specifying a segment type (such as 'HNHBK', or :class:`~fints.segments.message.HNHBK3`), or a list or tuple of strings or classes.
If a list/tuple is specified, segments returning any matching type will be returned.
:param version: Either an int specifying a segment version, or a list or tuple of ints.
If a list/tuple is specified, segments returning any matching version will be returned.
:param callback: A callable that will be given the segment as its sole argument and must return a boolean indicating whether to return this segment.
:param recurse: If True (the default), recurse into SegmentSequenceField values, otherwise only look at segments in this SegmentSequence.
:param throw: If True, a FinTSNoResponseError is thrown if no result is found. Defaults to False.
The match results of all given parameters will be AND-combined.
"""
found_something = False
if query is None:
query = []
elif isinstance(query, str) or not isinstance(query, (list, tuple, Iterable)):
query = [query]
if version is None:
version = []
elif not isinstance(version, (list, tuple, Iterable)):
version = [version]
if callback is None:
callback = lambda s: True
for s in self.segments:
if ((not query) or any((isinstance(s, t) if isinstance(t, type) else s.header.type == t) for t in query)) and \
((not version) or any(s.header.version == v for v in version)) and \
callback(s):
yield s
found_something = True
if recurse:
for name, field in s._fields.items():
val = getattr(s, name)
if val and hasattr(val, 'find_segments'):
for v in val.find_segments(query=query, version=version, callback=callback, recurse=recurse):
yield v
found_something = True
if throw and not found_something:
raise FinTSNoResponseError(
'The bank\'s response did not contain a response to your request, please inspect debug log.'
)
def find_segment_first(self, *args, **kwargs):
"""Finds the first matching segment.
Same parameters as find_segments(), but only returns the first match, or None if no match is found."""
for m in self.find_segments(*args, **kwargs):
return m
return None
def find_segment_highest_version(self, query=None, version=None, callback=None, recurse=True, default=None):
"""Finds the highest matching segment.
Same parameters as find_segments(), but returns the match with the highest version, or default if no match is found."""
# FIXME Test
retval = None
for s in self.find_segments(query=query, version=version, callback=callback, recurse=recurse):
if not retval or s.header.version > retval.header.version:
retval = s
if retval is None:
return default
return retval
class ContainerMeta(type):
@classmethod
def __prepare__(metacls, name, bases):
return OrderedDict()
def __new__(cls, name, bases, classdict):
retval = super().__new__(cls, name, bases, classdict)
retval._fields = OrderedDict()
for supercls in reversed(bases):
if hasattr(supercls, '_fields'):
retval._fields.update((k, v) for (k, v) in supercls._fields.items())
retval._fields.update((k, v) for (k, v) in classdict.items() if isinstance(v, Field))
return retval
class Container(metaclass=ContainerMeta):
def __init__(self, *args, **kwargs):
init_values = OrderedDict()
additional_data = kwargs.pop("_additional_data", [])
for init_value, field_name in zip(args, self._fields):
init_values[field_name] = init_value
args = ()
for field_name in self._fields:
if field_name in kwargs:
if field_name in init_values:
raise TypeError("__init__() got multiple values for argument {}".format(field_name))
init_values[field_name] = kwargs.pop(field_name)
super().__init__(*args, **kwargs)
self._values = {}
self._additional_data = additional_data
for k, v in init_values.items():
setattr(self, k, v)
@classmethod
def naive_parse(cls, data):
if data is None:
raise TypeError("No data provided")
retval = cls()
for ((name, field), value) in zip(retval._fields.items(), data):
setattr(retval, name, value)
return retval
def is_unset(self):
for name in self._fields.keys():
val = getattr(self, name)
if isinstance(val, Container):
if not val.is_unset():
return False
elif val is not None:
return False
return True
@property
def _repr_items(self):
for name, field in self._fields.items():
val = getattr(self, name)
if not field.required:
if isinstance(val, Container):
if val.is_unset():
continue
elif isinstance(val, ValueList):
if len(val) == 0:
continue
elif val is None:
continue
yield (name, val)
if self._additional_data:
yield ("_additional_data", self._additional_data)
def __repr__(self):
return "{}.{}({})".format(
self.__class__.__module__,
self.__class__.__name__,
", ".join(
"{}={!r}".format(name, val) for (name, val) in self._repr_items
)
)
def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer="", print_doc=True, first_line_suffix=""):
"""Structured nested print of the object to the given stream.
The print-out is eval()able to reconstruct the object."""
import sys
stream = stream or sys.stdout
stream.write(
((prefix + level * indent) if first_level_indent else "")
+ "{}.{}(".format(self.__class__.__module__, self.__class__.__name__)
+ first_line_suffix
+ "\n"
)
for name, value in self._repr_items:
val = getattr(self, name)
if print_doc and not name.startswith("_"):
docstring = self._fields[name]._inline_doc_comment(val)
else:
docstring = ""
if not hasattr(getattr(val, 'print_nested', None), '__call__'):
stream.write(
(prefix + (level + 1) * indent) + "{} = {!r},{}\n".format(name, val, docstring)
)
else:
stream.write(
(prefix + (level + 1) * indent) + "{} = ".format(name)
)
val.print_nested(stream=stream, level=level + 2, indent=indent, prefix=prefix, first_level_indent=False, trailer=",", print_doc=print_doc,
first_line_suffix=docstring)
stream.write((prefix + level * indent) + "){}\n".format(trailer))