diff --git a/securedrop_export/entrypoint.py b/securedrop_export/entrypoint.py index fbdec17..c5b2249 100755 --- a/securedrop_export/entrypoint.py +++ b/securedrop_export/entrypoint.py @@ -5,9 +5,10 @@ from securedrop_export import export from securedrop_export import main +CONFIG_PATH = "/etc/sd-export-config.json" def start(): - my_sub = export.SDExport(sys.argv[1]) + my_sub = export.SDExport(sys.argv[1], CONFIG_PATH) try: # Halt immediately if target file is absent if not os.path.exists(my_sub.archive): diff --git a/securedrop_export/export.py b/securedrop_export/export.py index 12f918f..7fe7fa5 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -18,7 +18,6 @@ ENCRYPTED_DEVICE = "encrypted_volume" BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" -PCI_BUS_ID = "002:" class Metadata(object): """ @@ -37,12 +36,14 @@ class Metadata(object): def __init__(self, archive_path): self.metadata_path = os.path.join(archive_path, self.METADATA_FILE) + try: with open(self.metadata_path) as f: json_config = json.loads(f.read()) self.export_method = json_config.get("device", None) self.encryption_method = json_config.get("encryption_method", None) self.encryption_key = json_config.get("encryption_key", None) + except Exception as e: raise @@ -58,7 +59,7 @@ def is_valid(self): class SDExport(object): - def __init__(self, archive): + def __init__(self, archive, config_path): self.device = DEVICE self.mountpoint = MOUNTPOINT self.encrypted_device = ENCRYPTED_DEVICE @@ -68,14 +69,21 @@ def __init__(self, archive): self.brlaser_driver = BRLASER_DRIVER self.brlaser_ppd = BRLASER_PPD - + self.archive = archive - self.submission_dirname = os.path.basename(self.archive).split(".")[0] + self.submission_dirname = os.path.basename(self.archive).split(".")[0] self.target_dirname = "sd-export-{}".format( datetime.datetime.now().strftime("%Y%m%d-%H%M%S") ) self.tmpdir = tempfile.mkdtemp() + try: + with open(config_path) as f: + json_config = json.loads(f.read()) + self.pci_bus_id = int(json_config.get("pci_bus_id", 2)) + except Exception as e: + self.exit_gracefully("ERROR_CONFIG") + def exit_gracefully(self, msg, e=False): """ @@ -124,10 +132,8 @@ def extract_tarball(self): def check_usb_connected(self): - # Rely on the output of lsusb on the bus assigned to. We might need to make this variable configurable - # In the future and extracted from config.json - p = subprocess.check_output(["lsusb", "-s", PCI_BUS_ID]) - # Empty string means a likely wrong PCI_BUS_ID + p = subprocess.check_output(["lsusb", "-s", self.pci_bus_id]) + # Empty string means a likely wrong pci_bus_id if p == "": msg = "ERROR_USB_CHECK" self.exit_gracefully(msg) @@ -189,8 +195,8 @@ def mount_volume(self): ) subprocess.check_call( [ - "sudo", - "chown", + "sudo", + "chown", "-R", "user:user", self.mountpoint, ] ) diff --git a/securedrop_export/main.py b/securedrop_export/main.py index a0b830c..b517705 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -1,17 +1,12 @@ -import os -import shutil -import sys - -from securedrop_export import export +from securedrop_export import export def __main__(submission): submission.extract_tarball() - try: + try: submission.archive_metadata = export.Metadata(submission.tmpdir) except Exception as e: - msg = "ERROR_METADATA_PARSING" - submission.exit_gracefully(msg, e=e) + submission.exit_gracefully("ERROR_METADATA_PARSING") if submission.archive_metadata.is_valid(): if submission.archive_metadata.export_method == "usb-test": @@ -37,5 +32,3 @@ def __main__(submission): submission.print_test_page() else: submission.exit_gracefully("ERROR_ARCHIVE_METADATA") - - diff --git a/tests/sd-export-config-bad-2.json b/tests/sd-export-config-bad-2.json new file mode 100644 index 0000000..879fb83 --- /dev/null +++ b/tests/sd-export-config-bad-2.json @@ -0,0 +1,3 @@ +{ + "pci_bus_id": "two" +} diff --git a/tests/sd-export-config-bad.json b/tests/sd-export-config-bad.json new file mode 100644 index 0000000..f7cbf8d --- /dev/null +++ b/tests/sd-export-config-bad.json @@ -0,0 +1,3 @@ +{ + "pciishf. i3u 2 +} diff --git a/tests/sd-export-config.json b/tests/sd-export-config.json new file mode 100644 index 0000000..d1167cf --- /dev/null +++ b/tests/sd-export-config.json @@ -0,0 +1,3 @@ +{ + "pci_bus_id": "2" +} diff --git a/tests/test_export.py b/tests/test_export.py index 00996c9..82cc824 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -5,7 +5,7 @@ import subprocess import tempfile -from securedrop_export import export +from securedrop_export import export SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa @@ -14,10 +14,45 @@ SAMPLE_OUTPUT_USB="Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa SAMPLE_OUTPUT_USB_ERROR="" SAMPLE_OUTPUT_USB_ERROR2="h\ne\nl\nl\no" +TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") +BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json") +ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json") + + +def test_bad_sd_export_config_invalid_json(capsys): + + expected_message = "ERROR_CONFIG" + with pytest.raises(SystemExit) as sysexit: + submission = export.SDExport("", BAD_TEST_CONFIG) + # A graceful exit means a return code of 0 + assert sysexit.value.code == 0 + + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + assert captured.out == "" + + +def test_bad_sd_export_config_invalid_value(capsys): + + expected_message = "ERROR_CONFIG" + with pytest.raises(SystemExit) as sysexit: + submission = export.SDExport("", ANOTHER_BAD_TEST_CONFIG) + # A graceful exit means a return code of 0 + assert sysexit.value.code == 0 + + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + assert captured.out == "" + + +def test_good_sd_export_config(capsys): + submission = export.SDExport("", TEST_CONFIG) + assert submission.pci_bus_id == 2 def test_exit_gracefully_no_exception(capsys): - submission = export.SDExport("testfile") + + submission = export.SDExport("testfile", TEST_CONFIG) test_msg = 'test' with pytest.raises(SystemExit) as sysexit: @@ -32,7 +67,7 @@ def test_exit_gracefully_no_exception(capsys): def test_exit_gracefully_exception(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) test_msg = 'test' with pytest.raises(SystemExit) as sysexit: @@ -48,7 +83,7 @@ def test_exit_gracefully_exception(capsys): def test_empty_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -58,7 +93,7 @@ def test_empty_config(capsys): def test_valid_printer_test_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -70,7 +105,7 @@ def test_valid_printer_test_config(capsys): def test_valid_printer_config(capsys): - submission = export.SDExport("") + submission = export.SDExport("", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -82,7 +117,7 @@ def test_valid_printer_config(capsys): def test_invalid_encryption_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) @@ -97,7 +132,7 @@ def test_invalid_encryption_config(capsys): def test_valid_encryption_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -112,7 +147,7 @@ def test_valid_encryption_config(capsys): @mock.patch("subprocess.check_call") def test_popup_message(mocked_call): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) submission.popup_message("hello!") mocked_call.assert_called_once_with([ "notify-send", @@ -124,14 +159,14 @@ def test_popup_message(mocked_call): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BOTHER_PRINTER) def test_get_good_printer_uri(mocked_call): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) result = submission.get_printer_uri() assert result == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) def test_get_bad_printer_uri(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "ERROR_PRINTER_NOT_FOUND" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) @@ -153,7 +188,7 @@ def test_get_bad_printer_uri(mocked_call, capsys): "/tmp/tmpJf83j9/secret.pptx" ]) def test_is_open_office_file(capsys, open_office_paths): - submission = export.SDExport("") + submission = export.SDExport("", TEST_CONFIG) assert submission.is_open_office_file(open_office_paths) @@ -164,13 +199,13 @@ def test_is_open_office_file(capsys, open_office_paths): "/tmp/tmpJf83j9/secret.gpg" ]) def test_is_not_open_office_file(capsys, open_office_paths): - submission = export.SDExport("") + submission = export.SDExport("", TEST_CONFIG) assert not submission.is_open_office_file(open_office_paths) @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB) def test_usb_precheck_connected(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_NOT_CONNECTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -184,7 +219,7 @@ def test_usb_precheck_connected(mocked_call, capsys): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB) def test_usb_precheck_disconnected(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_CONNECTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -198,7 +233,7 @@ def test_usb_precheck_disconnected(mocked_call, capsys): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR) def test_usb_precheck_error(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "ERROR_USB_CHECK" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -212,7 +247,7 @@ def test_usb_precheck_error(mocked_call, capsys): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2) def test_usb_precheck_error_2(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "ERROR_USB_CHECK" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -226,7 +261,7 @@ def test_usb_precheck_error_2(mocked_call, capsys): @mock.patch("subprocess.check_call") def test_luks_precheck_encrypted(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_ENCRYPTED" with pytest.raises(SystemExit) as sysexit: result = submission.check_luks_volume() @@ -234,4 +269,3 @@ def test_luks_precheck_encrypted(mocked_call, capsys): assert sysexit.value.code == 0 captured = capsys.readouterr() assert captured.err == "{}\n".format(expected_message) -