Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Read more about the [motivation and use cases](docs/why-taskmill.md).
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use taskmill::{
Scheduler, Priority, IoBudget, TaskSubmission, TaskExecutor,
Module, Scheduler, IoBudget, TaskSubmission, TaskExecutor,
TaskContext, TaskError,
};

Expand All @@ -33,17 +33,19 @@ impl TaskExecutor for ThumbnailGenerator {
async fn main() {
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.executor("thumbnail", Arc::new(ThumbnailGenerator))
.module(Module::new("media")
.executor("thumbnail", Arc::new(ThumbnailGenerator)))
.max_concurrency(8)
.with_resource_monitoring()
.build()
.await
.unwrap();

let media = scheduler.module("media");
let sub = TaskSubmission::new("thumbnail")
.payload_json(&serde_json::json!({"path": "/photos/img.jpg"}))
.expected_io(IoBudget::disk(4096, 1024));
scheduler.submit(&sub).await.unwrap();
media.submit(sub).await.unwrap();

let token = CancellationToken::new();
scheduler.run(token).await;
Expand Down
32 changes: 16 additions & 16 deletions benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
Priority, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
Module, Priority, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
TaskSubmission,
};
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -46,7 +46,7 @@ impl TaskExecutor for ByteProgressExecutor {
async fn build_scheduler(max_concurrency: usize) -> Scheduler {
Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.executor("test", Arc::new(NoopExecutor))
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.max_concurrency(max_concurrency)
.poll_interval(std::time::Duration::from_millis(10))
.build()
Expand All @@ -64,7 +64,7 @@ fn bench_submit(c: &mut Criterion) {
let sched = build_scheduler(4).await;
for i in 0..1000 {
sched
.submit(&TaskSubmission::new("test").key(format!("s-{i}")))
.submit(&TaskSubmission::new("bench::test").key(format!("s-{i}")))
.await
.unwrap();
}
Expand All @@ -80,13 +80,13 @@ fn bench_submit_dedup_hit(c: &mut Criterion) {
let sched = build_scheduler(4).await;
// First submit creates the task.
sched
.submit(&TaskSubmission::new("test").key("same-key"))
.submit(&TaskSubmission::new("bench::test").key("same-key"))
.await
.unwrap();
// Subsequent submits hit the dedup path.
for _ in 0..999 {
sched
.submit(&TaskSubmission::new("test").key("same-key"))
.submit(&TaskSubmission::new("bench::test").key("same-key"))
.await
.unwrap();
}
Expand All @@ -103,7 +103,7 @@ fn bench_dispatch_and_complete(c: &mut Criterion) {

for i in 0..1000 {
sched
.submit(&TaskSubmission::new("test").key(format!("d-{i}")))
.submit(&TaskSubmission::new("bench::test").key(format!("d-{i}")))
.await
.unwrap();
}
Expand Down Expand Up @@ -168,7 +168,7 @@ fn bench_concurrency_scaling(c: &mut Criterion) {

for i in 0..500 {
sched
.submit(&TaskSubmission::new("test").key(format!("cs-{i}")))
.submit(&TaskSubmission::new("bench::test").key(format!("cs-{i}")))
.await
.unwrap();
}
Expand Down Expand Up @@ -205,7 +205,7 @@ fn bench_batch_submit(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = build_scheduler(4).await;
let submissions: Vec<_> = (0..1000)
.map(|i| TaskSubmission::new("test").key(format!("b-{i}")))
.map(|i| TaskSubmission::new("bench::test").key(format!("b-{i}")))
.collect();
sched.submit_batch(&submissions).await.unwrap();
});
Expand All @@ -231,7 +231,7 @@ fn bench_mixed_priority_dispatch(c: &mut Criterion) {
let priority = priorities[i % priorities.len()];
sched
.submit(
&TaskSubmission::new("test")
&TaskSubmission::new("bench::test")
.key(format!("mp-{i}"))
.priority(priority),
)
Expand Down Expand Up @@ -271,7 +271,7 @@ fn bench_byte_progress_overhead(c: &mut Criterion) {

for i in 0..500 {
sched
.submit(&TaskSubmission::new("test").key(format!("bp-noop-{i}")))
.submit(&TaskSubmission::new("bench::test").key(format!("bp-noop-{i}")))
.await
.unwrap();
}
Expand Down Expand Up @@ -301,13 +301,13 @@ fn bench_byte_progress_overhead(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.executor(
.module(Module::new("bench").executor(
"byte-test",
Arc::new(ByteProgressExecutor {
total: 1_048_576,
chunk_size: 1024,
}),
)
))
.max_concurrency(8)
.poll_interval(std::time::Duration::from_millis(10))
.progress_interval(std::time::Duration::from_millis(100))
Expand All @@ -317,7 +317,7 @@ fn bench_byte_progress_overhead(c: &mut Criterion) {

for i in 0..500 {
sched
.submit(&TaskSubmission::new("byte-test").key(format!("bp-{i}")))
.submit(&TaskSubmission::new("bench::byte-test").key(format!("bp-{i}")))
.await
.unwrap();
}
Expand Down Expand Up @@ -352,13 +352,13 @@ fn bench_byte_progress_snapshot(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.executor(
.module(Module::new("bench").executor(
"byte-test",
Arc::new(ByteProgressExecutor {
total: 10_485_760,
chunk_size: 65_536,
}),
)
))
.max_concurrency(100)
.poll_interval(std::time::Duration::from_millis(10))
.build()
Expand All @@ -368,7 +368,7 @@ fn bench_byte_progress_snapshot(c: &mut Criterion) {
// Submit and dispatch 100 tasks.
for i in 0..100 {
sched
.submit(&TaskSubmission::new("byte-test").key(format!("snap-{i}")))
.submit(&TaskSubmission::new("bench::byte-test").key(format!("snap-{i}")))
.await
.unwrap();
}
Expand Down
Loading
Loading