From 6e084f5d80ed66920f63f75554ae91a385140397 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 10 Jan 2017 18:26:47 +0100 Subject: [PATCH] =htc #748 optimize frame creation for streamed WS messages Previously, a single empty frame was created to start a WS message. Now, a streamed message will only be begun when the first data comes in. --- .../engine/ws/MessageToFrameRenderer.scala | 21 +++++++++--- .../akka/http/impl/util/StreamUtils.scala | 9 ++++++ .../http/impl/engine/ws/MessageSpec.scala | 32 ++++++------------- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala index 031ca12f0cd..e9c8502ab37 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala @@ -6,9 +6,9 @@ package akka.http.impl.engine.ws import akka.NotUsed import akka.util.ByteString -import akka.stream.scaladsl.{ Source, Flow } - +import akka.stream.scaladsl.{ Flow, Source } import Protocol.Opcode +import akka.http.impl.util.StreamUtils import akka.http.scaladsl.model.ws._ /** @@ -22,9 +22,20 @@ private[http] object MessageToFrameRenderer { // FIXME: fragment? Source.single(FrameEvent.fullFrame(opcode, None, data, fin = true)) - def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, NotUsed] = - Source.single(FrameEvent.empty(opcode, fin = false)) ++ - data.map(FrameEvent.fullFrame(Opcode.Continuation, None, _, fin = false)) ++ + def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, Any] = + data.via(StreamUtils.statefulMap(() ⇒ { + var isFirst = true + + { data ⇒ + val frameOpcode = + if (isFirst) { + isFirst = false + opcode + } else Opcode.Continuation + + FrameEvent.fullFrame(frameOpcode, None, data, fin = false) + } + })) ++ Source.single(FrameEvent.emptyLastContinuationFrame) Flow[Message] diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 69fec1b7a15..f9e5d808fea 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -256,6 +256,15 @@ private[http] object StreamUtils { } } } + + /** + * Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map. + */ + def statefulMap[T, U](functionConstructor: () ⇒ T ⇒ U): Flow[T, U, NotUsed] = + Flow[T].statefulMapConcat { () ⇒ + val f = functionConstructor() + i ⇒ f(i) :: Nil + } } /** diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 201c18720fe..4858cb1c8a5 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -266,13 +266,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with pushMessage(msg) val sub = pub.expectSubscription() - expectFrameHeaderOnNetwork(Opcode.Binary, 0, fin = false) - val data1 = data.take(3) val data2 = data.drop(3) sub.sendNext(data1) - expectFrameOnNetwork(Opcode.Continuation, data1, fin = false) + expectFrameOnNetwork(Opcode.Binary, data1, fin = false) sub.sendNext(data2) expectFrameOnNetwork(Opcode.Continuation, data2, fin = false) @@ -288,13 +286,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with pushMessage(msg) val sub = pub.expectSubscription() - expectFrameHeaderOnNetwork(Opcode.Binary, 0, fin = false) - val data1 = data.take(3) val data2 = data.drop(3) sub.sendNext(data1) - expectMaskedFrameOnNetwork(Opcode.Continuation, data1, fin = false) + expectMaskedFrameOnNetwork(Opcode.Binary, data1, fin = false) sub.sendNext(data2) expectMaskedFrameOnNetwork(Opcode.Continuation, data2, fin = false) @@ -323,15 +319,13 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with pushMessage(msg) val sub = pub.expectSubscription() - expectFrameHeaderOnNetwork(Opcode.Text, 0, fin = false) - val text1 = text.take(3) val text1Bytes = ByteString(text1, "UTF-8") val text2 = text.drop(3) val text2Bytes = ByteString(text2, "UTF-8") sub.sendNext(text1) - expectFrameOnNetwork(Opcode.Continuation, text1Bytes, fin = false) + expectFrameOnNetwork(Opcode.Text, text1Bytes, fin = false) sub.sendNext(text2) expectFrameOnNetwork(Opcode.Continuation, text2Bytes, fin = false) @@ -353,12 +347,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with pushMessage(msg) val sub = pub.expectSubscription() - expectFrameHeaderOnNetwork(Opcode.Text, 0, fin = false) sub.sendNext(half1) expectNoNetworkData() sub.sendNext(half2) - expectFrameOnNetwork(Opcode.Continuation, ByteString(gclef, "utf8"), fin = false) + expectFrameOnNetwork(Opcode.Text, ByteString(gclef, "utf8"), fin = false) } "for a streamed message with a chunk being larger than configured maximum frame size" in pending "and mask input on the client side" in new ClientTestSetup { @@ -368,15 +361,13 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with pushMessage(msg) val sub = pub.expectSubscription() - expectFrameOnNetwork(Opcode.Text, ByteString.empty, fin = false) - val text1 = text.take(3) val text1Bytes = ByteString(text1, "UTF-8") val text2 = text.drop(3) val text2Bytes = ByteString(text2, "UTF-8") sub.sendNext(text1) - expectMaskedFrameOnNetwork(Opcode.Continuation, text1Bytes, fin = false) + expectMaskedFrameOnNetwork(Opcode.Text, text1Bytes, fin = false) sub.sendNext(text2) expectMaskedFrameOnNetwork(Opcode.Continuation, text2Bytes, fin = false) @@ -423,12 +414,10 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with val msg = BinaryMessage(Source.fromPublisher(outPub)) pushMessage(msg) - expectFrameHeaderOnNetwork(Opcode.Binary, 0, fin = false) - val outSub = outPub.expectSubscription() val outData1 = ByteString("abc", "ASCII") outSub.sendNext(outData1) - expectFrameOnNetwork(Opcode.Continuation, outData1, fin = false) + expectFrameOnNetwork(Opcode.Binary, outData1, fin = false) val pingMask = Random.nextInt() val pingData = maskedASCII("pling", pingMask)._1 @@ -509,12 +498,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with val pub = TestPublisher.manualProbe[ByteString]() val msg = BinaryMessage(Source.fromPublisher(pub)) pushMessage(msg) - expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) val data = ByteString("abc", "ASCII") val dataSub = pub.expectSubscription() dataSub.sendNext(data) - expectFrameOnNetwork(Opcode.Continuation, data, fin = false) + expectFrameOnNetwork(Opcode.Binary, data, fin = false) dataSub.sendComplete() expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true) @@ -550,12 +538,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with val pub = TestPublisher.manualProbe[ByteString]() val msg = BinaryMessage(Source.fromPublisher(pub)) pushMessage(msg) - expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) val data = ByteString("abc", "ASCII") val dataSub = pub.expectSubscription() dataSub.sendNext(data) - expectFrameOnNetwork(Opcode.Continuation, data, fin = false) + expectFrameOnNetwork(Opcode.Binary, data, fin = false) dataSub.sendComplete() expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true) @@ -614,12 +601,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with val pub = TestPublisher.manualProbe[ByteString]() val msg = BinaryMessage(Source.fromPublisher(pub)) pushMessage(msg) - expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false) val data = ByteString("abc", "ASCII") val dataSub = pub.expectSubscription() dataSub.sendNext(data) - expectFrameOnNetwork(Opcode.Continuation, data, fin = false) + expectFrameOnNetwork(Opcode.Binary, data, fin = false) messageOut.sendComplete() expectNoNetworkData() // need to wait for substream to close