-
Notifications
You must be signed in to change notification settings - Fork 0
/
clients.py
389 lines (319 loc) · 14.6 KB
/
clients.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
import logging
from io import BufferedReader
import bigchaindb_driver as bd
import prov.graph as provgraph
import prov.model as provmodel
from bigchaindb_driver import pool as bdpool
from networkx import is_directed_acyclic_graph
from networkx import isolates
from networkx import topological_sort
from prov2bigchaindb.core import utils, local_stores, accounts
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class BaseClient(object):
""" BigchainDB Base Client """
def __init__(self, host: str = '0.0.0.0', port: int = 9984,
num_connections: int = 5, local_store: local_stores.SqliteStore = local_stores.SqliteStore()):
"""
Instantiate Base Client object
:param host: BigchaindDB Hostname or IP (default: 0.0.0.0)
:type host: str
:param port: BigchaindDB Port (default: 9984)
:type port: int
:param num_connections: Amount of connections made to BigchainDB node
:type num_connections: int
:param local_store: Local database object
:type local_store: SqliteStore
"""
assert num_connections > 0
self.node = 'http://{}:{}'.format(host, str(port))
self.connections = num_connections * [bd.BigchainDB(self.node)]
self.connection_pool = bdpool.Pool(self.connections)
self.store = local_store
def test_transaction(self, tx: dict) -> bool:
"""
Validate a transaction against BigchainDB
:param tx: Transaction to test
:type tx: dict
:return: True or Exception
:rtype: bool
"""
reason = None
if not utils.is_valid_tx(tx['id'], self.connection_pool.get_connection()):
reason = "TX is invalid"
elif not utils.is_block_to_tx_valid(tx['id'], self.connection_pool.get_connection()):
reason = "Block is invalid"
if reason is None:
return True
log.error("Test failed: %s", tx['id'])
raise Exception(reason)
def _get_bigchain_connection(self) -> bd.BigchainDB:
"""
Returns BigchainDB connection
:return: BigchainDB connection object
:rtype: bd.BigchainDB
"""
return self.connection_pool.get_connection()
def save_document(self, document: object) -> object:
"""
Abstract method to store a document
:param document: Document to save
:type document: object
:return: id
:rtype: object
"""
raise NotImplementedError("Abstract method")
def get_document(self, document_id: object) -> provmodel.ProvDocument:
"""
Abstract method to retrieve a document
:param document_id: Document to save
:type document_id: object
:rtype: ProvDocument
"""
raise NotImplementedError("Abstract method")
class DocumentConceptClient(BaseClient):
""""""
def __init__(self, account_id: str = None, host: str = '0.0.0.0', port: int = 9984, num_connections: int = 1,
local_store: local_stores.SqliteStore = local_stores.SqliteStore()):
"""
Instantiate Document Client object
:param host: BigchaindDB Hostname or IP (default: 0.0.0.0)
:type host: str
:param port: BigchaindDB Port (default: 9984)
:type port: int
:param local_store: Local database object
:type local_store: SqliteStore
"""
super().__init__(host, port, num_connections, local_store)
self.account = accounts.DocumentConceptAccount(account_id, self.store)
def save_document(self, document: str or bytes or provmodel.ProvDocument) -> str:
"""
Write a document into BigchainDB
:param document: Document as JSON/XML/PROVN
:type document: str or bytes or ProvDocument
:return: Transaction id of document
:rtype: str
"""
log.info("Save document...")
prov_document = utils.to_prov_document(content=document)
asset = {'prov': prov_document.serialize(format='json')}
tx_id = self.account.save_asset(asset, self._get_bigchain_connection())
log.info("Saved document in Tx with id: %s", tx_id)
return tx_id
def get_document(self, tx_id: str) -> provmodel.ProvDocument:
"""
Retrieve a document by transaction id from BigchainDB
:param tx_id: Transaction Id of Document
:type tx_id: str
:return: Document as ProvDocument object
:rtype: ProvDocument
"""
log.info("Retrieve and build document")
tx = self._get_bigchain_connection().transactions.retrieve(tx_id)
self.test_transaction(tx)
if 'id' in tx['asset'].keys():
tx = self._get_bigchain_connection().transactions.get(asset_id=tx['asset']['id'])[0]
self.test_transaction(tx)
log.info("Success")
return utils.to_prov_document(tx['asset']['data']['prov'])
class GraphConceptClient(BaseClient):
""""""
def __init__(self, host: str = '0.0.0.0', port: int = 9984, num_connections: int = 5,
local_store: local_stores.SqliteStore = local_stores.SqliteStore()):
"""
Instantiate Graph Client object
:param host: BigchaindDB Hostname or IP (default: 0.0.0.0)
:type host: str
:param port: BigchaindDB Port (default: 9984)
:type port: int
:param local_store: Local database object
:type local_store: SqliteStore
"""
super().__init__(host, port, num_connections, local_store=local_store)
self.accounts = []
@staticmethod
def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
"""
Transforms a ProvDocument into a tuple with ProvElement, list of ProvRelation and list of Namespaces
:param prov_document: Document to transform
:type prov_document:
:return: List of tuples(element, relations, namespace)
:rtype: list
"""
namespaces = prov_document.get_registered_namespaces()
g = provgraph.prov_to_graph(prov_document=prov_document)
elements = []
for node, node_dict in g.adjacency():
relations = {'with_id': [], 'without_id': []}
# print(node)
for tmp_relations in node_dict.values():
for relation in tmp_relations.values():
relation = relation['relation']
if relation.identifier:
relations['with_id'].append(relation)
else:
relations['without_id'].append(relation)
elements.append((node, relations, namespaces))
return elements
def save_document(self, document: str or BufferedReader or provmodel.ProvDocument) -> list:
"""
Write a document into BigchainDB
:param document: Document as JSON/XML/PROVN
:type document: str or BufferedReader or ProvDocument
:return: List of transaction ids
:rtype: list
"""
log.info("Save document...")
document_tx_ids = []
prov_document = utils.to_prov_document(content=document)
elements = GraphConceptClient.calculate_account_data(prov_document)
id_mapping = {}
log.info("Create and Save instances")
for prov_element, prov_relations, namespaces in elements:
for rel in prov_relations['with_id']:
id_mapping[rel.identifier] = ''
for prov_element, prov_relations, namespaces in elements:
account = accounts.GraphConceptAccount(prov_element, prov_relations, id_mapping, namespaces, self.store)
self.accounts.append(account)
tx_id = account.save_instance_asset(self._get_bigchain_connection())
document_tx_ids.append(tx_id)
log.info("Save relations with ids")
for account in filter(lambda acc: acc.has_relations_with_id, self.accounts):
document_tx_ids += account.save_relations_with_ids(self._get_bigchain_connection())
log.info("Save relations without ids")
for account in filter(lambda acc: acc.has_relations_without_id, self.accounts):
document_tx_ids += account.save_relations_without_ids(self._get_bigchain_connection())
log.info("Saved document in %s Tx", len(document_tx_ids))
return document_tx_ids
def get_document(self, document_tx_ids: list) -> provmodel.ProvDocument:
"""
Retrieve a document by a list transaction ids from BigchainDB
:param document_tx_ids: Transaction Ids of Document
:type document_tx_ids: list
:return: Document as ProvDocument object
:rtype: ProvDocument
"""
log.info("Retrieve and rebuild document...")
doc = provmodel.ProvDocument()
for i in document_tx_ids:
log.info("tx id: %s",i)
tx = self._get_bigchain_connection().transactions.get(asset_id=i)[0]
self.test_transaction(tx)
if 'id' in tx['asset'].keys():
tx = self._get_bigchain_connection().transactions.get(asset_id=tx['asset']['id'])[0]
self.test_transaction(tx)
tmp_doc = utils.to_prov_document(tx['asset']['data']['prov'])
for namespace in tmp_doc.get_registered_namespaces():
doc.add_namespace(namespace)
for record in tmp_doc.get_records():
doc.add_record(record=record)
log.info("Success")
return doc
class RoleConceptClient(BaseClient):
""""""
def __init__(self, host: str = '0.0.0.0', port: int = 9984, num_connections: int = 5,
local_store: local_stores.SqliteStore = local_stores.SqliteStore()):
"""
Instantiate Role Client object
:param host: BigchaindDB Hostname or IP (default: 0.0.0.0)
:type host: str
:param port: BigchaindDB Port (default: 9984)
:type port: int
:param local_store: Local database object
:type local_store: SqliteStore
"""
super().__init__(host, port, num_connections, local_store=local_store)
self.accounts = []
@staticmethod
def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
"""
Transforms a ProvDocument into a list of tuples including:
ProvAgent, list of ProvRelations from agent,
list of ProvElements associated to ProvAgent,
list of Namespaces
:param prov_document: Document to transform
:type prov_document:
:return: List of tuples(ProvAgent, list(), list(), list())
:rtype: list
"""
namespaces = prov_document.get_registered_namespaces()
g = provgraph.prov_to_graph(prov_document=prov_document)
sorted_nodes = list(reversed(list(topological_sort(g))))
agents = list(filter(lambda elem: isinstance(elem, provmodel.ProvAgent), sorted_nodes))
elements = list(filter(lambda elem: not isinstance(elem, provmodel.ProvAgent), sorted_nodes))
# Check on compatibility
if not is_directed_acyclic_graph(g):
raise Exception("Provenance graph is not acyclic")
if list(isolates(g)):
raise Exception("Provenance not compatible with role-based concept. Has isolated Elements")
for element in elements:
if provmodel.ProvAgent not in [type(n) for n in g.neighbors(element)]:
raise Exception(
"Provenance not compatible with role-based concept. Element {} has not relation to any agent".format(
element))
accounts = []
for agent in agents:
# find out-going relations from agent
agent_relations = []
for u, v in g.out_edges(agent):
# Todo check if filter does not left out some info
agent_relations.append(g.get_edge_data(u, v)[0]['relation'])
agent_elements = {}
i = 0
for element in elements:
element_relations = []
if g.has_edge(element, agent):
for u, v in set(g.out_edges(element)):
for relation in g[u][v].values():
element_relations.append(relation['relation'])
agent_elements[i] = {element: element_relations}
i += 1
accounts.append((agent, agent_relations, agent_elements, namespaces))
return accounts
def save_document(self, document: str or BufferedReader or provmodel.ProvDocument) -> list:
"""
Write a document into BigchainDB
:param document: Document as JSON/XML/PROVN
:type document: str or BufferedReader or ProvDocument
:return: List of transaction ids
:rtype: list
"""
log.info("Save document...")
document_tx_ids = []
prov_document = utils.to_prov_document(content=document)
account_data = RoleConceptClient.calculate_account_data(prov_document)
id_mapping = {}
log.info("Create and Save instances")
for agent, relations, elements, namespaces in account_data:
account = accounts.RoleConceptAccount(agent, relations, elements, id_mapping, namespaces, self.store)
self.accounts.append(account)
tx_id = account.save_instance_asset(self._get_bigchain_connection())
document_tx_ids.append(tx_id)
log.info("Save elements")
for account in self.accounts:
document_tx_ids += account.save_elements(self._get_bigchain_connection())
log.info("Saved document in %s Tx", len(document_tx_ids))
return document_tx_ids
def get_document(self, document_tx_ids: list) -> provmodel.ProvDocument:
"""
Returns a document by a list transaction ids from BigchainDB
:param document_tx_ids: Transaction Ids of Document
:type document_tx_ids: list
:return: Document as ProvDocument object
:rtype: ProvDocument
"""
log.info("Retrieve and rebuild document...")
doc = provmodel.ProvDocument()
for i in document_tx_ids:
tx = self._get_bigchain_connection().transactions.get(asset_id=i)[0]
self.test_transaction(tx)
if 'id' in tx['asset'].keys():
tx = self._get_bigchain_connection().transactions.get(asset_id=tx['asset']['id'])[0]
self.test_transaction(tx)
tmp_doc = utils.to_prov_document(tx['asset']['data']['prov'])
for namespace in tmp_doc.get_registered_namespaces():
doc.add_namespace(namespace)
for record in tmp_doc.get_records():
doc.add_record(record=record)
log.info("Success")
return doc