Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
+htp #18837 JSON framing and framed entity streaming directives
- Loading branch information
Showing
12 changed files
with
1,343 additions
and
3 deletions.
There are no files selected for viewing
61 changes: 61 additions & 0 deletions
61
akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com> | ||
*/ | ||
package akka.stream | ||
|
||
import java.util.concurrent.TimeUnit | ||
|
||
import akka.stream.impl.JsonBracketCounting | ||
import akka.util.ByteString | ||
import org.openjdk.jmh.annotations._ | ||
|
||
@State(Scope.Benchmark) | ||
@OutputTimeUnit(TimeUnit.SECONDS) | ||
@BenchmarkMode(Array(Mode.Throughput)) | ||
class JsonFramingBenchmark { | ||
|
||
/* | ||
Benchmark Mode Cnt Score Error Units | ||
// old | ||
JsonFramingBenchmark.collecting_1 thrpt 20 81.476 ± 14.793 ops/s | ||
JsonFramingBenchmark.collecting_offer_5 thrpt 20 20.187 ± 2.291 ops/s | ||
// new | ||
JsonFramingBenchmark.counting_1 thrpt 20 10766.738 ± 1278.300 ops/s | ||
JsonFramingBenchmark.counting_offer_5 thrpt 20 28798.255 ± 2670.163 ops/s | ||
*/ | ||
|
||
val json = | ||
ByteString( | ||
"""|{"fname":"Frank","name":"Smith","age":42,"id":1337,"boardMember":false}, | ||
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, | ||
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, | ||
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, | ||
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, | ||
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, | ||
|{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin) | ||
|
||
val bracket = new JsonBracketCounting | ||
|
||
@Setup(Level.Invocation) | ||
def init(): Unit = { | ||
bracket.offer(json) | ||
} | ||
|
||
@Benchmark | ||
def counting_1: ByteString = | ||
bracket.poll().get | ||
|
||
@Benchmark | ||
@OperationsPerInvocation(5) | ||
def counting_offer_5: ByteString = { | ||
bracket.offer(json) | ||
bracket.poll().get | ||
bracket.poll().get | ||
bracket.poll().get | ||
bracket.poll().get | ||
bracket.poll().get | ||
bracket.poll().get | ||
} | ||
|
||
} |
150 changes: 150 additions & 0 deletions
150
...-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
/* | ||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com> | ||
*/ | ||
|
||
package docs.http.scaladsl.server.directives | ||
|
||
import akka.NotUsed | ||
import akka.http.scaladsl.marshalling.ToResponseMarshallable | ||
import akka.http.scaladsl.model._ | ||
import akka.http.scaladsl.model.headers.Accept | ||
import akka.http.scaladsl.server.{ UnsupportedRequestContentTypeRejection, UnacceptedResponseContentTypeRejection, JsonSourceRenderingMode } | ||
import akka.stream.scaladsl.{ Flow, Source } | ||
import docs.http.scaladsl.server.RoutingSpec | ||
import spray.json.{ JsValue, JsObject, DefaultJsonProtocol } | ||
|
||
import scala.concurrent.Future | ||
|
||
class JsonStreamingExamplesSpec extends RoutingSpec { | ||
|
||
//#models | ||
case class Tweet(uid: Int, txt: String) | ||
case class Measurement(id: String, value: Int) | ||
//# | ||
|
||
def getTweets() = | ||
Source(List( | ||
Tweet(1, "#Akka rocks!"), | ||
Tweet(2, "Streaming is so hot right now!"), | ||
Tweet(3, "You cannot enter the same river twice."))) | ||
|
||
//#formats | ||
object MyJsonProtocol extends spray.json.DefaultJsonProtocol { | ||
implicit val userFormat = jsonFormat2(Tweet.apply) | ||
implicit val measurementFormat = jsonFormat2(Measurement.apply) | ||
} | ||
//# | ||
|
||
"spray-json-response-streaming" in { | ||
// [1] import generic spray-json marshallers support: | ||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ | ||
|
||
// [2] import "my protocol", for marshalling Tweet objects: | ||
import MyJsonProtocol._ | ||
|
||
// [3] pick json rendering mode: | ||
implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine | ||
|
||
val route = | ||
path("users") { | ||
val users: Source[Tweet, NotUsed] = getTweets() | ||
complete(ToResponseMarshallable(users)) | ||
} | ||
|
||
// tests: | ||
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`)) | ||
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`)) | ||
|
||
Get("/users").withHeaders(AcceptJson) ~> route ~> check { | ||
responseAs[String] shouldEqual | ||
"""{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + | ||
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + | ||
"""{"uid":3,"txt":"You cannot enter the same river twice."}""" | ||
} | ||
|
||
// endpoint can only marshal Json, so it will *reject* requests for application/xml: | ||
Get("/users").withHeaders(AcceptXml) ~> route ~> check { | ||
handled should ===(false) | ||
rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`))) | ||
} | ||
} | ||
|
||
"response-streaming-modes" in { | ||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ | ||
import MyJsonProtocol._ | ||
implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine | ||
|
||
//#async-rendering | ||
path("users") { | ||
val users: Source[Tweet, NotUsed] = getTweets() | ||
complete(users.renderAsync(parallelism = 8)) | ||
} | ||
//# | ||
|
||
//#async-unordered-rendering | ||
path("users" / "unordered") { | ||
val users: Source[Tweet, NotUsed] = getTweets() | ||
complete(users.renderAsyncUnordered(parallelism = 8)) | ||
} | ||
//# | ||
} | ||
|
||
"spray-json-request-streaming" in { | ||
// [1] import generic spray-json (un)marshallers support: | ||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ | ||
|
||
// [1.1] import framing mode | ||
implicit val jsonFramingMode = akka.http.scaladsl.server.JsonEntityFramingSupport.bracketCountingJsonFraming(Int.MaxValue) | ||
|
||
// [2] import "my protocol", for unmarshalling Measurement objects: | ||
import MyJsonProtocol._ | ||
|
||
// [3] prepareyour persisting logic here | ||
val persistMetrics = Flow[Measurement] | ||
|
||
val route = | ||
path("metrics") { | ||
// [4] extract Source[Measurement, _] | ||
entity(stream[Measurement]) { measurements => | ||
println("measurements = " + measurements) | ||
val measurementsSubmitted: Future[Int] = | ||
measurements | ||
.via(persistMetrics) | ||
.runFold(0) { (cnt, _) => | ||
println("cnt = " + cnt) | ||
cnt + 1 | ||
} | ||
|
||
complete { | ||
measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n""")) | ||
} | ||
} | ||
} | ||
|
||
// tests: | ||
val data = HttpEntity( | ||
ContentTypes.`application/json`, | ||
""" | ||
|{"id":"temp","value":32} | ||
|{"id":"temp","value":31} | ||
| | ||
""".stripMargin) | ||
|
||
Post("/metrics", entity = data) ~> route ~> check { | ||
status should ===(StatusCodes.OK) | ||
responseAs[String] should ===("""{"msg":"Total metrics received: 2"}""") | ||
} | ||
|
||
// the FramingWithContentType will reject any content type that it does not understand: | ||
val xmlData = HttpEntity( | ||
ContentTypes.`text/xml(UTF-8)`, | ||
"""|<data id="temp" value="32"/> | ||
|<data id="temp" value="31"/>""".stripMargin) | ||
|
||
Post("/metrics", entity = xmlData) ~> route ~> check { | ||
handled should ===(false) | ||
rejection should ===(UnsupportedRequestContentTypeRejection(Set(ContentTypes.`application/json`))) | ||
} | ||
} | ||
|
||
} |
96 changes: 96 additions & 0 deletions
96
akka-docs/rst/scala/http/routing-dsl/json-streaming-support.rst
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
.. _json-streaming-scala: | ||
|
||
JSON Streaming | ||
============== | ||
|
||
`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON | ||
objects onto one continious HTTP connection. The elements are most often separated using newlines, | ||
however do not have to be and concatenating elements side-by-side or emitting "very long" JSON array is also another | ||
use case. | ||
|
||
In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as: | ||
|
||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala | ||
:snippet: models | ||
|
||
And as always with spray-json, we provide our (Un)Marshaller instances as implicit values uding the ``jsonFormat##`` | ||
method to generate them statically: | ||
|
||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala | ||
:snippet: formats | ||
|
||
.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming | ||
|
||
Responding with JSON Streams | ||
---------------------------- | ||
|
||
In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_. | ||
|
||
Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an | ||
Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses | ||
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the | ||
``akka-http-spray-json-experimental`` module. | ||
to and from ``Source[T,_]`` by using spray-json provided | ||
|
||
Next we import our model's marshallers, generated by spray-json. | ||
|
||
The last bit of setup, before we can render a streaming json response | ||
|
||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala | ||
:snippet: spray-json-response-streaming | ||
|
||
.. _Streaming API: https://dev.twitter.com/streaming/overview | ||
|
||
Customising response rendering mode | ||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
The mode in which a response is marshalled and then rendered to the HttpResponse from the provided ``Source[T,_]`` | ||
is customisable (thanks to conversions originating from ``Directives`` via ``EntityStreamingDirectives``). | ||
|
||
Since Marshalling is a potentially asynchronous operation in Akka HTTP (because transforming ``T`` to ``JsValue`` may | ||
potentially take a long time (depending on your definition of "long time"), we allow to run marshalling concurrently | ||
(up to ``parallelism`` concurrent marshallings) by using the ``renderAsync(parallelism)`` mode: | ||
|
||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala | ||
:snippet: async-rendering | ||
|
||
The ``renderAsync`` mode perserves ordering of the Source's elements, which may sometimes be a required property, | ||
for example when streaming a strictly ordered dataset. Sometimes the contept of strict-order does not apply to the | ||
data being streamed though, which allows us to explit this property and use ``renderAsyncUnordered(parallelism)``, | ||
which will concurrently marshall up to ``parallelism`` elements and emit the first which is marshalled onto | ||
the HttpResponse: | ||
|
||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala | ||
:snippet: async-unordered-rendering | ||
|
||
This allows us to _potentially_ render elements faster onto the HttpResponse, since it can avoid "head of line blocking", | ||
in case one element in front of the stream takes a long time to marshall, yet others after it are very quick to marshall. | ||
|
||
Consuming JSON Streaming uploads | ||
-------------------------------- | ||
|
||
Sometimes the client may be sending in a streaming request, for example an embedded device initiated a connection with | ||
the server and is feeding it with one line of measurement data. | ||
|
||
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply | ||
back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure | ||
will be applied automatically thanks to using Akka HTTP/Streams). | ||
|
||
|
||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala | ||
:snippet: spray-json-request-streaming | ||
|
||
Implementing custom (Un)Marshaller support for JSON streaming | ||
------------------------------------------------------------- | ||
|
||
While not provided by Akka HTTP directly, the infrastructure is extensible and by investigating how ``SprayJsonSupport`` | ||
is implemented it is certainly possible to provide the same infrastructure for other marshaller implementations (such as | ||
Play JSON, or Jackson directly for example). Such support traits will want to extend the ``JsonEntityStreamingSupport`` trait. | ||
|
||
The following types that may need to be implemented by a custom framed-streaming support library are: | ||
|
||
- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such stream (while writing a response, i.e. by calling ``complete(source)``). | ||
Implementations for JSON are available in ``akka.http.scaladsl.server.JsonSourceRenderingMode``. | ||
- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` chunks into frames | ||
of the higher-level data type format that is understood by the provided unmarshallers. | ||
In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object, | ||
this framing is implemented in ``JsonEntityStreamingSupport``. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.