Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 56 additions & 22 deletions kernel/src/filesystem/vfs/syscall/sys_splice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::filesystem::vfs::file::FileMode;
use crate::filesystem::vfs::FileFlags;
use crate::filesystem::vfs::{file::File, syscall::SpliceFlags, FileType};
use crate::ipc::kill::send_signal_to_pid;
use crate::ipc::pipe::LockedPipeInode;
use crate::ipc::pipe::{LockedPipeInode, PIPE_BUF};
use crate::process::resource::RLimitID;
use crate::process::ProcessManager;
use crate::syscall::table::Syscall;
Expand Down Expand Up @@ -254,7 +254,37 @@ fn splice_file_to_pipe(
) -> Result<usize, SystemError> {
let pipe_inode = get_pipe_inode(pipe)?;

let buf_size = len.min(4096);
let limit = len.min(4096);
let trusted_read_limit = splice_trusted_file_read_limit(file, offset, limit);
if trusted_read_limit == Some(0) {
return Ok(0);
}
let wanted = trusted_read_limit.unwrap_or(limit);

let space = if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) {
let space = pipe_inode.writable_len();
if space == 0 && pipe_inode.has_readers() {
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
if trusted_read_limit.is_some()
&& wanted <= PIPE_BUF
&& space < wanted
&& pipe_inode.has_readers()
{
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
space
} else if trusted_read_limit.is_some() {
pipe_inode.wait_writable_for_splice(wanted)?
} else {
pipe_inode.wait_writable_any_for_splice()?
};

let buf_size = if trusted_read_limit.is_some() && space == 0 {
wanted
} else {
wanted.min(space)
};
let mut buffer = vec![0u8; buf_size];

// 从文件读取
Expand All @@ -272,26 +302,6 @@ fn splice_file_to_pipe(

buffer.truncate(read_len);

// Linux-like nonblocking semantics for file->pipe splice:
// - SPLICE_F_NONBLOCK makes the splice nonblocking regardless of the pipe fd's O_NONBLOCK.
// - When "atomic" (<= PIPE_BUF), lack of space yields EAGAIN (no partial write).
// - When non-atomic, write as much as fits and return partial.
if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) {
let space = pipe_inode.writable_len();
if space == 0 && pipe_inode.has_readers() {
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
if buffer.len() <= crate::ipc::pipe::PIPE_BUF && space < buffer.len() {
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
let to_write = if space == 0 {
buffer.len()
} else {
buffer.len().min(space)
};
buffer.truncate(to_write);
}

// 写入 pipe
let written = if flags.contains(SpliceFlags::SPLICE_F_NONBLOCK) {
pipe_inode.write_from_splice_nonblock(&buffer)
Expand All @@ -310,6 +320,30 @@ fn splice_file_to_pipe(
}
}

fn splice_trusted_file_read_limit(
file: &File,
offset: Option<usize>,
limit: usize,
) -> Option<usize> {
if limit == 0 {
return Some(0);
}

if matches!(file.file_type(), FileType::File) && splice_regular_file_has_trusted_size(file) {
if let Ok(metadata) = file.metadata() {
let size = metadata.size.max(0) as usize;
let pos = offset.unwrap_or_else(|| file.pos());
return Some(limit.min(size.saturating_sub(pos)));
}
}

None
}

fn splice_regular_file_has_trusted_size(file: &File) -> bool {
matches!(file.inode().fs().name(), "ext4" | "fat" | "tmpfs" | "ramfs")
}

/// pipe 到 file 的数据传输
fn splice_pipe_to_file(
pipe: &File,
Expand Down
69 changes: 69 additions & 0 deletions kernel/src/ipc/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,75 @@ impl LockedPipeInode {
Ok(to_write)
}

/// Wait for pipe space before file->pipe splice reads from the input file.
///
/// The caller must pass the maximum number of bytes it can actually read
/// from the input file for this splice attempt. DragonOS pipes are byte-ring
/// based, so requests up to PIPE_BUF wait for the complete readable chunk;
/// larger requests wait for any space and may complete partially.
pub fn wait_writable_for_splice(&self, len: usize) -> Result<usize, SystemError> {
if len == 0 {
return Ok(0);
}

let need_atomic = len <= PIPE_BUF;
loop {
let guard = self.inner.lock();
if guard.reader == 0 {
drop(guard);
let _ = send_kernel_signal_to_current(Signal::SIGPIPE);
return Err(SystemError::EPIPE);
}

let used = guard.valid_cnt.max(0) as usize;
let space = guard.buf_size.saturating_sub(used);
if (need_atomic && space >= len) || (!need_atomic && space > 0) {
return Ok(if need_atomic { len } else { len.min(space) });
}

drop(guard);
let wait_result = if need_atomic {
wq_wait_event_interruptible!(
self.write_wait_queue,
self.writeable_len_at_least(len),
{}
)
} else {
wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {})
};
if wait_result.is_err() {
return Err(SystemError::ERESTARTSYS);
}
}
}

/// Wait until the pipe has any writable byte for file->pipe splice.
///
/// This matches Linux `wait_for_space()` for inputs whose exact readable
/// length is not known before calling into the file. The caller can then
/// cap the read by the returned byte space.
pub fn wait_writable_any_for_splice(&self) -> Result<usize, SystemError> {
loop {
let guard = self.inner.lock();
if guard.reader == 0 {
drop(guard);
let _ = send_kernel_signal_to_current(Signal::SIGPIPE);
return Err(SystemError::EPIPE);
}

let used = guard.valid_cnt.max(0) as usize;
let space = guard.buf_size.saturating_sub(used);
if space > 0 {
return Ok(space);
}

drop(guard);
if wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {}).is_err() {
return Err(SystemError::ERESTARTSYS);
}
}
}

/// 从管道中“窥视”最多 `len` 字节数据到 `buf`,但不消耗管道数据。
///
/// 返回实际拷贝的字节数(可能小于 `len`)。不会睡眠。
Expand Down
131 changes: 125 additions & 6 deletions kernel/src/libs/rwsem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use core::{
use alloc::rc::Rc;
use system_error::SystemError;

use super::wait_queue::WaitQueue;
use crate::process::ProcessManager;

use super::wait_queue::{WaitQueue, Waiter};

/// A mutex that provides data access to either one writer or many readers.
///
Expand Down Expand Up @@ -47,6 +49,7 @@ use super::wait_queue::WaitQueue;
#[derive(Debug)]
pub struct RwSem<T: ?Sized> {
lock: AtomicUsize,
waiters: AtomicUsize,
queue: WaitQueue,
val: UnsafeCell<T>,
}
Expand Down Expand Up @@ -95,6 +98,7 @@ impl<T> RwSem<T> {
Self {
val: UnsafeCell::new(val),
lock: AtomicUsize::new(0),
waiters: AtomicUsize::new(0),
queue: WaitQueue::default(),
}
}
Expand All @@ -107,7 +111,11 @@ impl<T: ?Sized> RwSem<T> {
/// upreaders present.
#[track_caller]
pub fn read(&self) -> RwSemReadGuard<'_, T> {
self.queue.wait_until(|| self.try_read())
if let Some(guard) = self.try_read() {
return guard;
}

self.wait_read(false).unwrap()
}

/// Acquires a write mutex and sleep until it can be acquired.
Expand All @@ -116,25 +124,45 @@ impl<T: ?Sized> RwSem<T> {
/// or readers present.
#[track_caller]
pub fn write(&self) -> RwSemWriteGuard<'_, T> {
self.queue.wait_until(|| self.try_write())
if self.waiters.load(Acquire) == 0 || ProcessManager::current_pcb().preempt_count() != 0 {
if let Some(guard) = self.try_write() {
return guard;
}
}

self.wait_write(false).unwrap()
}

/// Acquires a upread mutex and sleep until it can be acquired.
///
/// The calling thread will sleep until there are no writers or upreaders present.
#[track_caller]
pub fn upread(&self) -> RwSemUpgradeableGuard<'_, T> {
self.queue.wait_until(|| self.try_upread())
if let Some(guard) = self.try_upread() {
return guard;
}

self.wait_upread(false).unwrap()
}

/// Blocking read acquire (interruptible).
pub fn read_interruptible(&self) -> Result<RwSemReadGuard<'_, T>, SystemError> {
self.queue.wait_until_interruptible(|| self.try_read())
if let Some(guard) = self.try_read() {
return Ok(guard);
}

self.wait_read(true)
}

/// Blocking write acquire (interruptible).
pub fn write_interruptible(&self) -> Result<RwSemWriteGuard<'_, T>, SystemError> {
self.queue.wait_until_interruptible(|| self.try_write())
if self.waiters.load(Acquire) == 0 || ProcessManager::current_pcb().preempt_count() != 0 {
if let Some(guard) = self.try_write() {
return Ok(guard);
}
}

self.wait_write(true)
}

/// Attempts to acquire a read lock.
Expand Down Expand Up @@ -194,6 +222,97 @@ impl<T: ?Sized> RwSem<T> {
pub fn get_mut(&mut self) -> &mut T {
self.val.get_mut()
}

fn wait_read(&self, interruptible: bool) -> Result<RwSemReadGuard<'_, T>, SystemError> {
self.waiters.fetch_add(1, AcqRel);
let (waiter, waker) = Waiter::new_pair();

loop {
if let Err(e) = self.queue.register_waker(waker.clone()) {
self.waiters.fetch_sub(1, Release);
return Err(e);
}

if let Some(guard) = self.try_read() {
self.queue.remove_waker(&waker);
self.waiters.fetch_sub(1, Release);
return Ok(guard);
}

if let Err(e) = waiter.wait(interruptible) {
self.queue.remove_waker(&waker);
self.waiters.fetch_sub(1, Release);
return Err(e);
}
}
}

fn wait_write(&self, interruptible: bool) -> Result<RwSemWriteGuard<'_, T>, SystemError> {
let had_waiters = self.waiters.fetch_add(1, AcqRel) != 0;
let mut must_sleep_once = had_waiters;
let (waiter, waker) = Waiter::new_pair();

loop {
if let Err(e) = self.queue.register_waker(waker.clone()) {
self.waiters.fetch_sub(1, Release);
return Err(e);
}

if must_sleep_once && self.waiters.load(Acquire) == 1 {
must_sleep_once = false;
}

if !must_sleep_once {
if let Some(guard) = self.try_write() {
self.queue.remove_waker(&waker);
self.waiters.fetch_sub(1, Release);
return Ok(guard);
}
}
must_sleep_once = false;

if let Err(e) = waiter.wait(interruptible) {
self.queue.remove_waker(&waker);
self.waiters.fetch_sub(1, Release);
return Err(e);
}
}
}

fn wait_upread(
&self,
interruptible: bool,
) -> Result<RwSemUpgradeableGuard<'_, T>, SystemError> {
let had_waiters = self.waiters.fetch_add(1, AcqRel) != 0;
let mut must_sleep_once = had_waiters;
let (waiter, waker) = Waiter::new_pair();

loop {
if let Err(e) = self.queue.register_waker(waker.clone()) {
self.waiters.fetch_sub(1, Release);
return Err(e);
}

if must_sleep_once && self.waiters.load(Acquire) == 1 {
must_sleep_once = false;
}

if !must_sleep_once {
if let Some(guard) = self.try_upread() {
self.queue.remove_waker(&waker);
self.waiters.fetch_sub(1, Release);
return Ok(guard);
}
}
must_sleep_once = false;

if let Err(e) = waiter.wait(interruptible) {
self.queue.remove_waker(&waker);
self.waiters.fetch_sub(1, Release);
return Err(e);
}
}
}
}

impl<T: ?Sized> Deref for RwSemReadGuard<'_, T> {
Expand Down
Loading
Loading