New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bugfix in VirtualProcessor #24722
Bugfix in VirtualProcessor #24722
Conversation
this.required_spec109_mustIssueOnSubscribeForNonNullSubscriber() | ||
i += 1 | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove this, but just to show that it passes the PR-validation (I got this to fail even locally before fix)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok please remove then :)
obj match { | ||
case subscription: Subscription ⇒ | ||
if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(subscriber, subscription) | ||
val establishing = Establishing.create(subscriber) | ||
if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(establishing, subscription) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen before is that the state was set to Both
here, but before establishSubscription
would get to call subscriber.onSubscribe
onComplete
would execute and complete the sub, which would then get an onSubscribe
call after being completed (that's the spec violation bug).
I fixed this by introducing an intermediate state Establishing
which isn't changed to Both
until after the onSubscribe
call.
case _ ⇒ // spec violation or cancellation race, but nothing we can do | ||
case Establishing(s) ⇒ | ||
// keep trying until subscription established and can complete it | ||
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}), loop") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems I missed the recursion waiting for establish to complete here, but I'm not sure it should the done or my comment is wrong.
case Establishing(s) ⇒ | ||
// keep trying until subscription established and can complete it | ||
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete, loop") | ||
onComplete() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So here we hot-spin to guarantee the onComplete
happens after the subscription has happened.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we sure that will complete?
is there a risk of live lock (starvation) if several threads ends up spinning here and there is no thread remaining to complete the subscription?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only situation it wouldn't complete is if the establishing subscription never completes nor fails, I'm not sure what scenario that would be (or what we could possibly do about it).
I don't know the RS spec good enough to answer the part about several threads. A can several threads call try to complete the same subscriber? @ktoso ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's dangerous to spin and depend on that another thread unlocks the spinning. See for example this discussion https://groups.google.com/d/msg/akka-user/dRxTX8WmWlA/ku-nM0KcHJ8J and #17216 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for digging that up, had no idea.
I think the solution would be to buffer onComplete, if establishing, rather than spinning, and then having establish trigger the complete if needed when done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be illegal to not complete establishing a sub, but it could happen; good to buffer onComplete instead
build.sbt
Outdated
@@ -344,7 +344,7 @@ lazy val streamTestkit = akkaModule("akka-stream-testkit") | |||
|
|||
lazy val streamTests = akkaModule("akka-stream-tests") | |||
.dependsOn(streamTestkit % "test->test", remote % "test->test", stream) | |||
.settings(Dependencies.streamTests) | |||
.settings(Dependencies.streamTests ++ Dependencies.streamTestsTck) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftover, I'll remove
Test FAILed. |
This reverts commit ae5e41c.
956b756
to
0bb6779
Compare
Test FAILed. |
MiMa failure (added new internal classes) |
Test PASSed. |
resolved conflict after Jame's fix in onError handler here; Stared long at impl and looks correct indeed The resolution 85dc427 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, please see conflict resolution and merge
Test FAILed. |
Test PASSed. |
case _ ⇒ // spec violation or cancellation race, but nothing we can do | ||
case est @ Establishing(_, false, OptionVal.None) ⇒ | ||
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onError(${t.getMessage}), loop") | ||
if (!compareAndSet(est, est.copy(onErrorBuffered = OptionVal.Some(ex)))) rec(ex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spec violation bug here - if the error is successfully buffered, and the exception was null, then this method doesn't throw an NPE, which it's required to do.
…nvoked This fix is similar to akka#24749, fixing a spec violation bug that was introduced in akka#24722.
…nvoked This fix is similar to akka#24749, fixing a spec violation bug that was introduced in akka#24722.
…nvoked This fix is similar to akka#24749, fixing a spec violation bug that was introduced in akka#24722.
…nvoked This fix is similar to akka#24749, fixing a spec violation bug that was introduced in akka#24722.
* [PORT[ Bugfix in VirtualProcessor (akka/akka#24722) * Update API Verify list * Fix errors reported by TCK * Remove all recursions
Fixes #24581