Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nixos/lib/test-driver: use QMP API to watch for VM state #257535

Merged
merged 1 commit into from Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 14 additions & 1 deletion nixos/lib/test-driver/test_driver/machine.py
Expand Up @@ -19,6 +19,8 @@

from test_driver.logger import rootlog

from .qmp import QMPSession

CHAR_TO_KEY = {
"A": "shift-a",
"N": "shift-n",
Expand Down Expand Up @@ -144,6 +146,7 @@ class StartCommand:
def cmd(
self,
monitor_socket_path: Path,
qmp_socket_path: Path,
shell_socket_path: Path,
allow_reboot: bool = False,
) -> str:
Expand All @@ -167,6 +170,7 @@ def cmd(

return (
f"{self._cmd}"
f" -qmp unix:{qmp_socket_path},server=on,wait=off"
f" -monitor unix:{monitor_socket_path}"
f" -chardev socket,id=shell,path={shell_socket_path}"
f"{qemu_opts}"
Expand Down Expand Up @@ -194,11 +198,14 @@ def run(
state_dir: Path,
shared_dir: Path,
monitor_socket_path: Path,
qmp_socket_path: Path,
shell_socket_path: Path,
allow_reboot: bool,
) -> subprocess.Popen:
return subprocess.Popen(
self.cmd(monitor_socket_path, shell_socket_path, allow_reboot),
self.cmd(
monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot
),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
Expand Down Expand Up @@ -309,6 +316,7 @@ class Machine:
shared_dir: Path
state_dir: Path
monitor_path: Path
qmp_path: Path
shell_path: Path

start_command: StartCommand
Expand All @@ -317,6 +325,7 @@ class Machine:
process: Optional[subprocess.Popen]
pid: Optional[int]
monitor: Optional[socket.socket]
qmp_client: Optional[QMPSession]
shell: Optional[socket.socket]
serial_thread: Optional[threading.Thread]

Expand Down Expand Up @@ -352,6 +361,7 @@ def __init__(

self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
self.monitor_path = self.state_dir / "monitor"
self.qmp_path = self.state_dir / "qmp"
self.shell_path = self.state_dir / "shell"
if (not self.keep_vm_state) and self.state_dir.exists():
self.cleanup_statedir()
Expand All @@ -360,6 +370,7 @@ def __init__(
self.process = None
self.pid = None
self.monitor = None
self.qmp_client = None
self.shell = None
self.serial_thread = None

Expand Down Expand Up @@ -1090,11 +1101,13 @@ def create_socket(path: Path) -> socket.socket:
self.state_dir,
self.shared_dir,
self.monitor_path,
self.qmp_path,
self.shell_path,
allow_reboot,
)
self.monitor, _ = monitor_socket.accept()
self.shell, _ = shell_socket.accept()
self.qmp_client = QMPSession.from_path(self.qmp_path)

# Store last serial console lines for use
# of wait_for_console_text
Expand Down
98 changes: 98 additions & 0 deletions nixos/lib/test-driver/test_driver/qmp.py
@@ -0,0 +1,98 @@
import json
import logging
import os
import socket
from collections.abc import Iterator
from pathlib import Path
from queue import Queue
from typing import Any

logger = logging.getLogger(__name__)


class QMPAPIError(RuntimeError):
def __init__(self, message: dict[str, Any]):
assert "error" in message, "Not an error message!"
try:
self.class_name = message["class"]
self.description = message["desc"]
# NOTE: Some errors can occur before the Server is able to read the
# id member; in these cases the id member will not be part of the
# error response, even if provided by the client.
self.transaction_id = message.get("id")
except KeyError:
raise RuntimeError("Malformed QMP API error response")

def __str__(self) -> str:
return f"<QMP API error related to transaction {self.transaction_id} [{self.class_name}]: {self.description}>"


class QMPSession:
def __init__(self, sock: socket.socket) -> None:
self.sock = sock
self.results: Queue[dict[str, str]] = Queue()
self.pending_events: Queue[dict[str, Any]] = Queue()
self.reader = sock.makefile("r")
self.writer = sock.makefile("w")
# Make the reader non-blocking so we can kind of select on it.
os.set_blocking(self.reader.fileno(), False)
hello = self._wait_for_new_result()
logger.debug(f"Got greeting from QMP API: {hello}")
# The greeting message format is:
# { "QMP": { "version": json-object, "capabilities": json-array } }
assert "QMP" in hello, f"Unexpected result: {hello}"
self.send("qmp_capabilities")
tfc marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def from_path(cls, path: Path) -> "QMPSession":
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(path))
return cls(sock)

def __del__(self) -> None:
self.sock.close()

def _wait_for_new_result(self) -> dict[str, str]:
assert self.results.empty(), "Results set is not empty, missed results!"
while self.results.empty():
self.read_pending_messages()
return self.results.get()

def read_pending_messages(self) -> None:
line = self.reader.readline()
if not line:
return
evt_or_result = json.loads(line)
logger.debug(f"Received a message: {evt_or_result}")

# It's a result
if "return" in evt_or_result or "QMP" in evt_or_result:
self.results.put(evt_or_result)
# It's an event
elif "event" in evt_or_result:
self.pending_events.put(evt_or_result)
else:
raise QMPAPIError(evt_or_result)

def wait_for_event(self, timeout: int = 10) -> dict[str, Any]:
while self.pending_events.empty():
self.read_pending_messages()

return self.pending_events.get(timeout=timeout)

def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]:
while not self.pending_events.empty():
yield self.pending_events.get(timeout=timeout)

def send(self, cmd: str, args: dict[str, str] = {}) -> dict[str, str]:
self.read_pending_messages()
assert self.results.empty(), "Results set is not empty, missed results!"
data: dict[str, Any] = dict(execute=cmd)
if args != {}:
data["arguments"] = args

logger.debug(f"Sending {data} to QMP...")
json.dump(data, self.writer)
self.writer.write("\n")
self.writer.flush()
return self._wait_for_new_result()