10 changes: 6 additions & 4 deletions src/vmm/src/devices/virtio/block/virtio/device.rs
@@ -468,12 +468,13 @@ impl VirtioBlock {
Ok(())
}

fn process_async_completion_queue(&mut self) {
fn process_async_completion_queue(&mut self, force_signal: bool) {
let engine = unwrap_async_file_engine_or_return!(&mut self.disk.file_engine);

// This is safe since we checked in the event handler that the device is activated.
let active_state = self.device_state.active_state().unwrap();
let queue = &mut self.queues[0];
let mut used_any = false;

loop {
match engine.pop(&active_state.mem) {
@@ -504,12 +505,13 @@ impl VirtioBlock {
finished.desc_idx, err
)
});
used_any = true;
}
}
}
queue.advance_used_ring_idx();

if queue.prepare_kick() {
if (force_signal && used_any) || queue.prepare_kick() {
active_state
.interrupt
.trigger(VirtioInterruptType::Queue(0))
@@ -525,7 +527,7 @@ impl VirtioBlock {
if let Err(err) = engine.completion_evt().read() {
error!("Failed to get async completion event: {:?}", err);
} else {
self.process_async_completion_queue();
self.process_async_completion_queue(false);

if self.is_io_engine_throttled {
self.is_io_engine_throttled = false;
Expand Down Expand Up @@ -577,7 +579,7 @@ impl VirtioBlock {

self.drain_and_flush(false);
if let FileEngine::Async(ref _engine) = self.disk.file_engine {
self.process_async_completion_queue();
self.process_async_completion_queue(true);
}
}
}
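For readers skimming the hunk above, the new `(force_signal && used_any) || queue.prepare_kick()` condition can be read as a small stand-alone sketch: on the snapshot path the device kicks the guest whenever the drain actually returned descriptors to the used ring, regardless of the usual suppression check. The helper name and the `guest_wants_kick` parameter below are illustrative assumptions, not Firecracker code; `guest_wants_kick` stands in for `queue.prepare_kick()`.

```rust
// Minimal sketch of the signalling rule, under the assumptions stated above.
fn should_signal_used_queue(force_signal: bool, used_any: bool, guest_wants_kick: bool) -> bool {
    // Normal completion path: force_signal is false, so the guest's
    // notification-suppression state (via prepare_kick) decides alone.
    // Snapshot path (after drain_and_flush): any descriptor returned to the
    // used ring triggers the interrupt even if the guest suppressed it.
    (force_signal && used_any) || guest_wants_kick
}

fn main() {
    // Guest suppressed notifications during normal operation: no kick.
    assert!(!should_signal_used_queue(false, true, false));
    // Snapshot drain completed some requests: kick despite suppression.
    assert!(should_signal_used_queue(true, true, false));
    // Snapshot drain had nothing to complete: stay quiet.
    assert!(!should_signal_used_queue(true, false, false));
}
```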
9 changes: 8 additions & 1 deletion src/vmm/src/devices/virtio/block/virtio/io/async_io.rs
@@ -14,7 +14,7 @@ use crate::devices::virtio::block::virtio::{IO_URING_NUM_ENTRIES, PendingRequest
use crate::io_uring::operation::{Cqe, OpCode, Operation};
use crate::io_uring::restriction::Restriction;
use crate::io_uring::{IoUring, IoUringError};
use crate::logger::log_dev_preview_warning;
use crate::logger::{debug, log_dev_preview_warning};
use crate::vstate::memory::{GuestAddress, GuestMemory, GuestMemoryExtension, GuestMemoryMmap};

#[derive(Debug, thiserror::Error, displaydoc::Display)]
@@ -220,6 +220,13 @@ impl AsyncFileEngine {
}

pub fn drain_and_flush(&mut self, discard_cqes: bool) -> Result<(), AsyncIoError> {
// LCOV_EXCL_START
debug!(
"drain_and_flush draining: pending_ops={} discard_cqes={}",
self.ring.num_ops(),
discard_cqes
);
// LCOV_EXCL_STOP
self.drain(discard_cqes)?;

// Sync data out to physical media on host.
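The debug line added in this hunk is what the integration test below keys on. A rough, self-contained sketch of the drain-then-flush contract it instruments follows; `ToyEngine`, its fields, and the plain `eprintln!` are illustrative assumptions standing in for the io_uring-backed `AsyncFileEngine` and its `debug!` call.

```rust
use std::fs::File;
use std::io;

// Illustrative stand-in for the async engine (not Firecracker code).
struct ToyEngine {
    pending_ops: Vec<u64>, // submitted-but-uncompleted requests
    backing_file: File,
}

impl ToyEngine {
    fn drain_and_flush(&mut self, discard_cqes: bool) -> io::Result<()> {
        // The real engine logs this via `debug!`; the snapshot test greps the
        // log for "pending_ops=" to confirm it caught in-flight I/O.
        eprintln!(
            "drain_and_flush draining: pending_ops={} discard_cqes={}",
            self.pending_ops.len(),
            discard_cqes
        );
        self.pending_ops.clear(); // wait for (or discard) all completions
        self.backing_file.sync_all() // then sync data out to physical media
    }
}

fn main() -> io::Result<()> {
    let mut engine = ToyEngine {
        pending_ops: vec![1, 2, 3],
        backing_file: File::create("toy_disk.img")?,
    };
    engine.drain_and_flush(false)
}
```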
119 changes: 118 additions & 1 deletion tests/integration_tests/functional/test_snapshot_basic.py
@@ -19,8 +19,9 @@
import host_tools.drive as drive_tools
import host_tools.network as net_tools
from framework import utils
from framework.microvm import SnapshotType
from framework.properties import global_props
from framework.utils import check_filesystem, check_output
from framework.utils import Timeout, check_filesystem, check_output
from framework.utils_vsock import (
ECHO_SERVER_PORT,
VSOCK_UDS_PATH,
@@ -583,3 +584,119 @@ def test_snapshot_rename_interface(uvm_nano, microvm_factory):
rename_interfaces={iface_override.dev_name: iface_override.tap_name},
resume=True,
)


@pytest.mark.parametrize("snapshot_type", [SnapshotType.FULL])
@pytest.mark.parametrize("pci_enabled", [False])
def test_snapshot_with_heavy_async_io(
microvm_factory, guest_kernel_linux_6_1, rootfs_rw, snapshot_type, pci_enabled
):
"""
Test that snapshot/restore works correctly when there are in-flight async I/O
operations.
"""

def _parse_pending_ops_from_log(vm):
"""Parse VM log to find pending_ops during drain. Returns 0 if not found."""
if not vm.log_file or not vm.log_file.exists():
return 0
try:
for line in vm.log_data.splitlines():
if "drain_and_flush draining:" in line:
match = re.search(r"pending_ops=(\d+)", line)
if match:
print(line)
return int(match.group(1))
except (OSError, ValueError) as err:
print(f"ERROR: Failed to parse log file: {err}")
return 0

max_attempts = 30

for attempt in range(max_attempts):
print("=" * 80)
print(
f"Attempt {attempt + 1}/{max_attempts} - Testing for non-zero async I/O drain"
)
print("=" * 80)

vm = microvm_factory.build(guest_kernel_linux_6_1, rootfs_rw, pci=pci_enabled)
vm.spawn(log_level="Debug", log_show_level=True, log_show_origin=True)
vm.basic_config(
vcpu_count=2,
mem_size_mib=1024,
rootfs_io_engine="Async",
)
vm.add_net_iface()
vm.start()

write_io_script = """
mkdir -p /root/io_test
cd /root/io_test

for i in $(seq 1 2000); do
dd if=/dev/urandom of=/root/io_test/test_file_$i bs=8k count=1 oflag=direct 2>/dev/null &
done

for i in $(seq 1 1000); do
dd if=/dev/urandom of=/root/io_test/medium_file_$i bs=16k count=1 oflag=direct 2>/dev/null &
done

for i in $(seq 1 500); do
dd if=/dev/urandom of=/root/io_test/large_file_$i bs=32k count=1 oflag=direct 2>/dev/null &
done

for i in $(seq 1 200); do
dd if=/dev/urandom of=/root/io_test/xlarge_file_$i bs=64k count=1 oflag=direct 2>/dev/null &
done

fio --name=heavy_write --filename=/root/io_test/fio_write_test \
--rw=randwrite --bs=4k --size=2G --ioengine=libaio \
--iodepth=512 --direct=1 --runtime=30 --time_based \
--numjobs=2 --group_reporting >/dev/null 2>&1 &

fio --name=heavy_write2 --filename=/root/io_test/fio_write_test2 \
--rw=randwrite --bs=8k --size=2G --ioengine=libaio \
--iodepth=512 --direct=1 --runtime=30 --time_based \
--numjobs=2 --group_reporting >/dev/null 2>&1 &

fio --name=heavy_write3 --filename=/root/io_test/fio_write_test3 \
--rw=randwrite --bs=16k --size=2G --ioengine=libaio \
--iodepth=512 --direct=1 --runtime=30 --time_based \
--numjobs=2 --group_reporting >/dev/null 2>&1 &

fio --name=heavy_write4 --filename=/root/io_test/fio_write_test4 \
--rw=randwrite --bs=4k --size=2G --ioengine=libaio \
--iodepth=512 --direct=1 --runtime=30 --time_based \
--numjobs=2 --group_reporting >/dev/null 2>&1 &
"""

vm.ssh.run(write_io_script)

# Very short sleep to maximize chance of catching in-flight I/O
time.sleep(0.01)

snapshot = vm.make_snapshot(snapshot_type)
pending_ops_during_drain = _parse_pending_ops_from_log(vm)
vm.kill()

if pending_ops_during_drain == 0:
print("pending_ops=0, retrying...")
# Clean up only when we're not going to use the snapshot
vm.jailer.cleanup()
chroot = vm.jailer.chroot_base_with_id()
if chroot.exists():
shutil.rmtree(chroot)
continue

# We caught in-flight I/O - now test restore
print(f"Caught {pending_ops_during_drain} pending ops, testing restore...")
with Timeout(30):
restored_vm = microvm_factory.build_from_snapshot(snapshot)

# Verify VM is responsive
restored_vm.ssh.check_output("true")
print(f"SUCCESS: VM restored with {pending_ops_during_drain} pending async ops")
return

pytest.skip(f"Could not catch in-flight async I/O after {max_attempts} attempts")