From 97d23a9aea8e40103546c50e7abdda2793dd92c2 Mon Sep 17 00:00:00 2001
From: Constantine Peresypkin
Date: Sat, 6 Dec 2025 14:46:42 -0800
Subject: [PATCH] fix: asyncio not saving interrupt state

During prepare_save we must unconditionally trigger an interrupt to
ensure the guest gets notified after restore. The guest may have
suppressed notifications, but after snapshot/restore it needs to be
woken up regardless.

Fixes #5554

Signed-off-by: Constantine Peresypkin
---
 .../src/devices/virtio/block/virtio/device.rs |  10 +-
 .../virtio/block/virtio/io/async_io.rs        |   9 +-
 .../functional/test_snapshot_basic.py         | 119 +++++++++++++++++-
 3 files changed, 132 insertions(+), 6 deletions(-)

diff --git a/src/vmm/src/devices/virtio/block/virtio/device.rs b/src/vmm/src/devices/virtio/block/virtio/device.rs
index ecdd8ee4f6d..0c290c3f74f 100644
--- a/src/vmm/src/devices/virtio/block/virtio/device.rs
+++ b/src/vmm/src/devices/virtio/block/virtio/device.rs
@@ -468,12 +468,13 @@ impl VirtioBlock {
         Ok(())
     }
 
-    fn process_async_completion_queue(&mut self) {
+    fn process_async_completion_queue(&mut self, force_signal: bool) {
         let engine = unwrap_async_file_engine_or_return!(&mut self.disk.file_engine);
         // This is safe since we checked in the event handler that the device is activated.
         let active_state = self.device_state.active_state().unwrap();
 
         let queue = &mut self.queues[0];
+        let mut used_any = false;
 
         loop {
             match engine.pop(&active_state.mem) {
@@ -504,12 +505,13 @@ impl VirtioBlock {
                             finished.desc_idx, err
                         )
                     });
+                    used_any = true;
                 }
             }
         }
         queue.advance_used_ring_idx();
 
-        if queue.prepare_kick() {
+        if (force_signal && used_any) || queue.prepare_kick() {
             active_state
                 .interrupt
                 .trigger(VirtioInterruptType::Queue(0))
@@ -525,7 +527,7 @@ impl VirtioBlock {
         if let Err(err) = engine.completion_evt().read() {
             error!("Failed to get async completion event: {:?}", err);
         } else {
-            self.process_async_completion_queue();
+            self.process_async_completion_queue(false);
 
             if self.is_io_engine_throttled {
                 self.is_io_engine_throttled = false;
@@ -577,7 +579,7 @@ impl VirtioBlock {
             self.drain_and_flush(false);
 
             if let FileEngine::Async(ref _engine) = self.disk.file_engine {
-                self.process_async_completion_queue();
+                self.process_async_completion_queue(true);
             }
         }
     }
diff --git a/src/vmm/src/devices/virtio/block/virtio/io/async_io.rs b/src/vmm/src/devices/virtio/block/virtio/io/async_io.rs
index f83d9dea1df..574d87e313d 100644
--- a/src/vmm/src/devices/virtio/block/virtio/io/async_io.rs
+++ b/src/vmm/src/devices/virtio/block/virtio/io/async_io.rs
@@ -14,7 +14,7 @@ use crate::devices::virtio::block::virtio::{IO_URING_NUM_ENTRIES, PendingRequest};
 use crate::io_uring::operation::{Cqe, OpCode, Operation};
 use crate::io_uring::restriction::Restriction;
 use crate::io_uring::{IoUring, IoUringError};
-use crate::logger::log_dev_preview_warning;
+use crate::logger::{debug, log_dev_preview_warning};
 use crate::vstate::memory::{GuestAddress, GuestMemory, GuestMemoryExtension, GuestMemoryMmap};
 
 #[derive(Debug, thiserror::Error, displaydoc::Display)]
@@ -220,6 +220,13 @@ impl AsyncFileEngine {
     }
 
     pub fn drain_and_flush(&mut self, discard_cqes: bool) -> Result<(), AsyncIoError> {
+        // LCOV_EXCL_START
+        debug!(
+            "drain_and_flush draining: pending_ops={} discard_cqes={}",
+            self.ring.num_ops(),
+            discard_cqes
+        );
+        // LCOV_EXCL_STOP
         self.drain(discard_cqes)?;
 
         // Sync data out to physical media on host.
diff --git a/tests/integration_tests/functional/test_snapshot_basic.py b/tests/integration_tests/functional/test_snapshot_basic.py
index bd9f1ec0d9b..59cb02f04b0 100644
--- a/tests/integration_tests/functional/test_snapshot_basic.py
+++ b/tests/integration_tests/functional/test_snapshot_basic.py
@@ -19,8 +19,9 @@
 import host_tools.drive as drive_tools
 import host_tools.network as net_tools
 from framework import utils
+from framework.microvm import SnapshotType
 from framework.properties import global_props
-from framework.utils import check_filesystem, check_output
+from framework.utils import Timeout, check_filesystem, check_output
 from framework.utils_vsock import (
     ECHO_SERVER_PORT,
     VSOCK_UDS_PATH,
@@ -583,3 +584,119 @@ def test_snapshot_rename_interface(uvm_nano, microvm_factory):
         rename_interfaces={iface_override.dev_name: iface_override.tap_name},
         resume=True,
     )
+
+
+@pytest.mark.parametrize("snapshot_type", [SnapshotType.FULL])
+@pytest.mark.parametrize("pci_enabled", [False])
+def test_snapshot_with_heavy_async_io(
+    microvm_factory, guest_kernel_linux_6_1, rootfs_rw, snapshot_type, pci_enabled
+):
+    """
+    Test that snapshot/restore works correctly when there are in-flight async I/O
+    operations.
+    """
+
+    def _parse_pending_ops_from_log(vm):
+        """Parse VM log to find pending_ops during drain. Returns 0 if not found."""
+        if not vm.log_file or not vm.log_file.exists():
+            return 0
+        try:
+            for line in vm.log_data.splitlines():
+                if "drain_and_flush draining:" in line:
+                    match = re.search(r"pending_ops=(\d+)", line)
+                    if match:
+                        print(line)
+                        return int(match.group(1))
+        except (OSError, ValueError) as err:
+            print(f"ERROR: Failed to parse log file: {err}")
+        return 0
+
+    max_attempts = 30
+
+    for attempt in range(max_attempts):
+        print("=" * 80)
+        print(
+            f"Attempt {attempt + 1}/{max_attempts} - Testing for non-zero async I/O drain"
+        )
+        print("=" * 80)
+
+        vm = microvm_factory.build(guest_kernel_linux_6_1, rootfs_rw, pci=pci_enabled)
+        vm.spawn(log_level="Debug", log_show_level=True, log_show_origin=True)
+        vm.basic_config(
+            vcpu_count=2,
+            mem_size_mib=1024,
+            rootfs_io_engine="Async",
+        )
+        vm.add_net_iface()
+        vm.start()
+
+        write_io_script = """
+        mkdir -p /root/io_test
+        cd /root/io_test
+
+        for i in $(seq 1 2000); do
+            dd if=/dev/urandom of=/root/io_test/test_file_$i bs=8k count=1 oflag=direct 2>/dev/null &
+        done
+
+        for i in $(seq 1 1000); do
+            dd if=/dev/urandom of=/root/io_test/medium_file_$i bs=16k count=1 oflag=direct 2>/dev/null &
+        done
+
+        for i in $(seq 1 500); do
+            dd if=/dev/urandom of=/root/io_test/large_file_$i bs=32k count=1 oflag=direct 2>/dev/null &
+        done
+
+        for i in $(seq 1 200); do
+            dd if=/dev/urandom of=/root/io_test/xlarge_file_$i bs=64k count=1 oflag=direct 2>/dev/null &
+        done
+
+        fio --name=heavy_write --filename=/root/io_test/fio_write_test \
+            --rw=randwrite --bs=4k --size=2G --ioengine=libaio \
+            --iodepth=512 --direct=1 --runtime=30 --time_based \
+            --numjobs=2 --group_reporting >/dev/null 2>&1 &
+
+        fio --name=heavy_write2 --filename=/root/io_test/fio_write_test2 \
+            --rw=randwrite --bs=8k --size=2G --ioengine=libaio \
+            --iodepth=512 --direct=1 --runtime=30 --time_based \
+            --numjobs=2 --group_reporting >/dev/null 2>&1 &
+
+        fio --name=heavy_write3 --filename=/root/io_test/fio_write_test3 \
+            --rw=randwrite --bs=16k --size=2G --ioengine=libaio \
+            --iodepth=512 --direct=1 --runtime=30 --time_based \
+            --numjobs=2 --group_reporting >/dev/null 2>&1 &
+
+        fio --name=heavy_write4 --filename=/root/io_test/fio_write_test4 \
+            --rw=randwrite --bs=4k --size=2G --ioengine=libaio \
+            --iodepth=512 --direct=1 --runtime=30 --time_based \
+            --numjobs=2 --group_reporting >/dev/null 2>&1 &
+        """
+
+        vm.ssh.run(write_io_script)
+
+        # Very short sleep to maximize chance of catching in-flight I/O
+        time.sleep(0.01)
+
+        snapshot = vm.make_snapshot(snapshot_type)
+        pending_ops_during_drain = _parse_pending_ops_from_log(vm)
+        vm.kill()
+
+        if pending_ops_during_drain == 0:
+            print("pending_ops=0, retrying...")
+            # Clean up only when we're not going to use the snapshot
+            vm.jailer.cleanup()
+            chroot = vm.jailer.chroot_base_with_id()
+            if chroot.exists():
+                shutil.rmtree(chroot)
+            continue
+
+        # We caught in-flight I/O - now test restore
+        print(f"Caught {pending_ops_during_drain} pending ops, testing restore...")
+        with Timeout(30):
+            restored_vm = microvm_factory.build_from_snapshot(snapshot)
+
+        # Verify VM is responsive
+        restored_vm.ssh.check_output("true")
+        print(f"SUCCESS: VM restored with {pending_ops_during_drain} pending async ops")
+        return
+
+    pytest.skip(f"Could not catch in-flight async I/O after {max_attempts} attempts")
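
For context on the behaviour change, here is a minimal standalone sketch (not part of the patch) of the signalling rule that the new process_async_completion_queue(force_signal) path applies. The helper name should_signal and the parameter guest_requested_kick are illustrative stand-ins: guest_requested_kick represents the value queue.prepare_kick() returns in the real code.

    // Illustrative only: mirrors `(force_signal && used_any) || queue.prepare_kick()`
    // from the patched process_async_completion_queue().
    fn should_signal(force_signal: bool, used_any: bool, guest_requested_kick: bool) -> bool {
        (force_signal && used_any) || guest_requested_kick
    }

    fn main() {
        // Regular completion-event path: respect the guest's notification suppression.
        assert!(!should_signal(false, true, false));
        // prepare_save path: any retired completion forces an interrupt, so the guest
        // is woken up after restore even if it had suppressed notifications.
        assert!(should_signal(true, true, false));
        // prepare_save with nothing retired: no spurious interrupt is injected.
        assert!(!should_signal(true, false, false));
    }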