Skip to content

Commit

Permalink
Merge pull request #19757 from drewhk/wip-19553-one2onebidi-fix2-drewhk
Browse files Browse the repository at this point in the history
#19553 Fix One2OneBidi not failing if inner flow completion wins the race with cancellation
  • Loading branch information
drewhk committed Feb 12, 2016
2 parents ebb915a + 35c0688 commit db8ebd7
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
/**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
package akka.http.impl.util

import akka.NotUsed
import akka.stream.scaladsl.BidiFlow

import scala.util.control.NoStackTrace
import akka.stream._
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }

object One2OneBidiFlow {
/**
* INTERNAL API
*/
private[http] object One2OneBidiFlow {

case class UnexpectedOutputException(element: Any) extends RuntimeException(element.toString) with NoStackTrace
case object OutputTruncationException extends RuntimeException with NoStackTrace
case object OutputTruncationException
extends RuntimeException("Inner stream finished before inputs completed. Outputs might have been truncated.")
with NoStackTrace

/**
* Creates a generic ``BidiFlow`` which verifies that another flow produces exactly one output element per
Expand Down Expand Up @@ -80,9 +86,10 @@ object One2OneBidiFlow {
}
} else throw new UnexpectedOutputException(element)
}
override def onUpstreamFinish(): Unit =
if (pending == 0 && !innerFlowCancelled) complete(outOut)
override def onUpstreamFinish(): Unit = {
if (pending == 0 && isClosed(inIn) && !innerFlowCancelled) complete(outOut)
else throw OutputTruncationException
}
})

setHandler(outOut, new OutHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.http.impl.engine.client

import akka.http.impl.util.One2OneBidiFlow

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.{ ActorMaterializerSettings, FlowShape, ActorMaterializer }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,6 @@ class HttpServerSpec extends AkkaSpec(

val HttpRequest(POST, _, _, entity, _) = expectRequest()
responses.sendNext(HttpResponse(entity = entity))
responses.sendComplete()

expectResponseWithWipedDate(
"""HTTP/1.1 200 OK
Expand All @@ -689,6 +688,7 @@ class HttpServerSpec extends AkkaSpec(
rec(100000)

netIn.sendComplete()
responses.sendComplete()
requests.request(1)
requests.expectComplete()
netOut.expectComplete()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.impl.util

import java.util.concurrent.atomic.AtomicInteger

import akka.NotUsed
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, _ }
import org.scalactic.ConversionCheckedTripleEquals

import scala.concurrent.Await
import scala.concurrent.duration._

class One2OneBidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
implicit val materializer = ActorMaterializer()

"A One2OneBidiFlow" must {

def test(flow: Flow[Int, Int, NotUsed]) =
Source(List(1, 2, 3)).via(flow).grouped(10).runWith(Sink.head)

"be fully transparent for valid one-to-one streams" in assertAllStagesStopped {
val f = One2OneBidiFlow[Int, Int](-1) join Flow[Int].map(_ * 2)
Await.result(test(f), 1.second) should ===(Seq(2, 4, 6))
}

"be fully transparent to errors" in {
val f = One2OneBidiFlow[Int, Int](-1) join Flow[Int].map(x 10 / (x - 2))
an[ArithmeticException] should be thrownBy Await.result(test(f), 1.second)
}

"trigger an `OutputTruncationException` if the wrapped stream completes early" in assertAllStagesStopped {
val flowInProbe = TestSubscriber.probe[Int]()
val flowOutProbe = TestPublisher.probe[Int]()

val testSetup = One2OneBidiFlow[Int, Int](-1) join Flow.fromSinkAndSource(
Sink.fromSubscriber(flowInProbe),
Source.fromPublisher(flowOutProbe))

val upstreamProbe = TestPublisher.probe[Int]()
val downstreamProbe = TestSubscriber.probe[Int]()

Source.fromPublisher(upstreamProbe).via(testSetup).runWith(Sink.fromSubscriber(downstreamProbe))

upstreamProbe.ensureSubscription()
downstreamProbe.ensureSubscription()
flowInProbe.ensureSubscription()
flowOutProbe.ensureSubscription()

downstreamProbe.request(1)
flowInProbe.request(1)

upstreamProbe.sendNext(1)
flowInProbe.expectNext(1)
flowOutProbe.sendNext(1)
downstreamProbe.expectNext(1)

flowOutProbe.sendComplete()
upstreamProbe.expectCancellation()
flowInProbe.expectError(One2OneBidiFlow.OutputTruncationException)
downstreamProbe.expectError(One2OneBidiFlow.OutputTruncationException)
}

"trigger an `OutputTruncationException` if the wrapped stream cancels early" in assertAllStagesStopped {
val flowInProbe = TestSubscriber.probe[Int]()
val flowOutProbe = TestPublisher.probe[Int]()

val testSetup = One2OneBidiFlow[Int, Int](-1) join Flow.fromSinkAndSource(
Sink.fromSubscriber(flowInProbe),
Source.fromPublisher(flowOutProbe))

val upstreamProbe = TestPublisher.probe[Int]()
val downstreamProbe = TestSubscriber.probe[Int]()

Source.fromPublisher(upstreamProbe).via(testSetup).runWith(Sink.fromSubscriber(downstreamProbe))

upstreamProbe.ensureSubscription()
downstreamProbe.ensureSubscription()
flowInProbe.ensureSubscription()
flowOutProbe.ensureSubscription()

downstreamProbe.request(1)
flowInProbe.request(1)

upstreamProbe.sendNext(1)
flowInProbe.expectNext(1)
flowOutProbe.sendNext(1)
downstreamProbe.expectNext(1)

flowInProbe.cancel()
upstreamProbe.expectCancellation()

flowOutProbe.sendComplete()
downstreamProbe.expectError(One2OneBidiFlow.OutputTruncationException)
}

"trigger an `UnexpectedOutputException` if the wrapped stream produces out-of-order elements" in assertAllStagesStopped {
new Test() {
inIn.sendNext(1)
inOut.requestNext() should ===(1)

outIn.sendNext(2)
outOut.requestNext() should ===(2)

outOut.request(1)
outIn.sendNext(3)
outOut.expectError(new One2OneBidiFlow.UnexpectedOutputException(3))
}
}

"fully propagate cancellation" in assertAllStagesStopped {
new Test() {
inIn.sendNext(1)
inOut.requestNext() should ===(1)

outIn.sendNext(2)
outOut.requestNext() should ===(2)

outOut.cancel()
outIn.expectCancellation()

inOut.cancel()
inIn.expectCancellation()
}
}

"backpressure the input side if the maximum number of pending output elements has been reached" in assertAllStagesStopped {
val MAX_PENDING = 24

val out = TestPublisher.probe[Int]()
val seen = new AtomicInteger

Source(1 to 1000)
.log("", seen.set)
.via(One2OneBidiFlow[Int, Int](MAX_PENDING) join Flow.fromSinkAndSourceMat(Sink.ignore, Source.fromPublisher(out))(Keep.left))
.runWith(Sink.ignore)

Thread.sleep(50)
val x = seen.get()
(1 to 8) foreach out.sendNext
Thread.sleep(50)
seen.get should ===(x + 8)

out.sendComplete() // To please assertAllStagesStopped
}
}

class Test(maxPending: Int = -1) {
val inIn = TestPublisher.probe[Int]()
val inOut = TestSubscriber.probe[Int]()
val outIn = TestPublisher.probe[Int]()
val outOut = TestSubscriber.probe[Int]()

Source.fromPublisher(inIn).via(One2OneBidiFlow[Int, Int](maxPending) join Flow.fromSinkAndSourceMat(Sink.fromSubscriber(inOut), Source.fromPublisher(outIn))(Keep.left)).runWith(Sink.fromSubscriber(outOut))
}
}

This file was deleted.

0 comments on commit db8ebd7

Please sign in to comment.