/
views.py
366 lines (261 loc) · 10.4 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
"""
byceps.blueprints.authentication.views
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""
from typing import Optional
from flask import abort, g, request, url_for
from ...config import get_app_mode
from ...services.authentication.exceptions import AuthenticationFailed
from ...services.authentication import service as authentication_service
from ...services.authentication.password import service as password_service
from ...services.authentication.password import (
reset_service as password_reset_service,
)
from ...services.authentication.session import service as session_service
from ...services.consent import consent_service
from ...services.email import service as email_service
from ...services.email.transfer.models import Sender
from ...services.site import service as site_service
from ...services.terms import version_service as terms_version_service
from ...services.user import service as user_service
from ...services.verification_token import service as verification_token_service
from ...services.verification_token.models import Token as VerificationToken
from ...typing import UserID
from ...util.framework.blueprint import create_blueprint
from ...util.framework.flash import flash_error, flash_notice, flash_success
from ...util.framework.templating import templated
from ...util.views import redirect_to, respond_no_content
from ..admin.core.authorization import AdminPermission
from ..user.creation.views import _find_privacy_policy_consent_subject_id
from .forms import (
LoginForm,
RequestPasswordResetForm,
ResetPasswordForm,
UpdatePasswordForm,
)
from . import service, session as user_session
# Blueprint named 'authentication'; the view functions in this module
# are registered on it through the `@blueprint.route` decorators.
blueprint = create_blueprint('authentication', __name__)
# -------------------------------------------------------------------- #
# log in/out
@blueprint.route('/login')
@templated
def login_form():
    """Show the login form.

    Flashes a notice if the current user is already logged in. Returns a
    dict for the ``templated`` decorator: only ``login_enabled`` (false)
    when login is disabled, otherwise additionally the logged-in flag, a
    fresh login form, and whether user account creation is enabled.
    """
    already_logged_in = g.current_user.is_active
    if already_logged_in:
        flash_notice(
            f'Du bist bereits als Benutzer "{g.current_user.screen_name}" '
            'angemeldet.'
        )

    admin_mode = get_app_mode().is_admin()

    if _is_login_enabled(admin_mode):
        context = {
            'login_enabled': True,
            'logged_in': already_logged_in,
            'form': LoginForm(),
            'user_account_creation_enabled': (
                _is_user_account_creation_enabled(admin_mode)
            ),
        }
    else:
        context = {
            'login_enabled': False,
        }

    return context
@blueprint.route('/login', methods=['POST'])
@respond_no_content
def login():
    """Allow the user to authenticate with screen name and password.

    Does nothing if the current user is already logged in.

    Aborts with 403 if login is disabled, if screen name or password
    are missing, if authentication fails, or (in admin mode) if the
    user lacks the admin access permission.

    Outside of admin mode, a user who has not yet consented to every
    required subject is not logged in; instead, a list with a single
    ``Location`` header tuple pointing to the consent form is returned
    (presumably turned into response headers by ``respond_no_content``
    -- confirm against that decorator).
    """
    if g.current_user.is_active:
        return

    in_admin_mode = get_app_mode().is_admin()

    if not _is_login_enabled(in_admin_mode):
        abort(403, 'Log in to this site is generally disabled.')

    form = LoginForm(request.form)

    # A field that is absent from the submitted data may leave `data`
    # as `None`; treat that like an empty value so the request is
    # rejected with 403 below instead of failing on `.strip()`.
    screen_name = (form.screen_name.data or '').strip()
    password = form.password.data
    permanent = form.permanent.data

    if not screen_name or not password:
        abort(403)

    try:
        user = authentication_service.authenticate(screen_name, password)
    except AuthenticationFailed:
        abort(403)

    if in_admin_mode:
        _require_admin_access_permission(user.id)
    else:
        required_consent_subject_ids = _get_required_consent_subject_ids()
        if _is_consent_required(user.id, required_consent_subject_ids):
            verification_token = (
                verification_token_service.create_for_terms_consent(user.id)
            )
            consent_form_url = url_for(
                'consent.consent_form', token=verification_token.token
            )
            return [('Location', consent_form_url)]

    # Authorization succeeded.

    session_token = session_service.get_session_token(user.id)
    session_service.record_recent_login(user.id)
    service.create_login_event(user.id, request.remote_addr)

    user_session.start(user.id, session_token.token, permanent=permanent)
    flash_success(f'Erfolgreich eingeloggt als {user.screen_name}.')
def _require_admin_access_permission(user_id: UserID) -> None:
    """Abort with 403 unless the user has the admin access permission."""
    user_permissions = service.get_permissions_for_user(user_id)
    may_enter_admin_area = AdminPermission.access in user_permissions

    if not may_enter_admin_area:
        # Without this permission, the admin area must not be entered.
        abort(403)
def _get_required_consent_subject_ids():
    """Return the IDs of the consent subjects to check at login.

    The list holds the consent subject ID of the brand's current terms
    version (if one is found), followed by the privacy policy consent
    subject ID (if one is found).
    """
    current_terms = terms_version_service.find_current_version_for_brand(
        g.brand_id
    )
    terms_ids = [current_terms.consent_subject_id] if current_terms else []

    privacy_policy_id = _find_privacy_policy_consent_subject_id()
    privacy_policy_ids = [privacy_policy_id] if privacy_policy_id else []

    return terms_ids + privacy_policy_ids
def _is_consent_required(user_id, subject_ids):
    """Return `True` if the user has not consented to at least one of
    the given subjects.

    Checking stops at the first subject without consent.
    """
    return any(
        not consent_service.has_user_consented_to_subject(user_id, sid)
        for sid in subject_ids
    )
@blueprint.route('/logout', methods=['POST'])
@respond_no_content
def logout():
    """End the current user session and flash a confirmation message."""
    # Order matters: end the session first, then announce the logout.
    user_session.end()

    flash_success('Erfolgreich ausgeloggt.')
# -------------------------------------------------------------------- #
# password update
@blueprint.route('/password/update')
@templated
def password_update_form(erroneous_form=None):
    """Show the form to change the current user's password.

    Aborts with 404 if the current user is not logged in. If
    `erroneous_form` is given, it is shown instead of a fresh form.
    """
    _get_current_user_or_404()

    return {'form': erroneous_form or UpdatePasswordForm()}
@blueprint.route('/password', methods=['POST'])
def password_update():
    """Change the current user's password.

    Aborts with 404 if the current user is not logged in. On invalid
    input, the update form is shown again with the submitted form;
    on success, the user is redirected to the login form.
    """
    current_user = _get_current_user_or_404()

    update_form = UpdatePasswordForm(request.form)

    if update_form.validate():
        new_password = update_form.new_password.data

        # The user's ID is passed twice -- presumably once as the
        # account to update and once as the initiator; confirm against
        # `password_service.update_password_hash`.
        password_service.update_password_hash(
            current_user.id, new_password, current_user.id
        )

        flash_success(
            'Dein Passwort wurde geändert. Bitte melde dich erneut an.'
        )
        return redirect_to('.login_form')

    return password_update_form(update_form)
# -------------------------------------------------------------------- #
# password reset
@blueprint.route('/password/reset/request')
@templated
def request_password_reset_form(erroneous_form=None):
    """Show the form to request a password reset.

    If `erroneous_form` is given, it is shown instead of a fresh form.
    """
    return {'form': erroneous_form or RequestPasswordResetForm()}
@blueprint.route('/password/reset/request', methods=['POST'])
def request_password_reset():
    """Handle a submitted password reset request.

    The request form is shown again (with an error message) if the form
    is invalid, the screen name is unknown or belongs to a deleted
    user, the account has no e-mail address, or the account is
    suspended. If the e-mail address is not verified yet, the user is
    redirected to request a confirmation e-mail instead. Otherwise, the
    password reset is prepared and a fresh request form is shown along
    with a success message.

    NOTE(review): the distinct error messages reveal whether a screen
    name exists and which state the account is in -- confirm this is
    intended.
    """
    reset_request_form = RequestPasswordResetForm(request.form)
    if not reset_request_form.validate():
        return request_password_reset_form(reset_request_form)

    screen_name = reset_request_form.screen_name.data.strip()

    user = user_service.find_user_by_screen_name(
        screen_name, case_insensitive=True
    )

    if user is None or user.deleted:
        flash_error(f'Der Benutzername "{screen_name}" ist unbekannt.')
        return request_password_reset_form(reset_request_form)

    if user.email_address is None:
        flash_error(
            f'Für das Benutzerkonto "{screen_name}" ist keine E-Mail-Adresse hinterlegt.'
        )
        return request_password_reset_form(reset_request_form)

    if not user.email_address_verified:
        flash_error(
            f'Die E-Mail-Adresse für das Benutzerkonto "{screen_name}" '
            'wurde noch nicht bestätigt.'
        )
        return redirect_to('user_email_address.request_confirmation_email')

    if user.suspended:
        flash_error(f'Das Benutzerkonto "{screen_name}" ist gesperrt.')
        return request_password_reset_form(reset_request_form)

    mail_sender = _get_sender()

    password_reset_service.prepare_password_reset(
        user, request.url_root, sender=mail_sender
    )

    flash_success(
        'Ein Link zum Setzen eines neuen Passworts '
        f'für den Benutzernamen "{user.screen_name}" '
        'wurde an die hinterlegte E-Mail-Adresse versendet.'
    )

    # Show a fresh, empty form after a successful request.
    return request_password_reset_form()
def _get_sender() -> Optional[Sender]:
    """Return the sender from the current site's e-mail configuration.

    Returns `None` if the application is not running in site mode.
    """
    if get_app_mode().is_site():
        current_site = site_service.get_site(g.site_id)
        config = email_service.get_config(current_site.email_config_id)
        return config.sender

    return None
@blueprint.route('/password/reset/token/<token>')
@templated
def password_reset_form(token, erroneous_form=None):
    """Show the form to set a new password for a password reset token.

    Aborts with 404 if the token fails verification. If
    `erroneous_form` is given, it is shown instead of a fresh form.
    """
    _verify_password_reset_token(token)

    reset_form = erroneous_form or ResetPasswordForm()

    return {
        'form': reset_form,
        'token': token,
    }
@blueprint.route('/password/reset/token/<token>', methods=['POST'])
def password_reset(token):
    """Set a new password for the user the reset token belongs to.

    Aborts with 404 if the token fails verification. On invalid input,
    the reset form is shown again with the submitted form; on success,
    the user is redirected to the login form.
    """
    verified_token = _verify_password_reset_token(token)

    reset_form = ResetPasswordForm(request.form)

    if reset_form.validate():
        new_password = reset_form.new_password.data

        password_reset_service.reset_password(verified_token, new_password)

        flash_success('Das Passwort wurde geändert.')
        return redirect_to('.login_form')

    return password_reset_form(token, reset_form)
def _verify_password_reset_token(token: str) -> VerificationToken:
    """Look up the password reset token and return it if it is usable.

    Flashes an error and aborts with 404 if no unexpired token is found
    for the given value, or if no active user is found for the token's
    user ID.
    """
    found_token = verification_token_service.find_for_password_reset_by_token(
        token
    )

    if not _is_verification_token_valid(found_token):
        flash_error(
            'Es wurde kein gültiges Token angegeben. '
            'Ein Token ist nur 24 Stunden lang gültig.'
        )
        abort(404)

    if user_service.find_active_user(found_token.user_id) is None:
        flash_error('Es wurde kein gültiges Token angegeben.')
        abort(404)

    return found_token
def _is_verification_token_valid(token: Optional[VerificationToken]) -> bool:
    """Return `True` if a token is given and it is not expired."""
    if token is None:
        return False

    return not token.is_expired
# -------------------------------------------------------------------- #
# helpers
def _is_user_account_creation_enabled(in_admin_mode):
    """Return whether user account creation is enabled.

    Always `False` in admin mode; otherwise, the current site's
    `user_account_creation_enabled` setting.
    """
    return False if in_admin_mode else _get_site().user_account_creation_enabled
def _is_login_enabled(in_admin_mode):
    """Return whether login is enabled.

    Always `True` in admin mode; otherwise, the current site's
    `login_enabled` setting.
    """
    return True if in_admin_mode else _get_site().login_enabled
def _get_site():
    """Return the site identified by the request-global `g.site_id`."""
    current_site_id = g.site_id
    return site_service.get_site(current_site_id)
def _get_current_user_or_404():
    """Return the current user, or abort with 404 if it is not active."""
    current_user = g.current_user

    if current_user.is_active:
        return current_user

    abort(404)