Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure ruff #1028

Merged
merged 5 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 9 additions & 23 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,35 +179,21 @@ venv: ## Provision a Python 3 virtualenv for development (ensure to also install
check: lint test ## Runs linters and tests

.PHONY: lint
lint: check-black check-isort flake8 mypy bandit rpmlint shellcheck ## Runs linters (black, isort, flake8, mypy, bandit rpmlint, and shellcheck)

.PHONY: bandit
bandit: ## Runs the bandit security linter
poetry run bandit -ll -r .
lint: check-ruff mypy rpmlint shellcheck ## Runs linters (ruff, mypy, rpmlint, and shellcheck)

.PHONY: test-launcher
test-launcher: ## Runs launcher tests
xvfb-run poetry run python3 -m pytest --cov-report term-missing --cov=sdw_notify --cov=sdw_updater/ --cov=sdw_util -v launcher/tests/

.PHONY: check-black
check-black: ## Check Python source code formatting with black
poetry run black --check --diff .

.PHONY: black
black: ## Update Python source code formatting with black
poetry run black .

.PHONY: check-isort
check-isort: ## Check Python import organization with isort
poetry run isort --check-only --diff .

.PHONY: isort
isort: ## Update Python import organization with isort
poetry run isort .
.PHONY: check-ruff
check-ruff: ## Check Python source code formatting with ruff
poetry run ruff format . --diff
poetry run ruff check . --output-format=full

.PHONY: flake8
flake8: ## Validate PEP8 compliance for Python source files
poetry run flake8
.PHONY: fix
fix: ## Fix Python source code formatting with ruff
poetry run ruff format .
poetry run ruff check --fix

.PHONY: mypy
mypy: ## Type check Python files
Expand Down
11 changes: 7 additions & 4 deletions dom0/remove-tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
Removes tags used for exempting VMs from default SecureDrop Workstation
RPC policies from all VMs (including non-SecureDrop ones).
"""

import sys

import qubesadmin

q = qubesadmin.Qubes()
Expand All @@ -15,17 +18,17 @@ def main():
for vm in q.domains:
for tag in TAGS_TO_REMOVE:
if tag in q.domains[vm].tags:
print("Removing tag '{}' from VM '{}'.".format(tag, vm))
print(f"Removing tag '{tag}' from VM '{vm}'.")
try:
q.domains[vm].tags.remove(tag)
except Exception as error:
print("Error removing tag: '{}'".format(error))
print(f"Error removing tag: '{error}'")
print("Aborting.")
exit(1)
sys.exit(1)
tags_removed = True

if tags_removed is False:
print("Tags {} not set on any VMs, nothing removed.".format(TAGS_TO_REMOVE))
print(f"Tags {TAGS_TO_REMOVE} not set on any VMs, nothing removed.")


if __name__ == "__main__":
Expand Down
6 changes: 4 additions & 2 deletions files/destroy-vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Utility to quickly destroy a Qubes VM managed by the Workstation
salt config, for use in repeated builds during development.
"""

import argparse
import subprocess
import sys
Expand Down Expand Up @@ -38,10 +39,11 @@ def destroy_vm(vm):
Destroys a single VM instance. Requires arg to be
QubesVM object.
"""
assert SDW_DEFAULT_TAG in vm.tags
if SDW_DEFAULT_TAG not in vm.tags:
raise Exception("VM does not have the 'sd-workstation' tag")
if vm.is_running():
vm.kill()
print("Destroying VM '{}'... ".format(vm.name), end="")
print(f"Destroying VM '{vm.name}'... ", end="")
subprocess.check_call(["qvm-remove", "-f", vm.name])
print("OK")

Expand Down
10 changes: 4 additions & 6 deletions files/sdw-admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
packages only puts the files in place `/srv/salt` but does not apply the state, nor
does it handle the config.
"""

import argparse
import os
import subprocess
Expand Down Expand Up @@ -59,9 +60,7 @@ def parse_args():
action="store_true",
help="During uninstall action, don't prompt for confirmation, proceed immediately",
)
args = parser.parse_args()

return args
return parser.parse_args()


def install_pvh_support():
Expand All @@ -70,9 +69,9 @@ def install_pvh_support():
TODO: install this via package requirements instead if possible
"""
try:
subprocess.run(["sudo", "qubes-dom0-update", "-y", "-q", "grub2-xen-pvh"])
subprocess.check_call(["sudo", "qubes-dom0-update", "-y", "-q", "grub2-xen-pvh"])
except subprocess.CalledProcessError:
raise SDWAdminException("Error installing grub2-xen-pvah: local PVH not available.")
raise SDWAdminException("Error installing grub2-xen-pvh: local PVH not available.")


def copy_config():
Expand Down Expand Up @@ -141,7 +140,6 @@ def refresh_salt():


def perform_uninstall(keep_template_rpm=False):

try:
subprocess.check_call(["sudo", "qubesctl", "state.sls", "sd-clean-default-dispvm"])
print("Destroying all VMs")
Expand Down
1 change: 1 addition & 0 deletions files/sdw-login.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Utility script for SecureDrop Workstation. Launches the SecureDrop Workstation
updater on boot. It will prompt users to apply template and dom0 updates
"""

import logging
import os
import subprocess
Expand Down
3 changes: 2 additions & 1 deletion files/sdw-notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
for too long without checking for security updates. Writes output to a logfile,
not stdout. All settings are in Notify utility module.
"""

import sys

from PyQt5.QtWidgets import QApplication
Expand Down Expand Up @@ -55,7 +56,7 @@ def show_update_warning():
notify script runs.
"""

app = QApplication([]) # noqa: F841
app = QApplication([])
dialog = NotifyApp.NotifyDialog(Util.is_sdapp_halted())
result = dialog.run()

Expand Down
18 changes: 7 additions & 11 deletions files/validate_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Utility to verify that SecureDrop Workstation config is properly structured.
Checks for
"""

import json
import os
import re
Expand All @@ -24,7 +25,7 @@ class ValidationError(Exception):
pass


class SDWConfigValidator(object):
class SDWConfigValidator:
def __init__(self, config_base_dir=None):
if config_base_dir:
self.config_filepath = os.path.join(config_base_dir, CONFIG_FILEPATH)
Expand All @@ -42,7 +43,7 @@ def __init__(self, config_base_dir=None):

def confirm_config_file_exists(self):
if not os.path.exists(self.config_filepath):
msg = "Config file does not exist: {}".format(self.config_filepath)
msg = f"Config file does not exist: {self.config_filepath}"
msg += "Create from config.json.example"
raise ValidationError(msg)

Expand Down Expand Up @@ -107,9 +108,7 @@ def confirm_submission_privkey_file(self):
pass

if not result:
raise ValidationError(
"GPG private key is not valid: {}".format(self.secret_key_filepath)
)
raise ValidationError(f"GPG private key is not valid: {self.secret_key_filepath}")

def confirm_submission_privkey_fingerprint(self):
if "submission_key_fpr" not in self.config:
Expand All @@ -126,14 +125,11 @@ def confirm_submission_privkey_fingerprint(self):
raise ValidationError("Configured fingerprint does not match key!")

except subprocess.CalledProcessError as e:
raise ValidationError(
"Key validation failed: {}".format(e.output.decode(sys.stdout.encoding))
)
raise ValidationError(f"Key validation failed: {e.output.decode(sys.stdout.encoding)}")

def read_config_file(self):
with open(self.config_filepath, "r") as f:
j = json.load(f)
return j
with open(self.config_filepath) as f:
return json.load(f)

def validate_existing_size(self):
"""This method checks for existing private volume size and new
Expand Down
2 changes: 1 addition & 1 deletion launcher/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest


@pytest.fixture
@pytest.fixture()
def tmpdir():
"""Run the test in a temporary directory"""
cwd = os.getcwd()
Expand Down
3 changes: 1 addition & 2 deletions launcher/tests/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def test_warning_shown_if_updater_never_ran(mocked_info, mocked_warning, mocked_
"""
# We're going to look for a nonexistent file in an existing temporary directoryr
with mock.patch("sdw_notify.Notify.LAST_UPDATED_FILE", tmp_path / "not-a-file"):

warning_should_be_shown = Notify.is_update_check_necessary()

# No handled errors should occur
Expand All @@ -61,7 +60,7 @@ def test_warning_shown_if_updater_never_ran(mocked_info, mocked_warning, mocked_


@pytest.mark.parametrize(
"uptime,warning_expected",
("uptime", "warning_expected"),
[(Notify.UPTIME_GRACE_PERIOD + 1, True), (Notify.UPTIME_GRACE_PERIOD - 1, False)],
)
@mock.patch("sdw_notify.Notify.sdlog.error")
Expand Down
40 changes: 21 additions & 19 deletions launcher/tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ def test_write_updates_status_flag_to_disk(
Updater._write_updates_status_flag_to_disk(status)

mocked_call.assert_called_once_with(
["qvm-run", "sd-app", "echo '{}' > {}".format(status.value, flag_file_sd_app)]
["qvm-run", "sd-app", f"echo '{status.value}' > {flag_file_sd_app}"]
)

assert os.path.exists(flag_file_dom0)
with open(flag_file_dom0, "r") as f:
with open(flag_file_dom0) as f:
contents = json.load(f)
assert contents["status"] == status.value
assert "tmp" in flag_file_dom0
Expand Down Expand Up @@ -207,12 +207,14 @@ def test_write_last_updated_flags_to_disk(mocked_info, mocked_error, mocked_call
subprocess_command = [
"qvm-run",
"sd-app",
"echo '{}' > {}".format(current_time, flag_file_sd_app),
f"echo '{current_time}' > {flag_file_sd_app}",
]
mocked_call.assert_called_once_with(subprocess_command)
assert not mocked_error.called
assert os.path.exists(flag_file_dom0)
contents = open(flag_file_dom0, "r").read()
with open(flag_file_dom0) as f:
contents = f.read()

assert contents == current_time


Expand Down Expand Up @@ -294,7 +296,7 @@ def test_apply_updates_vms(mocked_info, mocked_error, mocked_call, vm):
result = Updater._apply_updates_vm(vm)
assert result == UpdateStatus.UPDATES_OK

if vm.startswith("fedora") or vm.startswith("whonix"):
if vm.startswith(("fedora", "whonix")):
expected_salt_state = "update.qubes-vm"
else:
expected_salt_state = "fpf-apt-repo"
Expand All @@ -311,7 +313,7 @@ def test_apply_updates_vms(mocked_info, mocked_error, mocked_call, vm):
@mock.patch("sdw_updater.Updater.sdlog.info")
def test_apply_updates_vms_fails(mocked_info, mocked_error, mocked_call, vm):
error_calls = [
call("An error has occurred updating {}. Please contact your administrator.".format(vm)),
call(f"An error has occurred updating {vm}. Please contact your administrator."),
call("Command 'check_call' returned non-zero exit status 1."),
]
result = Updater._apply_updates_vm(vm)
Expand Down Expand Up @@ -403,7 +405,7 @@ def test_overall_update_status_reboot_not_done_previously(
@mock.patch("sdw_updater.Updater.sdlog.error")
@mock.patch("sdw_updater.Updater.sdlog.info")
def test_safely_shutdown(mocked_info, mocked_error, mocked_output, vm):
call_list = [call(["qvm-shutdown", "--wait", "{}".format(vm)], stderr=-1)]
call_list = [call(["qvm-shutdown", "--wait", f"{vm}"], stderr=-1)]

Updater._safely_shutdown_vm(vm)
mocked_output.assert_has_calls(call_list)
Expand Down Expand Up @@ -431,7 +433,7 @@ def test_safely_start(mocked_info, mocked_error, mocked_output, vm):
@mock.patch("sdw_updater.Updater.sdlog.info")
def test_safely_start_fails(mocked_info, mocked_error, mocked_output, vm):
call_list = [
call("Error while starting {}".format(vm)),
call(f"Error while starting {vm}"),
call("Command 'check_output' returned non-zero exit status 1."),
]

Expand All @@ -445,7 +447,7 @@ def test_safely_start_fails(mocked_info, mocked_error, mocked_output, vm):
@mock.patch("sdw_updater.Updater.sdlog.info")
def test_safely_shutdown_fails(mocked_info, mocked_error, mocked_call, vm):
call_list = [
call("Failed to shut down {}".format(vm)),
call(f"Failed to shut down {vm}"),
call("Command 'check_output' returned non-zero exit status 1."),
]

Expand Down Expand Up @@ -474,8 +476,8 @@ def test_shutdown_and_start_vms(
]
template_vm_calls = [
call("fedora-39-xfce"),
call("sd-large-{}-template".format(DEBIAN_VERSION)),
call("sd-small-{}-template".format(DEBIAN_VERSION)),
call(f"sd-large-{DEBIAN_VERSION}-template"),
call(f"sd-small-{DEBIAN_VERSION}-template"),
call("whonix-gateway-17"),
]
app_vm_calls = [
Expand Down Expand Up @@ -520,8 +522,8 @@ def test_shutdown_and_start_vms_sysvm_fail(
]
template_vm_calls = [
call("fedora-39-xfce"),
call("sd-large-{}-template".format(DEBIAN_VERSION)),
call("sd-small-{}-template".format(DEBIAN_VERSION)),
call(f"sd-large-{DEBIAN_VERSION}-template"),
call(f"sd-small-{DEBIAN_VERSION}-template"),
call("whonix-gateway-17"),
]
error_calls = [
Expand Down Expand Up @@ -550,7 +552,7 @@ def test_read_dom0_update_flag_from_disk(mocked_error, mocked_subprocess, status
flag_file_dom0 = Updater.get_dom0_path(Updater.FLAG_FILE_STATUS_DOM0)

assert os.path.exists(flag_file_dom0)
with open(flag_file_dom0, "r") as f:
with open(flag_file_dom0) as f:
contents = json.load(f)
assert contents["status"] == status.value
assert "tmp" in flag_file_dom0
Expand Down Expand Up @@ -638,7 +640,7 @@ def test_last_required_reboot_performed_not_required(mocked_info, mocked_error,


@pytest.mark.parametrize(
"status, rebooted, expect_status_change, expect_updater",
("status", "rebooted", "expect_status_change", "expect_updater"),
[
(UpdateStatus.UPDATES_OK, True, False, True),
(UpdateStatus.UPDATES_REQUIRED, True, False, True),
Expand Down Expand Up @@ -674,7 +676,7 @@ def test_should_run_updater_status_interval_expired(


@pytest.mark.parametrize(
"status, rebooted, expect_status_change, expect_updater",
("status", "rebooted", "expect_status_change", "expect_updater"),
[
(UpdateStatus.UPDATES_OK, True, False, False),
(UpdateStatus.UPDATES_REQUIRED, True, False, True),
Expand Down Expand Up @@ -807,7 +809,7 @@ def test_run_full_install(mocked_call, mocked_output, mocked_info):
"""
# subprocess.check_call is mocked, so this directory should never be accessed
# by the test.
MIGRATION_DIR = "/tmp/potato" # nosec
MIGRATION_DIR = "/tmp/potato"
with mock.patch("sdw_updater.Updater.MIGRATION_DIR", MIGRATION_DIR):
result = Updater.run_full_install()
check_outputs = [call(["sdw-admin", "--apply"])]
Expand All @@ -833,7 +835,7 @@ def test_run_full_install_with_error(mocked_call, mocked_output, mocked_error):
And the error is logged
And the failure enum is returned
"""
MIGRATION_DIR = "/tmp/potato" # nosec
MIGRATION_DIR = "/tmp/potato"
with mock.patch("sdw_updater.Updater.MIGRATION_DIR", MIGRATION_DIR):
result = Updater.run_full_install()
calls = [call(["sdw-admin", "--apply"])]
Expand All @@ -857,7 +859,7 @@ def test_run_full_install_with_flag_error(mocked_call, mocked_output, mocked_err
Then the error is logged
And the failure enum is returned
"""
MIGRATION_DIR = "/tmp/potato" # nosec
MIGRATION_DIR = "/tmp/potato"
with mock.patch("sdw_updater.Updater.MIGRATION_DIR", MIGRATION_DIR):
result = Updater.run_full_install()
check_outputs = [call(["sdw-admin", "--apply"])]
Expand Down