From 69b327baa201316525e3bd4ba9b53aecfe0f84af Mon Sep 17 00:00:00 2001 From: Kevin O'Gorman Date: Tue, 16 Aug 2022 16:43:53 -0400 Subject: [PATCH] ran isort and black again after rebase --- ..._dropped_session_nonce_from_journalist_.py | 103 +++--- securedrop/journalist_app/__init__.py | 13 +- securedrop/journalist_app/account.py | 8 +- securedrop/journalist_app/admin.py | 48 +-- securedrop/journalist_app/api.py | 32 +- securedrop/journalist_app/col.py | 14 +- securedrop/journalist_app/main.py | 25 +- securedrop/journalist_app/sessions.py | 20 +- securedrop/journalist_app/utils.py | 39 +-- securedrop/models.py | 83 +---- .../migrations/migration_c5a02eb52f2d.py | 42 +-- securedrop/tests/test_2fa.py | 20 +- securedrop/tests/test_integration.py | 38 +- securedrop/tests/test_journalist.py | 326 +++++------------- securedrop/tests/test_journalist_api.py | 153 ++------ securedrop/tests/test_journalist_session.py | 73 +--- securedrop/tests/utils/__init__.py | 2 +- 17 files changed, 290 insertions(+), 749 deletions(-) diff --git a/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py b/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py index dc6c7051fe9..dc3c7a9065c 100644 --- a/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py +++ b/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py @@ -5,88 +5,95 @@ Create Date: 2022-04-16 21:25:22.398189 """ -from alembic import op import sqlalchemy as sa - +from alembic import op # revision identifiers, used by Alembic. -revision = 'c5a02eb52f2d' -down_revision = 'b7f98cfd6a70' +revision = "c5a02eb52f2d" +down_revision = "b7f98cfd6a70" branch_labels = None depends_on = None def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.drop_table('revoked_tokens') - with op.batch_alter_table('journalists', schema=None) as batch_op: - batch_op.drop_column('session_nonce') + op.drop_table("revoked_tokens") + with op.batch_alter_table("journalists", schema=None) as batch_op: + batch_op.drop_column("session_nonce") # ### end Alembic commands ### def downgrade() -> None: - '''This would have been the easy way, however previous does not have - default value and thus up/down assertion fails''' - #op.add_column('journalists', sa.Column('session_nonce', sa.Integer(), nullable=False, server_default='0')) + """This would have been the easy way, however previous does not have + default value and thus up/down assertion fails""" + # op.add_column('journalists', sa.Column('session_nonce', sa.Integer(), nullable=False, server_default='0')) conn = op.get_bind() conn.execute("PRAGMA legacy_alter_table=ON") # Save existing journalist table. - op.rename_table('journalists', 'journalists_tmp') + op.rename_table("journalists", "journalists_tmp") # Add nonce column. - op.add_column('journalists_tmp', sa.Column('session_nonce', sa.Integer())) + op.add_column("journalists_tmp", sa.Column("session_nonce", sa.Integer())) # Populate nonce column. - journalists = conn.execute( - sa.text("SELECT * FROM journalists_tmp")).fetchall() + journalists = conn.execute(sa.text("SELECT * FROM journalists_tmp")).fetchall() for journalist in journalists: conn.execute( - sa.text("""UPDATE journalists_tmp SET session_nonce=0 WHERE - id=:id""").bindparams(id=journalist.id) - ) + sa.text( + """UPDATE journalists_tmp SET session_nonce=0 WHERE + id=:id""" + ).bindparams(id=journalist.id) + ) # Now create new table with null constraint applied. - op.create_table('journalists', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('uuid', sa.String(length=36), nullable=False), - sa.Column('username', sa.String(length=255), nullable=False), - sa.Column('first_name', sa.String(length=255), nullable=True), - sa.Column('last_name', sa.String(length=255), nullable=True), - sa.Column('pw_salt', sa.Binary(), nullable=True), - sa.Column('pw_hash', sa.Binary(), nullable=True), - sa.Column('passphrase_hash', sa.String(length=256), nullable=True), - sa.Column('is_admin', sa.Boolean(), nullable=True), - sa.Column('session_nonce', sa.Integer(), nullable=False), - sa.Column('otp_secret', sa.String(length=32), nullable=True), - sa.Column('is_totp', sa.Boolean(), nullable=True), - sa.Column('hotp_counter', sa.Integer(), nullable=True), - sa.Column('last_token', sa.String(length=6), nullable=True), - sa.Column('created_on', sa.DateTime(), nullable=True), - sa.Column('last_access', sa.DateTime(), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('username'), - sa.UniqueConstraint('uuid') + op.create_table( + "journalists", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("uuid", sa.String(length=36), nullable=False), + sa.Column("username", sa.String(length=255), nullable=False), + sa.Column("first_name", sa.String(length=255), nullable=True), + sa.Column("last_name", sa.String(length=255), nullable=True), + sa.Column("pw_salt", sa.Binary(), nullable=True), + sa.Column("pw_hash", sa.Binary(), nullable=True), + sa.Column("passphrase_hash", sa.String(length=256), nullable=True), + sa.Column("is_admin", sa.Boolean(), nullable=True), + sa.Column("session_nonce", sa.Integer(), nullable=False), + sa.Column("otp_secret", sa.String(length=32), nullable=True), + sa.Column("is_totp", sa.Boolean(), nullable=True), + sa.Column("hotp_counter", sa.Integer(), nullable=True), + sa.Column("last_token", sa.String(length=6), nullable=True), + sa.Column("created_on", sa.DateTime(), nullable=True), + sa.Column("last_access", sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("username"), + sa.UniqueConstraint("uuid"), ) - conn.execute(''' + conn.execute( + """ INSERT INTO journalists SELECT id, uuid, username, first_name, last_name, pw_salt, pw_hash, passphrase_hash, is_admin, session_nonce, otp_secret, is_totp, hotp_counter, last_token, created_on, last_access FROM journalists_tmp - ''') + """ + ) # Now delete the old table. - op.drop_table('journalists_tmp') - - op.create_table('revoked_tokens', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('journalist_id', sa.INTEGER(), nullable=False), - sa.Column('token', sa.TEXT(), nullable=False), - sa.ForeignKeyConstraint(['journalist_id'], ['journalists.id'], ), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('token') + op.drop_table("journalists_tmp") + + op.create_table( + "revoked_tokens", + sa.Column("id", sa.INTEGER(), nullable=False), + sa.Column("journalist_id", sa.INTEGER(), nullable=False), + sa.Column("token", sa.TEXT(), nullable=False), + sa.ForeignKeyConstraint( + ["journalist_id"], + ["journalists.id"], + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("token"), ) diff --git a/securedrop/journalist_app/__init__.py b/securedrop/journalist_app/__init__.py index 3f2d6c10a90..443e5ce8079 100644 --- a/securedrop/journalist_app/__init__.py +++ b/securedrop/journalist_app/__init__.py @@ -10,7 +10,6 @@ import version from db import db from flask import Flask, abort, g, json, redirect, render_template, request, url_for -from flask_assets import Environment from flask_babel import gettext from flask_wtf.csrf import CSRFError, CSRFProtect from journalist_app import account, admin, api, col, main @@ -112,9 +111,7 @@ def _handle_http_exception( app.jinja_env.globals["version"] = version.__version__ app.jinja_env.filters["rel_datetime_format"] = template_filters.rel_datetime_format app.jinja_env.filters["filesizeformat"] = template_filters.filesizeformat - app.jinja_env.filters[ - "html_datetime_format" - ] = template_filters.html_datetime_format + app.jinja_env.filters["html_datetime_format"] = template_filters.html_datetime_format app.jinja_env.add_extension("jinja2.ext.do") @app.before_request @@ -132,9 +129,7 @@ def setup_g() -> "Optional[Response]": InstanceConfig.get_default().organization_name ) # pylint: disable=assigning-non-slot else: - g.organization_name = gettext( - "SecureDrop" - ) # pylint: disable=assigning-non-slot + g.organization_name = gettext("SecureDrop") # pylint: disable=assigning-non-slot try: g.logo = get_logo_url(app) # pylint: disable=assigning-non-slot @@ -152,9 +147,7 @@ def setup_g() -> "Optional[Response]": filesystem_id = request.form.get("filesystem_id") if filesystem_id: g.filesystem_id = filesystem_id # pylint: disable=assigning-non-slot - g.source = get_source( - filesystem_id - ) # pylint: disable=assigning-non-slot + g.source = get_source(filesystem_id) # pylint: disable=assigning-non-slot return None diff --git a/securedrop/journalist_app/account.py b/securedrop/journalist_app/account.py index a37b4d49f4d..f9f6ad0a368 100644 --- a/securedrop/journalist_app/account.py +++ b/securedrop/journalist_app/account.py @@ -52,17 +52,13 @@ def new_two_factor() -> Union[str, werkzeug.Response]: token = request.form["token"] if session.get_user().verify_token(token): flash( - gettext( - "Your two-factor credentials have been reset successfully." - ), + gettext("Your two-factor credentials have been reset successfully."), "notification", ) return redirect(url_for("account.edit")) else: flash( - gettext( - "There was a problem verifying the two-factor code. Please try again." - ), + gettext("There was a problem verifying the two-factor code. Please try again."), "error", ) diff --git a/securedrop/journalist_app/admin.py b/securedrop/journalist_app/admin.py index 637314926b4..c157394e5c0 100644 --- a/securedrop/journalist_app/admin.py +++ b/securedrop/journalist_app/admin.py @@ -20,18 +20,9 @@ ) from flask_babel import gettext from journalist_app.decorators import admin_required -from journalist_app.forms import ( - LogoForm, - NewUserForm, - OrgNameForm, - SubmissionPreferencesForm, -) +from journalist_app.forms import LogoForm, NewUserForm, OrgNameForm, SubmissionPreferencesForm from journalist_app.sessions import logout_user, session -from journalist_app.utils import ( - commit_account_changes, - set_diceware_password, - validate_hotp_secret, -) +from journalist_app.utils import commit_account_changes, set_diceware_password, validate_hotp_secret from models import ( FirstOrLastNameError, InstanceConfig, @@ -79,18 +70,14 @@ def manage_config() -> Union[str, werkzeug.Response]: if current_app.static_folder is None: abort(500) - custom_logo_filepath = os.path.join( - current_app.static_folder, "i", "custom_logo.png" - ) + custom_logo_filepath = os.path.join(current_app.static_folder, "i", "custom_logo.png") try: f.save(custom_logo_filepath) flash(gettext("Image updated."), "logo-success") except Exception: flash( # Translators: This error is shown when an uploaded image cannot be used. - gettext( - "Unable to process the image file. Please try another one." - ), + gettext("Unable to process the image file. Please try another one."), "logo-error", ) finally: @@ -122,9 +109,7 @@ def update_submission_preferences() -> Optional[werkzeug.Response]: reject_codenames = form.reject_codename_messages.data - InstanceConfig.update_submission_prefs( - allow_uploads, msg_length, reject_codenames - ) + InstanceConfig.update_submission_prefs(allow_uploads, msg_length, reject_codenames) flash(gettext("Preferences saved."), "submission-preferences-success") return redirect(url_for("admin.manage_config") + "#config-preventuploads") else: @@ -200,9 +185,7 @@ def add_user() -> Union[str, werkzeug.Response]: ) else: flash( - gettext( - "An unexpected error occurred! " "Please inform your admin." - ), + gettext("An unexpected error occurred! " "Please inform your admin."), "error", ) form_valid = False @@ -215,9 +198,7 @@ def add_user() -> Union[str, werkzeug.Response]: form_valid = False if "UNIQUE constraint failed: journalists.username" in str(e): flash( - gettext('Username "{username}" already taken.').format( - username=username - ), + gettext('Username "{username}" already taken.').format(username=username), "error", ) else: @@ -229,9 +210,7 @@ def add_user() -> Union[str, werkzeug.Response]: ), "error", ) - current_app.logger.error( - "Adding user " "'{}' failed: {}".format(username, e) - ) + current_app.logger.error("Adding user " "'{}' failed: {}".format(username, e)) if form_valid: return redirect(url_for("admin.new_user_two_factor", uid=new_user.id)) @@ -251,17 +230,14 @@ def new_user_two_factor() -> Union[str, werkzeug.Response]: if user.verify_token(token): flash( gettext( - 'The two-factor code for user "{user}" was verified ' - "successfully." + 'The two-factor code for user "{user}" was verified ' "successfully." ).format(user=user.username), "notification", ) return redirect(url_for("admin.index")) else: flash( - gettext( - "There was a problem verifying the two-factor code. Please try again." - ), + gettext("There was a problem verifying the two-factor code. Please try again."), "error", ) @@ -370,9 +346,7 @@ def delete_user(user_id: int) -> werkzeug.Response: # Do not flash because the interface does not expose this. # It can only happen by manually crafting a POST request current_app.logger.error( - 'Admin {} tried to delete "deleted" user'.format( - session.get_user().username - ) + 'Admin {} tried to delete "deleted" user'.format(session.get_user().username) ) abort(403) else: diff --git a/securedrop/journalist_app/api.py b/securedrop/journalist_app/api.py index 259a7ef6319..0f46e9f09c0 100644 --- a/securedrop/journalist_app/api.py +++ b/securedrop/journalist_app/api.py @@ -172,19 +172,11 @@ def source_conversation(source_uuid: str) -> Tuple[flask.Response, int]: def all_source_submissions(source_uuid: str) -> Tuple[flask.Response, int]: source = get_or_404(Source, source_uuid, column=Source.uuid) return ( - jsonify( - { - "submissions": [ - submission.to_json() for submission in source.submissions - ] - } - ), + jsonify({"submissions": [submission.to_json() for submission in source.submissions]}), 200, ) - @api.route( - "/sources//submissions//download", methods=["GET"] - ) + @api.route("/sources//submissions//download", methods=["GET"]) def download_submission(source_uuid: str, submission_uuid: str) -> flask.Response: get_or_404(Source, source_uuid, column=Source.uuid) submission = get_or_404(Submission, submission_uuid, column=Submission.uuid) @@ -201,9 +193,7 @@ def download_reply(source_uuid: str, reply_uuid: str) -> flask.Response: "/sources//submissions/", methods=["GET", "DELETE"], ) - def single_submission( - source_uuid: str, submission_uuid: str - ) -> Tuple[flask.Response, int]: + def single_submission(source_uuid: str, submission_uuid: str) -> Tuple[flask.Response, int]: if request.method == "GET": get_or_404(Source, source_uuid, column=Source.uuid) submission = get_or_404(Submission, submission_uuid, column=Submission.uuid) @@ -306,9 +296,7 @@ def get_all_submissions() -> Tuple[flask.Response, int]: jsonify( { "submissions": [ - submission.to_json() - for submission in submissions - if submission.source + submission.to_json() for submission in submissions if submission.source ] } ), @@ -319,9 +307,7 @@ def get_all_submissions() -> Tuple[flask.Response, int]: def get_all_replies() -> Tuple[flask.Response, int]: replies = Reply.query.all() return ( - jsonify( - {"replies": [reply.to_json() for reply in replies if reply.source]} - ), + jsonify({"replies": [reply.to_json() for reply in replies if reply.source]}), 200, ) @@ -332,9 +318,7 @@ def seen() -> Tuple[flask.Response, int]: """ if request.method == "POST": - if request.json is None or not isinstance( - request.json, collections.abc.Mapping - ): + if request.json is None or not isinstance(request.json, collections.abc.Mapping): abort(400, "Please send requests in valid JSON.") if not any(map(request.json.get, ["files", "messages", "replies"])): @@ -350,9 +334,7 @@ def seen() -> Tuple[flask.Response, int]: targets.add(f) for message_uuid in request.json.get("messages", []): - m = Submission.query.filter( - Submission.uuid == message_uuid - ).one_or_none() + m = Submission.query.filter(Submission.uuid == message_uuid).one_or_none() if m is None or not m.is_message: abort(404, "message not found: {}".format(message_uuid)) targets.add(m) diff --git a/securedrop/journalist_app/col.py b/securedrop/journalist_app/col.py index 59475b003a4..cda17ba5f7d 100644 --- a/securedrop/journalist_app/col.py +++ b/securedrop/journalist_app/col.py @@ -65,9 +65,7 @@ def col(filesystem_id: str) -> str: except GpgKeyNotFoundError: source.has_key = False - return render_template( - "col.html", filesystem_id=filesystem_id, source=source, form=form - ) + return render_template("col.html", filesystem_id=filesystem_id, source=source, form=form) @view.route("/delete/", methods=("POST",)) def delete_single(filesystem_id: str) -> werkzeug.Response: @@ -85,9 +83,9 @@ def delete_single(filesystem_id: str) -> werkzeug.Response: # Translators: Precedes a message confirming the success of an operation. escape(gettext("Success!")), escape( - gettext( - "The account and data for the source {} have been deleted." - ).format(source.journalist_designation) + gettext("The account and data for the source {} have been deleted.").format( + source.journalist_designation + ) ), ) ), @@ -158,9 +156,7 @@ def download_single_file(filesystem_id: str, fn: str) -> werkzeug.Response: reply = Reply.query.filter(Reply.filename == fn).one() mark_seen([reply], journalist) elif fn.endswith("-doc.gz.gpg") or fn.endswith("doc.zip.gpg"): - submitted_file = Submission.query.filter( - Submission.filename == fn - ).one() + submitted_file = Submission.query.filter(Submission.filename == fn).one() mark_seen([submitted_file], journalist) else: message = Submission.query.filter(Submission.filename == fn).one() diff --git a/securedrop/journalist_app/main.py b/securedrop/journalist_app/main.py index a24f435bfcd..4d3dcbd750d 100644 --- a/securedrop/journalist_app/main.py +++ b/securedrop/journalist_app/main.py @@ -148,9 +148,7 @@ def reply() -> werkzeug.Response: EncryptionManager.get_default().encrypt_journalist_reply( for_source_with_filesystem_id=g.filesystem_id, reply_in=form.message.data, - encrypted_reply_path_out=Path( - Storage.get_default().path(g.filesystem_id, filename) - ), + encrypted_reply_path_out=Path(Storage.get_default().path(g.filesystem_id, filename)), ) try: @@ -181,10 +179,7 @@ def reply() -> werkzeug.Response: # Translators: Precedes a message confirming the success of an operation. escape(gettext("Success!")), escape( - gettext( - "The source will receive your reply " - "next time they log in." - ) + gettext("The source will receive your reply " "next time they log in.") ), ) ), @@ -198,9 +193,7 @@ def bulk() -> Union[str, werkzeug.Response]: action = request.form["action"] error_redirect = url_for("col.col", filesystem_id=g.filesystem_id) doc_names_selected = request.form.getlist("doc_names_selected") - selected_docs = [ - doc for doc in g.source.collection if doc.filename in doc_names_selected - ] + selected_docs = [doc for doc in g.source.collection if doc.filename in doc_names_selected] if selected_docs == []: if action == "download": flash( @@ -208,11 +201,7 @@ def bulk() -> Union[str, werkzeug.Response]: "{} {}".format( # Translators: Error shown when a user has not selected items to act on. escape(gettext("Nothing Selected")), - escape( - gettext( - "You must select one or more items for download" - ) - ), + escape(gettext("You must select one or more items for download")), ) ), "error", @@ -223,11 +212,7 @@ def bulk() -> Union[str, werkzeug.Response]: "{} {}".format( # Translators: Error shown when a user has not selected items to act on. escape(gettext("Nothing Selected")), - escape( - gettext( - "You must select one or more items for deletion" - ) - ), + escape(gettext("You must select one or more items for deletion")), ) ), "error", diff --git a/securedrop/journalist_app/sessions.py b/securedrop/journalist_app/sessions.py index c0171d2d61e..eaef71a0814 100644 --- a/securedrop/journalist_app/sessions.py +++ b/securedrop/journalist_app/sessions.py @@ -18,9 +18,7 @@ class ServerSideSession(CallbackDict, SessionMixin): """Baseclass for server-side based sessions.""" - def __init__( - self, sid: str, token: str, lifetime: int = 0, initial: Any = None - ) -> None: + def __init__(self, sid: str, token: str, lifetime: int = 0, initial: Any = None) -> None: def on_update(self: ServerSideSession) -> None: self.modified = True @@ -125,14 +123,10 @@ def __init__( self.new = False self.has_same_site_capability = hasattr(self, "get_cookie_samesite") - def _new_session( - self, is_api: bool = False, initial: Any = None - ) -> ServerSideSession: + def _new_session(self, is_api: bool = False, initial: Any = None) -> ServerSideSession: sid = self._generate_sid() token: str = self._get_signer(app).dumps(sid) # type: ignore - session = ServerSideSession( - sid=sid, token=token, lifetime=self.lifetime, initial=initial - ) + session = ServerSideSession(sid=sid, token=token, lifetime=self.lifetime, initial=initial) session.new = True session.is_api = is_api return session @@ -214,15 +208,11 @@ def save_session( # type: ignore[override] # noqa session.sid = self._generate_sid() session.token = self._get_signer(app).dumps(session.sid) # type: ignore if session.new or session.to_regenerate: - self.redis.setex( - name=self.key_prefix + session.sid, value=val, time=expires - ) + self.redis.setex(name=self.key_prefix + session.sid, value=val, time=expires) elif session.modified: # To prevent race conditions where session is delete by an admin in the middle of a req # accept to save the session object if and only if alrady exists using the xx flag - self.redis.set( - name=self.key_prefix + session.sid, value=val, ex=expires, xx=True - ) + self.redis.set(name=self.key_prefix + session.sid, value=val, ex=expires, xx=True) if not session.is_api and (session.new or session.to_regenerate): response.headers.add("Vary", "Cookie") response.set_cookie( diff --git a/securedrop/journalist_app/utils.py b/securedrop/journalist_app/utils.py index 8ba441c84e8..17f5664670b 100644 --- a/securedrop/journalist_app/utils.py +++ b/securedrop/journalist_app/utils.py @@ -8,16 +8,7 @@ import werkzeug from db import db from encryption import EncryptionManager -from flask import ( - Markup, - abort, - current_app, - escape, - flash, - redirect, - send_file, - url_for, -) +from flask import Markup, abort, current_app, escape, flash, redirect, send_file, url_for from flask_babel import gettext, ngettext from journalist_app.sessions import session from models import ( @@ -54,9 +45,7 @@ def commit_account_changes(user: Journalist) -> None: gettext("An unexpected error occurred! Please " "inform your admin."), "error", ) - current_app.logger.error( - "Account changes for '{}' failed: {}".format(user, e) - ) + current_app.logger.error("Account changes for '{}' failed: {}".format(user, e)) db.session.rollback() else: flash(gettext("Account updated."), "success") @@ -117,8 +106,7 @@ def validate_user( elif isinstance(e, InvalidOTPSecretException): login_flashed_msg += " " login_flashed_msg += gettext( - "Your 2FA details are invalid" - " - please contact an administrator to reset them." + "Your 2FA details are invalid" " - please contact an administrator to reset them." ) else: try: @@ -163,8 +151,7 @@ def validate_hotp_secret(user: Journalist, otp_secret: str) -> bool: if "Non-hexadecimal digit found" in str(e): flash( gettext( - "Invalid HOTP secret format: " - "please only submit letters A-F and numbers 0-9." + "Invalid HOTP secret format: " "please only submit letters A-F and numbers 0-9." ), "error", ) @@ -223,9 +210,7 @@ def download( include in the ZIP-file. """ try: - zf = Storage.get_default().get_bulk_archive( - submissions, zip_directory=zip_basename - ) + zf = Storage.get_default().get_bulk_archive(submissions, zip_directory=zip_basename) except FileNotFoundError: flash( ngettext( @@ -256,9 +241,7 @@ def download( def delete_file_object(file_object: Union[Submission, Reply]) -> None: - path = Storage.get_default().path( - file_object.source.filesystem_id, file_object.filename - ) + path = Storage.get_default().path(file_object.source.filesystem_id, file_object.filename) try: Storage.get_default().move_to_shredder(path) except ValueError as e: @@ -432,11 +415,7 @@ def purge_deleted_sources() -> None: """ Deletes all Sources with a non-null `deleted_at` attribute. """ - sources = ( - Source.query.filter(Source.deleted_at.isnot(None)) - .order_by(Source.deleted_at) - .all() - ) + sources = Source.query.filter(Source.deleted_at.isnot(None)).order_by(Source.deleted_at).all() if sources: current_app.logger.info("Purging deleted sources (%s)", len(sources)) for source in sources: @@ -446,9 +425,7 @@ def purge_deleted_sources() -> None: current_app.logger.error("Error deleting source %s: %s", source.uuid, e) -def set_name( - user: Journalist, first_name: Optional[str], last_name: Optional[str] -) -> None: +def set_name(user: Journalist, first_name: Optional[str], last_name: Optional[str]) -> None: try: user.set_name(first_name, last_name) db.session.commit() diff --git a/securedrop/models.py b/securedrop/models.py index 752bee43133..b606f198fdb 100644 --- a/securedrop/models.py +++ b/securedrop/models.py @@ -23,15 +23,7 @@ from passlib.hash import argon2 from passphrases import PassphraseGenerator from pyotp import HOTP, TOTP -from sqlalchemy import ( - Boolean, - Column, - DateTime, - ForeignKey, - Integer, - LargeBinary, - String, -) +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, LargeBinary, String from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Query, backref, relationship from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound @@ -50,9 +42,7 @@ # Minimum length for ascii-encoded OTP secrets - by default, secrets are now 160-bit (32 chars) # but existing Journalist users may still have 80-bit (16-char) secrets -OTP_SECRET_MIN_ASCII_LENGTH = ( - 16 # 80 bits == 40 hex digits (== 16 ascii-encoded chars in db) -) +OTP_SECRET_MIN_ASCII_LENGTH = 16 # 80 bits == 40 hex digits (== 16 ascii-encoded chars in db) def get_one_or_else( @@ -105,11 +95,7 @@ def __repr__(self) -> str: def journalist_filename(self) -> str: valid_chars = "abcdefghijklmnopqrstuvwxyz1234567890-_" return "".join( - [ - c - for c in self.journalist_designation.lower().replace(" ", "_") - if c in valid_chars - ] + [c for c in self.journalist_designation.lower().replace(" ", "_") if c in valid_chars] ) def documents_messages_count(self) -> "Dict[str, int]": @@ -134,18 +120,14 @@ def collection(self) -> "List[Union[Submission, Reply]]": @property def fingerprint(self) -> "Optional[str]": try: - return EncryptionManager.get_default().get_source_key_fingerprint( - self.filesystem_id - ) + return EncryptionManager.get_default().get_source_key_fingerprint(self.filesystem_id) except GpgKeyNotFoundError: return None @property def public_key(self) -> "Optional[str]": try: - return EncryptionManager.get_default().get_source_public_key( - self.filesystem_id - ) + return EncryptionManager.get_default().get_source_public_key(self.filesystem_id) except GpgKeyNotFoundError: return None @@ -177,9 +159,7 @@ def to_json(self) -> "Dict[str, object]": }, "number_of_documents": docs_msg_count["documents"], "number_of_messages": docs_msg_count["messages"], - "submissions_url": url_for( - "api.all_source_submissions", source_uuid=self.uuid - ), + "submissions_url": url_for("api.all_source_submissions", source_uuid=self.uuid), "add_star_url": url_for("api.add_star", source_uuid=self.uuid), "remove_star_url": url_for("api.remove_star", source_uuid=self.uuid), "replies_url": url_for("api.all_source_replies", source_uuid=self.uuid), @@ -194,9 +174,7 @@ class Submission(db.Model): id = Column(Integer, primary_key=True) uuid = Column(String(36), unique=True, nullable=False) source_id = Column(Integer, ForeignKey("sources.id")) - source = relationship( - "Source", backref=backref("submissions", order_by=id, cascade="delete") - ) + source = relationship("Source", backref=backref("submissions", order_by=id, cascade="delete")) filename = Column(String(255), nullable=False) size = Column(Integer, nullable=False) @@ -219,9 +197,7 @@ def __repr__(self) -> str: @property def is_file(self) -> bool: - return self.filename.endswith("doc.gz.gpg") or self.filename.endswith( - "doc.zip.gpg" - ) + return self.filename.endswith("doc.gz.gpg") or self.filename.endswith("doc.zip.gpg") @property def is_message(self) -> bool: @@ -289,9 +265,7 @@ class Reply(db.Model): journalist = relationship("Journalist", backref=backref("replies", order_by=id)) source_id = Column(Integer, ForeignKey("sources.id")) - source = relationship( - "Source", backref=backref("replies", order_by=id, cascade="delete") - ) + source = relationship("Source", backref=backref("replies", order_by=id, cascade="delete")) filename = Column(String(255), nullable=False) size = Column(Integer, nullable=False) @@ -317,10 +291,7 @@ def __repr__(self) -> str: return "" % (self.filename) def to_json(self) -> "Dict[str, Any]": - seen_by = [ - r.journalist.uuid - for r in SeenReply.query.filter(SeenReply.reply_id == self.id) - ] + seen_by = [r.journalist.uuid for r in SeenReply.query.filter(SeenReply.reply_id == self.id)] json_reply = { "source_url": url_for("api.single_source", source_uuid=self.source.uuid) if self.source @@ -485,9 +456,7 @@ def __init__( self.set_hotp_secret(otp_secret) def __repr__(self) -> str: - return "".format( - self.username, " [admin]" if self.is_admin else "" - ) + return "".format(self.username, " [admin]" if self.is_admin else "") def _scrypt_hash(self, password: str, salt: bytes) -> bytes: backend = default_backend() @@ -588,9 +557,7 @@ def valid_password(self, passphrase: "Optional[str]") -> bool: # legacy support if self.pw_salt is None: raise ValueError( - "Should never happen: pw_salt is none for legacy Journalist {}".format( - self.id - ) + "Should never happen: pw_salt is none for legacy Journalist {}".format(self.id) ) # For type checking @@ -697,9 +664,7 @@ def throttle_login(cls, user: "Journalist") -> None: seconds=cls._LOGIN_ATTEMPT_PERIOD ) attempts_within_period = ( - JournalistLoginAttempt.query.filter( - JournalistLoginAttempt.journalist_id == user.id - ) + JournalistLoginAttempt.query.filter(JournalistLoginAttempt.journalist_id == user.id) .filter(JournalistLoginAttempt.timestamp > login_attempt_period) .all() ) @@ -737,9 +702,7 @@ def login( # For type checking assert isinstance(token, str) if pyotp.utils.compare_digest(token, user.last_token): - raise BadTokenException( - "previously used two-factor code " "{}".format(token) - ) + raise BadTokenException("previously used two-factor code " "{}".format(token)) if not user.verify_token(token): raise BadTokenException("invalid two-factor code") @@ -811,8 +774,7 @@ def delete(self) -> None: # For seen indicators, we need to make sure one doesn't already exist # otherwise it'll hit a unique key conflict already_seen_files = { - file.file_id - for file in SeenFile.query.filter_by(journalist_id=deleted.id).all() + file.file_id for file in SeenFile.query.filter_by(journalist_id=deleted.id).all() } for file in SeenFile.query.filter_by(journalist_id=self.id).all(): if file.file_id in already_seen_files: @@ -833,8 +795,7 @@ def delete(self) -> None: db.session.add(message) already_seen_replies = { - reply.reply_id - for reply in SeenReply.query.filter_by(journalist_id=deleted.id).all() + reply.reply_id for reply in SeenReply.query.filter_by(journalist_id=deleted.id).all() } for reply in SeenReply.query.filter_by(journalist_id=self.id).all(): if reply.reply_id in already_seen_replies: @@ -916,9 +877,7 @@ class InstanceConfig(db.Model): ) allow_document_uploads = Column(Boolean, default=True) organization_name = Column(String(255), nullable=True, default="SecureDrop") - initial_message_min_len = Column( - Integer, nullable=False, default=0, server_default="0" - ) + initial_message_min_len = Column(Integer, nullable=False, default=0, server_default="0") reject_message_with_codename = Column( Boolean, nullable=False, default=False, server_default="0" ) @@ -972,9 +931,7 @@ def get_current(cls) -> "InstanceConfig": """ try: - return cls.query.filter( - cls.valid_until == datetime.datetime.fromtimestamp(0) - ).one() + return cls.query.filter(cls.valid_until == datetime.datetime.fromtimestamp(0)).one() except NoResultFound: try: current = cls() @@ -982,9 +939,7 @@ def get_current(cls) -> "InstanceConfig": db.session.commit() return current except IntegrityError: - return cls.query.filter( - cls.valid_until == datetime.datetime.fromtimestamp(0) - ).one() + return cls.query.filter(cls.valid_until == datetime.datetime.fromtimestamp(0)).one() @classmethod def check_name_acceptable(cls, name: str) -> None: diff --git a/securedrop/tests/migrations/migration_c5a02eb52f2d.py b/securedrop/tests/migrations/migration_c5a02eb52f2d.py index 2c52790d5e4..22066f20d22 100644 --- a/securedrop/tests/migrations/migration_c5a02eb52f2d.py +++ b/securedrop/tests/migrations/migration_c5a02eb52f2d.py @@ -2,22 +2,20 @@ import random import uuid +from db import db +from journalist_app import create_app from sqlalchemy import text from sqlalchemy.exc import OperationalError -from db import db -from journalist_app import create_app from .helpers import random_chars class Helper: - def __init__(self): self.journalist_id = None class UpgradeTester(Helper): - def __init__(self, config): Helper.__init__(self) self.config = config @@ -25,22 +23,22 @@ def __init__(self, config): def create_journalist(self): params = { - 'uuid': str(uuid.uuid4()), - 'username': random_chars(50), - 'nonce': random.randint(20, 100) + "uuid": str(uuid.uuid4()), + "username": random_chars(50), + "nonce": random.randint(20, 100), } - sql = '''INSERT INTO journalists (uuid, username, session_nonce) - VALUES (:uuid, :username, :nonce)''' + sql = """INSERT INTO journalists (uuid, username, session_nonce) + VALUES (:uuid, :username, :nonce)""" return db.engine.execute(text(sql), **params).lastrowid def add_revoked_token(self): params = { - 'journalist_id': self.journalist_id, - 'token': 'abc123', + "journalist_id": self.journalist_id, + "token": "abc123", } - sql = '''INSERT INTO revoked_tokens (journalist_id, token) + sql = """INSERT INTO revoked_tokens (journalist_id, token) VALUES (:journalist_id, :token) - ''' + """ db.engine.execute(text(sql), **params) def load_data(self): @@ -51,12 +49,12 @@ def load_data(self): def check_upgrade(self): with self.app.app_context(): sql = "SELECT session_nonce FROM journalists WHERE id = :id" - params = {'id': self.journalist_id} + params = {"id": self.journalist_id} try: db.engine.execute(text(sql), **params).fetchall() except OperationalError: pass - sql = 'SELECT * FROM revoked_tokens WHERE id = :id' + sql = "SELECT * FROM revoked_tokens WHERE id = :id" try: db.engine.execute(text(sql), **params).fetchall() except OperationalError: @@ -64,19 +62,15 @@ def check_upgrade(self): class DowngradeTester(Helper): - def __init__(self, config): Helper.__init__(self) self.config = config self.app = create_app(config) def create_journalist(self): - params = { - 'uuid': str(uuid.uuid4()), - 'username': random_chars(50) - } - sql = '''INSERT INTO journalists (uuid, username) - VALUES (:uuid, :username)''' + params = {"uuid": str(uuid.uuid4()), "username": random_chars(50)} + sql = """INSERT INTO journalists (uuid, username) + VALUES (:uuid, :username)""" return db.engine.execute(text(sql), **params).lastrowid def load_data(self): @@ -88,9 +82,9 @@ def check_downgrade(self): sql = "SELECT session_nonce FROM journalists WHERE id = :id" params = {"id": self.journalist_id} res = db.engine.execute(text(sql), **params).fetchone() - assert(isinstance(res['session_nonce'], int)) + assert isinstance(res["session_nonce"], int) sql = """INSERT INTO revoked_tokens (journalist_id, token) VALUES (:journalist_id, :token)""" params = {"journalist_id": self.journalist_id, "token": "abc789"} res = db.engine.execute(text(sql), **params).lastrowid - assert(isinstance(res, int)) + assert isinstance(res, int) diff --git a/securedrop/tests/test_2fa.py b/securedrop/tests/test_2fa.py index 0e5b5315dbc..11fab0ecf24 100644 --- a/securedrop/tests/test_2fa.py +++ b/securedrop/tests/test_2fa.py @@ -84,9 +84,7 @@ def test_totp_reuse_protections2(journalist_app, test_journo, hardening): with journalist_app.app_context(): Journalist.login(test_journo["username"], test_journo["password"], token) with pytest.raises(BadTokenException): - Journalist.login( - test_journo["username"], test_journo["password"], token - ) + Journalist.login(test_journo["username"], test_journo["password"], token) def test_totp_reuse_protections3(journalist_app, test_journo, hardening): @@ -98,9 +96,7 @@ def test_totp_reuse_protections3(journalist_app, test_journo, hardening): with journalist_app.app_context(): Journalist.login(test_journo["username"], test_journo["password"], token) with pytest.raises(BadTokenException): - Journalist.login( - test_journo["username"], test_journo["password"], token + " " - ) + Journalist.login(test_journo["username"], test_journo["password"], token + " ") def test_totp_reuse_protections4(journalist_app, test_journo, hardening): @@ -116,13 +112,9 @@ def test_totp_reuse_protections4(journalist_app, test_journo, hardening): with journalist_app.app_context(): Journalist.login(test_journo["username"], test_journo["password"], token) with pytest.raises(BadTokenException): - Journalist.login( - test_journo["username"], test_journo["password"], invalid_token - ) + Journalist.login(test_journo["username"], test_journo["password"], invalid_token) with pytest.raises(BadTokenException): - Journalist.login( - test_journo["username"], test_journo["password"], token - ) + Journalist.login(test_journo["username"], test_journo["password"], token) def test_bad_token_fails_to_verify_on_admin_new_user_two_factor_page( @@ -164,9 +156,7 @@ def test_bad_token_fails_to_verify_on_new_user_two_factor_page( login_user(app, test_journo) # Submit the token once with InstrumentedApp(journalist_app) as ins: - resp = app.post( - url_for("account.new_two_factor"), data=dict(token=invalid_token) - ) + resp = app.post(url_for("account.new_two_factor"), data=dict(token=invalid_token)) assert resp.status_code == 200 ins.assert_message_flashed( diff --git a/securedrop/tests/test_integration.py b/securedrop/tests/test_integration.py index 2fcf13196c9..8cb8c0e6a41 100644 --- a/securedrop/tests/test_integration.py +++ b/securedrop/tests/test_integration.py @@ -270,9 +270,7 @@ def _helper_test_reply( text = resp.data.decode("utf-8") assert "Sources" in text soup = BeautifulSoup(resp.data, "html.parser") - col_url = soup.select("table#collections tr.source > th.designation a")[0][ - "href" - ] + col_url = soup.select("table#collections tr.source > th.designation a")[0]["href"] resp = app.get(col_url) assert resp.status_code == 200 @@ -386,9 +384,7 @@ def _can_decrypt_with_journalist_secret_key(msg: bytes) -> None: encryption_mgr = EncryptionManager.get_default() with import_journalist_private_key(encryption_mgr): # For GPG 2.1+, a non null passphrase _must_ be passed to decrypt() - decryption_result = encryption_mgr._gpg.decrypt( - msg, passphrase="dummy passphrase" - ) + decryption_result = encryption_mgr._gpg.decrypt(msg, passphrase="dummy passphrase") assert decryption_result.ok, "Could not decrypt msg with key, gpg says: {}".format( decryption_result.stderr @@ -464,9 +460,7 @@ def test_delete_collection(mocker, source_app, journalist_app, test_journo): resp = app.get("/") # navigate to the collection page soup = BeautifulSoup(resp.data.decode("utf-8"), "html.parser") - first_col_url = soup.select("table#collections tr.source > th.designation a")[ - 0 - ]["href"] + first_col_url = soup.select("table#collections tr.source > th.designation a")[0]["href"] resp = app.get(first_col_url) assert resp.status_code == 200 @@ -481,11 +475,7 @@ def test_delete_collection(mocker, source_app, journalist_app, test_journo): text = resp.data.decode("utf-8") assert ( - escape( - "The account and data for the source {} have been deleted.".format( - col_name - ) - ) + escape("The account and data for the source {} have been deleted.".format(col_name)) in text ) @@ -596,9 +586,7 @@ def test_filenames(source_app, journalist_app, test_journo): _login_user(app, test_journo) resp = app.get("/") soup = BeautifulSoup(resp.data.decode("utf-8"), "html.parser") - first_col_url = soup.select("table#collections tr.source > th.designation a")[ - 0 - ]["href"] + first_col_url = soup.select("table#collections tr.source > th.designation a")[0]["href"] resp = app.get(first_col_url) assert resp.status_code == 200 @@ -626,9 +614,7 @@ def test_filenames_delete(journalist_app, source_app, test_journo): _login_user(app, test_journo) resp = app.get("/") soup = BeautifulSoup(resp.data.decode("utf-8"), "html.parser") - first_col_url = soup.select("table#collections tr.source > th.designation a")[ - 0 - ]["href"] + first_col_url = soup.select("table#collections tr.source > th.designation a")[0]["href"] resp = app.get(first_col_url) assert resp.status_code == 200 soup = BeautifulSoup(resp.data.decode("utf-8"), "html.parser") @@ -641,21 +627,15 @@ def test_filenames_delete(journalist_app, source_app, test_journo): # test filenames and sort order submission_filename_re = r"^{0}-[a-z0-9-_]+(-msg|-doc\.gz)\.gpg$" filename = str( - soup.select("table#submissions tr.submission > th.filename a")[0].contents[ - 0 - ] + soup.select("table#submissions tr.submission > th.filename a")[0].contents[0] ) assert re.match(submission_filename_re.format(1), filename) filename = str( - soup.select("table#submissions tr.submission > th.filename a")[1].contents[ - 0 - ] + soup.select("table#submissions tr.submission > th.filename a")[1].contents[0] ) assert re.match(submission_filename_re.format(3), filename) filename = str( - soup.select("table#submissions tr.submission > th.filename a")[2].contents[ - 0 - ] + soup.select("table#submissions tr.submission > th.filename a")[2].contents[0] ) assert re.match(submission_filename_re.format(4), filename) diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index 894c6f946b1..45890845e9a 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -117,9 +117,7 @@ def test_reply_error_logging(journalist_app, test_journo, test_source): test_journo["otp_secret"], ) with patch.object(journalist_app.logger, "error") as mocked_error_logger: - with patch.object( - db.session, "commit", side_effect=exception_class(exception_msg) - ): + with patch.object(db.session, "commit", side_effect=exception_class(exception_msg)): resp = app.post( url_for("main.reply"), data={ @@ -141,9 +139,7 @@ def test_reply_error_logging(journalist_app, test_journo, test_source): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_reply_error_flashed_message( - config, journalist_app, test_journo, test_source, locale -): +def test_reply_error_flashed_message(config, journalist_app, test_journo, test_source, locale): exception_class = StaleDataError with journalist_app.test_client() as app: @@ -173,9 +169,7 @@ def test_reply_error_flashed_message( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_empty_replies_are_rejected( - config, journalist_app, test_journo, test_source, locale -): +def test_empty_replies_are_rejected(config, journalist_app, test_journo, test_source, locale): with journalist_app.test_client() as app: _login_user( app, @@ -197,9 +191,7 @@ def test_empty_replies_are_rejected( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_nonempty_replies_are_accepted( - config, journalist_app, test_journo, test_source, locale -): +def test_nonempty_replies_are_accepted(config, journalist_app, test_journo, test_source, locale): with journalist_app.test_client() as app: _login_user( app, @@ -226,9 +218,7 @@ def test_successful_reply_marked_as_seen_by_sender( ): with journalist_app.test_client() as app: journo = test_journo["journalist"] - _login_user( - app, journo.username, test_journo["password"], test_journo["otp_secret"] - ) + _login_user(app, journo.username, test_journo["password"], test_journo["otp_secret"]) seen_reply = SeenReply.query.filter_by(journalist_id=journo.id).one_or_none() assert not seen_reply @@ -307,9 +297,7 @@ def test_login_throttle(config, journalist_app, test_journo, locale): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_login_throttle_is_not_global( - config, journalist_app, test_journo, test_admin, locale -): +def test_login_throttle_is_not_global(config, journalist_app, test_journo, test_admin, locale): """The login throttling should be per-user, not global. Global login throttling can prevent all users logging into the application.""" @@ -384,9 +372,7 @@ def test_login_invalid_credentials(config, journalist_app, test_journo, locale): with journalist_app.test_client() as app: resp = app.post( url_for("main.login", l=locale), - data=dict( - username=test_journo["username"], password="invalid", token="mocked" - ), + data=dict(username=test_journo["username"], password="invalid", token="mocked"), ) assert page_language(resp.data) == language_tag(locale) msgids = ["Login failed."] @@ -596,9 +582,7 @@ def test_admin_delete_user(config, journalist_app, test_admin, test_journo, loca @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_cannot_delete_self( - config, journalist_app, test_admin, test_journo, locale -): +def test_admin_cannot_delete_self(config, journalist_app, test_admin, test_journo, locale): # Verify journalist is in the database with journalist_app.app_context(): assert Journalist.query.get(test_journo["id"]) is not None @@ -628,36 +612,20 @@ def test_admin_cannot_delete_self( # The user can be edited and deleted assert ( - escape( - gettext("Edit user {username}").format( - username=test_journo["username"] - ) - ) + escape(gettext("Edit user {username}").format(username=test_journo["username"])) in resp_text ) assert ( - escape( - gettext("Delete user {username}").format( - username=test_journo["username"] - ) - ) + escape(gettext("Delete user {username}").format(username=test_journo["username"])) in resp_text ) # The admin can be edited but cannot deleted assert ( - escape( - gettext("Edit user {username}").format( - username=test_admin["username"] - ) - ) + escape(gettext("Edit user {username}").format(username=test_admin["username"])) in resp_text ) assert ( - escape( - gettext("Delete user {username}").format( - username=test_admin["username"] - ) - ) + escape(gettext("Delete user {username}").format(username=test_admin["username"])) not in resp_text ) @@ -775,9 +743,7 @@ def test_admin_edits_user_password_error_response( test_admin["otp_secret"], ) - with patch( - "sqlalchemy.orm.scoping.scoped_session.commit", side_effect=Exception() - ): + with patch("sqlalchemy.orm.scoping.scoped_session.commit", side_effect=Exception()): with InstrumentedApp(journalist_app) as ins: resp = app.post( url_for("admin.new_password", user_id=test_journo["id"], l=locale), @@ -796,9 +762,7 @@ def test_admin_edits_user_password_error_response( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_user_edits_password_success_response( - config, journalist_app, test_journo, locale -): +def test_user_edits_password_success_response(config, journalist_app, test_journo, locale): original_hardening = models.LOGIN_HARDENING try: # Set this to false because we login then immediately reuse the same @@ -872,9 +836,7 @@ def test_user_edits_password_expires_session(journalist_app, test_journo): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_user_edits_password_error_response( - config, journalist_app, test_journo, locale -): +def test_user_edits_password_error_response(config, journalist_app, test_journo, locale): original_hardening = models.LOGIN_HARDENING try: # Set this to false because we login then immediately reuse the same @@ -893,9 +855,7 @@ def test_user_edits_password_error_response( # patch token verification because there are multiple commits # to the database and this isolates the one we want to fail with patch.object(Journalist, "verify_token", return_value=True): - with patch.object( - db.session, "commit", side_effect=[None, Exception()] - ): + with patch.object(db.session, "commit", side_effect=[None, Exception()]): with InstrumentedApp(journalist_app) as ins: resp = app.post( url_for("account.new_password", l=locale), @@ -923,9 +883,7 @@ def test_user_edits_password_error_response( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_add_user_when_username_already_taken( - config, journalist_app, test_admin, locale -): +def test_admin_add_user_when_username_already_taken(config, journalist_app, test_admin, locale): with journalist_app.test_client() as client: _login_user( client, @@ -994,9 +952,7 @@ def test_min_password_length(): Journalist(username="My Password is Too Small!", password=password) -def test_admin_edits_user_password_too_long_warning( - journalist_app, test_admin, test_journo -): +def test_admin_edits_user_password_too_long_warning(journalist_app, test_admin, test_journo): # append a bunch of a's to a diceware password to keep it "diceware-like" overly_long_password = VALID_PASSWORD + "a" * ( Journalist.MAX_PASSWORD_LEN - len(VALID_PASSWORD) + 1 @@ -1064,9 +1020,7 @@ def test_user_edits_password_too_long_warning(journalist_app, test_journo): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_add_user_password_too_long_warning( - config, journalist_app, test_admin, locale -): +def test_admin_add_user_password_too_long_warning(config, journalist_app, test_admin, locale): overly_long_password = VALID_PASSWORD + "a" * ( Journalist.MAX_PASSWORD_LEN - len(VALID_PASSWORD) + 1 ) @@ -1102,9 +1056,7 @@ def test_admin_add_user_password_too_long_warning( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_add_user_first_name_too_long_warning( - config, journalist_app, test_admin, locale -): +def test_admin_add_user_first_name_too_long_warning(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: overly_long_name = "a" * (Journalist.MAX_NAME_LEN + 1) _login_user( @@ -1137,9 +1089,7 @@ def test_admin_add_user_first_name_too_long_warning( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_add_user_last_name_too_long_warning( - config, journalist_app, test_admin, locale -): +def test_admin_add_user_last_name_too_long_warning(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: overly_long_name = "a" * (Journalist.MAX_NAME_LEN + 1) _login_user( @@ -1189,9 +1139,7 @@ def test_admin_edits_user_invalid_username_deleted( with InstrumentedApp(journalist_app) as ins: resp = app.post( url_for("admin.edit_user", user_id=test_admin["id"], l=locale), - data=dict( - username=new_username, first_name="", last_name="", is_admin=None - ), + data=dict(username=new_username, first_name="", last_name="", is_admin=None), follow_redirects=True, ) @@ -1206,9 +1154,7 @@ def test_admin_edits_user_invalid_username_deleted( ) -def test_admin_resets_user_hotp_format_non_hexa( - journalist_app, test_admin, test_journo -): +def test_admin_resets_user_hotp_format_non_hexa(journalist_app, test_admin, test_journo): with journalist_app.test_client() as app: _login_user( @@ -1241,8 +1187,7 @@ def test_admin_resets_user_hotp_format_non_hexa( assert journo.is_totp ins.assert_message_flashed( - "Invalid HOTP secret format: please only submit letters A-F and " - "numbers 0-9.", + "Invalid HOTP secret format: please only submit letters A-F and " "numbers 0-9.", "error", ) @@ -1283,9 +1228,7 @@ def test_admin_resets_user_hotp_format_too_short( ins.assert_message_flashed( "HOTP secrets are 40 characters long" - " - you have entered {num}.".format( - num=len(the_secret.replace(" ", "")) - ), + " - you have entered {num}.".format(num=len(the_secret.replace(" ", ""))), "error", ) @@ -1317,9 +1260,7 @@ def test_admin_resets_user_hotp(journalist_app, test_admin, test_journo): assert not journo.is_totp # Redirect to admin 2FA view - ins.assert_redirects( - resp, url_for("admin.new_user_two_factor", uid=journo.id) - ) + ins.assert_redirects(resp, url_for("admin.new_user_two_factor", uid=journo.id)) def test_admin_resets_user_hotp_error(mocker, journalist_app, test_admin, test_journo): @@ -1412,8 +1353,7 @@ def test_user_resets_user_hotp_format_non_hexa(journalist_app, test_journo): data=dict(otp_secret=non_hexa_secret), ) ins.assert_message_flashed( - "Invalid HOTP secret format: " - "please only submit letters A-F and numbers 0-9.", + "Invalid HOTP secret format: " "please only submit letters A-F and numbers 0-9.", "error", ) @@ -1479,9 +1419,7 @@ def test_admin_resets_user_totp(journalist_app, test_admin, test_journo): resp = app.post( url_for("admin.reset_two_factor_totp"), data=dict(uid=test_journo["id"]) ) - ins.assert_redirects( - resp, url_for("admin.new_user_two_factor", uid=test_journo["id"]) - ) + ins.assert_redirects(resp, url_for("admin.new_user_two_factor", uid=test_journo["id"])) # Re-fetch journalist to get fresh DB instance user = Journalist.query.get(test_journo["id"]) @@ -1515,9 +1453,7 @@ def test_user_resets_totp(journalist_app, test_journo): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_resets_hotp_with_missing_otp_secret_key( - config, journalist_app, test_admin, locale -): +def test_admin_resets_hotp_with_missing_otp_secret_key(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: _login_user( app, @@ -1616,16 +1552,12 @@ def test_admin_add_user(journalist_app, test_admin): ) new_user = Journalist.query.filter_by(username=username).one() - ins.assert_redirects( - resp, url_for("admin.new_user_two_factor", uid=new_user.id) - ) + ins.assert_redirects(resp, url_for("admin.new_user_two_factor", uid=new_user.id)) @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_add_user_with_invalid_username( - config, journalist_app, test_admin, locale -): +def test_admin_add_user_with_invalid_username(config, journalist_app, test_admin, locale): username = "deleted" with journalist_app.test_client() as app: @@ -1670,9 +1602,7 @@ def test_deleted_user_cannot_login(config, journalist_app, locale): with journalist_app.test_client() as app: resp = app.post( url_for("main.login", l=locale), - data=dict( - username="deleted", password=password, token=TOTP(otp_secret).now() - ), + data=dict(username="deleted", password=password, token=TOTP(otp_secret).now()), ) assert page_language(resp.data) == language_tag(locale) msgids = [ @@ -1777,8 +1707,8 @@ def test_admin_add_user_too_short_username(config, journalist_app, test_admin, l ( (locale, "a" * i) for locale in get_test_locales() - for i in get_plural_tests()[locale] - if i != 0 # pylint: disable=undefined-variable + for i in get_plural_tests()[locale] # pylint: disable=undefined-variable + if i != 0 ), ) def test_admin_add_user_yubikey_odd_length(journalist_app, test_admin, locale, secret): @@ -1819,9 +1749,7 @@ def test_admin_add_user_yubikey_odd_length(journalist_app, test_admin, locale, s "locale, secret", ((locale, " " * i) for locale in get_test_locales() for i in range(3)), ) -def test_admin_add_user_yubikey_blank_secret( - journalist_app, test_admin, locale, secret -): +def test_admin_add_user_yubikey_blank_secret(journalist_app, test_admin, locale, secret): with journalist_app.test_client() as app: _login_user( app, @@ -1887,9 +1815,7 @@ def test_admin_add_user_yubikey_valid_length(journalist_app, test_admin, locale) @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_add_user_yubikey_correct_length_with_whitespace( - journalist_app, test_admin, locale -): +def test_admin_add_user_yubikey_correct_length_with_whitespace(journalist_app, test_admin, locale): otp = "12 34 56 78 90 12 34 56 78 90 12 34 56 78 90 12 34 56 78 90" with journalist_app.test_client() as app: @@ -2034,9 +1960,7 @@ def test_admin_adds_first_name_last_name_to_user(journalist_app, test_admin): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_adds_invalid_first_last_name_to_user( - config, journalist_app, test_admin, locale -): +def test_admin_adds_invalid_first_last_name_to_user(config, journalist_app, test_admin, locale): with journalist_app.test_client() as client: new_user = "admin-invalid-first-name-last-name-user-test" @@ -2085,9 +2009,7 @@ def test_admin_adds_invalid_first_last_name_to_user( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_admin_add_user_integrity_error( - config, journalist_app, test_admin, mocker, locale -): +def test_admin_add_user_integrity_error(config, journalist_app, test_admin, mocker, locale): mocked_error_logger = mocker.patch("journalist_app.admin.current_app.logger.error") mocker.patch( "journalist_app.admin.Journalist", @@ -2156,9 +2078,7 @@ def test_prevent_document_uploads(config, journalist_app, test_admin, locale): assert page_language(resp.data) == language_tag(locale) msgids = ["Preferences saved."] with xfail_untranslated_messages(config, locale, msgids): - ins.assert_message_flashed( - gettext(msgids[0]), "submission-preferences-success" - ) + ins.assert_message_flashed(gettext(msgids[0]), "submission-preferences-success") @flaky(rerun_filter=utils.flaky_filter_xfail) @@ -2171,9 +2091,7 @@ def test_no_prevent_document_uploads(config, journalist_app, test_admin, locale) test_admin["password"], test_admin["otp_secret"], ) - form = journalist_app_module.forms.SubmissionPreferencesForm( - min_message_length=0 - ) + form = journalist_app_module.forms.SubmissionPreferencesForm(min_message_length=0) app.post( url_for("admin.update_submission_preferences"), data=form.data, @@ -2190,9 +2108,7 @@ def test_no_prevent_document_uploads(config, journalist_app, test_admin, locale) assert page_language(resp.data) == language_tag(locale) msgids = ["Preferences saved."] with xfail_untranslated_messages(config, locale, msgids): - ins.assert_message_flashed( - gettext(msgids[0]), "submission-preferences-success" - ) + ins.assert_message_flashed(gettext(msgids[0]), "submission-preferences-success") def test_prevent_document_uploads_invalid(journalist_app, test_admin): @@ -2261,9 +2177,7 @@ def test_message_filtering(config, journalist_app, test_admin): # Still 0 assert InstanceConfig.get_current().initial_message_min_len == 0 html = resp.data.decode("utf-8") - assert ( - "To configure a minimum message length, you must set the required" in html - ) + assert "To configure a minimum message length, you must set the required" in html # Now tick the "prevent short messages" checkbox form = journalist_app_module.forms.SubmissionPreferencesForm( @@ -2283,14 +2197,10 @@ def test_message_filtering(config, journalist_app, test_admin): follow_redirects=True, ) html = resp.data.decode("utf-8") - assert ( - "To configure a minimum message length, you must set the required" in html - ) + assert "To configure a minimum message length, you must set the required" in html # Now rejecting codenames assert InstanceConfig.get_current().reject_message_with_codename is False - form = journalist_app_module.forms.SubmissionPreferencesForm( - reject_codename_messages=True - ) + form = journalist_app_module.forms.SubmissionPreferencesForm(reject_codename_messages=True) app.post( url_for("admin.update_submission_preferences"), data=form.data, @@ -2418,16 +2328,12 @@ def test_orgname_html_escaped(config, journalist_app, test_admin, locale): msgids = ["Preferences saved."] with xfail_untranslated_messages(config, locale, msgids): ins.assert_message_flashed(gettext(msgids[0]), "org-name-success") - assert InstanceConfig.get_current().organization_name == htmlescape( - t_name, quote=True - ) + assert InstanceConfig.get_current().organization_name == htmlescape(t_name, quote=True) def test_logo_default_available(journalist_app): # if the custom image is available, this test will fail - custom_image_location = os.path.join( - config.SECUREDROP_ROOT, "static/i/custom_logo.png" - ) + custom_image_location = os.path.join(config.SECUREDROP_ROOT, "static/i/custom_logo.png") if os.path.exists(custom_image_location): os.remove(custom_image_location) @@ -2440,9 +2346,7 @@ def test_logo_default_available(journalist_app): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_logo_upload_with_valid_image_succeeds( - config, journalist_app, test_admin, locale -): +def test_logo_upload_with_valid_image_succeeds(config, journalist_app, test_admin, locale): # Save original logo to restore after test run logo_image_location = os.path.join(config.SECUREDROP_ROOT, "static/i/logo.png") with io.open(logo_image_location, "rb") as logo_file: @@ -2462,9 +2366,7 @@ def test_logo_upload_with_valid_image_succeeds( test_admin["otp_secret"], ) # Create 1px * 1px 'white' PNG file from its base64 string - form = journalist_app_module.forms.LogoForm( - logo=(BytesIO(logo_bytes), "test.png") - ) + form = journalist_app_module.forms.LogoForm(logo=(BytesIO(logo_bytes), "test.png")) # Create 1px * 1px 'white' PNG file from its base64 string form = journalist_app_module.forms.LogoForm(logo=(BytesIO(logo_bytes), "test.png")) with InstrumentedApp(journalist_app) as ins: @@ -2492,9 +2394,7 @@ def test_logo_upload_with_valid_image_succeeds( @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_logo_upload_with_invalid_filetype_fails( - config, journalist_app, test_admin, locale -): +def test_logo_upload_with_invalid_filetype_fails(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: _login_user( app, @@ -2503,9 +2403,7 @@ def test_logo_upload_with_invalid_filetype_fails( test_admin["otp_secret"], ) - form = journalist_app_module.forms.LogoForm( - logo=(BytesIO(b"filedata"), "bad.exe") - ) + form = journalist_app_module.forms.LogoForm(logo=(BytesIO(b"filedata"), "bad.exe")) with InstrumentedApp(journalist_app) as ins: resp = app.post( url_for("admin.manage_config", l=locale), @@ -2557,9 +2455,7 @@ def test_logo_upload_save_fails(config, journalist_app, test_admin, locale): ) assert page_language(resp.data) == language_tag(locale) - msgids = [ - "Unable to process the image file. Please try another one." - ] + msgids = ["Unable to process the image file. Please try another one."] with xfail_untranslated_messages(config, locale, msgids): ins.assert_message_flashed(gettext(msgids[0]), "logo-error") finally: @@ -2584,9 +2480,7 @@ def test_creation_of_ossec_test_log_event(journalist_app, test_admin, mocker): @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) -def test_logo_upload_with_empty_input_field_fails( - config, journalist_app, test_admin, locale -): +def test_logo_upload_with_empty_input_field_fails(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: _login_user( app, @@ -2698,9 +2592,7 @@ def test_incorrect_current_password_change(config, journalist_app, test_journo, with InstrumentedApp(journalist_app) as ins: resp = app.post( url_for("account.new_password", l=locale), - data=dict( - password=VALID_PASSWORD, token="mocked", current_password="badpw" - ), + data=dict(password=VALID_PASSWORD, token="mocked", current_password="badpw"), follow_redirects=True, ) assert page_language(resp.data) == language_tag(locale) @@ -2712,9 +2604,7 @@ def test_incorrect_current_password_change(config, journalist_app, test_journo, ), ] with xfail_untranslated_messages(config, locale, msgids): - ins.assert_message_flashed( - gettext(msgids[0]) + " " + gettext(msgids[1]), "error" - ) + ins.assert_message_flashed(gettext(msgids[0]) + " " + gettext(msgids[1]), "error") # need a journalist app for the app context @@ -2971,9 +2861,7 @@ def test_delete_data_deletes_submissions_retaining_source( assert len(source.collection) == 0 -def test_delete_source_deletes_submissions( - journalist_app, test_journo, test_source, app_storage -): +def test_delete_source_deletes_submissions(journalist_app, test_journo, test_source, app_storage): """Verify that when a source is deleted, the submissions that correspond to them are also deleted.""" @@ -2990,9 +2878,7 @@ def test_delete_source_deletes_submissions( assert res is None -def test_delete_collection_updates_db( - journalist_app, test_journo, test_source, app_storage -): +def test_delete_collection_updates_db(journalist_app, test_journo, test_source, app_storage): """ Verify that when a source is deleted, the Source record is deleted and all records associated with the source are deleted. @@ -3042,9 +2928,7 @@ def test_delete_collection_updates_db( assert not seen_reply -def test_delete_source_deletes_source_key( - journalist_app, test_source, test_journo, app_storage -): +def test_delete_source_deletes_source_key(journalist_app, test_source, test_journo, app_storage): """Verify that when a source is deleted, the PGP key that corresponds to them is also deleted.""" @@ -3119,9 +3003,7 @@ def test_bulk_delete_deletes_db_entries( file_list.extend(replies) with journalist_app.test_request_context("/"): - journalist_app_module.utils.bulk_delete( - test_source["filesystem_id"], file_list - ) + journalist_app_module.utils.bulk_delete(test_source["filesystem_id"], file_list) def db_assertion(): subs = Submission.query.filter_by(source_id=source.id).all() @@ -3164,9 +3046,7 @@ def test_bulk_delete_works_when_files_absent( with journalist_app.test_request_context("/"): with patch("store.Storage.move_to_shredder") as delMock: delMock.side_effect = ValueError - journalist_app_module.utils.bulk_delete( - test_source["filesystem_id"], file_list - ) + journalist_app_module.utils.bulk_delete(test_source["filesystem_id"], file_list) def db_assertion(): subs = Submission.query.filter_by(source_id=source.id).all() @@ -3183,9 +3063,7 @@ def test_login_with_invalid_password_doesnt_call_argon2(mocker, test_journo): invalid_pw = "a" * (Journalist.MAX_PASSWORD_LEN + 1) with pytest.raises(InvalidPasswordLength): - Journalist.login( - test_journo["username"], invalid_pw, TOTP(test_journo["otp_secret"]).now() - ) + Journalist.login(test_journo["username"], invalid_pw, TOTP(test_journo["otp_secret"]).now()) assert not mock_argon2.called @@ -3241,9 +3119,7 @@ def test_download_selected_submissions_and_replies( replies = utils.db_helper.reply(app_storage, journo, source, 4) selected_submissions = random.sample(submissions, 2) selected_replies = random.sample(replies, 2) - selected = [ - submission.filename for submission in selected_submissions + selected_replies - ] + selected = [submission.filename for submission in selected_submissions + selected_replies] selected.sort() with journalist_app.test_client() as app: @@ -3287,9 +3163,7 @@ def test_download_selected_submissions_and_replies( # The items not selected are absent from the zipfile not_selected_submissions = set(submissions).difference(selected_submissions) not_selected_replies = set(replies).difference(selected_replies) - not_selected = [ - i.filename for i in not_selected_submissions.union(not_selected_replies) - ] + not_selected = [i.filename for i in not_selected_submissions.union(not_selected_replies)] for filename in not_selected: with pytest.raises(KeyError): zipfile.ZipFile(BytesIO(resp.data)).getinfo( @@ -3311,17 +3185,13 @@ def test_download_selected_submissions_and_replies_previously_seen( replies = utils.db_helper.reply(app_storage, journo, source, 4) selected_submissions = random.sample(submissions, 2) selected_replies = random.sample(replies, 2) - selected = [ - submission.filename for submission in selected_submissions + selected_replies - ] + selected = [submission.filename for submission in selected_submissions + selected_replies] selected.sort() # Mark selected files, messages, and replies as seen seen_file = SeenFile(file_id=selected_submissions[0].id, journalist_id=journo.id) db.session.add(seen_file) - seen_message = SeenMessage( - message_id=selected_submissions[1].id, journalist_id=journo.id - ) + seen_message = SeenMessage(message_id=selected_submissions[1].id, journalist_id=journo.id) db.session.add(seen_message) mark_seen(selected_replies, journo) db.session.commit() @@ -3367,9 +3237,7 @@ def test_download_selected_submissions_and_replies_previously_seen( # The items not selected are absent from the zipfile not_selected_submissions = set(submissions).difference(selected_submissions) not_selected_replies = set(replies).difference(selected_replies) - not_selected = [ - i.filename for i in not_selected_submissions.union(not_selected_replies) - ] + not_selected = [i.filename for i in not_selected_submissions.union(not_selected_replies)] for filename in not_selected: with pytest.raises(KeyError): zipfile.ZipFile(BytesIO(resp.data)).getinfo( @@ -3391,9 +3259,7 @@ def test_download_selected_submissions_previously_downloaded( replies = utils.db_helper.reply(app_storage, journo, source, 4) selected_submissions = random.sample(submissions, 2) selected_replies = random.sample(replies, 2) - selected = [ - submission.filename for submission in selected_submissions + selected_replies - ] + selected = [submission.filename for submission in selected_submissions + selected_replies] selected.sort() # Mark selected submissions as downloaded @@ -3431,9 +3297,7 @@ def test_download_selected_submissions_previously_downloaded( # The items not selected are absent from the zipfile not_selected_submissions = set(submissions).difference(selected_submissions) not_selected_replies = set(replies).difference(selected_replies) - not_selected = [ - i.filename for i in not_selected_submissions.union(not_selected_replies) - ] + not_selected = [i.filename for i in not_selected_submissions.union(not_selected_replies)] for filename in not_selected: with pytest.raises(KeyError): zipfile.ZipFile(BytesIO(resp.data)).getinfo( @@ -3475,9 +3339,7 @@ def test_download_selected_submissions_missing_files( journo = Journalist.query.get(test_journo["id"]) with journalist_app.test_client() as app: - _login_user( - app, journo.username, test_journo["password"], test_journo["otp_secret"] - ) + _login_user(app, journo.username, test_journo["password"], test_journo["otp_secret"]) resp = app.post( url_for("main.bulk"), data=dict( @@ -3516,9 +3378,7 @@ def test_download_single_submission_missing_file( missing_file = selected_missing_files[0] with journalist_app.test_client() as app: - _login_user( - app, journo.username, test_journo["password"], test_journo["otp_secret"] - ) + _login_user(app, journo.username, test_journo["password"], test_journo["otp_secret"]) resp = app.get( url_for( "col.download_single_file", @@ -3549,9 +3409,7 @@ def test_download_unread_all_sources(journalist_app, test_journo, app_storage): bulk = utils.db_helper.bulk_setup_for_seen_only(journo, app_storage) with journalist_app.test_client() as app: - _login_user( - app, journo.username, test_journo["password"], test_journo["otp_secret"] - ) + _login_user(app, journo.username, test_journo["password"], test_journo["otp_secret"]) # Select all sources supplied from bulk_download_setup selected = [] @@ -3587,9 +3445,7 @@ def test_download_unread_all_sources(journalist_app, test_journo, app_storage): os.path.join( "unread", source.journalist_designation, - "{}_{}".format( - item.filename.split("-")[0], source.last_updated.date() - ), + "{}_{}".format(item.filename.split("-")[0], source.last_updated.date()), item.filename, ) ) @@ -3612,9 +3468,7 @@ def test_download_unread_all_sources(journalist_app, test_journo, app_storage): os.path.join( "unread", source.journalist_designation, - "{}_{}".format( - item.filename.split("-")[0], source.last_updated.date() - ), + "{}_{}".format(item.filename.split("-")[0], source.last_updated.date()), item.filename, ) ) @@ -3631,9 +3485,7 @@ def test_download_all_selected_sources(journalist_app, test_journo, app_storage) bulk = utils.db_helper.bulk_setup_for_seen_only(journo, app_storage) with journalist_app.test_client() as app: - _login_user( - app, journo.username, test_journo["password"], test_journo["otp_secret"] - ) + _login_user(app, journo.username, test_journo["password"], test_journo["otp_secret"]) # Select all sources supplied from bulk_download_setup selected = [] @@ -3663,16 +3515,12 @@ def test_download_all_selected_sources(journalist_app, test_journo, app_storage) not_downloaded = i["not_downloaded"] # Check that the zip file contains all submissions for the source - for item in ( - not_downloaded + unseen_files + unseen_messages + seen_files + seen_messages - ): + for item in not_downloaded + unseen_files + unseen_messages + seen_files + seen_messages: zipinfo = zipfile.ZipFile(BytesIO(resp.data)).getinfo( os.path.join( "all", source.journalist_designation, - "{}_{}".format( - item.filename.split("-")[0], source.last_updated.date() - ), + "{}_{}".format(item.filename.split("-")[0], source.last_updated.date()), item.filename, ) ) @@ -3695,18 +3543,14 @@ def test_download_all_selected_sources(journalist_app, test_journo, app_storage) os.path.join( "unread", source.journalist_designation, - "{}_{}".format( - item.filename.split("-")[0], source.last_updated.date() - ), + "{}_{}".format(item.filename.split("-")[0], source.last_updated.date()), item.filename, ) ) assert zipinfo -def test_single_source_is_successfully_starred( - journalist_app, test_journo, test_source -): +def test_single_source_is_successfully_starred(journalist_app, test_journo, test_source): with journalist_app.test_client() as app: _login_user( app, @@ -3715,9 +3559,7 @@ def test_single_source_is_successfully_starred( test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: - resp = app.post( - url_for("col.add_star", filesystem_id=test_source["filesystem_id"]) - ) + resp = app.post(url_for("col.add_star", filesystem_id=test_source["filesystem_id"])) ins.assert_redirects(resp, url_for("main.index")) @@ -3725,9 +3567,7 @@ def test_single_source_is_successfully_starred( assert source.star.starred -def test_single_source_is_successfully_unstarred( - journalist_app, test_journo, test_source -): +def test_single_source_is_successfully_unstarred(journalist_app, test_journo, test_source): with journalist_app.test_client() as app: _login_user( app, @@ -3740,9 +3580,7 @@ def test_single_source_is_successfully_unstarred( with InstrumentedApp(journalist_app) as ins: # Now unstar the source - resp = app.post( - url_for("col.remove_star", filesystem_id=test_source["filesystem_id"]) - ) + resp = app.post(url_for("col.remove_star", filesystem_id=test_source["filesystem_id"])) ins.assert_redirects(resp, url_for("main.index")) @@ -3805,9 +3643,7 @@ def test_csrf_error_page(config, journalist_app, locale): # because the session is being cleared when it expires, the # response should always be in English. assert page_language(resp.data) == "en-US" - assert "You have been logged out due to inactivity." in resp.data.decode( - "utf-8" - ) + assert "You have been logged out due to inactivity." in resp.data.decode("utf-8") def test_col_process_aborts_with_bad_action(journalist_app, test_journo): diff --git a/securedrop/tests/test_journalist_api.py b/securedrop/tests/test_journalist_api.py index b0636dfaf1f..818094df319 100644 --- a/securedrop/tests/test_journalist_api.py +++ b/securedrop/tests/test_journalist_api.py @@ -89,9 +89,7 @@ def test_user_cannot_get_an_api_token_with_wrong_password(journalist_app, test_j assert response.json["error"] == "Forbidden" -def test_user_cannot_get_an_api_token_with_wrong_2fa_token( - journalist_app, test_journo, hardening -): +def test_user_cannot_get_an_api_token_with_wrong_2fa_token(journalist_app, test_journo, hardening): with journalist_app.test_client() as app: response = app.post( url_for("api.get_token"), @@ -109,16 +107,12 @@ def test_user_cannot_get_an_api_token_with_wrong_2fa_token( assert response.json["error"] == "Forbidden" -def test_user_cannot_get_an_api_token_with_no_passphase_field( - journalist_app, test_journo -): +def test_user_cannot_get_an_api_token_with_no_passphase_field(journalist_app, test_journo): with journalist_app.test_client() as app: valid_token = TOTP(test_journo["otp_secret"]).now() response = app.post( url_for("api.get_token"), - data=json.dumps( - {"username": test_journo["username"], "one_time_code": valid_token} - ), + data=json.dumps({"username": test_journo["username"], "one_time_code": valid_token}), headers=get_api_headers(), ) @@ -127,16 +121,12 @@ def test_user_cannot_get_an_api_token_with_no_passphase_field( assert response.json["message"] == "passphrase field is missing" -def test_user_cannot_get_an_api_token_with_no_username_field( - journalist_app, test_journo -): +def test_user_cannot_get_an_api_token_with_no_username_field(journalist_app, test_journo): with journalist_app.test_client() as app: valid_token = TOTP(test_journo["otp_secret"]).now() response = app.post( url_for("api.get_token"), - data=json.dumps( - {"passphrase": test_journo["password"], "one_time_code": valid_token} - ), + data=json.dumps({"passphrase": test_journo["password"], "one_time_code": valid_token}), headers=get_api_headers(), ) @@ -163,9 +153,7 @@ def test_user_cannot_get_an_api_token_with_no_otp_field(journalist_app, test_jou assert response.json["message"] == "one_time_code field is missing" -def test_authorized_user_gets_all_sources( - journalist_app, test_submissions, journalist_api_token -): +def test_authorized_user_gets_all_sources(journalist_app, test_submissions, journalist_api_token): with journalist_app.test_client() as app: response = app.get( url_for("api.get_all_sources"), @@ -219,9 +207,7 @@ def test_user_without_token_cannot_get_protected_endpoints(journalist_app, test_ assert response.status_code == 403 -def test_user_without_token_cannot_del_protected_endpoints( - journalist_app, test_submissions -): +def test_user_without_token_cannot_del_protected_endpoints(journalist_app, test_submissions): with journalist_app.app_context(): uuid = test_submissions["source"].uuid protected_routes = [ @@ -271,9 +257,7 @@ def test_attacker_cannot_use_token_after_admin_deletes( assert response.status_code == 403 -def test_user_without_token_cannot_post_protected_endpoints( - journalist_app, test_source -): +def test_user_without_token_cannot_post_protected_endpoints(journalist_app, test_source): with journalist_app.app_context(): uuid = test_source["source"].uuid protected_routes = [ @@ -303,9 +287,7 @@ def test_api_error_handlers_defined(journalist_app): def test_api_error_handler_404(journalist_app, journalist_api_token): with journalist_app.test_client() as app: - response = app.get( - "/api/v1/invalidendpoint", headers=get_api_headers(journalist_api_token) - ) + response = app.get("/api/v1/invalidendpoint", headers=get_api_headers(journalist_api_token)) assert response.status_code == 404 assert response.json["error"] == "Not Found" @@ -323,9 +305,7 @@ def test_trailing_slash_cleanly_404s(journalist_app, test_source, journalist_api assert response.json["error"] == "Not Found" -def test_authorized_user_gets_single_source( - journalist_app, test_source, journalist_api_token -): +def test_authorized_user_gets_single_source(journalist_app, test_source, journalist_api_token): with journalist_app.test_client() as app: uuid = test_source["source"].uuid response = app.get( @@ -350,9 +330,7 @@ def test_get_non_existant_source_404s(journalist_app, journalist_api_token): assert response.status_code == 404 -def test_authorized_user_can_star_a_source( - journalist_app, test_source, journalist_api_token -): +def test_authorized_user_can_star_a_source(journalist_app, test_source, journalist_api_token): with journalist_app.test_client() as app: uuid = test_source["source"].uuid source_id = test_source["source"].id @@ -374,9 +352,7 @@ def test_authorized_user_can_star_a_source( assert response.json["is_starred"] is True -def test_authorized_user_can_unstar_a_source( - journalist_app, test_source, journalist_api_token -): +def test_authorized_user_can_unstar_a_source(journalist_app, test_source, journalist_api_token): with journalist_app.test_client() as app: uuid = test_source["source"].uuid source_id = test_source["source"].id @@ -393,10 +369,7 @@ def test_authorized_user_can_unstar_a_source( assert response.status_code == 200 # Verify that the source is gone. - assert ( - SourceStar.query.filter(SourceStar.source_id == source_id).one().starred - is False - ) + assert SourceStar.query.filter(SourceStar.source_id == source_id).one().starred is False # API should also report is_starred is false response = app.get( @@ -406,9 +379,7 @@ def test_authorized_user_can_unstar_a_source( assert response.json["is_starred"] is False -def test_disallowed_methods_produces_405( - journalist_app, test_source, journalist_api_token -): +def test_disallowed_methods_produces_405(journalist_app, test_source, journalist_api_token): with journalist_app.test_client() as app: uuid = test_source["source"].uuid response = app.delete( @@ -434,9 +405,7 @@ def test_authorized_user_can_get_all_submissions( submission["filename"] for submission in response.json["submissions"] ] - expected_submissions = [ - submission.filename for submission in Submission.query.all() - ] + expected_submissions = [submission.filename for submission in Submission.query.all()] assert observed_submissions == expected_submissions @@ -495,10 +464,7 @@ def test_authorized_user_can_get_single_submission( assert response.json["uuid"] == submission_uuid assert response.json["is_read"] is False - assert ( - response.json["filename"] - == test_submissions["source"].submissions[0].filename - ) + assert response.json["filename"] == test_submissions["source"].submissions[0].filename assert response.json["size"] == test_submissions["source"].submissions[0].size @@ -506,9 +472,7 @@ def test_authorized_user_can_get_all_replies_with_disconnected_replies( journalist_app, test_files, journalist_api_token ): with journalist_app.test_client() as app: - db.session.execute( - "DELETE FROM sources WHERE id = :id", {"id": test_files["source"].id} - ) + db.session.execute("DELETE FROM sources WHERE id = :id", {"id": test_files["source"].id}) response = app.get( url_for("api.get_all_replies"), headers=get_api_headers(journalist_api_token), @@ -517,9 +481,7 @@ def test_authorized_user_can_get_all_replies_with_disconnected_replies( assert response.status_code == 200 -def test_authorized_user_can_get_all_replies( - journalist_app, test_files, journalist_api_token -): +def test_authorized_user_can_get_all_replies(journalist_app, test_files, journalist_api_token): with journalist_app.test_client() as app: response = app.get( url_for("api.get_all_replies"), @@ -533,9 +495,7 @@ def test_authorized_user_can_get_all_replies( assert observed_replies == expected_replies -def test_authorized_user_get_source_replies( - journalist_app, test_files, journalist_api_token -): +def test_authorized_user_get_source_replies(journalist_app, test_files, journalist_api_token): with journalist_app.test_client() as app: uuid = test_files["source"].uuid response = app.get( @@ -550,9 +510,7 @@ def test_authorized_user_get_source_replies( assert observed_replies == expected_replies -def test_authorized_user_can_get_single_reply( - journalist_app, test_files, journalist_api_token -): +def test_authorized_user_can_get_single_reply(journalist_app, test_files, journalist_api_token): with journalist_app.test_client() as app: reply_uuid = test_files["source"].replies[0].uuid uuid = test_files["source"].uuid @@ -568,12 +526,8 @@ def test_authorized_user_can_get_single_reply( assert response.json["uuid"] == reply_uuid assert response.json["journalist_username"] == reply.journalist.username assert response.json["journalist_uuid"] == reply.journalist.uuid - assert response.json["journalist_first_name"] == ( - reply.journalist.first_name or "" - ) - assert response.json["journalist_last_name"] == ( - reply.journalist.last_name or "" - ) + assert response.json["journalist_first_name"] == (reply.journalist.first_name or "") + assert response.json["journalist_last_name"] == (reply.journalist.last_name or "") assert response.json["is_deleted_by_source"] is False assert response.json["filename"] == test_files["source"].replies[0].filename assert response.json["size"] == test_files["source"].replies[0].size @@ -599,13 +553,9 @@ def test_reply_of_deleted_journalist( assert response.json["journalist_last_name"] == "" assert response.json["is_deleted_by_source"] is False assert ( - response.json["filename"] - == test_files_deleted_journalist["source"].replies[0].filename - ) - assert ( - response.json["size"] - == test_files_deleted_journalist["source"].replies[0].size + response.json["filename"] == test_files_deleted_journalist["source"].replies[0].filename ) + assert response.json["size"] == test_files_deleted_journalist["source"].replies[0].size def test_authorized_user_can_delete_single_submission( @@ -629,9 +579,7 @@ def test_authorized_user_can_delete_single_submission( assert Submission.query.filter(Submission.uuid == submission_uuid).all() == [] -def test_authorized_user_can_delete_single_reply( - journalist_app, test_files, journalist_api_token -): +def test_authorized_user_can_delete_single_reply(journalist_app, test_files, journalist_api_token): with journalist_app.test_client() as app: reply_uuid = test_files["source"].replies[0].uuid uuid = test_files["source"].uuid @@ -726,9 +674,7 @@ def test_authorized_user_can_download_submission( assert response.headers["ETag"].startswith("sha256:") -def test_authorized_user_can_download_reply( - journalist_app, test_files, journalist_api_token -): +def test_authorized_user_can_download_reply(journalist_app, test_files, journalist_api_token): with journalist_app.test_client() as app: reply_uuid = test_files["source"].replies[0].uuid uuid = test_files["source"].uuid @@ -831,12 +777,8 @@ def test_authorized_user_can_add_reply( # First we must encrypt the reply, or it will get rejected # by the server. encryption_mgr = EncryptionManager.get_default() - source_key = encryption_mgr.get_source_key_fingerprint( - test_source["source"].filesystem_id - ) - reply_content = encryption_mgr._gpg.encrypt( - "This is a plaintext reply", source_key - ).data + source_key = encryption_mgr.get_source_key_fingerprint(test_source["source"].filesystem_id) + reply_content = encryption_mgr._gpg.encrypt("This is a plaintext reply", source_key).data response = app.post( url_for("api.all_source_replies", source_uuid=uuid), @@ -876,9 +818,7 @@ def test_authorized_user_can_add_reply( assert reply_content == saved_content -def test_reply_without_content_400( - journalist_app, journalist_api_token, test_source, test_journo -): +def test_reply_without_content_400(journalist_app, journalist_api_token, test_source, test_journo): with journalist_app.test_client() as app: uuid = test_source["source"].uuid response = app.post( @@ -902,9 +842,7 @@ def test_reply_without_reply_field_400( assert response.status_code == 400 -def test_reply_without_json_400( - journalist_app, journalist_api_token, test_source, test_journo -): +def test_reply_without_json_400(journalist_app, journalist_api_token, test_source, test_journo): with journalist_app.test_client() as app: uuid = test_source["source"].uuid response = app.post( @@ -945,9 +883,7 @@ def test_reply_with_valid_square_json_400( assert response.json["message"] == "reply not found in request body" -def test_malformed_json_400( - journalist_app, journalist_api_token, test_journo, test_source -): +def test_malformed_json_400(journalist_app, journalist_api_token, test_journo, test_source): with journalist_app.app_context(): uuid = test_source["source"].uuid @@ -1084,22 +1020,16 @@ def test_malformed_auth_token(journalist_app, journalist_api_token): with journalist_app.test_client() as app: # precondition to ensure token is even valid - resp = app.get( - url, headers={"Authorization": "Token {}".format(journalist_api_token)} - ) + resp = app.get(url, headers={"Authorization": "Token {}".format(journalist_api_token)}) assert resp.status_code == 200 - resp = app.get( - url, headers={"Authorization": "not-token {}".format(journalist_api_token)} - ) + resp = app.get(url, headers={"Authorization": "not-token {}".format(journalist_api_token)}) assert resp.status_code == 403 resp = app.get(url, headers={"Authorization": journalist_api_token}) assert resp.status_code == 403 - resp = app.get( - url, headers={"Authorization": "too many {}".format(journalist_api_token)} - ) + resp = app.get(url, headers={"Authorization": "too many {}".format(journalist_api_token)}) assert resp.status_code == 403 @@ -1185,9 +1115,7 @@ def test_reply_download_generates_checksum( assert not mock_add_checksum.called -def test_seen( - journalist_app, journalist_api_token, test_files, test_journo, test_submissions -): +def test_seen(journalist_app, journalist_api_token, test_files, test_journo, test_submissions): """ Happy path for seen: marking things seen works. """ @@ -1226,16 +1154,12 @@ def test_seen( assert [ s for s in response.json["submissions"] - if s["is_file"] - and s["uuid"] == file_uuid - and test_journo["uuid"] in s["seen_by"] + if s["is_file"] and s["uuid"] == file_uuid and test_journo["uuid"] in s["seen_by"] ] assert [ s for s in response.json["submissions"] - if s["is_message"] - and s["uuid"] == msg_uuid - and test_journo["uuid"] in s["seen_by"] + if s["is_message"] and s["uuid"] == msg_uuid and test_journo["uuid"] in s["seen_by"] ] # check that /replies still only contains one seen reply @@ -1256,8 +1180,7 @@ def test_seen( assert [ s for s in response.json["submissions"] - if s["uuid"] in [file_uuid, msg_uuid] - and s["seen_by"] == [test_journo["uuid"]] + if s["uuid"] in [file_uuid, msg_uuid] and s["seen_by"] == [test_journo["uuid"]] ] # check that /replies still only contains one seen reply diff --git a/securedrop/tests/test_journalist_session.py b/securedrop/tests/test_journalist_session.py index c0a25ff8e25..a915aec056a 100644 --- a/securedrop/tests/test_journalist_session.py +++ b/securedrop/tests/test_journalist_session.py @@ -122,9 +122,7 @@ def test_session_renew(journalist_app, test_journo): sid = _check_sig(session_cookie.value, journalist_app) redis_session = _get_session(sid, journalist_app) # The `renew_count` must exists in the session payload and must be equal to the app config - assert ( - redis_session["renew_count"] == journalist_app.config["SESSION_RENEW_COUNT"] - ) + assert redis_session["renew_count"] == journalist_app.config["SESSION_RENEW_COUNT"] # When forcing the session TTL in redis to be below the threshold # Threshold for auto renew is less than 60*30 @@ -140,9 +138,7 @@ def test_session_renew(journalist_app, test_journo): # Then the corresponding renew_count in redis must have been decreased redis_session = _get_session(sid, journalist_app) - assert redis_session["renew_count"] == ( - journalist_app.config["SESSION_RENEW_COUNT"] - 1 - ) + assert redis_session["renew_count"] == (journalist_app.config["SESSION_RENEW_COUNT"] - 1) # Then the ttl must have been updated and the new lifetime must be > of app confing lifetime # (Bigger because there is also a variable amount of time in the threshold that is kept) @@ -161,9 +157,7 @@ def test_session_logout(journalist_app, test_journo): assert session_cookie is not None sid = _check_sig(session_cookie.value, journalist_app) - assert ( - redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid) - ) is not None + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is not None # When sending a logout request from a logged in journalist resp = app.get(url_for("main.logout"), follow_redirects=False) @@ -189,9 +183,7 @@ def test_session_admin_change_password_logout(journalist_app, test_journo, test_ cookie_val = re.search(r"js=(.*?);", resp.headers["set-cookie"]).group(1) # Then also save the session id for later sid = _check_sig(session_cookie.value, journalist_app) - assert ( - redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid) - ) is not None + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is not None # Given a another test client and a valid admin user with journalist_app.test_client() as admin_app: @@ -234,9 +226,7 @@ def test_session_change_password_logout(journalist_app, test_journo): assert session_cookie is not None sid = _check_sig(session_cookie.value, journalist_app) - assert ( - redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid) - ) is not None + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is not None # When sending a self change password request resp = app.post( @@ -267,16 +257,12 @@ def test_session_login_regenerate_sid(journalist_app, test_journo): assert resp.status_code == 200 # Given a valid unauthenticated session id from the previous request - session_cookie_pre_login = _session_from_cookiejar( - app.cookie_jar, journalist_app - ) + session_cookie_pre_login = _session_from_cookiejar(app.cookie_jar, journalist_app) assert session_cookie_pre_login is not None # When sending a valid login request using the same client (same cookiejar) resp = _login_user(app, test_journo) - session_cookie_post_login = _session_from_cookiejar( - app.cookie_jar, journalist_app - ) + session_cookie_post_login = _session_from_cookiejar(app.cookie_jar, journalist_app) # Then the two session ids are different as the session id gets regenerated post login assert session_cookie_post_login != session_cookie_pre_login @@ -369,9 +355,7 @@ def test_session_api_logout(journalist_app, test_journo): # Then it is successful assert resp.status_code == 200 # Then the token and the corresponding payload no longer exist in redis - assert ( - redis.get("api_" + journalist_app.config["SESSION_KEY_PREFIX"] + sid) - ) is None + assert (redis.get("api_" + journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is None # When sending an authenticated request with the deleted token resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token)) @@ -411,9 +395,7 @@ def test_session_bad_signature(journalist_app, test_journo): assert resp.status_code == 403 # When requesting an authenticated endpoint with a valid unsigned token with a trailing dot - resp = app.get( - url_for("api.get_current_user"), headers=get_api_headers(sid + ".") - ) + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(sid + ".")) # Then the request is refused assert resp.status_code == 403 @@ -423,9 +405,7 @@ def test_session_bad_signature(journalist_app, test_journo): token_wrong_salt = signer.dumps(sid) # When requesting an authenticated endpoint with a valid token signed with the wrong salt - resp = app.get( - url_for("api.get_current_user"), headers=get_api_headers(token_wrong_salt) - ) + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token_wrong_salt)) # Then the request is refused assert resp.status_code == 403 @@ -437,9 +417,7 @@ def test_session_bad_signature(journalist_app, test_journo): token_not_api_salt = signer.dumps(sid) # When requesting an authenticated endpoint with such token - resp = app.get( - url_for("api.get_current_user"), headers=get_api_headers(token_not_api_salt) - ) + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token_not_api_salt)) # Then the request is refused since the JI salt is not valid for the API assert resp.status_code == 403 @@ -462,9 +440,7 @@ def test_session_race_condition(mocker, journalist_app, test_journo): # Given a test client and a valid journalist user with journalist_app.test_request_context() as app: # When manually creating a session in the context - session = journalist_app.session_interface.open_session( - journalist_app, app.request - ) + session = journalist_app.session_interface.open_session(journalist_app, app.request) assert session.sid is not None # When manually setting the journalist uid in session session["uid"] = test_journo["id"] @@ -472,23 +448,14 @@ def test_session_race_condition(mocker, journalist_app, test_journo): # When manually builfing a Flask repsonse object app.response = Response() # When manually calling save_session() to write the session in redis - journalist_app.session_interface.save_session( - journalist_app, session, app.response - ) + journalist_app.session_interface.save_session(journalist_app, session, app.response) # Then the session gets written in redis - assert ( - redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) - is not None - ) + assert redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) is not None # When manually adding the created session token in the request cookies - app.request.cookies = { - journalist_app.config["SESSION_COOKIE_NAME"]: session.token - } + app.request.cookies = {journalist_app.config["SESSION_COOKIE_NAME"]: session.token} # When getting the session object by supplying a request context to open_session() - session2 = journalist_app.session_interface.open_session( - journalist_app, app.request - ) + session2 = journalist_app.session_interface.open_session(journalist_app, app.request) # Then the properties of the two sessions are the same # (They are indeed the same session) assert session2.sid == session.sid @@ -501,9 +468,5 @@ def test_session_race_condition(mocker, journalist_app, test_journo): # When deleting the original session token and object from redis redis.delete(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) # Then the session_save() fails since the original object no longer exists - journalist_app.session_interface.save_session( - journalist_app, session, app.response - ) - assert ( - redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) is None - ) + journalist_app.session_interface.save_session(journalist_app, session, app.response) + assert redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) is None diff --git a/securedrop/tests/utils/__init__.py b/securedrop/tests/utils/__init__.py index 464b5681241..758f06a2850 100644 --- a/securedrop/tests/utils/__init__.py +++ b/securedrop/tests/utils/__init__.py @@ -29,4 +29,4 @@ def login_user(app, test_user): follow_redirects=True, ) assert resp.status_code == 200 - assert test_user['username'] in resp.data.decode('utf-8') + assert test_user["username"] in resp.data.decode("utf-8")