diff --git a/kernel/src/filesystem/vfs/syscall/sys_splice.rs b/kernel/src/filesystem/vfs/syscall/sys_splice.rs index fa9ba38e3..434a58414 100644 --- a/kernel/src/filesystem/vfs/syscall/sys_splice.rs +++ b/kernel/src/filesystem/vfs/syscall/sys_splice.rs @@ -3,7 +3,7 @@ use crate::filesystem::vfs::file::FileMode; use crate::filesystem::vfs::FileFlags; use crate::filesystem::vfs::{file::File, syscall::SpliceFlags, FileType}; use crate::ipc::kill::send_signal_to_pid; -use crate::ipc::pipe::LockedPipeInode; +use crate::ipc::pipe::{LockedPipeInode, PIPE_BUF}; use crate::process::resource::RLimitID; use crate::process::ProcessManager; use crate::syscall::table::Syscall; @@ -254,7 +254,37 @@ fn splice_file_to_pipe( ) -> Result { let pipe_inode = get_pipe_inode(pipe)?; - let buf_size = len.min(4096); + let limit = len.min(4096); + let trusted_read_limit = splice_trusted_file_read_limit(file, offset, limit); + if trusted_read_limit == Some(0) { + return Ok(0); + } + let wanted = trusted_read_limit.unwrap_or(limit); + + let space = if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { + let space = pipe_inode.writable_len(); + if space == 0 && pipe_inode.has_readers() { + return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); + } + if trusted_read_limit.is_some() + && wanted <= PIPE_BUF + && space < wanted + && pipe_inode.has_readers() + { + return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); + } + space + } else if trusted_read_limit.is_some() { + pipe_inode.wait_writable_for_splice(wanted)? + } else { + pipe_inode.wait_writable_any_for_splice()? + }; + + let buf_size = if trusted_read_limit.is_some() && space == 0 { + wanted + } else { + wanted.min(space) + }; let mut buffer = vec![0u8; buf_size]; // 从文件读取 @@ -272,26 +302,6 @@ fn splice_file_to_pipe( buffer.truncate(read_len); - // Linux-like nonblocking semantics for file->pipe splice: - // - SPLICE_F_NONBLOCK makes the splice nonblocking regardless of the pipe fd's O_NONBLOCK. - // - When "atomic" (<= PIPE_BUF), lack of space yields EAGAIN (no partial write). - // - When non-atomic, write as much as fits and return partial. - if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { - let space = pipe_inode.writable_len(); - if space == 0 && pipe_inode.has_readers() { - return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); - } - if buffer.len() <= crate::ipc::pipe::PIPE_BUF && space < buffer.len() { - return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); - } - let to_write = if space == 0 { - buffer.len() - } else { - buffer.len().min(space) - }; - buffer.truncate(to_write); - } - // 写入 pipe let written = if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) { pipe_inode.write_from_splice_nonblock(&buffer) @@ -310,6 +320,30 @@ fn splice_file_to_pipe( } } +fn splice_trusted_file_read_limit( + file: &File, + offset: Option, + limit: usize, +) -> Option { + if limit == 0 { + return Some(0); + } + + if matches!(file.file_type(), FileType::File) && splice_regular_file_has_trusted_size(file) { + if let Ok(metadata) = file.metadata() { + let size = metadata.size.max(0) as usize; + let pos = offset.unwrap_or_else(|| file.pos()); + return Some(limit.min(size.saturating_sub(pos))); + } + } + + None +} + +fn splice_regular_file_has_trusted_size(file: &File) -> bool { + matches!(file.inode().fs().name(), "ext4" | "fat" | "tmpfs" | "ramfs") +} + /// pipe 到 file 的数据传输 fn splice_pipe_to_file( pipe: &File, diff --git a/kernel/src/ipc/pipe.rs b/kernel/src/ipc/pipe.rs index 02ab69a02..dcdeda7b6 100644 --- a/kernel/src/ipc/pipe.rs +++ b/kernel/src/ipc/pipe.rs @@ -606,6 +606,75 @@ impl LockedPipeInode { Ok(to_write) } + /// Wait for pipe space before file->pipe splice reads from the input file. + /// + /// The caller must pass the maximum number of bytes it can actually read + /// from the input file for this splice attempt. DragonOS pipes are byte-ring + /// based, so requests up to PIPE_BUF wait for the complete readable chunk; + /// larger requests wait for any space and may complete partially. + pub fn wait_writable_for_splice(&self, len: usize) -> Result { + if len == 0 { + return Ok(0); + } + + let need_atomic = len <= PIPE_BUF; + loop { + let guard = self.inner.lock(); + if guard.reader == 0 { + drop(guard); + let _ = send_kernel_signal_to_current(Signal::SIGPIPE); + return Err(SystemError::EPIPE); + } + + let used = guard.valid_cnt.max(0) as usize; + let space = guard.buf_size.saturating_sub(used); + if (need_atomic && space >= len) || (!need_atomic && space > 0) { + return Ok(if need_atomic { len } else { len.min(space) }); + } + + drop(guard); + let wait_result = if need_atomic { + wq_wait_event_interruptible!( + self.write_wait_queue, + self.writeable_len_at_least(len), + {} + ) + } else { + wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {}) + }; + if wait_result.is_err() { + return Err(SystemError::ERESTARTSYS); + } + } + } + + /// Wait until the pipe has any writable byte for file->pipe splice. + /// + /// This matches Linux `wait_for_space()` for inputs whose exact readable + /// length is not known before calling into the file. The caller can then + /// cap the read by the returned byte space. + pub fn wait_writable_any_for_splice(&self) -> Result { + loop { + let guard = self.inner.lock(); + if guard.reader == 0 { + drop(guard); + let _ = send_kernel_signal_to_current(Signal::SIGPIPE); + return Err(SystemError::EPIPE); + } + + let used = guard.valid_cnt.max(0) as usize; + let space = guard.buf_size.saturating_sub(used); + if space > 0 { + return Ok(space); + } + + drop(guard); + if wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {}).is_err() { + return Err(SystemError::ERESTARTSYS); + } + } + } + /// 从管道中“窥视”最多 `len` 字节数据到 `buf`,但不消耗管道数据。 /// /// 返回实际拷贝的字节数(可能小于 `len`)。不会睡眠。 diff --git a/kernel/src/libs/rwsem.rs b/kernel/src/libs/rwsem.rs index 3d3e64e03..ec84995ae 100644 --- a/kernel/src/libs/rwsem.rs +++ b/kernel/src/libs/rwsem.rs @@ -15,7 +15,9 @@ use core::{ use alloc::rc::Rc; use system_error::SystemError; -use super::wait_queue::WaitQueue; +use crate::process::ProcessManager; + +use super::wait_queue::{WaitQueue, Waiter}; /// A mutex that provides data access to either one writer or many readers. /// @@ -47,6 +49,7 @@ use super::wait_queue::WaitQueue; #[derive(Debug)] pub struct RwSem { lock: AtomicUsize, + waiters: AtomicUsize, queue: WaitQueue, val: UnsafeCell, } @@ -95,6 +98,7 @@ impl RwSem { Self { val: UnsafeCell::new(val), lock: AtomicUsize::new(0), + waiters: AtomicUsize::new(0), queue: WaitQueue::default(), } } @@ -107,7 +111,11 @@ impl RwSem { /// upreaders present. #[track_caller] pub fn read(&self) -> RwSemReadGuard<'_, T> { - self.queue.wait_until(|| self.try_read()) + if let Some(guard) = self.try_read() { + return guard; + } + + self.wait_read(false).unwrap() } /// Acquires a write mutex and sleep until it can be acquired. @@ -116,7 +124,13 @@ impl RwSem { /// or readers present. #[track_caller] pub fn write(&self) -> RwSemWriteGuard<'_, T> { - self.queue.wait_until(|| self.try_write()) + if self.waiters.load(Acquire) == 0 || ProcessManager::current_pcb().preempt_count() != 0 { + if let Some(guard) = self.try_write() { + return guard; + } + } + + self.wait_write(false).unwrap() } /// Acquires a upread mutex and sleep until it can be acquired. @@ -124,17 +138,31 @@ impl RwSem { /// The calling thread will sleep until there are no writers or upreaders present. #[track_caller] pub fn upread(&self) -> RwSemUpgradeableGuard<'_, T> { - self.queue.wait_until(|| self.try_upread()) + if let Some(guard) = self.try_upread() { + return guard; + } + + self.wait_upread(false).unwrap() } /// Blocking read acquire (interruptible). pub fn read_interruptible(&self) -> Result, SystemError> { - self.queue.wait_until_interruptible(|| self.try_read()) + if let Some(guard) = self.try_read() { + return Ok(guard); + } + + self.wait_read(true) } /// Blocking write acquire (interruptible). pub fn write_interruptible(&self) -> Result, SystemError> { - self.queue.wait_until_interruptible(|| self.try_write()) + if self.waiters.load(Acquire) == 0 || ProcessManager::current_pcb().preempt_count() != 0 { + if let Some(guard) = self.try_write() { + return Ok(guard); + } + } + + self.wait_write(true) } /// Attempts to acquire a read lock. @@ -194,6 +222,97 @@ impl RwSem { pub fn get_mut(&mut self) -> &mut T { self.val.get_mut() } + + fn wait_read(&self, interruptible: bool) -> Result, SystemError> { + self.waiters.fetch_add(1, AcqRel); + let (waiter, waker) = Waiter::new_pair(); + + loop { + if let Err(e) = self.queue.register_waker(waker.clone()) { + self.waiters.fetch_sub(1, Release); + return Err(e); + } + + if let Some(guard) = self.try_read() { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Ok(guard); + } + + if let Err(e) = waiter.wait(interruptible) { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Err(e); + } + } + } + + fn wait_write(&self, interruptible: bool) -> Result, SystemError> { + let had_waiters = self.waiters.fetch_add(1, AcqRel) != 0; + let mut must_sleep_once = had_waiters; + let (waiter, waker) = Waiter::new_pair(); + + loop { + if let Err(e) = self.queue.register_waker(waker.clone()) { + self.waiters.fetch_sub(1, Release); + return Err(e); + } + + if must_sleep_once && self.waiters.load(Acquire) == 1 { + must_sleep_once = false; + } + + if !must_sleep_once { + if let Some(guard) = self.try_write() { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Ok(guard); + } + } + must_sleep_once = false; + + if let Err(e) = waiter.wait(interruptible) { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Err(e); + } + } + } + + fn wait_upread( + &self, + interruptible: bool, + ) -> Result, SystemError> { + let had_waiters = self.waiters.fetch_add(1, AcqRel) != 0; + let mut must_sleep_once = had_waiters; + let (waiter, waker) = Waiter::new_pair(); + + loop { + if let Err(e) = self.queue.register_waker(waker.clone()) { + self.waiters.fetch_sub(1, Release); + return Err(e); + } + + if must_sleep_once && self.waiters.load(Acquire) == 1 { + must_sleep_once = false; + } + + if !must_sleep_once { + if let Some(guard) = self.try_upread() { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Ok(guard); + } + } + must_sleep_once = false; + + if let Err(e) = waiter.wait(interruptible) { + self.queue.remove_waker(&waker); + self.waiters.fetch_sub(1, Release); + return Err(e); + } + } + } } impl Deref for RwSemReadGuard<'_, T> { diff --git a/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc new file mode 100644 index 000000000..9358eb54c --- /dev/null +++ b/user/apps/tests/dunitest/suites/normal/splice_concurrent_io.cc @@ -0,0 +1,402 @@ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace { + +constexpr char kByte = 0x01; +constexpr int kIterations = 1000; + +void AlarmHandler(int) { + _exit(124); +} + +class TempFile { + public: + TempFile() { + char tmpl[] = "/tmp/dunitest_splice_concurrent_XXXXXX"; + fd_ = mkstemp(tmpl); + if (fd_ >= 0) { + strcpy(path_, tmpl); + } + } + + ~TempFile() { + if (fd_ >= 0) { + close(fd_); + } + if (path_[0] != '\0') { + unlink(path_); + } + } + + TempFile(const TempFile&) = delete; + TempFile& operator=(const TempFile&) = delete; + + bool valid() const { + return fd_ >= 0; + } + + int fd() const { + return fd_; + } + + private: + int fd_ = -1; + char path_[sizeof("/tmp/dunitest_splice_concurrent_XXXXXX")] = {}; +}; + +struct TestState { + std::atomic done {false}; + std::atomic failures {0}; + int file_fd = -1; + int pipe_read_fd = -1; + void* mapping = MAP_FAILED; +}; + +void* FileReaderThread(void* arg) { + auto* state = static_cast(arg); + while (!state->done.load(std::memory_order_acquire)) { + char byte = 0; + if (lseek(state->file_fd, 0, SEEK_SET) < 0) { + state->failures.fetch_add(1, std::memory_order_relaxed); + continue; + } + + const ssize_t n = read(state->file_fd, &byte, 1); + if (n < 0) { + if (errno != EINTR) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + continue; + } + if (n == 1 && byte != kByte) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + } + return nullptr; +} + +void* PipeReaderThread(void* arg) { + auto* state = static_cast(arg); + while (!state->done.load(std::memory_order_acquire)) { + char byte = 0; + const ssize_t n = read(state->pipe_read_fd, &byte, 1); + if (n < 0) { + if (errno != EINTR) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + continue; + } + if (n == 1 && byte != kByte) { + state->failures.fetch_add(1, std::memory_order_relaxed); + } + } + return nullptr; +} + +void* MadviseThread(void* arg) { + auto* state = static_cast(arg); + while (!state->done.load(std::memory_order_acquire)) { + madvise(state->mapping, 4096, MADV_DONTNEED); + } + return nullptr; +} + +void FillPipeLeavingSpace(int fd, int capacity, size_t free_space) { + ASSERT_GT(capacity, 0); + ASSERT_LT(free_space, static_cast(capacity)); + + const size_t fill = static_cast(capacity) - free_space; + std::vector bytes(fill, 0x7f); + size_t written = 0; + while (written < fill) { + const ssize_t n = write(fd, bytes.data() + written, fill - written); + ASSERT_GT(n, 0) << strerror(errno); + written += static_cast(n); + } +} + +bool IsDragonOS() { + struct utsname uts {}; + if (uname(&uts) != 0) { + return false; + } + return strstr(uts.release, "dragonos") != nullptr || + strstr(uts.nodename, "dragonos") != nullptr; +} + +std::string ReadSmallFile(const char* path) { + const int fd = open(path, O_RDONLY); + if (fd < 0) { + return ""; + } + + char buf[256] = {}; + const ssize_t n = read(fd, buf, sizeof(buf)); + close(fd); + if (n <= 0 || n >= static_cast(sizeof(buf))) { + return ""; + } + return std::string(buf, static_cast(n)); +} + +} // namespace + +TEST(SpliceConcurrentIo, FileToPipeNotStarvedByMadvise) { + struct sigaction sa {}; + sa.sa_handler = AlarmHandler; + ASSERT_EQ(0, sigaction(SIGALRM, &sa, nullptr)) << strerror(errno); + alarm(20); + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + + void* mapping = mmap(nullptr, 4096, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + ASSERT_NE(MAP_FAILED, mapping) << strerror(errno); + + TestState state; + state.file_fd = file.fd(); + state.pipe_read_fd = pipe_fds[0]; + state.mapping = mapping; + + pthread_t file_reader {}; + pthread_t pipe_reader {}; + pthread_t madvise_thread {}; + ASSERT_EQ(0, pthread_create(&file_reader, nullptr, FileReaderThread, &state)) + << strerror(errno); + ASSERT_EQ(0, pthread_create(&pipe_reader, nullptr, PipeReaderThread, &state)) + << strerror(errno); + ASSERT_EQ(0, pthread_create(&madvise_thread, nullptr, MadviseThread, &state)) + << strerror(errno); + + for (int i = 0; i < kIterations; ++i) { + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + ASSERT_EQ(1, splice(file.fd(), nullptr, pipe_fds[1], nullptr, 1, 0)) + << strerror(errno); + } + + state.done.store(true, std::memory_order_release); + close(pipe_fds[1]); + + ASSERT_EQ(0, pthread_join(file_reader, nullptr)) << strerror(errno); + ASSERT_EQ(0, pthread_join(pipe_reader, nullptr)) << strerror(errno); + ASSERT_EQ(0, pthread_join(madvise_thread, nullptr)) << strerror(errno); + + close(pipe_fds[0]); + munmap(mapping, 4096); + alarm(0); + + EXPECT_EQ(0, state.failures.load(std::memory_order_relaxed)); +} + +TEST(SpliceConcurrentIo, FileToPipeShortReadNeedsOnlyAnyPipeSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS byte-ring pipe regression test"; + } + + struct sigaction sa {}; + sa.sa_handler = AlarmHandler; + ASSERT_EQ(0, sigaction(SIGALRM, &sa, nullptr)) << strerror(errno); + alarm(5); + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, 1) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, 1); + + ASSERT_EQ(1, splice(file.fd(), nullptr, pipe_fds[1], nullptr, 4096, 0)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); + alarm(0); +} + +TEST(SpliceConcurrentIo, NonblockFileToPipeShortReadUsesAvailableSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS byte-ring pipe regression test"; + } + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + ASSERT_EQ(1, write(file.fd(), &kByte, 1)) << strerror(errno); + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, 1) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, 1); + + ASSERT_EQ(1, + splice(file.fd(), nullptr, pipe_fds[1], nullptr, 4096, + SPLICE_F_NONBLOCK)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); +} + +TEST(SpliceConcurrentIo, NonblockFileToPipePipeBufNeedsCompleteSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS byte-ring pipe regression test"; + } + + TempFile file; + ASSERT_TRUE(file.valid()) << strerror(errno); + std::vector bytes(4096, kByte); + ASSERT_EQ(static_cast(bytes.size()), + write(file.fd(), bytes.data(), bytes.size())) + << strerror(errno); + ASSERT_EQ(0, lseek(file.fd(), 0, SEEK_SET)) << strerror(errno); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, 1) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, 1); + + errno = 0; + EXPECT_EQ(-1, + splice(file.fd(), nullptr, pipe_fds[1], nullptr, bytes.size(), + SPLICE_F_NONBLOCK)); + EXPECT_EQ(EAGAIN, errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); +} + +TEST(SpliceConcurrentIo, ProcfsZeroSizeRegularFileCanSpliceData) { + const int file_fd = open("/proc/cpuinfo", O_RDONLY); + ASSERT_GE(file_fd, 0) << strerror(errno); + + struct stat st {}; + ASSERT_EQ(0, fstat(file_fd, &st)) << strerror(errno); + ASSERT_TRUE(S_ISREG(st.st_mode)); + ASSERT_EQ(0, st.st_size); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + + const ssize_t spliced = splice(file_fd, nullptr, pipe_fds[1], nullptr, 4096, 0); + ASSERT_GT(spliced, 0) << strerror(errno); + + std::vector bytes(static_cast(spliced)); + ASSERT_EQ(spliced, read(pipe_fds[0], bytes.data(), bytes.size())) << strerror(errno); + const std::string text(bytes.begin(), bytes.end()); + EXPECT_NE(std::string::npos, text.find("processor")); + + close(pipe_fds[0]); + close(pipe_fds[1]); + close(file_fd); +} + +TEST(SpliceConcurrentIo, ProcfsShortReadNeedsOnlyActualPipeSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS procfs splice regression test"; + } + + struct sigaction sa {}; + sa.sa_handler = AlarmHandler; + ASSERT_EQ(0, sigaction(SIGALRM, &sa, nullptr)) << strerror(errno); + alarm(5); + + constexpr const char* kPath = "/proc/loadavg"; + const std::string expected = ReadSmallFile(kPath); + ASSERT_GT(expected.size(), 0U); + ASSERT_LT(expected.size(), 4096U); + + const int file_fd = open(kPath, O_RDONLY); + ASSERT_GE(file_fd, 0) << strerror(errno); + + struct stat st {}; + ASSERT_EQ(0, fstat(file_fd, &st)) << strerror(errno); + ASSERT_TRUE(S_ISREG(st.st_mode)); + ASSERT_EQ(0, st.st_size); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, static_cast(expected.size())) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, expected.size()); + + ASSERT_EQ(static_cast(expected.size()), + splice(file_fd, nullptr, pipe_fds[1], nullptr, 4096, 0)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); + close(file_fd); + alarm(0); +} + +TEST(SpliceConcurrentIo, NonblockProcfsShortReadUsesAvailablePipeSpace) { + if (!IsDragonOS()) { + GTEST_SKIP() << "DragonOS procfs splice regression test"; + } + + constexpr const char* kPath = "/proc/loadavg"; + const std::string expected = ReadSmallFile(kPath); + ASSERT_GT(expected.size(), 0U); + ASSERT_LT(expected.size(), 4096U); + + const int file_fd = open(kPath, O_RDONLY); + ASSERT_GE(file_fd, 0) << strerror(errno); + + struct stat st {}; + ASSERT_EQ(0, fstat(file_fd, &st)) << strerror(errno); + ASSERT_TRUE(S_ISREG(st.st_mode)); + ASSERT_EQ(0, st.st_size); + + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(0, pipe(pipe_fds)) << strerror(errno); + const int capacity = fcntl(pipe_fds[1], F_GETPIPE_SZ); + ASSERT_GT(capacity, static_cast(expected.size())) << strerror(errno); + FillPipeLeavingSpace(pipe_fds[1], capacity, expected.size()); + + ASSERT_EQ(static_cast(expected.size()), + splice(file_fd, nullptr, pipe_fds[1], nullptr, 4096, + SPLICE_F_NONBLOCK)) + << strerror(errno); + + close(pipe_fds[0]); + close(pipe_fds[1]); + close(file_fd); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/user/apps/tests/dunitest/whitelist.txt b/user/apps/tests/dunitest/whitelist.txt index 8901766b3..a8ef260c1 100644 --- a/user/apps/tests/dunitest/whitelist.txt +++ b/user/apps/tests/dunitest/whitelist.txt @@ -13,6 +13,7 @@ normal/proc_self_limits normal/mlock_semantics normal/sched_affinity normal/sync_file_range +normal/splice_concurrent_io normal/rcu_selftest normal/test_tlb_shootdown normal/tcp_close_semantics