Panic on `send` in channels by default? #314

Open
matklad opened this Issue Jan 29, 2019 · 22 comments

5 participants
@matklad
Contributor

matklad commented Jan 29, 2019

Hi! This is somewhat of a continuation of the "should dropping receiver close the sender" issue. I've migrated rust-analyzer to crossbeam-channel 0.3, and the thing I've noticed is that every .send is followed by .unwrap. Perhaps we should make this unwrapping behavior the default, and introduce a separate checked_send which returns a Result?

Sending to a channel without receivers seems to indicate a programmer's error in the majority of cases: there's either a bug in communication structure such that receivers exit too early (direct bug), or a receiver is closed due to a panic (indirect bug). In Rust it is OK to panic on bugs by default, while providing checked alternatives:

  • indexing a vector with [] panics on out-of-bounds access, and .get provides checked access
  • in debug builds, integer overflow panics, and checked_add provides the checked alternative
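
A minimal sketch of the two precedents listed above, using only the standard library. The helper names `third_or_zero` and `add_or_max` are made up for illustration.

```rust
/// Checked access: `.get` returns `None` instead of panicking when the
/// index is out of bounds (`v[2]` would panic for a shorter slice).
fn third_or_zero(v: &[i32]) -> i32 {
    v.get(2).copied().unwrap_or(0)
}

/// Checked arithmetic: `checked_add` returns `None` on overflow, whereas
/// plain `+` panics in debug builds.
fn add_or_max(a: u8, b: u8) -> u8 {
    a.checked_add(b).unwrap_or(u8::MAX)
}
```

In both cases the short spelling is the panicking one, and the caller opts in to handling the failure.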

Returning a result by default seems to be bad:

  • it is confusing and is not a "pit of success": with Result<T, SendError> the user has choices:
    • forward error (boilerplate which just moves problem to the wrong place)
    • handle error, but correct handling is impossible in most cases
    • ignore error, which is OK (0.2 semantics EDIT: not exactly, 0.2 blocked), but not optimal: it's better to see a panic than a deadlock
    • unwrap: this I think is correct (propagates bugs), but is a hard choice to make, especially for novice user, because "unwraps are bad"
  • it significantly devalues unwrap: unwrap is indeed bad in a lot of places, and it's much easier to spot one during code review if there are fewer of them.
  • unwrap happens in the wrong place. If we panic inside crossbeam, we can set a custom message of "sending to a closed channel" instead of a generic one "unwrapped Err".

As a counterpoint, a similar API is exposed by std's mutexes. They return Results by default, and almost all uses of .lock() are followed by .unwrap/.expect. I would argue that this design is also less than ideal, and that std should have panicked by default, providing checked_lock methods for rare cases when you do want to handle this somehow.
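
For readers unfamiliar with the mutex case, here is a small sketch of what the `Result` from `.lock()` reports: the lock is "poisoned" once a thread panics while holding it. The function name `read_after_poison` is hypothetical, and the panic in the spawned thread is simulated.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns whether the lock was poisoned, plus the value it protects.
fn read_after_poison() -> (bool, i32) {
    let shared = Arc::new(Mutex::new(1));
    let worker_copy = Arc::clone(&shared);

    // A thread that panics while holding the guard poisons the mutex.
    // The join result is ignored because the panic is expected here.
    let _ = thread::spawn(move || {
        let _guard = worker_copy.lock().unwrap();
        panic!("simulated bug while holding the lock");
    })
    .join();

    // `lock()` now returns `Err`, but the data is still reachable through
    // `into_inner()` for the rare caller that wants to recover.
    let outcome = match shared.lock() {
        Ok(guard) => (false, *guard),
        Err(poisoned) => (true, *poisoned.into_inner()),
    };
    outcome
}
```

Most code never takes the `Err` branch and writes `.lock().unwrap()` instead, which is the pattern the comment above is pointing at.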

@BurntSushi

BurntSushi commented Jan 29, 2019

This is what 0.2.* did. I believe there was a discussion on this point, but I can't find it in the issue tracker.

(I personally tend to agree with you.)

@matklad

Contributor Author

matklad commented Jan 29, 2019

This is what 0.2.* did.

To clarify, 0.2 indeed returned () from .send, but it blocked rather than panicked on closed channel.

@BurntSushi

BurntSushi commented Jan 29, 2019

@matklad Ah right, yes, thank you. That is an important clarification!

@stjepang

Member

stjepang commented Jan 29, 2019

Hi there!

The previous discussion @BurntSushi mentions is: crossbeam-rs/crossbeam-channel#61

A couple points on this issue:

  • Yes, unwrap() on send() is similar to unwrap() on mutex lock(). It's debatable what to do when disconnected/poisoned. Do we return a Result, do we send/lock anyway, do we panic? Honestly, this is just something people will disagree on and no choice will make everyone happy.

  • It's true that [] and + panic, but send() is not an operator - it's a method and therefore more like get() and checked_add(). :) But this might be a bit silly point on my part and I see what you're saying.

  • There is the precedent of send() returning a Result in all other channel crates except chan. And channel crates already have very minor but annoying API differences.

  • Even if send() panics when the channel is disconnected, we still need to return a Result because the channel might be full (if it's bounded). We could get around that by splitting Sender into BoundedSender and UnboundedSender, and then UnboundedSender::send() would return (). But then we'd probably implement some kind of Sender trait for both BoundedSender and UnboundedSender so that they can be passed to Select::send()... It gets messy. :) EDIT: By send() I actually meant try_send().

To summarize, I think send(foo).unwrap() is not bad, just mildly annoying. And all the other options seem to be worse than this. API design is hard. :(

@matklad

Contributor Author

matklad commented Jan 30, 2019

Even if send() panics when the channel is disconnected, we still need to return a Result because the channel might be full (if it's bounded).

Looks like there's some kind of misunderstanding here. Today send blocks if the channel is full, and I don't suggest changing that. To be more precise, I suggest the following changes:

  • rename current send method and send clause in the select! macro to checked_send
  • add a new send method and select! clause like this:
fn send(&self, value: T) { self.checked_send(value).expect("sending to a closed channel") }
  • don't change anything about the try_send method.
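
The proposed split can be sketched as a thin wrapper. This is only an illustration under stated assumptions: `PanickingSender` is a hypothetical type, and `std::sync::mpsc::Sender` stands in for crossbeam-channel's `Sender` so the example has no external dependencies.

```rust
use std::sync::mpsc::{self, SendError};

/// Hypothetical wrapper illustrating the proposed `send` / `checked_send`
/// split. Not part of crossbeam-channel.
pub struct PanickingSender<T> {
    inner: mpsc::Sender<T>,
}

impl<T> PanickingSender<T> {
    pub fn new(inner: mpsc::Sender<T>) -> Self {
        PanickingSender { inner }
    }

    /// Escape hatch: reports a disconnected channel as an error.
    pub fn checked_send(&self, value: T) -> Result<(), SendError<T>> {
        self.inner.send(value)
    }

    /// Default: treats a disconnected channel as a bug and panics with a
    /// message that names the actual problem.
    pub fn send(&self, value: T) {
        if self.checked_send(value).is_err() {
            panic!("sending to a closed channel");
        }
    }
}
```

With this shape, call sites that consider a missing receiver a bug write `tx.send(v)` with no `unwrap`, and the few that want to handle disconnection call `tx.checked_send(v)` explicitly.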

There is the precedent of send() returning a Result in all other channel crates except chan. And channel crates already have very minor but annoying API differences.

That's true, yeah, but I think crossbeam-channel should be the canonical channel library, and all others should copy API from it :) I suspect the current situation is primarily due to the fact that std's channels return a Result. I am not sure it's a good idea, in this case, to align with std, due to the fact that mpsc is a somewhat underwhelming part of the Rust standard library (mainly due to the lack of select, but perhaps due to API choices as well?).

but send() is not an operator - it's a method and therefore more like

A silly counterpoint is that next_power_of_two is also a method, and has the same "panic by default with checked variant" behavior :) But yeah, this I think is the single method in stdlib with this behavior.
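
A short sketch of that pair of methods on `u8`. The wrapper names `capacity` and `checked_capacity` are hypothetical. Note that the overflow panic in the unchecked variant only happens in debug builds.

```rust
/// Rounds up to the next power of two. Overflow (for example an input of
/// 200 for `u8`) panics in debug builds.
fn capacity(requested: u8) -> u8 {
    requested.next_power_of_two()
}

/// Same rounding, but overflow is reported as `None` instead.
fn checked_capacity(requested: u8) -> Option<u8> {
    requested.checked_next_power_of_two()
}
```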

It's debatable what to do when disconnected/poisoned. Do we return a Result, do we send/lock anyway, do we panic? Honestly, this is just something people will disagree on and no choice will make everyone happy.

This is the point where I think we disagree the most :) From the code I write and the code I've seen, you just do .send(value).unwrap() every(*) single time. And that seems to be the correct behavior as well.

(*) there are of course situations when Result is useful, but that's why I suggest having a checked_send as well, which will satisfy the minority for whom .send().unwrap() is the wrong pattern.

Finally, moving one stack frame up to crossbeam-rs/crossbeam-channel#61 and crossbeam-rs/crossbeam-channel#39, I think the story went as follows:

  • Go has strictly unidirectional flow, which has a number of benefits and in general leads to better architecture most of the time. So, 0.2 went with infallible send, without checked_send
  • In #61, a major flaw was discovered: the failure mode of infallible send is that it either deadlocks the thread or allocates an unbounded amount of memory, which are pretty nasty outcomes, and which you can only handle by forcefully killing the thread from outside.
  • At the same time, it was understood that in Go the failure mode is more benign: channels are bounded and there's a deadlock detector to deal with deadlocks.
  • as a result, send in 0.3 returns Result.

However I think my proposal of send/checked_send more faithfully mirrors Go's situation: dataflow is still unidirectional and deadlocks are a bug, which leads to deterministic tearing down of the context. checked_send is a discouraged escape hatch for "know what you are doing" situation.

To be clear, I don't want to argue a lot about this, I don't have too much experience with complex CSP/Actor systems, but the current proliferation of noisy unwraps seems unfortunate :-)

@stjepang

Member

stjepang commented Jan 30, 2019

Looks like there's some kind of misunderstanding here. Today send blocks if the channel is full, and I don't suggest changing that.

Sorry, my mistake! :(

I really had try_send() in mind, which can fail because the channel is either full or disconnected. So if we made try_send() panic when the channel is disconnected, it'd still need to return a Result because the channel could be full.
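
To make the two failure modes of a non-blocking send concrete, here is a sketch using the standard library's bounded channel, `std::sync::mpsc::sync_channel`, as a stand-in for a bounded crossbeam channel. The function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Describes the outcome of a non-blocking send on a bounded channel.
fn describe<T>(result: Result<(), TrySendError<T>>) -> &'static str {
    match result {
        Ok(()) => "sent",
        Err(TrySendError::Full(_)) => "full",
        Err(TrySendError::Disconnected(_)) => "disconnected",
    }
}

/// Exercises all three outcomes on a channel with capacity 1.
fn try_send_outcomes() -> [&'static str; 3] {
    let (tx, rx) = sync_channel::<u32>(1);
    let first = describe(tx.try_send(1)); // fits in the buffer
    let second = describe(tx.try_send(2)); // buffer is already full
    let _ = rx.recv(); // drain the buffer so only disconnection remains
    drop(rx); // no receivers left
    let third = describe(tx.try_send(3));
    [first, second, third]
}
```

Even if the disconnected case panicked, the full case would still have to be returned to the caller, which is why `try_send` keeps a `Result` under any of the designs discussed here.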

But anyway, I see now you weren't suggesting to change try_send() at all...

A silly counterpoint is that next_power_of_two is also a method, and has the same "panic by default with checked variant" behavior :)

Ah, nice! I haven't thought of that one. ;)

That's true, yeah, but I think crossbeam-channel should be the canonical channel library, and all others should copy API from it :)

A bit off-topic, but this is one of the situations where I'd love the libs team to weigh in with their opinion. I want to make decisions that will overall make most people happy, but it's kinda overwhelming since there are so many disagreeing opinions.

There was even a point in time where the Servo folks didn't like the interface of crossbeam-channel v0.2 so much they just forked the library. Even a few third party crates with minor API tweaks popped up.

However I think my proposal of send/checked_send more faithfully mirrors Go's situation: dataflow is still unidirectional and deadlocks are a bug, which leads to deterministic tearing down of the context. checked_send is a discouraged escape hatch for "know what you are doing" situation.

Your analysis of the previous discussions is correct. And the reasoning for send()/checked_send() is totally sound - I honestly don't have any strong counterpoints.

Now, the current API is perfect in many ways except two things might get tiresome: unwrapping on send and the sender/receiver split. Both are sometimes annoying, but not deal breakers, I think.

If we automatically panic on send, that annoyance will go away but then we need to deal with checked_send in select! and the semantics of selection + panicking get a little bit more complicated. Some other oddities might crop up and I'm not sure it's a clear win in the end.

But again, I do acknowledge the benefits of the proposed solution, it's just that there are tradeoffs and I don't feel confident it'd be a total success. :(

@matklad

Contributor Author

matklad commented Jan 30, 2019

But again, I do acknowledge the benefits of the proposed solution, it's just that there are tradeoffs and I don't feel confident it'd be a total success.

That is totally fair! I myself am only confident that the proposed API will work better for the kind of code I use crossbeam-channel for: a handful of communicating threads where the amount of data is small, but the error conditions are numerous and a clean tear-down is important.

A bit off-topic, but this is one of the situations where I'd love the libs team to weigh in with their opinion.

👍 I feel it's important to do some kind of libz blitz or an RFC process for crossbeam-channel 1.0: this is one of the "interface" libraries.

@stjepang

Member

stjepang commented Jan 30, 2019

Let's see what the libs team has to say in Berlin, perhaps we can arrange some kind of crossbeam-channel 1.0 RFC this year :)

@BurntSushi

BurntSushi commented Jan 30, 2019

@stjepang I won't be in Berlin, but I think you know my opinions on the matter. :-)

There was even a point in time where the Servo folks didn't like the interface of crossbeam-channel v0.2 so much they just forked the library. Even a few third party crates with minor API tweaks popped up.

Is there more on this? Did the Servo folks give more detail about their concerns?

@BurntSushi

BurntSushi commented Jan 30, 2019

@stjepang I added this ticket to the library team's agenda in Berlin, although I don't know if it will stick. You should bug people while you're there too. :-)

@stjepang

Member

stjepang commented Jan 30, 2019

Err, "forked" is not the right word - "wrapped" is a better one. Relevant comments:

A bit later they updated crossbeam-channel to 0.3 and deleted servo-channel: servo/servo#22142

Thanks, I will go around bugging people! :)

@matklad

Contributor Author

matklad commented Feb 8, 2019

Good example from standard library: RefCell::(borrow|try_borrow): https://doc.rust-lang.org/std/cell/struct.RefCell.html#method.try_borrow
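
A small sketch of that pair: `borrow` panics if the cell is currently mutably borrowed, while `try_borrow` reports the conflict as an error. The helper name `read_if_free` is made up for illustration.

```rust
use std::cell::RefCell;

/// Reads the value if no mutable borrow is active. `try_borrow` returns an
/// error in that situation, where `borrow` would panic.
fn read_if_free(cell: &RefCell<i32>) -> Option<i32> {
    cell.try_borrow().ok().map(|value| *value)
}
```

A typical call site looks like `let cell = RefCell::new(7); read_if_free(&cell)`, which yields `Some(7)` as long as nobody holds a `borrow_mut()` guard.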

@stjepang

Member

stjepang commented Feb 16, 2019

We discussed crossbeam-channel during All Hands. Sometime this year I will write a blog post/RFC that lays out all the potential breaking changes and unresolved questions we have before publishing version 1.0. The libs team will then review and help us make final decisions.

@asajeffrey

asajeffrey commented Mar 2, 2019

My experience in Servo's use of unwrap() is that we've ended up with a lot of places where programmers maintain invariants (e.g. "the matching receiver of this send is still alive") which are true during steady-state, but not during shutdown, so we end up with Servo panicking during shutdown quite regularly. There is a lot of Servo code that now looks like:

    let v = match chan.recv() {
        Ok(v) => v,
        Err(e) => return warn!("Receive failed ({})", e),
    };

Another thing that might be worth discussing is that double-panic kills the entire process, so any panicking which runs during Drop is dangerous, e.g. a send to a thread to get it to do some resource reclamation.
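
The `Drop` concern can be sketched as follows. `Command` and `Resource` are hypothetical types, and `std::sync::mpsc` stands in for crossbeam-channel. The point is only that a send inside `drop` should not be unwrapped, because `drop` can run while the thread is already unwinding.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical message for a worker thread that reclaims resources.
#[derive(Debug, PartialEq)]
pub enum Command {
    Reclaim(u32),
}

/// Hypothetical handle that asks the worker to reclaim a resource on drop.
pub struct Resource {
    id: u32,
    worker: Sender<Command>,
}

impl Resource {
    pub fn new(id: u32, worker: Sender<Command>) -> Self {
        Resource { id, worker }
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        // `drop` may run during unwinding from an earlier panic. A second
        // panic here would abort the whole process, so a failed send is
        // deliberately ignored rather than unwrapped.
        let _ = self.worker.send(Command::Reclaim(self.id));
    }
}
```

Under a panicking `send`, this destructor would need the checked variant to stay safe.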

@matklad

Contributor Author

matklad commented Mar 2, 2019

There is a lot of Servo code that now looks like:

This is Err on recv, which is totally valid and required for
shutdown. Perhaps you meant to show code with an error on send
instead?

@asajeffrey would be cool to dig into these cases more. At least from
my experience with programming with channels in a medium-sized code
base, there are usually several ways to arrange communication between
parties.

There's a solution when you explicitly code for shut-down, and it
sort-of works for planned shutdown, but fails miserably on unexpected
panic.

And there's a solution where you just rely on the ownership semantics
and "close on drop" behavior, where shutdown happens automatically and
works the same for panicking and non-panicking cases.
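
A minimal sketch of the second kind of architecture, assuming `std::sync::mpsc` in place of crossbeam-channel. The function name `sum_until_closed` is hypothetical. There is no explicit shutdown message: the worker stops when the last sender is dropped.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns a worker that sums everything it receives and exits on its own
/// once every sender has been dropped.
fn sum_until_closed(values: Vec<u64>) -> u64 {
    let (tx, rx) = mpsc::channel::<u64>();
    let worker = thread::spawn(move || {
        // The iterator ends when the channel is disconnected, whether the
        // sender was dropped normally or during a panic.
        let total: u64 = rx.iter().sum();
        total
    });
    for v in values {
        // The worker holds the receiver until we drop `tx`, so a failure
        // here would indicate a bug (for example, a worker panic).
        tx.send(v).expect("worker exited early");
    }
    drop(tx); // closing the channel is the shutdown signal
    worker.join().expect("worker panicked")
}
```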

Panic on send definitely nudges me towards architectures of the second
kind. I don't know if this transfers to the servo-sized codebases, so
that's why it's interesting to see concrete cases here. It may be the
case that large codebases just have different constraints, but it also
could be the case that channel ownership in Servo is not exactly
ideal :)

Here's a specific case where I believe panicking helped me to write
more robust code:

In rust-analyzer, I have a virtual file system, whose task is to
maintain a consistent snapshot of filesystem state, while watching for
changing files.

The interface to the watcher/scanner "actor" of the VFS (which is a separate
thread) is a pair of crossbeam channels: the actor receives a
directory to watch, scans it using walkdir and then watches for
changes. Results of the scan/watching are sent to the output channel, OUT.

The file watching library I am using, notify, exposes notifications
as std::mpsc channel, so I have to burn an additional thread to
convert between the channels. Initially, I was going to send events
directly to OUT, with the following code:

loop {
    let event = match mpsc_receiver.recv() {
        Ok(it) => it,
        Err(_) => break, // notify library was stopped, return
    };
    match OUT.send(transform_event(event)) {
        Ok(()) => (),
        Err(_) => break, // the client of the VFS stopped, return
    }
}

However, I realized that the second Err case happens because I fail
to join this event-transforming thread when shutting down the VFS
itself. That is, I was going to leak a thread during shutdown.

So I refactored the code to make this helper thread and the VFS actor
thread essentially scoped threads, which are guaranteed to join on
exiting the scope (due to normal termination, ?, or panic). What
guided me through this refactoring was the realization that .send should
never fail, which means that client should outlive VFS and VFS should
outlive the helper thread.
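
The lifetime ordering described here (client outlives VFS, VFS outlives the helper thread) can be sketched with `std::thread::scope`. This is not the rust-analyzer code. The names `forward_events`, `run_vfs` and `collect_events` are hypothetical, plain integers stand in for file events, and `std::sync::mpsc` stands in for crossbeam-channel.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical helper thread body: forwards transformed events to `out`.
fn forward_events(events: &[u32], out: &Sender<String>) {
    for event in events {
        // The receiver is owned by the caller of `run_vfs`, which outlives
        // the scope below, so a failed send would be a bug.
        out.send(format!("event {}", event))
            .expect("client dropped the receiver before the VFS shut down");
    }
}

/// Runs the helper in a scoped thread. The scope joins the thread before
/// `run_vfs` returns, on normal exit and on panic alike.
fn run_vfs(events: &[u32], out: &Sender<String>) {
    thread::scope(|scope| {
        let out = out.clone();
        scope.spawn(move || forward_events(events, &out));
    });
}

/// The "client": owns the receiver for longer than the VFS runs.
fn collect_events(events: &[u32]) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    run_vfs(events, &tx);
    drop(tx); // all senders gone, so the iterator below terminates
    let collected: Vec<String> = rx.iter().collect();
    collected
}
```

Because the helper cannot outlive `run_vfs`, there is no leaked thread left to discover that the client has gone away.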

@asajeffrey

asajeffrey commented Mar 2, 2019

The matching code for send is either the same, or just let _ = chan.send(v); depending on what we want to do with the warning, and whether we want to continue in the case that the receiver has closed.

There are two cases where we need to ensure no panicking. The first is in any code which may run in panic-handlers, to avoid double-panic killing the entire process. The second is in any code which does crash-handling, e.g. by displaying a crash page which invites the user to submit an issue report. In particular the constellation (which manages the content threads) and webrender (which is responsible for painting) have to be robust against panic, even if content threads are panicking.

If I ruled the world then we'd have a deny(panic) attribute we could apply to functions where their robustness is important, but unfortunately it's not obvious how to interface this with (e.g.) generic types without ending up going full out into effect systems, effect polymorphism, effect algebras etc. etc. So we end up with lints that just ban syntax like .unwrap().

@emk

emk commented Mar 3, 2019

(I'm posting this here at @BurntSushi's suggestion, based on our discussion on Reddit. Note that a lot of this comment is in the context of tokio::mpsc, or even tokio::mpsc channels being used as streams, and so this may not always directly apply to crossbeam::mpsc. This is submitted as anecdotal evidence. Please take it with a grain of salt.)

At Faraday, we mostly work with tokio channels. And in almost every case, I've found that code calling send must make an explicit decision about how to handle failed send calls, and that panicking is virtually always the wrong answer.

Let me back up and provide a bit more context. Most of the channels and streams that I see at work are ultimately writing to either:

1. A network socket with a tokio::codec, where either of the ends of the codec may be attached to a channel.

We have a partial open source example of this, in our public fork of rust-amqp that uses tokio to implement true bidirectional channels. As you can see, we check the result of every call to send in this code, and either propagate it to our caller or log a warning.

Every one of these send failures can be seen in our logs during a typical week, and it would be unacceptable for any of them to panic. A lost AMQP server connection might mean that the caller needs to either (a) reconnect, or (b) log a high-priority error indicating that a queue message has failed. The key insight here is that all of our channels are ultimately mapped to an underlying network socket, which can close unexpectedly with an ECONNRESET. And this maps directly to a send error.

2. Channels being used to emulate Unix pipes.

These channels typically have a structure which conceptually looks something like this:

data_producer | data_transformer | data_sink

The code in data_producer will normally contain something like:

// Pseudo-code! Tokio channels are actually a bit trickier than this.
await!(dest.send(Ok(bytes)))
    .map_err(|_| format_err!("broken pipe in data_producer"))?;

There's generally also a Context type involved somewhere:

struct Context {
    error_channel: mpsc::Sender<Error>,
    log: slog::Logger,
}

The corresponding mpsc::Receiver<Error> is monitored by a supervisor task. We don't have a good open source example of this yet, but you might take a look at the Context type in our unfinished dbcrossbar tool. For the corresponding channel senders, see copy_reader_to_stream (for AsyncRead) or SyncStreamWriter (for Write).

Going back to our example:

data_producer | data_transformer | data_sink

If an error occurs in, say, data_sink, then data_sink will report the error on error_channel, and drop the receiver of the channel it uses to read from data_transformer. This will cause the broken pipe to propagate back through data_transformer and data_producer. This is exactly analogous to the traditional Unix EPIPE / SIGPIPE, which Unix uses to shut down pipe writers when a pipe reader fails unexpectedly.
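
The propagation described above can be sketched with synchronous channels. This is a simplified illustration, not the tokio code discussed here: it uses `std::sync::mpsc::sync_channel` with threads, omits the `error_channel` and supervisor, and the stage bodies are made up.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Sends numbers downstream; a failed send is reported as a broken pipe.
fn data_producer(dest: SyncSender<u32>) -> Result<(), String> {
    for n in 0..100 {
        dest.send(n)
            .map_err(|_| "broken pipe in data_producer".to_string())?;
    }
    Ok(())
}

/// Doubles each item. Returning early drops `src`, which in turn makes the
/// producer's next send fail.
fn data_transformer(src: Receiver<u32>, dest: SyncSender<u32>) -> Result<(), String> {
    for n in src {
        dest.send(n * 2)
            .map_err(|_| "broken pipe in data_transformer".to_string())?;
    }
    Ok(())
}

/// Simulated failure: gives up after the first item. Returning drops `src`,
/// which is what lets the failure travel upstream.
fn data_sink(src: Receiver<u32>) -> Result<(), String> {
    match src.recv() {
        Ok(_) => Err("data_sink failed".to_string()),
        Err(_) => Ok(()),
    }
}

/// Wires up `data_producer | data_transformer | data_sink` and returns the
/// result of each stage, in that order.
fn run_pipeline() -> [Result<(), String>; 3] {
    // Capacity 0 makes every send wait for its receiver, like a pipe.
    let (tx1, rx1) = sync_channel(0);
    let (tx2, rx2) = sync_channel(0);
    let producer = thread::spawn(move || data_producer(tx1));
    let transformer = thread::spawn(move || data_transformer(rx1, tx2));
    let sink = thread::spawn(move || data_sink(rx2));
    [
        producer.join().expect("producer panicked"),
        transformer.join().expect("transformer panicked"),
        sink.join().expect("sink panicked"),
    ]
}
```

Only the sink knows the real cause of the failure. The upstream stages just see a failed send, which is why the original design pairs this with a separate error channel.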

In fact, it's actually really hard to shut down a loosely-coupled pipeline like this without using something like EPIPE to propagate the error. In the general case, data_producer is living off in its own world, and it may look something like this code, which uses PostgreSQL copy_out to take a data stream from PostgreSQL and forward it directly to a tokio::mpsc::Sender<BytesMut, _>:

    // Use `pipe` and a background thread to convert a `Write` to `Read`.
    let url = url.clone();
    let (mut wtr, stream) = SyncStreamWriter::pipe(ctx.clone());
    thread::spawn(move || {
        // Run our code in a `try` block so we can capture errors returned by
        // `?` without needing to give up ownership of `wtr` to a local closure.
        let result: Result<()> = try {
            let conn = connect(&url)?;
            let stmt = conn.prepare(&sql)?;
            stmt.copy_out(&[], &mut wtr)?;
        };

        // Report any errors to our stream.
        if let Err(err) = result {
            error!(ctx.log(), "error reading from PostgreSQL: {}", err);
            if wtr.send_error(err).is_err() {
                error!(ctx.log(), "cannot report error to foreground thread");
            }
        }
    });

Here, the actual send call is occurring deep inside stmt.copy_out, most of which is single-threaded C code. So there's no way to tell PostgreSQL to stop writing data onto that stream except by returning a Unix I/O error, and the classic error in this case is EPIPE.

Outside example: Servo

According to this comment by u/asajeffrey on r/rust, Servo has found that send(...).unwrap() is tricky to get right:

There were quite a few commits that did this, e.g. servo/servo@01b6e4a, but mostly they replaced ....unwrap() by if let Err(e) = ... { return warn!(...); }.

My feeling is that a lot of code is written using unwrap() on the basis of maintaining some invariant of the program, and normally we got that right during steady state, but not always during startup, and quite often not during shutdown. Servo still panics a lot during shutdown, due to code that assumes the other end of a channel is still alive, but the shutdown order ended up being an unanticipated one. Sigh.

This mirrors my experience: Many send failures only occur in exceptional circumstances, such as when an AMQP network connection needs to be restarted, or when the program is shutting down. But if I don't want my program to panic randomly in the middle of an "orderly" shutdown, then I need to carefully think through what to do about each failed send. That may be as simple as logging it with error! so that I can debug it when it starts failing in production 6 months from now.

Outside example: Go channels

I don't write much Go code myself, but I've occasionally needed to debug large Go applications, including Vault and Pachyderm. In my limited experience, the code involved in shutting down multiple goroutines often involves subtle channel-related bugs. (And these goroutine shutdowns may occur every time a connection to a server is lost and needs to be recreated, so abort is not an acceptable solution.)

For an academic example, see this paper, which includes bugs like:

To demonstrate errors in message passing, we use a blocking bug from Kubernetes in Figure 1. The finishReq function creates a child goroutine using an anonymous function at line 4 to handle a request—a common practice in Go server programs. The child goroutine executes fn() and sends result back to the parent goroutine through channel ch at line 6. The child will block at line 6 until the parent pulls result from ch at line 9. Meanwhile, the parent will block at select until either when the child sends result to ch (line 9) or when a timeout happens (line 11). If timeout happens earlier or if Go runtime (non-deterministically) chooses the case at line 11 when both cases are valid, the parent will return from requestReq() at line 12, and no one else can pull result from ch any more, resulting in the child being blocked forever. The fix is to change ch from an unbuffered channel to a buffered one, so that the child goroutine can always send the result even when the parent has exit.

If you look at this closely, it's a classic EPIPE error. Specifically, the parent routine has exited, and the child routine is trying to write to a dead channel. The authors suggest fixing this by adding a buffer to ch, so that the child can write the result to the buffer, which will never be read. Personally, I would be happier if the child's send function failed outright with EPIPE, and the child was forced to figure out how to handle the error. Perhaps the child should just silently quit, or maybe it should log a message, or maybe it needs to use something like Context::error_channel to report the problem to a supervisor. However, one clearly wrong answer in this case would be for the child's send to abort the entire server process.

Note that if the child process were sending multiple messages (perhaps data chunks), the buffered channel workaround wouldn't be enough, and the only easy solution would be for send to fail with an error.
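
For comparison, here is a rough Rust rendering of the same parent/child shape, assuming `std::sync::mpsc`. It is not a translation of the Kubernetes code. The `gate` channel is a hypothetical device that keeps the child busy until the parent has given up, so the example is deterministic.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// The parent waits briefly for a result, then gives up and drops its
/// receiver. Returns what the child observed when it finally tried to send.
fn child_outcome_after_parent_timeout() -> &'static str {
    let (result_tx, result_rx) = mpsc::channel::<u32>();
    let (gate_tx, gate_rx) = mpsc::channel::<()>();

    let child = thread::spawn(move || {
        // Simulated slow work: wait until the gate is released or dropped.
        let _ = gate_rx.recv();
        match result_tx.send(42) {
            Ok(()) => "delivered",
            // The send fails right away instead of blocking forever, and
            // the child decides what to do about it.
            Err(_) => "parent gone, child exits",
        }
    });

    // Parent: nothing arrives before the timeout, so it stops listening.
    let timed_out = result_rx.recv_timeout(Duration::from_millis(10)).is_err();
    assert!(timed_out);
    drop(result_rx);
    drop(gate_tx); // let the child proceed

    child.join().expect("child panicked")
}
```

Here the failed send is the EPIPE-style signal: the child learns that nobody is listening and can quit, log, or report to a supervisor.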

Summary

At least in my experience with tokio channels (and my more limited experience with Go channels), a majority of send calls can fail. This is often triggered when a network socket closes (ECONNRESET) or a pipe reader closes early (EPIPE). This may happen during application shutdown (as in the Servo case) or during a connection restart (our rust-amqp fork).

Because of this experience, my standard coding guideline is: assume every send might fail, and decide how to handle it appropriately. 90% of the time, this may mean writing the error to a log somewhere and then giving up, or reporting it to a supervisory channel. But some decision needs to be made. So if there were a #![deny(clippy::unchecked_send)] lint, I would definitely use it as part of our company coding style.

Now, I'll be the first to admit that I'm dragging in examples from tokio::mpsc and even Go here. So maybe none of this applies directly to typical uses of crossbeam::mpsc! But I hope this explains why I consider send(...).unwrap() to be a giant warning sign, especially once networks or pipes are involved. Basically, ECONNRESET and EPIPE both map pretty directly to a failed send. And so I've wound up distrusting unwrap() on channels almost as much as I distrust it on function calls.

(Actually, I'm really curious about how I could design this kind of code so that sends could never fail! Maybe I'll learn something fascinating and useful today.)

@matklad

Contributor Author

matklad commented Mar 3, 2019

Thank you so much for these examples, @emk! I think they fully apply to crossbeam-channel.

I think the most important case here is the fallible pipeline: producer | transformer | consumer, where consumer might legitimately fail due to external error. For example, consumer might be writing results to a database, and connection might die with io::Error.

Propagating send errors back from the consumer helps to tear down the pipeline, but several aspects of this approach are sub-optimal:

  • in-flight messages are lost. Specifically, every time a pipeline segment terminates when its sender fails, it drops all messages queued up in its receiver
  • no error information is propagated, only the mere fact that error occurred
  • bugs could masquerade as legitimate errors. Arguably, if consumer or transformer panics (that is, it has a bug), producer should panic by default as well.

If we want to maintain unidirectional dataflow (that is, treat closed sender as a bug), a possible solution would be for consumer, upon encountering an io::Error, to send it via an additional errors channel to the entity that spawned the pipeline in the first place, and then to error-log all pending messages. That is, something like this:

while let Ok(msg) = receiver.recv() {
    match db.write(msg) {
        Ok(()) => (),
        Err(e) => {
            ctx.error_sink.send(e);
            break;
        }
    }
}

for msg in receiver {
    log::error!("failed to write message to db, db is dead: {:?}", msg)
}

The entity can then shut down the pipeline at the beginning, by terminating the producer. In the database connection example we can be even fancier, and instead of logging all pending messages, we can send both the io::Error and the Receiver to the supervisor, which could create a new database connection and spawn a new consumer, without losing the messages.

I do understand that this is a bit of an "architecture astronaut" type of solution, and I haven't actually tried it in practice, so I can't say if it'll work. Though remarkably the Context type you've mentioned seems to be a core part of this solution as well.

The second example is "panicking during shutdown". Here I think handling errors on send produces an objectively less than ideal solution: namely, messages are implicitly lost during shutdown. Maybe it's not a big deal, but maybe it's saving some user state to the database? The solution here seems to be to architect the shutdown sequence in such a way that you terminate producers first, wait for them to finish, and only afterwards terminate consumers.

This is a situation where "panicking by default on send" might nudge you to a better architecture: if dropping the receiver while the sender is alive is an error, then you'll have to arrange a clean shutdown sequence in topological order. If send returns a result, the path of least resistance is to add logging on send.

Finally, the Go example also seems like a case where a result on send is the worse solution. Specifically, we have a parent coroutine and a child coroutine, and the problem is that, although the parent has spawned the child, it's not interested in the child's result for some reason. The proposed solution allows for the situation where the parent has finished, but the child is alive. That is, the child is effectively a zombie: it might hold onto some resources and it might do computations, but they are not interesting to anyone, and the child will only know that once it attempts a send. I think the better fix would be to make sure that the parent just always joins the child in the end, even if it doesn't need the result anymore.

All that said, I am now not sure that panicking on send is the right choice for crossbeam =) I personally am still convinced that this is the right model to think about channels, but I also see that there's a large existing body of code which relies on handling errors on send.

@BurntSushi

BurntSushi commented Mar 3, 2019

Awesome exchange! Thanks so much to both of you for writing all that stuff out. It's a lot of work!

I just wanted to respond to a teeny detail:

but I also see that there's a large existing body of code which relies on handling errors on send.

I think if this were the only concern, we could say, "in crossbeam-channel 1.0, if your code depended on send returning an error, you should use checked_send instead." This is a viable migration since uses of send that check its error will fail to compile under your proposal. So I just mean to say that there does exist a reasonable migration path. Of course, this isn't to say that this is the only concern!
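A rough sketch of why that migration would be mechanical, written as a wrapper around std::sync::mpsc::Sender. The `PanickingSender` type is hypothetical and only mirrors the shape of the proposal; it is not the crossbeam-channel API.

```rust
use std::sync::mpsc::{SendError, Sender};

/// Hypothetical wrapper illustrating the proposed split between `send` and
/// `checked_send`.
struct PanickingSender<T>(Sender<T>);

impl<T> PanickingSender<T> {
    /// Proposed default: returns `()` and panics if the receiver is gone.
    /// Old code such as `tx.send(x).unwrap()` or `tx.send(x)?` no longer
    /// type-checks, which is what surfaces every call site to migrate.
    fn send(&self, msg: T) {
        if self.0.send(msg).is_err() {
            panic!("sending on a channel with no receiver");
        }
    }

    /// Checked alternative for code that depends on observing the error.
    fn checked_send(&self, msg: T) -> Result<(), SendError<T>> {
        self.0.send(msg)
    }
}
```

Call sites that ignored the result keep compiling and start panicking on a missing receiver. Call sites that inspected it get a compile error and can switch to `checked_send`.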

@emk

emk commented Mar 5, 2019

Thank you everybody for this wonderful discussion!

I spent much of today writing async code, and I encountered the first time where I actually wanted to write tx.send(...).expect("..."). I now believe that some very common and totally legitimate uses of channels will always expect the receiver to be present.

But I still believe there are other perfectly reasonable use cases where EPIPE is by far the easiest and most correct solution.

I'd like to respond to everybody's comments in more detail, with examples of each use case, but it may have to wait a day or two until I merge this branch, which uses @BurntSushi's csv library to parse a streaming CSV file and convert it to PostgreSQL BINARY format (the relevant code is here, but ignore the README). This will be plugged in between two tokio::sync::mpsc channels as a data_transformer, and it will count on receiving an io::ErrorKind::BrokenPipe if the data consumer fails.

So I hope to have more to say very soon, once I get this example hooked up and passing integration tests!

@emk

emk commented Mar 8, 2019

So I had a very busy week with channels and streams, and everything now seems to be running very nicely in production, so it's time to write the follow-up I promised. :-)

I've written a blog post summarizing my experiences with send(...).unwrap(). The most interesting example from that post is the async function below, which I thought contained a safe send call. It actually panicked within the first hour. Can you find the bug?

/// Run a synchronous function `f` in a background worker thread and return its
/// value.
pub(crate) async fn run_sync_fn_in_background<F, T>(
    thread_name: String,
    f: F,
) -> Result<T>
where
    F: (FnOnce() -> Result<T>) + Send + 'static,
    T: Send + 'static,
{
    // Spawn a worker thread outside our thread pool to do the actual work.
    let (sender, receiver) = mpsc::channel(1);
    let thr = thread::Builder::new().name(thread_name);
    let handle = thr
        .spawn(move || {
            sender.send(f()).wait().expect(
                "should always be able to send results from background thread",
            );
        })
        .context("could not spawn thread")?;

    // Wait for our worker to report its results.
    let background_result = await!(receiver.into_future());
    let result = match background_result {
        // The background thread sent an `Ok`.
        Ok((Some(Ok(value)), _receiver)) => Ok(value),
        // The background thread sent an `Err`.
        Ok((Some(Err(err)), _receiver)) => Err(err),
        // The background thread exited without sending anything. This
        // shouldn't happen.
        Ok((None, _receiver)) => {
            unreachable!("background thread did not send any results");
        }
        // We couldn't read a result from the background thread, probably
        // because it panicked.
        Err(_) => Err(format_err!("background thread panicked")),
    };

    // Block until our worker exits. This is a synchronous block in an
    // asynchronous task, but the background worker already reported its result,
    // so the wait should be short.
    handle.join().expect("background worker thread panicked");
    result
}

I didn't see the bug here. I mean, clearly, the receiver does nothing but block until it gets an answer, so how could it fail?

The problem, it turned out, was that the receiver was owned by a Future, and the Future could be canceled using drop(future) or future.timeout(...). And this drop or timeout might be happening very far away in the source code. So it's incredibly hard to guarantee that this code will never be reused in a way where the future might get destroyed early. And so that send(...).wait().expect(...) is going to inevitably blow up.
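The failure mode can be reproduced without futures. The sketch below uses std::sync::mpsc and an explicit drop(receiver) as a stand-in for a canceled future. The extra `go` channel exists only to make the ordering deterministic, and the function name is made up for this example.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical demo: returns whether the worker's `send` succeeded.
fn worker_send_succeeds(cancel_before_result: bool) -> bool {
    let (result_tx, result_rx) = mpsc::sync_channel::<u32>(1);
    // `go` only sequences the demo so that the outcome is deterministic.
    let (go_tx, go_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || {
        go_rx.recv().expect("demo driver disappeared");
        // This corresponds to the send that was `.expect(...)`ed in the
        // function above. Here we report the outcome instead of panicking.
        result_tx.send(42).is_ok()
    });

    if cancel_before_result {
        // Stand-in for `drop(future)` or a timeout firing far away.
        drop(result_rx);
        go_tx.send(()).expect("worker disappeared");
    } else {
        go_tx.send(()).expect("worker disappeared");
        assert_eq!(result_rx.recv(), Ok(42));
    }

    worker.join().expect("worker panicked")
}
```

`worker_send_succeeds(false)` returns true, while `worker_send_succeeds(true)` returns false: the worker did nothing wrong, yet its send fails because the owner of the receiver went away first.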

I could try to fix this with control channels or orderly shutdowns, but it means that my APIs no longer compose as nicely, and that I have to be painfully vigilant.

I think that these kinds of Future issues might be one of the big differences between @BurntSushi's experiences and my own. Futures are much more cancelable than threads, and this affects the entire design.

@matklad:

in-flight messages are lost. Specifically, every time a pipeline segment terminates when its sender fails, it drops all messages queued up in its receiver

Yes, this is normally the behavior that I want. The data on the channel might be, say, the second half of a database row in PostgreSQL BINARY format. And if PostgreSQL isn't accepting any more data, then I'm just going to throw it away and recover at a much higher level.

no error information is propagated, only the mere fact that error occurred

I can think of some use cases where it would be nice to get a std::io::Error instead of a SendError, but it's usually not that important. Unix has survived for decades with the vague EPIPE (or SIGPIPE, because too many programs ignore write errors, which is less of a problem in Rust), and TCP stacks just send ECONNRESET.

bugs could be masqueraded as legitimate errors. Arguably, if consumer or transformer panics (that is, it has a bug), producer should panic by default as well.

In general, I set panic = "abort", so I'm 100% OK with send panicking if the receiver panicked. The more interesting case for me is when the receiver exited in an orderly fashion (maybe because PostgreSQL returned an error, or because the receiver was held by a Future that got dropped by future.timeout(...)). In cases like this, with an orderly drop(receiver), then I feel like send should return an error.
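As an illustration of the EPIPE-style handling I have in mind, here is a minimal sketch of a transformer stage built on std::sync::mpsc. The real code uses tokio::sync::mpsc, so this is only an analogy, and `transform` and its "row" formatting are invented for the example. A failed send is mapped to io::ErrorKind::BrokenPipe and the stage simply stops.

```rust
use std::io;
use std::sync::mpsc::{Receiver, SyncSender};

/// Hypothetical transformer stage: forwards transformed items until the input
/// ends or the consumer goes away. Returns the number of items forwarded.
fn transform(input: Receiver<u32>, output: SyncSender<String>) -> io::Result<usize> {
    let mut forwarded = 0;
    for item in input {
        // An orderly `drop(receiver)` downstream shows up here as an error,
        // which we report the way a Unix pipeline would report EPIPE.
        output
            .send(format!("row {}", item))
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "consumer went away"))?;
        forwarded += 1;
    }
    Ok(forwarded)
}
```

The caller decides what a broken pipe means at a higher level; the stage itself needs no control channel or shared flag.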

The entity that can shut down the pipeline at the beginning, by terminating the producer. In the database connection example we can be even fancier, and instead of logging all pending messages, we can send both the io::Error and the Receiver to the supervisor, which could create a new database connection and spawn a new consumer, without losing the messages.

I've written code like this, using things like Arc<AtomicBool> or control channels, and my experience is that it's really hard to get right. Or at least, that I can't get it right, and certainly not in the usual case where I'm mixing channels and futures.

Here I think handling errors on send produces an objectively less than ideal solution: namely, messages are implicitly lost during shutdown. Maybe it's not a big deal, but maybe it's saving some user state to the database?

As a general rule, most of my programs lean towards functional designs, idempotent designs, or proper transactions. If some data needs to be written to disk, I'm going to design around the fact that I need to see a return value from fsync or a successful transaction from my database library. This may mean not using channels for certain things.

Except for rendezvous channels, channels have buffers that can contain one or more items. If the receiver has already hit a hard error and given up, what happens to the contents of the channel buffer?

Basically, any work in progress inside the sender is essentially just an extension of the channel buffer. If the receiver needs to exit abruptly, then both the channel buffer and the sender's work will be lost. Or the receiver's error handling code will need to carefully shut down all active senders and drain the channel buffers. And error handling code tends to be poorly tested and buggy.
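A small sketch of the buffer problem, again with std::sync::mpsc and a made-up function name: every send reports success, but whatever is still sitting in the buffer when the receiver gives up is never processed.

```rust
use std::sync::mpsc;

/// Hypothetical demo: sends `total` messages, then lets the receiver process
/// at most `processed` of them before giving up.
/// Returns (sends that reported Ok, messages actually processed).
fn lost_in_buffer(total: usize, processed: usize) -> (usize, usize) {
    let (tx, rx) = mpsc::channel::<usize>();

    // Every send succeeds: the receiver is alive and the buffer is unbounded.
    let accepted = (0..total).filter(|&i| tx.send(i).is_ok()).count();

    // The receiver handles a few messages, hits a hard error, and exits.
    // `min` keeps the demo from blocking on an empty channel.
    let handled = rx.iter().take(processed.min(total)).count();
    drop(rx);

    // The remaining `accepted - handled` messages were accepted but are
    // discarded along with the channel.
    (accepted, handled)
}
```

For example, `lost_in_buffer(5, 2)` returns `(5, 2)`: three messages were sent successfully and then dropped unseen, whether or not send returns a Result.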

All that said, I am now not sure that panicking on send is the right choice for crossbeam =)

It might be the right choice for crossbeam, though I'd still be uneasy about it. But I'm pretty sure that it's not the right choice if receiver is ultimately owned by a future, or for tokio::sync::mpsc in general.

@emk

emk commented Mar 9, 2019

I'm beginning to suspect there's a deeper issue with await! here that I want to understand, and I think that send(...) returning an error isn't the only subtle interaction between channels and await!.

I wrote up another blog post, because this seemed like it was too long for a GitHub issue. Once again, thank you to everybody for participating in this discussion! I've been learning a lot as we talk about these issues.
