Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/ci_format.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Check Code Format

on:
push:
branches:
- master
pull_request:
branches:
- master

env:
RUST_BACKTRACE: 1

jobs:
check:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- name: Setup Rust Toolchain
run: |
rustup default nightly
rustup component add rustfmt
- name: Check Format
run: cargo fmt --all -- --check
16 changes: 9 additions & 7 deletions compio-fs/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ use crate::{Metadata, OpenOptions, Permissions};
/// it was opened with. The `File` type provides **positional** read and write
/// operations. The file does not maintain an internal cursor. The caller is
/// required to specify an offset when issuing an operation.
///
///
/// If you'd like to use methods from [`AsyncRead`](`compio_io::AsyncRead`) or [`AsyncWrite`](`compio_io::AsyncWrite`) traits,
/// you can wrap `File` with [`std::io::Cursor`].
///
///
/// If you'd like to use methods from [`AsyncRead`](`compio_io::AsyncRead`) or
/// [`AsyncWrite`](`compio_io::AsyncWrite`) traits, you can wrap `File` with
/// [`std::io::Cursor`].
///
/// # Examples
/// ```ignore
/// use compio::fs::File;
Expand All @@ -35,13 +37,13 @@ use crate::{Metadata, OpenOptions, Permissions};
///
/// let file = File::open("foo.txt").await?;
/// let cursor = Cursor::new(file);
///
///
/// let int = cursor.read_u32().await?;
/// let float = cursor.read_f32().await?;
///
///
/// let mut string = String::new();
/// let BufResult(result, string) = cursor.read_to_string(string).await;
///
///
/// let mut buf = vec![0; 1024];
/// let BufResult(result, buf) = cursor.read_exact(buf).await;
/// ```
Expand Down
35 changes: 21 additions & 14 deletions compio-runtime/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::runtime::scheduler::{
drop_hook::DropHook, local_queue::LocalQueue, send_wrapper::SendWrapper,
};
use std::{cell::RefCell, future::Future, marker::PhantomData, rc::Rc, sync::Arc, task::Waker};

use async_task::{Runnable, Task};
use compio_driver::NotifyHandle;
use crossbeam_queue::SegQueue;
use slab::Slab;
use std::{cell::RefCell, future::Future, marker::PhantomData, rc::Rc, sync::Arc, task::Waker};

use crate::runtime::scheduler::{
drop_hook::DropHook, local_queue::LocalQueue, send_wrapper::SendWrapper,
};

mod drop_hook;
mod local_queue;
Expand All @@ -28,8 +30,8 @@ impl TaskQueue {

/// Pushes a `Runnable` task to the appropriate queue.
///
/// If the current thread is the same as the creator thread, push to the local queue.
/// Otherwise, push to the sync queue.
/// If the current thread is the same as the creator thread, push to the
/// local queue. Otherwise, push to the sync queue.
fn push(&self, runnable: Runnable, notify: &NotifyHandle) {
if let Some(local_queue) = self.local_queue.get() {
local_queue.push(runnable);
Expand All @@ -41,7 +43,8 @@ impl TaskQueue {
}
}

/// Pops at most one task from each queue and returns them as `(local_task, sync_task)`.
/// Pops at most one task from each queue and returns them as `(local_task,
/// sync_task)`.
///
/// # Safety
///
Expand All @@ -52,7 +55,8 @@ impl TaskQueue {

let local_task = local_queue.pop();

// Perform an empty check as a fast path, since `SegQueue::pop()` is more expensive.
// Perform an empty check as a fast path, since `SegQueue::pop()` is more
// expensive.
let sync_task = if self.sync_queue.is_empty() {
None
} else {
Expand Down Expand Up @@ -145,13 +149,15 @@ impl Scheduler {
};

let schedule = {
// The schedule closure is managed by the `Waker` and may be dropped on another thread,
// so use `Weak` to ensure the `TaskQueue` is always dropped on the creator thread.
// The schedule closure is managed by the `Waker` and may be dropped on another
// thread, so use `Weak` to ensure the `TaskQueue` is always dropped
// on the creator thread.
let task_queue = Arc::downgrade(&self.task_queue);

move |runnable| {
// The `upgrade()` never fails because all tasks are dropped when the `Scheduler` is dropped,
// if a `Waker` is used after that, the schedule closure will never be called.
// The `upgrade()` never fails because all tasks are dropped when the
// `Scheduler` is dropped, if a `Waker` is used after that, the
// schedule closure will never be called.
task_queue.upgrade().unwrap().push(runnable, &notify);
}
};
Expand Down Expand Up @@ -216,8 +222,9 @@ impl Scheduler {
// Then drop all scheduled tasks, which will drop all futures.
//
// SAFETY:
// Since spawned tasks are not required to be `Send`, they must always be dropped
// on the same thread. Because `Scheduler` is `!Send` and `!Sync`, this is safe.
// Since spawned tasks are not required to be `Send`, they must always be
// dropped on the same thread. Because `Scheduler` is `!Send` and
// `!Sync`, this is safe.
//
// This method is only called on `TaskQueue`'s creator thread
// because `Scheduler` is `!Send` and `!Sync`.
Expand Down
3 changes: 2 additions & 1 deletion compio-runtime/tests/drop.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use futures_util::task::AtomicWaker;
use std::{
future::Future,
pin::Pin,
Expand All @@ -7,6 +6,8 @@ use std::{
thread::{self, ThreadId},
};

use futures_util::task::AtomicWaker;

struct DropWatcher {
waker: Arc<AtomicWaker>,
thread_id: ThreadId,
Expand Down
Loading