Skip to content

Commit

Permalink
Ensure NPE is always through when VirtualProcessor.onError(null) is i…
Browse files Browse the repository at this point in the history
…nvoked (#25311)

* Ensure NPE is always through when VirtualProcessor.onError(null) is invoked
This fix is similar to #24749, fixing a spec violation bug that was
introduced in #24722.
  • Loading branch information
jroper authored and johanandren committed Jul 11, 2018
1 parent 169b81e commit fdcfa9d
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,35 +238,33 @@ import scala.util.control.NonFatal
@tailrec def rec(ex: Throwable): Unit =
get() match {
case null
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onError(${t.getMessage}) -> ErrorPublisher")
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onError(${ex.getMessage}) -> ErrorPublisher")
if (!compareAndSet(null, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case s: Subscription
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> ErrorPublisher")
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${ex.getMessage}) -> ErrorPublisher")
if (!compareAndSet(s, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case Both(s)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onError(${t.getMessage}) -> ErrorPublisher")
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onError(${ex.getMessage}) -> ErrorPublisher")
set(Inert)
try tryOnError(s, ex)
finally if (t == null) throw ex // must throw NPE, rule 2.13
tryOnError(s, ex)
case s: Subscriber[_] // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> Inert")
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${ex.getMessage}) -> Inert")
getAndSet(Inert) match {
case Inert // nothing to be done
case _ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
case est @ Establishing(_, false, OptionVal.None)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onError(${t.getMessage}), loop")
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onError(${ex.getMessage}), loop")
if (!compareAndSet(est, est.copy(onErrorBuffered = OptionVal.Some(ex)))) rec(ex)
case other // spec violation or cancellation race, but nothing we can do
if (t == null) throw ex // must throw NPE, rule 2.13
case other
// spec violation or cancellation race, but nothing we can do
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onError(${t.getMessage}). spec violation or cancellation race")
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onError(${ex.getMessage}). spec violation or cancellation race")
}

val ex = if (t == null) exceptionMustNotBeNullException else t
rec(ex)
// must throw NPE, rule 2.13
if (t == null) throw ex
}

@tailrec override def onComplete(): Unit = {
Expand Down

0 comments on commit fdcfa9d

Please sign in to comment.