Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix in VirtualProcessor #24722

Merged
merged 18 commits into from
Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion akka-stream/src/main/mima-filters/2.5.11.backwards.excludes
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# #24604 Deduplicate logic for IODispatcher
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$")

# #24581 RS violation
ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.VirtualProcessor$Both")
178 changes: 133 additions & 45 deletions akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,22 @@ import scala.util.control.NonFatal
* INTERNAL API
*/
@InternalApi private[stream] object VirtualProcessor {

// intentional syntax to make compile time constant
final val Debug = false

case object Inert {
val subscriber = new CancellingSubscriber[Any]
}
case class Both(subscriber: Subscriber[Any])
final case class Both(subscriber: Subscriber[Any])
object Both {
def create(s: Subscriber[_]) = Both(s.asInstanceOf[Subscriber[Any]])
}

final case class Establishing(sub: Subscriber[Any])
object Establishing {
def create(s: Subscriber[_]) = Establishing(s.asInstanceOf[Subscriber[Any]])
}
}

/**
Expand All @@ -51,29 +60,38 @@ import scala.util.control.NonFatal
* downstream and upstream, this needs an atomic state machine which looks a
* little like this:
*
* +--------------+ (2) +------------+
* | null | ----------> | Subscriber |
* +--------------+ +------------+
* | |
* (1) | | (1)
* \|/ \|/
* +--------------+ (2) +------------+ --\
* | Subscription | ----------> | Both | | (4)
* +--------------+ +------------+ <-/
* | |
* (3) | | (3)
* \|/ \|/
* +--------------+ (2) +------------+ --\
* | Publisher | ----------> | Inert | | (4, *)
* +--------------+ +------------+ <-/
*
* +--------+ (2) +---------------+
* | null +------------>+ Subscriber |
* +---+----+ +-----+---------+
* | |
* (1)| | (1)
* v v
* +---+----------+ (2) +-----+---------+
* | Subscription +------>+ Establishing |
* +---+----------+ +-----+---------+
* | |
* | | (4)
* | v
* | +-----+---------+ ---
* | (3) | Both | | (5)
* | +-----+---------+ <--
* | |
* | |
* v v
* +---+----------+ (2) +-----+---------+ ---
* | Publisher +-----> | Inert | | (5, *)
* +--------------+ +---------------+ <--
*
*
* The idea is to keep the major state in only one atomic reference. The actions
* that can happen are:
*
* (1) onSubscribe
* (2) subscribe
* (3) onError / onComplete
* (4) onNext
* (4) establishing subscription completes
* (5) onNext
* (*) Inert can be reached also by cancellation after which onNext is still fine
* so we just silently ignore possible spec violations here
*
Expand All @@ -98,20 +116,28 @@ import scala.util.control.NonFatal
import VirtualProcessor._

override def toString: String = s"VirtualProcessor(${this.hashCode()})"
if (VirtualProcessor.Debug) println(s"created: $this")

override def subscribe(s: Subscriber[_ >: T]): Unit = {
@tailrec def rec(sub: Subscriber[Any]): Unit =
@tailrec def rec(sub: Subscriber[Any]): Unit = {
get() match {
case null ⇒ if (!compareAndSet(null, s)) rec(sub)
case null ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).subscribe.rec($s) -> sub")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does VirtualPublisher#$hashCode mean in these prints?

no $ for sub?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to identify different VirtualPublisher instances, so that's why I included the hashcode, when I was trying to figure it out. I think default java Object does a "Type#hashcode" when it toStrings so mimicked that. The interesting part about sub is already in the string as $s is the actual subscription

Not 100% sure we need to leave the debugging in here at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, now I see the $h...

if (!compareAndSet(null, s)) rec(sub)
case subscription: Subscription ⇒
if (compareAndSet(subscription, Both(sub))) establishSubscription(sub, subscription)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscription).subscribe.rec($s) -> Establishing(sub)")
val establishing = Establishing(sub)
if (compareAndSet(subscription, establishing)) establishSubscription(establishing, subscription)
else rec(sub)
case pub: Publisher[_] ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($pub).subscribe.rec($s) -> Inert")
if (compareAndSet(pub, Inert)) pub.subscribe(sub)
else rec(sub)
case _ ⇒
case other ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).subscribe.rec($s): rejectAdditionalSubscriber")
rejectAdditionalSubscriber(sub, "VirtualProcessor")
}
}

if (s == null) {
val ex = subscriberMustNotBeNullException
Expand All @@ -121,14 +147,17 @@ import scala.util.control.NonFatal
}

override final def onSubscribe(s: Subscription): Unit = {

@tailrec def rec(obj: AnyRef): Unit =
@tailrec def rec(obj: AnyRef): Unit = {
get() match {
case null ⇒ if (!compareAndSet(null, obj)) rec(obj)
case null ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onSubscribe.rec($s) -> ${obj.getClass}")
if (!compareAndSet(null, obj)) rec(obj)
case subscriber: Subscriber[_] ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscriber).onSubscribe.rec($s) moving to both")
obj match {
case subscription: Subscription ⇒
if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(subscriber, subscription)
val establishing = Establishing.create(subscriber)
if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(establishing, subscription)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen before is that the state was set to Both here, but before establishSubscription would get to call subscriber.onSubscribe onComplete would execute and complete the sub, which would then get an onSubscribe call after being completed (that's the spec violation bug).

I fixed this by introducing an intermediate state Establishing which isn't changed to Both until after the onSubscribe call.

else rec(obj)
case pub: Publisher[_] ⇒
getAndSet(Inert) match {
Expand All @@ -137,9 +166,11 @@ import scala.util.control.NonFatal
}
}
case _ ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(_).onSubscribe.rec($s) spec violation")
// spec violation
tryCancel(s)
}
}

if (s == null) {
val ex = subscriptionMustNotBeNullException
Expand All @@ -148,18 +179,21 @@ import scala.util.control.NonFatal
} else rec(s)
}

private def establishSubscription(subscriber: Subscriber[_], subscription: Subscription): Unit = {
private def establishSubscription(establishing: Establishing, subscription: Subscription): Unit = {
val wrapped = new WrappedSubscription(subscription)
try {
subscriber.onSubscribe(wrapped)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription(wrapped)")
establishing.sub.onSubscribe(wrapped)
// Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before
// onSubscribe completed
wrapped.ungateDemandAndRequestBuffered()
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription(wrapped) -> Both")
set(Both(establishing.sub)) // only place we transition from establishing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can also be (already) in Both, but I guess that is fine

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless it could also have moved to Inert and we are setting it back to Both by this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should only be able to go to Both by first being Establishing, if it isn't I made a mistake somewhere

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you follow who is calling establishSubscription it's one case where it's already Both and before that Subscriber

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that one now while rewriting to not do the spin. So fixed in a push soon.

} catch {
case NonFatal(ex) ⇒
set(Inert)
tryCancel(subscription)
tryOnError(subscriber, ex)
tryOnError(establishing.sub, ex)
}
}

Expand All @@ -172,43 +206,68 @@ import scala.util.control.NonFatal
@tailrec def rec(ex: Throwable): Unit =
get() match {
case null ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onError(${t.getMessage}) -> ErrorPublisher")
if (!compareAndSet(null, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case s: Subscription ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> ErrorPublisher")
if (!compareAndSet(s, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case Both(s) ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onError(${t.getMessage}) -> ErrorPublisher")
set(Inert)
try tryOnError(s, ex)
finally if (t == null) throw ex // must throw NPE, rule 2:13
case s: Subscriber[_] ⇒ // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> Inert")
getAndSet(Inert) match {
case Inert ⇒ // nothing to be done
case _ ⇒ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
case _ ⇒ // spec violation or cancellation race, but nothing we can do
case Establishing(s) ⇒
// keep trying until subscription established and can complete it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"keep trying until" is misleading? it only retries if someone else changed it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups, I think it's a left over comment from the spinning version. I'll update

if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}), loop")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems I missed the recursion waiting for establish to complete here, but I'm not sure it should the done or my comment is wrong.

rec(ex)

case other ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onError(${t.getMessage}). spec violation or cancellation race")
// spec violation or cancellation race, but nothing we can do
}

val ex = if (t == null) exceptionMustNotBeNullException else t
rec(ex)
}

@tailrec override final def onComplete(): Unit =
@tailrec override final def onComplete(): Unit = {
get() match {
case null ⇒ if (!compareAndSet(null, EmptyPublisher)) onComplete()
case s: Subscription ⇒ if (!compareAndSet(s, EmptyPublisher)) onComplete()
case Both(s) ⇒
case null ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onComplete -> EmptyPublisher")
if (!compareAndSet(null, EmptyPublisher)) onComplete()
case s: Subscription ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> EmptyPublisher")
if (!compareAndSet(s, EmptyPublisher)) onComplete()
case b @ Both(s) ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert")
set(Inert)
tryOnComplete(s)
case s: Subscriber[_] ⇒ // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert")
set(Inert)
EmptyPublisher.subscribe(s)
case _ ⇒ // spec violation or cancellation race, but nothing we can do
case Establishing(s) ⇒
// keep trying until subscription established and can complete it
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete, loop")
onComplete()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here we hot-spin to guarantee the onComplete happens after the subscription has happened.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we sure that will complete?
is there a risk of live lock (starvation) if several threads ends up spinning here and there is no thread remaining to complete the subscription?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only situation it wouldn't complete is if the establishing subscription never completes nor fails, I'm not sure what scenario that would be (or what we could possibly do about it).

I don't know the RS spec good enough to answer the part about several threads. A can several threads call try to complete the same subscriber? @ktoso ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's dangerous to spin and depend on that another thread unlocks the spinning. See for example this discussion https://groups.google.com/d/msg/akka-user/dRxTX8WmWlA/ku-nM0KcHJ8J and #17216 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for digging that up, had no idea.

I think the solution would be to buffer onComplete, if establishing, rather than spinning, and then having establish trigger the complete if needed when done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be illegal to not complete establishing a sub, but it could happen; good to buffer onComplete instead

case other ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onComplete spec violation")
// spec violation or cancellation race, but nothing we can do
}
}

override def onNext(t: T): Unit =
if (t == null) {
val ex = elementMustNotBeNullException
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.onNext(null)")
@tailrec def rec(): Unit =
get() match {
case x @ (null | _: Subscription) ⇒ if (!compareAndSet(x, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec()
Expand All @@ -219,28 +278,47 @@ import scala.util.control.NonFatal
rec()
throw ex // must throw NPE, rule 2:13
} else {
@tailrec def rec(): Unit =
@tailrec def rec(): Unit = {
get() match {
case Both(s) ⇒
try s.onNext(t)
catch {
try {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onNext($t).rec()")
s.onNext(t)
} catch {
case NonFatal(e) ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onNext($t) threw, spec violation -> Inert")
set(Inert)
throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e)
}
case Establishing(s) ⇒
try {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Establishing($s)).onNext($t).rec()")
s.onNext(t)
} catch {
case NonFatal(e) ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Establishing($s)).onNext($t) threw, spec violation -> Inert")
set(Inert)
throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e)
}

case s: Subscriber[_] ⇒ // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onNext($t).rec(): spec violation -> Inert")
val ex = new IllegalStateException(noDemand)
getAndSet(Inert) match {
case Inert ⇒ // nothing to be done
case _ ⇒ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
throw ex
case Inert | _: Publisher[_] ⇒ // nothing to be done
case Inert | _: Publisher[_] ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Inert|Publisher).onNext($t).rec(): nop")
// nothing to be done
case other ⇒
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onNext($t).rec() -> ErrorPublisher")
val pub = ErrorPublisher(new IllegalStateException(noDemand), "failed-VirtualPublisher")
if (!compareAndSet(other, pub)) rec()
else throw pub.t
}
}
rec()
}

Expand All @@ -261,6 +339,7 @@ import scala.util.control.NonFatal

// Release
def ungateDemandAndRequestBuffered(): Unit = {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}.WrappedSubscription($real).ungateDemandAndRequestBuffered")
// Ungate demand
val requests = getAndSet(PassThrough).demand
// And request buffered demand
Expand All @@ -269,11 +348,13 @@ import scala.util.control.NonFatal

override def request(n: Long): Unit = {
if (n < 1) {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}.WrappedSubscription($real).request($n)")
tryCancel(real)
VirtualProcessor.this.getAndSet(Inert) match {
case Both(s) ⇒ rejectDueToNonPositiveDemand(s)
case Inert ⇒ // another failure has won the race
case _ ⇒ // this cannot possibly happen, but signaling errors is impossible at this point
case Both(s) ⇒ rejectDueToNonPositiveDemand(s)
case Establishing(s) ⇒ rejectDueToNonPositiveDemand(s)
case Inert ⇒ // another failure has won the race
case _ ⇒ // this cannot possibly happen, but signaling errors is impossible at this point
}
} else {
// NOTE: At this point, batched requests might not have been dispatched, i.e. this can reorder requests.
Expand All @@ -283,13 +364,19 @@ import scala.util.control.NonFatal
// The only invariant we need to keep is to never emit more requests than the downstream emitted so far.
@tailrec def bufferDemand(n: Long): Unit = {
val current = get()
if (current eq PassThrough) real.request(n)
else if (!compareAndSet(current, Buffering(current.demand + n))) bufferDemand(n)
if (current eq PassThrough) {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription($real).bufferDemand($n) passthrough")
real.request(n)
} else if (!compareAndSet(current, Buffering(current.demand + n))) {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription($real).bufferDemand($n) buffering")
bufferDemand(n)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only adding debug logging

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, didn't catch this, you think I should remove it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, was only an observation for future reviewers

}
bufferDemand(n)
}
}
override def cancel(): Unit = {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription.cancel() -> Inert")
VirtualProcessor.this.set(Inert)
real.cancel()
}
Expand Down Expand Up @@ -317,7 +404,6 @@ import scala.util.control.NonFatal
@InternalApi private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] {
import ReactiveStreamsCompliance._
import VirtualProcessor.Inert

override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(subscriber)
@tailrec def rec(): Unit = {
Expand All @@ -337,7 +423,8 @@ import scala.util.control.NonFatal
rec() // return value is boolean only to make the expressions above compile
}

@tailrec final def registerPublisher(pub: Publisher[_]): Unit =
@tailrec final def registerPublisher(pub: Publisher[_]): Unit = {
if (VirtualProcessor.Debug) println(s"$this.registerPublisher: $pub")
get() match {
case null ⇒
if (!compareAndSet(null, pub)) registerPublisher(pub) // retry
Expand All @@ -352,6 +439,7 @@ import scala.util.control.NonFatal
case unexpected ⇒
throw new IllegalStateException(s"internal error, unexpected state: $unexpected")
}
}

override def toString: String = s"VirtualPublisher(state = ${get()})"
}
Expand Down