diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 78dd4ff505e..4e3461fb691 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -91,7 +91,7 @@ object ConsistentHashingRouter { * INTERNAL API */ private[akka] def hashMappingAdapter(mapper: ConsistentHashMapper): ConsistentHashMapping = { - case message if (mapper.hashKey(message).asInstanceOf[AnyRef] ne null) ⇒ + case message if mapper.hashKey(message).asInstanceOf[AnyRef] ne null ⇒ mapper.hashKey(message) } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala new file mode 100644 index 00000000000..8e7cfe884e8 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream + +import java.util.concurrent.TimeUnit + +import akka.stream.impl.JsonObjectParser +import akka.util.ByteString +import org.openjdk.jmh.annotations._ + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class JsonFramingBenchmark { + + /* + Benchmark Mode Cnt Score Error Units + // old + JsonFramingBenchmark.collecting_1 thrpt 20 81.476 ± 14.793 ops/s + JsonFramingBenchmark.collecting_offer_5 thrpt 20 20.187 ± 2.291 ops/s + + // new + JsonFramingBenchmark.counting_1 thrpt 20 10766.738 ± 1278.300 ops/s + JsonFramingBenchmark.counting_offer_5 thrpt 20 28798.255 ± 2670.163 ops/s + */ + + val json = + ByteString( + """|{"fname":"Frank","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, + |{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin + ) + + val bracket = new JsonObjectParser + + @Setup(Level.Invocation) + def init(): Unit = { + bracket.offer(json) + } + + @Benchmark + def counting_1: ByteString = + bracket.poll().get + + @Benchmark + @OperationsPerInvocation(5) + def counting_offer_5: ByteString = { + bracket.offer(json) + bracket.poll().get + bracket.poll().get + bracket.poll().get + bracket.poll().get + bracket.poll().get + bracket.poll().get + } + +} diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java new file mode 100644 index 00000000000..c43d6241e53 --- /dev/null +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package docs.http.javadsl.server; + +import akka.NotUsed; +import akka.http.javadsl.common.CsvEntityStreamingSupport; +import akka.http.javadsl.common.JsonEntityStreamingSupport; +import akka.http.javadsl.marshallers.jackson.Jackson; +import akka.http.javadsl.marshalling.Marshaller; +import akka.http.javadsl.model.*; +import akka.http.javadsl.model.headers.Accept; +import akka.http.javadsl.server.*; +import akka.http.javadsl.testkit.JUnitRouteTest; +import akka.http.javadsl.testkit.TestRoute; +import akka.http.javadsl.unmarshalling.StringUnmarshallers; +import akka.http.javadsl.common.EntityStreamingSupport; +import akka.http.javadsl.unmarshalling.Unmarshaller; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import org.junit.Test; + +import java.util.concurrent.CompletionStage; + +public class JsonStreamingExamplesTest extends JUnitRouteTest { + + //#routes + final Route tweets() { + //#formats + final Unmarshaller JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class); + //#formats + + //#response-streaming + + // Step 1: Enable JSON streaming + // we're not using this in the example, but it's the simplest way to start: + // The default rendering is a JSON array: `[el, el, el , ...]` + final JsonEntityStreamingSupport jsonStreaming = EntityStreamingSupport.json(); + + // Step 1.1: Enable and customise how we'll render the JSON, as a compact array: + final ByteString start = ByteString.fromString("["); + final ByteString between = ByteString.fromString(","); + final ByteString end = ByteString.fromString("]"); + final Flow compactArrayRendering = + Flow.of(ByteString.class).intersperse(start, between, end); + + final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json() + .withFramingRendererFlow(compactArrayRendering); + + + // Step 2: implement the route + final Route responseStreaming = path("tweets", () -> + get(() -> + parameter(StringUnmarshallers.INTEGER, "n", n -> { + final Source tws = + Source.repeat(new JavaTweet(12, "Hello World!")).take(n); + + // Step 3: call complete* with your source, marshaller, and stream rendering mode + return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport); + }) + ) + ); + //#response-streaming + + //#incoming-request-streaming + final Route incomingStreaming = path("tweets", () -> + post(() -> + extractMaterializer(mat -> { + final JsonEntityStreamingSupport jsonSupport = EntityStreamingSupport.json(); + + return entityAsSourceOf(JavaTweets, jsonSupport, sourceOfTweets -> { + final CompletionStage tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat); + return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c)); + }); + } + ) + ) + ); + //#incoming-request-streaming + + return responseStreaming.orElse(incomingStreaming); + } + + final Route csvTweets() { + //#csv-example + final Marshaller renderAsCsv = + Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t -> + ByteString.fromString(t.getId() + "," + t.getMessage()) + ); + + final CsvEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.csv(); + + final Route responseStreaming = path("tweets", () -> + get(() -> + parameter(StringUnmarshallers.INTEGER, "n", n -> { + final Source tws = + Source.repeat(new JavaTweet(12, "Hello World!")).take(n); + return completeWithSource(tws, renderAsCsv, compactJsonSupport); + }) + ) + ); + //#csv-example + + return responseStreaming; + } + //#routes + + @Test + public void getTweetsTest() { + //#response-streaming + // tests: + final TestRoute routes = testRoute(tweets()); + + // test happy path + final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON)); + routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication)) + .assertStatusCode(200) + .assertEntity("[{\"id\":12,\"message\":\"Hello World!\"},{\"id\":12,\"message\":\"Hello World!\"}]"); + + // test responses to potential errors + final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT); + routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText)) + .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406 + .assertEntity("Resource representation is only available with these types:\napplication/json"); + //#response-streaming + } + + @Test + public void csvExampleTweetsTest() { + //#response-streaming + // tests -------------------------------------------- + final TestRoute routes = testRoute(csvTweets()); + + // test happy path + final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV)); + routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv)) + .assertStatusCode(200) + .assertEntity("12,Hello World!\n" + + "12,Hello World!"); + + // test responses to potential errors + final Accept acceptText = Accept.create(MediaRanges.ALL_APPLICATION); + routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText)) + .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406 + .assertEntity("Resource representation is only available with these types:\ntext/csv; charset=UTF-8"); + //#response-streaming + } + + //#models + private static final class JavaTweet { + private int id; + private String message; + + public JavaTweet(int id, String message) { + this.id = id; + this.message = message; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + } + //#models +} diff --git a/akka-docs/rst/java/http/routing-dsl/index.rst b/akka-docs/rst/java/http/routing-dsl/index.rst index f84f20759fe..85fff2bcb51 100644 --- a/akka-docs/rst/java/http/routing-dsl/index.rst +++ b/akka-docs/rst/java/http/routing-dsl/index.rst @@ -18,6 +18,7 @@ To use the high-level API you need to add a dependency to the ``akka-http-experi directives/index marshalling exception-handling + source-streaming-support rejections testkit @@ -51,7 +52,6 @@ in the :ref:`exception-handling-java` section of the documtnation. You can use t File uploads ^^^^^^^^^^^^ -TODO not possible in Java DSL since there For high level directives to handle uploads see the :ref:`FileUploadDirectives-java`. diff --git a/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst new file mode 100644 index 00000000000..acf46421b62 --- /dev/null +++ b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst @@ -0,0 +1,91 @@ +.. _json-streaming-java: + +Source Streaming +================ + +Akka HTTP supports completing a request with an Akka ``Source``, which makes it possible to easily build +and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack. + +It is possible to complete requests with raw ``Source``, however often it is more convenient to +stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array, +or CSV stream (where each element is separated by a new-line). + +In the following sections we investigate how to make use of the JSON Streaming infrastructure, +however the general hints apply to any kind of element-by-element streaming you could imagine. + +JSON Streaming +============== + +`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON +objects as a continuous HTTP request or response. The elements are most often separated using newlines, +however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another +use case. + +In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as: + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#models + +.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming + +Responding with JSON Streams +---------------------------- + +In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_. + +Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an +Akka Streams ``Source``. Here we'll use the ``Jackson`` helper class from ``akka-http-jackson`` (a separate library +that you should add as a dependency if you want to use Jackson with Akka HTTP). + +First we enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 1). + +The default mode of rendering a ``Source`` is to represent it as an JSON Array. If you want to change this representation +for example to use Twitter style new-line separated JSON objects, you can do so by configuring the support trait accordingly. + +In Step 1.1. we demonstrate to configure configude the rendering to be new-line separated, and also how parallel marshalling +can be applied. We configure the Support object to render the JSON as series of new-line separated JSON objects, +simply by providing the ``start``, ``sep`` and ``end`` ByteStrings, which will be emitted at the apropriate +places in the rendered stream. Although this format is *not* valid JSON, it is pretty popular since parsing it is relatively +simple - clients need only to find the new-lines and apply JSON unmarshalling for an entire line of JSON. + +The final step is simply completing a request using a Source of tweets, as simple as that: + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#response-streaming + +.. _Streaming API: https://dev.twitter.com/streaming/overview + +Consuming JSON Streaming uploads +-------------------------------- + +Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with +the server and is feeding it with one line of measurement data. + +In this example, we want to consume this data in a streaming fashion from the request entity, and also apply +back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure +will be applied automatically thanks to using Akka HTTP/Streams). + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#formats + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#incoming-request-streaming + + +Simple CSV streaming example +---------------------------- + +Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values). +For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming +modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available, +and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see +an error during runtime that the marshaller did not expose the expected content type and thus we can not render +the streaming response). + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#csv-example + +Implementing custom EntityStreamingSupport traits +------------------------------------------------- + +The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type +or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller`` +instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON). + +When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class, +and implement all of it's methods. It's best to use the existing implementations as a guideline. diff --git a/akka-docs/rst/java/typed-actors.rst b/akka-docs/rst/java/typed-actors.rst index 11160b69e15..0b6622023c9 100644 --- a/akka-docs/rst/java/typed-actors.rst +++ b/akka-docs/rst/java/typed-actors.rst @@ -25,7 +25,7 @@ lies in interfacing between private sphere and the public, but you don’t want that many doors inside your house, do you? For a longer discussion see `this blog post `_. -A bit more background: TypedActors can very easily be abused as RPC, and that +A bit more background: TypedActors can easily be abused as RPC, and that is an abstraction which is `well-known `_ to be leaky. Hence TypedActors are not what we think of first when we talk diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala new file mode 100644 index 00000000000..c311515c1b5 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -0,0 +1,228 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package docs.http.scaladsl.server.directives + +import akka.NotUsed +import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport } +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.Accept +import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, UnsupportedRequestContentTypeRejection } +import akka.stream.scaladsl.{ Flow, Source } +import akka.util.ByteString +import docs.http.scaladsl.server.RoutingSpec +import spray.json.JsValue + +import scala.concurrent.Future + +class JsonStreamingExamplesSpec extends RoutingSpec { + + //#models + case class Tweet(uid: Int, txt: String) + case class Measurement(id: String, value: Int) + //# + + def getTweets() = + Source(List( + Tweet(1, "#Akka rocks!"), + Tweet(2, "Streaming is so hot right now!"), + Tweet(3, "You cannot enter the same river twice."))) + + //#formats + object MyJsonProtocol + extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport + with spray.json.DefaultJsonProtocol { + + implicit val tweetFormat = jsonFormat2(Tweet.apply) + implicit val measurementFormat = jsonFormat2(Measurement.apply) + } + //# + + "spray-json-response-streaming" in { + // [1] import "my protocol", for marshalling Tweet objects: + import MyJsonProtocol._ + + // [2] pick a Source rendering support trait: + // Note that the default support renders the Source as JSON Array + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() + + val route = + path("tweets") { + // [3] simply complete a request with a source of tweets: + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + + // tests ------------------------------------------------------------ + val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`)) + val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`)) + + Get("/tweets").withHeaders(AcceptJson) ~> route ~> check { + responseAs[String] shouldEqual + """[""" + + """{"uid":1,"txt":"#Akka rocks!"},""" + + """{"uid":2,"txt":"Streaming is so hot right now!"},""" + + """{"uid":3,"txt":"You cannot enter the same river twice."}""" + + """]""" + } + + // endpoint can only marshal Json, so it will *reject* requests for application/xml: + Get("/tweets").withHeaders(AcceptXml) ~> route ~> check { + handled should ===(false) + rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`))) + } + } + + "line-by-line-json-response-streaming" in { + import MyJsonProtocol._ + + // Configure the EntityStreamingSupport to render the elements as: + // {"example":42} + // {"example":43} + // ... + // {"example":1000} + val start = ByteString.empty + val sep = ByteString("\n") + val end = ByteString.empty + + implicit val jsonStreamingSupport = EntityStreamingSupport.json() + .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end)) + + val route = + path("tweets") { + // [3] simply complete a request with a source of tweets: + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + + // tests ------------------------------------------------------------ + val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`)) + + Get("/tweets").withHeaders(AcceptJson) ~> route ~> check { + responseAs[String] shouldEqual + """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + + """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + + """{"uid":3,"txt":"You cannot enter the same river twice."}""" + } + } + + "csv-example" in { + // [1] provide a marshaller to ByteString + implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t => + Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => { + val txt = t.txt.replaceAll(",", ".") + val uid = t.uid + ByteString(List(uid, txt).mkString(",")) + }) + } + + // [2] enable csv streaming: + implicit val csvStreaming = EntityStreamingSupport.csv() + + val route = + path("tweets") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + + // tests ------------------------------------------------------------ + val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`)) + + Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check { + responseAs[String] shouldEqual + """|1,#Akka rocks! + |2,Streaming is so hot right now! + |3,You cannot enter the same river twice.""" + .stripMargin + } + } + + "response-streaming-modes" in { + + { + //#async-rendering + import MyJsonProtocol._ + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = + EntityStreamingSupport.json() + .withParallelMarshalling(parallelism = 8, unordered = false) + + path("tweets") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + //# + } + + { + + //#async-unordered-rendering + import MyJsonProtocol._ + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = + EntityStreamingSupport.json() + .withParallelMarshalling(parallelism = 8, unordered = true) + + path("tweets" / "unordered") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + //# + } + } + + "spray-json-request-streaming" in { + // [1] import "my protocol", for unmarshalling Measurement objects: + import MyJsonProtocol._ + + // [2] enable Json Streaming + implicit val jsonStreamingSupport = EntityStreamingSupport.json() + + // prepare your persisting logic here + val persistMetrics = Flow[Measurement] + + val route = + path("metrics") { + // [3] extract Source[Measurement, _] + entity(asSourceOf[Measurement]) { measurements => + // alternative syntax: + // entity(as[Source[Measurement, NotUsed]]) { measurements => + val measurementsSubmitted: Future[Int] = + measurements + .via(persistMetrics) + .runFold(0) { (cnt, _) => cnt + 1 } + + complete { + measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n""")) + } + } + } + + // tests ------------------------------------------------------------ + // uploading an array or newline separated values works out of the box + val data = HttpEntity( + ContentTypes.`application/json`, + """ + |{"id":"temp","value":32} + |{"id":"temp","value":31} + | + """.stripMargin) + + Post("/metrics", entity = data) ~> route ~> check { + status should ===(StatusCodes.OK) + responseAs[String] should ===("""{"msg":"Total metrics received: 2"}""") + } + + // the FramingWithContentType will reject any content type that it does not understand: + val xmlData = HttpEntity( + ContentTypes.`text/xml(UTF-8)`, + """| + |""".stripMargin) + + Post("/metrics", entity = xmlData) ~> route ~> check { + handled should ===(false) + rejection should ===(UnsupportedRequestContentTypeRejection(Set(ContentTypes.`application/json`))) + } + } + +} diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala index a0720452cd5..8f7c855dc95 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala @@ -71,8 +71,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec { // tests: Get("/") ~> route ~> check { mediaType shouldEqual `application/json` - responseAs[String] should include(""""name": "Jane"""") - responseAs[String] should include(""""favoriteNumber": 42""") + responseAs[String] should include(""""name":"Jane"""") + responseAs[String] should include(""""favoriteNumber":42""") } } @@ -95,8 +95,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec { Post("/", HttpEntity(`application/json`, """{ "name": "Jane", "favoriteNumber" : 42 }""")) ~> route ~> check { mediaType shouldEqual `application/json` - responseAs[String] should include(""""name": "Jane"""") - responseAs[String] should include(""""favoriteNumber": 42""") + responseAs[String] should include(""""name":"Jane"""") + responseAs[String] should include(""""favoriteNumber":42""") } } } diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/index.rst b/akka-docs/rst/scala/http/routing-dsl/directives/index.rst index e0082a338a2..4e30d49f503 100644 --- a/akka-docs/rst/scala/http/routing-dsl/directives/index.rst +++ b/akka-docs/rst/scala/http/routing-dsl/directives/index.rst @@ -224,4 +224,4 @@ When you combine directives producing extractions with the ``&`` operator all ex Directives offer a great way of constructing your web service logic from small building blocks in a plug and play fashion while maintaining DRYness and full type-safety. If the large range of :ref:`Predefined Directives` does not -fully satisfy your needs you can also very easily create :ref:`Custom Directives`. +fully satisfy your needs you can also easily create :ref:`Custom Directives`. diff --git a/akka-docs/rst/scala/http/routing-dsl/index.rst b/akka-docs/rst/scala/http/routing-dsl/index.rst index a4e1ee51216..10942ef517b 100644 --- a/akka-docs/rst/scala/http/routing-dsl/index.rst +++ b/akka-docs/rst/scala/http/routing-dsl/index.rst @@ -23,6 +23,7 @@ static content serving. exception-handling path-matchers case-class-extraction + source-streaming-support testkit websocket-support diff --git a/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst new file mode 100644 index 00000000000..4a2d121b801 --- /dev/null +++ b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst @@ -0,0 +1,136 @@ +.. _json-streaming-scala: + +Source Streaming +================ + +Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to easily build +and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack. + +It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to +stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array, +or CSV stream (where each element is separated by a new-line). + +In the following sections we investigate how to make use of the JSON Streaming infrastructure, +however the general hints apply to any kind of element-by-element streaming you could imagine. + +JSON Streaming +============== + +`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON +objects as a continuous HTTP request or response. The elements are most often separated using newlines, +however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another +use case. + +In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as: + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: models + +And as always with spray-json, we provide our (Un)Marshaller instances as implicit values uding the ``jsonFormat##`` +method to generate them statically: + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: formats + +.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming + +Responding with JSON Streams +---------------------------- + +In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_. + +Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an +Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses +spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the +``akka-http-spray-json-experimental`` module. + +Once the general infrastructure is prepared we import our model's marshallers, generated by spray-json (Step 1), +and enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 2). +Akka HTTP pre-packages JSON and CSV entity streaming support, however it is simple to add your own, in case you'd +like to stream a different content type (for example plists or protobuf). + +The final step is simply completing a request using a Source of tweets, as simple as that: + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: spray-json-response-streaming + +The reason the ``EntityStreamingSupport`` has to be enabled explicitly is that one might want to configure how the +stream should be rendered. We'll dicuss this in depth in the next section though. + +.. _Streaming API: https://dev.twitter.com/streaming/overview + +Customising response rendering mode +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Since it is not always possible to directly and confidently answer the question of how a stream of ``T`` should look on +the wire, the ``EntityStreamingSupport`` traits come into play and allow fine-tuning the streams rendered representation. + +For example, in case of JSON Streaming, there isn't really one standard about rendering the response. Some APIs prefer +to render multiple JSON objects in a line-by-line fashion (Twitter's streaming APIs for example), while others simply return +very large arrays, which could be streamed as well. + +Akka defaults to the second one (streaming a JSON Array), as it is correct JSON and clients not expecting +a streaming API would still be able to consume it in a naive way if they'd want to. + +The line-by-line aproach however is also pretty popular even though it is not valid JSON. It's relatively simplicity for +client-side parsing is a strong point in case to pick this format for your Streaming APIs. +Below we demonstrate how to reconfigure the support trait to render the JSON as + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: line-by-line-json-response-streaming + +Another interesting feature is parallel marshalling. Since marshalling can potentially take much time, +it is possible to marshal multiple elements of the stream in parallel. This is simply a configuration +option on ``EntityStreamingSupport`` and is configurable like this: + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: async-rendering + +The above shown mode perserves ordering of the Source's elements, which may sometimes be a required property, +for example when streaming a strictly ordered dataset. Sometimes the contept of strict-order does not apply to the +data being streamed though, which allows us to exploit this property and use an ``unordered`` rendering. + +This also is a configuration option and is used as shown below. Effectively this will allow Akka's marshalling infrastructure +to concurrently marshallup to ``parallelism`` elements and emit the first which is marshalled onto the ``HttpResponse``: + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: async-unordered-rendering + +This allows us to _potentially_ render elements faster onto the HttpResponse, since it can avoid "head of line blocking", +in case one element in front of the stream takes a long time to marshall, yet others after it are very quick to marshall. + +Consuming JSON Streaming uploads +-------------------------------- + +Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with +the server and is feeding it with one line of measurement data. + +In this example, we want to consume this data in a streaming fashion from the request entity, and also apply +back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure +will be applied automatically thanks to using Akka HTTP/Streams). + + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: spray-json-request-streaming + +Simple CSV streaming example +---------------------------- + +Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values). +For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming +modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available, +and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see +an error during runtime that the marshaller did not expose the expected content type and thus we can not render +the streaming response). + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: csv-example + +Implementing custom EntityStreamingSupport traits +------------------------------------------------- + +The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type +or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller[T, ByteString]`` +instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON). + +When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class, +and implement all of it's methods. It's best to use the existing implementations as a guideline. diff --git a/akka-docs/rst/scala/typed-actors.rst b/akka-docs/rst/scala/typed-actors.rst index f9f5ab8fd5b..75c72637f3a 100644 --- a/akka-docs/rst/scala/typed-actors.rst +++ b/akka-docs/rst/scala/typed-actors.rst @@ -35,7 +35,7 @@ lies in interfacing between private sphere and the public, but you don’t want that many doors inside your house, do you? For a longer discussion see `this blog post `_. -A bit more background: TypedActors can very easily be abused as RPC, and that +A bit more background: TypedActors can easily be abused as RPC, and that is an abstraction which is `well-known `_ to be leaky. Hence TypedActors are not what we think of first when we talk diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java b/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java index 61b3a1c728e..25c403ee63f 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java @@ -25,6 +25,9 @@ private ContentTypes() { } public static final ContentType.WithCharset TEXT_XML_UTF8 = akka.http.scaladsl.model.ContentTypes.text$divxml$u0028UTF$minus8$u0029(); + public static final ContentType.WithCharset TEXT_CSV_UTF8 = + akka.http.scaladsl.model.ContentTypes.text$divcsv$u0028UTF$minus8$u0029(); + public static ContentType.Binary create(MediaType.Binary mediaType) { return ContentType$.MODULE$.apply((akka.http.scaladsl.model.MediaType.Binary) mediaType); } diff --git a/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java b/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java index eb9e8ec78db..92cd6b9bb61 100644 --- a/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java +++ b/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java @@ -11,6 +11,7 @@ import akka.http.javadsl.marshalling.Marshaller; import akka.http.javadsl.unmarshalling.Unmarshaller; +import akka.util.ByteString; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -31,6 +32,10 @@ public static Marshaller marshaller(ObjectMapper mapper) { ); } + public static Unmarshaller byteStringUnmarshaller(Class expectedType) { + return byteStringUnmarshaller(defaultObjectMapper, expectedType); + } + public static Unmarshaller unmarshaller(Class expectedType) { return unmarshaller(defaultObjectMapper, expectedType); } @@ -39,6 +44,10 @@ public static Unmarshaller unmarshaller(ObjectMapper mapper, return Unmarshaller.forMediaType(MediaTypes.APPLICATION_JSON, Unmarshaller.entityToString()) .thenApply(s -> fromJSON(mapper, s, expectedType)); } + + public static Unmarshaller byteStringUnmarshaller(ObjectMapper mapper, Class expectedType) { + return Unmarshaller.sync(s -> fromJSON(mapper, s.utf8String(), expectedType)); + } private static String toJSON(ObjectMapper mapper, Object object) { try { diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala new file mode 100644 index 00000000000..71f7fec8cb3 --- /dev/null +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.marshallers.sprayjson + +import java.nio.{ ByteBuffer, CharBuffer } +import java.nio.charset.{ Charset, StandardCharsets } + +import akka.util.ByteString +import spray.json.ParserInput.DefaultParserInput +import scala.annotation.tailrec + +/** + * ParserInput reading directly off a ByteString. (Based on the ByteArrayBasedParserInput) + * This avoids a separate decoding step but assumes that each byte represents exactly one character, + * which is encoded by ISO-8859-1! + * You can therefore use this ParserInput type only if you know that all input will be `ISO-8859-1`-encoded, + * or only contains 7-bit ASCII characters (which is a subset of ISO-8859-1)! + * + * Note that this ParserInput type will NOT work with general `UTF-8`-encoded input as this can contain + * character representations spanning multiple bytes. However, if you know that your input will only ever contain + * 7-bit ASCII characters (0x00-0x7F) then UTF-8 is fine, since the first 127 UTF-8 characters are + * encoded with only one byte that is identical to 7-bit ASCII and ISO-8859-1. + */ +final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultParserInput { + + import SprayJsonByteStringParserInput._ + + private[this] val byteBuffer = ByteBuffer.allocate(4) + private[this] val charBuffer = CharBuffer.allocate(1) + + private[this] val decoder = Charset.forName("UTF-8").newDecoder() + + override def nextChar() = { + _cursor += 1 + if (_cursor < bytes.length) (bytes(_cursor) & 0xFF).toChar else EOI + } + + override def nextUtf8Char() = { + @tailrec def decode(byte: Byte, remainingBytes: Int): Char = { + byteBuffer.put(byte) + if (remainingBytes > 0) { + _cursor += 1 + if (_cursor < bytes.length) decode(bytes(_cursor), remainingBytes - 1) else ErrorChar + } else { + byteBuffer.flip() + val coderResult = decoder.decode(byteBuffer, charBuffer, false) + charBuffer.flip() + val result = if (coderResult.isUnderflow & charBuffer.hasRemaining) charBuffer.get() else ErrorChar + byteBuffer.clear() + charBuffer.clear() + result + } + } + + _cursor += 1 + if (_cursor < bytes.length) { + val byte = bytes(_cursor) + if (byte >= 0) byte.toChar // 7-Bit ASCII + else if ((byte & 0xE0) == 0xC0) decode(byte, 1) // 2-byte UTF-8 sequence + else if ((byte & 0xF0) == 0xE0) decode(byte, 2) // 3-byte UTF-8 sequence + else if ((byte & 0xF8) == 0xF0) decode(byte, 3) // 4-byte UTF-8 sequence, will probably produce an (unsupported) surrogate pair + else ErrorChar + } else EOI + } + + override def length: Int = bytes.size + override def sliceString(start: Int, end: Int): String = + bytes.slice(start, end - start).decodeString(StandardCharsets.ISO_8859_1) + override def sliceCharArray(start: Int, end: Int): Array[Char] = + StandardCharsets.ISO_8859_1.decode(bytes.slice(start, end).asByteBuffer).array() +} + +object SprayJsonByteStringParserInput { + private final val EOI = '\uFFFF' + // compile-time constant + private final val ErrorChar = '\uFFFD' // compile-time constant, universal UTF-8 replacement character '�' +} diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala index 99bbb2c2fc2..e399ca77069 100644 --- a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala @@ -4,13 +4,19 @@ package akka.http.scaladsl.marshallers.sprayjson -import scala.language.implicitConversions -import akka.http.scaladsl.marshalling.{ ToEntityMarshaller, Marshaller } -import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller } -import akka.http.scaladsl.model.{ MediaTypes, HttpCharsets } +import akka.NotUsed +import akka.http.scaladsl.common.EntityStreamingSupport +import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model.MediaTypes.`application/json` +import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes } +import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, FromRequestUnmarshaller, Unmarshaller } +import akka.http.scaladsl.util.FastFuture +import akka.stream.scaladsl.{ Flow, Keep, Source } +import akka.util.ByteString import spray.json._ +import scala.language.implicitConversions + /** * A trait providing automatic to and from JSON marshalling/unmarshalling using an in-scope *spray-json* protocol. */ @@ -19,19 +25,41 @@ trait SprayJsonSupport { sprayJsonUnmarshaller(reader) implicit def sprayJsonUnmarshaller[T](implicit reader: RootJsonReader[T]): FromEntityUnmarshaller[T] = sprayJsValueUnmarshaller.map(jsonReader[T].read) + implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): Unmarshaller[ByteString, T] = + Unmarshaller.withMaterializer[ByteString, JsValue](_ ⇒ implicit mat ⇒ { bs ⇒ + // .compact so addressing into any address is very fast (also for large chunks) + // TODO we could optimise ByteStrings to better handle lienear access like this (or provide ByteStrings.linearAccessOptimised) + // TODO IF it's worth it. + val parserInput = new SprayJsonByteStringParserInput(bs.compact) + FastFuture.successful(JsonParser(parserInput)) + }).map(jsonReader[T].read) implicit def sprayJsValueUnmarshaller: FromEntityUnmarshaller[JsValue] = Unmarshaller.byteStringUnmarshaller.forContentTypes(`application/json`).mapWithCharset { (data, charset) ⇒ val input = if (charset == HttpCharsets.`UTF-8`) ParserInput(data.toArray) - else ParserInput(data.decodeString(charset.nioCharset.name)) // FIXME: identify charset by instance, not by name! + else ParserInput(data.decodeString(charset.nioCharset)) JsonParser(input) } - implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] = + implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] = sprayJsonMarshaller[T](writer, printer) - implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] = + implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] = sprayJsValueMarshaller compose writer.write - implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[JsValue] = + implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] = Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer) + + // support for as[Source[T, NotUsed]] + implicit def sprayJsonSourceReader[T](implicit rootJsonReader: RootJsonReader[T], support: EntityStreamingSupport): FromRequestUnmarshaller[Source[T, NotUsed]] = + Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ r ⇒ + if (support.supported.matches(r.entity.contentType)) { + val bytes = r.entity.dataBytes + val frames = bytes.via(support.framingDecoder) + val unmarshalling = + if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs)) + else Flow[ByteString].mapAsync(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs)) + val elements = frames.viaMat(unmarshalling)(Keep.right) + FastFuture.successful(elements) + } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported)) + } } -object SprayJsonSupport extends SprayJsonSupport \ No newline at end of file +object SprayJsonSupport extends SprayJsonSupport diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java index 33f2e986643..761a8e2ba9d 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java @@ -3,15 +3,22 @@ */ package akka.http.javadsl.server; +import akka.NotUsed; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; +import akka.http.javadsl.common.EntityStreamingSupport; +import akka.http.javadsl.marshallers.jackson.Jackson; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.model.StatusCodes; +import akka.http.javadsl.unmarshalling.StringUnmarshallers; +import akka.http.javadsl.unmarshalling.Unmarshaller; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; +import akka.util.ByteString; import scala.concurrent.duration.Duration; import scala.runtime.BoxedUnit; @@ -55,18 +62,41 @@ public Route createRoute() { ); final Route crash = path("crash", () -> - path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.failed(new Exception("Boom!")))).orElse( - path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); })))) + path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.failed(new Exception("Boom!")))).orElse( + path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); })))) ); + final Unmarshaller JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class); + final Route tweets = path("tweets", () -> + get(() -> + parameter(StringUnmarshallers.INTEGER, "n", n -> { + final Source tws = Source.repeat(new JavaTweet("Hello World!")).take(n); + return completeOKWithSource(tws, Jackson.marshaller(), EntityStreamingSupport.json()); + }) + ).orElse( + post(() -> + extractMaterializer(mat -> + entityAsSourceOf(JavaTweets, null, sourceOfTweets -> { + final CompletionStage tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat); + return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c)); + }) + ) + )) + ); + final Route inner = path("inner", () -> getFromResourceDirectory("someDir") ); - return get(() -> - index.orElse(secure).orElse(ping).orElse(crash).orElse(inner).orElse(requestTimeout) - ); + return index + .orElse(secure) + .orElse(ping) + .orElse(crash) + .orElse(inner) + .orElse(requestTimeout) + .orElse(tweets) + ; } private void silentSleep(int millis) { @@ -113,7 +143,7 @@ private void run() throws InterruptedException { final Flow flow = createRoute().flow(system, mat); final CompletionStage binding = - Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1"), mat); + Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1", 8080), mat); System.console().readLine("Press [ENTER] to quit..."); shutdown(binding); @@ -131,4 +161,21 @@ private CompletionStage shutdown(CompletionStage binding) { } }); } + + private static final class JavaTweet { + private String message; + + public JavaTweet(String message) { + this.message = message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + } } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala index a931209d7f5..223b6b2d9b3 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala @@ -18,6 +18,7 @@ import scala.concurrent.Await private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with Directives with RequestBuilding with ScalaFutures with IntegrationPatience { + import IntegrationRoutingSpec._ implicit val system = ActorSystem(AkkaSpec.getCallerName(getClass)) implicit val mat = ActorMaterializer() @@ -31,8 +32,6 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi def ~!>(route: Route) = new Prepped(request, route) } - final case class Prepped(request: HttpRequest, route: Route) - implicit class Checking(p: Prepped) { def ~!>(checking: HttpResponse ⇒ Unit) = { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() @@ -47,3 +46,7 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi } } + +object IntegrationRoutingSpec { + final case class Prepped(request: HttpRequest, route: Route) +} diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index ecca3305dc7..ea5cb71894f 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -4,14 +4,19 @@ package akka.http.scaladsl.server +import akka.NotUsed import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport -import akka.http.scaladsl.model.{ StatusCodes, HttpResponse } +import akka.http.scaladsl.model.{ HttpResponse, StatusCodes } import akka.http.scaladsl.server.directives.Credentials -import com.typesafe.config.{ ConfigFactory, Config } +import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http +import akka.http.scaladsl.common.EntityStreamingSupport +import akka.http.scaladsl.marshalling.ToResponseMarshallable +import spray.json.RootJsonReader + import scala.concurrent.duration._ import scala.io.StdIn @@ -21,10 +26,18 @@ object TestServer extends App { akka.log-dead-letters = off akka.stream.materializer.debug.fuzzing-mode = off """) + implicit val system = ActorSystem("ServerTest", testConf) import system.dispatcher implicit val materializer = ActorMaterializer() + import spray.json.DefaultJsonProtocol._ + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ + final case class Tweet(message: String) + implicit val tweetFormat = jsonFormat1(Tweet) + + implicit val jsonStreaming = EntityStreamingSupport.json() + import ScalaXmlSupport._ import Directives._ @@ -32,7 +45,8 @@ object TestServer extends App { case p @ Credentials.Provided(name) if p.verify(name + "-password") ⇒ name } - val bindingFuture = Http().bindAndHandle({ + // format: OFF + val routes = { get { path("") { withRequestTimeout(1.milli, _ ⇒ HttpResponse( @@ -42,21 +56,49 @@ object TestServer extends App { complete(index) } } ~ - path("secure") { - authenticateBasicPF("My very secure site", auth) { user ⇒ - complete(Hello { user }. Access has been granted!) - } + path("secure") { + authenticateBasicPF("My very secure site", auth) { user ⇒ + complete( Hello {user}. Access has been granted! ) + } + } ~ + path("ping") { + complete("PONG!") + } ~ + path("crash") { + complete(sys.error("BOOM!")) + } ~ + path("tweet") { + complete(Tweet("Hello, world!")) + } ~ + (path("tweets") & parameter('n.as[Int])) { n => + get { + val tweets = Source.repeat(Tweet("Hello, world!")).take(n) + complete(tweets) } ~ - path("ping") { - complete("PONG!") + post { + entity(asSourceOf[Tweet]) { tweets ⇒ + onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count => + complete(s"Total tweets received: " + count) + } + } } ~ - path("crash") { - complete(sys.error("BOOM!")) + put { + // checking the alternative syntax also works: + entity(as[Source[Tweet, NotUsed]]) { tweets ⇒ + onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count => + complete(s"Total tweets received: " + count) + } + } } - } ~ pathPrefix("inner")(getFromResourceDirectory("someDir")) - }, interface = "localhost", port = 8080) + } + } ~ + pathPrefix("inner")(getFromResourceDirectory("someDir")) + } + // format: ON + + val bindingFuture = Http().bindAndHandle(routes, interface = "0.0.0.0", port = 8080) - println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") + println(s"Server online at http://0.0.0.0:8080/\nPress RETURN to stop...") StdIn.readLine() bindingFuture.flatMap(_.unbind()).onComplete(_ ⇒ system.terminate()) diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala index 50be2b4c584..fc049363532 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala @@ -184,7 +184,7 @@ class MarshallingDirectivesSpec extends RoutingSpec with Inside { "render JSON with UTF-8 encoding if no `Accept-Charset` request header is present" in { Get() ~> complete(foo) ~> check { - responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.prettyPrint) + responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.compactPrint) } } "reject JSON rendering if an `Accept-Charset` request header requests a non-UTF-8 encoding" in { diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala index 26e9f65b294..df9ee7c2b54 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala @@ -98,10 +98,7 @@ class RouteDirectivesSpec extends FreeSpec with GenericRoutingSpec { import akka.http.scaladsl.model.headers.Accept Get().withHeaders(Accept(MediaTypes.`application/json`)) ~> route ~> check { responseAs[String] shouldEqual - """{ - | "name": "Ida", - | "age": 83 - |}""".stripMarginWithNewline("\n") + """{"name":"Ida","age":83}""" } Get().withHeaders(Accept(MediaTypes.`text/xml`)) ~> route ~> check { responseAs[xml.NodeSeq] shouldEqual Ida83 diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/javadsl/common/EntityStreamingSupport.scala new file mode 100644 index 00000000000..b2c461f4c88 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/common/EntityStreamingSupport.scala @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.javadsl.common + +import akka.NotUsed +import akka.http.javadsl.model.{ ContentType, ContentTypeRange } +import akka.http.scaladsl.common +import akka.stream.javadsl.Flow +import akka.util.ByteString + +/** + * Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities. + * + * See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations. + */ +abstract class EntityStreamingSupport { + + /** Read-side, what content types it is able to frame and unmarshall. */ + def supported: ContentTypeRange + /** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */ + def contentType: ContentType + + /** + * Read-side, decode incoming framed entity. + * For example with an incoming JSON array, chunk it up into JSON objects contained within that array. + */ + def getFramingDecoder: Flow[ByteString, ByteString, NotUsed] + /** + * Write-side, apply framing to outgoing entity stream. + * + * Most typical usage will be a variant of `Flow[ByteString].intersperse`. + * + * For example for rendering a JSON array one would return + * `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))` + * and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`. + */ + def getFramingRenderer: Flow[ByteString, ByteString, NotUsed] + + /** + * Read-side, allows changing what content types are accepted by this framing. + * + * EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]]. + * + * This is in order to support a-typical APIs which users still want to communicate with using + * the provided support trait. Typical examples include APIs which return valid `application/json` + * however advertise the content type as being `application/javascript` or vendor specific content types, + * which still parse correctly as JSON, CSV or something else that a provided support trait is built for. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withSupported(range: ContentTypeRange): EntityStreamingSupport + + /** + * Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. + * + * EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]]. + * This is due to the need integrating with existing systems which sometimes excpect custom Content-Types, + * however really are just plain JSON or something else internally (perhaps with slight extensions). + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withContentType(range: ContentType): EntityStreamingSupport + + /** + * Write-side / read-side, defines if (un)marshalling should be done in parallel. + * + * This may be beneficial marshalling the bottleneck in the pipeline. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def parallelism: Int + + /** + * Write-side / read-side, defines if (un)marshalling of incoming stream elements should be perserved or not. + * + * Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding + * head-of-line blocking if some elements are much larger than others. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def unordered: Boolean + + /** + * Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling. + * + * Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced) + * may yield in higher throughput. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport + +} + +/** + * Entity streaming support, independent of used Json parsing library etc. + */ +object EntityStreamingSupport { + + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(): JsonEntityStreamingSupport = json(8 * 1024) + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(maxObjectLength: Int): JsonEntityStreamingSupport = common.EntityStreamingSupport.json(maxObjectLength) + + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + * + * Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly. + */ + def csv(): CsvEntityStreamingSupport = csv(8 * 1024) + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + */ + def csv(maxLineLength: Int): CsvEntityStreamingSupport = common.EntityStreamingSupport.csv(maxLineLength) +} + +// extends Scala base, in order to get linearization right and (as we can't go into traits here, because companion object needed) +abstract class JsonEntityStreamingSupport extends common.EntityStreamingSupport { + def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport +} + +// extends Scala base, in order to get linearization right and (as we can't go into traits here, because companion object needed) +abstract class CsvEntityStreamingSupport extends common.EntityStreamingSupport { + def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport +} diff --git a/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala b/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala index c1dc4ab6815..dfd73886527 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala @@ -16,6 +16,7 @@ import akka.japi.Util import akka.util.ByteString import scala.concurrent.ExecutionContext +import scala.annotation.unchecked.uncheckedVariance import scala.language.implicitConversions object Marshaller { @@ -56,29 +57,29 @@ object Marshaller { // TODO make sure these are actually usable in a sane way def wrapEntity[A, C](f: function.BiFunction[ExecutionContext, C, A], m: Marshaller[A, RequestEntity], mediaType: MediaType): Marshaller[C, RequestEntity] = { - val scalaMarshaller = m.asScalaToEntityMarshaller + val scalaMarshaller = m.asScalaCastOutput fromScala(scalaMarshaller.wrapWithEC(mediaType.asScala) { ctx ⇒ c: C ⇒ f(ctx, c) }(ContentTypeOverrider.forEntity)) } def wrapEntity[A, C, E <: RequestEntity](f: function.Function[C, A], m: Marshaller[A, E], mediaType: MediaType): Marshaller[C, RequestEntity] = { - val scalaMarshaller = m.asScalaToEntityMarshaller + val scalaMarshaller = m.asScalaCastOutput fromScala(scalaMarshaller.wrap(mediaType.asScala)((in: C) ⇒ f.apply(in))(ContentTypeOverrider.forEntity)) } def entityToOKResponse[A](m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaToEntityMarshaller)) + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaCastOutput)) } def entityToResponse[A, R <: RequestEntity](status: StatusCode, m: Marshaller[A, R]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaToEntityMarshaller)) + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaCastOutput)) } def entityToResponse[A](status: StatusCode, headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO can we avoid the map() ? + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO can we avoid the map() ? } def entityToOKResponse[A](headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO avoid the map() + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO avoid the map() } // these are methods not varargs to avoid call site warning about unchecked type params @@ -140,13 +141,14 @@ object Marshaller { m.asScala.map(_.asScala) } -class Marshaller[A, B] private (implicit val asScala: marshalling.Marshaller[A, B]) { +class Marshaller[-A, +B] private (implicit val asScala: marshalling.Marshaller[A, B]) { import Marshaller.fromScala + /** INTERNAL API: involves unsafe cast (however is very fast) */ // TODO would be nice to not need this special case - def asScalaToEntityMarshaller[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]] + private[akka] def asScalaCastOutput[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]] - def map[C](f: function.Function[B, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply)) + def map[C](f: function.Function[B @uncheckedVariance, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply)) - def compose[C](f: function.Function[C, A]): Marshaller[C, B] = fromScala(asScala.compose(f.apply)) + def compose[C](f: function.Function[C, A @uncheckedVariance]): Marshaller[C, B] = fromScala(asScala.compose(f.apply)) } diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala index 7bc2100a2df..2f966a1a7ec 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala @@ -5,11 +5,11 @@ package akka.http.javadsl.server import akka.http.impl.util.JavaMapping -import akka.http.javadsl.server.directives.TimeoutDirectives +import akka.http.javadsl.server.directives.{ FramedEntityStreamingDirectives, TimeoutDirectives } import scala.annotation.varargs -abstract class AllDirectives extends TimeoutDirectives +abstract class AllDirectives extends FramedEntityStreamingDirectives /** * INTERNAL API diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala b/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala index 1cb892cfd81..ebb6eeb3137 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala @@ -8,9 +8,13 @@ import java.util.concurrent.CompletionStage import akka.http.impl.util.JavaMapping._ import akka.http.impl.util._ +import akka.http.javadsl.common.EntityStreamingSupport import akka.http.{ javadsl, scaladsl } import akka.http.scaladsl.server.{ directives ⇒ sdirectives } +import akka.http.scaladsl.{ common ⇒ scommon } import akka.http.javadsl.server.{ directives ⇒ jdirectives } +import akka.http.javadsl.{ common ⇒ jcommon } + import scala.collection.immutable /** @@ -43,6 +47,8 @@ private[http] object RoutingJavaMapping { } implicit object convertRouteResult extends Inherited[javadsl.server.RouteResult, scaladsl.server.RouteResult] + implicit object convertEntityStreamingSupport extends Inherited[EntityStreamingSupport, scommon.EntityStreamingSupport] + implicit object convertDirectoryRenderer extends Inherited[jdirectives.DirectoryRenderer, sdirectives.FileAndResourceDirectives.DirectoryRenderer] implicit object convertContentTypeResolver extends Inherited[jdirectives.ContentTypeResolver, sdirectives.ContentTypeResolver] implicit object convertDirectoryListing extends Inherited[jdirectives.DirectoryListing, sdirectives.DirectoryListing] diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala new file mode 100644 index 00000000000..eef4a3d1f9c --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.http.javadsl.server.directives + +import java.util.function.{ Function ⇒ JFunction } +import java.util.{ List ⇒ JList, Map ⇒ JMap } + +import akka.NotUsed +import akka.http.javadsl.common.EntityStreamingSupport +import akka.http.javadsl.marshalling.Marshaller +import akka.http.javadsl.model.{ HttpEntity, _ } +import akka.http.javadsl.server.Route +import akka.http.javadsl.unmarshalling.Unmarshaller +import akka.http.scaladsl.marshalling.{ Marshalling, ToByteStringMarshaller, ToResponseMarshallable } +import akka.http.scaladsl.server.{ Directives ⇒ D } +import akka.stream.javadsl.Source +import akka.util.ByteString + +/** EXPERIMENTAL API */ +abstract class FramedEntityStreamingDirectives extends TimeoutDirectives { + + import akka.http.javadsl.server.RoutingJavaMapping._ + import akka.http.javadsl.server.RoutingJavaMapping.Implicits._ + + @CorrespondsTo("asSourceOf") + def entityAsSourceOf[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport, + inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { + val umm = D.asSourceOf(um.asScala, support.asScala) + D.entity(umm) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + inner(s.asJava).delegate + } + } + + // implicits and multiple parameter lists used internally, Java caller does not benefit or use it + @CorrespondsTo("complete") + def completeWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, ByteString], support: EntityStreamingSupport): Route = RouteAdapter { + import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._ + val mm = m.map(ByteStringAsEntityFn).asScalaCastOutput[akka.http.scaladsl.model.RequestEntity] + val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm) + val response = ToResponseMarshallable(source.asScala)(mmm) + D.complete(response) + } + + // implicits and multiple parameter lists used internally, Java caller does not benefit or use it + @CorrespondsTo("complete") + def completeOKWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, RequestEntity], support: EntityStreamingSupport): Route = RouteAdapter { + import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._ + // don't try this at home: + val mm = m.asScalaCastOutput[akka.http.scaladsl.model.RequestEntity].map(_.httpEntity.asInstanceOf[akka.http.scaladsl.model.RequestEntity]) + implicit val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm) + val response = ToResponseMarshallable(source.asScala) + D.complete(response) + } + + private[this] val ByteStringAsEntityFn = new java.util.function.Function[ByteString, HttpEntity]() { + override def apply(bs: ByteString): HttpEntity = HttpEntities.create(bs) + } +} + +object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala index 7a73f294ef2..f89ac51563c 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala @@ -23,6 +23,8 @@ abstract class FutureDirectives extends FormFieldDirectives { /** * "Unwraps" a `CompletionStage` and runs the inner route after future * completion with the future's value as an extraction of type `Try`. + * + * @group future */ def onComplete[T](f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter { D.onComplete(f.get.toScala.recover(unwrapCompletionException)) { value ⇒ @@ -30,6 +32,18 @@ abstract class FutureDirectives extends FormFieldDirectives { } } + /** + * "Unwraps" a `CompletionStage` and runs the inner route after future + * completion with the future's value as an extraction of type `Try`. + * + * @group future + */ + def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter { + D.onComplete(cs.toScala.recover(unwrapCompletionException)) { value ⇒ + inner(value).delegate + } + } + /** * "Unwraps" a `CompletionStage[T]` and runs the inner route after future * completion with the future's value as an extraction of type `T` if @@ -51,6 +65,8 @@ abstract class FutureDirectives extends FormFieldDirectives { * completion with the stage's value as an extraction of type `T`. * If the stage fails its failure Throwable is bubbled up to the nearest * ExceptionHandler. + * + * @group future */ def onSuccess[T](f: Supplier[CompletionStage[T]], inner: JFunction[T, Route]) = RouteAdapter { D.onSuccess(f.get.toScala.recover(unwrapCompletionException)) { value ⇒ @@ -64,6 +80,8 @@ abstract class FutureDirectives extends FormFieldDirectives { * If the completion stage succeeds the request is completed using the values marshaller * (This directive therefore requires a marshaller for the completion stage value type to be * provided.) + * + * @group future */ def completeOrRecoverWith[T](f: Supplier[CompletionStage[T]], marshaller: Marshaller[T, RequestEntity], inner: JFunction[Throwable, Route]): Route = RouteAdapter { val magnet = CompleteOrRecoverWithMagnet(f.get.toScala)(Marshaller.asScalaEntityMarshaller(marshaller)) diff --git a/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala b/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala index 8a7a0a76595..368ebe68c30 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala @@ -101,6 +101,9 @@ abstract class Unmarshaller[-A, B] extends UnmarshallerBase[A, B] { implicit def asScala: akka.http.scaladsl.unmarshalling.Unmarshaller[A, B] + /** INTERNAL API */ + private[akka] def asScalaCastInput[I]: unmarshalling.Unmarshaller[I, B] = asScala.asInstanceOf[unmarshalling.Unmarshaller[I, B]] + def unmarshall(a: A, ec: ExecutionContext, mat: Materializer): CompletionStage[B] = asScala.apply(a)(ec, mat).toJava /** diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/CsvEntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/CsvEntityStreamingSupport.scala new file mode 100644 index 00000000000..536a86f0039 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/CsvEntityStreamingSupport.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.common + +import akka.NotUsed +import akka.event.Logging +import akka.http.javadsl.{ common, model ⇒ jm } +import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes } +import akka.stream.scaladsl.{ Flow, Framing } +import akka.util.ByteString + +final class CsvEntityStreamingSupport private[akka] ( + maxLineLength: Int, + val supported: ContentTypeRange, + val contentType: ContentType, + val framingRenderer: Flow[ByteString, ByteString, NotUsed], + val parallelism: Int, + val unordered: Boolean +) extends common.CsvEntityStreamingSupport { + import akka.http.impl.util.JavaMapping.Implicits._ + + def this(maxObjectSize: Int) = + this( + maxObjectSize, + ContentTypeRange(ContentTypes.`text/csv(UTF-8)`), + ContentTypes.`text/csv(UTF-8)`, + Flow[ByteString].intersperse(ByteString("\n")), + 1, false) + + override val framingDecoder: Flow[ByteString, ByteString, NotUsed] = + Framing.delimiter(ByteString("\n"), maxLineLength) + + override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport = + withFramingRenderer(framingRendererFlow.asScala) + def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRendererFlow, parallelism, unordered) + + override def withContentType(ct: jm.ContentType): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, supported, ct.asScala, framingRenderer, parallelism, unordered) + override def withSupported(range: jm.ContentTypeRange): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, range.asScala, contentType, framingRenderer, parallelism, unordered) + override def withParallelMarshalling(parallelism: Int, unordered: Boolean): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRenderer, parallelism, unordered) + + override def toString = s"""${Logging.simpleName(getClass)}($maxLineLength, $supported, $contentType)""" +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/EntityStreamingSupport.scala new file mode 100644 index 00000000000..aea219666fa --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/EntityStreamingSupport.scala @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.http.scaladsl.common + +import akka.NotUsed +import akka.http.javadsl.{ common, model ⇒ jm } +import akka.http.scaladsl.model._ +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +/** + * Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities. + * + * See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations. + */ +abstract class EntityStreamingSupport extends common.EntityStreamingSupport { + /** Read-side, what content types it is able to frame and unmarshall. */ + def supported: ContentTypeRange + /** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */ + def contentType: ContentType + + /** + * Read-side, decode incoming framed entity. + * For example with an incoming JSON array, chunk it up into JSON objects contained within that array. + */ + def framingDecoder: Flow[ByteString, ByteString, NotUsed] + override final def getFramingDecoder = framingDecoder.asJava + + /** + * Write-side, apply framing to outgoing entity stream. + * + * Most typical usage will be a variant of `Flow[ByteString].intersperse`. + * + * For example for rendering a JSON array one would return + * `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))` + * and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`. + */ + def framingRenderer: Flow[ByteString, ByteString, NotUsed] + override final def getFramingRenderer = framingRenderer.asJava + + /** + * Read-side, allows changing what content types are accepted by this framing. + * + * EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]]. + * + * This is in order to support a-typical APIs which users still want to communicate with using + * the provided support trait. Typical examples include APIs which return valid `application/json` + * however advertise the content type as being `application/javascript` or vendor specific content types, + * which still parse correctly as JSON, CSV or something else that a provided support trait is built for. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + override def withSupported(range: jm.ContentTypeRange): EntityStreamingSupport + + /** + * Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. + * + * EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]]. + * This is due to the need integrating with existing systems which sometimes excpect custom Content-Types, + * however really are just plain JSON or something else internally (perhaps with slight extensions). + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + override def withContentType(range: jm.ContentType): EntityStreamingSupport + + /** + * Write-side / read-side, defines if (un)marshalling should be done in parallel. + * + * This may be beneficial marshalling the bottleneck in the pipeline. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def parallelism: Int + + /** + * Write-side / read-side, defines if (un)marshalling of incoming stream elements should be perserved or not. + * + * Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding + * head-of-line blocking if some elements are much larger than others. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def unordered: Boolean + + /** + * Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling. + * + * Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced) + * may yield in higher throughput. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport + +} + +/** + * Entity streaming support, independent of used Json parsing library etc. + */ +object EntityStreamingSupport { + + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(): JsonEntityStreamingSupport = json(8 * 1024) + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(maxObjectLength: Int): JsonEntityStreamingSupport = new JsonEntityStreamingSupport(maxObjectLength) + + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + * + * Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly. + */ + def csv(): CsvEntityStreamingSupport = csv(8 * 1024) + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + */ + def csv(maxLineLength: Int): CsvEntityStreamingSupport = new CsvEntityStreamingSupport(maxLineLength) +} + diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/JsonEntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonEntityStreamingSupport.scala new file mode 100644 index 00000000000..743f5fd8aa0 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonEntityStreamingSupport.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.common + +import akka.NotUsed +import akka.event.Logging +import akka.http.javadsl.{ common, model ⇒ jm } +import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes } +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +final class JsonEntityStreamingSupport private[akka] ( + maxObjectSize: Int, + val supported: ContentTypeRange, + val contentType: ContentType, + val framingRenderer: Flow[ByteString, ByteString, NotUsed], + val parallelism: Int, + val unordered: Boolean +) extends common.JsonEntityStreamingSupport { + import akka.http.impl.util.JavaMapping.Implicits._ + + def this(maxObjectSize: Int) = + this( + maxObjectSize, + ContentTypeRange(ContentTypes.`application/json`), + ContentTypes.`application/json`, + Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]")), + 1, false) + + override val framingDecoder: Flow[ByteString, ByteString, NotUsed] = + akka.stream.scaladsl.JsonFraming.objectScanner(maxObjectSize) + + override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport = + withFramingRenderer(framingRendererFlow.asScala) + def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRendererFlow, parallelism, unordered) + + override def withContentType(ct: jm.ContentType): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, supported, ct.asScala, framingRenderer, parallelism, unordered) + override def withSupported(range: jm.ContentTypeRange): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, range.asScala, contentType, framingRenderer, parallelism, unordered) + override def withParallelMarshalling(parallelism: Int, unordered: Boolean): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRenderer, parallelism, unordered) + + override def toString = s"""${Logging.simpleName(getClass)}($maxObjectSize, $supported, $contentType)""" + +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala index 124746ccd5d..f6a5d35206a 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala @@ -61,8 +61,9 @@ object StrictForm { fsu(value.entity.data.decodeString(charsetName)) }) - @implicitNotFound("In order to unmarshal a `StrictForm.Field` to type `${T}` you need to supply a " + - "`FromStringUnmarshaller[${T}]` and/or a `FromEntityUnmarshaller[${T}]`") + @implicitNotFound(msg = + s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " + + s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`") sealed trait FieldUnmarshaller[T] { def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T] def unmarshalPart(value: Multipart.FormData.BodyPart.Strict)(implicit ec: ExecutionContext, mat: Materializer): Future[T] diff --git a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala index 81c0290448b..6606ff9f37b 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala @@ -4,7 +4,9 @@ package akka.http.scaladsl.marshalling -import scala.concurrent.{ Future, ExecutionContext } +import akka.http.scaladsl.marshalling.Marshalling.Opaque + +import scala.concurrent.{ ExecutionContext, Future } import scala.util.control.NonFatal import akka.http.scaladsl.model._ import akka.http.scaladsl.util.FastFuture @@ -174,4 +176,4 @@ object Marshalling { def map[B](f: A ⇒ B): Opaque[B] = copy(marshal = () ⇒ f(marshal())) } } -//# \ No newline at end of file +//# diff --git a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala index 62777103a22..35bc19f3d0f 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala @@ -4,12 +4,19 @@ package akka.http.scaladsl.marshalling +import akka.http.scaladsl.common.EntityStreamingSupport import akka.stream.impl.ConstantFun import scala.collection.immutable import akka.http.scaladsl.util.FastFuture._ import akka.http.scaladsl.model.MediaTypes._ import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.ContentNegotiator +import akka.http.scaladsl.util.FastFuture +import akka.stream.scaladsl.Source +import akka.util.ByteString + +import scala.language.higherKinds trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImplicits { @@ -41,6 +48,37 @@ trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImp Marshaller(implicit ec ⇒ { case (status, headers, value) ⇒ mt(value).fast map (_ map (_ map (HttpResponse(status, headers, _)))) }) + + implicit def fromEntityStreamingSupportAndByteStringMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToByteStringMarshaller[T]): ToResponseMarshaller[Source[T, M]] = { + Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ + FastFuture successful { + Marshalling.WithFixedContentType(s.contentType, () ⇒ { + val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) } + + // TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?) + // TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer + val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒ + // pick the Marshalling that matches our EntityStreamingSupport + (s.contentType match { + case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset) ⇒ + marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal } + + case best @ ContentType.WithCharset(bestMT, bestCS) ⇒ + marshallings collectFirst { + case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal + case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS) + } + }).toList + } + val marshalledElements: Source[ByteString, M] = + bestMarshallingPerElement.map(_.apply()) // marshal! + .via(s.framingRenderer) + + HttpResponse(entity = HttpEntity(s.contentType, marshalledElements)) + }) :: Nil + } + } + } } trait LowPriorityToResponseMarshallerImplicits { @@ -48,6 +86,40 @@ trait LowPriorityToResponseMarshallerImplicits { liftMarshaller(m) implicit def liftMarshaller[T](implicit m: ToEntityMarshaller[T]): ToResponseMarshaller[T] = PredefinedToResponseMarshallers.fromToEntityMarshaller() + + // FIXME deduplicate this!!! + implicit def fromEntityStreamingSupportAndEntityMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToEntityMarshaller[T]): ToResponseMarshaller[Source[T, M]] = { + Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ + FastFuture successful { + Marshalling.WithFixedContentType(s.contentType, () ⇒ { + val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) } + + // TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?) + // TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer + val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒ + // pick the Marshalling that matches our EntityStreamingSupport + (s.contentType match { + case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset) ⇒ + marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal } + + case best @ ContentType.WithCharset(bestMT, bestCS) ⇒ + marshallings collectFirst { + case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal + case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS) + } + }).toList + } + val marshalledElements: Source[ByteString, M] = + bestMarshallingPerElement.map(_.apply()) // marshal! + .flatMapConcat(_.dataBytes) // extract raw dataBytes + .via(s.framingRenderer) + + HttpResponse(entity = HttpEntity(s.contentType, marshalledElements)) + }) :: Nil + } + } + } + } object PredefinedToResponseMarshallers extends PredefinedToResponseMarshallers diff --git a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/package.scala b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/package.scala index a6a8748e975..c1935887e07 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/package.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/package.scala @@ -6,10 +6,12 @@ package akka.http.scaladsl import scala.collection.immutable import akka.http.scaladsl.model._ +import akka.util.ByteString package object marshalling { //# marshaller-aliases type ToEntityMarshaller[T] = Marshaller[T, MessageEntity] + type ToByteStringMarshaller[T] = Marshaller[T, ByteString] type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)] type ToResponseMarshaller[T] = Marshaller[T, HttpResponse] type ToRequestMarshaller[T] = Marshaller[T, HttpRequest] diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala index b92bb8d97ad..489ea7375a6 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala @@ -36,5 +36,6 @@ trait Directives extends RouteConcatenation with SchemeDirectives with SecurityDirectives with WebSocketDirectives + with FramedEntityStreamingDirectives object Directives extends Directives diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala new file mode 100644 index 00000000000..dc4a5ab0015 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.http.scaladsl.server.directives + +import akka.NotUsed +import akka.http.scaladsl.common +import akka.http.scaladsl.common.EntityStreamingSupport +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.unmarshalling.{ Unmarshaller, _ } +import akka.http.scaladsl.util.FastFuture +import akka.stream.scaladsl.{ Flow, Keep, Source } +import akka.util.ByteString + +import scala.language.implicitConversions + +/** + * Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements. + * + * See [[common.EntityStreamingSupport]] for useful default framing `Flow` instances and + * support traits such as `SprayJsonSupport` (or your other favourite JSON library) to provide the needed [[Marshaller]] s. + */ +trait FramedEntityStreamingDirectives extends MarshallingDirectives { + + type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]] + + /** + * Extracts entity as [[Source]] of elements of type `T`. + * This is achieved by applying the implicitly provided (in the following order): + * + * - 1st: chunk-up the incoming [[ByteString]]s by applying the `Content-Type`-aware framing + * - 2nd: apply the [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). + * + * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if + * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`. + * + * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`). + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOf[T](implicit um: FromByteStringUnmarshaller[T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] = + asSourceOfInternal(um, support) + + /** + * Extracts entity as [[Source]] of elements of type `T`. + * This is achieved by applying the implicitly provided (in the following order): + * + * - 1st: chunk-up the incoming [[ByteString]]s by applying the `Content-Type`-aware framing + * - 2nd: apply the [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). + * + * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if + * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`. + * + * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`). + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOf[T](support: EntityStreamingSupport)(implicit um: FromByteStringUnmarshaller[T]): RequestToSourceUnmarshaller[T] = + asSourceOfInternal(um, support) + + // format: OFF + private final def asSourceOfInternal[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] = + Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ + val entity = req.entity + if (support.supported.matches(entity.contentType)) { + val bytes = entity.dataBytes + val frames = bytes.via(support.framingDecoder) + val marshalling = + if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs => um(bs)(ec, mat)) + else Flow[ByteString].mapAsync(support.parallelism)(bs => um(bs)(ec, mat)) + + val elements = frames.viaMat(marshalling)(Keep.right) + FastFuture.successful(elements) + + } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported)) + } + // format: ON + +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/PredefinedFromStringUnmarshallers.scala b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/PredefinedFromStringUnmarshallers.scala index 6b6ac6b96ab..9f44f677627 100755 --- a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/PredefinedFromStringUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/PredefinedFromStringUnmarshallers.scala @@ -6,9 +6,15 @@ package akka.http.scaladsl.unmarshalling import scala.collection.immutable import akka.http.scaladsl.util.FastFuture +import akka.util.ByteString trait PredefinedFromStringUnmarshallers { + implicit def _fromStringUnmarshallerFromByteStringUnmarshaller[T](implicit bsum: FromByteStringUnmarshaller[T]): Unmarshaller[String, T] = { + val bs = Unmarshaller.strict[String, ByteString](s ⇒ ByteString(s)) + bs.flatMap(implicit ec ⇒ implicit mat ⇒ bsum(_)) + } + implicit val byteFromStringUnmarshaller: Unmarshaller[String, Byte] = numberUnmarshaller(_.toByte, "8-bit signed integer") diff --git a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/package.scala b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/package.scala index 3870019cbd8..46dbbce95a9 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/package.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/package.scala @@ -6,6 +6,7 @@ package akka.http.scaladsl import akka.http.scaladsl.common.StrictForm import akka.http.scaladsl.model._ +import akka.util.ByteString package object unmarshalling { //# unmarshaller-aliases @@ -13,6 +14,7 @@ package object unmarshalling { type FromMessageUnmarshaller[T] = Unmarshaller[HttpMessage, T] type FromResponseUnmarshaller[T] = Unmarshaller[HttpResponse, T] type FromRequestUnmarshaller[T] = Unmarshaller[HttpRequest, T] + type FromByteStringUnmarshaller[T] = Unmarshaller[ByteString, T] type FromStringUnmarshaller[T] = Unmarshaller[String, T] type FromStrictFormFieldUnmarshaller[T] = Unmarshaller[StrictForm.Field, T] //# diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala new file mode 100644 index 00000000000..57bfa79349b --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -0,0 +1,440 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorMaterializer +import akka.stream.impl.JsonObjectParser +import akka.stream.scaladsl.Framing.FramingException +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.AkkaSpec +import akka.util.ByteString + +import scala.collection.immutable.Seq +import scala.concurrent.Await +import scala.concurrent.duration._ + +class JsonFramingSpec extends AkkaSpec { + + implicit val mat = ActorMaterializer() + + "collecting multiple json" should { + "parse json array" in { + val input = + """ + |[ + | { "name" : "john" }, + | { "name" : "Ég get etið gler án þess að meiða mig" }, + | { "name" : "jack" }, + |] + |""".stripMargin // also should complete once notices end of array + + val result = Source.single(ByteString(input)) + .via(JsonFraming.objectScanner(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + result.futureValue shouldBe Seq( + """{ "name" : "john" }""", + """{ "name" : "Ég get etið gler án þess að meiða mig" }""", + """{ "name" : "jack" }""" + ) + } + + "emit single json element from string" in { + val input = + """| { "name": "john" } + | { "name": "jack" } + """.stripMargin + + val result = Source.single(ByteString(input)) + .via(JsonFraming.objectScanner(Int.MaxValue)) + .take(1) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + Await.result(result, 3.seconds) shouldBe Seq("""{ "name": "john" }""") + } + + "parse line delimited" in { + val input = + """| { "name": "john" } + | { "name": "jack" } + | { "name": "katie" } + """.stripMargin + + val result = Source.single(ByteString(input)) + .via(JsonFraming.objectScanner(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + Await.result(result, 3.seconds) shouldBe Seq( + """{ "name": "john" }""", + """{ "name": "jack" }""", + """{ "name": "katie" }""") + } + + "parse comma delimited" in { + val input = + """ { "name": "john" }, { "name": "jack" }, { "name": "katie" } """ + + val result = Source.single(ByteString(input)) + .via(JsonFraming.objectScanner(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + result.futureValue shouldBe Seq( + """{ "name": "john" }""", + """{ "name": "jack" }""", + """{ "name": "katie" }""") + } + + "parse chunks successfully" in { + val input: Seq[ByteString] = Seq( + """ + |[ + | { "name": "john"""".stripMargin, + """ + |}, + """.stripMargin, + """{ "na""", + """me": "jack""", + """"}]"""").map(ByteString(_)) + + val result = Source.apply(input) + .via(JsonFraming.objectScanner(Int.MaxValue)) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) + } + + result.futureValue shouldBe Seq( + """{ "name": "john" + |}""".stripMargin, + """{ "name": "jack"}""") + } + } + + "collecting json buffer" when { + "nothing is supplied" should { + "return nothing" in { + val buffer = new JsonObjectParser() + buffer.poll() should ===(None) + } + } + + "valid json is supplied" which { + "has one object" should { + "successfully parse empty object" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""{}""")) + buffer.poll().get.utf8String shouldBe """{}""" + } + + "successfully parse single field having string value" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""{ "name": "john"}""")) + buffer.poll().get.utf8String shouldBe """{ "name": "john"}""" + } + + "successfully parse single field having string value containing space" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""{ "name": "john doe"}""")) + buffer.poll().get.utf8String shouldBe """{ "name": "john doe"}""" + } + + "successfully parse single field having string value containing curly brace" in { + val buffer = new JsonObjectParser() + + buffer.offer(ByteString("""{ "name": "john{""")) + buffer.offer(ByteString("}")) + buffer.offer(ByteString("\"")) + buffer.offer(ByteString("}")) + + buffer.poll().get.utf8String shouldBe """{ "name": "john{}"}""" + } + + "successfully parse single field having string value containing curly brace and escape character" in { + val buffer = new JsonObjectParser() + + buffer.offer(ByteString("""{ "name": "john""")) + buffer.offer(ByteString("\\\"")) + buffer.offer(ByteString("{")) + buffer.offer(ByteString("}")) + buffer.offer(ByteString("\\\"")) + buffer.offer(ByteString(" ")) + buffer.offer(ByteString("hey")) + buffer.offer(ByteString("\"")) + + buffer.offer(ByteString("}")) + buffer.poll().get.utf8String shouldBe """{ "name": "john\"{}\" hey"}""" + } + + "successfully parse single field having integer value" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""{ "age": 101}""")) + buffer.poll().get.utf8String shouldBe """{ "age": 101}""" + } + + "successfully parse single field having decimal value" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""{ "age": 101}""")) + buffer.poll().get.utf8String shouldBe """{ "age": 101}""" + } + + "successfully parse single field having nested object" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString( + """ + |{ "name": "john", + | "age": 101, + | "address": { + | "street": "Straight Street", + | "postcode": 1234 + | } + |} + | """.stripMargin)) + buffer.poll().get.utf8String shouldBe """{ "name": "john", + | "age": 101, + | "address": { + | "street": "Straight Street", + | "postcode": 1234 + | } + |}""".stripMargin + } + + "successfully parse single field having multiple level of nested object" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString( + """ + |{ "name": "john", + | "age": 101, + | "address": { + | "street": { + | "name": "Straight", + | "type": "Avenue" + | }, + | "postcode": 1234 + | } + |} + | """.stripMargin)) + buffer.poll().get.utf8String shouldBe """{ "name": "john", + | "age": 101, + | "address": { + | "street": { + | "name": "Straight", + | "type": "Avenue" + | }, + | "postcode": 1234 + | } + |}""".stripMargin + } + } + + "has nested array" should { + "successfully parse" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString( + """ + |{ "name": "john", + | "things": [ + | 1, + | "hey", + | 3, + | "there" + | ] + |} + | """.stripMargin)) + buffer.poll().get.utf8String shouldBe """{ "name": "john", + | "things": [ + | 1, + | "hey", + | 3, + | "there" + | ] + |}""".stripMargin + } + } + + "has complex object graph" should { + "successfully parse" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString( + """ + |{ + | "name": "john", + | "addresses": [ + | { + | "street": "3 Hopson Street", + | "postcode": "ABC-123", + | "tags": ["work", "office"], + | "contactTime": [ + | {"time": "0900-1800", "timezone", "UTC"} + | ] + | }, + | { + | "street": "12 Adielie Road", + | "postcode": "ZZY-888", + | "tags": ["home"], + | "contactTime": [ + | {"time": "0800-0830", "timezone", "UTC"}, + | {"time": "1800-2000", "timezone", "UTC"} + | ] + | } + | ] + |} + | """.stripMargin)) + + buffer.poll().get.utf8String shouldBe """{ + | "name": "john", + | "addresses": [ + | { + | "street": "3 Hopson Street", + | "postcode": "ABC-123", + | "tags": ["work", "office"], + | "contactTime": [ + | {"time": "0900-1800", "timezone", "UTC"} + | ] + | }, + | { + | "street": "12 Adielie Road", + | "postcode": "ZZY-888", + | "tags": ["home"], + | "contactTime": [ + | {"time": "0800-0830", "timezone", "UTC"}, + | {"time": "1800-2000", "timezone", "UTC"} + | ] + | } + | ] + |}""".stripMargin + } + } + + "has multiple fields" should { + "parse successfully" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""{ "name": "john", "age": 101}""")) + buffer.poll().get.utf8String shouldBe """{ "name": "john", "age": 101}""" + } + + "parse successfully despite valid whitespaces around json" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString( + """ + | + | + |{"name": "john" + |, "age": 101}""".stripMargin)) + buffer.poll().get.utf8String shouldBe + """{"name": "john" + |, "age": 101}""".stripMargin + } + } + + "has multiple objects" should { + "pops the right object as buffer is filled" in { + val input = + """ + | { + | "name": "john", + | "age": 32 + | }, + | { + | "name": "katie", + | "age": 25 + | } + """.stripMargin + + val buffer = new JsonObjectParser() + buffer.offer(ByteString(input)) + + buffer.poll().get.utf8String shouldBe + """{ + | "name": "john", + | "age": 32 + | }""".stripMargin + buffer.poll().get.utf8String shouldBe + """{ + | "name": "katie", + | "age": 25 + | }""".stripMargin + buffer.poll() should ===(None) + + buffer.offer(ByteString("""{"name":"jenkins","age": """)) + buffer.poll() should ===(None) + + buffer.offer(ByteString("65 }")) + buffer.poll().get.utf8String shouldBe """{"name":"jenkins","age": 65 }""" + } + } + + "returns none until valid json is encountered" in { + val buffer = new JsonObjectParser() + + """{ "name": "john"""".foreach { + c ⇒ + buffer.offer(ByteString(c)) + buffer.poll() should ===(None) + } + + buffer.offer(ByteString("}")) + buffer.poll().get.utf8String shouldBe """{ "name": "john"}""" + } + + "invalid json is supplied" should { + "fail if it's broken from the start" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""THIS IS NOT VALID { "name": "john"}""")) + a[FramingException] shouldBe thrownBy { buffer.poll() } + } + + "fail if it's broken at the end" in { + val buffer = new JsonObjectParser() + buffer.offer(ByteString("""{ "name": "john"} THIS IS NOT VALID""")) + buffer.poll() // first emitting the valid element + a[FramingException] shouldBe thrownBy { buffer.poll() } + } + } + } + + "fail on too large initial object" in { + val input = + """ + | { "name": "john" }, { "name": "jack" } + """.stripMargin + + val result = Source.single(ByteString(input)) + .via(JsonFraming.objectScanner(5)).map(_.utf8String) + .runFold(Seq.empty[String]) { + case (acc, entry) ⇒ acc ++ Seq(entry) + } + + a[FramingException] shouldBe thrownBy { + Await.result(result, 3.seconds) + } + } + + "fail when 2nd object is too large" in { + val input = List( + """{ "name": "john" }""", + """{ "name": "jack" }""", + """{ "name": "very very long name somehow. how did this happen?" }""").map(s ⇒ ByteString(s)) + + val probe = Source(input) + .via(JsonFraming.objectScanner(48)) + .runWith(TestSink.probe) + + probe.ensureSubscription() + probe + .request(1) + .expectNext(ByteString("""{ "name": "john" }""")) + .request(1) + .expectNext(ByteString("""{ "name": "jack" }""")) + .request(1) + .expectError().getMessage should include("exceeded") + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala new file mode 100644 index 00000000000..ed779646073 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.scaladsl.Framing.FramingException +import akka.util.ByteString + +import scala.annotation.switch + +/** + * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead. + */ +private[akka] object JsonObjectParser { + + final val SquareBraceStart = '['.toByte + final val SquareBraceEnd = ']'.toByte + final val CurlyBraceStart = '{'.toByte + final val CurlyBraceEnd = '}'.toByte + final val DoubleQuote = '\''.toByte + final val Backslash = '\\'.toByte + final val Comma = ','.toByte + + final val LineBreak = '\n'.toByte + final val LineBreak2 = '\r'.toByte + final val Tab = '\t'.toByte + final val Space = ' '.toByte + + final val Whitespace = Set(LineBreak, LineBreak2, Tab, Space) + + def isWhitespace(input: Byte): Boolean = + Whitespace.contains(input) + +} + +/** + * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead. + * + * **Mutable** framing implementation that given any number of [[ByteString]] chunks, can emit JSON objects contained within them. + * Typically JSON objects are separated by new-lines or comas, however a top-level JSON Array can also be understood and chunked up + * into valid JSON objects by this framing implementation. + * + * Leading whitespace between elements will be trimmed. + */ +private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) { + import JsonObjectParser._ + + private var buffer: ByteString = ByteString.empty + + private var pos = 0 // latest position of pointer while scanning for json object end + private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc) + private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted + + private var charsInObject = 0 + private var completedObject = false + private var inStringExpression = false + private var isStartOfEscapeSequence = false + + /** + * Appends input ByteString to internal byte string buffer. + * Use [[poll]] to extract contained JSON objects. + */ + def offer(input: ByteString): Unit = + buffer ++= input + + def isEmpty: Boolean = buffer.isEmpty + + /** + * Attempt to locate next complete JSON object in buffered ByteString and returns `Some(it)` if found. + * May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded. + */ + def poll(): Option[ByteString] = { + val foundObject = seekObject() + if (!foundObject) None + else + (pos: @switch) match { + case -1 | 0 ⇒ None + case _ ⇒ + val (emit, buf) = buffer.splitAt(pos) + buffer = buf.compact + pos = 0 + + val tf = trimFront + trimFront = 0 + + if (tf == 0) Some(emit) + else { + val trimmed = emit.drop(tf) + if (trimmed.isEmpty) None + else Some(trimmed) + } + } + } + + /** @return true if an entire valid JSON object was found, false otherwise */ + private def seekObject(): Boolean = { + completedObject = false + val bufSize = buffer.size + while (pos != -1 && (pos < bufSize && pos < maximumObjectLength) && !completedObject) + proceed(buffer(pos)) + + if (pos >= maximumObjectLength) + throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""") + + completedObject + } + + private def proceed(input: Byte): Unit = + if (input == SquareBraceStart && outsideObject) { + // outer object is an array + pos += 1 + trimFront += 1 + } else if (input == SquareBraceEnd && outsideObject) { + // outer array completed! + pos = -1 + } else if (input == Comma && outsideObject) { + // do nothing + pos += 1 + trimFront += 1 + } else if (input == Backslash) { + isStartOfEscapeSequence = true + pos += 1 + } else if (input == DoubleQuote) { + if (!isStartOfEscapeSequence) inStringExpression = !inStringExpression + isStartOfEscapeSequence = false + pos += 1 + } else if (input == CurlyBraceStart && !inStringExpression) { + isStartOfEscapeSequence = false + depth += 1 + pos += 1 + } else if (input == CurlyBraceEnd && !inStringExpression) { + isStartOfEscapeSequence = false + depth -= 1 + pos += 1 + if (depth == 0) { + charsInObject = 0 + completedObject = true + } + } else if (isWhitespace(input) && !inStringExpression) { + pos += 1 + if (depth == 0) trimFront += 1 + } else if (insideObject) { + isStartOfEscapeSequence = false + pos += 1 + } else { + throw new FramingException(s"Invalid JSON encountered at position [$pos] of [$buffer]") + } + + @inline private final def insideObject: Boolean = + !outsideObject + + @inline private final def outsideObject: Boolean = + depth == 0 + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala new file mode 100644 index 00000000000..4bad96f7906 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.javadsl + +import akka.NotUsed +import akka.util.ByteString + +/** Provides JSON framing stages that can separate valid JSON objects from incoming [[akka.util.ByteString]] objects. */ +object JsonFraming { + + /** + * Returns a Flow that implements a "brace counting" based framing stage for emitting valid JSON chunks. + * + * Typical examples of data that one may want to frame using this stage include: + * + * **Very large arrays**: + * {{{ + * [{"id": 1}, {"id": 2}, [...], {"id": 999}] + * }}} + * + * **Multiple concatenated JSON objects** (with, or without commas between them): + * + * {{{ + * {"id": 1}, {"id": 2}, [...], {"id": 999} + * }}} + * + * The framing works independently of formatting, i.e. it will still emit valid JSON elements even if two + * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive + * (and does not impact the emitting frame) to the JSON object's internal formatting. + * + * Framing raw JSON values (such as integers or strings) is supported as well. + * + * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded + * this Flow will fail the stream. + */ + def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = + akka.stream.scaladsl.JsonFraming.objectScanner(maximumObjectLength).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala new file mode 100644 index 00000000000..f7c7aeb50e9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.Attributes +import akka.stream.impl.JsonObjectParser +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.stage.{ InHandler, OutHandler, GraphStageLogic } +import akka.util.ByteString + +import scala.util.control.NonFatal + +/** Provides JSON framing stages that can separate valid JSON objects from incoming [[ByteString]] objects. */ +object JsonFraming { + + /** + * Returns a Flow that implements a "brace counting" based framing stage for emitting valid JSON chunks. + * It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. + * + * Typical examples of data that one may want to frame using this stage include: + * + * **Very large arrays**: + * {{{ + * [{"id": 1}, {"id": 2}, [...], {"id": 999}] + * }}} + * + * **Multiple concatenated JSON objects** (with, or without commas between them): + * + * {{{ + * {"id": 1}, {"id": 2}, [...], {"id": 999} + * }}} + * + * The framing works independently of formatting, i.e. it will still emit valid JSON elements even if two + * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive + * (and does not impact the emitting frame) to the JSON object's internal formatting. + * + * Framing raw JSON values (such as integers or strings) is supported as well. + * + * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded + * this Flow will fail the stream. + */ + def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] { + private[this] val buffer = new JsonObjectParser(maximumObjectLength) + + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) + + override def onPush(): Unit = { + buffer.offer(grab(in)) + tryPopBuffer() + } + + override def onPull(): Unit = + tryPopBuffer() + + override def onUpstreamFinish(): Unit = { + try buffer.poll() match { + case Some(json) ⇒ emit(out, json, () ⇒ completeStage()) + case _ ⇒ completeStage() + } + } + + def tryPopBuffer() = { + try buffer.poll() match { + case Some(json) ⇒ push(out, json) + case _ ⇒ if (isClosed(in)) completeStage() else pull(in) + } catch { + case NonFatal(ex) ⇒ failStage(ex) + } + } + } + }).named("JsonFraming.objectScanner") + +}