-
Notifications
You must be signed in to change notification settings - Fork 7
/
ssd.py
1354 lines (1137 loc) · 46.8 KB
/
ssd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Copyright (C) 2017 Data61 CSIRO
Licensed under http://www.apache.org/licenses/LICENSE-2.0 <see LICENSE file>
Code for the SSD dataset description object
"""
import json
import logging
import networkx as nx
import random
from collections import OrderedDict
from collections import defaultdict
from .base import BaseSemantic, DEFAULT_NS, UNKNOWN_CN, UNKNOWN_DN, ALL_CN, OBJ_PROP
from ..elements import DataProperty, ObjectProperty, SSDSearchable, SSDLink, ClassNode, DataNode
from ..elements import DataLink, ObjectLink, ClassInstanceLink, ColumnLink, SubClassLink
from ..elements import Mapping, Column, Class
from ..dataset import DataSet
from ..semantics.ontology import Ontology
from serene.utils import gen_id, convert_datetime, flatten, Searchable
from serene.visualizers import SSDVisualizer
# Use a module-scoped logger. Calling logging.getLogger() with no name returns
# the ROOT logger, and setting its level from library code would silently
# override the log level chosen by the host application.
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.WARNING)
class SSD(object):
    """
    Semantic source description (SSD): the translator between a DataSet
    and a set of Ontologies.

    An SSD contains the data columns, a mapping from each column to a
    DataNode, and a semantic model built from the available ontologies.
    """
def __init__(self,
             dataset=None,
             ontology=None,
             name=None):
    """
    Builds up an SSD object given a dataset and a parent ontology
    (or list of ontologies).

    :param dataset: Optional DataSet; must already be stored on the server
    :param ontology: Optional Ontology or list of Ontology objects; each
                     must already be stored on the server
    :param name: Optional name; a generated id is used when omitted
    :raises Exception: if an argument has the wrong type or is not stored
    """
    if dataset is not None:
        if not isinstance(dataset, DataSet):
            msg = "Required DataSet, not {}".format(type(dataset))
            raise Exception(msg)
        # dataset and ontology must be stored on the server!!!!
        if not dataset.stored:
            # the template was previously emitted with a literal "{}"
            msg = "{} must be stored on the server, use <Serene>.datasets.upload".format(dataset)
            raise Exception(msg)

    if ontology is not None:
        if isinstance(ontology, Ontology):
            # we need a list of ontologies
            ontology = [ontology]
        if isinstance(ontology, list):
            for onto in ontology:
                if not isinstance(onto, Ontology):
                    msg = "Required Ontology, not {}".format(type(onto))
                    raise Exception(msg)
                if not onto.stored:
                    msg = "{} must be stored on the server, use <Serene>.ontologies.upload".format(onto._uri)
                    raise Exception(msg)
        else:
            msg = "Required list of ontologies, not {}".format(type(ontology))
            raise Exception(msg)

    self._name = name if name is not None else gen_id()
    self._dataset = dataset
    self._ontology = ontology if ontology is not None else []
    self._VERSION = "0.1"
    self._id = None
    self._stored = False  # is stored on the server...
    self._date_created = None
    self._date_modified = None

    # semantic model
    self._semantic_model = SSDGraph()

    # The semantic model should always contain the complete Column set,
    # so that the user can easily see what is and isn't yet mapped.
    # initialize the columns...
    if self._dataset is not None:
        for col in self._dataset.columns:
            self._semantic_model.add_node(col, index=col.id)
def update(self, blob, dataset_endpoint, ontology_endpoint):
    """
    Re-populate this SSD in place from a json blob.

    :param blob: json dictionary describing the ssd
    :param dataset_endpoint: handed to SSDReader together with the blob
    :param ontology_endpoint: handed to SSDReader together with the blob
    :return: self
    """
    _logger.debug("Updating ssd")

    has_id = 'id' in blob
    if has_id:
        # a blob carrying an id is treated as stored on the server
        self._stored = True
        self._date_created = convert_datetime(blob['dateCreated'])
        self._date_modified = convert_datetime(blob['dateModified'])
        self._id = int(blob['id'])

    has_name = 'name' in blob
    if has_name:
        self._name = blob['name']

    parsed = SSDReader(blob, dataset_endpoint, ontology_endpoint)
    self._ontology = parsed.ontology
    self._dataset = parsed.dataset
    self._semantic_model = parsed.semantic_model

    _logger.debug("Ssd update success!")
    return self
def map(self, column, data_node):
    """
    Maps a Column object to a DataNode.

    The class node of the data node is added to the semantic model when it
    is not already there, then the class -> data node and
    data node -> column links are created.

    :param column: The Column object in the DataSet
    :param data_node: The DataNode object to map
    :return: self
    """
    column, data_node = self._clean_map_args(column, data_node)

    # raises if the arguments are not valid
    self._assert_map_args(column, data_node)

    # local changes mean the SSD no longer matches the server copy
    self._stored = False

    model = self._semantic_model

    # add the class node only if an exact match is not in the model yet
    if not model.exists(data_node.class_node, exact=True):
        model.add_node(data_node.class_node)

    model.add_node(data_node)

    # class -> data node link: use the data node namespace if available,
    # otherwise the default namespace
    namespace = data_node.prefix if data_node.prefix else self.default_namespace
    data_link = DataLink(data_node.label, prefix=namespace)
    model.add_edge(data_node.class_node, data_node, data_link)

    # data node -> column link
    column_link = ColumnLink(column.name, prefix=self.default_namespace)
    model.add_edge(data_node, column, column_link)

    return self
def link(self, src, label, dst):
    """
    Adds a link between two class nodes.

    :param src: The source ClassNode object
    :param label: The label for the link
    :param dst: The destination ClassNode object
    :return: self
    """
    src, dst = self._clean_link_args(src, dst)

    # raises if the arguments are not valid
    self._assert_link_args(src, label, dst)

    # local changes mean the SSD no longer matches the server copy
    self._stored = False

    model = self._semantic_model

    # add whichever end points are not yet in the model (source first)
    for endpoint in (src, dst):
        if not model.exists(endpoint, exact=True):
            model.add_node(endpoint)

    object_link = ObjectLink(label, prefix=self.default_namespace)
    model.add_edge(src, dst, object_link)
    return self
def remove_link(self, item, src=None, dst=None):
    """
    Removes a link from the semantic model.

    :param item: a link object e.g. ObjectLink("worksFor"), or a plain
                 string which is wrapped into an ObjectLink
    :param src: Optional source node of link
    :param dst: Optional destination node of link
    :return: self
    :raises ValueError: if item is neither a string nor a link type
    """
    if isinstance(item, str):
        item = ObjectLink(item, self.default_namespace)

    if not isinstance(item, SSDLink):
        raise ValueError("Remove requires a link type")

    # links without a namespace get the default one
    if item.prefix is None:
        item.prefix = self.default_namespace

    # local changes mean the SSD no longer matches the server copy
    self._stored = False
    self._semantic_model.remove_edge(item, src, dst)
    return self
def remove(self, item, src=None, dst=None):
    """
    Removes nodes and links from the semantic model.

    Either kind of removal marks the SSD as no longer stored, because the
    local model then differs from the server copy.

    :param item: a node or link type e.g. ClassNode("Person") or ObjectLink("worksFor")
    :param src: Optional source node of link (used for links only)
    :param dst: Optional destination node of link (used for links only)
    :return: self
    :raises ValueError: if item is neither a node nor a link type
    """
    if isinstance(item, Searchable):
        # removing a node changes the model just like removing a link does;
        # previously the stored flag was left untouched on this branch
        self._stored = False
        self._semantic_model.remove_node(item)
    elif isinstance(item, SSDLink):
        if item.prefix is None:
            item.prefix = self.default_namespace
        # this now breaks from the server...
        self._stored = False
        self._semantic_model.remove_edge(item, src, dst)
    else:
        msg = "Remove requires a node or link type"
        raise ValueError(msg)
    return self
def _clean_link_args(self, src, dst):
    """
    Normalizes the arguments of `link`: strings are wrapped into
    ClassNode objects, and nodes without a namespace are rebuilt with
    the default namespace.

    :param src: ClassNode() or str
    :param dst: ClassNode() or str
    :return: ClassNode, ClassNode
    """
    # wrap plain strings first
    if isinstance(src, str):
        src = ClassNode(src, prefix=self.default_namespace)
    if isinstance(dst, str):
        dst = ClassNode(dst, prefix=self.default_namespace)

    # then fill in missing namespaces
    if isinstance(src, SSDSearchable) and src.prefix is None:
        src = ClassNode(src.label, src.index, prefix=self.default_namespace)
    if isinstance(dst, SSDSearchable) and dst.prefix is None:
        dst = ClassNode(dst.label, dst.index, prefix=self.default_namespace)

    return src, dst
def _clean_map_args(self, column, node):
    """
    Normalizes the arguments of `map`: strings are wrapped into the
    relevant types and missing namespaces are replaced by the default one.

    A node string containing '.' is split on '.', the first part being the
    class label and the last part the data node label.

    NOTE(review): a node string without '.' becomes a ClassNode, yet the
    namespace clean-up below reads `.class_node` on any SSDSearchable -
    confirm that ClassNode supports this.

    :param column: Column() or str
    :param node: DataNode(ClassNode(str), str), DataNode(str, str), ClassNode or str
    :return: column, node as Column(), DataNode/ClassNode
    """
    if isinstance(column, str):
        column = Column(column)

    if isinstance(node, str):
        if '.' not in node:
            # this is a mapping to a class node
            node = ClassNode(node, prefix=self.default_namespace)
        else:
            # this is a mapping to a data node
            parts = node.split('.')
            owner = ClassNode(parts[0], prefix=self.default_namespace)
            node = DataNode(owner, parts[-1], prefix=self.default_namespace)

    if isinstance(node, SSDSearchable):
        # data node without namespace
        if node.prefix is None:
            node = DataNode(node.class_node,
                            node.label,
                            node.index,
                            self.default_namespace)
        # class node without namespace
        if node.class_node.prefix is None:
            owner = ClassNode(node.class_node.label,
                              node.class_node.index,
                              self.default_namespace)
            node = DataNode(owner, node.label, node.index, node.prefix)

    return column, node
def _assert_map_args(self, column, data_node):
    """
    Validates a Column -> DataNode mapping request. The checks run in
    order and the first failing one raises.

    :param column: the Column to map
    :param data_node: the DataNode to map to
    :return: None
    :raises ValueError: if the column, class node or data node cannot be
                        found, if the column is already linked, or if the
                        data node is already in the SSD
    """
    # the column has to be part of the dataset
    if not self._column_exists(column):
        raise ValueError(
            "Map failed. Failed to find {} in {}".format(column, self._dataset))

    # the class of the data node has to be in the ontology
    owner = data_node.class_node
    if not self._class_node_exists(owner):
        raise ValueError(
            "Map failed. Failed to find {} in {}".format(owner, self._ontology))

    # ... and so does the data node itself
    if not self._data_node_exists(data_node):
        raise ValueError(
            "Map failed. Failed to find {} in {}".format(data_node, self._ontology))

    # for columns, only one map can exist
    if self._semantic_model.degree(column) > 0:
        raise ValueError(
            "Map failed. {} already has an existing mapping".format(column))

    # the data node must not be in the model yet
    if self._semantic_model.exists(data_node, exact=True):
        raise ValueError(
            "Map failed. {} already exists in the SSD".format(data_node))
def _assert_link_args(self, src, label, dst):
    """
    Validates a Class - Class link request. The checks run in order and
    the first failing one raises.

    :param src: The source class node
    :param label: The label name for the link
    :param dst: The destination class node
    :return: None
    :raises ValueError: if either class node or the link itself cannot be
                        found in the ontology
    """
    # both end points have to be in the ontology (source is checked first)
    for endpoint in (src, dst):
        if not self._class_node_exists(endpoint):
            raise ValueError(
                "Link failed. Failed to find {} in {}".format(endpoint, self._ontology))

    # and so does the link between them
    if not self._link_exists(src, label, dst):
        raise ValueError(
            "Link failed. Failed to find {}-{}-{} in {}".format(src, label, dst, self._ontology))
def _link_exists(self, src, label, dst):
    """
    Checks whether the link exists in any of the ontologies.

    Every ontology is searched until one yields a match. Previously the
    loop stopped after the first ontology whose search did not raise,
    even when that search found nothing, so later ontologies were never
    consulted.

    :param src: source class node (only its label is used for the search)
    :param label: label of the link
    :param dst: destination class node (only its label is used for the search)
    :return: True if a matching link was found, False otherwise
    """
    _logger.debug("Searching for {}-{}-{}".format(src, label, dst))
    link_target = ObjectProperty(label, Class(src.label), Class(dst.label))
    _logger.debug("using target: {}".format(link_target))

    for onto in self._ontology:
        try:
            link = ObjectProperty.search(onto.ilinks, link_target)
        except Exception:
            # best effort: a failing search in one ontology must not stop
            # the search in the remaining ones
            continue
        if link is not None:
            return True
    return False
def _column_exists(self, column):
    """
    Tells whether the column can be found among the dataset columns.

    :param column: the Column to look for
    :return: True if Column.search finds a match, False otherwise
    """
    return Column.search(self._dataset.columns, column) is not None
def _class_node_exists(self, cn: ClassNode):
    """
    Check that the class node exists in one of the defined ontologies.

    :param cn: The ClassNode parameter
    :return: True if a matching class was found, False otherwise
    """
    cls_target = Class(cn.label, prefix=cn.prefix)

    for onto in self._ontology:
        # find any match in all ontologies
        try:
            cls = Class.search(onto.class_nodes, cls_target)
        except Exception:
            # best effort: skip ontologies whose search fails, but no
            # longer swallow KeyboardInterrupt/SystemExit like a bare
            # except would
            continue
        if cls is not None:
            return True
    return False
def _data_node_exists(self, data_node: DataNode):
    """
    Check that the data node exists in one of the defined ontologies.

    :param data_node: A DataNode object to check
    :return: True if a matching data property was found, False otherwise
    """
    cn = data_node.class_node

    # construct a search param for the ontology Class and DataNode...
    cls_target = Class(cn.label, prefix=cn.prefix)
    target = DataProperty(cls_target, data_node.label)

    for onto in self._ontology:
        # find any match in all ontologies
        try:
            dn = DataProperty.search(onto.idata_nodes, target)
        except Exception:
            # best effort: skip ontologies whose search fails, but no
            # longer swallow KeyboardInterrupt/SystemExit like a bare
            # except would
            continue
        if dn is not None:
            return True
    return False
@property
def default_namespace(self):
    """
    The namespace added to ClassNode, DataNode and Links which lack one.

    The first non-empty ontology namespace wins; when all ontology
    namespaces are empty, DEFAULT_NS is returned.

    :return: str
    :raises Exception: if the SSD has no ontologies
    """
    # TODO: check if it's ok to take just the first ontology from the list...
    ontologies = self._ontology
    if not (ontologies and len(ontologies)):
        raise Exception("No ontology available in SSD.")

    for candidate in ontologies:
        if candidate.namespace:
            return candidate.namespace
    return DEFAULT_NS
def fill_unknown(self):
    """
    Map every dataset column which is not among the Column nodes of the
    semantic model to an "unknown" data node, then connect all class
    nodes to the All node so that the ssd is connected.
    Use with caution!

    :return: self
    """
    dataset_columns = self.dataset.columns
    modelled = self.columns
    missing = [col for col in dataset_columns if col not in modelled]

    # TODO: check that Unknown is among ontologies
    for position, col in enumerate(missing):
        self._semantic_model.add_node(col, index=col.id)
        unknown_class = ClassNode(UNKNOWN_CN, prefix=DEFAULT_NS)
        unknown_node = DataNode(unknown_class,
                                label=UNKNOWN_DN,
                                index=position,
                                prefix=DEFAULT_NS)
        self.map(col, unknown_node)

    # the connector node is only needed when something was added
    if len(missing):
        self._add_all_node()
    return self
def _add_all_node(self):
    """
    Add the All node to the ssd and link it to every class node.
    Helper method for fill_unknown.

    :return: self
    """
    # snapshot taken before the All node is added, so that it is not
    # linked to itself
    existing_classes = self.class_nodes

    # TODO: check that All is among ontologies
    all_node = ClassNode(ALL_CN, prefix=DEFAULT_NS)
    model = self._semantic_model
    model.add_node(all_node)

    for class_node in existing_classes:
        model.add_edge(all_node, class_node, ObjectLink(OBJ_PROP, prefix=DEFAULT_NS))
    return self
def _add_thing_node(self):
    """
    Add the owl Thing node to the ssd and make every class node a
    subclass of it.

    :return: self
    """
    # snapshot taken before Thing is added, so that it is not made a
    # subclass of itself
    existing_classes = self.class_nodes

    rdfs_ns = "http://www.w3.org/2000/01/rdf-schema#"
    owl_thing = ClassNode("Thing", prefix="http://www.w3.org/2002/07/owl#")
    model = self._semantic_model
    model.add_node(owl_thing)

    for class_node in existing_classes:
        model.add_edge(class_node, owl_thing, SubClassLink("subClassOf", prefix=rdfs_ns))
    return self
def __repr__(self):
    """Output string made of the id (or "local" when not stored) and the name."""
    # NOTE(review): this class defines __repr__ a second time further
    # down (after `show`); the later definition replaces this one, so
    # this body is never used.
    if self.stored:
        return "SSD({}, {})".format(self.id, self.name)
    else:
        return "SSD(local, {})".format(self.name)
@property
def class_nodes(self):
    """The class nodes of the semantic model."""
    return self._semantic_model.class_nodes
@property
def data_nodes(self):
    """The data nodes of the semantic model."""
    return self._semantic_model.data_nodes
@property
def columns(self):
    """The columns held by the semantic model (not read from the dataset)."""
    return self._semantic_model.columns  # self._dataset.columns
@property
def data_links(self):
    """The data links of the semantic model."""
    return self._semantic_model.data_links
@property
def object_links(self):
    """The object links of the semantic model."""
    return self._semantic_model.object_links
@property
def links(self):
    """The data links followed by the object links."""
    return self.data_links + self.object_links
@property
def dataset(self):
    """The dataset this SSD describes (may be None)."""
    return self._dataset
@property
def ontology(self):
    """The list of ontologies of this SSD."""
    return self._ontology
@property
def mappings(self):
    """The mappings of the semantic model."""
    return self._semantic_model.mappings
@property
def unmapped_columns(self):
    """The set of dataset columns which are not among the values of `mappings`."""
    mapped_cols = set(self.mappings.values())
    all_cols = set(self._dataset.columns)
    return all_cols - mapped_cols
@property
def semantic_model(self):
    """The underlying semantic model object."""
    return self._semantic_model
@property
def json(self):
    """The output of SSDJsonWriter.to_json() for this SSD."""
    return SSDJsonWriter(self).to_json()
@property
def json_dict(self):
    """The output of SSDJsonWriter.to_dict() for this SSD."""
    return SSDJsonWriter(self).to_dict()
@property
def stored(self):
    """Flag telling whether this SSD is considered stored on the server."""
    return self._stored
@property
def id(self):
    """The id of this SSD (None until set by `update`)."""
    return self._id
@property
def name(self):
    """The name of this SSD."""
    return self._name
def show(self, title='', outfile=None):
    """
    Displays the SSD by delegating to the semantic model, using the
    dataset filename as the name.

    :param title: Optional title, passed through
    :param outfile: Optional output file, passed through
    """
    model = self._semantic_model
    source_name = self._dataset.filename
    model.show(name=source_name, title=title, outfile=outfile)
def __repr__(self):
props = "name={}, dataset={}, ontology={}".format(
self._name, self._dataset, self._ontology)
if self._stored:
return "SSD({}, {})".format(self._id, props)
else:
return "SSD(local, {})".format(props)
def evaluate(self, ground_truth, include_all=False, include_cols=True):
    """
    Compare the triples of this ssd with those of a ground truth ssd.

    :param ground_truth: another ssd
    :param include_all: boolean whether to include the connector node All
    :param include_cols: boolean whether to include column links
    :return: dict with the keys "precision", "recall" and "jaccard"
    """
    logging.info("Calculating comparison metrics for ssds")

    predicted = self.get_triples(include_all=include_all, include_cols=include_cols)
    expected = ground_truth.get_triples(include_all=include_all, include_cols=include_cols)

    comparison = {}
    comparison["precision"] = self.get_precision(expected, predicted)
    comparison["recall"] = self.get_recall(expected, predicted)
    comparison["jaccard"] = self.get_jaccard(expected, predicted)
    return comparison
def _node_uri(self, node_id):
    """
    Get the uri of a node in the ssd.

    :param node_id: key of the node in the graph of the semantic model
    :return: the column name for Column nodes, otherwise prefix + label
    """
    payload = self._semantic_model._graph.node[node_id]["data"]
    if type(payload) != Column:
        return payload.prefix + payload.label
    # columns are identified by their name
    return payload.name
@staticmethod
def _link_uri(ld):
    """
    Get the uri of a link.

    :param ld: the link object
    :return: the constant "ColumnLink" for column links, otherwise prefix + label
    """
    if type(ld) != ColumnLink:
        return ld.prefix + ld.label
    return "ColumnLink"
def get_triples(self, include_all=False, include_cols=True):
    """
    Get extended RDF triples from the ssd.

    Each element is (subject uri, predicate uri, object uri, num), where
    num counts how many times the same triple has been seen before in the
    semantic model, so that repeated triples stay distinct in the set.

    :param include_all: boolean whether to include links touching the connector node All
    :param include_cols: boolean whether to include column links
    :return: set of 4-tuples
    """
    logging.info("Extracting rdf triples from ssd...")

    all_uri = DEFAULT_NS + ALL_CN
    occurrences = defaultdict(int)
    collected = []

    for start, end, attrs in self._semantic_model._graph.edges(data=True):
        predicate = self._link_uri(attrs["data"])
        if pred_is_column_link(predicate) and not include_cols:
            continue

        subject = self._node_uri(start)
        target = self._node_uri(end)
        if not include_all and (subject == all_uri or target == all_uri):
            continue

        key = (subject, predicate, target)
        collected.append(key + (occurrences[key],))
        # next occurrence of the same triple gets the next number
        occurrences[key] += 1

    logging.info("Extracted {} extended rdf triples from ssd.".format(len(collected)))
    unique = set(collected)
    logging.info("Extracted {} unique rdf triples from ssd.".format(len(unique)))
    return unique


def pred_is_column_link(predicate):
    """Tells whether a predicate uri is the marker used for column links."""
    return predicate == "ColumnLink"
@staticmethod
def get_precision(correct_triples, result_triples):
"""
Calculate precision by comparing triples of the ground truth with the predicted result.
:param correct_triples: set
:param result_triples: set
:return:
"""
intersection = float(len(correct_triples & result_triples))
result_size = len(result_triples)
if result_size:
return intersection / result_size
return 0.0
@staticmethod
def get_recall(correct_triples, result_triples):
"""
Calculate recall by comparing triples of the ground truth with the predicted result.
:param correct_triples: set
:param result_triples: set
:return:
"""
intersection = float(len(correct_triples & result_triples))
correct_size = len(correct_triples)
if correct_size:
return intersection / correct_size
return 0.0
@staticmethod
def get_jaccard(correct_triples, result_triples):
"""
Calculate jaccard coefficient by comparing triples of the ground truth with the predicted result.
:param correct_triples: set
:param result_triples: set
:return:
"""
intersection = float(len(correct_triples & result_triples))
union_size = len(correct_triples | result_triples)
if union_size:
return intersection / union_size
return 0.0
class SSDGraph(object):
    """
    The Semantic Model object for the SSD.

    Wraps a networkx MultiDiGraph whose nodes carry ClassNode, DataNode
    and Column objects, plus a lookup from those objects to graph keys.
    """
def __init__(self):
    """
    Start the id counters and create the empty graph and lookup table.
    """
    # key under which the payload is stored on nodes and edges
    self.DATA_KEY = 'data'

    # id counters
    self._node_id = 0
    self._edge_id = 0

    # holds the ClassNodes, DataNodes and Column objects
    self._graph = nx.MultiDiGraph()

    # object -> int key
    self._lookup = {}
def _type_list(self, value_type):
"""
Helper function to filter out the lookup keys by type.
:param value_type: The type of the node to search.
:return:
"""
return [x for x in self._lookup.keys() if type(x) == value_type]
def find(self, node: Searchable, exact=False) -> Searchable:
    """
    Look `node` up among the stored objects of the very same type, using
    the `search` function of that type.

    :param node: A ClassNode or DataNode
    :param exact: passed on to `search`; should the match be exact or approximate
    :return: whatever `search` returns for the candidates
    """
    node_type = type(node)
    # only objects of the same type are candidates
    same_type = self._type_list(node_type)
    return node_type.search(same_type, node, exact=exact)
def exists(self, node: Searchable, exact=False) -> bool:
    """
    Tell whether `find` yields a result for the node.

    :param node: A DataNode or ClassNode
    :param exact: Should the match be exact or approximate
    :return: True if the node was found, False otherwise
    """
    found = self.find(node, exact)
    return found is not None
def degree(self, node: Searchable):
    """
    Returns the degree of a node in the graph.

    :param node: the node to look up
    :return: degree of the node, or 0 when the node cannot be found
    """
    match = self.find(node)
    if match is not None:
        return self._graph.degree(self._lookup[match])
    return 0
def add_node(self, node: Searchable, index=None):
    """
    Adds a node into the semantic model.

    When an exact match is already present its graph key is reused,
    whatever `index` was given.

    :param node: A DataNode or ClassNode to add into the graph
    :param index: The override index parameter. If None, one more than the
                  largest key in the graph is used (0 for an empty graph).
    :return: the graph key of the node
    """
    existing = self.find(node, exact=True)
    if existing is not None:
        _logger.debug("{} already exists in the SSD: {}".format(node, existing))
        # keep the same index in this case...
        index = self._lookup[existing]

    if index is None:
        # pick a key which cannot be in the graph yet
        current_keys = self._graph.nodes()
        index = max(current_keys) + 1 if len(current_keys) else 0

    # store the node in the graph and remember its key
    self._graph.add_node(index, data=node, node_id=index)
    self._lookup[node] = index
    return index
def add_edge(self,
             src: SSDSearchable,
             dst: SSDSearchable,
             link: SSDLink,
             index=None):
    """
    Adds an edge from `src` -> `dst` with `link`.

    :param src: A SSDSearchable e.g. a DataNode or ClassNode, or an int graph key
    :param dst: A DataNode or ClassNode, or an int graph key
    :param link: An SSDLink value to store on the edge.
    :param index: The override edge id. If None, one more than the largest
                  edge id in the graph is used (0 when there are no edges).
    :return: tuple of (source key, destination key), or None when the link
             is already present between the two nodes
    :raises Exception: if either key is not a node of the graph
    """
    if type(src) == int and type(dst) == int:
        # the caller passed graph keys directly
        i_s = src
        i_d = dst
    else:
        true_src, true_dst = self._check_edge_args(src, dst)
        i_s = self._lookup[true_src]
        i_d = self._lookup[true_dst]

    # make sure we aren't rewriting...
    if link in self.all_edge_data(i_s, i_d):
        # the placeholder used to be logged unformatted
        _logger.debug("{} is already in the SSD".format(link))
        return

    # set the index...
    if index is None:
        # we need to be careful that the auto index is not in the graph...
        edges = [v['edge_id'] for _, _, v in self._graph.edges(data=True)]
        index = max(edges) + 1 if len(edges) else 0

    if i_s not in self._graph.node:
        msg = "Link failed. Could not find source node {} in semantic model".format(i_s)
        raise Exception(msg)

    if i_d not in self._graph.node:
        msg = "Link failed. Could not find destination node {} in semantic model".format(i_d)
        raise Exception(msg)

    self._graph.add_edge(i_s,
                         i_d,
                         data=link,
                         edge_id=index)
    return i_s, i_d
def all_neighbors(self, n):
    """
    Returns all neighbours of node `n`: its successors followed by its
    predecessors.

    :param n: graph key of the node
    :return: list of graph keys
    """
    # list() keeps this working when the graph library hands back
    # iterators instead of lists, which cannot be joined with "+"
    return list(self._graph.successors(n)) + list(self._graph.predecessors(n))
def remove_node(self, node: Searchable):
    """
    Removes a node from the graph. Note that a Column node
    cannot be removed. If node is a Column, its neighbouring nodes
    (i.e. the mapping) are removed instead.

    Neighbours which are left without outgoing links after the removal
    are removed as well, except for Columns.

    :param node: A DataNode, Column or ClassNode
    :return: self
    :raises ValueError: if the node cannot be found
    """
    true_node = self.find(node)
    if true_node is None:
        # report the node which was asked for; `true_node` is None here
        # and used to produce "None does not exist."
        msg = "Remove node failed. {} does not exist.".format(node)
        raise ValueError(msg)

    key = self._lookup[true_node]

    if issubclass(type(true_node), Column):
        # if it's a column, we just remove the data node...
        for dn in self.all_neighbors(key):
            for v in self._ilookup[dn]:
                self.remove_node(v)
    else:
        # otherwise, we can remove the node and its
        # adjacent links. We also make sure that any
        # hanging nodes are removed too...
        neighbors = self.all_neighbors(key)

        self._graph.remove_node(key)
        del self._lookup[true_node]

        # remove any hanging neighbors...
        m = self._ilookup
        for n in neighbors:
            # a neighbour can be listed twice (linked in both directions,
            # or a self loop) and may be gone by now; asking the graph
            # for its degree would then fail
            if n not in self._graph:
                continue
            # if there are any nodes with no outgoing links
            # then we should remove them.
            if self._graph.out_degree(n) == 0:
                # we remove the node by the item to chain
                # the removal...
                for item in m[n]:
                    if not issubclass(type(item), Column):
                        self.remove_node(item)
    return self
@property
def _ilookup(self):
"""inverse lookup..."""
m = defaultdict(list)
for k, v in self._lookup.items():
m[v].append(k)
return m
def remove_edge(self,
                item: SSDLink,
                src: Searchable=None,
                dst: Searchable=None):
    """
    Removes an edge from the graph, together with any end point which is
    left without neighbours.

    When several edges carry `item`, `src` and `dst` are used to narrow
    the candidates down to the edges between those two nodes. Previously
    this path handed the node objects (not their graph keys) to the graph
    and raised "ambiguous" in every case.

    :param item: The link object
    :param src: Optional DataNode, Column or ClassNode at the source of the link
    :param dst: Optional DataNode, Column or ClassNode at the destination of the link
    :return: self
    :raises ValueError: if no edge matches, or if more than one still matches
    """
    edges = [(x, y, k) for x, y, k, z in self._edge_list if z == item]

    if len(edges) > 1 and src is not None and dst is not None:
        # ambiguous: keep only the edges between src and dst
        true_src, true_dst = self._check_edge_args(src, dst)
        i = self._lookup[true_src]
        j = self._lookup[true_dst]
        edges = [(x, y, k) for x, y, k in edges if x == i and y == j]

    if len(edges) == 0:
        msg = "Failed to remove edge. {} could not be found.".format(item)
        raise ValueError(msg)

    if len(edges) > 1:
        msg = "Failed to remove edge. {} is ambiguous".format(item)
        raise ValueError(msg)

    x, y, k = edges[0]
    self._graph.remove_edge(x, y, k)

    # drop end points which are no longer connected to anything
    if not len(self.all_neighbors(x)):
        for v in self._ilookup[x]:
            self.remove_node(v)

    if not len(self.all_neighbors(y)):
        for v in self._ilookup[y]:
            self.remove_node(v)

    return self
def show(self, name='source', title='', outfile=None):
    """
    Shows the semantic model by delegating to SSDVisualizer.

    :param name: name handed to the visualizer
    :param title: Optional title for the graph
    :param outfile: file where graph will be stored; if None - a temporary file will be created
    :return: None
    """
    visualizer = SSDVisualizer(self, name=name, outfile=outfile)
    visualizer.show(title=title)
@property
def graph(self):
    """Reference to the underlying graph object (not a copy)."""
    return self._graph
def node_data(self, node):
    """
    Helper function to return the data stored at the node.

    :param node: graph key of the node
    :return: the object stored under DATA_KEY on that node
    """
    return self._graph.node[node][self.DATA_KEY]
def edge_data(self, src, dst, index=0):
    """
    Helper function to return the data stored on the edge.

    :param src: graph key of the source node
    :param dst: graph key of the destination node
    :param index: key of the edge among the edges between src and dst
    :return: the object stored under DATA_KEY on that edge
    """
    return self._graph.edge[src][dst][index][self.DATA_KEY]
def all_edge_data(self, src, dst):
    """
    Helper function to return all the data stored on the edges from
    `src` to `dst`.

    The edges are visited through their actual keys. Counting from 0
    (as done before) fails with a KeyError once the keys are no longer
    contiguous, e.g. after one of several parallel edges was removed.

    :param src: graph key of the source node
    :param dst: graph key of the destination node
    :return: list of the non-None data objects; empty when there is no edge
    """
    adjacency = self._graph.edge
    if src not in adjacency or dst not in adjacency[src]:
        return []

    return [attrs[self.DATA_KEY]
            for attrs in adjacency[src][dst].values()
            if attrs is not None and attrs.get(self.DATA_KEY) is not None]
@property
def class_nodes(self):
    """Returns the objects stored in the graph whose type is exactly ClassNode"""
    selected = []
    for key in self._graph.nodes():
        if type(self.node_data(key)) == ClassNode:
            selected.append(self.node_data(key))
    return selected
@property
def data_nodes(self):
    """Returns the objects stored in the graph whose type is exactly DataNode"""
    selected = []
    for key in self._graph.nodes():
        if type(self.node_data(key)) == DataNode:
            selected.append(self.node_data(key))
    return selected
@property
def columns(self):
    """Returns the objects stored in the graph whose type is exactly Column"""
    selected = []
    for key in self._graph.nodes():
        if type(self.node_data(key)) == Column:
            selected.append(self.node_data(key))
    return selected
@property
def _edge_list(self):
"""helper function to return 4-tuple of (x, y, key, item)"""
edge_dicts = [(e1, e2, self._graph.edge[e1][e2]) for e1, e2 in self._graph.edges()]
# flatten out to (e1, e2, key, value)
arr = []
for x, y, z in edge_dicts:
for k, v in z.items():