From 5c8d92c8e3d576e47312289b437ee49e7c0b254b Mon Sep 17 00:00:00 2001 From: longjin Date: Mon, 18 May 2026 19:33:55 +0000 Subject: [PATCH 1/5] fix(vfs): wait for pipe space before splice read Signed-off-by: longjin --- .../src/filesystem/vfs/syscall/sys_splice.rs | 5 ++- kernel/src/ipc/pipe.rs | 41 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/kernel/src/filesystem/vfs/syscall/sys_splice.rs b/kernel/src/filesystem/vfs/syscall/sys_splice.rs index fa9ba38e3a..539ec00086 100644 --- a/kernel/src/filesystem/vfs/syscall/sys_splice.rs +++ b/kernel/src/filesystem/vfs/syscall/sys_splice.rs @@ -254,7 +254,10 @@ fn splice_file_to_pipe( ) -> Result { let pipe_inode = get_pipe_inode(pipe)?; - let buf_size = len.min(4096); + let mut buf_size = len.min(4096); + if !flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { + buf_size = pipe_inode.wait_writable_for_splice(buf_size)?; + } let mut buffer = vec![0u8; buf_size]; // 从文件读取 diff --git a/kernel/src/ipc/pipe.rs b/kernel/src/ipc/pipe.rs index 02ab69a024..5b9b615cbc 100644 --- a/kernel/src/ipc/pipe.rs +++ b/kernel/src/ipc/pipe.rs @@ -606,6 +606,47 @@ impl LockedPipeInode { Ok(to_write) } + /// Wait for space before file->pipe splice reads from the input file. + /// + /// DragonOS pipes are byte-ring based, so this follows the same write + /// atomicity rule as pipe writes: writes up to PIPE_BUF wait for the full + /// request; larger writes wait for any space and may complete partially. + pub fn wait_writable_for_splice(&self, len: usize) -> Result { + if len == 0 { + return Ok(0); + } + + let need_atomic = len <= PIPE_BUF; + loop { + let guard = self.inner.lock(); + if guard.reader == 0 { + drop(guard); + let _ = send_kernel_signal_to_current(Signal::SIGPIPE); + return Err(SystemError::EPIPE); + } + + let used = guard.valid_cnt.max(0) as usize; + let space = guard.buf_size.saturating_sub(used); + if (need_atomic && space >= len) || (!need_atomic && space > 0) { + return Ok(if need_atomic { len } else { len.min(space) }); + } + + drop(guard); + let wait_result = if need_atomic { + wq_wait_event_interruptible!( + self.write_wait_queue, + self.writeable_len_at_least(len), + {} + ) + } else { + wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {}) + }; + if wait_result.is_err() { + return Err(SystemError::ERESTARTSYS); + } + } + } + /// 从管道中“窥视”最多 `len` 字节数据到 `buf`,但不消耗管道数据。 /// /// 返回实际拷贝的字节数(可能小于 `len`)。不会睡眠。 From 37346156671d07a34d355cdf97b3b31f15646f68 Mon Sep 17 00:00:00 2001 From: longjin Date: Tue, 19 May 2026 03:36:48 +0000 Subject: [PATCH 2/5] fix(sync): prevent rwsem waiter starvation Signed-off-by: longjin --- .../src/filesystem/vfs/syscall/sys_splice.rs | 58 ++-- kernel/src/ipc/pipe.rs | 9 +- kernel/src/libs/rwsem.rs | 131 ++++++++- .../suites/normal/splice_concurrent_io.cc | 258 ++++++++++++++++++ user/apps/tests/dunitest/whitelist.txt | 1 + 5 files changed, 424 insertions(+), 33 deletions(-) create mode 100644 user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc diff --git a/kernel/src/filesystem/vfs/syscall/sys_splice.rs b/kernel/src/filesystem/vfs/syscall/sys_splice.rs index 539ec00086..3f42af3170 100644 --- a/kernel/src/filesystem/vfs/syscall/sys_splice.rs +++ b/kernel/src/filesystem/vfs/syscall/sys_splice.rs @@ -254,10 +254,26 @@ fn splice_file_to_pipe( ) -> Result { let pipe_inode = get_pipe_inode(pipe)?; - let mut buf_size = len.min(4096); - if !flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { - buf_size = pipe_inode.wait_writable_for_splice(buf_size)?; + let wanted = splice_file_read_limit(file, offset, len.min(4096)); + if wanted == 0 { + return Ok(0); } + + let space = if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { + let space = pipe_inode.writable_len(); + if space == 0 && pipe_inode.has_readers() { + return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); + } + space + } else { + pipe_inode.wait_writable_for_splice(wanted)? + }; + + let buf_size = if space == 0 { + wanted + } else { + wanted.min(space) + }; let mut buffer = vec![0u8; buf_size]; // 从文件读取 @@ -275,26 +291,6 @@ fn splice_file_to_pipe( buffer.truncate(read_len); - // Linux-like nonblocking semantics for file->pipe splice: - // - SPLICE_F_NONBLOCK makes the splice nonblocking regardless of the pipe fd's O_NONBLOCK. - // - When "atomic" (<= PIPE_BUF), lack of space yields EAGAIN (no partial write). - // - When non-atomic, write as much as fits and return partial. - if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { - let space = pipe_inode.writable_len(); - if space == 0 && pipe_inode.has_readers() { - return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); - } - if buffer.len() <= crate::ipc::pipe::PIPE_BUF && space < buffer.len() { - return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); - } - let to_write = if space == 0 { - buffer.len() - } else { - buffer.len().min(space) - }; - buffer.truncate(to_write); - } - // 写入 pipe let written = if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { pipe_inode.write_from_splice_nonblock(&buffer) @@ -313,6 +309,22 @@ fn splice_file_to_pipe( } } +fn splice_file_read_limit(file: &File, offset: Option, limit: usize) -> usize { + if limit == 0 { + return 0; + } + + if matches!(file.file_type(), FileType::File) { + if let Ok(metadata) = file.metadata() { + let size = metadata.size.max(0) as usize; + let pos = offset.unwrap_or_else(|| file.pos()); + return limit.min(size.saturating_sub(pos)); + } + } + + limit +} + /// pipe 到 file 的数据传输 fn splice_pipe_to_file( pipe: &File, diff --git a/kernel/src/ipc/pipe.rs b/kernel/src/ipc/pipe.rs index 5b9b615cbc..824a23b6e5 100644 --- a/kernel/src/ipc/pipe.rs +++ b/kernel/src/ipc/pipe.rs @@ -606,11 +606,12 @@ impl LockedPipeInode { Ok(to_write) } - /// Wait for space before file->pipe splice reads from the input file. + /// Wait for pipe space before file->pipe splice reads from the input file. /// - /// DragonOS pipes are byte-ring based, so this follows the same write - /// atomicity rule as pipe writes: writes up to PIPE_BUF wait for the full - /// request; larger writes wait for any space and may complete partially. + /// The caller must pass the maximum number of bytes it can actually read + /// from the input file for this splice attempt. DragonOS pipes are byte-ring + /// based, so requests up to PIPE_BUF wait for the complete readable chunk; + /// larger requests wait for any space and may complete partially. pub fn wait_writable_for_splice(&self, len: usize) -> Result { if len == 0 { return Ok(0); diff --git a/kernel/src/libs/rwsem.rs b/kernel/src/libs/rwsem.rs index 3d3e64e036..ec84995ae7 100644 --- a/kernel/src/libs/rwsem.rs +++ b/kernel/src/libs/rwsem.rs @@ -15,7 +15,9 @@ use core::{ use alloc::rc::Rc; use system_error::SystemError; -use super::wait_queue::WaitQueue; +use crate::process::ProcessManager; + +use super::wait_queue::{WaitQueue, Waiter}; /// A mutex that provides data access to either one writer or many readers. /// @@ -47,6 +49,7 @@ use super::wait_queue::WaitQueue; #[derive(Debug)] pub struct RwSem { lock: AtomicUsize, + waiters: AtomicUsize, queue: WaitQueue, val: UnsafeCell, } @@ -95,6 +98,7 @@ impl RwSem { Self { val: UnsafeCell::new(val), lock: AtomicUsize::new(0), + waiters: AtomicUsize::new(0), queue: WaitQueue::default(), } } @@ -107,7 +111,11 @@ impl RwSem { /// upreaders present. #[track_caller] pub fn read(&self) -> RwSemReadGuard<'_, T> { - self.queue.wait_until(|| self.try_read()) + if let Some(guard) = self.try_read() { + return guard; + } + + self.wait_read(false).unwrap() } /// Acquires a write mutex and sleep until it can be acquired. @@ -116,7 +124,13 @@ impl RwSem { /// or readers present. #[track_caller] pub fn write(&self) -> RwSemWriteGuard<'_, T> { - self.queue.wait_until(|| self.try_write()) + if self.waiters.load(Acquire) == 0 || ProcessManager::current_pcb().preempt_count() != 0 { + if let Some(guard) = self.try_write() { + return guard; + } + } + + self.wait_write(false).unwrap() } /// Acquires a upread mutex and sleep until it can be acquired. @@ -124,17 +138,31 @@ impl RwSem { /// The calling thread will sleep until there are no writers or upreaders present. #[track_caller] pub fn upread(&self) -> RwSemUpgradeableGuard<'_, T> { - self.queue.wait_until(|| self.try_upread()) + if let Some(guard) = self.try_upread() { + return guard; + } + + self.wait_upread(false).unwrap() } /// Blocking read acquire (interruptible). pub fn read_interruptible(&self) -> Result, SystemError> { - self.queue.wait_until_interruptible(|| self.try_read()) + if let Some(guard) = self.try_read() { + return Ok(guard); + } + + self.wait_read(true) } /// Blocking write acquire (interruptible). pub fn write_interruptible(&self) -> Result, SystemError> { - self.queue.wait_until_interruptible(|| self.try_write()) + if self.waiters.load(Acquire) == 0 || ProcessManager::current_pcb().preempt_count() != 0 { + if let Some(guard) = self.try_write() { + return Ok(guard); + } + } + + self.wait_write(true) } /// Attempts to acquire a read lock. @@ -194,6 +222,97 @@ impl RwSem { pub fn get_mut(&mut self) -> &mut T { self.val.get_mut() } + + fn wait_read(&self, interruptible: bool) -> Result, SystemError> { + self.waiters.fetch_add(1, AcqRel); + let (waiter, waker) = Waiter::new_pair(); + + loop { + if let Err(e) = self.queue.register_waker(waker.clone()) { + self.waiters.fetch_sub(1, Release); + return Err(e); + } + + if let Some(guard) = self.try_read() { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Ok(guard); + } + + if let Err(e) = waiter.wait(interruptible) { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Err(e); + } + } + } + + fn wait_write(&self, interruptible: bool) -> Result, SystemError> { + let had_waiters = self.waiters.fetch_add(1, AcqRel) != 0; + let mut must_sleep_once = had_waiters; + let (waiter, waker) = Waiter::new_pair(); + + loop { + if let Err(e) = self.queue.register_waker(waker.clone()) { + self.waiters.fetch_sub(1, Release); + return Err(e); + } + + if must_sleep_once && self.waiters.load(Acquire) == 1 { + must_sleep_once = false; + } + + if !must_sleep_once { + if let Some(guard) = self.try_write() { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Ok(guard); + } + } + must_sleep_once = false; + + if let Err(e) = waiter.wait(interruptible) { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Err(e); + } + } + } + + fn wait_upread( + &self, + interruptible: bool, + ) -> Result, SystemError> { + let had_waiters = self.waiters.fetch_add(1, AcqRel) != 0; + let mut must_sleep_once = had_waiters; + let (waiter, waker) = Waiter::new_pair(); + + loop { + if let Err(e) = self.queue.register_waker(waker.clone()) { + self.waiters.fetch_sub(1, Release); + return Err(e); + } + + if must_sleep_once && self.waiters.load(Acquire) == 1 { + must_sleep_once = false; + } + + if !must_sleep_once { + if let Some(guard) = self.try_upread() { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Ok(guard); + } + } + must_sleep_once = false; + + if let Err(e) = waiter.wait(interruptible) { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Err(e); + } + } + } } impl Deref for RwSemReadGuard<'_, T> { diff --git a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc new file mode 100644 index 0000000000..f1e99281d8 --- /dev/null +++ b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc @@ -0,0 +1,258 @@ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace { + +constexpr char kByte = 0x01; +constexpr int kIterations = 1000; + +void AlarmHandler(int) { + _exit(124); +} + +class TempFile { + public: + TempFile() { + char tmpl[] = "/tmp/dunitest_splice_concurrent_XXXXXX"; + fd_ = mkstemp(tmpl); + if (fd_ >= 0) { + strcpy(path_, tmpl); + } + } + + ~TempFile() { + if (fd_ >= 0) { + close(fd_); + } + if (path_[0] != '\0') { + unlink(path_); + } + } + + TempFile(const TempFile&) = delete; + TempFile& operator=(const TempFile&) = delete; + + bool valid() const { + return fd_ >= 0; + } + + int fd() const { + return fd_; + } + + private: + int fd_ = -1; + char path_[sizeof("/tmp/dunitest_splice_concurrent_XXXXXX")] = {}; +}; + +struct TestState { + std::atomic done {false}; + std::atomic failures {0}; + int file_fd = -1; + int pipe_read_fd = -1; + void* mapping = MAP_FAILED; +}; + +void* FileReaderThread(void* arg) { + auto* state = static_cast(arg); + while (!state->done.load(std::memory_order_acquire)) { + char byte = 0; + if (lseek(state->file_fd, 0, SEEK_SET) < 0) { + state->failures.fetch_add(1, std::memory_order_relaxed); + continue; + } + + const ssize_t n = read(state->file_fd, &byte, 1); + if (n < 0) { + if (errno != EINTR) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + continue; + } + if (n == 1 && byte != kByte) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + } + return nullptr; +} + +void* PipeReaderThread(void* arg) { + auto* state = static_cast(arg); + while (!state->done.load(std::memory_order_acquire)) { + char byte = 0; + const ssize_t n = read(state->pipe_read_fd, &byte, 1); + if (n < 0) { + if (errno != EINTR) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + continue; + } + if (n == 1 && byte != kByte) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + } + return nullptr; +} + +void* MadviseThread(void* arg) { + auto* state = static_cast(arg); + while (!state->done.load(std::memory_order_acquire)) { + madvise(state->mapping, 4096, MADV_DONTNEED); + } + return nullptr; +} + +void FillPipeLeavingSpace(int fd, int capacity, size_t free_space) { + ASSERT_GT(capacity, 0); + ASSERT_LT(free_space, static_cast(capacity)); + + const size_t fill = static_cast(capacity) - free_space; + std::vector bytes(fill, 0x7f); + size_t written = 0; + while (written < fill) { + const ssize_t n = write(fd, bytes.data() + written, fill - written); + ASSERT_GT(n, 0) << strerror(errno); + written += static_cast(n); + } +} + +bool IsDragonOS() { + struct utsname uts {}; + if (uname(&uts) != 0) { + return false; + } + return strstr(uts.release, "dragonos") != nullptr || + strstr(uts.nodename, "dragonos") != nullptr; +} + +} // namespace + +TEST(SpliceConcurrentIo, FileToPipeNotStarvedByMadvise) { + struct sigaction sa {}; + sa.sa_handler = AlarmHandler; + ASSERT_EQ(0, sigaction(SIGALRM, &sa, nullptr)) << strerror(errno); + alarm(20); + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + + void* mapping = mmap(nullptr, 4096, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + ASSERT_NE(MAP_FAILED, mapping) << strerror(errno); + + TestState state; + state.file_fd = file.fd(); + state.pipe_read_fd = pipe_fds[0]; + state.mapping = mapping; + + pthread_t file_reader {}; + pthread_t pipe_reader {}; + pthread_t madvise_thread {}; + ASSERT_EQ(0, pthread_create(&file_reader, nullptr, FileReaderThread, &state)) + << strerror(errno); + ASSERT_EQ(0, pthread_create(&pipe_reader, nullptr, PipeReaderThread, &state)) + << strerror(errno); + ASSERT_EQ(0, pthread_create(&madvise_thread, nullptr, MadviseThread, &state)) + << strerror(errno); + + for (int i = 0; i < kIterations; ++i) { + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + ASSERT_EQ(1, splice(file.fd(), nullptr, pipe_fds[1], nullptr, 1, 0)) + << strerror(errno); + } + + state.done.store(true, std::memory_order_release); + close(pipe_fds[1]); + + ASSERT_EQ(0, pthread_join(file_reader, nullptr)) << strerror(errno); + ASSERT_EQ(0, pthread_join(pipe_reader, nullptr)) << strerror(errno); + ASSERT_EQ(0, pthread_join(madvise_thread, nullptr)) << strerror(errno); + + close(pipe_fds[0]); + munmap(mapping, 4096); + alarm(0); + + EXPECT_EQ(0, state.failures.load(std::memory_order_relaxed)); +} + +TEST(SpliceConcurrentIo, FileToPipeShortReadNeedsOnlyAnyPipeSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS byte-ring pipe regression test"; + } + + struct sigaction sa {}; + sa.sa_handler = AlarmHandler; + ASSERT_EQ(0, sigaction(SIGALRM, &sa, nullptr)) << strerror(errno); + alarm(5); + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, 1) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, 1); + + ASSERT_EQ(1, splice(file.fd(), nullptr, pipe_fds[1], nullptr, 4096, 0)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); + alarm(0); +} + +TEST(SpliceConcurrentIo, NonblockFileToPipeShortReadUsesAvailableSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS byte-ring pipe regression test"; + } + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, 1) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, 1); + + ASSERT_EQ(1, + splice(file.fd(), nullptr, pipe_fds[1], nullptr, 4096, + SPLICE_F_NONBLOCK)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/user/apps/tests/dunitest/whitelist.txt b/user/apps/tests/dunitest/whitelist.txt index 8901766b30..a8ef260c1d 100644 --- a/user/apps/tests/dunitest/whitelist.txt +++ b/user/apps/tests/dunitest/whitelist.txt @@ -13,6 +13,7 @@ normal/proc_self_limits normal/mlock_semantics normal/sched_affinity normal/sync_file_range +normal/splice_concurrent_io normal/rcu_selftest normal/test_tlb_shootdown normal/tcp_close_semantics From a15b34d47608c76b0491fe3945473271e3607217 Mon Sep 17 00:00:00 2001 From: longjin Date: Tue, 19 May 2026 06:14:59 +0000 Subject: [PATCH 3/5] fix(vfs): avoid splice size clipping for virtual files Signed-off-by: longjin --- .../src/filesystem/vfs/syscall/sys_splice.rs | 6 ++++- .../suites/normal/splice_concurrent_io.cc | 27 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/kernel/src/filesystem/vfs/syscall/sys_splice.rs b/kernel/src/filesystem/vfs/syscall/sys_splice.rs index 3f42af3170..a7c44c18ee 100644 --- a/kernel/src/filesystem/vfs/syscall/sys_splice.rs +++ b/kernel/src/filesystem/vfs/syscall/sys_splice.rs @@ -314,7 +314,7 @@ fn splice_file_read_limit(file: &File, offset: Option, limit: usize) -> u return 0; } - if matches!(file.file_type(), FileType::File) { + if matches!(file.file_type(), FileType::File) && splice_regular_file_has_trusted_size(file) { if let Ok(metadata) = file.metadata() { let size = metadata.size.max(0) as usize; let pos = offset.unwrap_or_else(|| file.pos()); @@ -325,6 +325,10 @@ fn splice_file_read_limit(file: &File, offset: Option, limit: usize) -> u limit } +fn splice_regular_file_has_trusted_size(file: &File) -> bool { + matches!(file.inode().fs().name(), "ext4" | "fat" | "tmpfs" | "ramfs") +} + /// pipe 到 file 的数据传输 fn splice_pipe_to_file( pipe: &File, diff --git a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc index f1e99281d8..d0cc18e9e9 100644 --- a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc +++ b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc @@ -13,10 +13,12 @@ #include #include #include +#include #include #include #include +#include #include namespace { @@ -252,6 +254,31 @@ TEST(SpliceConcurrentIo, NonblockFileToPipeShortReadUsesAvailableSpace) { close(pipe_fds[1]); } +TEST(SpliceConcurrentIo, ProcfsZeroSizeRegularFileCanSpliceData) { + const int file_fd = open("/proc/cpuinfo", O_RDONLY); + ASSERT_GE(file_fd, 0) << strerror(errno); + + struct stat st {}; + ASSERT_EQ(0, fstat(file_fd, &st)) << strerror(errno); + ASSERT_TRUE(S_ISREG(st.st_mode)); + ASSERT_EQ(0, st.st_size); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + + const ssize_t spliced = splice(file_fd, nullptr, pipe_fds[1], nullptr, 4096, 0); + ASSERT_GT(spliced, 0) << strerror(errno); + + std::vector bytes(static_cast(spliced)); + ASSERT_EQ(spliced, read(pipe_fds[0], bytes.data(), bytes.size())) << strerror(errno); + const std::string text(bytes.begin(), bytes.end()); + EXPECT_NE(std::string::npos, text.find("processor")); + + close(pipe_fds[0]); + close(pipe_fds[1]); + close(file_fd); +} + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); From dee3c57d27c140ecb23d65c5a2968977fc6a8e2c Mon Sep 17 00:00:00 2001 From: longjin Date: Tue, 19 May 2026 07:08:49 +0000 Subject: [PATCH 4/5] fix(vfs): preserve nonblocking splice pipe buffer atomicity Signed-off-by: longjin --- .../src/filesystem/vfs/syscall/sys_splice.rs | 5 +++- .../suites/normal/splice_concurrent_io.cc | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/kernel/src/filesystem/vfs/syscall/sys_splice.rs b/kernel/src/filesystem/vfs/syscall/sys_splice.rs index a7c44c18ee..971b1e62c0 100644 --- a/kernel/src/filesystem/vfs/syscall/sys_splice.rs +++ b/kernel/src/filesystem/vfs/syscall/sys_splice.rs @@ -3,7 +3,7 @@ use crate::filesystem::vfs::file::FileMode; use crate::filesystem::vfs::FileFlags; use crate::filesystem::vfs::{file::File, syscall::SpliceFlags, FileType}; use crate::ipc::kill::send_signal_to_pid; -use crate::ipc::pipe::LockedPipeInode; +use crate::ipc::pipe::{LockedPipeInode, PIPE_BUF}; use crate::process::resource::RLimitID; use crate::process::ProcessManager; use crate::syscall::table::Syscall; @@ -264,6 +264,9 @@ fn splice_file_to_pipe( if space == 0 && pipe_inode.has_readers() { return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); } + if wanted <= PIPE_BUF && space < wanted && pipe_inode.has_readers() { + return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); + } space } else { pipe_inode.wait_writable_for_splice(wanted)? diff --git a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc index d0cc18e9e9..23924a4af8 100644 --- a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc +++ b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc @@ -254,6 +254,35 @@ TEST(SpliceConcurrentIo, NonblockFileToPipeShortReadUsesAvailableSpace) { close(pipe_fds[1]); } +TEST(SpliceConcurrentIo, NonblockFileToPipePipeBufNeedsCompleteSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS byte-ring pipe regression test"; + } + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + std::vector bytes(4096, kByte); + ASSERT_EQ(static_cast(bytes.size()), + write(file.fd(), bytes.data(), bytes.size())) + << strerror(errno); + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, 1) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, 1); + + errno = 0; + EXPECT_EQ(-1, + splice(file.fd(), nullptr, pipe_fds[1], nullptr, bytes.size(), + SPLICE_F_NONBLOCK)); + EXPECT_EQ(EAGAIN, errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); +} + TEST(SpliceConcurrentIo, ProcfsZeroSizeRegularFileCanSpliceData) { const int file_fd = open("/proc/cpuinfo", O_RDONLY); ASSERT_GE(file_fd, 0) << strerror(errno); From 402f1c09b9b3a025f6c1148bb3dd6998df8d4715 Mon Sep 17 00:00:00 2001 From: longjin Date: Tue, 19 May 2026 08:42:51 +0000 Subject: [PATCH 5/5] fix(vfs): handle procfs splice short reads Signed-off-by: longjin --- .../src/filesystem/vfs/syscall/sys_splice.rs | 30 +++++-- kernel/src/ipc/pipe.rs | 27 ++++++ .../suites/normal/splice_concurrent_io.cc | 88 +++++++++++++++++++ 3 files changed, 136 insertions(+), 9 deletions(-) diff --git a/kernel/src/filesystem/vfs/syscall/sys_splice.rs b/kernel/src/filesystem/vfs/syscall/sys_splice.rs index 971b1e62c0..434a58414a 100644 --- a/kernel/src/filesystem/vfs/syscall/sys_splice.rs +++ b/kernel/src/filesystem/vfs/syscall/sys_splice.rs @@ -254,25 +254,33 @@ fn splice_file_to_pipe( ) -> Result { let pipe_inode = get_pipe_inode(pipe)?; - let wanted = splice_file_read_limit(file, offset, len.min(4096)); - if wanted == 0 { + let limit = len.min(4096); + let trusted_read_limit = splice_trusted_file_read_limit(file, offset, limit); + if trusted_read_limit == Some(0) { return Ok(0); } + let wanted = trusted_read_limit.unwrap_or(limit); let space = if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { let space = pipe_inode.writable_len(); if space == 0 && pipe_inode.has_readers() { return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); } - if wanted <= PIPE_BUF && space < wanted && pipe_inode.has_readers() { + if trusted_read_limit.is_some() + && wanted <= PIPE_BUF + && space < wanted + && pipe_inode.has_readers() + { return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); } space - } else { + } else if trusted_read_limit.is_some() { pipe_inode.wait_writable_for_splice(wanted)? + } else { + pipe_inode.wait_writable_any_for_splice()? }; - let buf_size = if space == 0 { + let buf_size = if trusted_read_limit.is_some() && space == 0 { wanted } else { wanted.min(space) @@ -312,20 +320,24 @@ fn splice_file_to_pipe( } } -fn splice_file_read_limit(file: &File, offset: Option, limit: usize) -> usize { +fn splice_trusted_file_read_limit( + file: &File, + offset: Option, + limit: usize, +) -> Option { if limit == 0 { - return 0; + return Some(0); } if matches!(file.file_type(), FileType::File) && splice_regular_file_has_trusted_size(file) { if let Ok(metadata) = file.metadata() { let size = metadata.size.max(0) as usize; let pos = offset.unwrap_or_else(|| file.pos()); - return limit.min(size.saturating_sub(pos)); + return Some(limit.min(size.saturating_sub(pos))); } } - limit + None } fn splice_regular_file_has_trusted_size(file: &File) -> bool { diff --git a/kernel/src/ipc/pipe.rs b/kernel/src/ipc/pipe.rs index 824a23b6e5..dcdeda7b60 100644 --- a/kernel/src/ipc/pipe.rs +++ b/kernel/src/ipc/pipe.rs @@ -648,6 +648,33 @@ impl LockedPipeInode { } } + /// Wait until the pipe has any writable byte for file->pipe splice. + /// + /// This matches Linux `wait_for_space()` for inputs whose exact readable + /// length is not known before calling into the file. The caller can then + /// cap the read by the returned byte space. + pub fn wait_writable_any_for_splice(&self) -> Result { + loop { + let guard = self.inner.lock(); + if guard.reader == 0 { + drop(guard); + let _ = send_kernel_signal_to_current(Signal::SIGPIPE); + return Err(SystemError::EPIPE); + } + + let used = guard.valid_cnt.max(0) as usize; + let space = guard.buf_size.saturating_sub(used); + if space > 0 { + return Ok(space); + } + + drop(guard); + if wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {}).is_err() { + return Err(SystemError::ERESTARTSYS); + } + } + } + /// 从管道中“窥视”最多 `len` 字节数据到 `buf`,但不消耗管道数据。 /// /// 返回实际拷贝的字节数(可能小于 `len`)。不会睡眠。 diff --git a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc index 23924a4af8..9358eb54c5 100644 --- a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc +++ b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc @@ -145,6 +145,21 @@ bool IsDragonOS() { strstr(uts.nodename, "dragonos") != nullptr; } +std::string ReadSmallFile(const char* path) { + const int fd = open(path, O_RDONLY); + if (fd < 0) { + return ""; + } + + char buf[256] = {}; + const ssize_t n = read(fd, buf, sizeof(buf)); + close(fd); + if (n <= 0 || n >= static_cast(sizeof(buf))) { + return ""; + } + return std::string(buf, static_cast(n)); +} + } // namespace TEST(SpliceConcurrentIo, FileToPipeNotStarvedByMadvise) { @@ -308,6 +323,79 @@ TEST(SpliceConcurrentIo, ProcfsZeroSizeRegularFileCanSpliceData) { close(file_fd); } +TEST(SpliceConcurrentIo, ProcfsShortReadNeedsOnlyActualPipeSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS procfs splice regression test"; + } + + struct sigaction sa {}; + sa.sa_handler = AlarmHandler; + ASSERT_EQ(0, sigaction(SIGALRM, &sa, nullptr)) << strerror(errno); + alarm(5); + + constexpr const char* kPath = "/proc/loadavg"; + const std::string expected = ReadSmallFile(kPath); + ASSERT_GT(expected.size(), 0U); + ASSERT_LT(expected.size(), 4096U); + + const int file_fd = open(kPath, O_RDONLY); + ASSERT_GE(file_fd, 0) << strerror(errno); + + struct stat st {}; + ASSERT_EQ(0, fstat(file_fd, &st)) << strerror(errno); + ASSERT_TRUE(S_ISREG(st.st_mode)); + ASSERT_EQ(0, st.st_size); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, static_cast(expected.size())) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, expected.size()); + + ASSERT_EQ(static_cast(expected.size()), + splice(file_fd, nullptr, pipe_fds[1], nullptr, 4096, 0)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); + close(file_fd); + alarm(0); +} + +TEST(SpliceConcurrentIo, NonblockProcfsShortReadUsesAvailablePipeSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS procfs splice regression test"; + } + + constexpr const char* kPath = "/proc/loadavg"; + const std::string expected = ReadSmallFile(kPath); + ASSERT_GT(expected.size(), 0U); + ASSERT_LT(expected.size(), 4096U); + + const int file_fd = open(kPath, O_RDONLY); + ASSERT_GE(file_fd, 0) << strerror(errno); + + struct stat st {}; + ASSERT_EQ(0, fstat(file_fd, &st)) << strerror(errno); + ASSERT_TRUE(S_ISREG(st.st_mode)); + ASSERT_EQ(0, st.st_size); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, static_cast(expected.size())) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, expected.size()); + + ASSERT_EQ(static_cast(expected.size()), + splice(file_fd, nullptr, pipe_fds[1], nullptr, 4096, + SPLICE_F_NONBLOCK)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); + close(file_fd); +} + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();