diff --git a/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala new file mode 100644 index 000000000000..6f353ed4d9eb --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream + +import java.util.concurrent.TimeUnit + +import akka.stream.impl.JsonBracketCounting +import akka.util.ByteString +import org.openjdk.jmh.annotations._ + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class JsonFramingBenchmark { + + /* + Benchmark Mode Cnt Score Error Units + // old + JsonFramingBenchmark.collecting_1 thrpt 20 81.476 ± 14.793 ops/s + JsonFramingBenchmark.collecting_offer_5 thrpt 20 20.187 ± 2.291 ops/s + + // new + JsonFramingBenchmark.counting_1 thrpt 20 10766.738 ± 1278.300 ops/s + JsonFramingBenchmark.counting_offer_5 thrpt 20 28798.255 ± 2670.163 ops/s + */ + + val json = + ByteString( + """|{"fname":"Frank","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin) + + val bracket = new JsonBracketCounting + + @Setup(Level.Invocation) + def init(): Unit = { + bracket.offer(json) + } + + @Benchmark + def counting_1: ByteString = + bracket.poll().get + + @Benchmark + @OperationsPerInvocation(5) + def counting_offer_5: ByteString = { + bracket.offer(json) + bracket.poll().get + bracket.poll().get + bracket.poll().get + bracket.poll().get + bracket.poll().get + bracket.poll().get + } + +} diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala new file mode 100644 index 000000000000..6a6fffe86fea --- /dev/null +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. 
+ */ + +package docs.http.scaladsl.server.directives + +import akka.NotUsed +import akka.http.scaladsl.marshalling.ToResponseMarshallable +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.Accept +import akka.http.scaladsl.server.{ UnsupportedRequestContentTypeRejection, UnacceptedResponseContentTypeRejection, JsonSourceRenderingMode } +import akka.stream.scaladsl.{ Flow, Source } +import docs.http.scaladsl.server.RoutingSpec +import spray.json.{ JsValue, JsObject, DefaultJsonProtocol } + +import scala.concurrent.Future + +class JsonStreamingExamplesSpec extends RoutingSpec { + + //#models + case class Tweet(uid: Int, txt: String) + case class Measurement(id: String, value: Int) + //# + + def getTweets() = + Source(List( + Tweet(1, "#Akka rocks!"), + Tweet(2, "Streaming is so hot right now!"), + Tweet(3, "You cannot enter the same river twice."))) + + //#formats + object MyJsonProtocol extends spray.json.DefaultJsonProtocol { + implicit val userFormat = jsonFormat2(Tweet.apply) + implicit val measurementFormat = jsonFormat2(Measurement.apply) + } + //# + + "spray-json-response-streaming" in { + // [1] import generic spray-json marshallers support: + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ + + // [2] import "my protocol", for marshalling Tweet objects: + import MyJsonProtocol._ + + // [3] pick json rendering mode: + implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine + + val route = + path("users") { + val users: Source[Tweet, NotUsed] = getTweets() + complete(ToResponseMarshallable(users)) + } + + // tests: + val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`)) + val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`)) + + Get("/users").withHeaders(AcceptJson) ~> route ~> check { + responseAs[String] shouldEqual + """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + + """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + + """{"uid":3,"txt":"You cannot enter the same river twice."}""" + } + + // endpoint can only marshal Json, so it will *reject* requests for application/xml: + Get("/users").withHeaders(AcceptXml) ~> route ~> check { + handled should ===(false) + rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`))) + } + } + + "response-streaming-modes" in { + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ + import MyJsonProtocol._ + implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine + + //#async-rendering + path("users") { + val users: Source[Tweet, NotUsed] = getTweets() + complete(users.renderAsync(parallelism = 8)) + } + //# + + //#async-unordered-rendering + path("users" / "unordered") { + val users: Source[Tweet, NotUsed] = getTweets() + complete(users.renderAsyncUnordered(parallelism = 8)) + } + //# + } + + "spray-json-request-streaming" in { + // [1] import generic spray-json (un)marshallers support: + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ + + // [1.1] import framing mode + implicit val jsonFramingMode = akka.http.scaladsl.server.JsonEntityFramingSupport.bracketCountingJsonFraming(Int.MaxValue) + + // [2] import "my protocol", for unmarshalling Measurement objects: + import MyJsonProtocol._ + + // [3] prepareyour persisting logic here + val persistMetrics = Flow[Measurement] + + val route = + path("metrics") { + // [4] extract Source[Measurement, _] + entity(stream[Measurement]) { measurements => + println("measurements = " + measurements) + val measurementsSubmitted: Future[Int] = + 
measurements
+            .via(persistMetrics)
+            .runFold(0) { (cnt, _) =>
+              println("cnt = " + cnt)
+              cnt + 1
+            }
+
+          complete {
+            measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n"""))
+          }
+        }
+      }
+
+    // tests:
+    val data = HttpEntity(
+      ContentTypes.`application/json`,
+      """
+        |{"id":"temp","value":32}
+        |{"id":"temp","value":31}
+        |
+      """.stripMargin)
+
+    Post("/metrics", entity = data) ~> route ~> check {
+      status should ===(StatusCodes.OK)
+      responseAs[String] should ===("""{"msg":"Total metrics received: 2"}""")
+    }
+
+    // the FramingWithContentType will reject any content type that it does not understand:
+    val xmlData = HttpEntity(
+      ContentTypes.`text/xml(UTF-8)`,
+      """|
+         |""".stripMargin)
+
+    Post("/metrics", entity = xmlData) ~> route ~> check {
+      handled should ===(false)
+      rejection should ===(UnsupportedRequestContentTypeRejection(Set(ContentTypes.`application/json`)))
+    }
+  }
+
+}
diff --git a/akka-docs/rst/scala/http/routing-dsl/json-streaming-support.rst b/akka-docs/rst/scala/http/routing-dsl/json-streaming-support.rst
new file mode 100644
index 000000000000..34dda6235b00
--- /dev/null
+++ b/akka-docs/rst/scala/http/routing-dsl/json-streaming-support.rst
@@ -0,0 +1,96 @@
+.. _json-streaming-scala:
+
+JSON Streaming
+==============
+
+`JSON Streaming`_ is a term referring to streaming a (possibly infinite) stream of elements as independent JSON
+objects over one continuous HTTP connection. The elements are most often separated using newlines, however they do
+not have to be; concatenating elements side-by-side or emitting a "very long" JSON array are also valid use cases.
+
+In the examples below, we'll be referring to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as:
+
+.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
+   :snippet: models
+
+And as always with spray-json, we provide our (Un)Marshaller instances as implicit values, using the ``jsonFormat##``
+method to generate them statically:
+
+.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
+   :snippet: formats
+
+.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming
+
+Responding with JSON Streams
+----------------------------
+
+In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.
+
+Firstly, we'll need to get some additional marshalling infrastructure set up that is able to marshal to and from an
+Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers, is ``SprayJsonSupport``, which uses
+spray-json (a high performance JSON parser library) and is shipped as part of Akka HTTP in the
+``akka-http-spray-json-experimental`` module.
+
+Next we import our model's marshallers, generated by spray-json.
+
+The last bit of setup is picking the JSON rendering mode, after which we can render a streaming JSON response:
+
+.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
+   :snippet: spray-json-response-streaming
+
+.. _Streaming API: https://dev.twitter.com/streaming/overview
+
+Customising response rendering mode
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+The mode in which a response is marshalled and then rendered to the HttpResponse from the provided ``Source[T,_]``
+is customisable (thanks to conversions originating from ``Directives`` via ``EntityStreamingDirectives``).
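+
+For example, a minimal sketch (assuming the same imports and implicits as in the response-streaming example above)
+that renders the stream as a single valid JSON array instead of line-by-line:
+
+.. code-block:: scala
+
+   // render the elements as one valid JSON array: [{...},{...},...]
+   implicit val jsonRenderingMode = JsonSourceRenderingMode.CompactArray
+
+   val route =
+     path("users") {
+       val users: Source[Tweet, NotUsed] = getTweets()
+       complete(ToResponseMarshallable(users))
+     }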
+
+Since marshalling is a potentially asynchronous operation in Akka HTTP (transforming a ``T`` into a ``JsValue`` may
+take a long time, depending on your definition of "long time"), marshalling can be run concurrently
+(up to ``parallelism`` concurrent marshalling operations) by using the ``renderAsync(parallelism)`` mode:
+
+.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
+   :snippet: async-rendering
+
+The ``renderAsync`` mode preserves the ordering of the Source's elements, which may sometimes be a required property,
+for example when streaming a strictly ordered dataset. Sometimes the concept of strict ordering does not apply to the
+data being streamed, though, which allows us to exploit this and use ``renderAsyncUnordered(parallelism)``,
+which will concurrently marshal up to ``parallelism`` elements and emit whichever is marshalled first onto
+the HttpResponse:
+
+.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
+   :snippet: async-unordered-rendering
+
+This allows us to *potentially* render elements onto the HttpResponse faster, since it avoids "head-of-line blocking"
+in case one element at the front of the stream takes a long time to marshal while the ones after it are quick to marshal.
+
+Consuming JSON Streaming uploads
+--------------------------------
+
+Sometimes the client may be sending in a streaming request, for example an embedded device that has initiated a
+connection with the server and keeps feeding it measurement data, one line at a time.
+
+In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
+back-pressure to the underlying TCP connection, if the server cannot cope with the rate of incoming data (back-pressure
+will be applied automatically thanks to using Akka HTTP/Streams).
+
+.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
+   :snippet: spray-json-request-streaming
+
+Implementing custom (Un)Marshaller support for JSON streaming
+--------------------------------------------------------------
+
+While not provided by Akka HTTP directly, the infrastructure is extensible: by investigating how ``SprayJsonSupport``
+is implemented, it is certainly possible to provide the same infrastructure for other marshaller implementations
+(such as Play JSON, or Jackson directly, for example). Such support traits will want to extend the
+``JsonEntityStreamingSupport`` trait.
+
+The following types may need to be implemented by a custom framed-streaming support library:
+
+- ``SourceRenderingMode``, which customises how to render the beginning, the separator between elements, and the end
+  of such a stream (while writing a response, i.e. by calling ``complete(source)``); a sketch of a custom mode is shown below.
+  Implementations for JSON are available in ``akka.http.scaladsl.server.JsonSourceRenderingMode``.
+- ``FramingWithContentType``, which is needed to split incoming ``ByteString`` chunks into frames
+  of the higher-level data type format that is understood by the provided unmarshallers.
+  In the case of JSON this means chunking up ByteStrings such that each emitted element corresponds to exactly one
+  JSON object; this framing is implemented in ``JsonEntityStreamingSupport``.
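+
+For illustration, here is a minimal sketch of a custom rendering mode (a hypothetical tab-separated mode, not part of
+Akka HTTP; it only needs to provide the ``start``, ``between`` and ``end`` byte strings):
+
+.. code-block:: scala
+
+   import akka.http.scaladsl.server.JsonSourceRenderingMode
+   import akka.util.ByteString
+
+   // hypothetical mode: JSON objects separated by a TAB character
+   object TabSeparated extends JsonSourceRenderingMode {
+     override val start: ByteString = ByteString.empty
+     override val between: ByteString = ByteString("\t")
+     override val end: ByteString = ByteString.empty
+   }
+
+   // bringing it into implicit scope makes complete(source) use it:
+   implicit val renderingMode: JsonSourceRenderingMode = TabSeparated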
diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala index 99bbb2c2fc21..c62f51314f6d 100644 --- a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala @@ -4,10 +4,13 @@ package akka.http.scaladsl.marshallers.sprayjson +import akka.http.scaladsl.util.FastFuture +import akka.util.ByteString + import scala.language.implicitConversions import akka.http.scaladsl.marshalling.{ ToEntityMarshaller, Marshaller } import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller } -import akka.http.scaladsl.model.{ MediaTypes, HttpCharsets } +import akka.http.scaladsl.model.{ ContentTypes, MediaTypes, HttpCharsets } import akka.http.scaladsl.model.MediaTypes.`application/json` import spray.json._ @@ -19,6 +22,10 @@ trait SprayJsonSupport { sprayJsonUnmarshaller(reader) implicit def sprayJsonUnmarshaller[T](implicit reader: RootJsonReader[T]): FromEntityUnmarshaller[T] = sprayJsValueUnmarshaller.map(jsonReader[T].read) + implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): Unmarshaller[ByteString, T] = + Unmarshaller.withMaterializer[ByteString, JsValue](_ ⇒ implicit mat ⇒ { bs ⇒ + FastFuture.successful(JsonParser(bs.toArray[Byte])) + }).map(jsonReader[T].read) implicit def sprayJsValueUnmarshaller: FromEntityUnmarshaller[JsValue] = Unmarshaller.byteStringUnmarshaller.forContentTypes(`application/json`).mapWithCharset { (data, charset) ⇒ val input = @@ -29,9 +36,11 @@ trait SprayJsonSupport { implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] = sprayJsonMarshaller[T](writer, printer) - implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] = + implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] = sprayJsValueMarshaller compose writer.write implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[JsValue] = Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer) + implicit def sprayByteStringMarshaller[T](implicit writer: RootJsonFormat[T], printer: JsonPrinter = CompactPrinter): Marshaller[T, ByteString] = + sprayJsValueMarshaller.map(s ⇒ ByteString(s.toString)) compose writer.write } -object SprayJsonSupport extends SprayJsonSupport \ No newline at end of file +object SprayJsonSupport extends SprayJsonSupport diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala index b92bb8d97ad3..489ea7375a6b 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala @@ -36,5 +36,6 @@ trait Directives extends RouteConcatenation with SchemeDirectives with SecurityDirectives with WebSocketDirectives + with FramedEntityStreamingDirectives object Directives extends Directives diff --git 
a/akka-http/src/main/scala/akka/http/scaladsl/server/JsonEntityStreaming.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/JsonEntityStreaming.scala new file mode 100644 index 000000000000..228aaf96a825 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/JsonEntityStreaming.scala @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.http.scaladsl.server + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.http.impl.util.JavaMapping +import akka.http.scaladsl.model.{ ContentType, ContentTypes } +import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives.SourceRenderingMode +import akka.stream.scaladsl.{ Flow, Framing } +import akka.util.ByteString +import com.typesafe.config.Config + +import scala.collection.immutable + +/** + * Same as [[akka.stream.scaladsl.Framing]] but additionally can express which [[ContentType]] it supports, + * which can be used to reject routes if content type does not match used framing. + */ +abstract class FramingWithContentType extends Framing { + def flow: Flow[ByteString, ByteString, NotUsed] + def supported: immutable.Set[ContentType] + def isSupported(ct: akka.http.javadsl.model.ContentType): Boolean = supported(JavaMapping.ContentType.toScala(ct)) +} +object FramingWithContentType { + def apply(framing: Flow[ByteString, ByteString, NotUsed], contentType: ContentType, moreContentTypes: ContentType*) = + new FramingWithContentType { + override def flow = framing + + override val supported: immutable.Set[ContentType] = + if (moreContentTypes.isEmpty) Set(contentType) + else Set(contentType) ++ moreContentTypes + } +} + +/** + * Json entity streaming support, independent of used Json parsing library. + * + * Can be extended by various Support traits (e.g. "SprayJsonSupport"), + * in order to provide users with both `framing` (this trait) and `marshalling` + * (implemented by a library) by using a single trait. + */ +trait JsonEntityFramingSupport { + + /** `application/json` specific Framing implementation */ + def bracketCountingJsonFraming(maximumObjectLength: Int) = new FramingWithContentType { + override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength)) + + override val supported: immutable.Set[ContentType] = Set(ContentTypes.`application/json`) + } +} +object JsonEntityFramingSupport extends JsonEntityFramingSupport + +/** + * Specialised rendering mode for streaming elements as JSON. + * + * See also: JSON Streaming on Wikipedia. + */ +trait JsonSourceRenderingMode extends SourceRenderingMode { + override val contentType = ContentTypes.`application/json` +} + +object JsonSourceRenderingMode { + + /** + * Most compact rendering mode + * It does not intersperse any separator between the signalled elements. + * + * {{{ + * {"id":42}{"id":43}{"id":44} + * }}} + */ + object Compact extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString.empty + override val end: ByteString = ByteString.empty + } + + /** + * Simple rendering mode, similar to [[Compact]] however interspersing elements with a `\n` character. 
+ * + * {{{ + * {"id":42},{"id":43},{"id":44} + * }}} + */ + object CompactCommaSeparated extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString(",") + override val end: ByteString = ByteString.empty + } + + /** + * Rendering mode useful when the receiving end expects a valid JSON Array. + * It can be useful when the client wants to detect when the stream has been successfully received in-full, + * which it can determine by seeing the terminating `]` character. + * + * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, + * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. + * + * {{{ + * [{"id":42},{"id":43},{"id":44}] + * }}} + */ + object CompactArray extends JsonSourceRenderingMode { + override val start: ByteString = ByteString("[") + override val between: ByteString = ByteString(",") + override val end: ByteString = ByteString("]") + } + + /** + * Recommended rendering mode. + * + * It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements). + * A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API). + * + * {{{ + * {"id":42} + * {"id":43} + * {"id":44} + * }}} + */ + object LineByLine extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString("\n") + override val end: ByteString = ByteString.empty + } + + /** + * Simple rendering mode interspersing each pair of elements with both `,\n`. + * Picking the [[LineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma). + * + * {{{ + * {"id":42}, + * {"id":43}, + * {"id":44} + * }}} + */ + object LineByLineCommaSeparated extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString(",\n") + override val end: ByteString = ByteString.empty + } + +} + +object JsonStreamingSettings { + + def apply(sys: ActorSystem): JsonStreamingSettings = + apply(sys.settings.config.getConfig("akka.http.json-streaming")) + + def apply(c: Config): JsonStreamingSettings = { + JsonStreamingSettings( + c.getInt("max-object-size"), + renderingMode(c.getString("rendering-mode"))) + } + + def renderingMode(name: String): SourceRenderingMode = name match { + case "line-by-line" ⇒ JsonSourceRenderingMode.LineByLine // the default + case "line-by-line-comma-separated" ⇒ JsonSourceRenderingMode.LineByLineCommaSeparated + case "compact" ⇒ JsonSourceRenderingMode.Compact + case "compact-comma-separated" ⇒ JsonSourceRenderingMode.CompactCommaSeparated + case "compact-array" ⇒ JsonSourceRenderingMode.CompactArray + } +} +final case class JsonStreamingSettings( + maxObjectSize: Int, + style: SourceRenderingMode) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala new file mode 100644 index 000000000000..bb8a074fd662 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. 
+ */ +package akka.http.scaladsl.server.directives + +import akka.NotUsed +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.FramingWithContentType +import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller, _ } +import akka.http.scaladsl.util.FastFuture +import akka.stream.Materializer +import akka.stream.impl.ConstantFun +import akka.stream.scaladsl.{ Flow, Source } +import akka.util.ByteString + +import scala.concurrent.ExecutionContext +import scala.language.implicitConversions + +/** + * Allows the [[MarshallingDirectives.entity]] directive to extract a `stream[T]` for framed messages. + * See `JsonEntityStreamingSupport` and classes extending it, such as `SprayJsonSupport` to get marshallers. + */ +trait FramedEntityStreamingDirectives extends MarshallingDirectives { + import FramedEntityStreamingDirectives._ + + type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]] + + // TODO DOCS + + final def stream[T](implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + streamAsync(1)(um, framing) + final def stream[T](framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = + streamAsync(1)(um, framing) + + final def streamAsync[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + streamInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat))) + final def streamAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = + streamAsync(parallelism)(um, framing) + + final def streamAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + streamInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat))) + final def streamAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = + streamAsyncUnordered(parallelism)(um, framing) + + // TODO materialized value may want to be "drain/cancel" or something like it? + // TODO could expose `streamMat`, for more fine grained picking of Marshaller + + // format: OFF + private def streamInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] = + Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ + val entity = req.entity + if (!framing.supported(entity.contentType)) { + val supportedContentTypes = framing.supported.map(ContentTypeRange(_)) + FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(supportedContentTypes)) + } else { + val stream = entity.dataBytes.via(framing.flow).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) + FastFuture.successful(stream) + } + } + // format: ON + + // TODO note to self - we need the same of ease of streaming stuff for the client side - i.e. the twitter firehose case. 
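+
+  // Usage sketch (illustrative only; assumes an implicit Unmarshaller[ByteString, T] and a FramingWithContentType,
+  // e.g. provided by SprayJsonSupport and JsonEntityFramingSupport.bracketCountingJsonFraming):
+  //
+  //   entity(stream[Measurement]) { measurements =>
+  //     complete(measurements.runFold(0)((count, _) => count + 1).map(_.toString))
+  //   }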
+ + implicit def _sourceMarshaller[T, M](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[Source[T, M]] = + Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ + FastFuture successful { + Marshalling.WithFixedContentType(mode.contentType, () ⇒ { // TODO charset? + val bytes = source + .mapAsync(1)(t ⇒ Marshal(t).to[HttpEntity]) + .map(_.dataBytes) + .flatMapConcat(ConstantFun.scalaIdentityFunction) + .intersperse(mode.start, mode.between, mode.end) + HttpResponse(entity = HttpEntity(mode.contentType, bytes)) + }) :: Nil + } + } + + implicit def _sourceParallelismMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncRenderingOf[T]] = + Marshaller[AsyncRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒ + FastFuture successful { + Marshalling.WithFixedContentType(mode.contentType, () ⇒ { // TODO charset? + val bytes = rendering.source + .mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity]) + .map(_.dataBytes) + .flatMapConcat(ConstantFun.scalaIdentityFunction) + .intersperse(mode.start, mode.between, mode.end) + HttpResponse(entity = HttpEntity(mode.contentType, bytes)) + }) :: Nil + } + } + + implicit def _sourceUnorderedMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncUnorderedRenderingOf[T]] = + Marshaller[AsyncUnorderedRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒ + FastFuture successful { + Marshalling.WithFixedContentType(mode.contentType, () ⇒ { // TODO charset? + val bytes = rendering.source + .mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity]) + .map(_.dataBytes) + .flatMapConcat(ConstantFun.scalaIdentityFunction) + .intersperse(mode.start, mode.between, mode.end) + HttpResponse(entity = HttpEntity(mode.contentType, bytes)) + }) :: Nil + } + } + + // special rendering modes + + implicit def enableSpecialSourceRenderingModes[T](source: Source[T, Any]): EnableSpecialSourceRenderingModes[T] = + new EnableSpecialSourceRenderingModes(source) + +} +object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives { + /** + * Defines ByteStrings to be injected before the first, between, and after all elements of a [[Source]], + * when used to complete a request. + * + * A typical example would be rendering a ``Source[T, _]`` as JSON array, + * where start is `[`, between is `,`, and end is `]` - which procudes a valid json array, assuming each element can + * be properly marshalled as JSON object. + * + * The corresponding values will typically be put into an [[Source.intersperse]] call on the to-be-rendered Source. 
+ */ + trait SourceRenderingMode { + def contentType: ContentType + // def charset: HttpCharset = HttpCharsets.`UTF-8` + + def start: ByteString + def between: ByteString + def end: ByteString + } + + final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) + final class AsyncUnorderedRenderingOf[T](val source: Source[T, Any], val parallelism: Int) + +} + +final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal { + def renderAsync(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncRenderingOf(source, parallelism) + def renderAsyncUnordered(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncUnorderedRenderingOf(source, parallelism) +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala new file mode 100644 index 000000000000..ccd58690dde6 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -0,0 +1,444 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorMaterializer +import akka.stream.impl.JsonBracketCounting +import akka.stream.scaladsl.Framing.FramingException +import akka.stream.scaladsl.{ JsonFraming, Framing, Source } +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.AkkaSpec +import akka.util.ByteString +import org.scalatest.concurrent.ScalaFutures + +import scala.collection.immutable.Seq +import scala.concurrent.Await +import scala.concurrent.duration._ + +class JsonFramingSpec extends AkkaSpec { + + implicit val mat = ActorMaterializer() + + "collecting multiple json" should { + "xoxo parse json array" in { + val input = + """ + |[ + | { "name" : "john" }, + | { "name" : "jack" }, + | { "name" : "katie" } + |] + |""".stripMargin // also should complete once notices end of array + + val result = Source.single(ByteString(input)) + .via(JsonFraming.bracketCounting(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + result.futureValue shouldBe Seq( + """{ "name" : "john" }""".stripMargin, + """{ "name" : "jack" }""".stripMargin, + """{ "name" : "katie" }""".stripMargin) + } + + "emit single json element from string" in { + val input = + """| { "name": "john" } + | { "name": "jack" } + """.stripMargin + + val result = Source.single(ByteString(input)) + .via(JsonFraming.bracketCounting(Int.MaxValue)) + .take(1) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + Await.result(result, 3.seconds) shouldBe Seq("""{ "name": "john" }""".stripMargin) + } + + "parse line delimited" in { + val input = + """| { "name": "john" } + | { "name": "jack" } + | { "name": "katie" } + """.stripMargin + + val result = Source.single(ByteString(input)) + .via(JsonFraming.bracketCounting(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + Await.result(result, 3.seconds) shouldBe Seq( + """{ "name": "john" }""".stripMargin, + """{ "name": "jack" }""".stripMargin, + """{ "name": "katie" }""".stripMargin) + } + + "parse comma delimited" in { + val input = + """ + | { "name": "john" }, { "name": "jack" }, { "name": "katie" } + """.stripMargin + + val result = Source.single(ByteString(input)) + .via(JsonFraming.bracketCounting(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + result.futureValue shouldBe 
Seq( + """{ "name": "john" }""".stripMargin, + """{ "name": "jack" }""", + """{ "name": "katie" }""") + } + + "parse chunks successfully" in { + val input: Seq[ByteString] = Seq( + """ + |[ + | { "name": "john"""".stripMargin, + """ + |}, + """.stripMargin, + """{ "na""", + """me": "jack""", + """"}]"""").map(ByteString(_)) + + val result = Source.apply(input) + .via(JsonFraming.bracketCounting(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + result.futureValue shouldBe Seq( + """{ "name": "john" + |}""".stripMargin, + """{ "name": "jack"}""") + } + } + + // TODO fold these specs into the previous section + "collecting json buffer" when { + "nothing is supplied" should { + "return nothing" in { + val buffer = new JsonBracketCounting() + buffer.poll() should ===(None) + } + } + + "valid json is supplied" which { + "has one object" should { + "successfully parse empty object" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""{}""")) + buffer.poll().get.utf8String shouldBe """{}""" + } + + "successfully parse single field having string value" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""{ "name": "john"}""")) + buffer.poll().get.utf8String shouldBe """{ "name": "john"}""" + } + + "successfully parse single field having string value containing space" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""{ "name": "john doe"}""")) + buffer.poll().get.utf8String shouldBe """{ "name": "john doe"}""" + } + + "successfully parse single field having string value containing curly brace" in { + val buffer = new JsonBracketCounting() + + buffer.offer(ByteString("""{ "name": "john{""")) + buffer.offer(ByteString("}")) + buffer.offer(ByteString("\"")) + buffer.offer(ByteString("}")) + + buffer.poll().get.utf8String shouldBe """{ "name": "john{}"}""" + } + + "successfully parse single field having string value containing curly brace and escape character" in { + val buffer = new JsonBracketCounting() + + buffer.offer(ByteString("""{ "name": "john""")) + buffer.offer(ByteString("\\\"")) + buffer.offer(ByteString("{")) + buffer.offer(ByteString("}")) + buffer.offer(ByteString("\\\"")) + buffer.offer(ByteString(" ")) + buffer.offer(ByteString("hey")) + buffer.offer(ByteString("\"")) + + buffer.offer(ByteString("}")) + buffer.poll().get.utf8String shouldBe """{ "name": "john\"{}\" hey"}""" + } + + "successfully parse single field having integer value" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""{ "age": 101}""")) + buffer.poll().get.utf8String shouldBe """{ "age": 101}""" + } + + "successfully parse single field having decimal value" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""{ "age": 101}""")) + buffer.poll().get.utf8String shouldBe """{ "age": 101}""" + } + + "successfully parse single field having nested object" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString( + """ + |{ "name": "john", + | "age": 101, + | "address": { + | "street": "Straight Street", + | "postcode": 1234 + | } + |} + | """.stripMargin)) + buffer.poll().get.utf8String shouldBe """{ "name": "john", + | "age": 101, + | "address": { + | "street": "Straight Street", + | "postcode": 1234 + | } + |}""".stripMargin + } + + "successfully parse single field having multiple level of nested object" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString( + """ + |{ "name": "john", + | "age": 101, + | "address": { + | 
"street": { + | "name": "Straight", + | "type": "Avenue" + | }, + | "postcode": 1234 + | } + |} + | """.stripMargin)) + buffer.poll().get.utf8String shouldBe """{ "name": "john", + | "age": 101, + | "address": { + | "street": { + | "name": "Straight", + | "type": "Avenue" + | }, + | "postcode": 1234 + | } + |}""".stripMargin + } + } + + "has nested array" should { + "successfully parse" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString( + """ + |{ "name": "john", + | "things": [ + | 1, + | "hey", + | 3, + | "there" + | ] + |} + | """.stripMargin)) + buffer.poll().get.utf8String shouldBe """{ "name": "john", + | "things": [ + | 1, + | "hey", + | 3, + | "there" + | ] + |}""".stripMargin + } + } + + "has complex object graph" should { + "successfully parse" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString( + """ + |{ + | "name": "john", + | "addresses": [ + | { + | "street": "3 Hopson Street", + | "postcode": "ABC-123", + | "tags": ["work", "office"], + | "contactTime": [ + | {"time": "0900-1800", "timezone", "UTC"} + | ] + | }, + | { + | "street": "12 Adielie Road", + | "postcode": "ZZY-888", + | "tags": ["home"], + | "contactTime": [ + | {"time": "0800-0830", "timezone", "UTC"}, + | {"time": "1800-2000", "timezone", "UTC"} + | ] + | } + | ] + |} + | """.stripMargin)) + + buffer.poll().get.utf8String shouldBe """{ + | "name": "john", + | "addresses": [ + | { + | "street": "3 Hopson Street", + | "postcode": "ABC-123", + | "tags": ["work", "office"], + | "contactTime": [ + | {"time": "0900-1800", "timezone", "UTC"} + | ] + | }, + | { + | "street": "12 Adielie Road", + | "postcode": "ZZY-888", + | "tags": ["home"], + | "contactTime": [ + | {"time": "0800-0830", "timezone", "UTC"}, + | {"time": "1800-2000", "timezone", "UTC"} + | ] + | } + | ] + |}""".stripMargin + } + } + + "has multiple fields" should { + "parse successfully" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""{ "name": "john", "age": 101}""")) + buffer.poll().get.utf8String shouldBe """{ "name": "john", "age": 101}""" + } + + "parse successfully despite valid whitespaces around json" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString( + """ + | + | + |{"name": "john" + |, "age": 101}""".stripMargin)) + buffer.poll().get.utf8String shouldBe + """{"name": "john" + |, "age": 101}""".stripMargin + } + } + + "has multiple objects" should { + "pops the right object as buffer is filled" in { + val input = + """ + | { + | "name": "john", + | "age": 32 + | }, + | { + | "name": "katie", + | "age": 25 + | } + """.stripMargin + + val buffer = new JsonBracketCounting() + buffer.offer(ByteString(input)) + + buffer.poll().get.utf8String shouldBe + """{ + | "name": "john", + | "age": 32 + | }""".stripMargin + buffer.poll().get.utf8String shouldBe + """{ + | "name": "katie", + | "age": 25 + | }""".stripMargin + buffer.poll() should ===(None) + + buffer.offer(ByteString("""{"name":"jenkins","age": """)) + buffer.poll() should ===(None) + + buffer.offer(ByteString("65 }")) + buffer.poll().get.utf8String shouldBe """{"name":"jenkins","age": 65 }""" + } + } + + "returns none until valid json is encountered" in { + val buffer = new JsonBracketCounting() + + """{ "name": "john"""".stripMargin.foreach { + c ⇒ + buffer.offer(ByteString(c)) + buffer.poll() should ===(None) + } + + buffer.offer(ByteString("}")) + buffer.poll().get.utf8String shouldBe """{ "name": "john"}""" + } + + "invalid json is supplied" should { + "fail if it's broken from the start" in { + 
val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""THIS IS NOT VALID { "name": "john"}""")) + a[FramingException] shouldBe thrownBy { buffer.poll() } + } + + "fail if it's broken at the end" in { + val buffer = new JsonBracketCounting() + buffer.offer(ByteString("""{ "name": "john"} THIS IS NOT VALID""")) + buffer.poll() // first emitting the valid element + a[FramingException] shouldBe thrownBy { buffer.poll() } + } + } + } + + "fail on too large initial object" in { + val input = + """ + | { "name": "john" }, { "name": "jack" } + """.stripMargin + + val result = Source.single(ByteString(input)) + .via(JsonFraming.bracketCounting(5)).map(_.utf8String) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry) + } + + a[FramingException] shouldBe thrownBy { + Await.result(result, 3.seconds) + } + } + + "fail when 2nd object is too large" in { + val input = List( + """{ "name": "john" }""", + """{ "name": "jack" }""", + """{ "name": "very very long name somehow. how did this happen?" }""").map(s ⇒ ByteString(s)) + + val probe = Source(input) + .via(JsonFraming.bracketCounting(48)) + .runWith(TestSink.probe) + + probe.ensureSubscription() + probe + .request(1) + .expectNext(ByteString("""{ "name": "john" }""")) // FIXME we should not impact the given json in Framing + .request(1) + .expectNext(ByteString("""{ "name": "jack" }""")) + .request(1) + .expectError().getMessage should include("exceeded") + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonBracketCounting.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonBracketCounting.scala new file mode 100644 index 000000000000..7bd9e18ca277 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonBracketCounting.scala @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.scaladsl.Framing.FramingException +import akka.util.ByteString + +import scala.annotation.switch + +object JsonBracketCounting { + + final val SquareBraceStart = "[".getBytes.head + final val SquareBraceEnd = "]".getBytes.head + final val CurlyBraceStart = "{".getBytes.head + final val CurlyBraceEnd = "}".getBytes.head + final val DoubleQuote = "\"".getBytes.head + final val Backslash = "\\".getBytes.head + final val Comma = ",".getBytes.head + + final val LineBreak = '\n'.toByte + final val LineBreak2 = '\r'.toByte + final val Tab = '\t'.toByte + final val Space = ' '.toByte + + final val Whitespace = Set(LineBreak, LineBreak2, Tab, Space) + + def isWhitespace(input: Byte): Boolean = + Whitespace.contains(input) + +} + +/** + * **Mutable** framing implementation that given any number of [[ByteString]] chunks, can emit JSON objects contained within them. + * Typically JSON objects are separated by new-lines or comas, however a top-level JSON Array can also be understood and chunked up + * into valid JSON objects by this framing implementation. + * + * Leading whitespace between elements will be trimmed. 
+ */ +class JsonBracketCounting(maximumObjectLength: Int = Int.MaxValue) { + import JsonBracketCounting._ + + private var buffer: ByteString = ByteString.empty + + private var pos = 0 // latest position of pointer while scanning for json object end + private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc) + private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted + + private var charsInObject = 0 + private var completedObject = false + private var inStringExpression = false + private var isStartOfEscapeSequence = false + + /** + * Appends input ByteString to internal byte string buffer. + * Use [[poll]] to extract contained JSON objects. + */ + def offer(input: ByteString): Unit = + buffer ++= input + + def isEmpty: Boolean = buffer.isEmpty + + /** + * Attempt to locate next complete JSON object in buffered ByteString and returns `Some(it)` if found. + * May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded. + */ + def poll(): Option[ByteString] = { + val foundObject = seekObject() + if (!foundObject) None + else + (pos: @switch) match { + case -1 | 0 ⇒ None + case _ ⇒ + val (emit, buf) = buffer.splitAt(pos) + buffer = buf.compact + pos = 0 + + val tf = trimFront + trimFront = 0 + + if (tf == 0) Some(emit) + else { + val trimmed = emit.drop(tf) + if (trimmed.isEmpty) None + else Some(trimmed) + } + } + } + + /** @return true if an entire valid JSON object was found, false otherwise */ + private def seekObject(): Boolean = { + completedObject = false + val bufSize = buffer.size + while (pos != -1 && (pos < bufSize && pos < maximumObjectLength) && !completedObject) + proceed(buffer(pos)) + + if (pos >= maximumObjectLength) + throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""") + + completedObject + } + + private def proceed(input: Byte): Unit = + if (input == SquareBraceStart && outsideObject) { + // outer object is an array + pos += 1 + trimFront += 1 + } else if (input == SquareBraceEnd && outsideObject) { + // outer array completed! 
+ pos = -1 + } else if (input == Comma && outsideObject) { + // do nothing + pos += 1 + trimFront += 1 + } else if (input == Backslash) { + isStartOfEscapeSequence = true + pos += 1 + } else if (input == DoubleQuote) { + if (!isStartOfEscapeSequence) inStringExpression = !inStringExpression + isStartOfEscapeSequence = false + pos += 1 + } else if (input == CurlyBraceStart && !inStringExpression) { + isStartOfEscapeSequence = false + depth += 1 + pos += 1 + } else if (input == CurlyBraceEnd && !inStringExpression) { + isStartOfEscapeSequence = false + depth -= 1 + pos += 1 + if (depth == 0) { + charsInObject = 0 + completedObject = true + } + } else if (isWhitespace(input) && !inStringExpression) { + pos += 1 + if (depth == 0) trimFront += 1 + } else if (insideObject) { + isStartOfEscapeSequence = false + pos += 1 + } else { + throw new FramingException(s"Invalid JSON encountered as position [$pos] of [$buffer]") + } + + @inline private final def insideObject: Boolean = + !outsideObject + + @inline private final def outsideObject: Boolean = + depth == 0 + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala new file mode 100644 index 000000000000..3fb3c28638e9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.javadsl + +import akka.NotUsed +import akka.util.ByteString + +/** Provides JSON framing stages that can separate valid JSON objects from incoming [[akka.util.ByteString]] objects. */ +object JsonFraming { + + /** + * Returns a Flow that implements a "brace counting" based framing stage for emitting valid JSON chunks. + * + * Typical examples of data that one may want to frame using this stage include: + * + * **Very large arrays**: + * {{{ + * [{"id": 1}, {"id": 2}, [...], {"id": 999}] + * }}} + * + * **Multiple concatenated JSON objects** (with, or without commas between them): + * + * {{{ + * {"id": 1}, {"id": 2}, [...], {"id": 999} + * }}} + * + * The framing works independently of formatting, i.e. it will still emit valid JSON elements even if two + * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive + * (and does not impact the emitting frame) to the JSON object's internal formatting. + * + * Framing raw JSON values (such as integers or strings) is supported as well. + * + * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded + * this Flow will fail the stream. + */ + def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = + akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index 7ff38cae3690..d68b19560ea1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -287,3 +287,11 @@ object Framing { } } + +/** + * Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example. + * Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP). 
+ */ +trait Framing { + def flow: Flow[ByteString, ByteString, NotUsed] +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala new file mode 100644 index 000000000000..0a48c0a3c481 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.Attributes +import akka.stream.impl.JsonBracketCounting +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.stage.{ InHandler, OutHandler, GraphStageLogic } +import akka.util.ByteString + +import scala.util.control.NonFatal + +/** Provides JSON framing stages that can separate valid JSON objects from incoming [[ByteString]] objects. */ +object JsonFraming { + + /** + * Returns a Flow that implements a "brace counting" based framing stage for emitting valid JSON chunks. + * + * Typical examples of data that one may want to frame using this stage include: + * + * **Very large arrays**: + * {{{ + * [{"id": 1}, {"id": 2}, [...], {"id": 999}] + * }}} + * + * **Multiple concatenated JSON objects** (with, or without commas between them): + * + * {{{ + * {"id": 1}, {"id": 2}, [...], {"id": 999} + * }}} + * + * The framing works independently of formatting, i.e. it will still emit valid JSON elements even if two + * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive + * (and does not impact the emitting frame) to the JSON object's internal formatting. + * + * Framing raw JSON values (such as integers or strings) is supported as well. + * + * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded + * this Flow will fail the stream. + */ + def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] { + private[this] val buffer = new JsonBracketCounting(maximumObjectLength) + + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) + + override def onPush(): Unit = { + buffer.offer(grab(in)) + tryPopBuffer() + } + + override def onPull(): Unit = + tryPopBuffer() + + override def onUpstreamFinish(): Unit = + if (buffer.isEmpty) completeStage() + + def tryPopBuffer() = { + try buffer.poll() match { + case Some(json) ⇒ push(out, json) + case _ ⇒ if (isClosed(in)) completeStage() else pull(in) + } catch { + case NonFatal(ex) ⇒ failStage(ex) + } + } + } + }).named("jsonFraming(BracketCounting)") + +}
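Note: a minimal usage sketch of the new scaladsl JsonFraming.bracketCounting stage follows (setup code assumed; this example is not part of the diff above):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{ JsonFraming, Source }
    import akka.util.ByteString

    implicit val system = ActorSystem("json-framing-example")
    implicit val mat = ActorMaterializer()

    // frames concatenated JSON objects into one ByteString per object
    Source.single(ByteString("""{"id":1}{"id":2}{"id":3}"""))
      .via(JsonFraming.bracketCounting(maximumObjectLength = 1024))
      .map(_.utf8String)
      .runForeach(println) // prints {"id":1}, {"id":2}, {"id":3} on separate lines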