Skip to content

Commit

Permalink
q-dev: use ext/utils
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 1, 2024
1 parent 785bd62 commit e7acdf4
Showing 1 changed file with 12 additions and 75 deletions.
87 changes: 12 additions & 75 deletions qubesusbproxy/core3ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import qubes.devices
import qubes.ext
import qubes.vm.adminvm
from qubes.ext import utils

usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)*$")
# should match valid VM name
Expand All @@ -46,6 +47,7 @@

HWDATA_PATH = '/usr/share/hwdata'


class USBDevice(qubes.devices.DeviceInfo):
# pylint: disable=too-few-public-methods
def __init__(self, backend_domain, ident):
Expand Down Expand Up @@ -148,9 +150,9 @@ def interfaces(self) -> List[qubes.devices.DeviceInterface]:
@property
def parent_device(self) -> Optional[qubes.devices.DeviceInfo]:
"""
The parent device if any.
The parent device, if any.
USB device has no parents.
A USB device has no parents.
"""
return None

Expand Down Expand Up @@ -266,7 +268,7 @@ def attachment(self):
@property
def self_identity(self) -> str:
"""
Get identification of device not related to port.
Get identification of a device not related to port.
"""
if self._vendor_id is None:
vendor_id = self._load_desc_from_qubesdb()["vendor ID"]
Expand All @@ -288,7 +290,7 @@ def _get_vendor_and_product_names(
"""
Return tuple of vendor's and product's names for the ids.
If the id is not known return ("unknown", "unknown").
If the id is not known, return ("unknown", "unknown").
"""
return (USBDevice._load_usb_known_devices()
.get(vendor_id, dict())
Expand Down Expand Up @@ -425,7 +427,7 @@ def on_domain_init_load(self, vm, event):
else:
self.devices_cache[vm.name] = {}

async def _attach_and_notify(self, vm, device, options):
async def attach_and_notify(self, vm, device, options):
# bypass DeviceCollection logic preventing double attach
try:
await self.on_device_attach_usb(
Expand All @@ -437,76 +439,11 @@ async def _attach_and_notify(self, vm, device, options):

@qubes.ext.handler('domain-qdb-change:/qubes-usb-devices')
def on_qdb_change(self, vm, event, path):
"""A change in QubesDB means a change in device list."""
"""A change in QubesDB means a change in a device list."""
# pylint: disable=unused-argument,no-self-use
vm.fire_event('device-list-change:usb')
current_devices = dict((dev.ident, dev.attachment)
for dev in self.on_device_list_usb(vm, None))

# send events about devices detached/attached outside by themselves
# (like device pulled out or manual qubes.USB qrexec call)
# compare cached devices and current devices, collect:
# - newly appeared devices (ident)
# - devices attached from a vm to frontend vm (ident: frontend_vm)
# - devices detached from frontend vm (ident: frontend_vm)
# - disappeared devices, e.g. plugged out (ident)
added = set()
attached = dict()
detached = dict()
removed = set()
cache = self.devices_cache[vm.name]
for dev_id, front_vm in current_devices.items():
if dev_id not in cache:
added.add(dev_id)
if front_vm is not None:
attached[dev_id] = front_vm
elif cache[dev_id] != front_vm:
cached_front = cache[dev_id]
if front_vm is None:
detached[dev_id] = cached_front
elif cached_front is None:
attached[dev_id] = front_vm
else:
# front changed from one to another, so we signal it as:
# detach from first one and attach to the second one.
detached[dev_id] = cached_front
attached[dev_id] = front_vm

for dev_id, cached_front in cache.items():
if dev_id not in current_devices:
removed.add(dev_id)
if cached_front is not None:
detached[dev_id] = cached_front

for dev_id, front_vm in detached.items():
dev = USBDevice(vm, dev_id)
asyncio.ensure_future(front_vm.fire_event_async(
'device-detach:usb', device=dev))
for dev_id in removed:
device = USBDevice(vm, dev_id)
vm.fire_event('device-removed:usb', device=device)
for dev_id in added:
device = USBDevice(vm, dev_id)
vm.fire_event('device-added:usb', device=device)
for dev_ident, front_vm in attached.items():
dev = USBDevice(vm, dev_ident)
asyncio.ensure_future(front_vm.fire_event_async(
'device-attach:usb', device=dev,
options={'identity': dev.self_identity})
)

self.devices_cache[vm.name] = current_devices

for front_vm in vm.app.domains:
if not front_vm.is_running():
continue
for assignment in front_vm.devices['usb'].get_assigned_devices():
if (assignment.backend_domain == vm
and assignment.ident in added
and assignment.ident not in attached
):
asyncio.ensure_future(self._attach_and_notify(
front_vm, assignment.device, assignment.options))
for dev in self.on_device_list_usb(vm, None))
utils.device_list_change(self, current_devices, vm, path, USBDevice)

@qubes.ext.handler('device-list:usb')
def on_device_list_usb(self, vm, event):
Expand All @@ -522,7 +459,7 @@ def on_device_list_usb(self, vm, event):
untrusted_dev_list = vm.untrusted_qdb.list('/qubes-usb-devices/')
if not untrusted_dev_list:
return
# just get list of devices, not its every property
# just get a list of devices, not its every property
untrusted_dev_list = \
set(path.split('/')[2] for path in untrusted_dev_list)
for untrusted_qdb_ident in untrusted_dev_list:
Expand Down Expand Up @@ -670,7 +607,7 @@ async def on_device_detach_usb(self, vm, event, device):
async def on_domain_start(self, vm, _event, **_kwargs):
# pylint: disable=unused-argument
for assignment in vm.devices['usb'].get_assigned_devices():
asyncio.ensure_future(self._attach_and_notify(
asyncio.ensure_future(self.attach_and_notify(
vm, assignment.device, assignment.options))

@qubes.ext.handler('domain-shutdown')
Expand Down

0 comments on commit e7acdf4

Please sign in to comment.