This repository has been archived by the owner on Nov 12, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
schema_parser.py
264 lines (191 loc) · 9.8 KB
/
schema_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import json
import re
import jsonref
from .exceptions import RootSchemaException
from ..utils import doctict
class SchemaParser:
    """ A SchemaParser provides functions for accessing objects in a JSON schema.

    The parser walks a JSON schema and records what it finds on the template object supplied at construction
    time. While walking it also maintains a lookup from lower-cased labels (and property paths) to the list of
    property paths they refer to.

    Note that required property names are pooled across every schema object visited, and a property is flagged
    as required by its bare name rather than by its full path.
    """

    # Captures the first path segment after the host of a schema URL.
    _HIGH_LEVEL_ENTITY_PATTERN = re.compile("http[s]?://[^/]*/([^/]*)/")
    # Captures whatever sits between the first path segment and a version (or "latest") segment.
    _DOMAIN_ENTITY_PATTERN = re.compile(
        r'http[s]?://[^/]*/[^/]*/(?P<domain_entity>.*)/(((\d+\.)?(\d+\.)?(\*|\d+))|(latest))/.*')

    def __init__(self, template):
        """
        :param template: The template object that parsed schema information is written to.
        """
        self.ignored_properties = ["describedBy", "schema_version", "schema_type", "provenance"]
        self.required_properties = []
        self.schema_template = template
        # TODO: identifiable should be in the schema - hardcoded here for now.
        self._identifiable = ["biomaterial_id", "process_id", "protocol_id", "file_name"]
        self._key_lookup = {}

    def load_schema(self, json_schema):
        """
        Resolve references in a given JSON-formatted metadata schema and populate a SchemaTemplate object with the
        data in the metadata schema.

        :param json_schema: An object representing a deserialized JSON-formatted metadata schema with references.
        :return: The schema template held by this parser, populated from the resolved schema.
        """
        # Use jsonrefs to resolve all $refs in JSON. The dumps/loads round trip also means the caller's object is
        # never handed to the parser directly.
        metadata_schema_data = jsonref.loads(json.dumps(json_schema))
        return self.initialise_template(metadata_schema_data)

    def load_migration(self, property_migration):
        """
        Record a single property migration on the schema template.

        :param property_migration: A mapping describing the migration; see
                                   initialise_property_migration_template for the keys that are read.
        :return: The schema template held by this parser.
        """
        return self.initialise_property_migration_template(property_migration)

    def key_lookup(self, key):
        """
        Return the list of property paths registered under the given lookup key.

        Labels are stored lower-cased, so a label has to be passed in lower case to be found.

        :param key: A property path or a lower-cased label.
        :return: A list of property paths.
        :raises KeyError: If nothing has been registered under the key.
        """
        return self._key_lookup[key]

    def initialise_property_migration_template(self, property_migration):
        """
        Turn a flat migration description into a nested dictionary keyed by each part of the dotted property path
        and hand it to the schema template.

        :param property_migration: A mapping with "source_schema" and "property", and optionally "target_schema" +
                                   "replaced_by", and either "effective_from" or "effective_from_source" +
                                   "effective_from_target".
        :return: The schema template held by this parser.
        """
        migrated_property = property_migration["source_schema"] + "." + property_migration["property"]

        migration_info = {}
        if "target_schema" in property_migration and "replaced_by" in property_migration:
            migration_info["replaced_by"] = \
                property_migration["target_schema"] + "." + property_migration["replaced_by"]

        if "effective_from" in property_migration:
            migration_info["version"] = property_migration["effective_from"]
        elif "effective_from_source" in property_migration:
            migration_info["version"] = property_migration["effective_from_source"]
            migration_info["target_version"] = property_migration["effective_from_target"]

        # Wrap the migration information from the innermost path part outwards, so that "a.b.c" ends up as
        # {"a": {"b": {"c": migration_info}}}.
        *parent_parts, leaf_part = migrated_property.split(".")
        nested_migration = {leaf_part: migration_info}
        for part in reversed(parent_parts):
            nested_migration = {part: nested_migration}

        self.schema_template.put_migration(nested_migration)
        return self.schema_template

    def initialise_template(self, data):
        """
        Populate the schema template from a root metadata schema.

        :param data: A deserialized metadata schema.
        :return: The schema template held by this parser.
        :raises RootSchemaException: If no schema information could be derived from the root object, or if the
                                     high-level entity found in its URL does not contain "type".
        """
        self.get_required_properties_from_metadata_schema(data)
        root_property = self._extract_property(data)

        root_schema = root_property.schema
        # The high-level entity is None when the schema URL did not have the expected shape; treat that the same
        # way as a non-"type" root rather than failing on the membership test.
        if (not root_schema
                or not root_schema.high_level_entity
                or "type" not in root_schema.high_level_entity):
            raise RootSchemaException(
                "Schema must start with a root submittable type schema")

        root_property.uuid = {'external_reference': True, 'identifiable': True}
        module = root_schema.module

        # todo get tab display name from schema
        tab_display = module[0].upper() + module[1:].replace("_", " ")
        tab_info = {module: {"display_name": tab_display, "columns": []}}

        self.schema_template.append_tab(tab_info)
        self.schema_template.put(module, root_property)
        self._recursive_fill_properties(module, data)

        self.schema_template.set_label_mappings(self._key_lookup)
        return self.schema_template

    def _get_path(self, str1, str2):
        """ Join two path parts with a dot, keeping only what precedes the first "/" in the second part. """
        return ".".join([str1, str2.split('/')[0]])

    def _recursive_fill_properties(self, path, data):
        """
        Walk every property below the given schema object depth first and store the extracted metadata in the
        template under its dotted path.

        :param path: The dotted path of the schema object being walked.
        :param data: The schema object whose properties are walked.
        """
        for property_name, property_block in self._get_schema_properties_from_object(data).items():
            new_path = self._get_path(path, property_name)
            extracted = self._extract_property(property_block, property_name=property_name, key=new_path)
            doctict.put(self.schema_template.get_template(), new_path, extracted)
            self._recursive_fill_properties(new_path, property_block)

    def get_required_properties_from_metadata_schema(self, data):
        """
        Add the names listed under "required" in the given schema object to the parser's pool of required
        property names. Names are kept unique and in the order they were first seen.

        :param data: A schema object, with or without a "required" entry.
        """
        if "required" in data:
            self.required_properties = list(dict.fromkeys([*self.required_properties, *data["required"]]))

    def create_new_template_for_property(self):
        """ Returns a dictionary populated with keys and respective default values that represent metadata about a
        property that exists in a metadata schema.
        """
        return {
            "multivalue": False,
            "format": None,
            "required": False,
            "identifiable": False,
            "external_reference": False,
            "user_friendly": None,
            "description": None,
            "example": None,
            "guidelines": None,
            "value_type": "string"}

    def _extract_property(self, data, *args, **kwargs):
        """
        Build the metadata describing a single schema object.

        :param data: The schema object describing the property.
        :param kwargs: Optional "property_name" (the bare name, used for the required/identifiable flags) and
                       "key" (the dotted path, used for tab columns and the label lookup).
        :return: A DotDict holding the property metadata.
        """
        property_metadata = self.create_new_template_for_property()

        if "type" in data:
            property_metadata["value_type"] = data["type"]
            if data["type"] == "array":
                # An array is reported with the type of its items; anything that is not a single schema object
                # falls back to "string".
                items = data.get("items", {})
                item_type = items.get('type', 'string') if isinstance(items, dict) else 'string'
                property_metadata["value_type"] = item_type
                property_metadata["multivalue"] = True

        schema = self._get_schema_from_object(data)

        if 'property_name' in kwargs:
            property_name = kwargs.get('property_name')
            if property_name in self.required_properties:
                property_metadata["required"] = True
            if property_name in self._identifiable:
                property_metadata["identifiable"] = True

        key = kwargs.get('key')

        # Objects only group other properties, so they do not get a column of their own.
        if 'key' in kwargs and "object" != property_metadata["value_type"]:
            self.schema_template.append_column_to_tab(key)

        if schema:
            property_metadata["schema"] = schema

        if key is not None:
            self._update_label_to_key_map(key, key)

        # For a resolved reference the text may sit on the referring object instead of the referenced schema,
        # hence the fallback to __reference__.
        user_friendly = None
        if "user_friendly" in data:
            user_friendly = data["user_friendly"]
            property_metadata["user_friendly"] = user_friendly
        elif isinstance(data, jsonref.JsonRef) and "user_friendly" in data.__reference__:
            user_friendly = data.__reference__["user_friendly"]
            property_metadata["user_friendly"] = user_friendly
        # Without a key (as for the root schema) there is nothing for the label to point at.
        if user_friendly is not None and key is not None:
            self._update_label_to_key_map(user_friendly, key)

        if "description" in data:
            property_metadata["description"] = data["description"]
        elif isinstance(data, jsonref.JsonRef) and "description" in data.__reference__:
            property_metadata["description"] = data.__reference__["description"]

        for field in ("format", "example", "guidelines"):
            if field in data:
                property_metadata[field] = data[field]

        return doctict.DotDict(property_metadata)

    def _update_label_to_key_map(self, label, key):
        """
        Register a property path under a label (lower-cased) and under the path itself.

        Paths registered under the same label are kept unique and in the order they were added.

        :param label: The label to register the path under.
        :param key: The dotted property path.
        """
        label_keys = self._key_lookup.setdefault(label.lower(), [])
        if key not in label_keys:
            label_keys.append(key)

        if key not in self._key_lookup:
            self._key_lookup[key] = [key]

    def _get_schema_from_object(self, data):
        """ Given a JSON object get the id and work out the high-level metadata about the metadata schema.

        If the object has an "items" entry, the id is looked for on the items instead. "id" takes precedence over
        "$id" when both are present. Returns None when no id is found.
        """
        if "items" in data:
            return self._get_schema_from_object(data["items"])

        if 'id' in data:
            url_key = 'id'
        elif '$id' in data:
            url_key = '$id'
        else:
            return None

        return self.create_and_populate_schema_given_information_in_url(data[url_key])

    def create_and_populate_schema_given_information_in_url(self, url):
        """
        Given a URL, create and populate a Schema with high level information about the schema gleaned directly from
        the URL.

        Parts that cannot be found in the URL are set to None.

        :param url: A string representing the URL of a metadata schema.
        :return: A Schema object that contains a dictionary with metadata about the metadata schema.
        """
        schema = Schema().build()

        # Populate the high level entity
        entity_match = self._HIGH_LEVEL_ENTITY_PATTERN.search(url)
        schema.high_level_entity = entity_match.group(1) if entity_match else None

        # Populate the domain entity
        domain_match = self._DOMAIN_ENTITY_PATTERN.search(url)
        schema.domain_entity = domain_match.group("domain_entity") if domain_match else None

        url_parts = url.rsplit('/', 2)

        # Populate the module (last path segment)
        schema.module = url_parts[-1]

        # Populate the version (second to last path segment, when there is one)
        schema.version = url_parts[-2] if len(url_parts) > 1 else None

        # Populate the url
        schema.url = url

        return schema

    def _get_schema_properties_from_object(self, object):
        """
        Return the properties of a schema object, minus the ignored ones, after adding the object's required
        names to the parser's pool.

        If the object has a dictionary under "items", the properties of the items are returned instead. The given
        object is left unmodified; the result is a new dictionary.

        :param object: A schema object.
        :return: A dictionary of property name to property schema object; empty if there are no properties.
        """
        self.get_required_properties_from_metadata_schema(object)

        if "items" in object and isinstance(object["items"], dict):
            return self._get_schema_properties_from_object(object["items"])

        if "properties" in object and isinstance(object["properties"], dict):
            return {name: block for name, block in object["properties"].items()
                    if name not in self.ignored_properties}

        return {}
class Schema:
    """ Builder for the record of high-level information that is kept about a metadata schema. """

    def __init__(self):
        # Holds the most recently built record; empty until build() is called.
        self.dict = dict()

    def build(self):
        """
        Reset the record so that every field is None and return it wrapped in a DotDict.

        :return: A DotDict with the keys high_level_entity, domain_entity, module, version and url.
        """
        field_names = ("high_level_entity", "domain_entity", "module", "version", "url")
        self.dict = dict.fromkeys(field_names)
        return doctict.DotDict(self.dict)