-
Notifications
You must be signed in to change notification settings - Fork 1
/
__init__.py
183 lines (150 loc) · 6.18 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import logging
import requests
from frictionless_ckan_mapper import ckan_to_frictionless, frictionless_to_ckan
from ckanclient.auth import CkanAuthApi
from ckanclient.errors import ResponseError
from ckanclient.text import camel_to_snake
from ckanclient.upload import push_data_to_blob_storage, verify_upload
# Layout of every log record: "<timestamp> - <level name> - <message>".
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# Configure logging as soon as the package is imported: INFO level, using
# the layout defined above.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

# Logger named after this module.
log = logging.getLogger(__name__)
class CkanClientError(ResponseError):
    """Error raised by `Client.action` when CKAN answers with a status code
    outside the 2xx range. It adds nothing to `ResponseError`."""
class Client:
    """The client to connect to and interact with a CKAN instance running with
    blob storage.

    It offers basic methods to perform frequent actions such as pushing or
    retrieving a dataset, plus direct access to the CKAN Action API.
    """

    def __init__(self, api_url, api_key, organization, dataset_id, lfs_url):
        """Initialization to create instances of the client (requires CKAN with
        blob storage).

        Args:
            api_url (str): The base URL to access the API (without the
                `/api/<version>` part).
            api_key (str): The API key (as seen in the web interface of a
                user).
            organization (str): The name or id of an existing organization.
            dataset_id (str): The ID of an existing dataset.
            lfs_url (str): The URL to access the Git LFS server (e.g.:
                Giftless).
        """
        # Both URLs are stored with a trailing slash so that paths can be
        # appended to them directly (see `action`).
        self.api_url = self._ensure_trailing_slash(api_url)
        self.api_key = api_key
        self.organization = organization
        self.dataset_id = dataset_id
        self.lfs_url = self._ensure_trailing_slash(lfs_url)
        # Created last: the auth helper receives the fully populated client.
        self.auth = CkanAuthApi(self)

    @staticmethod
    def _ensure_trailing_slash(url):
        """Return `url` unchanged if it ends with `/`, else with one added."""
        return url if url.endswith('/') else f'{url}/'

    def action(
        self,
        name,
        payload=None,
        http_get=False,
        transform_payload=None,
        transform_response=None,
    ):
        """Gives you direct access to the CKAN Action API
        (https://docs.ckan.org/en/2.8/api/).

        Args:
            name (str): The action name, e.g. site_read, package_show…
            payload (dict): Optional, the payload being sent to CKAN. If a
                payload is provided for a GET request, it will be converted
                to URL parameters and each key will be converted to snake
                case. `None` on a GET request means "no URL parameters".
            http_get (bool): Optional, if `True` will make `GET` request,
                otherwise `POST`.
            transform_payload (func): Function to mutate the `payload` before
                making the request (useful to convert to and from CKAN and
                Frictionless formats).
            transform_response (func): Function to mutate the response data
                before returning it (useful to convert to and from CKAN and
                Frictionless formats). It receives the whole decoded JSON
                body of the response.

        Returns:
            dict: the API response converted from JSON to a Python dictionary
                (after `transform_response`, when one is given).

        Raises:
            CkanClientError: if the HTTP status code is outside the 2xx range.
        """
        url = f'{self.api_url}api/3/action/{name}'
        headers = {
            'Content-Type': 'application/json;charset=utf-8',
            'Authorization': self.api_key,
        }

        if transform_payload:
            payload = transform_payload(payload)

        if http_get:
            # A GET action without parameters must not crash on `.items()`.
            params = {
                camel_to_snake(key): value
                for key, value in (payload or {}).items()
            }
            response = requests.get(url, headers=headers, params=params)
        else:
            response = requests.post(url, headers=headers, json=payload)

        if not 200 <= response.status_code < 300:
            raise CkanClientError(response)

        data = response.json()
        return transform_response(data) if transform_response else data

    def create(self, dataset_name_or_metadata):
        """Creates a new dataset.

        Args:
            dataset_name_or_metadata (str or dict): It is either a string
                being a valid dataset name or metadata for the dataset in
                Frictionless format.

        Returns:
            dict: the `package_create` response mapped from CKAN to
                Frictionless.
        """
        metadata = dataset_name_or_metadata
        if isinstance(metadata, str):
            # Only a name was given: build the minimal metadata around it.
            metadata = {
                'name': dataset_name_or_metadata,
                'owner_org': self.organization,
            }

        # The caller speaks Frictionless and CKAN speaks CKAN, so the payload
        # is mapped Frictionless -> CKAN and the answer CKAN -> Frictionless.
        # NOTE(review): the response mapper receives the whole decoded body,
        # not only its `result` member — confirm this is intended.
        return self.action(
            'package_create',
            metadata,
            transform_payload=frictionless_to_ckan.dataset,
            transform_response=ckan_to_frictionless.dataset,
        )

    def push(self, dataset_metadata):
        """Updates the dataset.

        Args:
            dataset_metadata (dict): the metadata in Frictionless format.

        Returns:
            dict: the `package_update` response mapped from CKAN to
                Frictionless.
        """
        # Same mapping directions as in `create`: Frictionless -> CKAN for
        # the request, CKAN -> Frictionless for the response.
        # NOTE(review): the response mapper receives the whole decoded body,
        # not only its `result` member — confirm this is intended.
        return self.action(
            'package_update',
            dataset_metadata,
            transform_payload=frictionless_to_ckan.dataset,
            transform_response=ckan_to_frictionless.dataset,
        )

    def retrieve(self, name_or_id):
        """Retrieves the dataset.

        Args:
            name_or_id (str): Id or name of the dataset.

        Returns:
            dict: the `package_show` response mapped to a Frictionless
                dataset.
        """
        # `package_show` describes a dataset, so the dataset mapper (not the
        # resource one) is applied to the response.
        # NOTE(review): the mapper receives the whole decoded body, not only
        # its `result` member — confirm this is intended.
        return self.action(
            'package_show',
            {'id': name_or_id},
            http_get=True,
            transform_response=ckan_to_frictionless.dataset,
        )

    def push_blob(self, resource):  # TODO async version with on_progress attr
        """Pushes the resource's data to blob storage when it is not already
        there and reports the outcome.

        Args:
            resource (dict): A Frictionless resource.

        Returns:
            dict: the push blob result has the keys `oid` (str), `size` (int)
                of the file, `name` (str) of the resource, `success` (bool) of
                the request, and `fileExists` (bool) indicating whether the
                resource existed or not.
        """
        lfs = self.auth.request_file_upload_actions(resource)
        # Only the first object of the answer is used.
        obj, *_ = lfs['objects']

        result = {
            'oid': obj['oid'],
            'size': obj['size'],
            'name': resource['name'],
            'success': True,
            'fileExists': True,
        }

        actions = obj.get('actions')
        if actions:  # file is not in storage
            push_data_to_blob_storage(actions['upload'], resource)
            # The Git LFS batch API allows a server to omit the `verify`
            # action, so only call it back when it was actually offered.
            verify_action = actions.get('verify')
            if verify_action:
                verify_upload(verify_action, resource)
            result['fileExists'] = False

        return result