Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messages priority & inbox unification #85

Merged
merged 72 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
9b018fd
Super early idea for message priorities
Restioson Jun 13, 2022
80c9fdc
Optimistic heap implementation
Restioson Jun 14, 2022
bac2f0e
Remove broadcast queues that have no receivers
Restioson Jun 15, 2022
6ae1241
Remove custom binary heap and add default queue
Restioson Jun 15, 2022
4b2227c
do some work on bounding + fmt
Restioson Jun 15, 2022
19d02ec
Tests compile & properly Drop SendFuture
Restioson Jun 16, 2022
bffd74a
Code cleanup
Restioson Jun 16, 2022
c585852
Convert receive to future combinator
Restioson Jun 16, 2022
01b8539
Send message back into channel if recv dropped
Restioson Jun 16, 2022
aad5630
Allow ReceiveFutures to be cancelled
Restioson Jun 16, 2022
67a3c18
Merge branch 'master' into inbox-refactor
Restioson Jun 17, 2022
3411c58
Replace drop_notice infrastructure
Restioson Jun 18, 2022
0e1ffd1
Change eq/ord semantics & add test
Restioson Jun 18, 2022
93b3e96
Reimplement fn capacity
Restioson Jun 18, 2022
3e37854
Remove BroadcastMessage, rename wrapper to it
Restioson Jun 18, 2022
523e7f1
Fmt
Restioson Jun 18, 2022
3894dd1
Address review comments
Restioson Jun 18, 2022
5b0d4d1
Merge branch 'master' into inbox-refactor
Restioson Jun 20, 2022
1342c5e
Use while let & fmt
Restioson Jun 20, 2022
08e335a
Fix copy-paste error in doc
Restioson Jun 20, 2022
5bfbaa7
Remove redundant map
Restioson Jun 20, 2022
f0e2f11
Use ? over combinator (shorter)
Restioson Jun 20, 2022
de52e47
Encapsulate ref counters a bit better
Restioson Jun 20, 2022
97c2681
Use map_or
Restioson Jun 20, 2022
8c5e563
Rename broadcast / stolen
Restioson Jun 20, 2022
17c4893
pub(crate) -> pub
Restioson Jun 20, 2022
b8f66fc
TxStrong::new -> TxStrong::try_new
Restioson Jun 20, 2022
b602c09
Remove unnecessary impl of BroadcastEnvelope
Restioson Jun 20, 2022
43d2e1b
Remove unnecessary Clone impl
Restioson Jun 20, 2022
08b7c5c
Remove unused AddressMessage type
Restioson Jun 20, 2022
fad6a55
Merge branch 'master' into inbox-refactor
Restioson Jun 20, 2022
65a1e2c
Port tests from #51
thomaseizinger Jun 20, 2022
64773ba
Merge branch 'master' into inbox-refactor
Restioson Jun 20, 2022
1b83c2e
Remove unnecessary early return
Restioson Jun 20, 2022
f306cda
Sink impl
Restioson Jun 20, 2022
e088b65
Fix cloning issue
Restioson Jun 20, 2022
ef54834
Fmt & move broadcast to Address
Restioson Jun 20, 2022
4bdd00a
Clarify why `message_channel::private` needs to exist (#87)
thomaseizinger Jun 20, 2022
5534582
Reorder match & remove old comment
Restioson Jun 20, 2022
4ca0426
Address some review comments
Restioson Jun 20, 2022
75dcfd6
Reorder functions
Restioson Jun 20, 2022
fd80e24
bounded broadcast POC
Restioson Jun 21, 2022
b9aeabe
Test other types of send
Restioson Jun 21, 2022
9c351df
Fmt
Restioson Jun 21, 2022
21e6976
impl len
Restioson Jun 21, 2022
8a0c87e
update tracing
Restioson Jun 21, 2022
9b1c81b
update async trait
Restioson Jun 21, 2022
e24ad3a
Update for minimal versions & clippy
Restioson Jun 21, 2022
8c2fe7e
Allow priority to be set
Restioson Jun 21, 2022
f2fd590
Add sink test
Restioson Jun 21, 2022
31672e2
Add handle order test
Restioson Jun 21, 2022
14959f4
Add a send waiter order test
Restioson Jun 21, 2022
af80166
Fmt
Restioson Jun 21, 2022
c8a0160
Add broadcast tail test
Restioson Jun 21, 2022
02efe77
Add yet another broadcast tail test
Restioson Jun 21, 2022
afe08e0
Fmt
Restioson Jun 21, 2022
79c06a4
remove superseded tests
Restioson Jun 21, 2022
705470a
remove unnecessary `let _`
Restioson Jun 21, 2022
9a22a6f
Introduce `BroadcastFuture` (#88)
thomaseizinger Jun 21, 2022
9205ac3
Use SeqCst where unsure
Restioson Jun 21, 2022
b43c94e
Docs
Restioson Jun 21, 2022
1f4a485
Docs & remove negative priority messages
Restioson Jun 21, 2022
cc79d1a
Remove priority enum in favour of option
Restioson Jun 21, 2022
dee90a3
Fmt
Restioson Jun 21, 2022
fef4221
Set priority for addresses and channels
Restioson Jun 21, 2022
79127a0
Set priority for addresses and channels
Restioson Jun 21, 2022
2a72e44
update loc count
Restioson Jun 21, 2022
0aaef6b
Update src/address.rs
Restioson Jun 21, 2022
400dba0
Update src/message_channel.rs
Restioson Jun 21, 2022
f9145bf
Update src/context.rs
Restioson Jun 22, 2022
51de9b6
Address review comments & update example in README
Restioson Jun 22, 2022
f9d63bb
Remove dead comments
Restioson Jun 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ categories = ["asynchronous", "concurrency"]
rust-version = "1.56.0"

[dependencies]
async-trait = "0.1.36"
barrage = "0.2.2"
async-trait = "0.1.56"
catty = "0.1.5"
flume = { version = "0.10.13", default-features = false, features = ["async"] }
futures-core = { version = "0.3.21", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.21", default-features = false }
futures-util = { version = "0.3.21", default-features = false, features = ["sink", "alloc"] }
pollster = "0.2"
pin-project-lite = "0.2.9"
event-listener = "2.4.0"
spin = { version = "0.9.3", default-features = false, features = ["spin_mutex"] }

# Feature `timing`
futures-timer = { version = "3.0", optional = true, default-features = false }
Expand All @@ -41,7 +40,7 @@ wasm-bindgen = { version = "0.2", optional = true, default-features = false }
wasm-bindgen-futures = { version = "0.4", optional = true, default-features = false }

# Feature `with-tracing-0_1`
tracing = { version = "0.1", optional = true, default-features = false }
tracing = { version = "0.1.35", optional = true, default-features = false }

[dev-dependencies]
rand = "0.8"
Expand Down Expand Up @@ -96,7 +95,7 @@ name = "backpressure"
required-features = ["with-tokio-1"]

[[example]]
name = "address_is_sink"
name = "address_sink"
required-features = ["with-tokio-1", "tokio/full"]

[[example]]
Expand Down
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ For better ergonomics with xtra, try the [spaad](https://crates.io/crates/spaad)

## Features
- Safe: there is no unsafe code in xtra.
- Tiny: xtra is less than 2kloc.
- Tiny: xtra is around 2kloc.
- Lightweight: xtra has few dependencies, most of which are lightweight (except `futures`).
- Asynchronous and synchronous message handlers.
- Simple asynchronous message handling interface which allows `async`/`await` syntax even when borrowing `self`.
Expand All @@ -32,7 +32,11 @@ impl Printer {
}
}

impl Actor for Printer {}
#[async_trait]
impl Actor for Printer {
type Stop = ();
async fn stopped(self) {}
}

struct Print(String);

Expand All @@ -41,7 +45,7 @@ impl Handler<Print> for Printer {
type Return = ();

async fn handle(&mut self, print: Print, _ctx: &mut Context<Self>) {
self.times += 1; // no ActorFuture or anything just to access `self`
self.times += 1;
println!("Printing {}. Printed {} times so far.", print.0, self.times);
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/address_is_sink.rs → examples/address_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn main() {
repeat(10)
.take(4)
.map(|number| Ok(Add(number)))
.forward(addr.clone())
.forward(addr.clone().into_sink())
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/basic_wasm_bindgen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"
crate-type = ["cdylib", "rlib"]

[dependencies]
wasm-bindgen = { version = "0.2.68", default-features = false }
wasm-bindgen = { version = "0.2.81", default-features = false }
wasm-bindgen-futures = { version = "0.4.13", default-features = false }
xtra = { path = "../..", features = ["with-wasm_bindgen-0_2"] }
async-trait = "0.1"
Expand Down
18 changes: 7 additions & 11 deletions examples/crude_bench.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::time::{Duration, Instant};

use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use std::future::Future;
use std::time::{Duration, Instant};
use xtra::prelude::*;
use xtra::spawn::Tokio;
use xtra::NameableSending;
use xtra::Receiver;
use xtra::SendFuture;
use xtra::{ActorErasedSending, NameableSending};

struct Counter {
count: usize,
Expand Down Expand Up @@ -94,7 +92,7 @@ impl Handler<TimeReturn> for ReturnTimer {
}
}

const COUNT: usize = 50_000_000; // May take a while on some machines
const COUNT: usize = 10_000_000; // May take a while on some machines

async fn do_address_benchmark<R>(
name: &str,
Expand All @@ -108,7 +106,7 @@ async fn do_address_benchmark<R>(

// rounding overflow
for _ in 0..COUNT {
let _ = f(&addr).await;
let _ = f(&addr).now_or_never();
}

// awaiting on GetCount will make sure all previous messages are processed first BUT introduces
Expand Down Expand Up @@ -149,13 +147,11 @@ async fn do_parallel_address_benchmark<R>(

async fn do_channel_benchmark<M, RM>(
name: &str,
f: impl Fn(
&dyn MessageChannel<M, Return = ()>,
) -> SendFuture<(), BoxFuture<'static, Receiver<()>>, RM>,
f: impl Fn(&dyn MessageChannel<M, Return = ()>) -> SendFuture<(), ActorErasedSending<()>, RM>,
) where
Counter: Handler<M, Return = ()> + Send,
M: Send + 'static,
SendFuture<(), BoxFuture<'static, Receiver<()>>, RM>: Future,
SendFuture<(), ActorErasedSending<()>, RM>: Future,
{
let addr = Counter { count: 0 }.create(None).spawn(&mut Tokio::Global);
let chan = &addr as &dyn MessageChannel<M, Return = ()>;
Expand Down
3 changes: 1 addition & 2 deletions examples/interleaved_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ impl Handler<Initialized> for ActorB {
async fn handle(&mut self, m: Initialized, ctx: &mut Context<Self>) {
println!("ActorB: Initialized");
let actor_a = m.0;
let send = actor_a.send(Hello);
ctx.join(self, send).await.unwrap();
ctx.join(self, actor_a.send(Hello)).await.unwrap();
}
}

Expand Down
3 changes: 1 addition & 2 deletions examples/send_interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ impl Handler<Greet> for Greeter {
#[tokio::main]
async fn main() {
let addr = Greeter::default().create(None).spawn(&mut Tokio::Global);

greeter_stream(500).forward(addr).await.unwrap();
greeter_stream(500).forward(addr.into_sink()).await.unwrap();
}

fn greeter_stream(delay: u64) -> impl Stream<Item = Result<Greet, Disconnected>> {
Expand Down
Loading