Messages are reordered with detached sending #225

Closed
Quotique opened this issue May 5, 2024 · 7 comments

@Quotique

Quotique commented May 5, 2024

Hello. I'm trying to migrate my project to xtra 0.6.0 (from 0.5.2) and am looking for a replacement for the do_send_async method (sending without awaiting the response). As I understand it, it should be something like .send(msg).detach().await. But now I have a problem with the order of received messages: it is not the same as the order in which they were sent.

Here is an example with version 0.5.2 (works):

use async_trait::async_trait;
use xtra::prelude::*;
use xtra::spawn::Tokio;

#[derive(Default)]
struct Printer {
    last_printed: usize,
}

struct Print(usize);

impl Actor for Printer {}

impl Message for Print {
    type Result = ();
}

#[async_trait]
impl Handler<Print> for Printer {
    async fn handle(&mut self, print: Print, _ctx: &mut Context<Self>) {
        assert!(self.last_printed < print.0);
        println!("Printing {}", print.0);
        self.last_printed = print.0;
    }
}

#[tokio::main]
async fn main() {
    let addr = Printer::default().create(None).spawn(&mut Tokio::Global);
    for i in 1..1_000_000_000 {
        let _ = addr
            .do_send_async(Print(i))
            .await
            .expect("Printer should not be dropped");
    }
}

The same example with version 0.6.0 (fails):

use xtra::prelude::*;

#[derive(Default, xtra::Actor)]
struct Printer {
    last_printed: usize,
}

struct Print(usize);

impl Handler<Print> for Printer {
    type Return = ();

    async fn handle(&mut self, print: Print, _ctx: &mut Context<Self>) {
        assert!(self.last_printed < print.0);
        println!("Printing {}", print.0);
        self.last_printed = print.0;
    }
}

#[tokio::main]
async fn main() {
    let addr = xtra::spawn_tokio(Printer::default(), Mailbox::unbounded());
    for i in 1..1_000_000_000 {
        let _ = addr
            .send(Print(i))
            .detach()
            .await
            .expect("Printer should not be dropped");
    }
}

Do you have any tips for achieving similar behavior?

@thomaseizinger
Collaborator

thomaseizinger commented May 5, 2024

See #94 for a discussion on this. The tl;dr is that this is by design. Detaching from the response literally means you give up control over scheduling. It is like spawning tasks or threads.

If you want things in sequence, wait for the response and then queue the next message. If you don't want to block the current task on that, move the entire loop into a new task:

tokio::spawn(async move {
    for i in 1..1_000_000_000 {
        // Await each response before queueing the next message.
        let _ = addr.send(Print(i)).await;
    }
});

@Quotique
Author

Quotique commented May 6, 2024

Unfortunately, neither option suits me. I made a small patch for my problem; maybe it will be useful to someone else: Quotique@74f20c2

Quotique closed this as completed May 6, 2024

@thomaseizinger
Collaborator

Can you explain why that doesn't work for your use case? We thought about this a lot, so I am curious to learn where the current model seems to fall short.

@Quotique
Copy link
Author

Quotique commented May 6, 2024

I use a router actor to balance load between worker actors. In my case, messages can be grouped, for example by user ID. This way I can have a separate actor to handle each user's requests and a lightweight balancer. Messages from different users are processed independently, but messages from the same user must be processed in the order they were sent. For example, the user can cancel a previous request.

So:

  1. The router, by design, can't wait for a response from the worker.
  2. Undoing the reordering on the worker side is too complicated.
  3. Spawning a task for each message leads to a huge overhead on the router side. This may become a bottleneck.
  4. If I spawn one task to wait for responses, I need a mechanism for sending the futures to it, and so on.

Maybe I missed something, but making the patch seemed easiest to me.
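The requirement described here (per-user order, without the router waiting for responses) is met whenever each user's messages pass through one FIFO queue with a single consumer. A minimal std-only sketch, assuming one OS thread and one std::sync::mpsc channel per user as hypothetical stand-ins for the worker actors and their mailboxes (Router, Request, route and shutdown are illustrative names, not xtra's API):

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical message: the user it belongs to and a per-user sequence number.
#[derive(Debug)]
struct Request {
    user_id: u32,
    seq: u32,
}

// Router that owns one FIFO queue (and one worker thread) per user.
struct Router {
    routes: HashMap<u32, mpsc::Sender<Request>>,
    workers: Vec<thread::JoinHandle<(u32, Vec<u32>)>>,
}

impl Router {
    fn new() -> Self {
        Router { routes: HashMap::new(), workers: Vec::new() }
    }

    // Forwards a request without waiting for it to be handled. Requests for
    // the same user share one channel, so they are received in sending order.
    fn route(&mut self, req: Request) {
        let user_id = req.user_id;
        let workers = &mut self.workers;
        let tx = self.routes.entry(user_id).or_insert_with(|| {
            let (tx, rx) = mpsc::channel::<Request>();
            workers.push(thread::spawn(move || {
                // The worker records sequence numbers in the order it handles them.
                let handled: Vec<u32> = rx.iter().map(|r| r.seq).collect();
                (user_id, handled)
            }));
            tx
        });
        tx.send(req).expect("worker stopped");
    }

    // Closes every queue, then collects what each worker handled.
    fn shutdown(self) -> HashMap<u32, Vec<u32>> {
        drop(self.routes);
        self.workers.into_iter().map(|w| w.join().unwrap()).collect()
    }
}

fn main() {
    let mut router = Router::new();
    // Interleave two users' requests; each user's requests are numbered 1..=3.
    for seq in 1..=3 {
        for user_id in [7, 42] {
            router.route(Request { user_id, seq });
        }
    }
    let handled = router.shutdown();
    println!("{:?} {:?}", handled[&7], handled[&42]);
}
```

The router never blocks on a worker's reply; ordering comes only from each per-user channel delivering messages from one sender in the order they were sent.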

@thomaseizinger
Collaborator

> 3. Spawning a task for each message leads to a huge overhead on the router side. This may become a bottleneck.

I'd like to see benchmarks for this assumption. A task in tokio only incurs a single allocation. You should be able to spawn several thousand a second without problems.

You also wouldn't spawn a task for each message; that is redundant. You only need to spawn a task for a group of messages that cares about ordering. Unless you are doing absolutely trivial stuff, the overhead of spawning a single task is negligible in that context.
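To make "one task per group" concrete, here is a std-only sketch, assuming OS threads and std::sync::mpsc channels as stand-ins for tokio tasks and an xtra mailbox (Job, run and the reply channel are hypothetical names, not xtra's API). One sending thread is spawned per user; it sends that user's messages one at a time and waits for each reply, so messages within a group stay in order while different groups proceed concurrently:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical message: user, per-user sequence number, and a reply channel.
struct Job {
    user_id: u32,
    seq: u32,
    reply: mpsc::Sender<()>,
}

// Spawns one sending thread per group. Each thread sends its group's messages
// one at a time, waiting for the reply before sending the next one.
// Returns the order in which the shared worker handled each user's messages.
fn run(groups: HashMap<u32, Vec<u32>>) -> HashMap<u32, Vec<u32>> {
    let (tx, rx) = mpsc::channel::<Job>();

    // A single shared worker, standing in for the actor.
    let worker = thread::spawn(move || {
        let mut log: HashMap<u32, Vec<u32>> = HashMap::new();
        for job in rx {
            log.entry(job.user_id).or_default().push(job.seq);
            let _ = job.reply.send(()); // acknowledge after handling
        }
        log
    });

    let senders: Vec<_> = groups
        .into_iter()
        .map(|(user_id, seqs)| {
            let tx = tx.clone();
            thread::spawn(move || {
                for seq in seqs {
                    let (reply_tx, reply_rx) = mpsc::channel();
                    tx.send(Job { user_id, seq, reply: reply_tx }).unwrap();
                    reply_rx.recv().unwrap(); // wait before sending the next one
                }
            })
        })
        .collect();

    for s in senders {
        s.join().unwrap();
    }
    drop(tx); // close the mailbox so the worker's loop ends
    worker.join().unwrap()
}

fn main() {
    let mut groups = HashMap::new();
    groups.insert(1, vec![1, 2, 3]);
    groups.insert(2, vec![1, 2, 3, 4]);
    let log = run(groups);
    println!("{:?} {:?}", log[&1], log[&2]);
}
```

Messages from different users may interleave in the worker, but each user's sequence is handled in order. With tokio and xtra, each sending thread would correspond to one spawned task awaiting send(...) for its group's messages.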

> 1. The router, by design, can't wait for a response from the worker.

Maybe your router shouldn't be an actor? Xtra supports work-stealing by default (it is an mpmc channel). So you can spawn multiple actors that read from the same mailbox (by cloning it).
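The work-stealing idea can be sketched generically with the standard library, assuming a single std::sync::mpsc receiver shared behind Arc<Mutex<...>> as a rough stand-in for an MPMC mailbox (this is not how xtra implements its channel; work_steal is a hypothetical helper). Every message is handled exactly once, by whichever worker takes it first, so which worker sees a given message is not fixed:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Several workers pull from one shared queue; whichever worker is free takes
// the next message. Returns (worker id, message) pairs for every message handled.
fn work_steal(workers: usize, messages: u32) -> Vec<(usize, u32)> {
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut handled = Vec::new();
                loop {
                    // The guard is dropped at the end of this statement, so the
                    // lock is released as soon as one message has been taken.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(msg) => handled.push((id, msg)),
                        Err(_) => break, // queue closed and drained
                    }
                }
                handled
            })
        })
        .collect();

    for m in 0..messages {
        tx.send(m).unwrap();
    }
    drop(tx); // no more messages: workers exit once the queue is empty

    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}

fn main() {
    let handled = work_steal(4, 100);
    println!("{} messages handled", handled.len());
}
```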

@Quotique
Copy link
Author

Quotique commented May 7, 2024

> Maybe your router shouldn't be an actor? Xtra supports work-stealing by default (it is an mpmc channel). So you can spawn multiple actors that read from the same mailbox (by cloning it).

Yeah, I know about this option, but my actors are stateful: I need a guarantee that all of a user's messages will be processed by the same actor. Work stealing delivers each message to the first free actor, right?
So, my router looks like this:

impl Handler<MyCoolMessage> for Router {
    type Return = ();

    async fn handle(&mut self, msg: MyCoolMessage, _ctx: &mut Context<Self>) {
        if let Some(worker) = self.routes.get(&msg.user_id) {
            let _ = worker.send(msg).await;
        } else {
            // Error handling stuff
        }
    }
}

> I'd like to see benchmarks for this assumption. A task in tokio only incurs a single allocation. You should be able to spawn several thousand a second without problems.

You're probably right, but the tokio scheduler is a black box to me. I'm not sure everything will be fine if I spawn a million tasks per second. I'll benchmark different approaches later, but right now I need to focus on other tasks.

@thomaseizinger
Collaborator

thomaseizinger commented May 7, 2024

> Maybe your router shouldn't be an actor? Xtra supports work-stealing by default (it is an mpmc channel). So you can spawn multiple actors that read from the same mailbox (by cloning it).

> Yeah, I know about this option, but my actors are stateful: I need a guarantee that all of a user's messages will be processed by the same actor.

I can only speak from an abstract point of view, but if you have multiple messages that need to arrive at the same actor in a specific order, perhaps the messages are too fine-grained and whatever you are delegating to your actor should just be one message?

Guarding state is where the actor model shines. Actors and their messages are like (micro) services and their APIs. They should always be in a consistent state from the perspective of their callers. They can't control how another service uses their API, so they should be able to handle every message in every possible state.

If some messages only make sense as a group, I'd consider merging the messages to instead model whatever the group represents as an operation.
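A minimal sketch of that suggestion, using the thread's example of a user cancelling a previous request, with hypothetical names (Op, UserOps, UserState) and no xtra dependency: rather than sending Submit and Cancel as separate messages whose relative order depends on scheduling, the caller sends one message containing the ordered list of operations, and the handler applies them in sequence:

```rust
// Hypothetical per-user operations that only make sense together.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Submit(u32),
    Cancel(u32),
}

// One message carrying the whole group, so the order of the operations is
// fixed by the message itself rather than by how separate messages are scheduled.
struct UserOps {
    user_id: u32,
    ops: Vec<Op>,
}

// Hypothetical per-user state guarded by the actor.
struct UserState {
    user_id: u32,
    pending: Vec<u32>,
}

impl UserState {
    // Applies the group as one operation, in the order given.
    fn handle(&mut self, msg: UserOps) {
        // Ignore batches addressed to a different user.
        if msg.user_id != self.user_id {
            return;
        }
        for op in msg.ops {
            match op {
                Op::Submit(id) => self.pending.push(id),
                Op::Cancel(id) => self.pending.retain(|&p| p != id),
            }
        }
    }
}

fn main() {
    let mut state = UserState { user_id: 7, pending: Vec::new() };
    state.handle(UserOps {
        user_id: 7,
        ops: vec![Op::Submit(1), Op::Submit(2), Op::Cancel(1)],
    });
    println!("{:?}", state.pending); // [2]
}
```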
