Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

=htc #20236 enable streaming responses with Connection: close #20254

Merged
merged 1 commit into from Apr 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -169,21 +169,17 @@ private[http] object OutgoingConnectionBlueprint {
private def entitySubstreamStarted = entitySource ne null
private def idle = this
private var completionDeferred = false
private var completeOnMessageEnd = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't the existing boolean field be used for both completion scenarios? (doesn't matter much, just curious)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought about that too but wasn't sure if completionDeferred is guaranteed to be set after completeOnMessageEnd. completionDeferred |= closeRequested should work though.


def setIdleHandlers(): Unit = {
if (completionDeferred) {
completeStage()
} else {
setHandler(in, idle)
setHandler(out, idle)
}
}
def setIdleHandlers(): Unit =
if (completeOnMessageEnd || completionDeferred) completeStage()
else setHandlers(in, out, idle)

def onPush(): Unit = grab(in) match {
case ResponseStart(statusCode, protocol, headers, entityCreator, closeRequested) ⇒
val entity = createEntity(entityCreator) withSizeLimit parserSettings.maxContentLength
push(out, HttpResponse(statusCode, headers, entity, protocol))
if (closeRequested) completeStage()
completeOnMessageEnd = closeRequested

case MessageStartError(_, info) ⇒
throw IllegalResponseException(info)
Expand Down
Expand Up @@ -7,7 +7,6 @@ package akka.http.impl.engine.client
import scala.concurrent.duration._
import scala.reflect.ClassTag
import org.scalatest.Inside
import org.scalatest.concurrent.ScalaFutures
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.util.ByteString
import akka.event.NoLogging
Expand Down Expand Up @@ -111,10 +110,72 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.

requestsSub.sendComplete()
netOut.expectComplete()
netInSub.sendComplete()
responses.expectComplete()
}

"has a response with a chunked entity and Connection: close" in new TestSetup {
sendStandardRequest()

sendWireData(
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|Connection: close
|
|""")
sendWireData("3\nABC\n")
sendWireData("4\nDEFX\n")
sendWireData("0\n\n")

val HttpResponse(_, _, HttpEntity.Chunked(ct, chunks), _) = expectResponse()
ct shouldEqual ContentTypes.`application/octet-stream`

val probe = TestSubscriber.manualProbe[ChunkStreamPart]()
chunks.runWith(Sink.fromSubscriber(probe))
val sub = probe.expectSubscription()
sub.request(4)
probe.expectNext(HttpEntity.Chunk("ABC"))
probe.expectNext(HttpEntity.Chunk("DEFX"))
probe.expectNext(HttpEntity.LastChunk)
probe.expectComplete()

// explicit `requestsSub.sendComplete()` not needed
netOut.expectComplete()
responses.expectComplete()
}

"has a request with a chunked entity and Connection: close" in new TestSetup {
requestsSub.sendNext(HttpRequest(
entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, Source(List("ABC", "DEFX").map(ByteString(_)))),
headers = List(Connection("close"))
))

expectWireData(
"""GET / HTTP/1.1
|Connection: close
|Host: example.com
|User-Agent: akka-http/test
|Transfer-Encoding: chunked
|Content-Type: text/plain; charset=UTF-8
|
|""")
expectWireData("3\nABC\n")
expectWireData("4\nDEFX\n")
expectWireData("0\n\n")

sendWireData(
"""HTTP/1.1 200 OK
|Connection: close
|Content-Length: 0
|
|""")

expectResponse()

// explicit `requestsSub.sendComplete()` not needed
responses.expectComplete()
netOut.expectComplete()
}

"exhibits eager request stream completion" in new TestSetup {
requestsSub.sendNext(HttpRequest())
requestsSub.sendComplete()
Expand Down Expand Up @@ -759,7 +820,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
}

val (netOut, netIn) = {
val netOut = TestSubscriber.manualProbe[ByteString]
val netOut = TestSubscriber.manualProbe[ByteString]()
val netIn = TestPublisher.manualProbe[ByteString]()

RunnableGraph.fromGraph(GraphDSL.create(OutgoingConnectionBlueprint(Host("example.com"), settings, NoLogging)) { implicit b ⇒
Expand Down