Skip to content

Commit

Permalink
=htc #748 optimize frame creation for streamed WS messages
Browse files Browse the repository at this point in the history
Previously, a single empty frame was created to start a WS message. Now,
a streamed message will only be begun when the first data comes in.
  • Loading branch information
jrudolph committed Jan 16, 2017
1 parent 88c36cf commit 6e084f5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 28 deletions.
Expand Up @@ -6,9 +6,9 @@ package akka.http.impl.engine.ws

import akka.NotUsed
import akka.util.ByteString
import akka.stream.scaladsl.{ Source, Flow }

import akka.stream.scaladsl.{ Flow, Source }
import Protocol.Opcode
import akka.http.impl.util.StreamUtils
import akka.http.scaladsl.model.ws._

/**
Expand All @@ -22,9 +22,20 @@ private[http] object MessageToFrameRenderer {
// FIXME: fragment?
Source.single(FrameEvent.fullFrame(opcode, None, data, fin = true))

def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, NotUsed] =
Source.single(FrameEvent.empty(opcode, fin = false)) ++
data.map(FrameEvent.fullFrame(Opcode.Continuation, None, _, fin = false)) ++
def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, Any] =
data.via(StreamUtils.statefulMap(() {
var isFirst = true

{ data
val frameOpcode =
if (isFirst) {
isFirst = false
opcode
} else Opcode.Continuation

FrameEvent.fullFrame(frameOpcode, None, data, fin = false)
}
})) ++
Source.single(FrameEvent.emptyLastContinuationFrame)

Flow[Message]
Expand Down
Expand Up @@ -256,6 +256,15 @@ private[http] object StreamUtils {
}
}
}

/**
* Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map.
*/
def statefulMap[T, U](functionConstructor: () T U): Flow[T, U, NotUsed] =
Flow[T].statefulMapConcat { ()
val f = functionConstructor()
i f(i) :: Nil
}
}

/**
Expand Down
Expand Up @@ -266,13 +266,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
pushMessage(msg)
val sub = pub.expectSubscription()

expectFrameHeaderOnNetwork(Opcode.Binary, 0, fin = false)

val data1 = data.take(3)
val data2 = data.drop(3)

sub.sendNext(data1)
expectFrameOnNetwork(Opcode.Continuation, data1, fin = false)
expectFrameOnNetwork(Opcode.Binary, data1, fin = false)

sub.sendNext(data2)
expectFrameOnNetwork(Opcode.Continuation, data2, fin = false)
Expand All @@ -288,13 +286,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
pushMessage(msg)
val sub = pub.expectSubscription()

expectFrameHeaderOnNetwork(Opcode.Binary, 0, fin = false)

val data1 = data.take(3)
val data2 = data.drop(3)

sub.sendNext(data1)
expectMaskedFrameOnNetwork(Opcode.Continuation, data1, fin = false)
expectMaskedFrameOnNetwork(Opcode.Binary, data1, fin = false)

sub.sendNext(data2)
expectMaskedFrameOnNetwork(Opcode.Continuation, data2, fin = false)
Expand Down Expand Up @@ -323,15 +319,13 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
pushMessage(msg)
val sub = pub.expectSubscription()

expectFrameHeaderOnNetwork(Opcode.Text, 0, fin = false)

val text1 = text.take(3)
val text1Bytes = ByteString(text1, "UTF-8")
val text2 = text.drop(3)
val text2Bytes = ByteString(text2, "UTF-8")

sub.sendNext(text1)
expectFrameOnNetwork(Opcode.Continuation, text1Bytes, fin = false)
expectFrameOnNetwork(Opcode.Text, text1Bytes, fin = false)

sub.sendNext(text2)
expectFrameOnNetwork(Opcode.Continuation, text2Bytes, fin = false)
Expand All @@ -353,12 +347,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
pushMessage(msg)
val sub = pub.expectSubscription()

expectFrameHeaderOnNetwork(Opcode.Text, 0, fin = false)
sub.sendNext(half1)

expectNoNetworkData()
sub.sendNext(half2)
expectFrameOnNetwork(Opcode.Continuation, ByteString(gclef, "utf8"), fin = false)
expectFrameOnNetwork(Opcode.Text, ByteString(gclef, "utf8"), fin = false)
}
"for a streamed message with a chunk being larger than configured maximum frame size" in pending
"and mask input on the client side" in new ClientTestSetup {
Expand All @@ -368,15 +361,13 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
pushMessage(msg)
val sub = pub.expectSubscription()

expectFrameOnNetwork(Opcode.Text, ByteString.empty, fin = false)

val text1 = text.take(3)
val text1Bytes = ByteString(text1, "UTF-8")
val text2 = text.drop(3)
val text2Bytes = ByteString(text2, "UTF-8")

sub.sendNext(text1)
expectMaskedFrameOnNetwork(Opcode.Continuation, text1Bytes, fin = false)
expectMaskedFrameOnNetwork(Opcode.Text, text1Bytes, fin = false)

sub.sendNext(text2)
expectMaskedFrameOnNetwork(Opcode.Continuation, text2Bytes, fin = false)
Expand Down Expand Up @@ -423,12 +414,10 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
val msg = BinaryMessage(Source.fromPublisher(outPub))
pushMessage(msg)

expectFrameHeaderOnNetwork(Opcode.Binary, 0, fin = false)

val outSub = outPub.expectSubscription()
val outData1 = ByteString("abc", "ASCII")
outSub.sendNext(outData1)
expectFrameOnNetwork(Opcode.Continuation, outData1, fin = false)
expectFrameOnNetwork(Opcode.Binary, outData1, fin = false)

val pingMask = Random.nextInt()
val pingData = maskedASCII("pling", pingMask)._1
Expand Down Expand Up @@ -509,12 +498,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage(Source.fromPublisher(pub))
pushMessage(msg)
expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false)

val data = ByteString("abc", "ASCII")
val dataSub = pub.expectSubscription()
dataSub.sendNext(data)
expectFrameOnNetwork(Opcode.Continuation, data, fin = false)
expectFrameOnNetwork(Opcode.Binary, data, fin = false)

dataSub.sendComplete()
expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true)
Expand Down Expand Up @@ -550,12 +538,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage(Source.fromPublisher(pub))
pushMessage(msg)
expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false)

val data = ByteString("abc", "ASCII")
val dataSub = pub.expectSubscription()
dataSub.sendNext(data)
expectFrameOnNetwork(Opcode.Continuation, data, fin = false)
expectFrameOnNetwork(Opcode.Binary, data, fin = false)

dataSub.sendComplete()
expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true)
Expand Down Expand Up @@ -614,12 +601,11 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec with
val pub = TestPublisher.manualProbe[ByteString]()
val msg = BinaryMessage(Source.fromPublisher(pub))
pushMessage(msg)
expectFrameOnNetwork(Opcode.Binary, ByteString.empty, fin = false)

val data = ByteString("abc", "ASCII")
val dataSub = pub.expectSubscription()
dataSub.sendNext(data)
expectFrameOnNetwork(Opcode.Continuation, data, fin = false)
expectFrameOnNetwork(Opcode.Binary, data, fin = false)

messageOut.sendComplete()
expectNoNetworkData() // need to wait for substream to close
Expand Down

0 comments on commit 6e084f5

Please sign in to comment.