-
Notifications
You must be signed in to change notification settings - Fork 7
/
utils.py
296 lines (269 loc) · 10.8 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import logging
from language_tags import tags
from skosprovider.skos import Collection
from skosprovider.skos import Concept
from sqlalchemy import select
from sqlalchemy.exc import NoResultFound
from skosprovider_sqlalchemy.models import Collection as CollectionModel
from skosprovider_sqlalchemy.models import Concept as ConceptModel
from skosprovider_sqlalchemy.models import Label as LabelModel
from skosprovider_sqlalchemy.models import Language as LanguageModel
from skosprovider_sqlalchemy.models import Match as MatchModel
from skosprovider_sqlalchemy.models import Note as NoteModel
from skosprovider_sqlalchemy.models import Source as SourceModel
from skosprovider_sqlalchemy.models import Thing as ThingModel
# Module-level logger named after this module; it carries the warnings about
# lost relations during import and the debug output of the visitation.
log = logging.getLogger(__name__)
def import_provider(provider, conceptscheme, session):
    '''
    Import a provider into a SQLAlchemy database.

    The import runs in two passes. The first pass creates a row for every
    concept and collection together with its labels, notes, sources and
    matches. The second pass links those rows to each other (narrower,
    subordinate arrays, related, members). A relation whose target cannot be
    found in the scheme is logged as a warning and dropped.

    :param provider: The :class:`skosprovider.providers.VocabularyProvider`
        to import. Since the SQLAlchemy backend uses integers as
        keys, this backend should have id values that can be cast to int.
    :param conceptscheme: A
        :class:`skosprovider_sqlalchemy.models.Conceptscheme` to import
        the provider into. This should be an empty scheme so that there are
        no possible id clashes.
    :param session: A :class:`sqlalchemy.orm.session.Session`.
    :raises ValueError: If a language tag used by the provider is rejected
        by :func:`_check_language`.
    '''
    # Copy information about the scheme
    cs = provider.concept_scheme
    _add_labels(conceptscheme, cs.labels, session)
    _add_notes(conceptscheme, cs.notes, session)
    _add_sources(conceptscheme, cs.sources, session)
    for language_tag in cs.languages:
        conceptscheme.languages.append(_check_language(language_tag, session))

    # First pass: load all concepts and collections
    for stuff in provider.get_all():
        c = provider.get_by_id(stuff['id'])
        if isinstance(c, Concept):
            model = ConceptModel
        elif isinstance(c, Collection):
            model = CollectionModel
        else:
            # Without this guard the row built for the previous item would be
            # reused (or no row would exist at all for the first item).
            log.warning(
                'Skipping %s: provider returned neither a concept nor a '
                'collection.', stuff['id']
            )
            continue
        cm = model(
            concept_id=c.id,
            uri=c.uri,
            conceptscheme=conceptscheme
        )
        session.add(cm)
        _add_labels(cm, c.labels, session)
        _add_notes(cm, c.notes, session)
        _add_sources(cm, c.sources, session)
        if hasattr(c, 'matches'):
            for mt, uris in c.matches.items():
                matchtype = mt + 'Match'
                for uri in uris:
                    cm.matches.append(
                        MatchModel(matchtype_id=matchtype, uri=uri)
                    )

    # Push the new rows to the database so the second pass can query them.
    session.flush()

    # Second pass: link
    for stuff in provider.get_all():
        c = provider.get_by_id(stuff['id'])
        if isinstance(c, Concept):
            cm = _find_in_scheme(session, ConceptModel, conceptscheme, c.id)
            _link_targets(
                session, conceptscheme, cm, 'narrower_concepts',
                ConceptModel, c.id, 'narrower', c.narrower
            )
            _link_targets(
                session, conceptscheme, cm, 'narrower_collections',
                CollectionModel, c.id, 'subordinate array',
                c.subordinate_arrays
            )
            _link_targets(
                session, conceptscheme, cm, 'related_concepts',
                ConceptModel, c.id, 'related', c.related
            )
        elif isinstance(c, Collection) and len(c.members) > 0:
            cm = _find_in_scheme(session, CollectionModel, conceptscheme, c.id)
            # Members may be concepts or collections, hence the lookup on the
            # common base model.
            _link_targets(
                session, conceptscheme, cm, 'members',
                ThingModel, c.id, 'member', c.members
            )


def _find_in_scheme(session, model, conceptscheme, concept_id):
    '''
    Fetch the single `model` row with a given concept id within a scheme.

    :param session: A :class:`sqlalchemy.orm.session.Session`.
    :param model: The mapped class to select. Its own `conceptscheme_id` and
        `concept_id` columns are used in the filter.
    :param conceptscheme: The scheme whose `id` restricts the lookup.
    :param concept_id: The id to look for; it is cast to `str` for the
        comparison.
    :raises sqlalchemy.exc.NoResultFound: If no row matches.
    '''
    return session.execute(
        select(model)
        .filter(
            model.conceptscheme_id == conceptscheme.id,
            model.concept_id == str(concept_id)
        )
    ).scalar_one()


def _link_targets(session, conceptscheme, source, attribute, model,
                  source_id, relation, target_ids):
    '''
    Add every resolvable target to one relation collection of `source`.

    :param session: A :class:`sqlalchemy.orm.session.Session`.
    :param conceptscheme: The scheme the targets are looked up in.
    :param source: The model instance that owns the relation.
    :param attribute: Name of the collection attribute on `source` that the
        targets are added to. It is only accessed when a target was found.
    :param model: The mapped class the targets are selected as.
    :param source_id: Id of the source as known by the provider; only used
        in the warning.
    :param relation: Human readable relation name; only used in the warning.
    :param target_ids: Ids of the targets to link.
    '''
    for target_id in target_ids:
        try:
            target = _find_in_scheme(session, model, conceptscheme, target_id)
        except NoResultFound:
            log.warning(
                'Tried to add a relation %s %s %s, but target '
                'does not exist. Relation will be lost.',
                source_id, relation, target_id
            )
            continue
        getattr(source, attribute).add(target)
def _check_language(language_tag, session):
    '''
    Look up a language by its tag and register it when it is not known yet.

    An empty tag is replaced by ``'und'``. A tag that is not yet present is
    validated with `tags.check` before a new language, named after the
    joined descriptions of the tag, is added to the session.

    :param string language_tag: IANA language tag
    :param session: Database session to use
    :raises ValueError: If the tag is not present yet and is not valid.
    :rtype: :class:`skosprovider_sqlalchemy.models.Language`
    '''
    tag = language_tag
    if not tag:  # pragma: no cover
        tag = 'und'

    known = session.get(LanguageModel, tag)
    if known:
        return known

    if not tags.check(tag):
        raise ValueError('Unable to import provider. Invalid language tag: %s' % tag)
    name = ', '.join(tags.description(tag))
    created = LanguageModel(id=tag, name=name)
    session.add(created)
    return created
def _add_labels(target, labels, session):
'''
Adds the labels to the target
:param target: Target to add the labels to
:param labels: A list of :class:`skosprovider.skos.Label` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for l in labels:
_check_language(l.language, session)
target.labels.append(LabelModel(
label=l.label,
labeltype_id=l.type,
language_id=l.language
))
return target
def _add_notes(target, notes, session):
'''
Adds the notes to the target
:param target: Target to add the notes to
:param notes: A list of :class:`skosprovider.skos.Note` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for n in notes:
_check_language(n.language, session)
target.notes.append(NoteModel(
note=n.note,
notetype_id=n.type,
language_id=n.language,
markup=n.markup
))
return target
def _add_sources(target, sources, session):
'''
Adds the sources to the target
:param target: Target to add the sources to
:param sources: A list of :class:`skosprovider.skos.Source` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for s in sources:
target.sources.append(SourceModel(
citation=s.citation,
markup=s.markup
))
return target
class VisitationCalculator:
    '''
    Generates a nested set for a conceptscheme.

    Every visited concept yields a dictionary with the keys `id`, `lft`,
    `rght` and `depth`.
    '''

    def __init__(self, session):
        '''
        :param :class:`sqlalchemy.orm.session.Session` session: A database
            session.
        '''
        self.session = session
        # Running counter that hands out the lft and rght values.
        self.count = 0
        # Nesting level of the concept that is currently being visited.
        self.depth = 0
        # Collected nested set entries.
        self.visitation = []

    def visit(self, conceptscheme):
        '''
        Visit a :class:`skosprovider_sqlalchemy.models.Conceptscheme` and
        calculate a nested set representation.

        The calculator's state is reset on every call.

        :param conceptscheme: A
            :class:`skosprovider_sqlalchemy.models.Conceptscheme` for which
            the nested set will be calculated.
        :returns: A list of dictionaries, sorted on their `lft` value.
        '''
        self.count = 0
        self.depth = 0
        self.visitation = []

        def _has_higher_concept(thing):
            # A thing sits below a concept when one of its collections infers
            # concept relations and has broader concepts, or when such a
            # collection is found further up the membership chain.
            return any(
                (coll.infer_concept_relations and coll.broader_concepts)
                or _has_higher_concept(coll)
                for coll in thing.member_of
            )

        # Candidates: concepts of the scheme without a direct broader concept.
        candidates = self.session.execute(
            select(ConceptModel)
            .filter(
                ConceptModel.conceptscheme == conceptscheme,
                ~ConceptModel.broader_concepts.any()
            )
        ).scalars().all()

        # Drop the candidates that have an indirect broader concept.
        roots = []
        for candidate in candidates:
            if not _has_higher_concept(candidate):
                roots.append(candidate)

        for root in roots:
            self._visit_concept(root)

        self.visitation.sort(key=lambda entry: entry['lft'])
        return self.visitation

    def _visit_concept(self, concept):
        '''
        Visit a concept or a collection and everything below it.

        A concept is recorded in `self.visitation`. A collection is not
        recorded itself; only its members are visited. Any other type is
        ignored.

        :param concept: The concept or collection to visit.
        '''
        kind = concept.type

        if kind == 'collection':
            log.debug('Visiting collection %s.' % concept.id)
            for member in concept.members:
                self._visit_concept(member)
            return

        if kind != 'concept':
            return

        log.debug('Visiting concept %s.' % concept.id)
        self.depth += 1
        self.count += 1
        entry = {
            'id': concept.id,
            'lft': self.count,
            'depth': self.depth
        }
        for narrower in concept.narrower_concepts:
            self._visit_concept(narrower)
        for collection in concept.narrower_collections:
            # Only collections that infer concept relations pass the
            # hierarchy on to their members.
            if collection.infer_concept_relations:
                self._visit_concept(collection)
        self.count += 1
        entry['rght'] = self.count
        self.visitation.append(entry)
        self.depth -= 1