/
AutoRole.py
184 lines (160 loc) · 6.68 KB
/
AutoRole.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# -*- coding: utf-8 -*-
import re
from logging import getLogger
from AccessControl.class_init import InitializeClass
from AccessControl.SecurityInfo import ClassSecurityInfo
from Acquisition import aq_inner, aq_parent
from Products.AutoRoleFromHostHeader.interfaces import \
ConfigurationChangedEvent
from Products.PageTemplates.Expressions import createZopeEngine
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.interfaces.plugins import (
IAuthenticationPlugin, IExtractionPlugin, IGroupsPlugin, IRolesPlugin)
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements
from zope.event import notify
# Page template (www/autoRoleAdd) published under the name
# ``manage_addAutoRoleForm``.
manage_addAutoRoleForm = PageTemplateFile(
    'www/autoRoleAdd',
    globals(),
    __name__='manage_addAutoRoleForm')

# Module-wide logger; rule evaluation failures are reported through it.
logger = getLogger("Products.AutoRoleFromHostHeader")
def addAutoRole(dispatcher, id, title=None, match_roles=(), REQUEST=None):
    """Create an AutoRole plugin and store it on ``dispatcher``.

    The plugin is built from ``id``, ``title`` and ``match_roles`` and
    registered via ``dispatcher._setObject`` under the plugin's own id.
    When ``REQUEST`` is given, its response is redirected to the
    dispatcher's ``manage_workspace`` page with a confirmation message.
    """
    plugin = AutoRole(id, title, match_roles)
    dispatcher._setObject(plugin.getId(), plugin)

    if REQUEST is None:
        return

    redirect = REQUEST['RESPONSE'].redirect
    target = (
        '%s/manage_workspace?manage_tabs_message=AutoRole+added.'
        % dispatcher.absolute_url())
    redirect(target)
class AutoRole(BasePlugin):
    """PAS multi-plugin assigning roles or groups from request headers.

    Every line of the ``match_roles`` property is one rule of the form::

        header name; regexp; role[, role ...][; TALES condition]

    A rule applies to a request when the value looked up under
    ``header name`` is non-empty, ``regexp`` matches it from the start
    (``re.match`` semantics) and the TALES condition evaluates to a true
    value.  When the condition is left out it defaults to ``python:True``.
    Whitespace around each field is ignored.
    """

    meta_type = 'Auto Role Header Plugin'
    security = ClassSecurityInfo()

    _properties = (
        dict(id='title', label='Title', type='string', mode='w'),
        dict(id='match_roles',
             label='Header name; regexp; roles/groups ; TALES condition expression',
             type='lines', mode='w'),
        dict(id='anon_only', label='Anonymous Only', type='boolean',
             mode='w'),
    )

    # Class-level default, read when an instance has no attribute of its own.
    anon_only = False

    def __init__(self, id, title=None, match_roles=()):
        """Set up the plugin and compile the rules given in ``match_roles``.

        Lines that cannot be parsed are skipped silently here; strict
        validation only happens when the property is saved through
        ``_setPropValue``.
        """
        self._setId(id)
        self.title = title
        self.match_roles = match_roles
        self.anon_only = False
        # Compile immediately: without this, rules handed to the constructor
        # would stay inert until the match_roles property is next saved.
        self._compile_matchs()

    def _compile_matchs(self):
        """Parse ``self.match_roles`` into ``self._compiled``.

        ``self._compiled`` becomes a list of
        ``(header_name, regexp, roles, condition)`` tuples where ``roles``
        is a non-empty set of role/group names.  Lines that are not
        strings, do not have three or four ``;``-separated fields, or name
        no role at all are left out.
        """
        compiled = []
        # Treat a missing value (None) like an empty rule list.
        for line in self.match_roles or ():
            try:
                values = line.split(';')
            except AttributeError:
                # Not a string-like line.
                continue
            if len(values) == 3:
                # If there isn't a condition, pretend the condition was
                # something that's always true.
                values.append('python:True')
            if len(values) != 4:
                continue
            header_name, regexp, roles, condition = values
            role_names = set(
                name for name in (part.strip() for part in roles.split(','))
                if name)
            if not role_names:
                continue
            # The property label documents the format with spaces around
            # the ';' separators, so strip them from every field.  A leading
            # space would otherwise end up inside the regexp and in front of
            # the TALES expression type prefix.  A pattern that really needs
            # a leading or trailing space can spell it as ``[ ]`` or ``\s``.
            compiled.append((header_name.strip(), regexp.strip(),
                             role_names, condition.strip()))
        self._compiled = compiled

    def _setPropValue(self, id, value):
        """Store a property value and re-validate rules when they change.

        The value is stored first.  For ``match_roles`` the rules are then
        recompiled, and ``ValueError`` is raised when a non-empty value
        contains at least one line that did not compile.  Otherwise a
        ``ConfigurationChangedEvent`` is sent for this plugin.
        """
        BasePlugin._setPropValue(self, id, value)
        if id == 'match_roles':
            self._compile_matchs()
            if value and len(self._compiled) != len(self.match_roles):
                raise ValueError(
                    'match_roles contains invalid parameters!')
        notify(ConfigurationChangedEvent(self))

    def _iter_matching_roles(self, request):
        """Yield the role set of every compiled rule matching ``request``.

        Rules are evaluated lazily and in order.  A rule whose evaluation
        raises is logged and skipped so that it cannot break the others.
        """
        engine = createZopeEngine()
        portal = aq_inner(aq_parent(self._getPAS()))
        context = engine.getContext(request=request, portal=portal)
        for header_name, regexp, roles, condition in self._compiled:
            try:
                evaluate = engine.compile(condition)
                header = request.get(header_name)
                matched = bool(
                    header
                    and re.match(regexp, header)
                    and evaluate(context))
            except Exception:
                logger.exception("Couldn't evaluate AutoRole rule")
                continue
            if matched:
                yield roles

    #
    # IRolesPlugin
    #
    security.declarePrivate('getRolesForPrincipal')
    def getRolesForPrincipal(self, principal, request=None):
        """Return the list of roles granted by the rules matching ``request``.

        An empty list is returned when there is no request, when no rules
        are compiled, or when ``anon_only`` is set and ``principal`` is a
        user other than 'Anonymous User'.
        """
        # We need this for uncontexted calls.
        if request is None:
            return []
        if (self.anon_only and
                principal is not None and
                principal.getUserName() != 'Anonymous User'):
            return []
        if not self._compiled:
            return []
        result = set()
        for roles in self._iter_matching_roles(request):
            result.update(roles)
        return list(result)

    #
    # IGroupsPlugin
    #
    # This method allows the plugin to be used to assign groups instead of
    # roles if used as a group plugin instead of a role plugin.
    security.declarePrivate('getGroupsForPrincipal')
    def getGroupsForPrincipal(self, principal, request=None):
        """Return groups for ``request``; same rules as for roles."""
        return self.getRolesForPrincipal(principal, request)

    #
    # IExtractionPlugin
    #
    security.declarePrivate('extractCredentials')
    def extractCredentials(self, request):
        """Return ``{'AutoRole': True}`` when any rule matches ``request``.

        An empty dict is returned when the request already carries
        ``_auth`` data, when no rules are compiled, or when no rule matches.
        """
        # Avoid creating an anonymous user if this is a regular user.
        # We actually have to poke the request ourselves to avoid users from
        # root becoming anonymous...
        if getattr(request, '_auth', None):
            return {}
        if not self._compiled:
            return {}
        # The first matching rule is enough; later rules are not evaluated.
        for _roles in self._iter_matching_roles(request):
            return dict(AutoRole=True)
        return {}

    #
    # IAuthenticationPlugin
    #
    security.declarePrivate('authenticateCredentials')
    def authenticateCredentials(self, credentials):
        """Authenticate AutoRole credentials as 'Anonymous User'.

        Returns ``None`` when ``credentials`` holds a login or lacks a true
        ``AutoRole`` entry, otherwise the
        ``('Anonymous User', 'Anonymous User')`` pair.
        """
        # Make sure we don't instantiate anonymous if there are other
        # credentials.
        # BBB: this seems to work only for basic authentication. Can we do
        # something better?
        if credentials.get('login'):
            return None
        if not credentials.get('AutoRole', None):
            return None
        return ('Anonymous User', 'Anonymous User')
# Declare the PAS plugin interfaces that AutoRole implements.
classImplements(
    AutoRole,
    IRolesPlugin,
    IGroupsPlugin,
    IExtractionPlugin,
    IAuthenticationPlugin)

# Finish class set-up (AccessControl class initialisation).
InitializeClass(AutoRole)