/
auth.py
218 lines (164 loc) · 6.8 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import functools
import ldap
import logbook
import requests
from flask import (abort, Blueprint, current_app, jsonify, request)
from itsdangerous import BadSignature, TimedSerializer
from flask_login import logout_user
from flask_simple_api import error_abort
from flask_security import SQLAlchemyUserDatastore
from flask_security.utils import login_user, verify_password
from .config import get_runtime_config_private_dict
from .models import db, Role, User
from .utils.oauth2 import get_oauth2_identity
_logger = logbook.Logger(__name__)
_MAX_TOKEN_AGE = 60 * 60 * 24 * 30 # one month
auth = Blueprint("auth", __name__, template_folder="templates")
# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
@auth.route('/testing_login', methods=['POST'])
def testing_login():
if not current_app.config.get('TESTING'):
abort(requests.codes.not_found)
user_id = request.args.get('user_id')
if user_id is not None:
user = User.query.get_or_404(int(user_id))
else:
testing_email = 'testing@localhost'
user = get_or_create_user({'email': testing_email})
assert user
login_user(user)
user_info = {}
return _make_success_login_response(user, user_info)
@auth.route("/login", methods=['POST'])
def login():
credentials = request.get_json(silent=True)
if not isinstance(credentials, dict):
error_abort('Credentials provided are not a JSON object')
if credentials.get('username'):
return _login_with_credentials(credentials)
auth_code = credentials.get('authorizationCode')
if auth_code:
return _login_with_google_oauth2(auth_code)
error_abort('No credentials were specified', code=requests.codes.unauthorized)
def _login_with_credentials(credentials):
config = get_runtime_config_private_dict()
username = credentials.get('username')
password = credentials.get('password')
email = _fix_email(username, config)
user = User.query.filter_by(email=email).first()
if current_app.config['TESTING']:
if user is None:
user = get_or_create_user({'email': email})
login_user(user)
return _make_success_login_response(user)
if user is not None and user.password:
if verify_password(password, user.password):
login_user(user)
return _make_success_login_response(user)
_logger.debug('Could not login user locally (no user or password mismatch)')
return _login_with_ldap(email, password, config)
def _fix_email(email, runtime_config):
if email:
domain = runtime_config['default_domain']
if domain and not domain.startswith('@'):
domain = '@' + domain
if '@' not in email and domain:
email += domain
return email
def _login_with_ldap(email, password, config):
_logger.debug('Attempting login via LDAP for {}...', email)
_login_failed = functools.partial(
error_abort,
'Username or password are incorrect for {}'.format(email), code=requests.codes.unauthorized)
if not config['ldap_login_enabled'] or not email or not password:
_logger.debug('Rejecting login because LDAP is disabled or no username/password')
_login_failed()
try:
ldap_obj = ldap.initialize(config['ldap_uri'])
ldap_obj.bind_s(email, password)
ldap_infos = ldap_obj.search_s(config['ldap_base_dn'], ldap.SCOPE_SUBTREE, 'userPrincipalName={}'.format(email))
if not ldap_infos:
_logger.error('Could not authenticate via LDAP - no records found')
_login_failed()
ldap_info = ldap_infos[0][1]
user_info = {
'email': ldap_info['mail'][0].decode('utf-8'),
'given_name': ldap_info['givenName'][0].decode('utf-8'),
'family_name': ldap_info['sn'][0].decode('utf-8'),
}
user = get_or_create_user(user_info)
login_user(user)
return _make_success_login_response(user, user_info)
except ldap.INVALID_CREDENTIALS:
_logger.error('LDAP Invalid credentials', exc_info=True)
_login_failed()
def _login_with_google_oauth2(auth_code):
user_info = get_oauth2_identity(auth_code)
if not user_info:
error_abort('Could not complete OAuth2 exchange', code=requests.codes.unauthorized)
_check_alowed_email_domain(user_info)
user = get_or_create_user(user_info)
login_user(user)
return _make_success_login_response(user, user_info)
@auth.route("/logout", methods=['POST'])
def logout():
logout_user()
return jsonify({})
def _make_success_login_response(user, user_info=None):
if user_info is None:
user_info = {'email': user.email, 'given_name': user.first_name, 'last_name': user.last_name}
token = _generate_token(user, user_info)
_logger.debug('OAuth2 login success for {}. Token: {!r}', user_info, token)
return jsonify({
'auth_token': token,
'user_info': user_info,
})
@auth.route("/reauth", methods=['POST'])
def reauth():
token = (request.json or {}).get('auth_token')
if token is None:
error_abort('Missing reauth token')
try:
token_data = _get_token_serializer().loads(
token, max_age=_MAX_TOKEN_AGE)
except BadSignature:
error_abort('Reauth token invalid', code=requests.codes.unauthorized)
user = User.query.get_or_404(token_data['user_id'])
login_user(user)
return jsonify({
'auth_token': token,
'user_info': token_data['user_info'],
})
def _generate_token(user, user_info):
return _get_token_serializer().dumps({
'user_id': user.id,
'user_info': user_info})
def _get_token_serializer():
return TimedSerializer(current_app.config['SECRET_KEY'])
def get_or_create_user(user_info):
email = user_info['email']
user = user_datastore.get_user(email)
if not user:
user = user_datastore.create_user(
active=True,
email=email)
user_datastore.db.session.commit()
if user.first_name is None:
user.first_name = user_info.get('given_name', user_info.get('first_name'))
user.last_name = user_info.get('family_name', user_info.get('last_name'))
user_datastore.db.session.commit()
user_info['user_id'] = user.id
user_info['roles'] = [role.name for role in user.roles]
user_info['can'] = {role.name: True for role in user.roles}
user_info['can']['moderate'] = user_info['can'].get('moderator', False)
_logger.debug('User info: {}', user_info)
return user
def _check_alowed_email_domain(user_info):
email = user_info['email']
domain = email.split('@', 1)
allowed = current_app.config.get('ALLOWED_EMAIL_DOMAINS')
if allowed is None:
return
if domain not in allowed:
error_abort('Disallowed email domain for {}'.format(email), code=requests.codes.unauthorized)