Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add receiver.disconnect() for atomic graceful shutdown #959

Closed
wants to merge 1 commit into from

Conversation

ryoqun
Copy link
Contributor

@ryoqun ryoqun commented Feb 6, 2023

Firstly, congrats for adaptation by std::sync::mpsc!

Hi, yes... Me again. :) I'm the guy who submitted this stale issue: #852. (i haven't forget it... just priority thing...)

However, I'm eager for this completely unrelated pr to get across the line this time really :)

In short, there's no way to shutdown channels from the receiver-side with guarantee of no loss of messages.

That's because the only way to shutdown channels explicitly is drop()-ing the last alive receiver and i can't .try_recv(), now that it's gone. ;)

So, there's still a chance sender has managed to sneak more, no matter hard I'm draining with loop-ed try_recv() before drop() at the receiver side in advance. (i.e. tocttou)

In other words, I'd like to clarify the semantics of channel shutdown and want to play with many short-lived ones.

This desire is arising from the use-case where I want to commit tasks from many worker threads to the committer thread and the committer is time-sliced and the worker needs to know which current time slice it succeed to commit or not as extremely fast as possible.

So, I'd like to map the time slice to each crossbeam_channels 1-to-1. At the end of any time slice, the workers-side and the committer-side should agree with identical set of committed tasks. Also, it's okay to take some time to switch to new crossbeam channel after first commit error due to expired time slice, which the committer determines and initiates the rollover via .disconnect().
So that there's no need for round-trip with futexs, which is the overhead for common pattern of passing a sender to return the commit result back. the .disconnect() approach will just require a few of just atomic CASes.

I think there's no performant alternative way, other than trying to persuade 70M-all-time-downloaded crate's maintainer's mind. :)

This pr is still a draft but I'd like to work on more if this sounds sensible (writing tests/supporting all flavors).

Thanks.

@ryoqun
Copy link
Contributor Author

ryoqun commented Feb 11, 2023

@taiki-e hey, i know you're busy... but, could you take a look? I'm very eager to finish this pr off.

@ryoqun
Copy link
Contributor Author

ryoqun commented Feb 28, 2023

@taiki-e i see your tagline says busy.... that said, i still want to tag you after 2 wks. seems you're japanese as well? here's cultural reference: 🐶

@ryoqun
Copy link
Contributor Author

ryoqun commented Apr 19, 2023

@taiki-e hi, I'm still interested in pushing this pr into merging. also, now that rust-lang/rust#108164 is merged downstream (rustc), how about cherry-picking the fix into this repo as well?

@ryoqun
Copy link
Contributor Author

ryoqun commented Aug 31, 2023

@taiki-e hi, another ping. could you spend a little of time to decide whether this api addition could ever be accepted or not at least? thanks in advance. :)

@taiki-e
Copy link
Member

taiki-e commented Dec 21, 2023

Thanks for the PR. I feel this would need investigations of possible designs, their advantages and disadvantages, and how they should interact with new concepts such as reconnection (#750), etc.

@ryoqun
Copy link
Contributor Author

ryoqun commented Dec 22, 2023

I feel this would need investigations of possible designs, their advantages and disadvantages

really thanks for replying!

however, i don't think there's many possible designs, though? Implementing it was straightforward in terms of adjusting the code base. it's semantically just like the drop() possibly with some return value. and when it's returing the value, the value should be owning the channel whollly as it does so only when it was the last receiver. And, i specifically made the fn to return Iter, so that usage is only limited to next()-ing remaining messages.

and how they should interact with new concepts such as reconnection (#750), etc.

like the above reasoning, i think they can just co-exist. i.e. you can freely new_sender() on not-disconnected receivers. and once disconnected successfully (ie it returned Some), you cannot do so, because there should be no live receivers by definition. also, reconnection concept sounds interering.

@John-Nagle
Copy link

John-Nagle commented Mar 18, 2024

Glad to see this happening. I have an application with channels between multiple processes, and clean shutdown without deadlocking requires way too much complexity.

(Use case: a metaverse client, which works much like a game client. When the client disconnects from a server, most of the system needs to shut down, but it's not a panic condition. Disconnection leads back to the login screen. There are channels between various threads, sometimes in both directions. When the only way to shut down a connection is to drop the Sender, the workaround is to encapsulate a Sender in Arc<Mutex<Option>>. Then a disconnect can be forced by taking the sender from the option and deleting it. This is clunky. Also, that mutex can result in deadlocks on bounded channels, if the sender blocks.)

@ryoqun
Copy link
Contributor Author

ryoqun commented May 8, 2024

@John-Nagle thanks for sharing your use case. glad that i hear another independent demand for this. Considering past similar issues as well, now that i think there's some demand for this functionality certainly.

Also, recently I came up another related concern: dropping hugely-buffered channels.

If the channel contains very large number of not-recv-ed messages, dropping it can potentially takes a very long time. And, currently, there's no way to programmatically control which side (receiver or sender) drop the actual channel. Or even which particular instance for that matter. so, just the last instance of receiver or sender could be unlucky. That's problematic when this occurs in some perf sensitive code path. Usually, we want to discard those channels in some back ground processing manner. yet, holding alive sender/receiver for that purpose means it's impossible to notify the opposite side about disconnection...

.... so, i think we need .disconnect() for sender as well, i think.

ideally the new api would look like this:

impl Sender {
    fn disconnect(self) -> Option<DisconnectedSender>;
}
impl Receiver {
    fn disconnect(self) -> Option<DisconnectedReceiver>;
}

struct DisconnectedSender(...)

struct DisconnectedReceiver(...)

impl DisconnectedSender {
    // for the use case where there's `fn finish(self)` on T
    // (i.e. non-trivial object cleaning up code outside `::drop()`)
    fn into_iter(self) -> impl Iterator;
    // ... or no inherent methods?
}

impl DisconnectedReceiver {
    fn into_iter(self) -> impl Iterator;
}

@ryoqun
Copy link
Contributor Author

ryoqun commented May 22, 2024

now that #1040 had been released, I'm focusing on this..

After another long thinking session, I think this is the latest api, which is simple and solves everything:

impl Sender {
    fn disconnect(&self);
    fn connect(&self);
}
impl Receiver {
    fn disconnect(&self);
    fn connect(&self);
}

I'll try to create a pr for this. stay tuned. :)

@ryoqun
Copy link
Contributor Author

ryoqun commented May 23, 2024

Closing this in favor of #1114

@ryoqun ryoqun closed this May 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants