import ckan.logic as logic
import ckan.new_authz as new_authz
from ckan.logic.auth import get_package_object, get_group_object, get_related_object
from ckan.logic.auth import get_resource_object
from ckan.logic.auth.create import package_relationship_create
from ckan.authz import Authorizer
from ckan.lib.base import _
def package_delete(context, data_dict):
    """Decide whether the current user may delete the dataset.

    The dataset is resolved with ``get_package_object`` and the decision
    is delegated to ``logic.check_access_old`` with the ``PURGE`` action.

    :param context: must provide ``'model'`` and ``'user'``.
    :param data_dict: passed through to ``get_package_object``.
    :returns: ``{'success': True}`` when allowed, otherwise
        ``{'success': False, 'msg': ...}``.
    """
    model = context['model']
    username = context['user']
    pkg = get_package_object(context, data_dict)

    if logic.check_access_old(pkg, model.Action.PURGE, context):
        return {'success': True}

    denial = _('User %s not authorized to delete package %s')
    return {'success': False, 'msg': denial % (str(username), pkg.id)}
def resource_delete(context, data_dict):
    """Decide whether the current user may delete the resource.

    A resource has no permission check of its own here: the dataset that
    owns the resource's resource group is looked up and the answer of
    ``package_delete`` for that dataset is reused.

    :param context: must provide ``'model'``; ``'user'`` is optional and
        only used in the denial message.
    :param data_dict: passed through to ``get_resource_object``.
    :returns: ``{'success': True}`` when allowed, otherwise
        ``{'success': False, 'msg': ...}``.
    :raises logic.NotFound: when no owning dataset can be found, so the
        check fails closed instead of allowing the delete.
    """
    model = context['model']
    username = context.get('user')
    res = get_resource_object(context, data_dict)

    # Authorization is checked against the dataset that owns the resource.
    owning_pkg = (
        model.Session.query(model.Package)
        .join(model.ResourceGroup)
        .join(model.Resource)
        .filter(model.ResourceGroup.id == res.resource_group_id)
        .first()
    )
    if not owning_pkg:
        raise logic.NotFound(_('No package found for this resource, cannot check auth.'))

    # NOTE(review): the same context object is handed to package_delete.
    # Confirm that get_package_object resolves the dataset from this id
    # and not from a package already stored on the context; otherwise the
    # check could run against a different dataset than the owner.
    verdict = package_delete(context, {'id': owning_pkg.id})
    if verdict.get('success'):
        return {'success': True}

    denial = _('User %s not authorized to delete resource %s')
    return {'success': False, 'msg': denial % (str(username), res.id)}
def related_delete(context, data_dict):
    """Decide whether the current user may delete a related item.

    Checks are applied in this order:

    1. no user in the context -> denied;
    2. ``Authorizer().is_sysadmin`` is true -> allowed;
    3. the item is linked to datasets and the user passes
       ``package_delete`` for the first one -> allowed;
    4. otherwise only the item's owner is allowed.

    :param context: must provide ``'model'`` and ``'user'``.
    :param data_dict: passed through to ``get_related_object``.
    :returns: ``{'success': True}`` when allowed, otherwise
        ``{'success': False, 'msg': ...}``.
    """
    model = context['model']
    username = context['user']

    if not username:
        return {'success': False, 'msg': _('Only the owner can delete a related item')}

    if Authorizer().is_sysadmin(unicode(username)):
        return {'success': True}

    item = get_related_object(context, data_dict)
    account = model.User.get(username)

    if item.datasets:
        # Only the first linked dataset is consulted; delete rights on it
        # are enough, whatever other datasets the item is attached to.
        # NOTE(review): the same context object is handed to
        # package_delete. Confirm that get_package_object resolves the
        # dataset from this id and not from a package already stored on
        # the context.
        first_dataset = item.datasets[0]
        verdict = package_delete(context, {'id': first_dataset.id})
        if verdict.get('success'):
            return {'success': True}

    if account and account.id == item.owner_id:
        return {'success': True}

    return {'success': False, 'msg': _('Only the owner can delete a related item')}
def package_relationship_delete(context, data_dict):
    """Decide whether the current user may delete a dataset relationship.

    Two checks must both pass: the user has to be allowed to create the
    relationship (``package_relationship_create``), and
    ``logic.check_access_old`` has to grant ``PURGE`` on the relationship
    object taken from ``context['relationship']``.

    :param context: must provide ``'model'``, ``'user'`` and
        ``'relationship'``.
    :param data_dict: passed through to ``package_relationship_create``.
    :returns: the create check's result when it fails; otherwise
        ``{'success': True}`` or ``{'success': False, 'msg': ...}``.
    """
    create_check = package_relationship_create(context, data_dict)
    if not create_check['success']:
        # Hand back the create check's own denial, message included.
        return create_check

    model = context['model']
    username = context['user']
    rel = context['relationship']

    if logic.check_access_old(rel, model.Action.PURGE, context):
        return {'success': True}

    denial = _('User %s not authorized to delete relationship %s')
    return {'success': False, 'msg': denial % (str(username), rel.id)}
def group_delete(context, data_dict):
    """Decide whether the current user may delete the group.

    The user name from the context is mapped to a user id and
    ``new_authz.has_user_permission_for_group_or_org`` is asked for the
    ``'delete'`` permission on the group.

    :param context: must provide ``'user'``.
    :param data_dict: passed through to ``get_group_object``.
    :returns: ``{'success': True}`` when allowed, otherwise
        ``{'success': False, 'msg': ...}``.
    """
    target = get_group_object(context, data_dict)
    username = context['user']
    uid = new_authz.get_user_id_for_username(username)

    if new_authz.has_user_permission_for_group_or_org(target.id, uid, 'delete'):
        return {'success': True}

    denial = _('User %s not authorized to delete group %s')
    return {'success': False, 'msg': denial % (str(username), target.id)}
def organization_delete(context, data_dict):
    """Decide whether the current user may delete the organization.

    The organization is resolved with ``get_group_object``; the user name
    from the context is mapped to a user id and
    ``new_authz.has_user_permission_for_group_or_org`` is asked for the
    ``'delete'`` permission on it.

    :param context: must provide ``'user'``.
    :param data_dict: passed through to ``get_group_object``.
    :returns: ``{'success': True}`` when allowed, otherwise
        ``{'success': False, 'msg': ...}``.
    """
    org = get_group_object(context, data_dict)
    username = context['user']
    uid = new_authz.get_user_id_for_username(username)

    if new_authz.has_user_permission_for_group_or_org(org.id, uid, 'delete'):
        return {'success': True}

    denial = _('User %s not authorized to delete organization %s')
    return {'success': False, 'msg': denial % (str(username), org.id)}
def revision_undelete(context, data_dict):
    """Deny every request to undelete a revision.

    Neither argument is inspected: the check is a placeholder that always
    refuses, so the action stays closed to every caller.

    :returns: ``{'success': False, 'msg': ...}`` unconditionally.
    """
    refusal = {
        'success': False,
        'msg': 'Not implemented yet in the auth refactor',
    }
    return refusal
def revision_delete(context, data_dict):
    """Deny every request to delete a revision.

    Neither argument is inspected: the check is a placeholder that always
    refuses, so the action stays closed to every caller.

    :returns: ``{'success': False, 'msg': ...}`` unconditionally.
    """
    refusal = {
        'success': False,
        'msg': 'Not implemented yet in the auth refactor',
    }
    return refusal
def task_status_delete(context, data_dict):
    """Decide whether the current user may delete a task status.

    Only users for whom ``Authorizer().is_sysadmin`` is true are allowed;
    ``data_dict`` is not consulted.

    :param context: must provide ``'user'``.
    :returns: ``{'success': True}`` when allowed, otherwise
        ``{'success': False, 'msg': ...}``.
    """
    username = context['user']

    if Authorizer().is_sysadmin(unicode(username)):
        return {'success': True}

    denial = _('User %s not authorized to delete task_status')
    return {'success': False, 'msg': denial % str(username)}
def vocabulary_delete(context, data_dict):
    """Decide whether the current user may delete a vocabulary.

    The result is whatever ``Authorizer.is_sysadmin`` reports for the
    user; ``data_dict`` is not consulted.

    :param context: must provide ``'user'``.
    :returns: a dict with the single key ``'success'``. The value is
        passed through as returned, not coerced to ``bool``, and no
        ``'msg'`` is added on denial.
    """
    # NOTE(review): is_sysadmin is called on the Authorizer class with the
    # raw user value, whereas task_status_delete in this module calls it on
    # an instance with unicode(user) - confirm both forms are equivalent.
    is_admin = Authorizer.is_sysadmin(context['user'])
    return {'success': is_admin}
def tag_delete(context, data_dict):
    """Decide whether the current user may delete a tag.

    The result is whatever ``Authorizer.is_sysadmin`` reports for the
    user; ``data_dict`` is not consulted.

    :param context: must provide ``'user'``.
    :returns: a dict with the single key ``'success'``. The value is
        passed through as returned, not coerced to ``bool``, and no
        ``'msg'`` is added on denial.
    """
    # NOTE(review): is_sysadmin is called on the Authorizer class with the
    # raw user value, whereas task_status_delete in this module calls it on
    # an instance with unicode(user) - confirm both forms are equivalent.
    is_admin = Authorizer.is_sysadmin(context['user'])
    return {'success': is_admin}