422 changes: 221 additions & 201 deletions qubes_config/tests/test_gtk_widgets.py

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions qubes_config/tests/test_new_qube/test_advanced_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ def test_advanced_handler(test_qapp, new_qube_builder):
assert not handler.launch_settings_check.get_active()
assert not handler.get_launch_settings()

handler._template_changed(None, 'fedora-35')
handler._template_changed(None, "fedora-35")

assert not handler.install_system_check.get_sensitive()
assert not handler.get_install_system()


# init ram

assert handler.initram.get_value() == 0
Expand All @@ -82,10 +81,10 @@ def test_advanced_handler(test_qapp, new_qube_builder):
# storage pool
assert handler.get_pool() is None
# defaults from conftest
assert handler.pool.get_active_id() == 'default (file)'
assert handler.pool.get_active_id() == "default (file)"

handler.pool.set_active_id('lvm')
assert handler.get_pool() == 'lvm'
handler.pool.set_active_id("lvm")
assert handler.get_pool() == "lvm"

handler.pool.set_active_id('default (file)')
handler.pool.set_active_id("default (file)")
assert handler.get_pool() is None
285 changes: 166 additions & 119 deletions qubes_config/tests/test_new_qube/test_application_selector.py

Large diffs are not rendered by default.

23 changes: 13 additions & 10 deletions qubes_config/tests/test_new_qube/test_network_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import qubesadmin
from ...new_qube.network_selector import NetworkSelector


def test_network_handler(test_qapp, new_qube_builder):
handler = NetworkSelector(new_qube_builder, test_qapp)

assert handler.get_selected_netvm() == qubesadmin.DEFAULT
assert handler.network_current_widget.vm == test_qapp.default_netvm
assert handler.network_current_widget.vm == test_qapp.domains['sys-net']
assert handler.network_current_widget.vm == test_qapp.domains["sys-net"]
assert not handler.network_current_none.get_visible()

# select none
Expand All @@ -39,13 +40,12 @@ def test_network_handler(test_qapp, new_qube_builder):
assert handler.network_current_none.get_visible()

handler.network_custom.set_active(True)
assert handler.get_selected_netvm() != test_qapp.domains['sys-net']
handler.network_modeler.select_value('sys-net')
assert handler.get_selected_netvm() != test_qapp.domains["sys-net"]
handler.network_modeler.select_value("sys-net")
assert not handler.network_current_none.get_visible()

assert handler.get_selected_netvm() == test_qapp.domains['sys-net']
assert handler.network_current_widget.vm == \
test_qapp.domains['sys-net']
assert handler.get_selected_netvm() == test_qapp.domains["sys-net"]
assert handler.network_current_widget.vm == test_qapp.domains["sys-net"]

handler.network_default.set_active(True)
assert handler.get_selected_netvm() == qubesadmin.DEFAULT
Expand All @@ -59,7 +59,10 @@ def test_network_handler_whonix(test_qapp_whonix, new_qube_builder):

handler.network_tor.set_active(True)

assert handler.get_selected_netvm() == \
test_qapp_whonix.domains['sys-whonix']
assert handler.network_current_widget.vm == \
test_qapp_whonix.domains['sys-whonix']
assert (
handler.get_selected_netvm() == test_qapp_whonix.domains["sys-whonix"]
)
assert (
handler.network_current_widget.vm
== test_qapp_whonix.domains["sys-whonix"]
)
402 changes: 244 additions & 158 deletions qubes_config/tests/test_new_qube/test_new_qube.py

Large diffs are not rendered by default.

124 changes: 64 additions & 60 deletions qubes_config/tests/test_new_qube/test_template_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,54 +27,56 @@
from ...new_qube.application_selector import ApplicationData

import gi
gi.require_version('Gtk', '3.0')
gi.require_version('GdkPixbuf', '2.0')

gi.require_version("Gtk", "3.0")
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import Gtk


@patch('subprocess.check_output')
@patch("subprocess.check_output")
def test_template_handler_normal(mock_subprocess, test_qapp, new_qube_builder):
mock_subprocess.return_value = b''
mock_subprocess.return_value = b""
handler = TemplateHandler(new_qube_builder, test_qapp)

# assert we start at app
assert handler.selected_type == 'qube_type_app'
assert handler.selected_type == "qube_type_app"

# check selected template
assert handler.get_selected_template()
assert handler.get_selected_template() == test_qapp.default_template
assert handler.get_selected_template() == test_qapp.domains['fedora-36']
assert handler.get_selected_template() == test_qapp.domains["fedora-36"]

# select another
handler.select_template('fedora-35')
assert handler.get_selected_template() == test_qapp.domains['fedora-35']
handler.select_template("fedora-35")
assert handler.get_selected_template() == test_qapp.domains["fedora-35"]

assert handler.is_given_template_available(test_qapp.domains['fedora-36'])
assert handler.is_given_template_available(test_qapp.domains['fedora-35'])
assert not handler.is_given_template_available(test_qapp.domains['dom0'])
assert not handler.is_given_template_available(test_qapp.domains['test-vm'])
assert handler.is_given_template_available(test_qapp.domains["fedora-36"])
assert handler.is_given_template_available(test_qapp.domains["fedora-35"])
assert not handler.is_given_template_available(test_qapp.domains["dom0"])
assert not handler.is_given_template_available(test_qapp.domains["test-vm"])


@patch('subprocess.check_output')
@patch("subprocess.check_output")
def test_template_handler_none(mock_subprocess, test_qapp, new_qube_builder):
mock_subprocess.return_value = b''
mock_subprocess.return_value = b""
handler = TemplateHandler(new_qube_builder, test_qapp)

# assert we start at app
assert handler.selected_type == 'qube_type_app'
handler.change_vm_type('qube_type_template')
assert handler.selected_type == "qube_type_app"
handler.change_vm_type("qube_type_template")

# templates are available
assert handler.is_given_template_available(test_qapp.domains['fedora-36'])
assert handler.is_given_template_available(test_qapp.domains['fedora-35'])
assert handler.is_given_template_available(test_qapp.domains["fedora-36"])
assert handler.is_given_template_available(test_qapp.domains["fedora-35"])
assert not handler.is_given_template_available(
test_qapp.domains['test-standalone'])
assert not handler.is_given_template_available(test_qapp.domains['dom0'])
assert not handler.is_given_template_available(test_qapp.domains['test-vm'])
test_qapp.domains["test-standalone"]
)
assert not handler.is_given_template_available(test_qapp.domains["dom0"])
assert not handler.is_given_template_available(test_qapp.domains["test-vm"])

radio_none = new_qube_builder.get_object('radio_template_none')
radio_some = new_qube_builder.get_object('radio_template_template')
combo: Gtk.ComboBox = new_qube_builder.get_object('combo_template_template')
radio_none = new_qube_builder.get_object("radio_template_none")
radio_some = new_qube_builder.get_object("radio_template_template")
combo: Gtk.ComboBox = new_qube_builder.get_object("combo_template_template")

# none is selected
assert handler.get_selected_template() is None
Expand All @@ -92,96 +94,98 @@ def test_template_handler_none(mock_subprocess, test_qapp, new_qube_builder):

# select something
radio_some.set_active(True)
combo.set_active_id('fedora-36')
assert handler.get_selected_template() == test_qapp.domains['fedora-36']
combo.set_active_id("fedora-36")
assert handler.get_selected_template() == test_qapp.domains["fedora-36"]


@patch('subprocess.check_output')
def test_template_handler_select_vm(mock_subprocess,
test_qapp, new_qube_builder):
mock_subprocess.return_value = b''
@patch("subprocess.check_output")
def test_template_handler_select_vm(
mock_subprocess, test_qapp, new_qube_builder
):
mock_subprocess.return_value = b""
handler = TemplateHandler(new_qube_builder, test_qapp)

# assert we start at app
assert handler.selected_type == 'qube_type_app'
assert handler.selected_type == "qube_type_app"

# selecting template works
assert handler.get_selected_template() == test_qapp.domains['fedora-36']
handler.select_template('fedora-35')
assert handler.get_selected_template() == test_qapp.domains['fedora-35']
assert handler.get_selected_template() == test_qapp.domains["fedora-36"]
handler.select_template("fedora-35")
assert handler.get_selected_template() == test_qapp.domains["fedora-35"]

# switch to a type with None
handler.change_vm_type('qube_type_template')
handler.change_vm_type("qube_type_template")
assert handler.get_selected_template() is None

handler.select_template('fedora-36')
assert handler.get_selected_template() == test_qapp.domains['fedora-36']
handler.select_template("fedora-36")
assert handler.get_selected_template() == test_qapp.domains["fedora-36"]

handler.select_template(None)
assert handler.get_selected_template() is None


@patch('subprocess.check_output')
@patch("subprocess.check_output")
def test_get_appdata(mock_subprocess, test_qapp, new_qube_builder):
def mock_output(command):
vm_name = command[-1]
if vm_name == 'fedora-35':
return b'test.desktop|Test App|test desc'
return b''
if vm_name == "fedora-35":
return b"test.desktop|Test App|test desc"
return b""

mock_subprocess.side_effect = mock_output

handler = TemplateHandler(new_qube_builder, test_qapp)

fedora35 = test_qapp.domains['fedora-35']
testvm = test_qapp.domains['test-vm']
fedora35 = test_qapp.domains["fedora-35"]
testvm = test_qapp.domains["test-vm"]

assert handler.get_available_apps(testvm) == []
assert len(handler.get_available_apps(fedora35)) == 1

app_data: ApplicationData = handler.get_available_apps(fedora35)[0]

assert app_data.name == 'Test App'
assert app_data.ident == 'test.desktop'
assert app_data.name == "Test App"
assert app_data.ident == "test.desktop"
assert app_data.template == fedora35

assert handler.get_available_apps() == [app_data]


@patch('subprocess.check_output')
@patch("subprocess.check_output")
def test_template_emit_signal(mock_subprocess, test_qapp, new_qube_builder):
mock_subprocess.return_value = b''
mock_subprocess.return_value = b""
handler = TemplateHandler(new_qube_builder, test_qapp)

mock_emit = Mock()
handler.main_window.connect('template-changed', mock_emit)
handler.main_window.connect("template-changed", mock_emit)

handler.change_vm_type('qube_type_template')
handler.change_vm_type("qube_type_template")
mock_emit.assert_called_with(ANY, None)

radio_none = new_qube_builder.get_object('radio_template_none')
radio_some = new_qube_builder.get_object('radio_template_template')
combo: Gtk.ComboBox = new_qube_builder.get_object('combo_template_template')
radio_none = new_qube_builder.get_object("radio_template_none")
radio_some = new_qube_builder.get_object("radio_template_template")
combo: Gtk.ComboBox = new_qube_builder.get_object("combo_template_template")

radio_some.set_active(True)

# first available template
mock_emit.assert_called_with(ANY, 'fedora-35')
mock_emit.assert_called_with(ANY, "fedora-35")

combo.set_active_id('fedora-36')
assert handler.get_selected_template() == test_qapp.domains['fedora-36']
combo.set_active_id("fedora-36")
assert handler.get_selected_template() == test_qapp.domains["fedora-36"]

# two calls because comboboxes are weird
mock_emit.assert_called_with(ANY, 'fedora-36')
mock_emit.assert_called_with(ANY, "fedora-36")

radio_none.set_active(True)

mock_emit.assert_called_with(ANY, None)

handler.change_vm_type('qube_type_app')
handler.change_vm_type("qube_type_app")

# default template
mock_emit.assert_called_with(ANY, 'fedora-36')
mock_emit.assert_called_with(ANY, "fedora-36")

handler.select_template(test_qapp.domains['fedora-35'])
handler.select_template(test_qapp.domains["fedora-35"])

mock_emit.assert_called_with(ANY, 'fedora-35')
mock_emit.assert_called_with(ANY, "fedora-35")
445 changes: 266 additions & 179 deletions qubes_config/tests/test_policy_editor.py

Large diffs are not rendered by default.

334 changes: 202 additions & 132 deletions qubes_config/tests/test_policy_exceptions_handler.py

Large diffs are not rendered by default.

456 changes: 268 additions & 188 deletions qubes_config/tests/test_policy_handler.py

Large diffs are not rendered by default.

144 changes: 87 additions & 57 deletions qubes_config/tests/test_policy_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,41 @@

def test_conflict_files():
def return_files(service_name):
if service_name == 'test':
if service_name == "test":
return ["a-test", "b-test", "c-test"]
return ['']
return [""]

manager = PolicyManager()
with patch("qubes_config.global_config.policy_manager."
"PolicyClient.policy_get_files") as mock_get:
with patch(
"qubes_config.global_config.policy_manager."
"PolicyClient.policy_get_files"
) as mock_get:
mock_get.side_effect = return_files

assert manager.get_conflicting_policy_files('test', 'd-test') == \
["a-test", "b-test", "c-test"]
assert manager.get_conflicting_policy_files('test', 'c-test') == \
["a-test", "b-test"]
assert manager.get_conflicting_policy_files('test', 'b-test') == \
["a-test"]
assert not manager.get_conflicting_policy_files('test', 'a-test')
assert not manager.get_conflicting_policy_files('other', 'test')

@patch("qubes_config.global_config.policy_manager."
"PolicyClient.policy_get")
@patch("qubes_config.global_config.policy_manager."
"PolicyClient.policy_replace")
assert manager.get_conflicting_policy_files("test", "d-test") == [
"a-test",
"b-test",
"c-test",
]
assert manager.get_conflicting_policy_files("test", "c-test") == [
"a-test",
"b-test",
]
assert manager.get_conflicting_policy_files("test", "b-test") == [
"a-test"
]
assert not manager.get_conflicting_policy_files("test", "a-test")
assert not manager.get_conflicting_policy_files("other", "test")


@patch("qubes_config.global_config.policy_manager.PolicyClient.policy_get")
@patch("qubes_config.global_config.policy_manager.PolicyClient.policy_replace")
def test_get_policy_from_file_new_no_default(mock_replace, mock_get):
manager = PolicyManager()

mock_get.side_effect = subprocess.CalledProcessError(2, 'test')
mock_get.side_effect = subprocess.CalledProcessError(2, "test")

assert manager.get_rules_from_filename('test', '') == ([], None)
assert manager.get_rules_from_filename("test", "") == ([], None)
assert not mock_replace.mock_calls


Expand All @@ -67,45 +74,46 @@ def __init__(self):
def policy_get(self, filename):
if filename in self.files:
return self.files[filename], filename
raise subprocess.CalledProcessError(2, 'test')
raise subprocess.CalledProcessError(2, "test")

def policy_replace(self, filename, text):
self.files[filename] = text

manager = PolicyManager()
manager.policy_client = MockPolicy()

test_default = 'Test\t*\t@anyvm\t@anyvm\tdeny'
test_default = "Test\t*\t@anyvm\t@anyvm\tdeny"

# token should be None (the file is new), but rules should be appropriate
got_rules, token = manager.get_rules_from_filename('test', test_default)
got_rules, token = manager.get_rules_from_filename("test", test_default)
assert token is None
assert len(got_rules) == 1
assert str(got_rules[0]) == test_default
assert 'test' not in manager.policy_client.files
assert "test" not in manager.policy_client.files


def test_get_policy_from_file_existing():
manager = PolicyManager()

rules = 'Test\t*\t@anyvm\t@anyvm\tdeny'
rules = "Test\t*\t@anyvm\t@anyvm\tdeny"

def get_file(filename):
if filename == 'test':
return rules, 'test'
return '', ''
if filename == "test":
return rules, "test"
return "", ""

with patch("qubes_config.global_config.policy_manager."
"PolicyClient.policy_get") as mock_get:
with patch(
"qubes_config.global_config.policy_manager.PolicyClient.policy_get"
) as mock_get:
mock_get.side_effect = get_file

got_rules, token = manager.get_rules_from_filename('test', '')
assert token == 'test'
got_rules, token = manager.get_rules_from_filename("test", "")
assert token == "test"
assert len(got_rules) == 1
assert str(got_rules[0]) == rules

got_rules, token = manager.get_rules_from_filename('test2', '')
assert token == ''
got_rules, token = manager.get_rules_from_filename("test2", "")
assert token == ""
assert len(got_rules) == 0


Expand All @@ -118,19 +126,26 @@ def test_compare_rules_to_text():
rule_text_3 = """Test * @anyvm @anyvm deny
Test * work @anyvm allow"""

rules_1 = [Rule.from_line(None, "Test * @anyvm @anyvm deny",
filepath=None, lineno=0)]
rules_1 = [
Rule.from_line(
None, "Test * @anyvm @anyvm deny", filepath=None, lineno=0
)
]
rules_2 = [
Rule.from_line(None, "Test * @anyvm @anyvm deny",
filepath=None, lineno=0),
Rule.from_line(None, "Test +Test2 work @anyvm allow",
filepath=None, lineno=0)
Rule.from_line(
None, "Test * @anyvm @anyvm deny", filepath=None, lineno=0
),
Rule.from_line(
None, "Test +Test2 work @anyvm allow", filepath=None, lineno=0
),
]
rules_3 = [
Rule.from_line(None, "Test * @anyvm @anyvm deny",
filepath=None, lineno=0),
Rule.from_line(None, "Test * work @anyvm allow",
filepath=None, lineno=0)
Rule.from_line(
None, "Test * @anyvm @anyvm deny", filepath=None, lineno=0
),
Rule.from_line(
None, "Test * work @anyvm allow", filepath=None, lineno=0
),
]

assert manager.compare_rules_to_text(rules_1, rule_text_1)
Expand All @@ -146,34 +161,49 @@ def test_new_rule():
manager = PolicyManager()

rule_1 = Rule.from_line(
None, "Test * @anyvm @anyvm deny", filepath=None, lineno=0)
None, "Test * @anyvm @anyvm deny", filepath=None, lineno=0
)
rule_2 = Rule.from_line(
None, "Test +Test @anyvm vault allow target=dom0",
filepath=None, lineno=0)
None,
"Test +Test @anyvm vault allow target=dom0",
filepath=None,
lineno=0,
)

assert str(manager.new_rule("Test", "@anyvm", "@anyvm", "deny")) == str(
rule_1
)
assert str(
manager.new_rule(
service="Test",
source="@anyvm",
target="vault",
action="allow target=dom0",
argument="+Test",
)
) == str(rule_2)

assert str(manager.new_rule('Test', '@anyvm', '@anyvm', 'deny')) == \
str(rule_1)
assert str(manager.new_rule(
service='Test', source='@anyvm', target='vault',
action='allow target=dom0', argument='+Test')) == str(rule_2)

def test_save_policy():
manager = PolicyManager()

rule_text = 'Test\t*\t@anyvm\t@anyvm\tdeny'
rule_text = "Test\t*\t@anyvm\t@anyvm\tdeny"
rule = Rule.from_line(
None, "Test * @anyvm @anyvm deny", filepath=None, lineno=0)
None, "Test * @anyvm @anyvm deny", filepath=None, lineno=0
)

def replace_file(file_name: str, new_text: str, _token):
if file_name == 'test':
if file_name == "test":
if not new_text.startswith(manager.policy_disclaimer):
assert False
if not rule_text in new_text:
assert False
return
assert False

with patch("qubes_config.global_config.policy_manager."
"PolicyClient.policy_replace") as mock_replace:
with patch(
"qubes_config.global_config.policy_manager."
"PolicyClient.policy_replace"
) as mock_replace:
mock_replace.side_effect = replace_file
manager.save_rules('test', [rule], 'any')
manager.save_rules("test", [rule], "any")
453 changes: 240 additions & 213 deletions qubes_config/tests/test_policy_rules.py

Large diffs are not rendered by default.

224 changes: 122 additions & 102 deletions qubes_config/tests/test_rule_list_widgets.py

Large diffs are not rendered by default.

1,053 changes: 663 additions & 390 deletions qubes_config/tests/test_update_handler.py

Large diffs are not rendered by default.

970 changes: 596 additions & 374 deletions qubes_config/tests/test_usb_devices.py

Large diffs are not rendered by default.

117 changes: 62 additions & 55 deletions qubes_config/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,84 +23,91 @@
import pytest

import qubesadmin.exc
from ..widgets.utils import apply_feature_change, get_boolean_feature, \
get_feature, apply_feature_change_from_widget, BiDictionary
from ..widgets.utils import (
apply_feature_change,
get_boolean_feature,
get_feature,
apply_feature_change_from_widget,
BiDictionary,
)


def test_get_feature(test_qapp):
"""Test if get feature methods behave correctly, in
particular when setting features to None and handling boolean features."""
feature_name = 'test_feature'
default_value = 'test'
vm = test_qapp.domains['test-vm']
feature_name = "test_feature"
default_value = "test"
vm = test_qapp.domains["test-vm"]

# missing feature
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Get',
feature_name, None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00'
("test-vm", "admin.vm.feature.Get", feature_name, None)
] = b"2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00"
assert get_feature(vm, feature_name, default_value) == default_value

# correct feature
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Get', feature_name, None)] = \
b'0\0value1'
assert get_feature(vm, feature_name, default_value) == 'value1'
("test-vm", "admin.vm.feature.Get", feature_name, None)
] = b"0\0value1"
assert get_feature(vm, feature_name, default_value) == "value1"

# boolean feature
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Get', feature_name, None)] = \
b'0\x001'
("test-vm", "admin.vm.feature.Get", feature_name, None)
] = b"0\x001"
assert get_boolean_feature(vm, feature_name, False) is True
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Get',
feature_name, None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00'
("test-vm", "admin.vm.feature.Get", feature_name, None)
] = b"2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00"
assert get_boolean_feature(vm, feature_name, True) is True

# set feature
call = ('test-vm', 'admin.vm.feature.Set', feature_name, b'1')
test_qapp.expected_calls[call] = b'0\0'
call = ("test-vm", "admin.vm.feature.Set", feature_name, b"1")
test_qapp.expected_calls[call] = b"0\0"
apply_feature_change(vm, feature_name, True)
assert call in test_qapp.actual_calls

call = ('test-vm', 'admin.vm.feature.Set', feature_name, b'text')
test_qapp.expected_calls[call] = b'0\0'
apply_feature_change(vm, feature_name, 'text')
call = ("test-vm", "admin.vm.feature.Set", feature_name, b"text")
test_qapp.expected_calls[call] = b"0\0"
apply_feature_change(vm, feature_name, "text")
assert call in test_qapp.actual_calls

test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.List', None, None)] = \
b'0\x00test_feature'
("test-vm", "admin.vm.feature.List", None, None)
] = b"0\x00test_feature"
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Remove', feature_name, None)] = \
b'0\x001'
("test-vm", "admin.vm.feature.Remove", feature_name, None)
] = b"0\x001"
apply_feature_change(vm, feature_name, None)
assert ('test-vm', 'admin.vm.feature.Remove', feature_name, None) \
in test_qapp.actual_calls
assert (
"test-vm",
"admin.vm.feature.Remove",
feature_name,
None,
) in test_qapp.actual_calls


def test_feature_unavailable(test_qapp):
feature_name = 'test_feature'
default_value = 'test'
vm = test_qapp.domains['test-vm']
feature_name = "test_feature"
default_value = "test"
vm = test_qapp.domains["test-vm"]

# missing feature
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Get',
feature_name, None)] = \
b'2\x00QubesDaemonAccessError\x00\x00Feature not available\x00'
("test-vm", "admin.vm.feature.Get", feature_name, None)
] = b"2\x00QubesDaemonAccessError\x00\x00Feature not available\x00"
assert get_feature(vm, feature_name, default_value) == default_value

with pytest.raises(qubesadmin.exc.QubesException):
test_qapp.expected_calls[('test-vm', 'admin.vm.feature.Set',
feature_name, b'1')] = \
b'2\x00QubesDaemonAccessError\x00\x00Feature not available\x00'
test_qapp.expected_calls[
("test-vm", "admin.vm.feature.Set", feature_name, b"1")
] = b"2\x00QubesDaemonAccessError\x00\x00Feature not available\x00"
apply_feature_change(vm, feature_name, True)


def test_apply_change_from_widget(test_qapp):
vm = test_qapp.domains['test-vm']
feature_name = 'test-feature'
vm = test_qapp.domains["test-vm"]
feature_name = "test-feature"

class MockWidget:
def __init__(self, changed, value):
Expand All @@ -118,36 +125,36 @@ def get_selected(self):

# set correctly
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Set', feature_name,
b'1')] = b'0\0'
("test-vm", "admin.vm.feature.Set", feature_name, b"1")
] = b"0\0"
apply_feature_change_from_widget(MockWidget(True, True), vm, feature_name)

test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Set', feature_name,
b'text')] = b'0\0'
apply_feature_change_from_widget(MockWidget(True, 'text'), vm, feature_name)
("test-vm", "admin.vm.feature.Set", feature_name, b"text")
] = b"0\0"
apply_feature_change_from_widget(MockWidget(True, "text"), vm, feature_name)

test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.List', None, None)] = \
b'0\x00other-feature'
("test-vm", "admin.vm.feature.List", None, None)
] = b"0\x00other-feature"
test_qapp.expected_calls[
('test-vm', 'admin.vm.feature.Remove', feature_name, None)] = \
b'0\x001'
("test-vm", "admin.vm.feature.Remove", feature_name, None)
] = b"0\x001"
apply_feature_change_from_widget(MockWidget(True, None), vm, feature_name)


def test_bidict():
d = {'a': 1, 'b': 2}
d = {"a": 1, "b": 2}

bidict = BiDictionary(d)
assert bidict.inverted == {1: 'a', 2: 'b'}
bidict['c'] = 3
assert bidict.inverted == {1: 'a', 2: 'b', 3: 'c'}
del bidict['a']
assert bidict.inverted == {2: 'b', 3: 'c'}
assert bidict.inverted == {1: "a", 2: "b"}
bidict["c"] = 3
assert bidict.inverted == {1: "a", 2: "b", 3: "c"}
del bidict["a"]
assert bidict.inverted == {2: "b", 3: "c"}
with pytest.raises(ValueError):
bidict['b'] = 3
bidict["b"] = 3

with pytest.raises(ValueError):
d = {'a': 1, 'b': 1}
d = {"a": 1, "b": 1}
BiDictionary(d)
135 changes: 77 additions & 58 deletions qubes_config/tests/test_vm_flowbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
from unittest.mock import patch

import gi
gi.require_version('Gtk', '3.0')
gi.require_version('GdkPixbuf', '2.0')

gi.require_version("Gtk", "3.0")
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import Gtk

from ..global_config.vm_flowbox import VMFlowboxHandler, \
VMFlowBoxButton, PlaceholderText
from ..global_config.vm_flowbox import (
VMFlowboxHandler,
VMFlowBoxButton,
PlaceholderText,
)


def get_visible_vms(flowbox_handler: VMFlowboxHandler):
Expand All @@ -45,23 +49,23 @@ def get_visible_vms(flowbox_handler: VMFlowboxHandler):


def test_simple_flowbox_init_empty(test_qapp, test_builder):
flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', [])
flowbox_handler = VMFlowboxHandler(test_builder, test_qapp, "flowtest", [])
assert not flowbox_handler.is_changed()

assert len(flowbox_handler.flowbox.get_children()) == 1 # only placeholder
assert isinstance(flowbox_handler.flowbox.get_children()[0],
PlaceholderText)
assert len(flowbox_handler.flowbox.get_children()) == 1 # only placeholder
assert isinstance(
flowbox_handler.flowbox.get_children()[0], PlaceholderText
)
assert flowbox_handler.flowbox.get_children()[0].get_visible()
assert not flowbox_handler.add_box.get_visible()


def test_simple_flowbox_init_not_empty(test_qapp, test_builder):
initial_vms = [test_qapp.domains['test-vm'],
test_qapp.domains['test-blue']]
initial_vms = [test_qapp.domains["test-vm"], test_qapp.domains["test-blue"]]

flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', initial_vms=initial_vms)
test_builder, test_qapp, "flowtest", initial_vms=initial_vms
)

assert not flowbox_handler.is_changed()

Expand All @@ -73,49 +77,52 @@ def test_simple_flowbox_init_not_empty(test_qapp, test_builder):
assert sorted(initial_vms) == flowbox_handler.selected_vms


@patch('qubes_config.global_config.vm_flowbox.ask_question',
return_value=Gtk.ResponseType.YES)
@patch(
"qubes_config.global_config.vm_flowbox.ask_question",
return_value=Gtk.ResponseType.YES,
)
def test_flowbox_remove_button(mock_question, test_qapp, test_builder):
initial_vms = [test_qapp.domains['test-vm'],
test_qapp.domains['test-blue']]
initial_vms = [test_qapp.domains["test-vm"], test_qapp.domains["test-blue"]]

flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', initial_vms=initial_vms)
test_builder, test_qapp, "flowtest", initial_vms=initial_vms
)

# remove test-vm
assert not mock_question.mock_calls
for child in flowbox_handler.flowbox.get_children():
if isinstance(child, VMFlowBoxButton) and child.vm.name == 'test-vm':
if isinstance(child, VMFlowBoxButton) and child.vm.name == "test-vm":
child.get_child().clicked()
assert len(mock_question.mock_calls) == 1

assert get_visible_vms(flowbox_handler) == [test_qapp.domains['test-blue']]
assert flowbox_handler.selected_vms == [test_qapp.domains['test-blue']]
assert get_visible_vms(flowbox_handler) == [test_qapp.domains["test-blue"]]
assert flowbox_handler.selected_vms == [test_qapp.domains["test-blue"]]

# remove test-blue
for child in flowbox_handler.flowbox.get_children():
if isinstance(child, VMFlowBoxButton) and child.vm.name == 'test-blue':
if isinstance(child, VMFlowBoxButton) and child.vm.name == "test-blue":
child.get_child().clicked()
assert len(mock_question.mock_calls) == 2

assert not get_visible_vms(flowbox_handler)
assert not flowbox_handler.selected_vms


@patch('qubes_config.global_config.vm_flowbox.ask_question',
return_value=Gtk.ResponseType.NO)
@patch(
"qubes_config.global_config.vm_flowbox.ask_question",
return_value=Gtk.ResponseType.NO,
)
def test_flowbox_remove_button_no(mock_question, test_qapp, test_builder):
initial_vms = [test_qapp.domains['test-vm'],
test_qapp.domains['test-blue']]
initial_vms = [test_qapp.domains["test-vm"], test_qapp.domains["test-blue"]]

flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', initial_vms=initial_vms)
test_builder, test_qapp, "flowtest", initial_vms=initial_vms
)

# remove test-vm
assert not mock_question.mock_calls
for child in flowbox_handler.flowbox.get_children():
if isinstance(child,
VMFlowBoxButton) and child.vm.name == 'test-vm':
if isinstance(child, VMFlowBoxButton) and child.vm.name == "test-vm":
child.get_child().clicked()
assert len(mock_question.mock_calls) == 1

Expand All @@ -124,16 +131,17 @@ def test_flowbox_remove_button_no(mock_question, test_qapp, test_builder):


def test_flowbox_add_vm(test_qapp, test_builder):
initial_vms = [test_qapp.domains['test-vm']]
initial_vms = [test_qapp.domains["test-vm"]]

flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', initial_vms=initial_vms)
test_builder, test_qapp, "flowtest", initial_vms=initial_vms
)

# try to add a VM and abort
assert not flowbox_handler.add_box.get_visible()
flowbox_handler.add_button.clicked()
assert flowbox_handler.add_box.get_visible()
flowbox_handler.add_qube_model.select_value('test-blue')
flowbox_handler.add_qube_model.select_value("test-blue")
flowbox_handler.add_cancel.clicked()

assert not flowbox_handler.add_box.get_visible()
Expand All @@ -143,42 +151,48 @@ def test_flowbox_add_vm(test_qapp, test_builder):
# now try to add and do not abort
flowbox_handler.add_button.clicked()
assert flowbox_handler.add_box.get_visible()
flowbox_handler.add_qube_model.select_value('test-blue')
flowbox_handler.add_qube_model.select_value("test-blue")
flowbox_handler.add_confirm.clicked()

assert not flowbox_handler.add_box.get_visible()
expected_vms = sorted([test_qapp.domains['test-vm'],
test_qapp.domains['test-blue']])
expected_vms = sorted(
[test_qapp.domains["test-vm"], test_qapp.domains["test-blue"]]
)
assert sorted(flowbox_handler.selected_vms) == expected_vms
assert sorted(get_visible_vms(flowbox_handler)) == expected_vms

# now try to add something that's already selected
flowbox_handler.add_button.clicked()
assert flowbox_handler.add_box.get_visible()
flowbox_handler.add_qube_model.select_value('test-blue')
with patch('qubes_config.global_config.vm_flowbox.show_error') as \
mock_error:
flowbox_handler.add_qube_model.select_value("test-blue")
with patch(
"qubes_config.global_config.vm_flowbox.show_error"
) as mock_error:
assert not mock_error.mock_calls
flowbox_handler.add_confirm.clicked()
assert mock_error.mock_calls
# the box should not have hidden, maybe user wants to change selection
assert flowbox_handler.add_box.get_visible()
expected_vms = sorted([test_qapp.domains['test-vm'],
test_qapp.domains['test-blue']])
expected_vms = sorted(
[test_qapp.domains["test-vm"], test_qapp.domains["test-blue"]]
)
assert sorted(flowbox_handler.selected_vms) == expected_vms
assert sorted(get_visible_vms(flowbox_handler)) == expected_vms


@patch('qubes_config.global_config.vm_flowbox.ask_question',
return_value=Gtk.ResponseType.YES)
@patch(
"qubes_config.global_config.vm_flowbox.ask_question",
return_value=Gtk.ResponseType.YES,
)
def test_save_reset(_mock_question, test_qapp, test_builder):
test_vm = test_qapp.domains['test-vm']
blue_vm = test_qapp.domains['test-blue']
test_vm = test_qapp.domains["test-vm"]
blue_vm = test_qapp.domains["test-blue"]

initial_vms = [test_vm]

flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', initial_vms=initial_vms)
test_builder, test_qapp, "flowtest", initial_vms=initial_vms
)

assert not flowbox_handler.is_changed()

Expand All @@ -193,8 +207,7 @@ def test_save_reset(_mock_question, test_qapp, test_builder):

# remove added qube
for child in flowbox_handler.flowbox.get_children():
if isinstance(child,
VMFlowBoxButton) and child.vm == blue_vm:
if isinstance(child, VMFlowBoxButton) and child.vm == blue_vm:
child.get_child().clicked()
assert flowbox_handler.selected_vms == [test_vm]
assert get_visible_vms(flowbox_handler) == [test_vm]
Expand All @@ -204,8 +217,7 @@ def test_save_reset(_mock_question, test_qapp, test_builder):

# remove more
for child in flowbox_handler.flowbox.get_children():
if isinstance(child,
VMFlowBoxButton) and child.vm == test_vm:
if isinstance(child, VMFlowBoxButton) and child.vm == test_vm:
child.get_child().clicked()
assert not flowbox_handler.selected_vms
assert not get_visible_vms(flowbox_handler)
Expand All @@ -232,8 +244,10 @@ def test_save_reset(_mock_question, test_qapp, test_builder):

# remove all and save
for child in flowbox_handler.flowbox.get_children():
if isinstance(child,
VMFlowBoxButton) and child.vm in [blue_vm, test_vm]:
if isinstance(child, VMFlowBoxButton) and child.vm in [
blue_vm,
test_vm,
]:
child.get_child().clicked()
flowbox_handler.save()
assert not flowbox_handler.selected_vms
Expand All @@ -253,19 +267,23 @@ def test_save_reset(_mock_question, test_qapp, test_builder):


def test_flowbox_verify(test_qapp, test_builder):
test_vm = test_qapp.domains['test-vm']
red_vm = test_qapp.domains['test-red']
test_vm = test_qapp.domains["test-vm"]
red_vm = test_qapp.domains["test-red"]

initial_vms = [test_vm]

flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', initial_vms=initial_vms,
verification_callback=lambda vm: vm.name != 'test-blue')
test_builder,
test_qapp,
"flowtest",
initial_vms=initial_vms,
verification_callback=lambda vm: vm.name != "test-blue",
)

# attempt to add and see an erorr
flowbox_handler.add_button.clicked()
assert flowbox_handler.add_box.get_visible()
flowbox_handler.add_qube_model.select_value('test-blue')
flowbox_handler.add_qube_model.select_value("test-blue")
# vm will not be added, but the verification callback is responsible
# for messaging (it can propose additional actions)
flowbox_handler.add_confirm.clicked()
Expand All @@ -276,19 +294,20 @@ def test_flowbox_verify(test_qapp, test_builder):
# but adding correct stuff still works
flowbox_handler.add_button.clicked()
assert flowbox_handler.add_box.get_visible()
flowbox_handler.add_qube_model.select_value('test-red')
flowbox_handler.add_qube_model.select_value("test-red")
flowbox_handler.add_confirm.clicked()
assert flowbox_handler.selected_vms == [red_vm, test_vm]
assert get_visible_vms(flowbox_handler) == [red_vm, test_vm]
assert flowbox_handler.is_changed()


def test_flowbox_visibility(test_qapp, test_builder):
test_vm = test_qapp.domains['test-vm']
test_vm = test_qapp.domains["test-vm"]
initial_vms = [test_vm]

flowbox_handler = VMFlowboxHandler(
test_builder, test_qapp, 'flowtest', initial_vms=initial_vms)
test_builder, test_qapp, "flowtest", initial_vms=initial_vms
)

flowbox_handler.set_visible(True)
assert flowbox_handler.selected_vms == [test_vm]
Expand Down
100 changes: 57 additions & 43 deletions qubes_config/widgets/gtk_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@
from typing import Dict, Union, Optional

import gi
gi.require_version('Gtk', '3.0')

gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, GdkPixbuf, GLib, Gdk

import gettext

t = gettext.translation("desktop-linux-manager", fallback=True)
_ = t.gettext

RESPONSES_OK = {
_('_OK'): Gtk.ResponseType.OK
}
RESPONSES_OK = {_("_OK"): Gtk.ResponseType.OK}

RESPONSES_YES_NO_CANCEL = {
_("_Yes"): Gtk.ResponseType.YES,
_("_No"): Gtk.ResponseType.NO,
_("_Cancel"): Gtk.ResponseType.CANCEL
_("_Cancel"): Gtk.ResponseType.CANCEL,
}

APPVIEWER_LOCK = "/var/run/qubes/appviewer.lock"
Expand All @@ -50,8 +50,9 @@
XEVENT = "/var/run/qubes/qubes-clipboard.bin.xevent"


def load_icon_at_gtk_size(icon_name,
icon_size: Gtk.IconSize = Gtk.IconSize.LARGE_TOOLBAR):
def load_icon_at_gtk_size(
icon_name, icon_size: Gtk.IconSize = Gtk.IconSize.LARGE_TOOLBAR
):
"""Load icon from provided name, if available. If not, attempt to treat
provided name as a path. If icon not found in any of the above ways,
load a blank icon of specified size, provided as Gtk.IconSize.
Expand All @@ -75,12 +76,14 @@ def load_icon(icon_name: str, width: int = 24, height: int = 24):
try:
# icon_name is a name
image: GdkPixbuf.Pixbuf = Gtk.IconTheme.get_default().load_icon(
icon_name, width, 0)
icon_name, width, 0
)
return image
except (TypeError, GLib.Error):
# icon not found in any way
pixbuf: GdkPixbuf.Pixbuf = GdkPixbuf.Pixbuf.new(
GdkPixbuf.Colorspace.RGB, True, 8, width, height)
GdkPixbuf.Colorspace.RGB, True, 8, width, height
)
pixbuf.fill(0x000)
return pixbuf

Expand All @@ -89,8 +92,13 @@ def show_error(parent, title, text):
"""
Helper function to display error messages.
"""
return show_dialog_with_icon(parent=parent, title=title, text=text,
buttons=RESPONSES_OK, icon_name="qubes-info")
return show_dialog_with_icon(
parent=parent,
title=title,
text=text,
buttons=RESPONSES_OK,
icon_name="qubes-info",
)


def ask_question(parent, title: str, text: str):
Expand All @@ -102,16 +110,16 @@ def ask_question(parent, title: str, text: str):
title=title,
text=text,
buttons=RESPONSES_YES_NO_CANCEL,
icon_name="qubes-ask"
icon_name="qubes-ask",
)


def show_dialog_with_icon(
parent: Optional[Gtk.Widget],
title: str,
text: Union[str, Gtk.Widget],
buttons: Dict[str, Gtk.ResponseType],
icon_name: str
parent: Optional[Gtk.Widget],
title: str,
text: Union[str, Gtk.Widget],
buttons: Dict[str, Gtk.ResponseType],
icon_name: str,
) -> Gtk.ResponseType:
"""
Helper function to show a dialog with icon given by name.
Expand All @@ -122,20 +130,21 @@ def show_dialog_with_icon(
dialog.destroy()
if response == Gtk.ResponseType.DELETE_EVENT:
if Gtk.ResponseType.CANCEL in buttons.values():
# treat exiting from the window as cancel if it's one of the
# available responses, then no if it's one of the available responses
# treat exiting from the window as cancel if it's one of the
# available responses, then no if it's one of the available
# responses
return Gtk.ResponseType.CANCEL
if Gtk.ResponseType.NO in buttons.values():
return Gtk.ResponseType.NO
return response


def show_dialog(
parent: Gtk.Widget,
title: str,
text: Union[str, Gtk.Widget],
buttons: Dict[str, Gtk.ResponseType],
widget: Gtk.Widget
parent: Gtk.Widget,
title: str,
text: Union[str, Gtk.Widget],
buttons: Dict[str, Gtk.ResponseType],
widget: Gtk.Widget,
) -> Gtk.ResponseType:
"""
Show a dialog.
Expand All @@ -156,19 +165,19 @@ def show_dialog(
for key, value in buttons.items():
button: Gtk.Button = dialog.add_button(key, value)
button.set_use_underline(True)
button.get_style_context().add_class('flat_button')
button.get_style_context().add_class("flat_button")
if value in [Gtk.ResponseType.YES, Gtk.ResponseType.OK]:
button.get_style_context().add_class('button_save')
button.get_style_context().add_class("button_save")
else:
button.get_style_context().add_class('button_cancel')
button.get_style_context().add_class("button_cancel")

dialog.set_title(title)

content_area: Gtk.Box = dialog.get_content_area()
content_area.get_style_context().add_class('modal_dialog')
content_area.get_style_context().add_class("modal_dialog")

box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
box.get_style_context().add_class('modal_contents')
box.get_style_context().add_class("modal_contents")
content_area.pack_start(box, False, False, 0)

box.pack_start(widget, False, False, 0)
Expand All @@ -188,11 +197,14 @@ def show_dialog(
return dialog


def load_theme(widget: Gtk.Widget, light_theme_path: Optional[str] = None,
dark_theme_path: Optional[str] = None,
package_name: Optional[str] = None,
light_file_name: Optional[str] = None,
dark_file_name: Optional[str] = None) -> Gtk.CssProvider:
def load_theme(
widget: Gtk.Widget,
light_theme_path: Optional[str] = None,
dark_theme_path: Optional[str] = None,
package_name: Optional[str] = None,
light_file_name: Optional[str] = None,
dark_file_name: Optional[str] = None,
) -> Gtk.CssProvider:
"""
Load a dark or light theme to current screen, based on widget's
current (system) defaults.
Expand Down Expand Up @@ -222,19 +234,21 @@ def load_theme(widget: Gtk.Widget, light_theme_path: Optional[str] = None,
provider = Gtk.CssProvider()
provider.load_from_path(path)
Gtk.StyleContext.add_provider_for_screen(
screen, provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
screen, provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION
)
return provider


def is_theme_light(widget):
"""Check if current theme is light or dark"""
style_context: Gtk.StyleContext = widget.get_style_context()
background_color: Gdk.RGBA = style_context.get_background_color(
Gtk.StateType.NORMAL)
text_color: Gdk.RGBA = style_context.get_color(
Gtk.StateType.NORMAL)
background_intensity = background_color.red + \
background_color.blue + background_color.green
Gtk.StateType.NORMAL
)
text_color: Gdk.RGBA = style_context.get_color(Gtk.StateType.NORMAL)
background_intensity = (
background_color.red + background_color.blue + background_color.green
)
text_intensity = text_color.red + text_color.blue + text_color.green

return text_intensity < background_intensity
Expand All @@ -255,9 +269,9 @@ def appviewer_lock():
def copy_to_global_clipboard(text: str):
"""Copy provided text to global clipboard"""
with appviewer_lock():
with open(DATA, "w", encoding='utf-8') as contents:
with open(DATA, "w", encoding="utf-8") as contents:
contents.write(text)
with open(FROM, "w", encoding='ascii') as source:
with open(FROM, "w", encoding="ascii") as source:
source.write("dom0")
with open(XEVENT, "w", encoding='ascii') as timestamp:
with open(XEVENT, "w", encoding="ascii") as timestamp:
timestamp.write(str(Gtk.get_current_event_time()))
226 changes: 131 additions & 95 deletions qubes_config/widgets/gtk_widgets.py

Large diffs are not rendered by default.

50 changes: 36 additions & 14 deletions qubes_config/widgets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from typing import Optional, Any, Dict, List

import gettext

t = gettext.translation("desktop-linux-manager", fallback=True)
_ = t.gettext

Expand All @@ -39,6 +40,7 @@ def get_feature(vm, feature_name, default_value=None):
except qubesadmin.exc.QubesDaemonAccessError:
return default_value


def get_boolean_feature(vm, feature_name, default=False):
"""helper function to get a feature converted to a Bool if it does exist.
Necessary because of the true/false in features being coded as 1/empty
Expand All @@ -50,16 +52,20 @@ def get_boolean_feature(vm, feature_name, default=False):
result = default
return result

def apply_feature_change_from_widget(widget, vm: qubesadmin.vm.QubesVM,
feature_name:str):

def apply_feature_change_from_widget(
widget, vm: qubesadmin.vm.QubesVM, feature_name: str
):
"""Change a feature value, taking into account weirdness with None.
Widget must support is_changed and get_selected methods."""
if widget.is_changed():
value = widget.get_selected()
apply_feature_change(vm, feature_name, value)

def apply_feature_change(vm: qubesadmin.vm.QubesVM,
feature_name: str, new_value: Optional[Any]):

def apply_feature_change(
vm: qubesadmin.vm.QubesVM, feature_name: str, new_value: Optional[Any]
):
"""Change a feature value, taking into account weirdness with None."""
try:
if new_value is None:
Expand All @@ -70,13 +76,17 @@ def apply_feature_change(vm: qubesadmin.vm.QubesVM,
except qubesadmin.exc.QubesDaemonAccessError:
# pylint: disable=raise-missing-from
raise qubesadmin.exc.QubesException(
_("Failed to set {feature_name} due to insufficient "
"permissions").format(feature_name=feature_name))
_(
"Failed to set {feature_name} due to insufficient "
"permissions"
).format(feature_name=feature_name)
)


class BiDictionary(dict):
"""Helper bi-directional dictionary. By design, duplicate values
cause errors."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.inverted: Dict[Any, Any] = {}
Expand All @@ -98,8 +108,9 @@ def __delitem__(self, key):
super().__delitem__(key)


def compare_rule_lists(rule_list_1: List[Rule],
rule_list_2: List[Rule]) -> bool:
def compare_rule_lists(
rule_list_1: List[Rule], rule_list_2: List[Rule]
) -> bool:
"""Check if two provided rule lists are the same. Return True if yes."""
if len(rule_list_1) != len(rule_list_2):
return False
Expand All @@ -108,17 +119,28 @@ def compare_rule_lists(rule_list_1: List[Rule],
return False
return True


def _open_url_in_dvm(url, default_dvm: qubesadmin.vm.QubesVM):
subprocess.run(
['qvm-run', '-p', '--service', f'--dispvm={default_dvm}',
'qubes.OpenURL'], input=url.encode(), check=False,
stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
[
"qvm-run",
"-p",
"--service",
f"--dispvm={default_dvm}",
"qubes.OpenURL",
],
input=url.encode(),
check=False,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
)


def open_url_in_disposable(url: str, qapp: qubesadmin.Qubes):
"""Open provided url in disposable qube based on default disposable
template"""
default_dvm = qapp.default_dispvm
open_thread = threading.Thread(group=None,
target=_open_url_in_dvm,
args=[url, default_dvm])
open_thread = threading.Thread(
group=None, target=_open_url_in_dvm, args=[url, default_dvm]
)
open_thread.start()
155 changes: 85 additions & 70 deletions qui/decorators.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
#!/usr/bin/env python3
''' Decorators wrap a `qui.models.PropertiesModel` in a class
""" Decorators wrap a `qui.models.PropertiesModel` in a class
containing helpful representation methods.
'''
"""
# pylint: disable=wrong-import-position,import-error

import gi # isort:skip
gi.require_version('Gtk', '3.0') # isort:skip

gi.require_version("Gtk", "3.0") # isort:skip
from gi.repository import Gtk, Pango, GLib, GdkPixbuf # isort:skip
from qubesadmin import exc
from qubesadmin.utils import size_to_human

import gettext

t = gettext.translation("desktop-linux-manager", fallback=True)
_ = t.gettext


class PropertiesDecorator():
''' Base class for all decorators '''
class PropertiesDecorator:
"""Base class for all decorators"""

# pylint: disable=too-few-public-methods

Expand All @@ -27,13 +29,13 @@ def __init__(self, obj, margins=(5, 5)) -> None:
super().__init__()

def set_margins(self, widget):
''' Helper for setting the default margins on a widget '''
"""Helper for setting the default margins on a widget"""
widget.set_margin_left(self.margin_left)
widget.set_margin_right(self.margin_right)


class DomainDecorator(PropertiesDecorator):
''' Useful methods for domain data representation '''
"""Useful methods for domain data representation"""

def __init__(self, vm, margins=(5, 5)) -> None:
super().__init__(vm, margins)
Expand All @@ -56,18 +58,19 @@ def __init__(self, vm):
if self.vm:
self.label.set_label(self.vm.name)
else:
self.label.set_markup(_('<b>Qube</b>'))
self.label.set_markup(_("<b>Qube</b>"))
self.pack_start(self.label, False, False, 0)

self.outdated_icon = create_icon('outdated')
self.updateable_icon = create_icon('software-update-available')
self.outdated_icon = create_icon("outdated")
self.updateable_icon = create_icon("software-update-available")

self.outdated_icon.set_no_show_all(True)
self.updateable_icon.set_no_show_all(True)

self.updateable_icon.set_tooltip_text(_("Updates available"))
self.outdated_icon.set_tooltip_text(
_("Qube must be restarted to reflect changes in template"))
_("Qube must be restarted to reflect changes in template")
)

self.update_outdated(False)
self.update_updateable()
Expand All @@ -81,53 +84,60 @@ def update_outdated(self, state):
self.update_tooltip()

def update_updateable(self):
if self.vm is None or not getattr(self.vm, 'updateable', False):
if self.vm is None or not getattr(self.vm, "updateable", False):
return
try:
updates_state = self.vm.features.get('updates-available', False)
updates_state = self.vm.features.get("updates-available", False)
except exc.QubesException:
# no access to VM features
updates_state = False
self.updateable_icon.set_visible(updates_state)
self.updates_available = updates_state
self.update_tooltip()

def update_tooltip(self,
netvm_changed=False,
storage_changed=False):
def update_tooltip(self, netvm_changed=False, storage_changed=False):

if self.vm is None:
return

tooltip = f"<b>{self.vm.name}</b>"

if self.vm.klass == 'AdminVM':
if self.vm.klass == "AdminVM":

tooltip += _("\nAdministrative domain")

else:
if not self.template_name:
self.template_name = getattr(self.vm, 'template', None)
self.template_name = _("None") if not self.template_name \
self.template_name = getattr(self.vm, "template", None)
self.template_name = (
_("None")
if not self.template_name
else str(self.template_name)
)

if not self.netvm_name or netvm_changed:
self.netvm_name = getattr(self.vm, 'netvm',
_("permission denied"))
self.netvm_name = _("None") if not self.netvm_name \
self.netvm_name = getattr(
self.vm, "netvm", _("permission denied")
)
self.netvm_name = (
_("None")
if not self.netvm_name
else str(self.netvm_name)
)

if not self.cur_storage or storage_changed:
try:
self.cur_storage = \
self.vm.get_disk_utilization() / 1024 ** 3
self.cur_storage = (
self.vm.get_disk_utilization() / 1024**3
)
except (exc.QubesDaemonNoResponseError, KeyError):
self.cur_storage = 0

if not self.max_storage or storage_changed:
try:
self.max_storage = \
self.vm.volumes['private'].size / 1024 ** 3
self.max_storage = (
self.vm.volumes["private"].size / 1024**3
)
except (exc.QubesDaemonNoResponseError, KeyError):
self.max_storage = 0

Expand All @@ -136,20 +146,23 @@ def update_tooltip(self,
else:
perc_storage = self.cur_storage / self.max_storage

tooltip += \
_("\nTemplate: <b>{template}</b>"
"\nNetworking: <b>{netvm}</b>"
"\nPrivate storage: <b>{current_storage:.2f}GB/"
"{max_storage:.2f}GB ({perc_storage:.1%})</b>").format(
template=self.template_name,
netvm=self.netvm_name,
current_storage=self.cur_storage,
max_storage=self.max_storage,
perc_storage=perc_storage)
tooltip += _(
"\nTemplate: <b>{template}</b>"
"\nNetworking: <b>{netvm}</b>"
"\nPrivate storage: <b>{current_storage:.2f}GB/"
"{max_storage:.2f}GB ({perc_storage:.1%})</b>"
).format(
template=self.template_name,
netvm=self.netvm_name,
current_storage=self.cur_storage,
max_storage=self.max_storage,
perc_storage=perc_storage,
)

if self.outdated:
tooltip += _("\n\nRestart qube to "
"apply changes in template.")
tooltip += _(
"\n\nRestart qube to apply changes in template."
)

if self.updates_available:
tooltip += _("\n\nUpdates available.")
Expand All @@ -170,13 +183,16 @@ def __init__(self):

def update_state(self, cpu=0, header=False):
if header:
markup = _('<b>CPU</b>')
markup = _("<b>CPU</b>")
elif cpu > 0:
# pylint: disable=consider-using-f-string
markup = '{:3d}%'.format(cpu)
markup = "{:3d}%".format(cpu)
else:
color = self.cpu_label.get_style_context() \
.get_color(Gtk.StateFlags.INSENSITIVE).to_color()
color = (
self.cpu_label.get_style_context()
.get_color(Gtk.StateFlags.INSENSITIVE)
.to_color()
)
markup = f'<span color="{color.to_string()}">0%</span>'

self.cpu_label.set_markup(markup)
Expand All @@ -189,9 +205,9 @@ def __init__(self):

def update_state(self, memory=0, header=False):
if header:
markup = _('<b>RAM</b>')
markup = _("<b>RAM</b>")
else:
markup = f'{str(int(memory/1024))} MiB'
markup = f"{str(int(memory/1024))} MiB"

self.mem_label.set_markup(markup)

Expand All @@ -208,25 +224,24 @@ def cpu(self):
return cpu_widget

def icon(self) -> Gtk.Image:
''' Returns a `Gtk.Image` containing the colored lock icon '''
if self.vm is None: # should not be called
"""Returns a `Gtk.Image` containing the colored lock icon"""
if self.vm is None: # should not be called
return None
try:
# this is a temporary, emergency fix for unexecpected conflict with
# qui-devices rewrite
icon = getattr(self.vm, 'icon', self.vm.label.icon)
icon = getattr(self.vm, "icon", self.vm.label.icon)
except exc.QubesDaemonCommunicationError:
# no permission to access icon
icon = 'appvm-black'
icon_vm = Gtk.IconTheme.get_default().load_icon(
icon, 16, 0)
icon = "appvm-black"
icon_vm = Gtk.IconTheme.get_default().load_icon(icon, 16, 0)
icon_img = Gtk.Image.new_from_pixbuf(icon_vm)
return icon_img

def netvm(self) -> Gtk.Label:
netvm = getattr(self.vm, 'netvm', _("permission denied"))
netvm = getattr(self.vm, "netvm", _("permission denied"))
if netvm is None:
label = Gtk.Label(_('No'), xalign=0)
label = Gtk.Label(_("No"), xalign=0)
else:
label = Gtk.Label(str(netvm), xalign=0)

Expand All @@ -235,30 +250,30 @@ def netvm(self) -> Gtk.Label:


def device_hbox(device) -> Gtk.Box:
''' Returns a :class:`Gtk.Box` containing the device name & icon.. '''
if device.devclass == 'block':
icon = 'drive-removable-media'
elif device.devclass == 'mic':
icon = 'audio-input-microphone'
elif device.devclass == 'usb':
icon = 'generic-usb'
"""Returns a :class:`Gtk.Box` containing the device name & icon.."""
if device.devclass == "block":
icon = "drive-removable-media"
elif device.devclass == "mic":
icon = "audio-input-microphone"
elif device.devclass == "usb":
icon = "generic-usb"
else:
icon = 'emblem-important'
icon = "emblem-important"
dev_icon = create_icon(icon)

name_label = Gtk.Label(xalign=0)
name = f"{device.backend_domain}:{device.port_id} - {device.description}"
if device.attachments:
dev_list = ", ".join(list(device.attachments))
name_label.set_markup(f'<b>{name} ({dev_list})</b>')
name_label.set_markup(f"<b>{name} ({dev_list})</b>")
else:
name_label.set_text(name)
name_label.set_max_width_chars(64)
name_label.set_ellipsize(Pango.EllipsizeMode.END)

size_label = Gtk.Label(xalign=1)
if device.devclass == 'block' and 'size' in device.data:
size_label.set_text(size_to_human(int(device.data['size'])))
if device.devclass == "block" and "size" in device.data:
size_label.set_text(size_to_human(int(device.data["size"])))

hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
hbox.pack_start(name_label, True, True, 0)
Expand All @@ -273,15 +288,15 @@ def device_domain_hbox(vm, attached: bool) -> Gtk.Box:
# hbox.pack_start(label, True, True, 5)

if attached:
eject_icon = create_icon('media-eject')
eject_icon = create_icon("media-eject")
hbox.pack_start(eject_icon, False, False, 5)
else:
add_icon = create_icon('list-add')
add_icon = create_icon("list-add")
hbox.pack_start(add_icon, False, False, 5)

name = Gtk.Label(xalign=0)
if attached:
name.set_markup(f'<b>{vm.vm_name}</b>')
name.set_markup(f"<b>{vm.vm_name}</b>")
else:
name.set_text(vm.vm_name)

Expand All @@ -290,11 +305,11 @@ def device_domain_hbox(vm, attached: bool) -> Gtk.Box:


def create_icon(name) -> Gtk.Image:
"""" Create an icon from string; tries for both the normal and -symbolic
variants, because some themes only have the symbolic variant. If not
found, outputs a blank icon."""
"""Create an icon from string; tries for both the normal and -symbolic
variants, because some themes only have the symbolic variant. If not
found, outputs a blank icon."""

names = [name, f'{name}-symbolic']
names = [name, f"{name}-symbolic"]
pixbuf = None
for icon_name in names:
try:
Expand Down
168 changes: 106 additions & 62 deletions qui/devices/actionable_widgets.py

Large diffs are not rendered by default.

157 changes: 85 additions & 72 deletions qui/devices/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
from qubesadmin.device_protocol import DeviceAssignment

import gi
gi.require_version('Gtk', '3.0') # isort:skip

gi.require_version("Gtk", "3.0") # isort:skip
from gi.repository import Gtk, Gio # isort:skip

import gettext

t = gettext.translation("desktop-linux-manager", fallback=True)
_ = t.gettext

Expand All @@ -39,6 +41,7 @@ class VM:
"""
Wrapper for various VMs that can serve as backend/frontend
"""

def __init__(self, vm: qubesadmin.vm.QubesVM):
self.__hash = hash(vm)
self._vm = vm
Expand All @@ -61,23 +64,23 @@ def __hash__(self):
def icon_name(self):
"""Name of the VM icon"""
try:
return getattr(self._vm, 'icon', self._vm.label.icon)
return getattr(self._vm, "icon", self._vm.label.icon)
except qubesadmin.exc.QubesException:
return 'appvm-black'
return "appvm-black"

@property
def is_dispvm_template(self) -> bool:
"""
Is this VM a dispvm template?
"""
return getattr(self._vm, 'template_for_dispvms', False)
return getattr(self._vm, "template_for_dispvms", False)

@property
def is_attachable(self) -> bool:
"""
Should this VM be listed as possible attachment target in the GUI?
"""
return self.vm_class != 'AdminVM' and self._vm.is_running()
return self.vm_class != "AdminVM" and self._vm.is_running()

@property
def vm_object(self):
Expand All @@ -95,36 +98,38 @@ def should_be_cleaned_up(self):


class Device:
def __init__(self, dev: qubesadmin.devices.DeviceInfo,
gtk_app: Gtk.Application):
def __init__(
self, dev: qubesadmin.devices.DeviceInfo, gtk_app: Gtk.Application
):
self.gtk_app: Gtk.Application = gtk_app
self._dev: qubesadmin.devices.DeviceInfo = dev
self.__hash = hash(dev)
self._port: str = ''
self._port: str = ""
# Monotonic connection timestamp only for new devices
self.connection_timestamp: float = None

self._dev_name: str = getattr(dev, 'description', 'unknown')
if dev.devclass == 'block' and 'size' in dev.data:
self._dev_name += " (" + size_to_human(int(dev.data['size'])) + ")"
self._dev_name: str = getattr(dev, "description", "unknown")
if dev.devclass == "block" and "size" in dev.data:
self._dev_name += " (" + size_to_human(int(dev.data["size"])) + ")"

self._ident: str = getattr(dev, 'port_id', 'unknown')
self._description: str = getattr(dev, 'description', 'unknown')
self._devclass: str = getattr(dev, 'devclass', 'unknown')
self._data: Dict = getattr(dev, 'data', {})
self._device_id = getattr(dev, 'device_id', '*')
self._ident: str = getattr(dev, "port_id", "unknown")
self._description: str = getattr(dev, "description", "unknown")
self._devclass: str = getattr(dev, "devclass", "unknown")
self._data: Dict = getattr(dev, "data", {})
self._device_id = getattr(dev, "device_id", "*")
self.attachments: Set[VM] = set()
backend_domain = getattr(dev, 'backend_domain', None)
backend_domain = getattr(dev, "backend_domain", None)
if backend_domain:
self._backend_domain: Optional[VM] = VM(backend_domain)
else:
self._backend_domain: Optional[VM] = None

try:
self.vm_icon: str = getattr(dev.backend_domain, 'icon',
dev.backend_domain.label.icon)
self.vm_icon: str = getattr(
dev.backend_domain, "icon", dev.backend_domain.label.icon
)
except qubesadmin.exc.QubesException:
self.vm_icon: str = 'appvm-black'
self.vm_icon: str = "appvm-black"

def __str__(self):
return self._dev_name
Expand Down Expand Up @@ -163,11 +168,11 @@ def device_class(self) -> str:
@property
def device_icon(self) -> str:
"""Device icon"""
if self.device_class == 'block':
return 'harddrive'
if self.device_class == 'mic':
return 'mic'
return ''
if self.device_class == "block":
return "harddrive"
if self.device_class == "mic":
return "mic"
return ""

@property
def backend_domain(self) -> Optional[VM]:
Expand All @@ -187,41 +192,41 @@ def notification_id(self) -> str:
@property
def device_group(self) -> str:
"""Device group for purposes of menus."""
if self._devclass == 'block':
return 'Data (Block) Devices'
if self._devclass == 'usb':
return 'USB Devices'
if self._devclass == 'mic':
return 'Microphones'
if self._devclass == "block":
return "Data (Block) Devices"
if self._devclass == "usb":
return "USB Devices"
if self._devclass == "mic":
return "Microphones"
# TODO: those below come from new API, may need an update
if self._devclass == 'Other':
return 'Other Devices'
if self._devclass == 'Communication':
return 'Other Devices' # eg. modems
if self._devclass in ('Input', 'Keyboard', 'Mouse'):
return 'Input Devices'
if self._devclass in ('Printer', 'Scanner'):
if self._devclass == "Other":
return "Other Devices"
if self._devclass == "Communication":
return "Other Devices" # eg. modems
if self._devclass in ("Input", "Keyboard", "Mouse"):
return "Input Devices"
if self._devclass in ("Printer", "Scanner"):
return "Printers and Scanners"
if self._devclass == 'Multimedia':
return 'Other Devices'
if self._devclass == "Multimedia":
return "Other Devices"
# Multimedia = Audio, Video, Displays etc.
if self._devclass == 'Wireless':
return 'Other Devices'
if self._devclass == 'Bluetooth':
return 'Bluetooth Devices'
if self._devclass == 'Mass_Data':
return 'Other Devices'
if self._devclass == 'Network':
return 'Other Devices'
if self._devclass == 'Memory':
return 'Other Devices'
if self._devclass.startswith('PCI'):
return 'PCI Devices'
if self._devclass == 'Docking Station':
return 'Docking Station'
if self._devclass == 'Processor':
return 'Other Devices'
return 'Other Devices'
if self._devclass == "Wireless":
return "Other Devices"
if self._devclass == "Bluetooth":
return "Bluetooth Devices"
if self._devclass == "Mass_Data":
return "Other Devices"
if self._devclass == "Network":
return "Other Devices"
if self._devclass == "Memory":
return "Other Devices"
if self._devclass.startswith("PCI"):
return "PCI Devices"
if self._devclass == "Docking Station":
return "Docking Station"
if self._devclass == "Processor":
return "Other Devices"
return "Other Devices"

@property
def sorting_key(self) -> str:
Expand All @@ -233,27 +238,31 @@ def attach_to_vm(self, vm: VM):
Perform attachment to provided VM.
"""
try:
assignment = DeviceAssignment.new(self.backend_domain,
port_id=self.id_string, devclass=self.device_class,
device_id=self._device_id)
assignment = DeviceAssignment.new(
self.backend_domain,
port_id=self.id_string,
devclass=self.device_class,
device_id=self._device_id,
)

vm.vm_object.devices[self.device_class].attach(assignment)
self.gtk_app.emit_notification(
_("Attaching device"),
_("Attaching {} to {}").format(self.description, vm),
Gio.NotificationPriority.NORMAL,
notification_id=self.notification_id)
notification_id=self.notification_id,
)

except Exception as ex: # pylint: disable=broad-except
except Exception as ex: # pylint: disable=broad-except
self.gtk_app.emit_notification(
_("Error"),
_("Attaching device {0} to {1} failed. "
"Error: {2} - {3}").format(
self.description, vm, type(ex).__name__,
ex),
_(
"Attaching device {0} to {1} failed. Error: {2} - {3}"
).format(self.description, vm, type(ex).__name__, ex),
Gio.NotificationPriority.HIGH,
error=True,
notification_id=self.notification_id)
notification_id=self.notification_id,
)

def detach_from_vm(self, vm: VM):
"""
Expand All @@ -263,19 +272,23 @@ def detach_from_vm(self, vm: VM):
_("Detaching device"),
_("Detaching {} from {}").format(self.description, vm),
Gio.NotificationPriority.NORMAL,
notification_id=self.notification_id)
notification_id=self.notification_id,
)
try:
assignment = DeviceAssignment.new(
self.backend_domain, self._ident, self.device_class)
self.backend_domain, self._ident, self.device_class
)
vm.vm_object.devices[self.device_class].detach(assignment)
except qubesadmin.exc.QubesException as ex:
self.gtk_app.emit_notification(
_("Error"),
_("Detaching device {0} from {1} failed. "
"Error: {2}").format(self.description, vm, ex),
_("Detaching device {0} from {1} failed. Error: {2}").format(
self.description, vm, ex
),
Gio.NotificationPriority.HIGH,
error=True,
notification_id=self.notification_id)
notification_id=self.notification_id,
)

def detach_from_all(self):
"""
Expand Down
145 changes: 86 additions & 59 deletions qui/devices/device_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import qui.utils

import gi
gi.require_version('Gtk', '3.0') # isort:skip

gi.require_version("Gtk", "3.0") # isort:skip
from gi.repository import Gtk, Gdk, Gio # isort:skip

from qui.devices import backend
Expand All @@ -43,34 +44,42 @@
from qubes_config.widgets.gtk_utils import is_theme_light

import gbulb

gbulb.install()

import gettext

t = gettext.translation("desktop-linux-manager", fallback=True)
_ = t.gettext


# FUTURE: this should be moved to backend with new API changes
DEV_TYPES = ['block', 'usb', 'mic']
DEV_TYPES = ["block", "usb", "mic"]


class DeviceMenu(Gtk.Menu):
"""Menu for handling a single device"""
def __init__(self, main_item: actionable_widgets.MainDeviceWidget,
vms: List[backend.VM],
dispvm_templates: List[backend.VM]):

def __init__(
self,
main_item: actionable_widgets.MainDeviceWidget,
vms: List[backend.VM],
dispvm_templates: List[backend.VM],
):
super().__init__()

for child_widget in main_item.get_child_widgets(vms, dispvm_templates):
item = actionable_widgets.generate_wrapper_widget(
Gtk.MenuItem, 'activate', child_widget)
Gtk.MenuItem, "activate", child_widget
)
self.add(item)

self.show_all()


class DevicesTray(Gtk.Application):
"""Tray application for handling devices."""

def __init__(self, app_name, qapp, dispatcher):
super().__init__()
self.name: str = app_name
Expand All @@ -90,32 +99,38 @@ def __init__(self, app_name, qapp, dispatcher):
self.initialize_dev_data()

for devclass in DEV_TYPES:
self.dispatcher.add_handler('device-attach:' + devclass,
self.device_attached)
self.dispatcher.add_handler('device-detach:' + devclass,
self.device_detached)
self.dispatcher.add_handler('device-list-change:' + devclass,
self.device_list_update)

self.dispatcher.add_handler('domain-shutdown',
self.vm_shutdown)
self.dispatcher.add_handler('domain-start-failed',
self.vm_shutdown)
self.dispatcher.add_handler('domain-start', self.vm_start)

self.dispatcher.add_handler('property-set:template_for_dispvms',
self.vm_dispvm_template_change)

self.dispatcher.add_handler('property-reset:template_for_dispvms',
self.vm_dispvm_template_change)
self.dispatcher.add_handler('property-del:template_for_dispvms',
self.vm_dispvm_template_change)
self.dispatcher.add_handler(
"device-attach:" + devclass, self.device_attached
)
self.dispatcher.add_handler(
"device-detach:" + devclass, self.device_detached
)
self.dispatcher.add_handler(
"device-list-change:" + devclass, self.device_list_update
)

self.dispatcher.add_handler("domain-shutdown", self.vm_shutdown)
self.dispatcher.add_handler("domain-start-failed", self.vm_shutdown)
self.dispatcher.add_handler("domain-start", self.vm_start)

self.dispatcher.add_handler(
"property-set:template_for_dispvms", self.vm_dispvm_template_change
)

self.dispatcher.add_handler(
"property-reset:template_for_dispvms",
self.vm_dispvm_template_change,
)
self.dispatcher.add_handler(
"property-del:template_for_dispvms", self.vm_dispvm_template_change
)

self.widget_icon = Gtk.StatusIcon()
self.widget_icon.set_from_icon_name('qubes-devices')
self.widget_icon.connect('button-press-event', self.show_menu)
self.widget_icon.set_from_icon_name("qubes-devices")
self.widget_icon.connect("button-press-event", self.show_menu)
self.widget_icon.set_tooltip_markup(
'<b>Qubes Devices</b>\nView and manage devices.')
"<b>Qubes Devices</b>\nView and manage devices."
)

def device_list_update(self, vm, _event, **_kwargs):

Expand All @@ -126,7 +141,8 @@ def device_list_update(self, vm, _event, **_kwargs):
for devclass in DEV_TYPES:
for device in vm.devices[devclass]:
changed_devices[str(device.port)] = backend.Device(
device, self)
device, self
)

except qubesadmin.exc.QubesException:
changed_devices = {} # VM was removed
Expand All @@ -139,7 +155,8 @@ def device_list_update(self, vm, _event, **_kwargs):
_("Device available"),
_("Device {} is available.").format(dev.description),
Gio.NotificationPriority.NORMAL,
notification_id=dev.notification_id)
notification_id=dev.notification_id,
)

dev_to_remove = []
for dev_port, dev in self.devices.items():
Expand All @@ -153,7 +170,8 @@ def device_list_update(self, vm, _event, **_kwargs):
_("Device removed"),
_("Device {} has been removed.").format(dev.description),
Gio.NotificationPriority.NORMAL,
notification_id=dev.notification_id)
notification_id=dev.notification_id,
)
del self.devices[dev_port]

def initialize_vm_data(self):
Expand All @@ -175,7 +193,8 @@ def initialize_dev_data(self):
try:
for device in domain.devices[devclass]:
self.devices[str(device.port)] = backend.Device(
device, self)
device, self
)
except qubesadmin.exc.QubesException:
# we have no permission to access VM's devices
continue
Expand All @@ -184,15 +203,17 @@ def initialize_dev_data(self):
for domain in self.qapp.domains:
for devclass in DEV_TYPES:
try:
for device in domain.devices[devclass
].get_attached_devices():
for device in domain.devices[
devclass
].get_attached_devices():
dev = str(device.port)
if dev in self.devices:
# occassionally ghost UnknownDevices appear when a
# device was removed but not detached from a VM
# FUTURE: is this still true after api changes?
self.devices[dev].attachments.add(
backend.VM(domain))
backend.VM(domain)
)
except qubesadmin.exc.QubesException:
# we have no permission to access VM's devices
continue
Expand Down Expand Up @@ -256,6 +277,7 @@ def vm_dispvm_template_change(self, vm, _event, **_kwargs):
self.dispvm_templates.add(wrapped_vm)
else:
self.dispvm_templates.discard(wrapped_vm)

#
# def on_label_changed(self, vm, _event, **_kwargs):
# if not vm: # global properties changed
Expand Down Expand Up @@ -284,16 +306,18 @@ def load_css(widget) -> str:
because it needs a realized widget.
Returns light/dark variant used currently as 'light' or 'dark' string.
"""
theme = 'light' if is_theme_light(widget) else 'dark'
theme = "light" if is_theme_light(widget) else "dark"
screen = Gdk.Screen.get_default()
provider = Gtk.CssProvider()
css_file_ref = (importlib.resources.files('qui') /
f'qubes-devices-{theme}.css')
css_file_ref = (
importlib.resources.files("qui") / f"qubes-devices-{theme}.css"
)
with importlib.resources.as_file(css_file_ref) as css_file:
provider.load_from_path(str(css_file))

Gtk.StyleContext.add_provider_for_screen(
screen, provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
screen, provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION
)

return theme

Expand All @@ -307,23 +331,24 @@ def show_menu(self, _unused, _event):
menu_items = []
sorted_vms = sorted(self.vms)
sorted_dispvms = sorted(self.dispvm_templates)
sorted_devices = sorted(self.devices.values(),
key=lambda x: x.sorting_key)
sorted_devices = sorted(
self.devices.values(), key=lambda x: x.sorting_key
)

for i, dev in enumerate(sorted_devices):
if i == 0 or dev.device_group != sorted_devices[i - 1].device_group:
# add a header
menu_item = \
actionable_widgets.generate_wrapper_widget(
Gtk.MenuItem,
'activate',
actionable_widgets.InfoHeader(dev.device_group))
menu_item = actionable_widgets.generate_wrapper_widget(
Gtk.MenuItem,
"activate",
actionable_widgets.InfoHeader(dev.device_group),
)
menu_items.append(menu_item)

device_widget = actionable_widgets.MainDeviceWidget(dev, theme)
device_item = \
actionable_widgets.generate_wrapper_widget(
Gtk.MenuItem, 'activate', device_widget)
device_item = actionable_widgets.generate_wrapper_widget(
Gtk.MenuItem, "activate", device_widget
)
device_item.set_reserve_indicator(False)

device_menu = DeviceMenu(device_widget, sorted_vms, sorted_dispvms)
Expand All @@ -338,15 +363,16 @@ def show_menu(self, _unused, _event):
tray_menu.show_all()
tray_menu.popup_at_pointer(None) # use current event

def emit_notification(self, title, message, priority, error=False,
notification_id=None):
def emit_notification(
self, title, message, priority, error=False, notification_id=None
):
notification = Gio.Notification.new(title)
notification.set_body(message)
notification.set_priority(priority)
if error:
notification.set_icon(Gio.ThemedIcon.new('dialog-error'))
notification.set_icon(Gio.ThemedIcon.new("dialog-error"))
if notification_id:
notification_id += 'ERROR'
notification_id += "ERROR"
self.send_notification(notification_id, notification)


Expand All @@ -355,16 +381,17 @@ def main():
# qapp = qubesadmin.tests.mock_app.MockQubesComplete()
dispatcher = qubesadmin.events.EventsDispatcher(qapp)
# dispatcher = qubesadmin.tests.mock_app.MockDispatcher(qapp)
app = DevicesTray(
'org.qubes.qui.tray.Devices', qapp, dispatcher)
app = DevicesTray("org.qubes.qui.tray.Devices", qapp, dispatcher)

loop = asyncio.get_event_loop()
return_code = qui.utils.run_asyncio_and_show_errors(
loop, [asyncio.ensure_future(dispatcher.listen_for_events())],
"Qubes Devices Widget")
loop,
[asyncio.ensure_future(dispatcher.listen_for_events())],
"Qubes Devices Widget",
)
del app
return return_code


if __name__ == '__main__':
if __name__ == "__main__":
sys.exit(main())
49 changes: 27 additions & 22 deletions qui/tests/tests_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import qui.tray.domains as domains_widget
from qubesadmin import Qubes


class DomainsWidgetTest(unittest.TestCase):

def setUp(self):
super(DomainsWidgetTest, self).setUp()

self.widget = domains_widget.DomainTray('org.qubes.ui.tray.Domains')
self.widget = domains_widget.DomainTray("org.qubes.ui.tray.Domains")
self.widget.initialize_menu()

self.qapp = Qubes()
Expand All @@ -45,23 +46,26 @@ def test_01_correct_vm_state(self):
# are all running VMs listed
domains_in_widget = []
for menu_item in self.widget.tray_menu:
domain = self.qapp.domains[menu_item.vm['name']]
domain = self.qapp.domains[menu_item.vm["name"]]
domains_in_widget.append(domain)
self.assertTrue(domain.is_running(),
"halted domain listed incorrectly")
self.assertTrue(
domain.is_running(), "halted domain listed incorrectly"
)
for domain in self.qapp.domains:
if domain.klass != 'AdminVM':
self.assertEqual(domain in domains_in_widget,
domain.is_running(),
"domain missing from list")
if domain.klass != "AdminVM":
self.assertEqual(
domain in domains_in_widget,
domain.is_running(),
"domain missing from list",
)

def test_02_stop_vm(self):
domain_to_stop = self.qapp.domains['test-running']
domain_to_stop = self.qapp.domains["test-running"]

if not domain_to_stop.is_running():
domain_to_stop.start()
while domain_to_stop.get_power_state() != 'Running':
time.sleep(1)
while domain_to_stop.get_power_state() != "Running":
time.sleep(1)
time.sleep(10)

menu_item = self.__find_menu_item(domain_to_stop)
Expand All @@ -71,7 +75,7 @@ def test_02_stop_vm(self):

countdown = 100

while domain_to_stop.get_power_state() != 'Halted' and countdown > 0:
while domain_to_stop.get_power_state() != "Halted" and countdown > 0:
time.sleep(1)
countdown -= 1

Expand All @@ -81,11 +85,11 @@ def test_02_stop_vm(self):
self.assertIsNone(menu_item, "stopped item still incorrectly listed")

def test_03_start_vm(self):
domain_to_start = self.qapp.domains['test-halted']
domain_to_start = self.qapp.domains["test-halted"]

if domain_to_start.is_running():
domain_to_start.shutdown()
while domain_to_start.get_power_state() != 'Halted':
while domain_to_start.get_power_state() != "Halted":
time.sleep(1)
time.sleep(10)

Expand All @@ -99,15 +103,16 @@ def test_03_start_vm(self):
# should finish starting
countdown = 100
while countdown > 0:
if domain_to_start.get_power_state() == 'Running':
if domain_to_start.get_power_state() == "Running":
self.__refresh_gui(45)
item = self.__find_menu_item(domain_to_start)
self.assertIsNotNone(item,
"domain not listed as started")
self.assertIsNotNone(item, "domain not listed as started")
self.assertIsNotNone(item, "item incorrectly not listed")
self.assertIsInstance(item.get_submenu(),
domains_widget.StartedMenu,
"incorrect menu (debug not start)")
self.assertIsInstance(
item.get_submenu(),
domains_widget.StartedMenu,
"incorrect menu (debug not start)",
)
break
time.sleep(1)
countdown -= 1
Expand All @@ -116,7 +121,7 @@ def test_03_start_vm(self):

def __find_menu_item(self, vm):
for menu_item in self.widget.tray_menu:
menu_domain = self.qapp.domains[menu_item.vm['name']]
menu_domain = self.qapp.domains[menu_item.vm["name"]]
if menu_domain == vm:
return menu_item
return None
Expand All @@ -125,7 +130,7 @@ def __find_menu_item(self, vm):
def __refresh_gui(delay=0):
time.sleep(delay)
while Gtk.events_pending():
Gtk.main_iteration_do(blocking=True)
Gtk.main_iteration_do(blocking=True)


if __name__ == "__main__":
Expand Down
37 changes: 26 additions & 11 deletions qui/tools/qubes_device_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
from qrexec.server import SocketService
from qrexec.utils import sanitize_domain_name
from qrexec.tools.qrexec_policy_agent import (
VMListModeler, RPCConfirmationWindow)
VMListModeler,
RPCConfirmationWindow,
)
from qubesadmin.device_protocol import DeviceSerializer


Expand All @@ -59,7 +61,7 @@ def _override_entries(self, options):
self._entries = {}
for name, vm in self._domains_info.items():
if name.startswith("@dispvm:"):
vm_name = name[len("@dispvm:"):]
vm_name = name[len("@dispvm:") :]
prefix = "Disposable VM: "
else:
vm_name = name
Expand All @@ -79,14 +81,16 @@ def _override_entries(self, options):
def apply_icon(self, entry, qube_name):
if isinstance(entry, Gtk.Entry):
for vm_info in self._entries.values():
if qube_name == vm_info['api_name']:
if qube_name == vm_info["api_name"]:
entry.set_icon_from_pixbuf(
Gtk.EntryIconPosition.PRIMARY, vm_info["icon"],
Gtk.EntryIconPosition.PRIMARY,
vm_info["icon"],
)
break
else:
raise ValueError(
f"The following source qube does not exist: {qube_name}")
f"The following source qube does not exist: {qube_name}"
)
else:
raise TypeError(
"Only expecting Gtk.Entry objects to want our icon."
Expand All @@ -96,7 +100,8 @@ def apply_icon(self, entry, qube_name):
class AttachmentConfirmationWindow(RPCConfirmationWindow):
# pylint: disable=too-few-public-methods,too-many-instance-attributes
_source_file_ref = importlib.resources.files("qui").joinpath(
os.path.join("devices", "AttachConfirmationWindow.glade"))
os.path.join("devices", "AttachConfirmationWindow.glade")
)

_source_id = {
"window": "AttachConfirmationWindow",
Expand All @@ -114,13 +119,20 @@ class AttachmentConfirmationWindow(RPCConfirmationWindow):
# pylint: disable=super-init-not-called
def __init__(
self,
entries_info, source, device_name, argument, targets_list, target=None
entries_info,
source,
device_name,
argument,
targets_list,
target=None,
):
# pylint: disable=too-many-arguments
sanitize_domain_name(source, assert_sanitized=True)
DeviceSerializer.sanitize_str(
device_name, DeviceSerializer.ALLOWED_CHARS_PARAM,
error_message="Invalid device name")
device_name,
DeviceSerializer.ALLOWED_CHARS_PARAM,
error_message="Invalid device name",
)

self._gtk_builder = Gtk.Builder()
with importlib.resources.as_file(self._source_file_ref) as path:
Expand Down Expand Up @@ -157,8 +169,11 @@ def __init__(

self._entries_info = entries_info

options = {name: " " + options for vm_data in targets_list
for name, _, options in (vm_data.partition(" "),)}
options = {
name: " " + options
for vm_data in targets_list
for name, _, options in (vm_data.partition(" "),)
}
list_modeler = self._new_vm_list_modeler_overridden(options)

list_modeler.apply_model(
Expand Down
Loading