/
common.py
241 lines (194 loc) · 7.47 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
"""Authentication methods for API."""
from kqueen.config import current_config
from kqueen.models import Organization
from kqueen.models import User
from uuid import uuid4
import bcrypt
import importlib
import logging
config = current_config()
logger = logging.getLogger('kqueen_api')
"""
Authentication Modules
To define new module, need to specify it as dictionary, where:
"auth_option_lower_case": {
"engine": "EqualsToAuthClassName",
"parameters": {
"key": "value"
}
}
"""
AUTH_MODULES = {
"ldap": {
"engine": "LDAPAuth",
"parameters": {
"uri": config.get('LDAP_URI'),
"admin_dn": config.get('LDAP_DN'),
"_password": config.get('LDAP_PASSWORD')
}
},
"local": {
"engine": "LocalAuth",
"parameters": {}
}
}
def generate_auth_options(auth_list):
auth_options = {}
methods = [item.strip() for item in auth_list.split(',')]
for m in methods:
if m in AUTH_MODULES:
auth_options[m] = AUTH_MODULES[m]
if not auth_options:
auth_options['local'] = {'engine': 'LocalAuth', 'parameters': {}}
logger.debug('Auth configuration options are generated ')
return auth_options
def get_auth_instance(name):
# Default type is local auth
config = current_config()
auth_options = generate_auth_options(config.get("AUTH_MODULES"))
auth_config = auth_options.get(name, {})
# If user auth is not specified clearly, use local
if name == 'local' or name is None:
auth_config = {'engine': 'LocalAuth', 'parameters': {}}
module = importlib.import_module('kqueen.auth')
auth_engine = auth_config.get('engine')
logger.debug("Using {} Authentication Engine".format(auth_engine))
if not auth_engine:
raise Exception('Authentication type is set to {}, but engine class name is not found. '
'Please, set it with the "engine" key'.format(name))
auth_class = getattr(module, auth_engine)
if callable(auth_class):
return auth_class(**auth_config.get('parameters', {}))
def authenticate(username, password):
"""
Authenticate user.
Args:
username (str): Username to login
password (str): Password
Returns:
user: authenticated user
"""
# find user by username
users = list(User.list(None, return_objects=True).values())
username_table = {u.username: u for u in users}
user = username_table.get(username)
if user:
user.metadata = user.metadata or {}
given_password = password.encode('utf-8')
logger.debug("User {} will be authenticated using {}".format(username, user.auth))
auth_instance = get_auth_instance(user.auth)
try:
verified_user, verification_error = auth_instance.verify(user, given_password)
except Exception as e:
logger.exception("Verification method {} failed".format(user.auth))
verified_user, verification_error = None, str(e)
if isinstance(verified_user, User) and verified_user.active:
logger.info("User {user} passed {method} auth successfully".format(user=user, method=user.auth))
return verified_user
else:
logger.info("User {user} failed auth using {method} auth method with error {error}".format(
user=user,
method=user.auth,
error=verification_error,
))
def identity(payload):
"""
Read user_id from payload and return User.
Args:
payload (dict): Request payload
Returns:
user: detected user
"""
user_id = payload['identity']
try:
user = User.load(None, user_id)
except Exception:
user = None
return user
def encrypt_password(_password):
if not _password:
return None
config = current_config()
rounds = config.get('BCRYPT_ROUNDS', 12)
password = str(_password).encode('utf-8')
encrypted = bcrypt.hashpw(password, bcrypt.gensalt(rounds)).decode('utf-8')
return encrypted
def is_authorized(_user, policy_value, resource=None):
"""
Evaluate if given user fulfills requirements of the given
policy_value.
Example:
>>> user.get_dict()
>>> {'username': 'jsmith', ..., 'role': 'member'}
>>> is_authorized(user, "ALL")
True
>>> is_authorized(user, "IS_ADMIN")
False
Args:
user (dict or User): User data
policy_value (string or list): Condition written using shorthands and magic keywords
Returns:
bool: authorized or not
"""
if isinstance(_user, User):
user = _user.get_dict()
elif isinstance(_user, dict):
user = _user
else:
raise TypeError('Invalid type for argument user {}'.format(type(_user)))
# magic keywords
USER = user['id'] # noqa: F841
ORGANIZATION = user['organization'].id # noqa: F841
ROLE = user['role']
if resource:
validation, _ = resource.validate()
if not validation:
invalid = True
# test if we are validating on create view and if so, patch missing object id
if not resource.id:
resource.id = uuid4()
validation, _ = resource.validate()
if validation:
invalid = False
resource.id = None
# if invalid resource is passed, let's just continue dispatch_request
# so it can properly fail with 500 response code
if invalid:
logger.error('Cannot evaluate policy for invalid object: {}'.format(str(resource.get_dict())))
return True
# TODO: check owner has id and, organization ...
if hasattr(resource, 'owner'):
OWNER = resource.owner.id # noqa: F841
OWNER_ORGANIZATION = resource.owner.organization.id # noqa: F841
elif isinstance(resource, User):
OWNER = resource.id # noqa: F841
OWNER_ORGANIZATION = resource.organization.id # noqa: F841
elif isinstance(resource, Organization):
OWNER_ORGANIZATION = resource.id # noqa: F841
# predefined conditions for evaluation
conditions = {
'IS_ADMIN': 'ORGANIZATION == OWNER_ORGANIZATION and ROLE == "admin"',
'IS_SUPERADMIN': 'ROLE == "superadmin"',
'IS_OWNER': 'ORGANIZATION == OWNER_ORGANIZATION and USER == OWNER',
'ADMIN_OR_OWNER': 'ORGANIZATION == OWNER_ORGANIZATION and (ROLE == "admin" or USER == OWNER)',
'ALL': 'ORGANIZATION == OWNER_ORGANIZATION'
}
try:
condition = conditions[policy_value]
except KeyError:
logger.exception('Policy evaluation failed. Invalid rule: {}'.format(str(policy_value)))
return False
if ROLE == 'superadmin':
# no point in checking anything here
logger.debug('User {} id {} authorized as {}'.format(user['username'], user['id'], user['role']))
return True
try:
authorized = eval(condition)
if not isinstance(authorized, bool):
logger.error('Policy evaluation did not return boolean: {}'.format(str(authorized)))
authorized = False
except Exception as e:
logger.exception('Policy evaluation failed: ')
authorized = False
logger.debug('User {} id {} authorized as {}'.format(user['username'], user['id'], user['role']))
return authorized