-
Notifications
You must be signed in to change notification settings - Fork 0
/
edc_data_management.py
132 lines (119 loc) · 5.06 KB
/
edc_data_management.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import requests
from uuid import uuid4
class EdcDataManagement():
"""
This is a new clean implemenation of what is 'hidden' in edc_handling.
This implementation does NOT include backward compatibilites prior product-edc 0.1.1.
"""
def __init__(self,
data_management_base_url: str,
data_management_auth_key: str = '',
data_management_auth_code: str = '',
backend_auth_code: str = '',
backend_auth_key: str = 'X-Api-Key',
):
"""
data_management_base_url: base url. /assets /policies etc is added to that
data_management_auth_key: header key to access data management api
data_management_auth_code: header value to access data management api
backend_auth_key: header that is aded to the 'dataAddress' when an asset is created. used to access the backend
backend_auth_code: value that is added to the 'dataAddress' to access a backend.
"""
self.data_management_base_url = data_management_base_url
self.data_management_auth_key = data_management_auth_key
self.data_management_auth_code = data_management_auth_code
self.backend_auth_code = backend_auth_code
self.backend_auth_key = backend_auth_key
def exists(self, asset_id: str) -> bool:
r = requests.get(f"{self.data_management_base_url}/assets/{asset_id}", headers=self._prepare_data_management_auth())
if not r.ok:
return False
return True
def create_asset_and_friends(self, asset_id: str, endpoint: str):
asset_id = self.create_asset(asset_id=asset_id, endpoint=endpoint)
policy_id = self.create_policy(asset_id=asset_id)
contract_definition_id = self.create_contract_definition(policy_id=policy_id, asset_id=asset_id)
return {
'asset_id': asset_id,
'policy_id': policy_id,
'contract_definition_id': contract_definition_id,
}
def create_asset(self, asset_id: str, endpoint: str):
asset_props = {
"asset:prop:id": asset_id,
"asset:prop:contenttype": "application/json",
"asset:prop:policy-id": "use-eu",
}
data_address_props = {
"type": "HttpData",
"proxyMethod": True,
"proxyBody": True,
#"proxyPath": False, # to avoid /submodel at the end
#"proxyQueryParams": True,
"baseUrl": endpoint
}
return self.create_asset_by_props(asset_props=asset_props, data_address_props=data_address_props)
def create_asset_by_props(self, asset_props: dict, data_address_props: dict):
data = {
"asset": {
"properties": asset_props
},
"dataAddress": {
"properties": data_address_props
}
}
# add secrets for the backen system if set
if self.backend_auth_code:
data['dataAddress']['properties']['authCode'] = self.backend_auth_code
data['dataAddress']['properties']['authKey'] = self.backend_auth_key
r = requests.post(f"{self.data_management_base_url}/assets", json=data, headers=self._prepare_data_management_auth())
if not r.ok:
print("Could not create asset.") # TODO: should we try to delete before we create?
print(r.content)
return None
# TODO: checks
return asset_props["asset:prop:id"]
def create_contract_definition(self, policy_id: str, asset_id: str):
cd_id = str(uuid4())
data = {
"id": cd_id,
"accessPolicyId": policy_id,
"contractPolicyId": policy_id,
"criteria": [
{
"operandLeft": "asset:prop:id",
"operator": "=",
"operandRight": asset_id
}
],
}
r = requests.post(f"{self.data_management_base_url}/contractdefinitions", json=data, headers=self._prepare_data_management_auth())
return cd_id
def create_policy(self, asset_id: str):
policy_id = str(uuid4())
data = {
"id": policy_id,
"policy": {
"permissions": [
{
"target": asset_id,
"action": {
"type": "USE"
},
"edctype": "dataspaceconnector:permission"
}
],
},
"@type": {
"@policytype": "set"
}
}
r = requests.post(f"{self.data_management_base_url}/policydefinitions", json=data, headers=self._prepare_data_management_auth())
return policy_id
def _prepare_data_management_auth(self):
if not self.data_management_auth_key:
return {}
headers = {
self.data_management_auth_key : self.data_management_auth_code
}
return headers