-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
+str Add onErrorComplete operator. #31639
Conversation
53c8d5d
to
79b4ce5
Compare
4a0d34e
to
82cdeb9
Compare
f6a452c
to
f189504
Compare
Hi @He-Pin, Thanks for the effort, but this feature doesn't really address my original request. One of my requirements is that I didn't want to catch exceptions blindly, but rather avoid throwing them in the first place (as |
I think we can add a akka's timeout exception,which extends the current TimeoutExpection? |
#31640 does resolve my issue with the exception (if you add the Having said, I think the |
@ncreep Yes, I think the current |
@He-Pin, that does sound good. But I wonder whether there's something more general that can be done here. Some implementation that won't be specific to just the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this, together with #31640 it nicely covers the various timeout scenarios.
I wonder if we should skip the no-param variation though, handing it Throwable
in user code seems concise enough for the catch-all IMO.
Also not sure about the name, should it rather be something like completeOn[IdleTimeoutException]
? Recover would be to turn the stream back into something working, but this operator always completes.
Cool, I was following the old name, your |
@ncreep I was thinking about that too, but never came out a better idea. Will try to get this merged first:) do you have something in mind? |
f189504
to
acf6574
Compare
Still on holiday, will try to find more time to get it ready. |
6278b13
to
c935b2c
Compare
akka-stream/src/main/scala/akka/stream/impl/fusing/CompleteOn.scala
Outdated
Show resolved
Hide resolved
@He-Pin, no, unfortunately I'm not proficient enough in the underlying mechanics of Akka streams to have something concrete in mind. But I can imagine that whatever the general solution would be it would have to include some change to the underlying signalling mechanism. So that you can "catch" completion signals and then convert those into exceptions. So on its own, I'm probably reinventing the wheel here, so I'll stop babbling... |
@patriknw Any suggestion? I want to allocate one day to continue work on this. |
c935b2c
to
59628f6
Compare
0512877
to
7f96cb2
Compare
791345a
to
73715c1
Compare
f64674a
to
e39f338
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, lets go with the empty param list like this 👍
e39f338
to
042f240
Compare
042f240
to
ac553f6
Compare
@patriknw I have rebased on to the current main |
3585715
to
ce06a9a
Compare
* | ||
* '''Cancels when''' downstream cancels | ||
*/ | ||
def completeOn(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Source[Out, Mat] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know the naming has been discussed, but this name doesn't include much about that it is about errors. Might be difficult to discover. onErrorComplete
is better. I still like recoverWithComplete
because it's close to what we already have, but seems like @johanandren didn't like it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are :
onErrorComplete(..)
onErrorContinue(...)
onErrorMap(...)
onErrorReturn(...)
onErrorResume(...)
onErrorStop(...)
in reactor-core. and should we use completeOn
, completeWhen
, or recoverWithComplete
here? naming is always hard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@octonato What's your input? the recoverWithComplete
align better but seems like completeWhen
is a great name too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that onErrorComplete
is better for discoverability and it express much better that the method has the intention to be used in case of an error.
About recoverWithComplete
, it is less interesting, I think. Internally, we are 'recovering' with empty Source, but that's an implementation detail.
From a user perspective, I don't think recover makes sense because we are not recovering (and continuing) when an error occurs. We are completing gracefully.
That said, maybe a overloaded onErrorComplete
(Predicated and PF) will do the job. Clear intention and discoverable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@johanandren I updated the PR to onErrorComplete
and the https://github.com/akka/akka-persistence-r2dbc is using Flux
too.
And as we here we only matching on error types, and completeOn
/completeWhen
is more generic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's go with onErrorComplete
ce06a9a
to
3a85e16
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
#31671
@marcospereira @ncreep Would you like to give some inputs?
Have to duplicate some code, as the ambiguous reference to overloaded definition issue.
Still need to add some tests.