diff --git a/qubes/api/__init__.py b/qubes/api/__init__.py index 700eca39b..e809d6d8f 100644 --- a/qubes/api/__init__.py +++ b/qubes/api/__init__.py @@ -24,10 +24,13 @@ import functools import io import os +import re import shutil import socket import struct import traceback +from typing import Union +import uuid import qubes.exc @@ -94,6 +97,33 @@ def apply_filters(iterable, filters): iterable = filter(selector, iterable) return iterable +# pylint: disable=line-too-long +_uuid_regex = re.compile(rb"\A@uuid:[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\Z") +_qube_name_regex = re.compile(rb"\A[A-Za-z][A-Za-z0-9_.-]*\Z") +def decode_vm( + untrusted_input: bytes, domains: qubes.app.VMCollection +) -> qubes.vm.qubesvm.QubesVM: + lookup: Union[uuid.UUID, str] + vm = untrusted_input.decode("ascii", "strict") + if untrusted_input.startswith(b"@uuid:"): + if (len(untrusted_input) != 42 or + not _uuid_regex.match(untrusted_input)): + raise qubes.exc.QubesVMNotFoundError(f"Invalid VM UUID {vm[6:]}") + lookup = uuid.UUID(vm[6:].decode("ascii", "strict")) + else: + if (len(untrusted_input) > 31 or + not _qube_name_regex.match(untrusted_input)): + raise qubes.exc.QubesVMNotFoundError(f"Invalid VM name {vm}") + lookup = vm + try: + return domains[lookup] + except KeyError: + # normally this should filtered out by qrexec policy, but there are + # two cases it might not be: + # 1. The call comes from dom0, which bypasses qrexec policy + # 2. Domain was removed between checking the policy and here + # we inform the client accordingly + raise qubes.exc.QubesVMNotFoundError(vm) class AbstractQubesAPI: '''Common code for Qubes Management Protocol handling @@ -114,25 +144,17 @@ class AbstractQubesAPI: #: the preferred socket location (to be overridden in child's class) SOCKNAME = None - def __init__(self, app, src, method_name, dest, arg, send_event=None): + app: qubes.Qubes + src: qubes.vm.qubesvm.QubesVM + def __init__(self, app: qubes.Qubes, src: bytes, method_name, dest: bytes, arg, send_event=None): #: :py:class:`qubes.Qubes` object self.app = app - try: - vm = src.decode('ascii') - #: source qube - self.src = self.app.domains[vm] - - vm = dest.decode('ascii') - #: destination qube - self.dest = self.app.domains[vm] - except KeyError: - # normally this should filtered out by qrexec policy, but there are - # two cases it might not be: - # 1. The call comes from dom0, which bypasses qrexec policy - # 2. Domain was removed between checking the policy and here - # we inform the client accordingly - raise qubes.exc.QubesVMNotFoundError(vm) + #: source qube + self.src = decode_vm(src, app.domains) + + #: destination qube + self.dest = decode_vm(dest, app.domains) #: argument self.arg = arg.decode('ascii') diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 6c612c308..77c6bc4a4 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -119,10 +119,11 @@ async def vm_list(self): else: domains = self.fire_event_for_filter([self.dest]) - return ''.join('{} class={} state={}\n'.format( + return ''.join('{} class={} state={} uuid={}\n'.format( vm.name, vm.__class__.__name__, - vm.get_power_state()) + vm.get_power_state(), + vm.uuid) for vm in sorted(domains)) @qubes.api.method('admin.vm.property.List', no_payload=True, @@ -1134,10 +1135,14 @@ async def _vm_create(self, vm_type, allow_pool=False, raise self.app.save() - @qubes.api.method('admin.vm.CreateDisposable', no_payload=True, + @qubes.api.method('admin.vm.CreateDisposable', scope='global', write=True) - async def create_disposable(self): + async def create_disposable(self, untrusted_payload): self.enforce(not self.arg) + if untrusted_payload not in (b"", b"uuid"): + raise qubes.exc.QubesValueError( + "Invalid payload for admin.vm.CreateDisposable: " + "expected the empty string or 'uuid'") if self.dest.name == 'dom0': dispvm_template = self.src.default_dispvm @@ -1150,7 +1155,7 @@ async def create_disposable(self): # TODO: move this to extension (in race-free fashion, better than here) dispvm.tags.add('disp-created-by-' + str(self.src)) - return dispvm.name + return str(dispvm.uuid) if untrusted_payload else dispvm.name @qubes.api.method('admin.vm.Remove', no_payload=True, scope='global', write=True) diff --git a/qubes/api/internal.py b/qubes/api/internal.py index 3812764d1..587a61b0c 100644 --- a/qubes/api/internal.py +++ b/qubes/api/internal.py @@ -43,6 +43,7 @@ def get_system_info(app): 'guivm': (domain.guivm.name if getattr(domain, 'guivm', None) else None), 'power_state': domain.get_power_state(), + 'uuid': str(domain.uuid), } for domain in app.domains }} return system_info diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index e6f6fd8fb..b4c62867e 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -23,6 +23,7 @@ import asyncio import operator import os +import re import shutil import tempfile import unittest.mock @@ -123,18 +124,30 @@ def call_internal_mgmt_func(self, method, dest, arg=b'', payload=b''): mgmt_obj.execute(untrusted_payload=payload)) return response +_uuid_regex = re.compile(r"\A uuid=[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\Z") +# UUIDs are non-deterministic, so strip them from the output +def replace_uuid(list_line: str) -> str: + if not list_line: + return "" + out = [] + assert list_line.endswith("\n") + for line in list_line.split("\n")[:-1]: + assert _uuid_regex.match(line[-42:]), f"line {line[-42:]!r} does not end with a UUID" + out.append(line[:-42]) + out.append("") + return "\n".join(out) class TC_00_VMs(AdminAPITestCase): def test_000_vm_list(self): value = self.call_mgmt_func(b'admin.vm.List', b'dom0') - self.assertEqual(value, + self.assertEqual(replace_uuid(value), 'dom0 class=AdminVM state=Running\n' 'test-template class=TemplateVM state=Halted\n' 'test-vm1 class=AppVM state=Halted\n') def test_001_vm_list_single(self): value = self.call_mgmt_func(b'admin.vm.List', b'test-vm1') - self.assertEqual(value, + self.assertEqual(replace_uuid(value), 'test-vm1 class=AppVM state=Halted\n') def test_002_vm_list_filter(self): @@ -151,7 +164,7 @@ def test_002_vm_list_filter(self): loop = asyncio.get_event_loop() value = loop.run_until_complete( mgmt_obj.execute(untrusted_payload=b'')) - self.assertEqual(value, + self.assertEqual(replace_uuid(value), 'dom0 class=AdminVM state=Running\n' 'test-vm1 class=AppVM state=Halted\n') diff --git a/qubes/tests/api_internal.py b/qubes/tests/api_internal.py index 653f20fdb..cf06f4e29 100644 --- a/qubes/tests/api_internal.py +++ b/qubes/tests/api_internal.py @@ -23,6 +23,7 @@ import qubes.vm.adminvm from unittest import mock import json +import uuid def mock_coro(f): async def coro_f(*args, **kwargs): @@ -150,6 +151,7 @@ def test_010_get_system_info(self): self.dom0.template_for_dispvms = False self.dom0.label.icon = 'icon-dom0' self.dom0.get_power_state.return_value = 'Running' + self.dom0.uuid = uuid.UUID("00000000-0000-0000-0000-000000000000") del self.dom0.guivm vm = mock.NonCallableMock(spec=qubes.vm.qubesvm.QubesVM) @@ -160,6 +162,7 @@ def test_010_get_system_info(self): vm.label.icon = 'icon-vm' vm.guivm = vm vm.get_power_state.return_value = 'Halted' + vm.uuid = uuid.uuid4() self.domains['vm'] = vm ret = json.loads(self.call_mgmt_func(b'internal.GetSystemInfo')) @@ -173,6 +176,7 @@ def test_010_get_system_info(self): 'icon': 'icon-dom0', 'guivm': None, 'power_state': 'Running', + 'uuid': "00000000-0000-0000-0000-000000000000", }, 'vm': { 'tags': ['tag3', 'tag4'], @@ -182,6 +186,7 @@ def test_010_get_system_info(self): 'icon': 'icon-vm', 'guivm': 'vm', 'power_state': 'Halted', + "uuid": str(vm.uuid) } } })