-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathgenerator.py
276 lines (236 loc) · 9.92 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import re
from typing import Any, Callable, List, Optional, Pattern, Union
from .dynamic_typing import (
ComplexType,
DDict,
DList,
DOptional,
DUnion,
MetaData,
ModelPtr,
Null,
SingleType,
StringLiteral,
StringSerializable,
StringSerializableRegistry,
Unknown,
registry,
)
# Builtin types that are used as their own metadata: a value whose exact
# ``type(value)`` is in this set is described by that type itself.
_static_types = {int, float, bool}
class MetadataGenerator:
    """
    Builds a metadata dict (field name -> type metadata) from parsed JSON-like data.

    Metadata values are builtin types, nested dicts (nested models) or
    ``dynamic_typing`` wrappers such as ``DList``, ``DDict``, ``DOptional``,
    ``DUnion`` and ``StringLiteral``.
    """
    # Alias for an optional callable that takes a string and returns any value.
    CONVERTER_TYPE = Optional[Callable[[str], Any]]

    def __init__(
            self,
            str_types_registry: Optional[StringSerializableRegistry] = None,
            dict_keys_regex: Optional[List[Union[Pattern, str]]] = None,
            dict_keys_fields: Optional[List[str]] = None
    ):
        """
        :param str_types_registry: StringSerializableRegistry instance.
            Default registry will be used if None is passed.
        :param dict_keys_regex: List of regular expressions (compiled or not).
            If all keys of some dict match one of them then this dict will be marked
            as a dict field instead of a nested model.
        :param dict_keys_fields: List of model field names that will be marked as dict fields.
        """
        self.str_types_registry = str_types_registry if str_types_registry is not None else registry
        # re.compile accepts both pattern strings and already compiled patterns
        self.dict_keys_regex = [re.compile(r) for r in dict_keys_regex] if dict_keys_regex else []
        self.dict_keys_fields = set(dict_keys_fields or ())

    def generate(self, *data_variants: dict) -> dict:
        """
        Convert given data variants to a single metadata dict.

        Every variant is converted on its own, then the results are merged
        and the merged metadata is optimized.

        :param data_variants: Parsed data samples describing the same model.
        :return: Optimized metadata dict.
        """
        fields_sets = [self._convert(data) for data in data_variants]
        fields = self.merge_field_sets(fields_sets)
        return self.optimize_type(fields)

    def _convert(self, data: dict):
        """
        Convert one data dict to a dict of field name -> metadata.

        Values of fields listed in ``dict_keys_fields`` are never converted to nested models.

        :param data: Dict with string keys.
        :return: Dict of field metadata.
        :raises TypeError: If some key of ``data`` is not a string.
        """
        fields = {}
        for key, value in data.items():
            if not isinstance(key, str):
                raise TypeError(f'You are probably using a parser that is not JSON compatible and have data with some {type(key)}s as dict keys. '
                                f'This is not supported.\n'
                                f'Context: {data}\n'
                                f'(If you are parsing yaml, try replacing PyYaml with ruamel.yaml)')
            convert_dict = key not in self.dict_keys_fields
            fields[key] = self._detect_type(value, convert_dict)
        return fields

    @staticmethod
    def _wrap_item_types(container, types):
        """Wrap non-empty list of item types into ``container``, collapsing a union of a single type."""
        if len(types) > 1:
            union = DUnion(*types)
            if len(union.types) == 1:
                return container(*union.types)
            return container(union)
        return container(*types)

    def _detect_type(self, value, convert_dict=True) -> MetaData:
        """
        Convert a json value to metadata.

        * exact ``float``/``bool``/``int`` values -> the type itself
        * list -> ``DList`` of item types (``DList(Unknown)`` if empty)
        * dict -> ``DDict(Unknown)`` if empty, nested model if ``convert_dict`` is enabled
          and the keys do not all match one of ``dict_keys_regex``, otherwise ``DDict`` of value types
        * ``None`` -> ``Null``
        * anything else -> first type of ``str_types_registry`` that accepts the value,
          or ``StringLiteral`` with this value

        :param value: Value to describe.
        :param convert_dict: Whether a dict value may be converted to a nested model.
        :return: Metadata of the value.
        """
        # Simple types (exact match only, so that e.g. bool is not mixed up with int)
        if type(value) in _static_types:
            return type(value)

        # List trying to yield nested type.
        # isinstance is used (same as for dicts below) so list subclasses returned by some
        # parsers are handled here instead of falling through to the string branch.
        if isinstance(value, list):
            if not value:
                return DList(Unknown)
            return self._wrap_item_types(DList, [self._detect_type(item) for item in value])

        # Dict should be processed as another model if convert_dict is enabled
        if isinstance(value, dict):
            if not value:
                return DDict(Unknown)
            if any(all(map(reg.match, value.keys())) for reg in self.dict_keys_regex):
                convert_dict = False
            if convert_dict:
                return self._convert(value)
            return self._wrap_item_types(DDict, [self._detect_type(item) for item in value.values()])

        # null interpreted as is and will be processed later on Union merge stage
        if value is None:
            return Null

        # string types trying to convert to other string-serializable types
        for str_type in self.str_types_registry:
            try:
                str_type.to_internal_value(value)
            except ValueError:
                continue
            return str_type
        return StringLiteral({value})

    @staticmethod
    def _union_members(meta):
        """Return types of ``meta`` if it is a ``DUnion`` or a list with ``meta`` itself otherwise."""
        return meta.types if isinstance(meta, DUnion) else [meta]

    def merge_field_sets(self, field_sets: List[MetaData]) -> MetaData:
        """
        Merge fields sets into one dict of pairs (key, metadata).

        A field that is first seen after the first set or that is missing in some later set
        becomes ``DOptional``. Different metadata for the same name is combined into ``DUnion``.

        :param field_sets: List of dicts (field name -> metadata).
        :return: Merged dict of fields.
        """
        fields: dict = {}

        first = True
        for model in field_sets:
            # Names known so far; ones that are left after the loop are missing in this model
            fields_diff = set(fields)

            for name, field in model.items():
                if name not in fields:
                    # New field
                    field = field if first or isinstance(field, DOptional) else DOptional(field)
                else:
                    field_original = fields[name]
                    fields_diff.remove(name)
                    if isinstance(field_original, DOptional):
                        # Existing optional field
                        if field_original == field or field_original.type == field:
                            continue
                        field_original = field_original.type
                        field = DOptional(DUnion(
                            *self._union_members(field),
                            *self._union_members(field_original)
                        ))
                        if len(field.type) == 1:
                            field.type = field.type.types[0]
                    else:
                        if field_original == field or (isinstance(field, DOptional) and field_original == field.type):
                            continue
                        field = DUnion(
                            *self._union_members(field),
                            *self._union_members(field_original)
                        )
                        if len(field) == 1:
                            field = field.types[0]

                fields[name] = field

            for name in fields_diff:
                # Missing fields becomes optionals
                if not isinstance(fields[name], DOptional):
                    fields[name] = DOptional(fields[name])

            first = False
        return fields

    def optimize_type(self, meta: MetaData, process_model_ptr=False) -> MetaData:
        """
        Finds some redundant types and replace them with a simpler one

        :param meta: Metadata to optimize.
        :param process_model_ptr: Control whether process ModelPtr instances or not.
            Default is False to prevent recursion cycles.
            NOTE: the flag is applied to ``meta`` itself only, nested calls use the default value.
        :return: Optimized metadata.
        """
        if isinstance(meta, dict):
            return {name: self.optimize_type(field) for name, field in meta.items()}

        if isinstance(meta, DUnion):
            return self._optimize_union(meta)

        if isinstance(meta, DOptional):
            inner = self.optimize_type(meta.type)
            # Optional of optional is just optional
            if isinstance(inner, DOptional):
                inner = inner.type
            return meta.replace(inner)

        if isinstance(meta, SingleType) and (process_model_ptr or not isinstance(meta, ModelPtr)):
            # Optimize nested type
            return meta.replace(self.optimize_type(meta.type))

        if isinstance(meta, ComplexType):
            # Optimize all nested types
            return meta.replace([self.optimize_type(nested) for nested in meta])

        if isinstance(meta, StringLiteral) and (meta.overflowed or not meta.literals):
            return str

        return meta

    def _optimize_union(self, t: DUnion):
        """
        Simplify a union: nested models are merged into one, lists and dicts are merged
        into a single ``DList``/``DDict`` of union, string types are resolved using the registry,
        ``int`` is dropped if ``float`` is present and ``Null`` members turn the result
        into ``DOptional``.

        :param t: Union to optimize.
        :return: Optimized metadata (not necessarily a ``DUnion``).
        """
        # Split nested types into categories
        str_types: List[Union[type, StringSerializable]] = []
        types_to_merge: List[dict] = []
        list_types: List[DList] = []
        dict_types: List[DDict] = []
        other_types: List[MetaData] = []
        for item in t.types:
            if isinstance(item, DOptional):
                # Unwrap optional and remember nullability as a separate Null member
                item = item.type
                other_types.append(Null)
            if isinstance(item, dict):
                types_to_merge.append(item)
            elif item in self.str_types_registry or item is str:
                str_types.append(item)
            elif isinstance(item, DList):
                list_types.append(item)
            elif isinstance(item, DDict):
                dict_types.append(item)
            else:
                other_types.append(item)

        if int in other_types and float in other_types:
            other_types.remove(int)

        if types_to_merge:
            other_types.append(self.merge_field_sets(types_to_merge))

        for cls, iterable_types in ((DList, list_types), (DDict, dict_types)):
            if iterable_types:
                other_types.append(cls(DUnion(*(
                    iterable.type for iterable in iterable_types
                ))))

        if str in str_types:
            other_types.append(str)
        elif str_types:
            str_types = self.str_types_registry.resolve(*str_types)
            # Replace str pseudo-types with <class 'str'> when they can not be resolved into single type
            other_types.append(str if len(str_types) > 1 else next(iter(str_types)))

        types = [self.optimize_type(item) for item in other_types]

        if len(types) > 1:
            if Unknown in types:
                types.remove(Unknown)

            optional = Null in types
            while Null in types:
                types.remove(Null)

            meta_type = DUnion(*types)
            if len(meta_type.types) == 1:
                meta_type = meta_type.types[0]

            if optional:
                return DOptional(meta_type)
        else:
            meta_type = types[0]
        return meta_type