-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure NPE is always through when VirtualProcessor.onError(null) is invoked #25311
Conversation
@@ -259,14 +256,17 @@ import scala.util.control.NonFatal | |||
case est @ Establishing(_, false, OptionVal.None) ⇒ | |||
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onError(${t.getMessage}), loop") | |||
if (!compareAndSet(est, est.copy(onErrorBuffered = OptionVal.Some(ex)))) rec(ex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is where the actual bug being fixed was - if establishing, and the error was successfully buffered, then an NPE wasn't thrown if error was null.
ca00286
to
eb0bcbe
Compare
Test PASSed. |
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Looks good. Could you add a test or two (I'm thinking Subscriber
and null
states should be easy to test) also, since it doesn't seem to be covered by the TCK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wait, the actual regression was for Establishing
and that would likely be impossible to reproduce repeatedly. NVM. LGTM!
Yeah, I don't think the case is practical to reproduce, though an interesting thing is, I'm pretty sure I upgraded to Akka Streams 2.5.12 (where the regression was introduced), and didn't encounter the issue, but then when I upgraded to 2.5.13, it now causes tests to fail more often than not. The thing that is reproducing this is the Akka Streams implementation of MicroProfile Reactive Streams Operators, which has a TCK that pretty much throws everything it can at the Reactive Streams TCK, for example here's the verification for the You can see at the bottom it provides a Reactive Streams TCK In #24749 a test was introduced that exercised at least one of the paths in this code, and I'm sure there are other tests that exercise other parts - it's probably just luck that the existing tests haven't been tripped by this yet. |
Ok, I just ran it against 2.5.12, it definitely exhibits in 2.5.12. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, with an additional improvement of the debug lines
@@ -240,16 +240,13 @@ import scala.util.control.NonFatal | |||
case null ⇒ | |||
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onError(${t.getMessage}) -> ErrorPublisher") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
those debug lines are never enabled in released code, but for correctness the t.getMessage
should be changed to ex.getMessage
to avoid possible NPE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
…nvoked This fix is similar to akka#24749, fixing a spec violation bug that was introduced in akka#24722.
Test PASSed. |
Fixes #25339 |
This fix is similar to #24749, fixing a spec violation bug that was introduced in #24722.
I've rearranged the code to ensure that hopefully it never regresses again, instead of having to check if the exception was null in every case, it now simply does it at the end, assuming it successfully reaches the end.