diff --git a/nixos/lib/test-driver/test_driver/machine.py b/nixos/lib/test-driver/test_driver/machine.py index 7ed001a1dfce46a..c434e2db321630b 100644 --- a/nixos/lib/test-driver/test_driver/machine.py +++ b/nixos/lib/test-driver/test_driver/machine.py @@ -19,6 +19,8 @@ from test_driver.logger import rootlog +from .qmp import QMPSession + CHAR_TO_KEY = { "A": "shift-a", "N": "shift-n", @@ -144,6 +146,7 @@ class StartCommand: def cmd( self, monitor_socket_path: Path, + qmp_socket_path: Path, shell_socket_path: Path, allow_reboot: bool = False, ) -> str: @@ -167,6 +170,7 @@ def cmd( return ( f"{self._cmd}" + f" -qmp unix:{qmp_socket_path},server=on,wait=off" f" -monitor unix:{monitor_socket_path}" f" -chardev socket,id=shell,path={shell_socket_path}" f"{qemu_opts}" @@ -194,11 +198,14 @@ def run( state_dir: Path, shared_dir: Path, monitor_socket_path: Path, + qmp_socket_path: Path, shell_socket_path: Path, allow_reboot: bool, ) -> subprocess.Popen: return subprocess.Popen( - self.cmd(monitor_socket_path, shell_socket_path, allow_reboot), + self.cmd( + monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot + ), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -309,6 +316,7 @@ class Machine: shared_dir: Path state_dir: Path monitor_path: Path + qmp_path: Path shell_path: Path start_command: StartCommand @@ -317,6 +325,7 @@ class Machine: process: Optional[subprocess.Popen] pid: Optional[int] monitor: Optional[socket.socket] + qmp_client: Optional[QMPSession] shell: Optional[socket.socket] serial_thread: Optional[threading.Thread] @@ -352,6 +361,7 @@ def __init__( self.state_dir = self.tmp_dir / f"vm-state-{self.name}" self.monitor_path = self.state_dir / "monitor" + self.qmp_path = self.state_dir / "qmp" self.shell_path = self.state_dir / "shell" if (not self.keep_vm_state) and self.state_dir.exists(): self.cleanup_statedir() @@ -360,6 +370,7 @@ def __init__( self.process = None self.pid = None self.monitor = None + self.qmp_client = None self.shell = None self.serial_thread = None @@ -1090,11 +1101,13 @@ def create_socket(path: Path) -> socket.socket: self.state_dir, self.shared_dir, self.monitor_path, + self.qmp_path, self.shell_path, allow_reboot, ) self.monitor, _ = monitor_socket.accept() self.shell, _ = shell_socket.accept() + self.qmp_client = QMPSession.from_path(self.qmp_path) # Store last serial console lines for use # of wait_for_console_text diff --git a/nixos/lib/test-driver/test_driver/qmp.py b/nixos/lib/test-driver/test_driver/qmp.py new file mode 100644 index 000000000000000..62ca6d7d5b802c2 --- /dev/null +++ b/nixos/lib/test-driver/test_driver/qmp.py @@ -0,0 +1,98 @@ +import json +import logging +import os +import socket +from collections.abc import Iterator +from pathlib import Path +from queue import Queue +from typing import Any + +logger = logging.getLogger(__name__) + + +class QMPAPIError(RuntimeError): + def __init__(self, message: dict[str, Any]): + assert "error" in message, "Not an error message!" + try: + self.class_name = message["class"] + self.description = message["desc"] + # NOTE: Some errors can occur before the Server is able to read the + # id member; in these cases the id member will not be part of the + # error response, even if provided by the client. + self.transaction_id = message.get("id") + except KeyError: + raise RuntimeError("Malformed QMP API error response") + + def __str__(self) -> str: + return f"" + + +class QMPSession: + def __init__(self, sock: socket.socket) -> None: + self.sock = sock + self.results: Queue[dict[str, str]] = Queue() + self.pending_events: Queue[dict[str, Any]] = Queue() + self.reader = sock.makefile("r") + self.writer = sock.makefile("w") + # Make the reader non-blocking so we can kind of select on it. + os.set_blocking(self.reader.fileno(), False) + hello = self._wait_for_new_result() + logger.debug(f"Got greeting from QMP API: {hello}") + # The greeting message format is: + # { "QMP": { "version": json-object, "capabilities": json-array } } + assert "QMP" in hello, f"Unexpected result: {hello}" + self.send("qmp_capabilities") + + @classmethod + def from_path(cls, path: Path) -> "QMPSession": + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(str(path)) + return cls(sock) + + def __del__(self) -> None: + self.sock.close() + + def _wait_for_new_result(self) -> dict[str, str]: + assert self.results.empty(), "Results set is not empty, missed results!" + while self.results.empty(): + self.read_pending_messages() + return self.results.get() + + def read_pending_messages(self) -> None: + line = self.reader.readline() + if not line: + return + evt_or_result = json.loads(line) + logger.debug(f"Received a message: {evt_or_result}") + + # It's a result + if "return" in evt_or_result or "QMP" in evt_or_result: + self.results.put(evt_or_result) + # It's an event + elif "event" in evt_or_result: + self.pending_events.put(evt_or_result) + else: + raise QMPAPIError(evt_or_result) + + def wait_for_event(self, timeout: int = 10) -> dict[str, Any]: + while self.pending_events.empty(): + self.read_pending_messages() + + return self.pending_events.get(timeout=timeout) + + def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]: + while not self.pending_events.empty(): + yield self.pending_events.get(timeout=timeout) + + def send(self, cmd: str, args: dict[str, str] = {}) -> dict[str, str]: + self.read_pending_messages() + assert self.results.empty(), "Results set is not empty, missed results!" + data: dict[str, Any] = dict(execute=cmd) + if args != {}: + data["arguments"] = args + + logger.debug(f"Sending {data} to QMP...") + json.dump(data, self.writer) + self.writer.write("\n") + self.writer.flush() + return self._wait_for_new_result()