Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ alioth-macros = { path = "alioth-macros", version = "0.12.0" }
assert_matches = "1"
ctor = "0.6"
flexi_logger = "0.31"
flume = "0.12.0"
parking_lot = { version = "0.12", features = ["hardware-lock-elision"] }
pretty_assertions = "1"
proc-macro2 = "1"
Expand Down
1 change: 1 addition & 0 deletions alioth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ alioth-macros.workspace = true
bitfield = "0.19.4"
bitflags = "2.11.0"
chrono = "0.4.44"
flume.workspace = true
libc = "0.2.184"
log = "0.4"
mio = { version = "1", features = ["net", "os-ext", "os-poll"] }
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/board/board.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ mod x86_64;
use std::collections::HashMap;
use std::ffi::CStr;
use std::sync::Arc;
use std::sync::mpsc::Sender;
use std::thread::JoinHandle;

use flume::Sender;
use libc::{MAP_PRIVATE, MAP_SHARED};
use parking_lot::{Condvar, Mutex, RwLock, RwLockReadGuard};
use serde::Deserialize;
Expand Down
8 changes: 4 additions & 4 deletions alioth/src/hv/hv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,17 @@ pub enum Error {
TdxErr { code: u64 },
}

impl From<std::sync::mpsc::RecvError> for Error {
fn from(error: std::sync::mpsc::RecvError) -> Self {
impl From<flume::RecvError> for Error {
fn from(error: flume::RecvError) -> Self {
let source = error.as_error_source();
Error::BrokenChannel {
_location: snafu::GenerateImplicitData::generate_with_source(source),
}
}
}

impl<T: 'static> From<std::sync::mpsc::SendError<T>> for Error {
fn from(error: std::sync::mpsc::SendError<T>) -> Self {
impl<T: 'static> From<flume::SendError<T>> for Error {
fn from(error: flume::SendError<T>) -> Self {
let source = error.as_error_source();
Error::BrokenChannel {
_location: snafu::GenerateImplicitData::generate_with_source(source),
Expand Down
6 changes: 3 additions & 3 deletions alioth/src/hv/hvf/vcpu/vcpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ mod vmexit;

use std::collections::HashMap;
use std::ptr::null_mut;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, mpsc};

use flume::{Receiver, Sender};
use parking_lot::Mutex;
use snafu::ResultExt;

Expand Down Expand Up @@ -70,7 +70,7 @@ impl HvfVcpu {
let ret = unsafe { hv_vcpu_set_sys_reg(vcpu_id, SReg::MPIDR_EL1, mpidr.0) };
check_ret(ret).context(error::VcpuReg)?;

let (sender, receiver) = mpsc::channel();
let (sender, receiver) = flume::unbounded();

let power_on = Arc::new(AtomicBool::new(false));
let handle = Arc::new(VcpuHandle {
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/dev/balloon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::fmt::Debug;
use std::io::{IoSlice, IoSliceMut};
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use alioth_macros::Layout;
use flume::Receiver;
use libc::{_SC_PAGESIZE, sysconf};
use mio::Registry;
use mio::event::Event;
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/dev/blk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use std::os::fd::AsRawFd;
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use flume::Receiver;
#[cfg(target_os = "linux")]
use io_uring::cqueue::Entry as Cqe;
#[cfg(target_os = "linux")]
Expand Down
4 changes: 2 additions & 2 deletions alioth/src/virtio/dev/dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ pub mod vsock;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU16, AtomicU32};
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::JoinHandle;

use bitflags::Flags;
use flume::{Receiver, Sender};
use snafu::ResultExt;

use crate::hv::IoeventFd;
Expand Down Expand Up @@ -194,7 +194,7 @@ where
let queue_regs = queue_regs.collect::<Arc<_>>();

let shared_mem_regions = dev.shared_mem_regions();
let (event_tx, event_rx) = mpsc::channel();
let (event_tx, event_rx) = flume::unbounded();
let (handle, notifier) = dev.spawn_worker(event_rx, memory, queue_regs.clone())?;
log::debug!(
"{name}: created with {:x?}, {:x?}",
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/dev/entropy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::fs::{File, OpenOptions};
use std::os::unix::prelude::OpenOptionsExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use flume::Receiver;
use libc::O_NONBLOCK;
use mio::Registry;
use mio::event::Event;
Expand Down
6 changes: 3 additions & 3 deletions alioth/src/virtio/dev/entropy_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use std::ffi::CString;
use std::fs::OpenOptions;
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::mpsc::TryRecvError;
use std::sync::{Arc, mpsc};
use std::time::Duration;

use assert_matches::assert_matches;
use flume::TryRecvError;
use rstest::rstest;
use tempfile::TempDir;

Expand Down Expand Up @@ -78,9 +78,9 @@ fn entropy_test(fixture_ram_bus: RamBus, fixture_queues: Box<[QueueReg]>) {
assert_matches!(*dev.config(), EntropyConfig);
assert_eq!(dev.feature(), FEATURE_BUILT_IN);

let (tx, rx) = mpsc::channel();
let (tx, rx) = flume::unbounded();
let (handle, notifier) = dev.spawn_worker(rx, ram_bus.clone(), regs).unwrap();
let (irq_tx, irq_rx) = mpsc::channel();
let (irq_tx, irq_rx) = flume::unbounded();
let irq_sender = Arc::new(FakeIrqSender { q_tx: irq_tx });
let start_param = StartParam {
feature: VirtioFeature::VERSION_1.bits(),
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/dev/fs/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::fs::File;
use std::io::{self, IoSlice, IoSliceMut, Read};
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use flume::Receiver;
use mio::Registry;
use mio::event::Event;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/dev/fs/vu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use std::mem::size_of_val;
use std::os::fd::{AsFd, AsRawFd};
use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use flume::Receiver;
use libc::{MAP_ANONYMOUS, MAP_FAILED, MAP_FIXED, MAP_PRIVATE, MAP_SHARED, PROT_NONE, mmap};
use mio::event::Event;
use mio::unix::SourceFd;
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/dev/net/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::os::fd::{AsFd, AsRawFd};
use std::os::unix::prelude::OpenOptionsExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use flume::Receiver;
use io_uring::cqueue::Entry as Cqe;
use io_uring::opcode;
use io_uring::types::Fd;
Expand Down
8 changes: 4 additions & 4 deletions alioth/src/virtio/dev/net/vmnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use std::ffi::CStr;
use std::fmt::Debug;
use std::io::{self, ErrorKind, Read};
use std::ptr::null;
use std::sync::Arc;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, mpsc};
use std::thread::JoinHandle;
use std::time::Duration;

use flume::{Receiver, Sender};
use libc::c_void;
use mio::event::Event;
use mio::{Interest, Registry, Token};
Expand Down Expand Up @@ -92,7 +92,7 @@ impl Net {
];
let desc = unsafe { xpc_dictionary_create(keys.as_ptr(), vals.as_ptr(), 3) };
let dispatch_queue = unsafe { dispatch_queue_create(c"virtio-net".as_ptr(), null()) };
let (sender, receiver) = mpsc::channel::<Result<NetConfig>>();
let (sender, receiver) = flume::unbounded::<Result<NetConfig>>();

#[repr(C)]
struct HandlerBlock {
Expand Down Expand Up @@ -180,7 +180,7 @@ impl Drop for Net {
let interface = self.interface.load(Ordering::Acquire);
let dispatch_queue = self.dispatch_queue.load(Ordering::Acquire);

let (sender, receiver) = mpsc::channel::<VmnetReturn>();
let (sender, receiver) = flume::unbounded::<VmnetReturn>();

#[repr(C)]
struct HandlerBlock {
Expand Down
16 changes: 8 additions & 8 deletions alioth/src/virtio/dev/vsock/uds_vsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ use std::os::fd::AsRawFd;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use flume::Receiver;
use mio::event::Event;
use mio::unix::SourceFd;
use mio::{Interest, Registry, Token};
use serde::Deserialize;
use serde_aco::Help;
use zerocopy::{FromBytes, IntoBytes};

use crate::ffi;
use crate::hv::IoeventFd;
use crate::mem::mapped::RamBus;
Expand All @@ -38,13 +45,6 @@ use crate::virtio::queue::{DescChain, Queue, QueueReg, Status, VirtQueue};
use crate::virtio::worker::mio::{ActiveMio, Mio, VirtioMio};
use crate::virtio::{DeviceId, FEATURE_BUILT_IN, IrqSender, Result, error};

use mio::event::Event;
use mio::unix::SourceFd;
use mio::{Interest, Registry, Token};
use serde::Deserialize;
use serde_aco::Help;
use zerocopy::{FromBytes, IntoBytes};

const HEADER_SIZE: usize = size_of::<VsockHeader>();
const SOCKET_TYPE: VsockType = VsockType::STREAM;

Expand Down
6 changes: 3 additions & 3 deletions alioth/src/virtio/dev/vsock/uds_vsock_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
use std::io::{BufRead, BufReader, ErrorKind, Read, Write};
use std::mem::size_of;
use std::os::unix::net::{UnixListener, UnixStream};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::sync::{Arc, mpsc};
use std::time::Duration;

use assert_matches::assert_matches;
use flume::{Receiver, Sender, TryRecvError};
use rstest::rstest;
use tempfile::TempDir;
use zerocopy::{FromBytes, FromZeros, IntoBytes};
Expand Down Expand Up @@ -133,9 +133,9 @@ fn vsock_conn_test(fixture_ram_bus: RamBus, #[with(3)] fixture_queues: Box<[Queu
VsockFeature::STREAM.bits() | FEATURE_BUILT_IN
);

let (tx, rx) = mpsc::channel();
let (tx, rx) = flume::unbounded();
let (handle, notifier) = dev.spawn_worker(rx, ram_bus.clone(), regs).unwrap();
let (irq_tx, irq_rx) = mpsc::channel();
let (irq_tx, irq_rx) = flume::unbounded();
let irq_sender = Arc::new(FakeIrqSender { q_tx: irq_tx });
let start_param = StartParam {
feature: VirtioFeature::VERSION_1.bits(),
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/dev/vsock/vhost_vsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;

use flume::Receiver;
use libc::{EFD_CLOEXEC, EFD_NONBLOCK, eventfd};
use mio::event::Event;
use mio::unix::SourceFd;
Expand Down
2 changes: 1 addition & 1 deletion alioth/src/virtio/pci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use std::mem::size_of;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::mpsc::Sender;

use alioth_macros::Layout;
use flume::Sender;
use parking_lot::{Mutex, RwLock};
use zerocopy::{FromZeros, Immutable, IntoBytes};

Expand Down
Loading