Skip to content

Commit

Permalink
[ENG-1513] Better integration between Jobs and processing Actors (spa…
Browse files Browse the repository at this point in the history
…cedriveapp#1974)

* First draft on new task system

* Removing save to disk from task system

* Bunch of concurrency issues

* Solving Future impl issue when pausing tasks

* Fix cancel and abort

* Bunch of fixes on pause, suspend, resume, cancel and abort
Also better error handling on task completion for the user

* New capabilities to return an output on a task

* Introducing a simple way to linear backoff on failed steal

* Sample actor where tasks can dispatch more tasks

* Rustfmt

* Steal test to make sure

* Stale deps cleanup

* Removing unused utils

* Initial lib docs

* Docs ok

* Memory cleanup on idle

---------

Co-authored-by: Vítor Vasconcellos <vasconcellos.dev@gmail.com>
  • Loading branch information
fogodev and HeavenVolkoff committed Feb 26, 2024
1 parent 53713a9 commit dba85eb
Show file tree
Hide file tree
Showing 22 changed files with 4,064 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ To run the landing page:

If you encounter any issues, ensure that you are using the following versions of Rust, Node and Pnpm:

- Rust version: **1.73.0**
- Rust version: **1.75.0**
- Node version: **18.17**
- Pnpm version: **8.0.0**

Expand Down
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ swift-rs = { version = "1.0.6" }
# Third party dependencies used by one or more of our crates
anyhow = "1.0.75"
async-channel = "2.0.0"
async-trait = "0.1.77"
axum = "0.6.20"
base64 = "0.21.5"
blake3 = "1.5.0"
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "sd-core"
version = "0.2.4"
description = "Virtual distributed filesystem engine that powers Spacedrive."
authors = ["Spacedrive Technology Inc."]
rust-version = "1.73.0"
rust-version = "1.75.0"
license = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }
Expand Down Expand Up @@ -51,6 +51,7 @@ sd-cloud-api = { version = "0.1.0", path = "../crates/cloud-api" }

# Workspace dependencies
async-channel = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
base64 = { workspace = true }
blake3 = { workspace = true }
Expand Down Expand Up @@ -100,7 +101,6 @@ webp = { workspace = true }
# Specific Core dependencies
async-recursion = "1.0.5"
async-stream = "0.3.5"
async-trait = "^0.1.74"
bytes = "1.5.0"
ctor = "0.2.5"
directories = "5.0.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/ai/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
authors = ["Ericson Soares <ericson@spacedrive.com>"]
readme = "README.md"
description = "A simple library to generate video thumbnails using ffmpeg with the webp format"
rust-version = "1.73.0"
rust-version = "1.75.0"
license = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/file-path-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "sd-file-path-helper"
version = "0.1.0"
authors = ["Ericson Soares <ericson@spacedrive.com>"]
readme = "README.md"
rust-version = "1.73.0"
rust-version = "1.75.0"
license = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }
Expand Down
42 changes: 42 additions & 0 deletions crates/task-system/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[package]
name = "sd-task-system"
version = "0.1.0"
authors = ["Ericson \"Fogo\" Soares <ericson.ds999@gmail.com>"]
rust-version = "1.75.0"
license.workspace = true
edition.workspace = true
repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# Workspace deps
async-channel = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
futures-concurrency = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"parking_lot",
"rt-multi-thread",
"time",
] }
tokio-stream = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

# External deps
downcast-rs = "1.2.0"
pin-project = "1.1.4"

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "test-util", "fs"] }
tempfile = { workspace = true }
rand = "0.8.5"
tracing-test = { version = "^0.2.4", features = ["no-env-filter"] }
thiserror = { workspace = true }
lending-stream = "1.0.0"
serde = { workspace = true, features = ["derive"] }
rmp-serde = { workspace = true }
uuid = { workspace = true, features = ["serde"] }
28 changes: 28 additions & 0 deletions crates/task-system/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::{error::Error, fmt};

use super::task::TaskId;

/// Task system's error type definition, representing when internal errors occurs.
#[derive(Debug, thiserror::Error)]
pub enum SystemError {
#[error("task not found <task_id='{0}'>")]
TaskNotFound(TaskId),
#[error("task aborted <task_id='{0}'>")]
TaskAborted(TaskId),
#[error("task join error <task_id='{0}'>")]
TaskJoin(TaskId),
#[error("forced abortion for task <task_id='{0}'> timed out")]
TaskForcedAbortTimeout(TaskId),
}

/// Trait for errors that can be returned by tasks, we use this trait as a bound for the task system generic
/// error type.
///
///With this trait, we can have a unified error type through all the tasks in the system.
pub trait RunError: Error + fmt::Debug + Send + Sync + 'static {}

/// We provide a blanket implementation for all types that also implements
/// [`std::error::Error`](https://doc.rust-lang.org/std/error/trait.Error.html) and
/// [`std::fmt::Debug`](https://doc.rust-lang.org/std/fmt/trait.Debug.html).
/// So you will not need to implement this trait for your error type, just implement the `Error` and `Debug`
impl<T: Error + fmt::Debug + Send + Sync + 'static> RunError for T {}
71 changes: 71 additions & 0 deletions crates/task-system/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//!
//! # Task System
//!
//! Spacedrive's Task System is a library that provides a way to manage and execute tasks in a concurrent
//! and parallel environment.
//!
//! Just bring your own unified error type and dispatch some tasks, the system will handle enqueueing,
//! parallel execution, and error handling for you. Aside from some niceties like:
//! - Round robin scheduling between workers following the available CPU cores on the user machine;
//! - Work stealing between workers for better load balancing;
//! - Gracefully pause and cancel tasks;
//! - Forced abortion of tasks;
//! - Prioritizing tasks that will suspend running tasks without priority;
//! - When the system is shutdown, it will return all pending and running tasks to theirs dispatchers, so the user can store them on disk or any other storage to be re-dispatched later;
//!
//!
//! ## Basic example
//!
//! ```
//! use sd_task_system::{TaskSystem, Task, TaskId, ExecStatus, TaskOutput, Interrupter, TaskStatus};
//! use async_trait::async_trait;
//! use thiserror::Error;
//!
//! #[derive(Debug, Error)]
//! pub enum SampleError {
//! #[error("Sample error")]
//! SampleError,
//! }
//!
//! #[derive(Debug)]
//! pub struct ReadyTask {
//! id: TaskId,
//! }
//!
//! #[async_trait]
//! impl Task<SampleError> for ReadyTask {
//! fn id(&self) -> TaskId {
//! self.id
//! }
//!
//! async fn run(&mut self, _interrupter: &Interrupter) -> Result<ExecStatus, SampleError> {
//! Ok(ExecStatus::Done(TaskOutput::Empty))
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let system = TaskSystem::new();
//!
//! let handle = system.dispatch(ReadyTask { id: TaskId::new_v4() }).await;
//!
//! assert!(matches!(
//! handle.await,
//! Ok(TaskStatus::Done(TaskOutput::Empty))
//! ));
//!
//! system.shutdown().await;
//! }
//! ```
mod error;
mod message;
mod system;
mod task;
mod worker;

pub use error::{RunError, SystemError as TaskSystemError};
pub use system::{Dispatcher as TaskDispatcher, System as TaskSystem};
pub use task::{
AnyTaskOutput, ExecStatus, Interrupter, InterrupterFuture, InterruptionKind, IntoAnyTaskOutput,
IntoTask, Task, TaskHandle, TaskId, TaskOutput, TaskStatus,
};
63 changes: 63 additions & 0 deletions crates/task-system/src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use tokio::sync::oneshot;

use super::{
error::{RunError, SystemError},
task::{TaskId, TaskWorkState},
worker::WorkerId,
};

#[derive(Debug)]
pub(crate) enum SystemMessage {
IdleReport(WorkerId),
WorkingReport(WorkerId),
ResumeTask {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
PauseNotRunningTask {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
CancelNotRunningTask {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
ForceAbortion {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
NotifyIdleWorkers {
start_from: WorkerId,
task_count: usize,
},
ShutdownRequest(oneshot::Sender<Result<(), SystemError>>),
}

#[derive(Debug)]
pub(crate) enum WorkerMessage<E: RunError> {
NewTask(TaskWorkState<E>),
TaskCountRequest(oneshot::Sender<usize>),
ResumeTask {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
PauseNotRunningTask {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
CancelNotRunningTask {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
ForceAbortion {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
ShutdownRequest(oneshot::Sender<()>),
StealRequest(oneshot::Sender<Option<TaskWorkState<E>>>),
WakeUp,
}
Loading

0 comments on commit dba85eb

Please sign in to comment.