diff --git a/securedrop/crypto_util.py b/securedrop/crypto_util.py index 6918f3916b..1517c4a388 100644 --- a/securedrop/crypto_util.py +++ b/securedrop/crypto_util.py @@ -135,7 +135,7 @@ def do_runtime_tests(self): if not rm.check_secure_delete_capability(): raise AssertionError("Secure file deletion is not possible.") - def get_wordlist(self, locale: Text) -> List[str]: + def get_wordlist(self, locale: 'Text') -> 'List[str]': """" Ensure the wordlist for the desired locale is read and available in the words global variable. If there is no wordlist for the desired local, fallback to the default english wordlist. diff --git a/securedrop/journalist_app/__init__.py b/securedrop/journalist_app/__init__.py index 33f03e7ce4..ba83b1d04b 100644 --- a/securedrop/journalist_app/__init__.py +++ b/securedrop/journalist_app/__init__.py @@ -38,7 +38,7 @@ _insecure_views = ['main.login', 'main.select_logo', 'static'] -def create_app(config: SDConfig) -> Flask: +def create_app(config: 'SDConfig') -> Flask: app = Flask(__name__, template_folder=config.JOURNALIST_TEMPLATES_DIR, static_folder=path.join(config.SECUREDROP_ROOT, 'static')) @@ -81,14 +81,16 @@ def create_app(config: SDConfig) -> Flask: ) @app.errorhandler(CSRFError) - def handle_csrf_error(e: CSRFError) -> Response: + def handle_csrf_error(e: CSRFError) -> 'Response': # render the message first to ensure it's localized. msg = gettext('You have been logged out due to inactivity') session.clear() flash(msg, 'error') return redirect(url_for('main.login')) - def _handle_http_exception(error: HTTPException) -> Tuple[Union[Response, str], Optional[int]]: + def _handle_http_exception( + error: 'HTTPException' + ) -> 'Tuple[Union[Response, str], Optional[int]]': # Workaround for no blueprint-level 404/5 error handlers, see: # https://github.com/pallets/flask/issues/503#issuecomment-71383286 handler = list(app.error_handler_spec['api'][error.code].values())[0] @@ -126,7 +128,7 @@ def load_instance_config(): app.instance_config = InstanceConfig.get_current() @app.before_request - def setup_g() -> Optional[Response]: + def setup_g() -> 'Optional[Response]': """Store commonly used values in Flask's special g object""" if 'expires' in session and datetime.utcnow() >= session['expires']: session.clear() diff --git a/securedrop/journalist_app/utils.py b/securedrop/journalist_app/utils.py index 6f1bf73010..544cdcb020 100644 --- a/securedrop/journalist_app/utils.py +++ b/securedrop/journalist_app/utils.py @@ -254,7 +254,7 @@ def col_delete(cols_selected): return redirect(url_for('main.index')) -def make_password(config: SDConfig) -> str: +def make_password(config: 'SDConfig') -> str: while True: password = current_app.crypto_util.genrandomid( 7, diff --git a/securedrop/models.py b/securedrop/models.py index 1b90fbad4c..763fd285fa 100644 --- a/securedrop/models.py +++ b/securedrop/models.py @@ -42,9 +42,9 @@ ARGON2_PARAMS = dict(memory_cost=2**16, rounds=4, parallelism=2) -def get_one_or_else(query: Query, - logger: Logger, - failure_method: Callable[[int], None]) -> None: +def get_one_or_else(query: 'Query', + logger: 'Logger', + failure_method: 'Callable[[int], None]') -> None: try: return query.one() except MultipleResultsFound as e: @@ -97,7 +97,7 @@ def journalist_filename(self) -> str: return ''.join([c for c in self.journalist_designation.lower().replace( ' ', '_') if c in valid_chars]) - def documents_messages_count(self) -> Dict[str, int]: + def documents_messages_count(self) -> 'Dict[str, int]': self.docs_msgs_count = {'messages': 0, 'documents': 0} for submission in self.submissions: if submission.filename.endswith('msg.gpg'): @@ -108,7 +108,7 @@ def documents_messages_count(self) -> Dict[str, int]: return self.docs_msgs_count @property - def collection(self) -> List[Union[Submission, Reply]]: + def collection(self) -> 'List[Union[Submission, Reply]]': """Return the list of submissions and replies for this source, sorted in ascending order by the filename/interaction count.""" collection = [] # type: List[Union[Submission, Reply]] @@ -141,7 +141,7 @@ def public_key(self, value: str) -> None: def public_key(self) -> None: raise NotImplementedError - def to_json(self) -> Dict[str, Union[str, bool, int, str]]: + def to_json(self) -> 'Dict[str, Union[str, bool, int, str]]': docs_msg_count = self.documents_messages_count() if self.last_updated: @@ -212,7 +212,7 @@ def __init__(self, source: Source, filename: str) -> None: def __repr__(self) -> str: return '' % (self.filename) - def to_json(self) -> Dict[str, Union[str, int, bool]]: + def to_json(self) -> 'Dict[str, Union[str, int, bool]]': json_submission = { 'source_url': url_for('api.single_source', source_uuid=self.source.uuid), @@ -260,7 +260,7 @@ class Reply(db.Model): deleted_by_source = Column(Boolean, default=False, nullable=False) def __init__(self, - journalist: Journalist, + journalist: 'Journalist', source: Source, filename: str) -> None: self.journalist_id = journalist.id @@ -273,7 +273,7 @@ def __init__(self, def __repr__(self) -> str: return '' % (self.filename) - def to_json(self) -> Dict[str, Union[str, int, bool]]: + def to_json(self) -> 'Dict[str, Union[str, int, bool]]': username = "deleted" first_name = "" last_name = "" @@ -307,7 +307,7 @@ class SourceStar(db.Model): source_id = Column("source_id", Integer, ForeignKey('sources.id')) starred = Column("starred", Boolean, default=True) - def __eq__(self, other: Any) -> bool: + def __eq__(self, other: 'Any') -> bool: if isinstance(other, SourceStar): return (self.source_id == other.source_id and self.id == other.id and self.starred == other.starred) @@ -418,10 +418,10 @@ class Journalist(db.Model): def __init__(self, username: str, password: str, - first_name: Optional[str] = None, - last_name: Optional[str] = None, + first_name: 'Optional[str]' = None, + last_name: 'Optional[str]' = None, is_admin: bool = False, - otp_secret: Optional[str] = None) -> None: + otp_secret: 'Optional[str]' = None) -> None: self.check_username_acceptable(username) self.username = username @@ -547,14 +547,14 @@ def set_hotp_secret(self, otp_secret: str) -> None: self.hotp_counter = 0 @property - def totp(self) -> OTP: + def totp(self) -> 'OTP': if self.is_totp: return pyotp.TOTP(self.otp_secret) else: raise ValueError('{} is not using TOTP'.format(self)) @property - def hotp(self) -> OTP: + def hotp(self) -> 'OTP': if not self.is_totp: return pyotp.HOTP(self.otp_secret) else: @@ -618,7 +618,7 @@ def verify_token(self, token: str) -> bool: _MAX_LOGIN_ATTEMPTS_PER_PERIOD = 5 @classmethod - def throttle_login(cls, user: Journalist) -> None: + def throttle_login(cls, user: 'Journalist') -> None: # Record the login attempt... login_attempt = JournalistLoginAttempt(user) db.session.add(login_attempt) @@ -640,7 +640,7 @@ def throttle_login(cls, user: Journalist) -> None: def login(cls, username: str, password: str, - token: str) -> Journalist: + token: str) -> 'Journalist': try: user = Journalist.query.filter_by(username=username).one() except NoResultFound: @@ -677,7 +677,7 @@ def validate_token_is_not_expired_or_invalid(token): return True @staticmethod - def validate_api_token_and_get_user(token: str) -> Union[Journalist, None]: + def validate_api_token_and_get_user(token: str) -> 'Union[Journalist, None]': s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) @@ -690,7 +690,7 @@ def validate_api_token_and_get_user(token: str) -> Union[Journalist, None]: return Journalist.query.get(data['id']) - def to_json(self) -> Dict[str, Union[str, bool, str]]: + def to_json(self) -> 'Dict[str, Union[str, bool, str]]': json_user = { 'username': self.username, 'last_login': self.last_access.isoformat() + 'Z', diff --git a/securedrop/source_app/__init__.py b/securedrop/source_app/__init__.py index 1845c2e46d..4ac189263c 100644 --- a/securedrop/source_app/__init__.py +++ b/securedrop/source_app/__init__.py @@ -31,7 +31,7 @@ from sdconfig import SDConfig # noqa: F401 -def create_app(config: SDConfig) -> Flask: +def create_app(config: 'SDConfig') -> Flask: app = Flask(__name__, template_folder=config.SOURCE_TEMPLATES_DIR, static_folder=path.join(config.SECUREDROP_ROOT, 'static')) diff --git a/securedrop/store.py b/securedrop/store.py index a1962051f2..3b5ef44193 100644 --- a/securedrop/store.py +++ b/securedrop/store.py @@ -190,8 +190,8 @@ def path_without_filesystem_id(self, filename: str) -> str: return absolute def get_bulk_archive(self, - selected_submissions: List, - zip_directory: str = '') -> _TemporaryFileWrapper: + selected_submissions: 'List', + zip_directory: str = '') -> '_TemporaryFileWrapper': """Generate a zip file from the selected submissions""" zip_file = tempfile.NamedTemporaryFile( prefix='tmp_securedrop_bulk_dl_', @@ -299,7 +299,7 @@ def save_file_submission(self, count: int, journalist_filename: str, filename: str, - stream: BufferedIOBase) -> str: + stream: 'BufferedIOBase') -> str: sanitized_filename = secure_filename(filename) # We store file submissions in a .gz file for two reasons: @@ -363,7 +363,7 @@ def save_message_submission(self, return filename -def async_add_checksum_for_file(db_obj: Union[Submission, Reply]) -> str: +def async_add_checksum_for_file(db_obj: 'Union[Submission, Reply]') -> str: return create_queue().enqueue( queued_add_checksum_for_file, type(db_obj), @@ -373,7 +373,7 @@ def async_add_checksum_for_file(db_obj: Union[Submission, Reply]) -> str: ) -def queued_add_checksum_for_file(db_model: Union[Type[Submission], Type[Reply]], +def queued_add_checksum_for_file(db_model: 'Union[Type[Submission], Type[Reply]]', model_id: int, file_path: str, db_uri: str) -> str: @@ -385,8 +385,8 @@ def queued_add_checksum_for_file(db_model: Union[Type[Submission], Type[Reply]], return "success" -def add_checksum_for_file(session: Session, - db_obj: Union[Submission, Reply], +def add_checksum_for_file(session: 'Session', + db_obj: 'Union[Submission, Reply]', file_path: str) -> None: hasher = sha256() with open(file_path, 'rb') as f: