Feature: Provide a select! macro #39

Open
nikola-jokic opened this issue Dec 5, 2023 · 6 comments

Comments

@nikola-jokic

Select would be very helpful, especially when one channel is used to communicate cancellation while another carries the elements.

I have not tested closing a channel yet, but coming from a Go background, it would be very nice to have a select statement.

@fereidani
Owner

Hi Nikola,
I come from a Go background too. I have not found a single case of Go's select that cannot be replaced with a Rust enum and a match statement. If you give me an example, I will consider adding select to the library. You can convert every select into a message packaged as an enum and use match much the way you use select in Go. I think select is an anti-pattern in Rust that should be avoided by using enums, which are faster and simpler.

@nikola-jokic
Author

Thank you for considering it!

The problem I have with communicating through channels using enums is that the producer must know the enum variant that it should push. Cancellation is the best example I can come up with. I install a SIGINT handler in main. Once the signal is received, I would like to propagate that termination to multiple independent execution units in my code. Go's context package is a perfect example. It closes the channel, so everything using the same context knows that it should terminate.

Now, if we don't use select and use an enum instead, I would have to create a separate channel for each independent execution unit. Then, once I receive SIGINT, I would have to push the correct enum variant on each of those channels. But let's set aside this inconvenience.
Let's say main installs a SIGINT handler and propagates the signal. Main spawns a listener task (or thread, it doesn't matter). The listener task creates N workers. Now I have to create a channel carrying an enum, and that enum has to have at least 2 variants:

  1. Termination -- produced by the main
  2. WorkerMessage -- produced by the worker

But why should the worker know about the termination type? And even worse, what stops the worker from terminating the listener by sending a termination variant? This completely removes information hiding and gives components responsibilities that they should not have in the first place.
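The concern can be made concrete with a minimal sketch. It assumes a shared `Message` enum like the one described above and uses `std::sync::mpsc` rather than kanal so that it runs with only the standard library; `misbehaving_worker` and `run_listener` are hypothetical names for illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical shared message type, mirroring the two variants listed above.
enum Message {
    Termination,
    WorkerMessage(String),
}

// The worker only needs to report strings, yet its sender accepts any variant.
fn misbehaving_worker(tx: Sender<Message>) {
    tx.send(Message::WorkerMessage("first".to_string())).unwrap();
    // Nothing in the type system prevents this line.
    tx.send(Message::Termination).unwrap();
    // The listener may already have stopped, so ignore a possible send error.
    let _ = tx.send(Message::WorkerMessage("never handled".to_string()));
}

// Runs a listener until it sees Termination.
// Returns how many worker messages it handled before stopping.
fn run_listener() -> usize {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || misbehaving_worker(tx));
    let mut handled = 0;
    for msg in rx {
        match msg {
            Message::Termination => break,
            Message::WorkerMessage(_) => handled += 1,
        }
    }
    handle.join().unwrap();
    handled
}
```

Calling `run_listener()` handles only the first message: the worker itself shuts the listener down, which is exactly the responsibility it should not have.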

If you ask me, select is needed so we can compose multiple independent execution units that communicate through their own channels. The caller should choose the frequency, prioritization or how to react to each message. Each producer does not care about the context in which it is invoked. It simply communicates the type it knows.

@fereidani
Owner

Hi Nikola,

Thank you for your detailed explanation. I understand your concerns regarding the use of enums and channels in Rust as compared to the select mechanism in Go, especially in the context of implementing a SIGINT handler to terminate multiple independent execution units.

In Rust, a worker accidentally terminating the listener by sending a termination variant can indeed be a concern. However, this issue is not unique to Rust and can also occur in Go if a goroutine mistakenly closes a shared channel. The key in both languages is to design your system with clear responsibilities and encapsulation.

In Rust, you can achieve a similar level of encapsulation and safety by using a combination of enums, channels, and encapsulated logic in structs or functions. This way, you can ensure that only specific parts of your code have the ability to send certain messages, like a termination signal.

Here's an example in Rust:

use kanal::{unbounded, Sender};
use std::thread;

enum Message {
    Termination,
    WorkerMessage(String),
}

// WorkerSender struct to encapsulate the Sender
struct WorkerSender {
    sender: Sender<Message>,
}

impl WorkerSender {
    fn new(sender: Sender<Message>) -> WorkerSender {
        WorkerSender { sender }
    }
    fn send(&self, message: String) {
        self.sender.send(Message::WorkerMessage(message)).unwrap();
    }
}

fn main() {
    let (tx, rx) = unbounded();

    // Spawn workers using the WorkerSender
    for i in 0..5 {
        let worker_sender = WorkerSender::new(tx.clone());
        thread::spawn(move || {
            worker(worker_sender, i);
        });
    }

    // rest of your logic here
}

fn worker(worker_sender: WorkerSender, id: i32) {
    let message = format!("Message from worker {}", id);
    worker_sender.send(message);
    // Note: Worker does not have direct access to send Termination
}
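The part left as "rest of your logic here" could look roughly like the following sketch. It is an assumption about how the receiving side might be written, not code from the thread, and it uses `std::sync::mpsc` in place of kanal so that it runs with only the standard library; `run` is a hypothetical helper.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

enum Message {
    Termination,
    WorkerMessage(String),
}

// Same wrapper idea as above: workers can only emit WorkerMessage.
struct WorkerSender {
    sender: Sender<Message>,
}

impl WorkerSender {
    fn send(&self, message: String) {
        self.sender.send(Message::WorkerMessage(message)).unwrap();
    }
}

// Spawns `workers` threads, then sends Termination.
// Returns the worker messages received before Termination was seen.
fn run(workers: usize) -> Vec<String> {
    let (tx, rx) = channel();
    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let ws = WorkerSender { sender: tx.clone() };
            thread::spawn(move || ws.send(format!("Message from worker {}", id)))
        })
        .collect();
    // Wait for the workers so every message is queued before Termination.
    for h in handles {
        h.join().unwrap();
    }
    // Only the owner of the raw sender can terminate the listener.
    tx.send(Message::Termination).unwrap();

    let mut received = Vec::new();
    for msg in rx {
        match msg {
            Message::WorkerMessage(text) => received.push(text),
            Message::Termination => break,
        }
    }
    received
}
```

With this shape, `run(5)` collects five worker messages and then stops on the termination variant, which only the code holding the unwrapped sender can produce.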

Let me know what you think.

@nikola-jokic
Author

Oh, you are completely right, but your example beautifully demonstrates the need for a select statement.
The power of concurrency is that you can create multiple independent execution units that communicate using a channel. Let's say a worker produces string messages for simplicity's sake.

fn worker(msg_tx: Sender<String>, cancel_rx: Receiver<()>) {
  // do something that produces `message`, checking cancel_rx as needed
  msg_tx.send(message);
}

When you look at the API of this function, you know the worker produces strings. It may receive a cancellation signal. It is completely independent of the context in which it is invoked. It has an API where it produces strings and may receive cancellation signals.

Then, the caller decides to handle multiple things. I will write it here in Go to illustrate it:

ch := make(chan string, 1)
workerStop := make(chan struct{}, 1)
go worker(ch, workerStop)
stop := make(chan struct{}, 1)
select {
case <- stop:
  // start cleaning up internal state
  // decide how to stop workers, for simplicity, just close the channel
  close(workerStop)
  // handle rest of the cleanup
case <- ch:
  // handle message
}

As you can see, the worker is completely independent of the context, and the select handles multiple paths of execution. Without select, we would have to create an enum with select variants, then write a wrapper like you did to limit the scope and then use the concurrent worker. So if you want to start a worker in another context, you would have to create a different wrapper with different message types.
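For comparison, without a select primitive a synchronous caller can only approximate the Go snippet, for example by polling each receiver in turn. The sketch below illustrates that workaround with `std::sync::mpsc`; it is not a kanal API, and `listen` and `demo` are hypothetical names. The worker keeps the independent signature shown earlier.

```rust
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread;
use std::time::Duration;

// The worker only knows about strings and its own cancellation channel.
fn worker(msg_tx: Sender<String>, cancel_rx: Receiver<()>) {
    let mut n = 0;
    // Keep producing until a cancellation arrives or the cancel channel closes.
    while let Err(TryRecvError::Empty) = cancel_rx.try_recv() {
        if msg_tx.send(format!("item {}", n)).is_err() {
            return; // the caller dropped its receiver
        }
        n += 1;
        thread::sleep(Duration::from_millis(1));
    }
}

// Polls both receivers in turn, standing in for `select`.
// Returns the number of messages handled before `stop` fired.
fn listen(stop_rx: Receiver<()>, msg_rx: Receiver<String>, cancel_tx: Sender<()>) -> usize {
    let mut handled = 0;
    loop {
        // The stop channel is checked first, so it always has priority.
        if stop_rx.try_recv().is_ok() {
            let _ = cancel_tx.send(());
            return handled;
        }
        match msg_rx.try_recv() {
            Ok(_msg) => handled += 1,
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(1)),
            Err(TryRecvError::Disconnected) => return handled,
        }
    }
}

// Wires one worker to the listener and fires `stop` after `run_for`.
fn demo(run_for: Duration) -> usize {
    let (msg_tx, msg_rx) = channel();
    let (cancel_tx, cancel_rx) = channel();
    let (stop_tx, stop_rx) = channel();
    let w = thread::spawn(move || worker(msg_tx, cancel_rx));
    let stopper = thread::spawn(move || {
        thread::sleep(run_for);
        let _ = stop_tx.send(());
    });
    let handled = listen(stop_rx, msg_rx, cancel_tx);
    stopper.join().unwrap();
    w.join().unwrap();
    handled
}
```

The polling loop has to choose a sleep interval and a fixed priority order by hand, which is the bookkeeping a select statement would otherwise take care of.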
Having said this, another benefit of select is that channels can have different buffer sizes. Let's say we have two types of workers (worker A and worker B), and worker A produces values at a significantly faster rate than worker B. When several channels are ready, select picks one pseudo-randomly. If I instead share a single channel with enum variants, it can take a long time before a message from worker B is handled, because it waits behind everything worker A has queued. I would also have to size that channel by taking into account the buffer needed for both workers. Say worker A needs a buffer of 10 and worker B a buffer of 1, so I create a channel of 11 elements. If worker A then produces 11 values before any are received, worker B is blocked. Ideally only worker A would block, while worker B could safely continue to push its data.
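The buffer scenario can be reproduced with bounded channels. This is a minimal sketch using `std::sync::mpsc::sync_channel` as a stand-in for a bounded kanal channel; the `Merged` enum and both function names are hypothetical, and `try_send` is used so that "would block" shows up as an error instead of hanging the example.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Hypothetical merged message type for the shared-channel case.
#[derive(Debug)]
enum Merged {
    A(u32),
    B(u32),
}

// Shared channel sized 10 + 1: worker A alone can fill it and stall worker B.
fn shared_channel_blocks_b() -> bool {
    let (tx, _rx) = sync_channel::<Merged>(11);
    for i in 0..11 {
        tx.try_send(Merged::A(i)).unwrap(); // A takes every slot
    }
    // B now finds the buffer full and would block on a plain `send`.
    matches!(tx.try_send(Merged::B(0)), Err(TrySendError::Full(_)))
}

// Separate channels: A filling its own buffer does not affect B.
fn separate_channels_keep_b_free() -> bool {
    let (a_tx, _a_rx) = sync_channel::<u32>(10);
    let (b_tx, _b_rx) = sync_channel::<u32>(1);
    for i in 0..10 {
        a_tx.try_send(i).unwrap();
    }
    let a_is_full = matches!(a_tx.try_send(10), Err(TrySendError::Full(_)));
    let b_can_send = b_tx.try_send(0).is_ok();
    a_is_full && b_can_send
}
```

Both functions return `true`: in the shared case worker B is stuck behind worker A's backlog, while with separate channels only worker A hits its limit.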

As for the performance argument (although this is probably not a problem unless you are running on embedded), an enum is at least as large as its largest variant. The worst case for the example above is that worker B produces large messages while worker A produces small ones. Worker B needs a buffer of only 1 element, while worker A needs 10. The buffer allocation would be 11 * WorkerBMessage, while we only need 1 * WorkerBMessage + 10 * WorkerAMessage. Again, this is a crazy edge case, but just something to point out 😄
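The size arithmetic can be checked with `std::mem::size_of`. The payload types below are assumptions chosen for illustration (a 1-byte message for worker A, a 1024-byte message for worker B), and the sketch only counts slot sizes; it says nothing about how kanal actually allocates its buffers.

```rust
use std::mem::size_of;

// Hypothetical payloads: worker A sends small values, worker B sends large ones.
type SmallMsg = u8;
type LargeMsg = [u8; 1024];

// Merged type that a single shared channel would have to carry.
enum MergedMsg {
    A(SmallMsg),
    B(LargeMsg),
}

// Slot bytes for one shared buffer of 11 merged elements.
fn merged_buffer_bytes() -> usize {
    11 * size_of::<MergedMsg>()
}

// Slot bytes for separate buffers: 10 small slots plus 1 large slot.
fn separate_buffer_bytes() -> usize {
    10 * size_of::<SmallMsg>() + size_of::<LargeMsg>()
}
```

Under these assumptions the separate buffers need 1034 bytes of slots, while the merged buffer needs at least 11 * 1024 bytes, since every slot must be able to hold the large variant.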

Lastly, please feel free to close this issue if you think it is not worth doing ☺️. At the end of the day, you would be the one supporting a new feature, and this is not something simple. I'm just trying to describe why having select and completely independent channels benefits the caller. I feel like merging messages into a single channel gives up the other benefits of select (mostly pseudo-random selection, decoupling and buffer sizes), which can't easily be implemented otherwise.

@newfla

newfla commented Jan 18, 2024

@nikola-jokic maybe tokio::select! + tokio_util::sync::CancellationToken would do the trick

Example

@nikola-jokic
Author

Oh for sure, I'm using the tokio select! macro in my async environments, but I wanted to use this library in my sync environment.
