Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Configurable PCI_DEVICE_ID will be populated at provision time by salt logic
  • Loading branch information
emkll committed Aug 2, 2019
1 parent 8cfa3bd commit bee4790
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 40 deletions.
3 changes: 2 additions & 1 deletion securedrop_export/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from securedrop_export import export
from securedrop_export import main

CONFIG_PATH = "/etc/sd-export-config.json"

def start():
my_sub = export.SDExport(sys.argv[1])
my_sub = export.SDExport(sys.argv[1], CONFIG_PATH)
try:
# Halt immediately if target file is absent
if not os.path.exists(my_sub.archive):
Expand Down
26 changes: 16 additions & 10 deletions securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ENCRYPTED_DEVICE = "encrypted_volume"
BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv"
BRLASER_PPD = "/usr/share/cups/model/br7030.ppd"
PCI_BUS_ID = "002:"

class Metadata(object):
"""
Expand All @@ -37,12 +36,14 @@ class Metadata(object):

def __init__(self, archive_path):
self.metadata_path = os.path.join(archive_path, self.METADATA_FILE)

try:
with open(self.metadata_path) as f:
json_config = json.loads(f.read())
self.export_method = json_config.get("device", None)
self.encryption_method = json_config.get("encryption_method", None)
self.encryption_key = json_config.get("encryption_key", None)

except Exception as e:
raise

Expand All @@ -58,7 +59,7 @@ def is_valid(self):

class SDExport(object):

def __init__(self, archive):
def __init__(self, archive, config_path):
self.device = DEVICE
self.mountpoint = MOUNTPOINT
self.encrypted_device = ENCRYPTED_DEVICE
Expand All @@ -68,14 +69,21 @@ def __init__(self, archive):

self.brlaser_driver = BRLASER_DRIVER
self.brlaser_ppd = BRLASER_PPD

self.archive = archive
self.submission_dirname = os.path.basename(self.archive).split(".")[0]
self.submission_dirname = os.path.basename(self.archive).split(".")[0]
self.target_dirname = "sd-export-{}".format(
datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
)
self.tmpdir = tempfile.mkdtemp()

try:
with open(config_path) as f:
json_config = json.loads(f.read())
self.pci_bus_id = int(json_config.get("pci_bus_id", 2))
except Exception as e:
self.exit_gracefully("ERROR_CONFIG")


def exit_gracefully(self, msg, e=False):
"""
Expand Down Expand Up @@ -124,10 +132,8 @@ def extract_tarball(self):


def check_usb_connected(self):
# Rely on the output of lsusb on the bus assigned to. We might need to make this variable configurable
# In the future and extracted from config.json
p = subprocess.check_output(["lsusb", "-s", PCI_BUS_ID])
# Empty string means a likely wrong PCI_BUS_ID
p = subprocess.check_output(["lsusb", "-s", self.pci_bus_id])
# Empty string means a likely wrong pci_bus_id
if p == "":
msg = "ERROR_USB_CHECK"
self.exit_gracefully(msg)
Expand Down Expand Up @@ -189,8 +195,8 @@ def mount_volume(self):
)
subprocess.check_call(
[
"sudo",
"chown",
"sudo",
"chown",
"-R", "user:user", self.mountpoint,
]
)
Expand Down
13 changes: 3 additions & 10 deletions securedrop_export/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import os
import shutil
import sys

from securedrop_export import export
from securedrop_export import export

def __main__(submission):
submission.extract_tarball()

try:
try:
submission.archive_metadata = export.Metadata(submission.tmpdir)
except Exception as e:
msg = "ERROR_METADATA_PARSING"
submission.exit_gracefully(msg, e=e)
submission.exit_gracefully("ERROR_METADATA_PARSING")

if submission.archive_metadata.is_valid():
if submission.archive_metadata.export_method == "usb-test":
Expand All @@ -37,5 +32,3 @@ def __main__(submission):
submission.print_test_page()
else:
submission.exit_gracefully("ERROR_ARCHIVE_METADATA")


3 changes: 3 additions & 0 deletions tests/sd-export-config-bad-2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"pci_bus_id": "two"
}
3 changes: 3 additions & 0 deletions tests/sd-export-config-bad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"pciishf. i3u 2
}
3 changes: 3 additions & 0 deletions tests/sd-export-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"pci_bus_id": "2"
}
72 changes: 53 additions & 19 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import subprocess
import tempfile

from securedrop_export import export
from securedrop_export import export

SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa
SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa
Expand All @@ -14,10 +14,45 @@
SAMPLE_OUTPUT_USB="Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa
SAMPLE_OUTPUT_USB_ERROR=""
SAMPLE_OUTPUT_USB_ERROR2="h\ne\nl\nl\no"
TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json")
BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json")
ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json")


def test_bad_sd_export_config_invalid_json(capsys):

expected_message = "ERROR_CONFIG"
with pytest.raises(SystemExit) as sysexit:
submission = export.SDExport("", BAD_TEST_CONFIG)
# A graceful exit means a return code of 0
assert sysexit.value.code == 0

captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)
assert captured.out == ""


def test_bad_sd_export_config_invalid_value(capsys):

expected_message = "ERROR_CONFIG"
with pytest.raises(SystemExit) as sysexit:
submission = export.SDExport("", ANOTHER_BAD_TEST_CONFIG)
# A graceful exit means a return code of 0
assert sysexit.value.code == 0

captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)
assert captured.out == ""


def test_good_sd_export_config(capsys):
submission = export.SDExport("", TEST_CONFIG)
assert submission.pci_bus_id == 2


def test_exit_gracefully_no_exception(capsys):
submission = export.SDExport("testfile")

submission = export.SDExport("testfile", TEST_CONFIG)
test_msg = 'test'

with pytest.raises(SystemExit) as sysexit:
Expand All @@ -32,7 +67,7 @@ def test_exit_gracefully_no_exception(capsys):


def test_exit_gracefully_exception(capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
test_msg = 'test'

with pytest.raises(SystemExit) as sysexit:
Expand All @@ -48,7 +83,7 @@ def test_exit_gracefully_exception(capsys):


def test_empty_config(capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
Expand All @@ -58,7 +93,7 @@ def test_empty_config(capsys):


def test_valid_printer_test_config(capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
Expand All @@ -70,7 +105,7 @@ def test_valid_printer_test_config(capsys):


def test_valid_printer_config(capsys):
submission = export.SDExport("")
submission = export.SDExport("", TEST_CONFIG)
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
Expand All @@ -82,7 +117,7 @@ def test_valid_printer_config(capsys):


def test_invalid_encryption_config(capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)

temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE)
Expand All @@ -97,7 +132,7 @@ def test_invalid_encryption_config(capsys):


def test_valid_encryption_config(capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
Expand All @@ -112,7 +147,7 @@ def test_valid_encryption_config(capsys):

@mock.patch("subprocess.check_call")
def test_popup_message(mocked_call):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
submission.popup_message("hello!")
mocked_call.assert_called_once_with([
"notify-send",
Expand All @@ -124,14 +159,14 @@ def test_popup_message(mocked_call):

@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BOTHER_PRINTER)
def test_get_good_printer_uri(mocked_call):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
result = submission.get_printer_uri()
assert result == "usb://Brother/HL-L2320D%20series?serial=A00000A000000"


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER)
def test_get_bad_printer_uri(mocked_call, capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "ERROR_PRINTER_NOT_FOUND"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

Expand All @@ -153,7 +188,7 @@ def test_get_bad_printer_uri(mocked_call, capsys):
"/tmp/tmpJf83j9/secret.pptx"
])
def test_is_open_office_file(capsys, open_office_paths):
submission = export.SDExport("")
submission = export.SDExport("", TEST_CONFIG)
assert submission.is_open_office_file(open_office_paths)


Expand All @@ -164,13 +199,13 @@ def test_is_open_office_file(capsys, open_office_paths):
"/tmp/tmpJf83j9/secret.gpg"
])
def test_is_not_open_office_file(capsys, open_office_paths):
submission = export.SDExport("")
submission = export.SDExport("", TEST_CONFIG)
assert not submission.is_open_office_file(open_office_paths)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB)
def test_usb_precheck_connected(mocked_call, capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_NOT_CONNECTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
Expand All @@ -184,7 +219,7 @@ def test_usb_precheck_connected(mocked_call, capsys):

@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB)
def test_usb_precheck_disconnected(mocked_call, capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_CONNECTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
Expand All @@ -198,7 +233,7 @@ def test_usb_precheck_disconnected(mocked_call, capsys):

@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR)
def test_usb_precheck_error(mocked_call, capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "ERROR_USB_CHECK"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
Expand All @@ -212,7 +247,7 @@ def test_usb_precheck_error(mocked_call, capsys):

@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2)
def test_usb_precheck_error_2(mocked_call, capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "ERROR_USB_CHECK"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
Expand All @@ -226,12 +261,11 @@ def test_usb_precheck_error_2(mocked_call, capsys):

@mock.patch("subprocess.check_call")
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile")
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_ENCRYPTED"
with pytest.raises(SystemExit) as sysexit:
result = submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)
assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)

0 comments on commit bee4790

Please sign in to comment.