This repository has been archived by the owner on Jan 10, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 26
/
action.py
224 lines (172 loc) · 7.99 KB
/
action.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# coding: utf8
from __future__ import unicode_literals
import ckan.authz as authz
from ckan.common import _
import ckan.lib.base as base
from ckan.lib.base import render_jinja2
from ckan.lib.mailer import mail_recipient
from ckan.lib.mailer import MailerException
import ckan.logic
from ckan.logic.action.create import user_create
from ckan.logic.action.get import package_search
from ckan.logic.action.get import package_show
from ckan.logic.action.get import resource_search
from ckan.logic.action.get import resource_view_list
from ckan.logic import side_effect_free
from ckanext.restricted import auth
from ckanext.restricted import logic
import json
import copy
from ckan.common import config
from logging import getLogger
# Module-level logger, named after this module.
log = getLogger(__name__)
# Short aliases for ckan.logic helpers used by the actions below.
_get_or_bust = ckan.logic.get_or_bust
NotFound = ckan.logic.NotFound
# Template renderer used to build the registration notification e-mail body.
render = base.render
def restricted_user_create_and_notify(context, data_dict):
    """Create a user and e-mail a registration notice to the site admin.

    The user is created with ``user_create``. Afterwards a notification is
    rendered from ``restricted/emails/restricted_user_registered.txt`` and
    sent to the address configured under ``email_to``. A ``MailerException``
    (including a missing ``email_to`` option) is logged and swallowed, so it
    never prevents the new user dict from being returned.

    :param context: action context, passed through to ``user_create``
    :param data_dict: user data, passed through to ``user_create``
    :returns: the value returned by ``user_create``
    """

    def body_from_user_dict(user_dict):
        # One "* KEY: value" line per entry of the user dict.
        return ''.join(
            '* {0}: {1}\n'.format(
                key.upper(), value if isinstance(value, str) else str(value))
            for key, value in user_dict.items())

    user_dict = user_create(context, data_dict)

    # Send your email, check ckan.lib.mailer for params
    try:
        name = _('CKAN System Administrator')
        email = config.get('email_to')
        if not email:
            # Name the option exactly as it is looked up above.
            raise MailerException('Missing "email_to" in config')

        subject = _('New Registration: {0} ({1})').format(
            user_dict.get('name', _(u'new user')), user_dict.get('email'))

        extra_vars = {
            'site_title': config.get('ckan.site_title'),
            'site_url': config.get('ckan.site_url'),
            'user_info': body_from_user_dict(user_dict)}

        body = render(
            'restricted/emails/restricted_user_registered.txt', extra_vars)

        mail_recipient(name, email, subject, body)

    except MailerException as mailer_exception:
        # Notification is best effort: report the problem, keep the user.
        log.error('Cannot send mail after registration')
        log.error(mailer_exception)

    return user_dict
@side_effect_free
def restricted_resource_view_list(context, data_dict):
    """List a resource's views, or nothing when access is not granted.

    :param context: action context; ``context['model']`` supplies the
        ``Resource`` model used for the lookup
    :param data_dict: must contain ``id`` (enforced via ``_get_or_bust``)
    :returns: the result of ``resource_view_list`` when
        ``auth.restricted_resource_show`` reports success, else ``[]``
    :raises NotFound: if the lookup yields no resource
    """
    model = context['model']
    wanted_id = _get_or_bust(data_dict, 'id')

    resource = model.Resource.get(wanted_id)
    if not resource:
        raise NotFound

    # NOTE(review): ``resource`` comes from the model layer; confirm that
    # ``resource.get('id')`` really yields the resource id here.
    auth_result = auth.restricted_resource_show(
        context, {'id': resource.get('id'), 'resource': resource})

    if auth_result.get('success', False):
        return resource_view_list(context, data_dict)
    return []
@side_effect_free
def restricted_package_show(context, data_dict):
    """Show a package, masking restricted resource fields for non-editors.

    :param context: action context
    :param data_dict: passed through to ``package_show``
    :returns: the ``package_show`` result unchanged when the caller is
        authorized for ``package_update``; otherwise a dict copy whose
        ``resources`` went through ``_restricted_resource_list_hide_fields``
    """
    package = package_show(context, data_dict)

    # Anyone allowed to edit the package sees it unmodified.
    can_edit = authz.is_authorized(
        'package_update', context, package).get('success', False)
    if can_edit:
        return package

    # Work on a top-level copy; non-dict results are converted via for_json().
    plain = package if isinstance(package, dict) else package.for_json()
    restricted_package = dict(plain)

    restricted_package['resources'] = _restricted_resource_list_hide_fields(
        context, restricted_package.get('resources', []))

    return restricted_package
@side_effect_free
def restricted_resource_search(context, data_dict):
    """Run ``resource_search`` and mask restricted fields in its results.

    :param context: action context
    :param data_dict: passed through to ``resource_search``
    :returns: a new dict with the same keys as the search result, where the
        ``results`` entry went through
        ``_restricted_resource_list_hide_fields``
    """
    found = resource_search(context, data_dict)

    # Only the 'results' entry is rewritten; everything else is carried over.
    return {
        key: (_restricted_resource_list_hide_fields(context, value)
              if key == 'results' else value)
        for key, value in found.items()
    }
@side_effect_free
def restricted_package_search(context, data_dict):
    """Run ``package_search`` and re-show every hit with restrictions applied.

    :param context: action context
    :param data_dict: passed through to ``package_search``
    :returns: a new dict with the same keys as the search result, where each
        entry of ``results`` is replaced by ``restricted_package_show`` for
        that package's ``id``
    """
    search_result = package_search(context, data_dict)

    # Each package is shown with a copy of the context whose
    # 'with_capacity' flag is switched off.
    show_context = context.copy()
    show_context['with_capacity'] = False

    filtered = {}
    for key, value in search_result.items():
        if key != 'results':
            filtered[key] = value
            continue
        filtered[key] = [
            restricted_package_show(show_context, {'id': package.get('id')})
            for package in value]

    return filtered
@side_effect_free
def restricted_check_access(context, data_dict):
    """Check the current user's access to one resource of a package.

    :param context: action context; the user name is derived from it
    :param data_dict: must carry non-empty ``package_id`` and ``resource_id``
    :returns: whatever ``logic.restricted_check_user_resource_access``
        returns for the user, the resource dict and the package dict
    :raises ckan.logic.ValidationError: if ``package_id`` or ``resource_id``
        is missing or empty
    """
    package_id = data_dict.get('package_id', False)
    resource_id = data_dict.get('resource_id', False)

    current_user = logic.restricted_get_username_from_context(context)

    if not package_id:
        raise ckan.logic.ValidationError('Missing package_id')
    if not resource_id:
        raise ckan.logic.ValidationError('Missing resource_id')

    log.debug("action.restricted_check_access: user_name = " + str(current_user))

    log.debug("checking package " + str(package_id))
    show_package = ckan.logic.get_action('package_show')
    package = show_package(
        dict(context, return_type='dict'), {'id': package_id})

    log.debug("checking resource")
    show_resource = ckan.logic.get_action('resource_show')
    resource = show_resource(
        dict(context, return_type='dict'), {'id': resource_id})

    return logic.restricted_check_user_resource_access(
        current_user, resource, package)
# def _restricted_resource_list_url(context, resource_list):
# restricted_resources_list = []
# for resource in resource_list:
# authorized = auth.restricted_resource_show(
# context, {'id': resource.get('id'), 'resource': resource}).get('success', False)
# restricted_resource = dict(resource)
# if not authorized:
# restricted_resource['url'] = _('Not Authorized')
# restricted_resources_list += [restricted_resource]
# return restricted_resources_list
def _restricted_resource_list_hide_fields(context, resource_list):
    """Return copies of the resources with their ``restricted`` data masked.

    For callers who are not authorized for ``package_update`` on a
    resource's package, the ``restricted`` value (top-level and/or inside
    ``extras``, wherever it is set) is replaced by a JSON string that keeps
    the level but partially hides every allowed user name except the
    caller's own. The resource dicts passed in are not modified.

    :param context: action context used for the authorization checks
    :param resource_list: iterable of resource dicts
    :returns: list with one (possibly masked) dict copy per input resource
    """
    restricted_resources_list = []
    for resource in resource_list:
        # Shallow copy: nested values are still shared with ``resource``.
        restricted_resource = dict(resource)

        # get the restricted fields
        restricted_dict = logic.restricted_get_restricted_dict(restricted_resource)

        # NOTE(review): the outcome of this check is never used below; the
        # call is kept as-is so existing behavior does not change.
        authorized = auth.restricted_resource_show(
            context, {'id': resource.get('id'), 'resource': resource}
        ).get('success', False)

        # hide other fields in restricted to everyone but dataset owner(s)
        if not authz.is_authorized(
                'package_update', context, {'id': resource.get('package_id')}
        ).get('success'):

            user_name = logic.restricted_get_username_from_context(context)

            # hide partially other allowed user_names (keep own);
            # blank entries are dropped, a missing list counts as empty
            allowed_users = [
                user if user == user_name
                else user[0:3] + '*****' + user[-2:]
                for user in restricted_dict.get('allowed_users') or []
                if user.strip()]

            new_restricted = json.dumps({
                'level': restricted_dict.get("level"),
                'allowed_users': ','.join(allowed_users)})

            extras = resource.get('extras', {})
            if extras.get('restricted', {}):
                # Replace ``extras`` with a copy: writing into the shared
                # dict would also alter the caller's original resource.
                restricted_resource['extras'] = dict(
                    extras, restricted=new_restricted)

            if resource.get('restricted', {}):
                restricted_resource['restricted'] = new_restricted

        restricted_resources_list.append(restricted_resource)
    return restricted_resources_list