From efe98c8ec8e142e7a831da87fa3ee23565443d5a Mon Sep 17 00:00:00 2001 From: Chris Wagner Date: Sat, 10 Jul 2021 18:48:52 -0700 Subject: [PATCH] Typing Part 3. (#502) This adds types to the Security constructor and init_app. This required major refactoring of how initialization happened since prior to this all arguments were added to kwargs (including default forms, config variables, etc) then set as attributes on the instance. Not easy to provide types for each one. We remove all that, remove the _SecurityState class and concept. Furthermore - in many places - we change from using _securyty.attr and use config_value("xxx"). Improve performance of config_value - no reason to create a dictonary every time - just query the key! Add typing to more tests and views to help verify the types make sense. Fix view responses - the types should be flask.ResponsValue - not Response Fix 'within' and 'grace' typing - then can take floats. closes: #140 --- .pre-commit-config.yaml | 2 +- CHANGES.rst | 19 +- docs/conf.py | 3 + flask_security/core.py | 599 +++++++++++++++++++------------ flask_security/decorators.py | 4 +- flask_security/proxies.py | 1 - flask_security/unified_signin.py | 145 ++++---- flask_security/utils.py | 25 +- flask_security/views.py | 120 ++++--- requirements/dev.txt | 1 + tests/conftest.py | 17 +- tests/test_changeable.py | 5 +- tests/test_misc.py | 59 ++- tests/test_registerable.py | 6 +- tox.ini | 1 + 15 files changed, 591 insertions(+), 416 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 099aa5f1..a16b6c9f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,7 @@ repos: - id: check-merge-conflict - id: fix-byte-order-marker - repo: https://github.com/asottile/pyupgrade - rev: v2.19.4 + rev: v2.21.0 hooks: - id: pyupgrade args: [--py36-plus] diff --git a/CHANGES.rst b/CHANGES.rst index a6de50f9..7ab9c102 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -8,12 +8,11 @@ Version 4.1.0 Released TBD -This release was bumped to 4.1.0 on the small chance that the change in the way -babel packages are tried and used might break existing applications. - Features ++++++++ - (:issue:`474`) Add public API and CLI command to change a user's password. +- (:issue:`140`) Add type hints. Please note that many of the packages that flask-security + depends on aren't typed yet - so there are likely errors in some of the types. Fixes +++++ @@ -21,10 +20,10 @@ Fixes - (:issue:`490`) Flask-Mail sender name can be a tuple. (hrishikeshrt) - (:issue:`486`) Possible open redirect vulnerability. - (:pr:`478`) Improve/update German translation. (sr-verde) -- (:issue:`488`) Improve handling of Babel packages +- (:issue:`488`) Improve handling of Babel packages. - (:pr:`496`) Documentation improvements, distribution extras, fix single message - override. -- (:issue:`497`) Improve cookie handling and default samesite to Strict + override. +- (:issue:`497`) Improve cookie handling and default ``samesite`` to ``Strict``. Backwards Compatibility Concerns +++++++++++++++++++++++++++++++++ @@ -39,8 +38,14 @@ Backwards Compatibility Concerns one of the other package - however if those modules are NOT initialized, Flask-Security will simply ignore them and no translations will occur. - (:issue:`497`) The CSRF_COOKIE and TWO_FACTOR_VALIDITY cookie had their defaults - changed to set ``samesite=Strict``. This follows the Flask-Security directive of + changed to set ``samesite=Strict``. This follows the Flask-Security goal of making things more secure out-of-the-box. +- (:issue:`140`) Type hinting. For the most part this of course has no runtime effects. + However, this required a fairly major overhaul of how Flask-Security is initialized in + order to provide valid types for the many constructor attributes. There are no known + compatability concerns - however initialization used to convert all arguments into kwargs + then add those as attributes and merge with application constants. That no longer happens + and it is possible that some corner cases don't behave precisely as they did before. Version 4.0.1 ------------- diff --git a/docs/conf.py b/docs/conf.py index 823eafcb..17f27151 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -100,6 +100,7 @@ nitpick_ignore = [ ("py:attr", "LoginManager.unauthorized"), ("py:class", "flask_mongoengine.MongoEngine"), + ("py:class", "ResponseValue"), ("py:class", "function"), ] autodoc_typehints = "description" @@ -112,7 +113,9 @@ "itsdangerous": ("https://itsdangerous.palletsprojects.com/", None), "sqlalchemy": ("https://docs.sqlalchemy.org/", None), "wtforms": ("https://wtforms.readthedocs.io/", None), + "flask_wtforms": ("https://flask-wtf.readthedocs.io", None), "flask_sqlalchemy": ("https://flask-sqlalchemy.palletsprojects.com/", None), + "flask_login": ("https://flask-login.readthedocs.io/en/latest/", None), "passlib": ("https://passlib.readthedocs.io/en/stable", None), } diff --git a/flask_security/core.py b/flask_security/core.py index 030591fe..6ec412c3 100644 --- a/flask_security/core.py +++ b/flask_security/core.py @@ -17,11 +17,13 @@ import warnings import pkg_resources -from flask import _request_ctx_stack, current_app, render_template +from flask import _request_ctx_stack, current_app +from flask.json import JSONEncoder from flask_login import AnonymousUserMixin, LoginManager from flask_login import UserMixin as BaseUserMixin from flask_login import current_user from flask_principal import Identity, Principal, RoleNeed, UserNeed, identity_loaded +from flask_wtf import FlaskForm from itsdangerous import URLSafeTimedSerializer from passlib.context import CryptContext from werkzeug.datastructures import ImmutableList @@ -50,6 +52,7 @@ from .mail_util import MailUtil from .password_util import PasswordUtil from .phone_util import PhoneUtil +from .proxies import _security from .twofactor import tf_send_security_token from .unified_signin import ( UnifiedSigninForm, @@ -65,6 +68,7 @@ FsJsonEncoder, FsPermNeed, csrf_cookie_handler, + default_render_template, default_want_json, get_config, get_identity_attribute, @@ -79,15 +83,12 @@ from .views import create_blueprint, default_render_json if t.TYPE_CHECKING: # pragma: no cover - from flask import Flask - from .datastore import Role + import flask + from flask import Request + from flask.typing import ResponseValue + import flask_login.mixins + from .datastore import Role, User, UserDatastore -# Convenient references -# noinspection PyTypeChecker -_security: "Security" = LocalProxy( # type: ignore - lambda: current_app.extensions["security"] -) -_datastore = LocalProxy(lambda: _security.datastore) # List of authentication mechanisms supported. AUTHN_MECHANISMS = ("basic", "session", "token") @@ -426,25 +427,6 @@ "USE_CODE": (_("Use this code to sign in: %(code)s."), "info"), } -_default_forms = { - "login_form": LoginForm, - "verify_form": VerifyForm, - "confirm_register_form": ConfirmRegisterForm, - "register_form": RegisterForm, - "forgot_password_form": ForgotPasswordForm, - "reset_password_form": ResetPasswordForm, - "change_password_form": ChangePasswordForm, - "send_confirmation_form": SendConfirmationForm, - "passwordless_login_form": PasswordlessLoginForm, - "two_factor_verify_code_form": TwoFactorVerifyCodeForm, - "two_factor_setup_form": TwoFactorSetupForm, - "two_factor_rescue_form": TwoFactorRescueForm, - "us_signin_form": UnifiedSigninForm, - "us_setup_form": UnifiedSigninSetupForm, - "us_setup_validate_form": UnifiedSigninSetupValidateForm, - "us_verify_form": UnifiedVerifyForm, -} - def _user_loader(user_id): """Load based on fs_uniquifier (alternative_id).""" @@ -551,7 +533,7 @@ def _get_principal(app): return p -def _get_pwd_context(app: "Flask") -> CryptContext: +def _get_pwd_context(app: "flask.Flask") -> CryptContext: pw_hash = cv("PASSWORD_HASH", app=app) schemes = cv("PASSWORD_SCHEMES", app=app) deprecated = cv("DEPRECATED_PASSWORD_SCHEMES", app=app) @@ -570,7 +552,7 @@ def _get_pwd_context(app: "Flask") -> CryptContext: return cc -def _get_hashing_context(app): +def _get_hashing_context(app: "flask.Flask") -> CryptContext: schemes = cv("HASHING_SCHEMES", app=app) deprecated = cv("DEPRECATED_HASHING_SCHEMES", app=app) return CryptContext(schemes=schemes, deprecated=deprecated) @@ -582,46 +564,6 @@ def _get_serializer(app, name): return URLSafeTimedSerializer(secret_key=secret_key, salt=salt) -def _get_state(app, datastore, anonymous_user=None, **kwargs): - for key, value in get_config(app).items(): - kwargs[key.lower()] = value - - kwargs.update( - dict( - app=app, - datastore=datastore, - principal=_get_principal(app), - pwd_context=_get_pwd_context(app), - hashing_context=_get_hashing_context(app), - i18n_domain=FsDomain(app), - remember_token_serializer=_get_serializer(app, "remember"), - login_serializer=_get_serializer(app, "login"), - reset_serializer=_get_serializer(app, "reset"), - confirm_serializer=_get_serializer(app, "confirm"), - us_setup_serializer=_get_serializer(app, "us_setup"), - tf_validity_serializer=_get_serializer(app, "two_factor_validity"), - _context_processors={}, - _unauthorized_callback=None, - _render_json=default_render_json, - _want_json=default_want_json, - _unauthn_handler=default_unauthn_handler, - _reauthn_handler=default_reauthn_handler, - _unauthz_handler=default_unauthz_handler, - ) - ) - if "redirect_validate_re" in kwargs: - kwargs["_redirect_validate_re"] = re.compile(kwargs["redirect_validate_re"]) - - if "login_manager" not in kwargs: - kwargs["login_manager"] = _get_login_manager(app, anonymous_user) - - for key, value in _default_forms.items(): - if key not in kwargs or not kwargs[key]: - kwargs[key] = value - - return _SecurityState(**kwargs) - - def _context_processor(): return dict(url_for_security=url_for_security, security=_security) @@ -918,94 +860,6 @@ def has_role(self, *args): return False -class _SecurityState: - def __init__(self, **kwargs): - for key, value in kwargs.items(): - setattr(self, key.lower(), value) - - def _add_ctx_processor(self, endpoint, fn): - group = self._context_processors.setdefault(endpoint, []) - fn not in group and group.append(fn) - - def _run_ctx_processor(self, endpoint): - rv = {} - for g in [None, endpoint]: - for fn in self._context_processors.setdefault(g, []): - rv.update(fn()) - return rv - - def context_processor(self, fn): - self._add_ctx_processor(None, fn) - - def forgot_password_context_processor(self, fn): - self._add_ctx_processor("forgot_password", fn) - - def login_context_processor(self, fn): - self._add_ctx_processor("login", fn) - - def register_context_processor(self, fn): - self._add_ctx_processor("register", fn) - - def reset_password_context_processor(self, fn): - self._add_ctx_processor("reset_password", fn) - - def change_password_context_processor(self, fn): - self._add_ctx_processor("change_password", fn) - - def send_confirmation_context_processor(self, fn): - self._add_ctx_processor("send_confirmation", fn) - - def send_login_context_processor(self, fn): - self._add_ctx_processor("send_login", fn) - - def verify_context_processor(self, fn): - self._add_ctx_processor("verify", fn) - - def mail_context_processor(self, fn): - self._add_ctx_processor("mail", fn) - - def tf_setup_context_processor(self, fn): - self._add_ctx_processor("tf_setup", fn) - - def tf_token_validation_context_processor(self, fn): - self._add_ctx_processor("tf_token_validation", fn) - - def us_signin_context_processor(self, fn): - self._add_ctx_processor("us_signin", fn) - - def us_setup_context_processor(self, fn): - self._add_ctx_processor("us_setup", fn) - - def us_verify_context_processor(self, fn): - self._add_ctx_processor("us_verify", fn) - - def unauthorized_handler(self, fn): - warnings.warn( - "'unauthorized_handler' has been replaced with" - " 'unauthz_handler' and 'unauthn_handler'", - DeprecationWarning, - ) - self._unauthorized_callback = fn - - def totp_factory(self, tf): - self._totp_factory = tf - - def render_json(self, fn): - self._render_json = fn - - def want_json(self, fn): - self._want_json = fn - - def unauthz_handler(self, cb): - self._unauthz_handler = cb - - def unauthn_handler(self, cb): - self._unauthn_handler = cb - - def reauthn_handler(self, cb): - self._reauthn_handler = cb - - class Security: """The :class:`Security` class initializes the Flask-Security extension. @@ -1031,16 +885,22 @@ class Security: :param us_setup_validate_form: set form for the unified sign in setup validate view :param us_verify_form: set form for re-authenticating due to freshness check :param anonymous_user: class to use for anonymous user - :param render_template: function to use to render templates. The default is Flask's - render_template() function. + :param login_manager: An subclass of LoginManager :param json_encoder_cls: Class to use as blueprint.json_encoder. Defaults to :class:`FsJsonEncoder` - :param totp_cls: Class to use as TOTP factory. Defaults to :class:`Totp` - :param phone_util_cls: Class to use for phone number utilities. - Defaults to :class:`PhoneUtil` :param mail_util_cls: Class to use for sending emails. Defaults to :class:`MailUtil` :param password_util_cls: Class to use for password normalization/validation. Defaults to :class:`PasswordUtil` + :param phone_util_cls: Class to use for phone number utilities. + Defaults to :class:`PhoneUtil` + :param render_template: function to use to render templates. The default is Flask's + render_template() function. + :param totp_cls: Class to use as TOTP factory. Defaults to :class:`Totp` + + .. tip:: + Be sure that all your configuration values have been set PRIOR to + instantiating this class. Some configuration values are set as attributes + on the instance and therefore won't track any changes. .. versionadded:: 3.4.0 ``verify_form`` added as part of freshness/re-authentication @@ -1068,50 +928,160 @@ class Security: """ - def __init__(self, app=None, datastore=None, register_blueprint=True, **kwargs): - + def __init__( + self, + app: t.Optional["flask.Flask"] = None, + datastore: t.Optional["UserDatastore"] = None, + register_blueprint: bool = True, + login_form: t.Type[FlaskForm] = LoginForm, + verify_form: t.Type[FlaskForm] = VerifyForm, + confirm_register_form: t.Type[FlaskForm] = ConfirmRegisterForm, + register_form: t.Type[FlaskForm] = RegisterForm, + forgot_password_form: t.Type[FlaskForm] = ForgotPasswordForm, + reset_password_form: t.Type[FlaskForm] = ResetPasswordForm, + change_password_form: t.Type[FlaskForm] = ChangePasswordForm, + send_confirmation_form: t.Type[FlaskForm] = SendConfirmationForm, + passwordless_login_form: t.Type[FlaskForm] = PasswordlessLoginForm, + two_factor_verify_code_form: t.Type[FlaskForm] = TwoFactorVerifyCodeForm, + two_factor_setup_form: t.Type[FlaskForm] = TwoFactorSetupForm, + two_factor_rescue_form: t.Type[FlaskForm] = TwoFactorRescueForm, + us_signin_form: t.Type[FlaskForm] = UnifiedSigninForm, + us_setup_form: t.Type[FlaskForm] = UnifiedSigninSetupForm, + us_setup_validate_form: t.Type[FlaskForm] = UnifiedSigninSetupValidateForm, + us_verify_form: t.Type[FlaskForm] = UnifiedVerifyForm, + anonymous_user: t.Optional[t.Type["flask_login.AnonymousUserMixin"]] = None, + login_manager: t.Optional["flask_login.LoginManager"] = None, + json_encoder_cls: t.Type[JSONEncoder] = FsJsonEncoder, + mail_util_cls: t.Type["MailUtil"] = MailUtil, + password_util_cls: t.Type["PasswordUtil"] = PasswordUtil, + phone_util_cls: t.Type["PhoneUtil"] = PhoneUtil, + render_template: t.Callable[..., str] = default_render_template, + totp_cls: t.Type["Totp"] = Totp, + **kwargs: t.Any, + ): + + # to be nice and hopefully avoid backwards compat issues - we still accept + # kwargs - but we don't do anything with them. If caller sends in some - + # output a deprecation warning + if len(kwargs) > 0: + warnings.warn( + "kwargs passed to the constructor are now ignored", + DeprecationWarning, + ) self.app = app self._datastore = datastore self._register_blueprint = register_blueprint - self._kwargs = kwargs + self.login_form = login_form + self.verify_form = verify_form + self.confirm_register_form = confirm_register_form + self.register_form = register_form + self.forgot_password_form = forgot_password_form + self.reset_password_form = reset_password_form + self.change_password_form = change_password_form + self.send_confirmation_form = send_confirmation_form + self.passwordless_login_form = passwordless_login_form + self.two_factor_verify_code_form = two_factor_verify_code_form + self.two_factor_setup_form = two_factor_setup_form + self.two_factor_rescue_form = two_factor_rescue_form + self.us_signin_form = us_signin_form + self.us_setup_form = us_setup_form + self.us_setup_validate_form = us_setup_validate_form + self.us_verify_form = us_verify_form + self.anonymous_user = anonymous_user + self.json_encoder_cls = json_encoder_cls + self.login_manager = login_manager + self.mail_util_cls = mail_util_cls + self.password_util_cls = password_util_cls + self.phone_util_cls = phone_util_cls + self.render_template = render_template + self.totp_cls = totp_cls + + # Attributes not settable from init. + self._unauthn_handler: t.Callable[ + [t.List[str], t.Optional[t.Dict[str, str]]], "ResponseValue" + ] = default_unauthn_handler + self._reauthn_handler: t.Callable[ + [timedelta, timedelta], "ResponseValue" + ] = default_reauthn_handler + self._unauthz_handler: t.Callable[ + [t.Callable[[t.Any], t.Any], t.Optional[t.List[str]]], "ResponseValue" + ] = default_unauthz_handler + self._unauthorized_callback: t.Optional[t.Callable[[], "ResponseValue"]] = None + self._render_json: t.Callable[ + [t.Dict[str, t.Any], int, t.Optional[t.Dict[str, str]], t.Optional["User"]], + "ResponseValue", + ] = default_render_json + self._want_json: t.Callable[["Request"], bool] = default_want_json + + # Type attributes that we don't initialize until init_app time. + self.remember_token_serializer: URLSafeTimedSerializer + self.login_serializer: URLSafeTimedSerializer + self.reset_serializer: URLSafeTimedSerializer + self.confirm_serializer: URLSafeTimedSerializer + self.us_setup_serializer: URLSafeTimedSerializer + self.tf_validity_serializer: URLSafeTimedSerializer + self.principal: Principal + self.pwd_context: CryptContext + self.hashing_context: CryptContext + self._context_processors: t.Dict[ + str, t.List[t.Callable[[], t.Dict[str, t.Any]]] + ] = {} + self.i18n_domain: FsDomain + self.datastore: "UserDatastore" + self.register_blueprint: bool + + self._mail_util: MailUtil + self._phone_util: PhoneUtil + self._password_util: PasswordUtil + self._redirect_validate_re: re.Pattern + self._totp_factory: "Totp" + + # We add forms, config etc as attributes - which of course mypy knows + # nothing about. Add necessary attributes here to keep mypy happy + self.trackable: bool = False + self.confirmable: bool = False + self.registrable: bool = False + self.changeable: bool = False + self.recoverable: bool = False + self.two_factor: bool = False + self.unified_signin: bool = False + self.passwordless: bool = False + + self.redirect_behavior: t.Optional[str] = None - self._state = None # set by init_app if app is not None and datastore is not None: - self._state = self.init_app( - app, datastore, register_blueprint=register_blueprint, **kwargs - ) - - def init_app(self, app, datastore=None, register_blueprint=None, **kwargs): + self.init_app(app, datastore, register_blueprint=register_blueprint) + + def init_app( + self, + app: "flask.Flask", + datastore: t.Optional["UserDatastore"] = None, + register_blueprint: t.Optional[bool] = None, + **kwargs: t.Any, + ) -> None: """Initializes the Flask-Security extension for the specified application and datastore implementation. :param app: The application. :param datastore: An instance of a user datastore. :param register_blueprint: to register the Security blueprint or not. + :param kwargs: Can be used to override/initialize any of the constructor + attributes. + + If you create the Security instance with both an 'app' and 'datastore' + you shouldn't call this - it will be called as part of the constructor. """ self.app = app - if datastore is None: - datastore = self._datastore - - if register_blueprint is None: - register_blueprint = self._register_blueprint - - for key, value in self._kwargs.items(): - kwargs.setdefault(key, value) + if datastore: + self._datastore = datastore + if not self._datastore: + raise ValueError("Datastore must be provided") + self.datastore = self._datastore - if "render_template" not in kwargs: - kwargs.setdefault("render_template", self.render_template) - if "json_encoder_cls" not in kwargs: - kwargs.setdefault("json_encoder_cls", FsJsonEncoder) - if "totp_cls" not in kwargs: - kwargs.setdefault("totp_cls", Totp) - if "phone_util_cls" not in kwargs: - kwargs.setdefault("phone_util_cls", PhoneUtil) - if "mail_util_cls" not in kwargs: - kwargs.setdefault("mail_util_cls", MailUtil) - if "password_util_cls" not in kwargs: - kwargs.setdefault("password_util_cls", PasswordUtil) + if register_blueprint: + self._register_blueprint = register_blueprint + self.register_blueprint = self._register_blueprint # default post redirects to APPLICATION_ROOT, which itself defaults to "/" app.config.setdefault( @@ -1127,28 +1097,69 @@ def init_app(self, app, datastore=None, register_blueprint=None, **kwargs): for key, value in _default_messages.items(): app.config.setdefault("SECURITY_MSG_" + key, value) + # Override default forms + # BC - kwarg value here overrides init time + # BC - we allow forms to be set as config items + # Can't wait for assignment expressions. + form_names = [ + "login_form", + "verify_form", + "confirm_register_form", + "register_form", + "forgot_password_form", + "reset_password_form", + "change_password_form", + "send_confirmation_form", + "passwordless_login_form", + "two_factor_verify_code_form", + "two_factor_setup_form", + "two_factor_rescue_form", + "us_signin_form", + "us_setup_form", + "us_setup_validate_form", + "us_verify_form", + ] + for form_name in form_names: + if kwargs.get(form_name, None): + setattr(self, form_name, kwargs.get(form_name)) + elif app.config.get(f"SECURITY_{form_name.upper()}", None): + setattr( + self, form_name, app.config.get(f"SECURITY_{form_name.upper()}") + ) + + # Allow kwargs to overwrite/init other constructor attributes + attr_names = [ + "anonymous_user", + "json_encoder_cls", + "login_manager", + "mail_util_cls", + "password_util_cls", + "phone_util_cls", + "render_template", + "totp_cls", + ] + for attr in attr_names: + if kwargs.get(attr, None): + setattr(self, attr, kwargs.get(attr)) + + # set all config items as attributes (minus the SECURITY_ prefix) + for key, value in get_config(app).items(): + setattr(self, key.lower(), value) + identity_loaded.connect_via(app)(_on_identity_loaded) - self._state = state = _get_state(app, datastore, **kwargs) - if hasattr(datastore, "user_model") and not hasattr( - datastore.user_model, "fs_uniquifier" + if hasattr(self.datastore, "user_model") and not hasattr( + self.datastore.user_model, "fs_uniquifier" ): # pragma: no cover raise ValueError("User model must contain fs_uniquifier as of 4.0.0") - if register_blueprint: - bp = create_blueprint( - app, state, __name__, json_encoder=kwargs["json_encoder_cls"] - ) - app.register_blueprint(bp) - app.context_processor(_context_processor) - @app.before_first_request def _register_i18n(): # This is only not registered if Flask-Babel isn't installed... if "_" not in app.jinja_env.globals: - current_app.jinja_env.globals["_"] = state.i18n_domain.gettext + current_app.jinja_env.globals["_"] = self.i18n_domain.gettext # Register so other packages can reference our translations. - current_app.jinja_env.globals["_fsdomain"] = state.i18n_domain.gettext + current_app.jinja_env.globals["_fsdomain"] = self.i18n_domain.gettext @app.before_first_request def _csrf_init(): @@ -1211,19 +1222,45 @@ def _csrf_init(): # Add configured header to WTF_CSRF_HEADERS current_app.config["WTF_CSRF_HEADERS"].append(cv("CSRF_HEADER")) - state._phone_util = state.phone_util_cls(app) - state._mail_util = state.mail_util_cls(app) - state._password_util = state.password_util_cls(app) - - app.extensions["security"] = state + self._phone_util = self.phone_util_cls(app) + self._mail_util = self.mail_util_cls(app) + self._password_util = self.password_util_cls(app) + rvre = cv("REDIRECT_VALIDATE_RE", app=app) + if rvre: + self._redirect_validate_re = re.compile(rvre) + + if not self.login_manager: + self.login_manager = _get_login_manager(app, self.anonymous_user) + + self.remember_token_serializer = _get_serializer(app, "remember") + self.login_serializer = _get_serializer(app, "login") + self.reset_serializer = _get_serializer(app, "reset") + self.confirm_serializer = _get_serializer(app, "confirm") + self.us_setup_serializer = _get_serializer(app, "us_setup") + self.tf_validity_serializer = _get_serializer(app, "two_factor_validity") + self.principal = _get_principal(app) + self.pwd_context = _get_pwd_context(app) + self.hashing_context = _get_hashing_context(app) + self.i18n_domain = FsDomain(app) + + if self.register_blueprint: + bp = create_blueprint( + app, self, __name__, json_encoder=self.json_encoder_cls + ) + app.register_blueprint(bp) + app.context_processor(_context_processor) if hasattr(app, "cli"): from .cli import users, roles - if state.cli_users_name: - app.cli.add_command(users, state.cli_users_name) - if state.cli_roles_name: - app.cli.add_command(roles, state.cli_roles_name) + # Waiting for 3.8 assignment expressions + un = cv("CLI_USERS_NAME", app, strict=True) + rn = cv("CLI_ROLES_NAME", app, strict=True) + + if un: + app.cli.add_command(users, un) + if rn: + app.cli.add_command(roles, rn) # Migrate from TWO_FACTOR config to generic config. for newc, oldc in [ @@ -1292,18 +1329,19 @@ def _csrf_init(): sms_service = cv("SMS_SERVICE", app=app) if sms_service == "Twilio": # pragma: no cover self._check_modules("twilio", "SMS") - if state.phone_util_cls == PhoneUtil: + if self.phone_util_cls == PhoneUtil: self._check_modules("phonenumbers", "SMS") secrets = cv("TOTP_SECRETS", app=app) issuer = cv("TOTP_ISSUER", app=app) if not secrets or not issuer: raise ValueError("Both TOTP_SECRETS and TOTP_ISSUER must be set") - state.totp_factory(state.totp_cls(secrets, issuer)) + self._totp_factory = self.totp_cls(secrets, issuer) if cv("PASSWORD_COMPLEXITY_CHECKER", app=app) == "zxcvbn": self._check_modules("zxcvbn", "PASSWORD_COMPLEXITY_CHECKER") - return state + + app.extensions["security"] = self def _check_modules(self, module, config_name): # pragma: no cover from importlib.util import find_spec @@ -1314,10 +1352,13 @@ def _check_modules(self, module, config_name): # pragma: no cover return module_exists - def render_template(self, *args, **kwargs): - return render_template(*args, **kwargs) - - def render_json(self, cb): + def render_json( + self, + cb: t.Callable[ + [t.Dict[str, t.Any], int, t.Optional[t.Dict[str, str]], t.Optional["User"]], + "ResponseValue", + ], + ) -> None: """Callback to render response payload as JSON. :param cb: Callback function with @@ -1349,9 +1390,9 @@ def render_json(self, cb): .. versionadded:: 3.3.0 """ - self._state._render_json = cb + self._render_json = cb - def want_json(self, fn): + def want_json(self, fn: t.Callable[["flask.Request"], bool]) -> None: """Function that returns True if response should be JSON (based on the request) :param fn: Function with the following signature (request) @@ -1363,9 +1404,14 @@ def want_json(self, fn): .. versionadded:: 3.3.0 """ - self._state._want_json = fn + self._want_json = fn - def unauthz_handler(self, cb): + def unauthz_handler( + self, + cb: t.Callable[ + [t.Callable[[t.Any], t.Any], t.Optional[t.List[str]]], "ResponseValue" + ], + ) -> None: """ Callback for failed authorization. This is called by the :func:`roles_required`, :func:`roles_accepted`, @@ -1386,9 +1432,12 @@ def unauthz_handler(self, cb): .. versionadded:: 3.3.0 """ - self._state._unauthz_handler = cb + self._unauthz_handler = cb - def unauthn_handler(self, cb): + def unauthn_handler( + self, + cb: t.Callable[[t.List[str], t.Optional[t.Dict[str, str]]], "ResponseValue"], + ) -> None: """ Callback for failed authentication. This is called by :func:`auth_required`, :func:`auth_token_required` @@ -1408,9 +1457,11 @@ def unauthn_handler(self, cb): .. versionadded:: 3.3.0 """ - self._state._unauthn_handler = cb + self._unauthn_handler = cb - def reauthn_handler(self, cb): + def reauthn_handler( + self, cb: t.Callable[[timedelta, timedelta], "ResponseValue"] + ) -> None: """ Callback when endpoint required a fresh authentication. This is called by :func:`auth_required`. @@ -1434,7 +1485,93 @@ def reauthn_handler(self, cb): .. versionadded:: 3.4.0 """ - self._state._reauthn_handler = cb + self._reauthn_handler = cb + + def unauthorized_handler(self, cb: t.Callable[[], "ResponseValue"]) -> None: + warnings.warn( + "'unauthorized_handler' has been replaced with" + " 'unauthz_handler' and 'unauthn_handler'", + DeprecationWarning, + ) + self._unauthorized_callback = cb + + def _add_ctx_processor( + self, endpoint: str, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + group = self._context_processors.setdefault(endpoint, []) + if fn not in group: + group.append(fn) + + def _run_ctx_processor(self, endpoint: str) -> t.Dict[str, t.Any]: + rv: t.Dict[str, t.Any] = {} + for g in ["global", endpoint]: + for fn in self._context_processors.setdefault(g, []): + rv.update(fn()) + return rv + + def context_processor(self, fn: t.Callable[[], t.Dict[str, t.Any]]) -> None: + self._add_ctx_processor("global", fn) + + def forgot_password_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("forgot_password", fn) + + def login_context_processor(self, fn: t.Callable[[], t.Dict[str, t.Any]]) -> None: + self._add_ctx_processor("login", fn) + + def register_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("register", fn) + + def reset_password_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("reset_password", fn) + + def change_password_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("change_password", fn) + + def send_confirmation_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("send_confirmation", fn) + + def send_login_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("send_login", fn) + + def verify_context_processor(self, fn: t.Callable[[], t.Dict[str, t.Any]]) -> None: + self._add_ctx_processor("verify", fn) + + def mail_context_processor(self, fn: t.Callable[[], t.Dict[str, t.Any]]) -> None: + self._add_ctx_processor("mail", fn) - def __getattr__(self, name): - return getattr(self._state, name, None) + def tf_setup_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("tf_setup", fn) + + def tf_token_validation_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("tf_token_validation", fn) + + def us_signin_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("us_signin", fn) + + def us_setup_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("us_setup", fn) + + def us_verify_context_processor( + self, fn: t.Callable[[], t.Dict[str, t.Any]] + ) -> None: + self._add_ctx_processor("us_verify", fn) diff --git a/flask_security/decorators.py b/flask_security/decorators.py index 7734ba69..c658a195 100644 --- a/flask_security/decorators.py +++ b/flask_security/decorators.py @@ -273,8 +273,8 @@ def decorated(*args, **kwargs): def auth_required( *auth_methods: t.Union[str, t.Callable[[], t.List[str]], None], - within: t.Union[int, t.Callable[[], datetime.timedelta]] = -1, - grace: t.Optional[t.Union[int, t.Callable[[], datetime.timedelta]]] = None + within: t.Union[int, float, t.Callable[[], datetime.timedelta]] = -1, + grace: t.Optional[t.Union[int, float, t.Callable[[], datetime.timedelta]]] = None ) -> DecoratedView: """ Decorator that protects endpoints through multiple mechanisms diff --git a/flask_security/proxies.py b/flask_security/proxies.py index 2ce7f938..c1a761af 100644 --- a/flask_security/proxies.py +++ b/flask_security/proxies.py @@ -11,7 +11,6 @@ from .datastore import CanonicalUserDatastore # Convenient references -# noinspection PyTypeChecker _security: "Security" = LocalProxy( # type: ignore lambda: current_app.extensions["security"] ) diff --git a/flask_security/unified_signin.py b/flask_security/unified_signin.py index 6980a4e0..4c8431a8 100644 --- a/flask_security/unified_signin.py +++ b/flask_security/unified_signin.py @@ -53,7 +53,7 @@ SmsSenderFactory, base_render_json, check_and_get_token_status, - config_value, + config_value as cv, do_flash, find_user, get_identity_attributes, @@ -72,7 +72,7 @@ ) if t.TYPE_CHECKING: # pragma: no cover - from flask import Response + from flask.typing import ResponseValue if get_quart_status(): # pragma: no cover from quart import redirect @@ -82,21 +82,21 @@ def _compute_code_methods(): # Return list of methods that actually send codes - return list(set(config_value("US_ENABLED_METHODS")) - {"password", "authenticator"}) + return list(set(cv("US_ENABLED_METHODS")) - {"password", "authenticator"}) def _compute_setup_methods(): # Return list of methods that require setup - return list(set(config_value("US_ENABLED_METHODS")) - {"password"}) + return list(set(cv("US_ENABLED_METHODS")) - {"password"}) def _compute_active_methods(user): # Compute methods already setup. The only oddity is that 'email' # can be 'auto-setup' - so include that. - active_methods = set(config_value("US_ENABLED_METHODS")) & set( + active_methods = set(cv("US_ENABLED_METHODS")) & set( _datastore.us_get_totp_secrets(user).keys() ) - if "email" in config_value("US_ENABLED_METHODS"): + if "email" in cv("US_ENABLED_METHODS"): active_methods |= {"email"} return list(active_methods) @@ -161,7 +161,7 @@ def validate(self): passcode = str(passcode) ok = False - for method in config_value("US_ENABLED_METHODS"): + for method in cv("US_ENABLED_METHODS"): if method == "password": passcode = _security._password_util.normalize(passcode) if self.user.verify_and_update_password(passcode): @@ -172,7 +172,7 @@ def validate(self): token=passcode, totp_secret=totp_secrets[method], user=self.user, - window=config_value("US_TOKEN_VALIDITY"), + window=cv("US_TOKEN_VALIDITY"), ): ok = True break @@ -185,7 +185,7 @@ def validate(self): elif self.submit_send_code.data: # Send a code - chosen_method must be valid cm = self.chosen_method.data - if cm not in config_value("US_ENABLED_METHODS"): + if cm not in cv("US_ENABLED_METHODS"): self.chosen_method.errors.append( get_message("US_METHOD_NOT_AVAILABLE")[0] ) @@ -221,7 +221,7 @@ class UnifiedSigninForm(_UnifiedPassCodeForm): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.remember.default = config_value("DEFAULT_REMEMBER_ME") + self.remember.default = cv("DEFAULT_REMEMBER_ME") self.requires_confirmation = False def validate(self): @@ -276,7 +276,7 @@ def __init__(self, *args, **kwargs): def validate(self): if not super().validate(): return False - if self.chosen_method.data not in config_value("US_ENABLED_METHODS"): + if self.chosen_method.data not in cv("US_ENABLED_METHODS"): self.chosen_method.errors.append(get_message("US_METHOD_NOT_AVAILABLE")[0]) return False @@ -310,7 +310,7 @@ def validate(self): token=self.passcode.data, totp_secret=self.totp_secret, user=self.user, - window=config_value("US_TOKEN_VALIDITY"), + window=cv("US_TOKEN_VALIDITY"), ): self.passcode.errors.append(get_message("INVALID_PASSWORD_CODE")[0]) return False @@ -375,9 +375,9 @@ def us_signin_send_code(): ) return _security.render_template( - config_value("US_SIGNIN_TEMPLATE"), + cv("US_SIGNIN_TEMPLATE"), us_signin_form=form, - available_methods=config_value("US_ENABLED_METHODS"), + available_methods=cv("US_ENABLED_METHODS"), code_methods=code_methods, chosen_method=form.chosen_method.data, code_sent=code_sent, @@ -388,23 +388,23 @@ def us_signin_send_code(): # Here on GET or failed validation if _security._want_json(request): payload = { - "available_methods": config_value("US_ENABLED_METHODS"), + "available_methods": cv("US_ENABLED_METHODS"), "code_methods": code_methods, "identity_attributes": get_identity_attributes(), } return base_render_json(form, include_user=False, additional=payload) return _security.render_template( - config_value("US_SIGNIN_TEMPLATE"), + cv("US_SIGNIN_TEMPLATE"), us_signin_form=form, - available_methods=config_value("US_ENABLED_METHODS"), + available_methods=cv("US_ENABLED_METHODS"), code_methods=code_methods, skip_loginmenu=True, **_security._run_ctx_processor("us_signin") ) -@auth_required(lambda: config_value("API_ENABLED_METHODS")) +@auth_required(lambda: cv("API_ENABLED_METHODS")) def us_verify_send_code(): """ Send code during verify. @@ -431,9 +431,9 @@ def us_verify_send_code(): ) return _security.render_template( - config_value("US_VERIFY_TEMPLATE"), + cv("US_VERIFY_TEMPLATE"), us_verify_form=form, - available_methods=config_value("US_ENABLED_METHODS"), + available_methods=cv("US_ENABLED_METHODS"), code_methods=code_methods, chosen_method=form.chosen_method.data, code_sent=code_sent, @@ -448,15 +448,15 @@ def us_verify_send_code(): # Here on GET or failed validation if _security._want_json(request): payload = { - "available_methods": config_value("US_ENABLED_METHODS"), + "available_methods": cv("US_ENABLED_METHODS"), "code_methods": code_methods, } return base_render_json(form, additional=payload) return _security.render_template( - config_value("US_VERIFY_TEMPLATE"), + cv("US_VERIFY_TEMPLATE"), us_verify_form=form, - available_methods=config_value("US_ENABLED_METHODS"), + available_methods=cv("US_ENABLED_METHODS"), code_methods=code_methods, skip_login_menu=True, send_code_to=get_url( @@ -468,7 +468,7 @@ def us_verify_send_code(): @unauth_csrf(fall_through=True) -def us_signin() -> "Response": +def us_signin() -> "ResponseValue": """ Unified sign in view. This takes an identity (as configured in USER_IDENTITY_ATTRIBUTES) @@ -514,9 +514,7 @@ def us_signin() -> "Response": # we authenticated with requires it and either user has requested MFA or it is # required. remember_me = form.remember.data if "remember" in form else None - if config_value("TWO_FACTOR") and form.authn_via in config_value( - "US_MFA_REQUIRED" - ): + if cv("TWO_FACTOR") and form.authn_via in cv("US_MFA_REQUIRED"): if request.is_json and request.content_length: tf_validity_token = request.get_json().get( # type: ignore "tf_validity_token", None @@ -527,10 +525,8 @@ def us_signin() -> "Response": tf_validity_token_is_valid = tf_verify_validility_token( tf_validity_token, form.user.fs_uniquifier ) - if config_value("TWO_FACTOR_REQUIRED") or is_tf_setup(form.user): - if config_value("TWO_FACTOR_ALWAYS_VALIDATE") or ( - not tf_validity_token_is_valid - ): + if cv("TWO_FACTOR_REQUIRED") or is_tf_setup(form.user): + if cv("TWO_FACTOR_ALWAYS_VALIDATE") or (not tf_validity_token_is_valid): return tf_login( form.user, @@ -550,7 +546,7 @@ def us_signin() -> "Response": code_methods = _compute_code_methods() if _security._want_json(request): payload = { - "available_methods": config_value("US_ENABLED_METHODS"), + "available_methods": cv("US_ENABLED_METHODS"), "code_methods": code_methods, "identity_attributes": get_identity_attributes(), } @@ -564,22 +560,22 @@ def us_signin() -> "Response": # On error - wipe code form.passcode.data = None - if form.requires_confirmation and _security.requires_confirmation_error_view: + if form.requires_confirmation and cv("REQUIRES_CONFIRMATION_ERROR_VIEW"): do_flash(*get_message("CONFIRMATION_REQUIRED")) - return redirect(get_url(_security.requires_confirmation_error_view)) + return redirect(get_url(cv("REQUIRES_CONFIRMATION_ERROR_VIEW"))) return _security.render_template( - config_value("US_SIGNIN_TEMPLATE"), + cv("US_SIGNIN_TEMPLATE"), us_signin_form=form, - available_methods=config_value("US_ENABLED_METHODS"), + available_methods=cv("US_ENABLED_METHODS"), code_methods=code_methods, skip_login_menu=True, **_security._run_ctx_processor("us_signin") ) -@auth_required(lambda: config_value("API_ENABLED_METHODS")) -def us_verify(): +@auth_required(lambda: cv("API_ENABLED_METHODS")) +def us_verify() -> "ResponseValue": """ Re-authenticate to reset freshness time. This is likely the result of a reauthn_handler redirect, which @@ -612,7 +608,7 @@ def us_verify(): # Here on GET or failed POST validate if _security._want_json(request): payload = { - "available_methods": config_value("US_ENABLED_METHODS"), + "available_methods": cv("US_ENABLED_METHODS"), "code_methods": code_methods, } return base_render_json(form, additional=payload) @@ -620,12 +616,12 @@ def us_verify(): # On error - wipe code form.passcode.data = None return _security.render_template( - config_value("US_VERIFY_TEMPLATE"), + cv("US_VERIFY_TEMPLATE"), us_verify_form=form, code_methods=code_methods, skip_login_menu=True, send_code_to=get_url( - _security.us_verify_send_code_url, + cv("US_VERIFY_SEND_CODE_URL"), qparams={"next": propagate_next(request.url)}, ), **_security._run_ctx_processor("us_verify") @@ -633,40 +629,42 @@ def us_verify(): @anonymous_user_required -def us_verify_link(): +def us_verify_link() -> "ResponseValue": """ Used to verify a magic email link. GET only """ - if not all(v in request.args for v in ["email", "code"]): + email = request.args.get("email", None) + code = request.args.get("code", None) + if not email or not code: m, c = get_message("API_ERROR") if _security.redirect_behavior == "spa": - return redirect(get_url(_security.login_error_view, qparams={c: m})) + return redirect(get_url(cv("LOGIN_ERROR_VIEW"), qparams={c: m})) do_flash(m, c) return redirect(url_for_security("us_signin")) - user = _datastore.find_user(email=request.args.get("email")) + user = _datastore.find_user(email=email) if not user or not user.active: if not user: m, c = get_message("USER_DOES_NOT_EXIST") else: m, c = get_message("DISABLED_ACCOUNT") if _security.redirect_behavior == "spa": - return redirect(get_url(_security.login_error_view, qparams={c: m})) + return redirect(get_url(cv("LOGIN_ERROR_VIEW"), qparams={c: m})) do_flash(m, c) return redirect(url_for_security("us_signin")) totp_secrets = _datastore.us_get_totp_secrets(user) if "email" not in totp_secrets or not _security._totp_factory.verify_totp( - token=request.args.get("code"), + token=code, totp_secret=totp_secrets["email"], user=user, - window=config_value("US_TOKEN_VALIDITY"), + window=cv("US_TOKEN_VALIDITY"), ): m, c = get_message("INVALID_CODE") if _security.redirect_behavior == "spa": return redirect( get_url( - _security.login_error_view, + cv("LOGIN_ERROR_VIEW"), qparams=user.get_redirect_qparams({c: m}), ) ) @@ -674,9 +672,9 @@ def us_verify_link(): return redirect(url_for_security("us_signin")) if ( - config_value("TWO_FACTOR") - and "email" in config_value("US_MFA_REQUIRED") - and (config_value("TWO_FACTOR_REQUIRED") or is_tf_setup(user)) + cv("TWO_FACTOR") + and "email" in cv("US_MFA_REQUIRED") + and (cv("TWO_FACTOR_REQUIRED") or is_tf_setup(user)) ): # tf_login doesn't know anything about "spa" etc. In general two-factor # isn't quite ready for SPA. So we return an error via a redirect rather @@ -685,7 +683,7 @@ def us_verify_link(): if _security.redirect_behavior == "spa": return redirect( get_url( - _security.login_error_view, + cv("LOGIN_ERROR_VIEW"), qparams=user.get_redirect_qparams({"tf_required": 1}), ) ) @@ -700,7 +698,7 @@ def us_verify_link(): # This means that this can only work if sessions are active which sort of # makes sense - otherwise you need to use /us-signin with a code. return redirect( - get_url(_security.post_login_view, qparams=user.get_redirect_qparams()) + get_url(cv("POST_LOGIN_VIEW"), qparams=user.get_redirect_qparams()) ) do_flash(*get_message("PASSWORDLESS_LOGIN_SUCCESSFUL")) @@ -708,11 +706,11 @@ def us_verify_link(): @auth_required( - lambda: config_value("API_ENABLED_METHODS"), - within=lambda: config_value("FRESHNESS"), - grace=lambda: config_value("FRESHNESS_GRACE_PERIOD"), + lambda: cv("API_ENABLED_METHODS"), + within=lambda: cv("FRESHNESS"), + grace=lambda: cv("FRESHNESS_GRACE_PERIOD"), ) -def us_setup() -> "Response": +def us_setup() -> "ResponseValue": """ Change unified sign in methods. We want to verify the new method - so don't store anything yet in DB @@ -765,8 +763,8 @@ def us_setup() -> "Response": form, include_user=False, error_status_code=500 if msg else 400 ) return _security.render_template( - config_value("US_SETUP_TEMPLATE"), - available_methods=config_value("US_ENABLED_METHODS"), + cv("US_SETUP_TEMPLATE"), + available_methods=cv("US_ENABLED_METHODS"), active_methods=active_methods, setup_methods=setup_methods, us_setup_form=form, @@ -800,8 +798,8 @@ def us_setup() -> "Response": if _security._want_json(request): return base_render_json(form, include_user=False, additional=json_response) return _security.render_template( - config_value("US_SETUP_TEMPLATE"), - available_methods=config_value("US_ENABLED_METHODS"), + cv("US_SETUP_TEMPLATE"), + available_methods=cv("US_ENABLED_METHODS"), active_methods=active_methods, setup_methods=setup_methods, code_sent=form.chosen_method.data in _compute_code_methods(), @@ -818,7 +816,7 @@ def us_setup() -> "Response": if _security._want_json(request): payload = { "identity_attributes": get_identity_attributes(), - "available_methods": config_value("US_ENABLED_METHODS"), + "available_methods": cv("US_ENABLED_METHODS"), "active_methods": active_methods, "setup_methods": setup_methods, "phone": current_user.us_phone_number, @@ -828,8 +826,8 @@ def us_setup() -> "Response": # Show user existing phone number form.phone.data = current_user.us_phone_number return _security.render_template( - config_value("US_SETUP_TEMPLATE"), - available_methods=config_value("US_ENABLED_METHODS"), + cv("US_SETUP_TEMPLATE"), + available_methods=cv("US_ENABLED_METHODS"), active_methods=active_methods, setup_methods=setup_methods, us_setup_form=form, @@ -837,8 +835,8 @@ def us_setup() -> "Response": ) -@auth_required(lambda: config_value("API_ENABLED_METHODS")) -def us_setup_validate(token): +@auth_required(lambda: cv("API_ENABLED_METHODS")) +def us_setup_validate(token: str) -> "ResponseValue": """ Validate new setup. The token is the state variable which is signed and timed @@ -858,7 +856,7 @@ def us_setup_validate(token): if invalid: m, c = get_message("API_ERROR") if expired: - m, c = get_message("US_SETUP_EXPIRED", within=config_value("US_SETUP_WITHIN")) + m, c = get_message("US_SETUP_EXPIRED", within=cv("US_SETUP_WITHIN")) if invalid or expired: if _security._want_json(request): payload = json_error_response(errors=m) @@ -876,7 +874,7 @@ def us_setup_validate(token): _datastore.us_set(current_user, method, state["totp_secret"], phone) us_profile_changed.send( - app._get_current_object(), user=current_user, method=method + app._get_current_object(), user=current_user, method=method # type: ignore ) if _security._want_json(request): return base_render_json( @@ -889,8 +887,7 @@ def us_setup_validate(token): else: do_flash(*get_message("US_SETUP_SUCCESSFUL")) return redirect( - get_url(_security.us_post_setup_view) - or get_url(_security.post_login_view) + get_url(cv("US_POST_SETUP_VIEW")) or get_url(cv("POST_LOGIN_VIEW")) ) # Code not correct/outdated. @@ -930,7 +927,7 @@ def us_send_security_token( "us_verify_link", email=user.email, code=token, _external=True ) send_mail( - config_value("US_EMAIL_SUBJECT"), + cv("US_EMAIL_SUBJECT"), user.email, "us_instructions", user=user, @@ -941,9 +938,9 @@ def us_send_security_token( ) elif method == "sms": m, c = get_message("USE_CODE", code=token) - from_number = config_value("SMS_SERVICE_CONFIG")["PHONE_NUMBER"] + from_number = cv("SMS_SERVICE_CONFIG")["PHONE_NUMBER"] to_number = phone_number - sms_sender = SmsSenderFactory.createSender(config_value("SMS_SERVICE")) + sms_sender = SmsSenderFactory.createSender(cv("SMS_SERVICE")) sms_sender.send_sms(from_number=from_number, to_number=to_number, msg=m) elif method == "authenticator" or method == "password": diff --git a/flask_security/utils.py b/flask_security/utils.py index 7790a69b..7be8122f 100644 --- a/flask_security/utils.py +++ b/flask_security/utils.py @@ -29,6 +29,7 @@ flash, g, request, + render_template, session, url_for, ) @@ -149,7 +150,7 @@ def login_user( old_current_login, new_current_login = ( user.current_login_at, - _security.datetime_factory(), + config_value("DATETIME_FACTORY")(), ) old_current_ip, new_current_ip = user.current_login_ip, remote_addr @@ -286,13 +287,13 @@ def get_hmac(password: SB) -> bytes: :param password: The password to sign """ - salt = _security.password_salt + salt = config_value("PASSWORD_SALT") if salt is None: raise RuntimeError( "The configuration value `SECURITY_PASSWORD_SALT` must " "not be None when the value of `SECURITY_PASSWORD_HASH` is " - 'set to "%s"' % _security.password_hash + 'set to "%s"' % config_value("PASSWORD_HASH") ) h = hmac.new(encode_string(salt), encode_string(password), hashlib.sha512) @@ -385,7 +386,7 @@ def hash_password(password: SB) -> t.Any: return _pwd_context.hash( password, **config_value("PASSWORD_HASH_OPTIONS", default={}).get( - _security.password_hash, {} + config_value("PASSWORD_HASH"), {} ), ) @@ -456,9 +457,9 @@ def get_url( # For (mostly) testing - allow changing/adding the url - for example # add a different host:port for cases where the UI is running # separately. - if _security.redirect_host: + if config_value("REDIRECT_HOST"): url = transform_url( - endpoint_or_url, qparams, netloc=_security.redirect_host + endpoint_or_url, qparams, netloc=config_value("REDIRECT_HOST") ) else: url = transform_url(endpoint_or_url, qparams) @@ -649,11 +650,11 @@ def config_value(key, app=None, default=None, strict=True): :param strict: if True, will raise ValueError if key doesn't exist """ app = app or current_app + key = f"SECURITY_{key.upper()}" # protect against spelling mistakes - config = get_config(app) - if strict and key.upper() not in config: + if strict and key not in app.config: raise ValueError(f"Key {key} doesn't exist") - return config.get(key.upper(), default) + return app.config.get(key, default) def get_max_age(key, app=None): @@ -994,7 +995,7 @@ def base_render_json( if additional: payload.update(additional) - return _security._render_json(payload, code, headers=None, user=user) + return _security._render_json(payload, code, None, user) def default_want_json(req): @@ -1047,6 +1048,10 @@ def default(self, obj): return JSONEncoder.default(self, obj) +def default_render_template(*args, **kwargs): + return render_template(*args, **kwargs) + + class SmsSenderBaseClass(metaclass=abc.ABCMeta): def __init__(self, *args, **kwargs): pass diff --git a/flask_security/views.py b/flask_security/views.py index a9cb65aa..b7893523 100644 --- a/flask_security/views.py +++ b/flask_security/views.py @@ -5,7 +5,7 @@ Flask-Security views module :copyright: (c) 2012 by Matt Wright. - :copyright: (c) 2019-2020 by J. Christopher Wagner (jwag). + :copyright: (c) 2019-2021 by J. Christopher Wagner (jwag). :license: MIT, see LICENSE for more details. CSRF is tricky. By default all our forms have CSRF protection built in via @@ -26,12 +26,11 @@ so app must set CSRF_IGNORE_UNAUTH_ENDPOINTS (or use CSRF/session cookie for logging in then once they have a token, no need for cookie). - TODO: two-factor routes such as tf_setup need work. They seem to support both - authenticated (via session?) as well as unauthenticated access. """ from functools import partial import time +import typing as t from flask import ( Blueprint, @@ -41,7 +40,7 @@ session, ) from flask_login import current_user -from werkzeug.datastructures import MultiDict +from werkzeug.datastructures import ImmutableMultiDict, MultiDict from .changeable import change_user_password from .confirmable import ( @@ -81,7 +80,7 @@ from .utils import ( base_render_json, check_and_update_authn_fresh, - config_value, + config_value as cv, do_flash, get_message, get_post_login_redirect, @@ -105,6 +104,9 @@ else: from flask import make_response, redirect +if t.TYPE_CHECKING: # pragma: no cover + from flask.typing import ResponseValue + def default_render_json(payload, code, headers, user): """Default JSON response handler.""" @@ -121,7 +123,7 @@ def _ctx(endpoint): @unauth_csrf(fall_through=True) -def login(): +def login() -> "ResponseValue": """View function for login view Allow already authenticated users. For GET this is useful for @@ -144,7 +146,7 @@ def login(): ) return _security._render_json(payload, 400, None, None) else: - return redirect(get_url(_security.post_login_view)) + return redirect(get_url(cv("POST_LOGIN_VIEW"))) form_class = _security.login_form @@ -159,9 +161,11 @@ def login(): if form.validate_on_submit(): remember_me = form.remember.data if "remember" in form else None - if config_value("TWO_FACTOR"): + if cv("TWO_FACTOR"): if request.is_json and request.content_length: - tf_validity_token = request.get_json().get("tf_validity_token", None) + tf_validity_token = request.get_json().get( # type: ignore + "tf_validity_token", None + ) else: tf_validity_token = request.cookies.get("tf_validity", default=None) @@ -169,10 +173,8 @@ def login(): tf_validity_token, form.user.fs_uniquifier ) - if config_value("TWO_FACTOR_REQUIRED") or (is_tf_setup(form.user)): - if config_value("TWO_FACTOR_ALWAYS_VALIDATE") or ( - not tf_validity_token_is_valid - ): + if cv("TWO_FACTOR_REQUIRED") or (is_tf_setup(form.user)): + if cv("TWO_FACTOR_ALWAYS_VALIDATE") or (not tf_validity_token_is_valid): return tf_login( form.user, remember=remember_me, primary_authn_via="password" @@ -191,22 +193,22 @@ def login(): return base_render_json(form) if current_user.is_authenticated: - return redirect(get_url(_security.post_login_view)) + return redirect(get_url(cv("POST_LOGIN_VIEW"))) else: - if form.requires_confirmation and _security.requires_confirmation_error_view: + if form.requires_confirmation and cv("REQUIRES_CONFIRMATION_ERROR_VIEW"): do_flash(*get_message("CONFIRMATION_REQUIRED")) return redirect( get_url( - _security.requires_confirmation_error_view, + cv("REQUIRES_CONFIRMATION_ERROR_VIEW"), qparams={"email": form.email.data}, ) ) return _security.render_template( - config_value("LOGIN_USER_TEMPLATE"), login_user_form=form, **_ctx("login") + cv("LOGIN_USER_TEMPLATE"), login_user_form=form, **_ctx("login") ) -@auth_required(lambda: config_value("API_ENABLED_METHODS")) +@auth_required(lambda: cv("API_ENABLED_METHODS")) def verify(): """View function which handles a authentication verification request.""" form_class = _security.verify_form @@ -233,7 +235,7 @@ def verify(): return base_render_json(form) return _security.render_template( - config_value("VERIFY_TEMPLATE"), verify_form=form, **_ctx("verify") + cv("VERIFY_TEMPLATE"), verify_form=form, **_ctx("verify") ) @@ -252,7 +254,7 @@ def logout(): @anonymous_user_required -def register(): +def register() -> "ResponseValue": """View function which handles a registration request.""" # For some unknown historic reason - if you don't require confirmation @@ -265,7 +267,7 @@ def register(): form_class = _security.register_form if request.is_json: - form_data = MultiDict(request.get_json()) + form_data: ImmutableMultiDict = ImmutableMultiDict(request.get_json()) else: form_data = request.form @@ -279,8 +281,8 @@ def register(): # an application that would want random email accounts. It has been like this # since the beginning. Note that we still enforce 2FA - however for unified # signin - we adhere to historic behavior. - if not _security.confirmable or _security.login_without_confirmation: - if config_value("TWO_FACTOR") and config_value("TWO_FACTOR_REQUIRED"): + if not _security.confirmable or cv("LOGIN_WITHOUT_CONFIRMATION"): + if cv("TWO_FACTOR") and cv("TWO_FACTOR_REQUIRED"): return tf_login(user, primary_authn_via="register") after_this_request(view_commit) login_user(user, authn_via=["register"]) @@ -295,7 +297,7 @@ def register(): return base_render_json(form) return _security.render_template( - config_value("REGISTER_USER_TEMPLATE"), + cv("REGISTER_USER_TEMPLATE"), register_user_form=form, **_ctx("register"), ) @@ -321,7 +323,7 @@ def send_login(): return base_render_json(form) return _security.render_template( - config_value("SEND_LOGIN_TEMPLATE"), send_login_form=form, **_ctx("send_login") + cv("SEND_LOGIN_TEMPLATE"), send_login_form=form, **_ctx("send_login") ) @@ -387,7 +389,7 @@ def send_confirmation(): return base_render_json(form) return _security.render_template( - config_value("SEND_CONFIRMATION_TEMPLATE"), + cv("SEND_CONFIRMATION_TEMPLATE"), send_confirmation_form=form, **_ctx("send_confirmation"), ) @@ -440,12 +442,12 @@ def confirm_email(token): if user != current_user: logout_user() - if config_value("AUTO_LOGIN_AFTER_CONFIRM"): + if cv("AUTO_LOGIN_AFTER_CONFIRM"): # N.B. this is a (small) security risk if email went to wrong place. # and you have the LOGIN_WITH_CONFIRMATION flag since in that case # you can be logged in and doing stuff - but another person could # get the email. - if config_value("TWO_FACTOR") and config_value("TWO_FACTOR_REQUIRED"): + if cv("TWO_FACTOR") and cv("TWO_FACTOR_REQUIRED"): return tf_login(user, primary_authn_via="confirm") login_user(user, authn_via=["confirm"]) @@ -461,7 +463,7 @@ def confirm_email(token): get_url(_security.post_confirm_view) or get_url( _security.post_login_view - if config_value("AUTO_LOGIN_AFTER_CONFIRM") + if cv("AUTO_LOGIN_AFTER_CONFIRM") else _security.login_url ) ) @@ -497,7 +499,7 @@ def forgot_password(): ) return _security.render_template( - config_value("FORGOT_PASSWORD_TEMPLATE"), + cv("FORGOT_PASSWORD_TEMPLATE"), forgot_password_form=form, **_ctx("forgot_password"), ) @@ -564,7 +566,7 @@ def reset_password(token): ) # for forms - render the reset password form return _security.render_template( - config_value("RESET_PASSWORD_TEMPLATE"), + cv("RESET_PASSWORD_TEMPLATE"), reset_password_form=form, reset_password_token=token, **_ctx("reset_password"), @@ -597,8 +599,8 @@ def reset_password(token): if form.validate_on_submit(): after_this_request(view_commit) update_password(user, form.password.data) - if config_value("TWO_FACTOR") and ( - config_value("TWO_FACTOR_REQUIRED") + if cv("TWO_FACTOR") and ( + cv("TWO_FACTOR_REQUIRED") or (form.user.tf_totp_secret and form.user.tf_primary_method) ): return tf_login(user, primary_authn_via="reset") @@ -618,14 +620,14 @@ def reset_password(token): if _security._want_json(request): return base_render_json(form) return _security.render_template( - config_value("RESET_PASSWORD_TEMPLATE"), + cv("RESET_PASSWORD_TEMPLATE"), reset_password_form=form, reset_password_token=token, **_ctx("reset_password"), ) -@auth_required(lambda: config_value("API_ENABLED_METHODS")) +@auth_required(lambda: cv("API_ENABLED_METHODS")) def change_password(): """View function which handles a change password request.""" @@ -653,7 +655,7 @@ def change_password(): return base_render_json(form) return _security.render_template( - config_value("CHANGE_PASSWORD_TEMPLATE"), + cv("CHANGE_PASSWORD_TEMPLATE"), change_password_form=form, **_ctx("change_password"), ) @@ -707,12 +709,12 @@ def two_factor_setup(): else: # Caller is changing their TFA profile. This requires a 'fresh' authentication if not check_and_update_authn_fresh( - config_value("FRESHNESS"), - config_value("FRESHNESS_GRACE_PERIOD"), + cv("FRESHNESS"), + cv("FRESHNESS_GRACE_PERIOD"), get_request_attr("fs_authn_via"), ): return _security._reauthn_handler( - config_value("FRESHNESS"), config_value("FRESHNESS_GRACE_PERIOD") + cv("FRESHNESS"), cv("FRESHNESS_GRACE_PERIOD") ) user = current_user @@ -791,10 +793,10 @@ def two_factor_setup(): code_form = _security.two_factor_verify_code_form() if not _security._want_json(request): return _security.render_template( - config_value("TWO_FACTOR_SETUP_TEMPLATE"), + cv("TWO_FACTOR_SETUP_TEMPLATE"), two_factor_setup_form=form, two_factor_verify_code_form=code_form, - choices=config_value("TWO_FACTOR_ENABLED_METHODS"), + choices=cv("TWO_FACTOR_ENABLED_METHODS"), chosen_method=pm, **qrcode_values, **_ctx("tf_setup"), @@ -804,14 +806,14 @@ def two_factor_setup(): # We get here on GET and POST with failed validation. # For things like phone number - we've already done one POST # that succeeded and now it failed - so retain the initial info - choices = config_value("TWO_FACTOR_ENABLED_METHODS") - if not config_value("TWO_FACTOR_REQUIRED"): + choices = cv("TWO_FACTOR_ENABLED_METHODS") + if not cv("TWO_FACTOR_REQUIRED"): choices.append("disable") if _security._want_json(request): # Provide information application/UI might need to render their own form/input json_response = { - "tf_required": config_value("TWO_FACTOR_REQUIRED"), + "tf_required": cv("TWO_FACTOR_REQUIRED"), "tf_primary_method": getattr(user, "tf_primary_method", None), "tf_phone_number": getattr(user, "tf_phone_number", None), "tf_available_methods": choices, @@ -820,12 +822,12 @@ def two_factor_setup(): code_form = _security.two_factor_verify_code_form() return _security.render_template( - config_value("TWO_FACTOR_SETUP_TEMPLATE"), + cv("TWO_FACTOR_SETUP_TEMPLATE"), two_factor_setup_form=form, two_factor_verify_code_form=code_form, choices=choices, chosen_method=form.setup.data, - two_factor_required=config_value("TWO_FACTOR_REQUIRED"), + two_factor_required=cv("TWO_FACTOR_REQUIRED"), **_ctx("tf_setup"), ) @@ -915,9 +917,9 @@ def two_factor_token_validation(): return redirect(get_post_login_redirect()) - if ( - not config_value("TWO_FACTOR_ALWAYS_VALIDATE") and remember - ) and _security._want_json(request): + if (not cv("TWO_FACTOR_ALWAYS_VALIDATE") and remember) and _security._want_json( + request + ): token = generate_tf_validity_token(form.user.fs_uniquifier) json_response = {"tf_validity_token": token} return base_render_json(form, additional=json_response) @@ -930,10 +932,10 @@ def two_factor_token_validation(): setup_form = _security.two_factor_setup_form() return _security.render_template( - config_value("TWO_FACTOR_SETUP_TEMPLATE"), + cv("TWO_FACTOR_SETUP_TEMPLATE"), two_factor_setup_form=setup_form, two_factor_verify_code_form=form, - choices=config_value("TWO_FACTOR_ENABLED_METHODS"), + choices=cv("TWO_FACTOR_ENABLED_METHODS"), **_ctx("tf_setup"), ) @@ -942,7 +944,7 @@ def two_factor_token_validation(): rescue_form = _security.two_factor_rescue_form() return _security.render_template( - config_value("TWO_FACTOR_VERIFY_CODE_TEMPLATE"), + cv("TWO_FACTOR_VERIFY_CODE_TEMPLATE"), two_factor_rescue_form=rescue_form, two_factor_verify_code_form=form, problem=None, @@ -1003,8 +1005,8 @@ def two_factor_rescue(): # send app provider a mail message regarding trouble elif problem == "no_mail_access": send_mail( - config_value("EMAIL_SUBJECT_TWO_FACTOR_RESCUE"), - config_value("TWO_FACTOR_RESCUE_MAIL"), + cv("EMAIL_SUBJECT_TWO_FACTOR_RESCUE"), + cv("TWO_FACTOR_RESCUE_MAIL"), "two_factor_rescue", user=form.user, ) @@ -1016,10 +1018,10 @@ def two_factor_rescue(): code_form = _security.two_factor_verify_code_form() return _security.render_template( - config_value("TWO_FACTOR_VERIFY_CODE_TEMPLATE"), + cv("TWO_FACTOR_VERIFY_CODE_TEMPLATE"), two_factor_verify_code_form=code_form, two_factor_rescue_form=form, - rescue_mail=config_value("TWO_FACTOR_RESCUE_MAIL"), + rescue_mail=cv("TWO_FACTOR_RESCUE_MAIL"), problem=rproblem, **_ctx("tf_token_validation"), ) @@ -1058,13 +1060,13 @@ def create_blueprint(app, state, import_name, json_encoder=None): state.login_url + slash_url_suffix(state.login_url, ""), endpoint="token_login", )(token_login) - elif config_value("US_SIGNIN_REPLACES_LOGIN", app=app): + elif cv("US_SIGNIN_REPLACES_LOGIN", app=app): bp.route(state.login_url, methods=["GET", "POST"], endpoint="login")(us_signin) else: bp.route(state.login_url, methods=["GET", "POST"], endpoint="login")(login) - if config_value("FRESHNESS", app=app).total_seconds() >= 0: + if cv("FRESHNESS", app=app).total_seconds() >= 0: bp.route(state.verify_url, methods=["GET", "POST"], endpoint="verify")(verify) if state.unified_signin: @@ -1086,7 +1088,7 @@ def create_blueprint(app, state, import_name, json_encoder=None): )(us_setup_validate) # Freshness verification - if config_value("FRESHNESS", app=app).total_seconds() >= 0: + if cv("FRESHNESS", app=app).total_seconds() >= 0: bp.route( state.us_verify_url, methods=["GET", "POST"], endpoint="us_verify" )(us_verify) diff --git a/requirements/dev.txt b/requirements/dev.txt index c35df80e..8a7b8416 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -7,6 +7,7 @@ pymysql pre-commit tox sqlalchemy[mypy] +types-requests # for dev - might not install Flask-Security - list those dependencies here flask diff --git a/tests/conftest.py b/tests/conftest.py index d363c555..e97accd4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,6 +11,7 @@ import os import tempfile import time +import typing as t from datetime import datetime from urllib.parse import urlsplit @@ -54,6 +55,9 @@ except ImportError: NO_BABEL = True +if t.TYPE_CHECKING: # pragma: no cover + from flask.testing import FlaskClient + @pytest.fixture() def app(request: pytest.FixtureRequest) -> "Flask": @@ -605,9 +609,12 @@ def tear_down(): @pytest.fixture() -def sqlalchemy_app(app, sqlalchemy_datastore): - def create(): - app.security = Security(app, datastore=sqlalchemy_datastore) +def sqlalchemy_app( + app: "Flask", sqlalchemy_datastore: SQLAlchemyUserDatastore +) -> t.Callable[[], "Flask"]: + def create() -> "Flask": + security = Security(app, datastore=sqlalchemy_datastore) + app.security = security # type: ignore return app return create @@ -650,7 +657,7 @@ def create(): @pytest.fixture() -def client(request, sqlalchemy_app): +def client(request: pytest.FixtureRequest, sqlalchemy_app: t.Callable) -> "FlaskClient": app = sqlalchemy_app() populate_data(app) return app.test_client() @@ -694,7 +701,7 @@ def in_app_context(request, sqlalchemy_app): @pytest.fixture() -def get_message(app): +def get_message(app: "Flask") -> t.Callable[..., bytes]: def fn(key, **kwargs): rv = app.config["SECURITY_MSG_" + key][0] % kwargs return rv.encode("utf-8") diff --git a/tests/test_changeable.py b/tests/test_changeable.py index b19dc20f..a78e270a 100644 --- a/tests/test_changeable.py +++ b/tests/test_changeable.py @@ -317,11 +317,10 @@ def test_xlation(app, client, get_message_local): with app.test_request_context(): # Check header assert ( - f'

{localize_callback("Change password")}

'.encode("utf-8") - in response.data + f'

{localize_callback("Change password")}

'.encode() in response.data ) submit = localize_callback(_default_field_labels["change_password"]) - assert f'value="{submit}"'.encode("utf-8") in response.data + assert f'value="{submit}"'.encode() in response.data with app.mail.record_messages() as outbox: response = client.post( diff --git a/tests/test_misc.py b/tests/test_misc.py index a84534f0..2b682b75 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -5,7 +5,7 @@ Lots of tests :copyright: (c) 2012 by Matt Wright. - :copyright: (c) 2019-2020 by J. Christopher Wagner (jwag). + :copyright: (c) 2019-2021 by J. Christopher Wagner (jwag). :license: MIT, see LICENSE for more details. """ @@ -17,6 +17,7 @@ import pkg_resources import sys import time +import typing as t import pytest @@ -64,6 +65,10 @@ verify_hash, ) +if t.TYPE_CHECKING: # pragma: no cover + from flask import Flask + from flask.testing import FlaskClient + @pytest.mark.recoverable() def test_my_mail_util(app, sqlalchemy_datastore): @@ -424,7 +429,7 @@ def test_without_babel(app, client): assert response.status_code == 200 -def test_no_email_sender(app): +def test_no_email_sender(app, sqlalchemy_datastore): """Verify that if SECURITY_EMAIL_SENDER is default (which is a local proxy) that send_mail picks up MAIL_DEFAULT_SENDER. """ @@ -435,7 +440,7 @@ def __init__(self, email): self.email = email security = Security() - security.init_app(app) + security.init_app(app, sqlalchemy_datastore) with app.app_context(): app.try_trigger_before_first_request_functions() @@ -446,7 +451,7 @@ def __init__(self, email): assert "test@testme.com" == outbox[0].sender -def test_sender_tuple(app): +def test_sender_tuple(app, sqlalchemy_datastore): """Verify that if sender is a (name, address) tuple, in the received email sender is properly formatted as "name
" """ @@ -457,7 +462,7 @@ def __init__(self, email): self.email = email security = Security() - security.init_app(app) + security.init_app(app, sqlalchemy_datastore) with app.app_context(): app.try_trigger_before_first_request_functions() @@ -511,10 +516,10 @@ def test_myxlation(app, sqlalchemy_datastore, pytestconfig): @pytest.mark.babel() -def test_form_labels(app): +def test_form_labels(app, sqlalchemy_datastore): app.config["BABEL_DEFAULT_LOCALE"] = "fr_FR" app.security = Security() - app.security.init_app(app) + app.security.init_app(app, sqlalchemy_datastore) assert check_xlation(app, "fr_FR"), "You must run python setup.py compile_catalog" with app.test_request_context(): @@ -645,7 +650,7 @@ def __init__(self, email): @pytest.mark.skipif(sys.version_info < (3, 0), reason="requires python3 or higher") @pytest.mark.settings(password_check_breached="strict") -def test_breached(app): +def test_breached(app, sqlalchemy_datastore): # partial response from: https://api.pwnedpasswords.com/range/07003 pwned_response = b"AF5A73CD3CBCFDCD12B0B68CB7930F3E888:2\r\n\ @@ -657,7 +662,7 @@ def test_breached(app): B3902FD808DCA504AAAD30F3C14BD3ACE7C:10" app.security = Security() - app.security.init_app(app) + app.security.init_app(app, sqlalchemy_datastore) with app.test_request_context(): with mock.patch("urllib.request.urlopen") as mock_urlopen: mock_urlopen.return_value.__enter__.return_value.read.return_value = ( @@ -674,7 +679,7 @@ def test_breached(app): password_breached_count=16, password_complexity_checker="zxcvbn", ) -def test_breached_cnt(app): +def test_breached_cnt(app, sqlalchemy_datastore): # partial response from: https://api.pwnedpasswords.com/range/07003 pwned_response = b"AF5A73CD3CBCFDCD12B0B68CB7930F3E888:2\r\n\ @@ -686,7 +691,7 @@ def test_breached_cnt(app): B3902FD808DCA504AAAD30F3C14BD3ACE7C:10" app.security = Security() - app.security.init_app(app) + app.security.init_app(app, sqlalchemy_datastore) with app.test_request_context(): with mock.patch("urllib.request.urlopen") as mock_urlopen: mock_urlopen.return_value.__enter__.return_value.read.return_value = ( @@ -700,11 +705,11 @@ def test_breached_cnt(app): @pytest.mark.skip @pytest.mark.settings(password_check_breached="strict") -def test_breached_real(app): +def test_breached_real(app, sqlalchemy_datastore): """Actually go out to internet..""" app.security = Security() - app.security.init_app(app) + app.security.init_app(app, sqlalchemy_datastore) with app.test_request_context(): pbad, pnorm = app.security._password_util.validate("flaskflask", True) assert len(pbad) == 1 @@ -765,7 +770,7 @@ def get(self): assert b"Hi view" in response.data -def test_phone_util_override(app): +def test_phone_util_override(app, sqlalchemy_datastore): class MyPhoneUtil: def __init__(self, app): pass @@ -777,13 +782,15 @@ def get_canonical_form(self, input_data): return "very-canonical" app.security = Security() - app.security.init_app(app, phone_util_cls=MyPhoneUtil) + app.security.init_app(app, sqlalchemy_datastore, phone_util_cls=MyPhoneUtil) with app.app_context(): assert uia_phone_mapper("55") == "very-canonical" -def test_authn_freshness(app, client, get_message): +def test_authn_freshness( + app: "Flask", client: "FlaskClient", get_message: t.Callable[..., bytes] +) -> None: """Test freshness using default reauthn_handler""" @auth_required(within=30, grace=0) @@ -819,9 +826,9 @@ def myspecialview(): # Test json error response response = client.get("/myspecialview", headers={"accept": "application/json"}) assert response.status_code == 401 - assert response.json["response"]["error"].encode("utf-8") == get_message( - "REAUTHENTICATION_REQUIRED" - ) + assert response.json and response.json["response"]["error"].encode( + "utf-8" + ) == get_message("REAUTHENTICATION_REQUIRED") def test_authn_freshness_handler(app, client, get_message): @@ -1123,3 +1130,17 @@ def test_validate_redirect(app, sqlalchemy_datastore): assert not validate_redirect_url(" //github.com") assert not validate_redirect_url("\t//github.com") assert not validate_redirect_url("//github.com") # this is normal urlsplit + + +def test_kwargs(): + import warnings + + warnings.simplefilter("error") + with pytest.raises(DeprecationWarning): + Security(myownkwarg="hi") + + +def test_nodatastore(app): + with pytest.raises(ValueError): + s = Security(app) + s.init_app(app) diff --git a/tests/test_registerable.py b/tests/test_registerable.py index 0905bcdf..f97bdb0f 100644 --- a/tests/test_registerable.py +++ b/tests/test_registerable.py @@ -125,11 +125,9 @@ def test_xlation(app, client, get_message_local): response = client.get("/register", follow_redirects=True) with app.test_request_context(): # Check header - assert ( - f'

{localize_callback("Register")}

'.encode("utf-8") in response.data - ) + assert f'

{localize_callback("Register")}

'.encode() in response.data submit = localize_callback(_default_field_labels["register"]) - assert f'value="{submit}"'.encode("utf-8") in response.data + assert f'value="{submit}"'.encode() in response.data with app.mail.record_messages() as outbox: response = client.post( diff --git a/tox.ini b/tox.ini index 9a0f1eb5..5b439654 100644 --- a/tox.ini +++ b/tox.ini @@ -56,6 +56,7 @@ deps = git+git://github.com/pallets/flask-sqlalchemy@main#egg=flask-sqlalchemy git+git://github.com/pallets/jinja@main#egg=jinja2 git+git://github.com/wtforms/wtforms@master#egg=wtforms + git+git://github.com/maxcountryman/flask-login@main#egg=flask-login commands = python setup.py compile_catalog pytest --basetemp={envtmpdir} {posargs:tests}