Skip to content

Commit

Permalink
Merge pull request #20778 from ktoso/revival-of-the-undead-json-strea…
Browse files Browse the repository at this point in the history
…ming-of-doom-ktoso

+htp #18837 JSON framing and framed entity streaming directives
  • Loading branch information
ktoso committed Aug 2, 2016
2 parents 5783387 + 8a1f8f2 commit a712f01
Show file tree
Hide file tree
Showing 43 changed files with 2,323 additions and 61 deletions.
Expand Up @@ -91,7 +91,7 @@ object ConsistentHashingRouter {
* INTERNAL API
*/
private[akka] def hashMappingAdapter(mapper: ConsistentHashMapper): ConsistentHashMapping = {
case message if (mapper.hashKey(message).asInstanceOf[AnyRef] ne null)
case message if mapper.hashKey(message).asInstanceOf[AnyRef] ne null
mapper.hashKey(message)
}

Expand Down
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream

import java.util.concurrent.TimeUnit

import akka.stream.impl.JsonObjectParser
import akka.util.ByteString
import org.openjdk.jmh.annotations._

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class JsonFramingBenchmark {

/*
Benchmark Mode Cnt Score Error Units
// old
JsonFramingBenchmark.collecting_1 thrpt 20 81.476 ± 14.793 ops/s
JsonFramingBenchmark.collecting_offer_5 thrpt 20 20.187 ± 2.291 ops/s
// new
JsonFramingBenchmark.counting_1 thrpt 20 10766.738 ± 1278.300 ops/s
JsonFramingBenchmark.counting_offer_5 thrpt 20 28798.255 ± 2670.163 ops/s
*/

val json =
ByteString(
"""|{"fname":"Frank","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin
)

val bracket = new JsonObjectParser

@Setup(Level.Invocation)
def init(): Unit = {
bracket.offer(json)
}

@Benchmark
def counting_1: ByteString =
bracket.poll().get

@Benchmark
@OperationsPerInvocation(5)
def counting_offer_5: ByteString = {
bracket.offer(json)
bracket.poll().get
bracket.poll().get
bracket.poll().get
bracket.poll().get
bracket.poll().get
bracket.poll().get
}

}
@@ -0,0 +1,179 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/

package docs.http.javadsl.server;

import akka.NotUsed;
import akka.http.javadsl.common.CsvEntityStreamingSupport;
import akka.http.javadsl.common.JsonEntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.marshalling.Marshaller;
import akka.http.javadsl.model.*;
import akka.http.javadsl.model.headers.Accept;
import akka.http.javadsl.server.*;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.javadsl.testkit.TestRoute;
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.junit.Test;

import java.util.concurrent.CompletionStage;

public class JsonStreamingExamplesTest extends JUnitRouteTest {

//#routes
final Route tweets() {
//#formats
final Unmarshaller<ByteString, JavaTweet> JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class);
//#formats

//#response-streaming

// Step 1: Enable JSON streaming
// we're not using this in the example, but it's the simplest way to start:
// The default rendering is a JSON array: `[el, el, el , ...]`
final JsonEntityStreamingSupport jsonStreaming = EntityStreamingSupport.json();

// Step 1.1: Enable and customise how we'll render the JSON, as a compact array:
final ByteString start = ByteString.fromString("[");
final ByteString between = ByteString.fromString(",");
final ByteString end = ByteString.fromString("]");
final Flow<ByteString, ByteString, NotUsed> compactArrayRendering =
Flow.of(ByteString.class).intersperse(start, between, end);

final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json()
.withFramingRendererFlow(compactArrayRendering);


// Step 2: implement the route
final Route responseStreaming = path("tweets", () ->
get(() ->
parameter(StringUnmarshallers.INTEGER, "n", n -> {
final Source<JavaTweet, NotUsed> tws =
Source.repeat(new JavaTweet(12, "Hello World!")).take(n);

// Step 3: call complete* with your source, marshaller, and stream rendering mode
return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport);
})
)
);
//#response-streaming

//#incoming-request-streaming
final Route incomingStreaming = path("tweets", () ->
post(() ->
extractMaterializer(mat -> {
final JsonEntityStreamingSupport jsonSupport = EntityStreamingSupport.json();

return entityAsSourceOf(JavaTweets, jsonSupport, sourceOfTweets -> {
final CompletionStage<Integer> tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat);
return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c));
});
}
)
)
);
//#incoming-request-streaming

return responseStreaming.orElse(incomingStreaming);
}

final Route csvTweets() {
//#csv-example
final Marshaller<JavaTweet, ByteString> renderAsCsv =
Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t ->
ByteString.fromString(t.getId() + "," + t.getMessage())
);

final CsvEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.csv();

final Route responseStreaming = path("tweets", () ->
get(() ->
parameter(StringUnmarshallers.INTEGER, "n", n -> {
final Source<JavaTweet, NotUsed> tws =
Source.repeat(new JavaTweet(12, "Hello World!")).take(n);
return completeWithSource(tws, renderAsCsv, compactJsonSupport);
})
)
);
//#csv-example

return responseStreaming;
}
//#routes

@Test
public void getTweetsTest() {
//#response-streaming
// tests:
final TestRoute routes = testRoute(tweets());

// test happy path
final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON));
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication))
.assertStatusCode(200)
.assertEntity("[{\"id\":12,\"message\":\"Hello World!\"},{\"id\":12,\"message\":\"Hello World!\"}]");

// test responses to potential errors
final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT);
routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
.assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
.assertEntity("Resource representation is only available with these types:\napplication/json");
//#response-streaming
}

@Test
public void csvExampleTweetsTest() {
//#response-streaming
// tests --------------------------------------------
final TestRoute routes = testRoute(csvTweets());

// test happy path
final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV));
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv))
.assertStatusCode(200)
.assertEntity("12,Hello World!\n" +
"12,Hello World!");

// test responses to potential errors
final Accept acceptText = Accept.create(MediaRanges.ALL_APPLICATION);
routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
.assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
.assertEntity("Resource representation is only available with these types:\ntext/csv; charset=UTF-8");
//#response-streaming
}

//#models
private static final class JavaTweet {
private int id;
private String message;

public JavaTweet(int id, String message) {
this.id = id;
this.message = message;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public void setMessage(String message) {
this.message = message;
}

public String getMessage() {
return message;
}

}
//#models
}
2 changes: 1 addition & 1 deletion akka-docs/rst/java/http/routing-dsl/index.rst
Expand Up @@ -18,6 +18,7 @@ To use the high-level API you need to add a dependency to the ``akka-http-experi
directives/index
marshalling
exception-handling
source-streaming-support
rejections
testkit

Expand Down Expand Up @@ -51,7 +52,6 @@ in the :ref:`exception-handling-java` section of the documtnation. You can use t

File uploads
^^^^^^^^^^^^
TODO not possible in Java DSL since there

For high level directives to handle uploads see the :ref:`FileUploadDirectives-java`.

Expand Down
91 changes: 91 additions & 0 deletions akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst
@@ -0,0 +1,91 @@
.. _json-streaming-java:

Source Streaming
================

Akka HTTP supports completing a request with an Akka ``Source<T, _>``, which makes it possible to easily build
and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack.

It is possible to complete requests with raw ``Source<ByteString, _>``, however often it is more convenient to
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
or CSV stream (where each element is separated by a new-line).

In the following sections we investigate how to make use of the JSON Streaming infrastructure,
however the general hints apply to any kind of element-by-element streaming you could imagine.

JSON Streaming
==============

`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON
objects as a continuous HTTP request or response. The elements are most often separated using newlines,
however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another
use case.

In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as:

.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#models

.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming

Responding with JSON Streams
----------------------------

In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.

Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an
Akka Streams ``Source<T,_>``. Here we'll use the ``Jackson`` helper class from ``akka-http-jackson`` (a separate library
that you should add as a dependency if you want to use Jackson with Akka HTTP).

First we enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 1).

The default mode of rendering a ``Source`` is to represent it as an JSON Array. If you want to change this representation
for example to use Twitter style new-line separated JSON objects, you can do so by configuring the support trait accordingly.

In Step 1.1. we demonstrate to configure configude the rendering to be new-line separated, and also how parallel marshalling
can be applied. We configure the Support object to render the JSON as series of new-line separated JSON objects,
simply by providing the ``start``, ``sep`` and ``end`` ByteStrings, which will be emitted at the apropriate
places in the rendered stream. Although this format is *not* valid JSON, it is pretty popular since parsing it is relatively
simple - clients need only to find the new-lines and apply JSON unmarshalling for an entire line of JSON.

The final step is simply completing a request using a Source of tweets, as simple as that:

.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#response-streaming

.. _Streaming API: https://dev.twitter.com/streaming/overview

Consuming JSON Streaming uploads
--------------------------------

Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with
the server and is feeding it with one line of measurement data.

In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure
will be applied automatically thanks to using Akka HTTP/Streams).

.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#formats

.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#incoming-request-streaming


Simple CSV streaming example
----------------------------

Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values).
For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming
modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available,
and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see
an error during runtime that the marshaller did not expose the expected content type and thus we can not render
the streaming response).

.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#csv-example

Implementing custom EntityStreamingSupport traits
-------------------------------------------------

The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type
or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller<T, ByteString>``
instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).

When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class,
and implement all of it's methods. It's best to use the existing implementations as a guideline.
2 changes: 1 addition & 1 deletion akka-docs/rst/java/typed-actors.rst
Expand Up @@ -25,7 +25,7 @@ lies in interfacing between private sphere and the public, but you don’t want
that many doors inside your house, do you? For a longer discussion see `this
blog post <http://letitcrash.com/post/19074284309/when-to-use-typedactors>`_.

A bit more background: TypedActors can very easily be abused as RPC, and that
A bit more background: TypedActors can easily be abused as RPC, and that
is an abstraction which is `well-known
<http://doc.akka.io/docs/misc/smli_tr-94-29.pdf>`_
to be leaky. Hence TypedActors are not what we think of first when we talk
Expand Down

0 comments on commit a712f01

Please sign in to comment.