Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
Provides usb preflight checks
Browse files Browse the repository at this point in the history
Also check if a USB device is connected via `usb-test` action:
* returns `USB_OK` if a device is connected to sd-export Qube
* returns `ERROR_NO_USB` if a device is not connnected to sd-export Qube

Also checks if a Drive is LUKS-encrypted via `disk-test` action:
* returns `USB_ENCRYPTED_OK` if the usb device is LUKS-encrypted
* returns `USB_NOT_ENCRYPTED` if the usb device is not LUKS-encrypted
  • Loading branch information
emkll committed Jul 25, 2019
1 parent 5747419 commit 74d90f4
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 11 deletions.
45 changes: 43 additions & 2 deletions securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
ENCRYPTED_DEVICE = "encrypted_volume"
BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv"
BRLASER_PPD = "/usr/share/cups/model/br7030.ppd"

PCI_BUS_ID = "002:"

class Metadata(object):
"""
Object to parse, validate and store json metadata from the sd-export archive.
"""

METADATA_FILE = "metadata.json"
SUPPORTED_EXPORT_METHODS = ["disk", "printer", "printer-test"]
SUPPORTED_EXPORT_METHODS = [
"usb-test", # general preflight check
"disk",
"disk-test", # disk preflight test
"printer",
"printer-test" # print test page
]
SUPPORTED_ENCRYPTION_METHODS = ["luks"]

def __init__(self, archive_path):
Expand Down Expand Up @@ -119,6 +125,41 @@ def extract_tarball(self):
self.exit_gracefully(msg, e=e)


def check_usb_connected(self):
# Rely on the output of lsusb on the bus assigned to. We might need to make this variable configurable
# In the future and extracted from config.json
p = subprocess.check_output(["lsusb", "-s", PCI_BUS_ID])
# Empty string means a likely wrong PCI_BUS_ID
if p == "":
msg = "USB_CHECK_ERROR"
self.exit_gracefully(msg)
n_usb = len(p.rstrip().split("\n"))
# If there is one device, it is the root hub.
if n_usb == 1:
msg = "ERROR_NO_USB"
self.exit_gracefully(msg)
# If there are two devices, it's the root hub and another device (presumably for export)
elif n_usb == 2:
msg = "USB_OK"
self.exit_gracefully(msg)
# Else the result is unexpected
else:
msg = "USB_CHECK_ERROR"
self.exit_gracefully(msg)


def check_luks_volume(self):
try:
# cryptsetup isLuks returns 0 if the device is a luks volume
# subprocess with throw if the device is not luks (rc !=0)
p = subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE])
msg = "USB_ENCRYPTED_OK"
self.exit_gracefully(msg)
except subprocess.CalledProcessError as e:
msg = "USB_NOT_ENCRYPTED"
self.exit_gracefully(msg)


def unlock_luks_volume(self, encryption_key):
# the luks device is not already unlocked
if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)):
Expand Down
6 changes: 5 additions & 1 deletion securedrop_export/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ def __main__(submission):
submission.exit_gracefully(msg, e=e)

if submission.archive_metadata.is_valid():
if submission.archive_metadata.export_method == "disk":
if submission.archive_metadata.export_method == "usb-test":
submission.check_usb_connected()
elif submission.archive_metadata.export_method == "disk":
# exports all documents in the archive to luks-encrypted volume
submission.unlock_luks_volume(submission.archive_metadata.encryption_key)
submission.mount_volume()
submission.copy_submission()
elif submission.archive_metadata.export_method == "disk-test":
submission.check_luks_volume()
elif submission.archive_metadata.export_method == "printer":
# prints all documents in the archive
printer_uri = submission.get_printer_uri()
Expand Down
82 changes: 74 additions & 8 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@

import os
import pytest
import subprocess
import tempfile

from securedrop_export import export

SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa
SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa


# This below stanza is only necessary because the export code is not
# structured as a module. If a Python module were created called
# `export`, we could simply do `import export`
# path_to_script = os.path.join(
# os.path.dirname(os.path.abspath(__file__)), "send-to-usb"
# )
# securedropexport = imp.load_source("send-to-usb", path_to_script)
SAMPLE_OUTPUT_NO_USB="Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa
SAMPLE_OUTPUT_USB="Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa
SAMPLE_OUTPUT_USB_ERROR=""
SAMPLE_OUTPUT_USB_ERROR2="h\ne\nl\nl\no"


def test_exit_gracefully_no_exception(capsys):
Expand Down Expand Up @@ -169,3 +166,72 @@ def test_is_open_office_file(capsys, open_office_paths):
def test_is_not_open_office_file(capsys, open_office_paths):
submission = export.SDExport("")
assert not submission.is_open_office_file(open_office_paths)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB)
def test_usb_precheck_connected(mocked_call, capsys):
submission = export.SDExport("testfile")
expected_message = "ERROR_NO_USB"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
result = submission.check_usb_connected()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB)
def test_usb_precheck_disconnected(mocked_call, capsys):
submission = export.SDExport("testfile")
expected_message = "USB_OK"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
result = submission.check_usb_connected()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR)
def test_usb_precheck_error(mocked_call, capsys):
submission = export.SDExport("testfile")
expected_message = "USB_CHECK_ERROR"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
result = submission.check_usb_connected()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2)
def test_usb_precheck_error_2(mocked_call, capsys):
submission = export.SDExport("testfile")
expected_message = "USB_CHECK_ERROR"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
with pytest.raises(SystemExit) as sysexit:
result = submission.check_usb_connected()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_call")
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile")
expected_message = "USB_ENCRYPTED_OK"
with pytest.raises(SystemExit) as sysexit:
result = submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)
assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)

0 comments on commit 74d90f4

Please sign in to comment.