Skip to content

Commit

Permalink
Support of Java 8 Stream Decoder and Jackson Iterator Decoder (#651)
Browse files Browse the repository at this point in the history
* Support of Java 8 Stream Decoder and Jackson Iterator Decoder

Signed-off-by: phymbert <pierrick.hymbert@gmail.com>

* Removed class javadoc to make license plugin happy

Signed-off-by: phymbert <pierrick.hymbert@gmail.com>

* Fixed build failed cause of license missing

Signed-off-by: phymbert <pierrick.hymbert@gmail.com>

* - smaller highlighted changelog
- make decoder type implementation final
- change inner types and constructors visibility to package private
- fix non static inner class
- remove useless Factory inner class
- unit test JacksonIterator

Signed-off-by: phymbert <pierrick.hymbert@gmail.com>

* - Revert deleted groupId tag in benchmark
- Fix code style on StreamDecoder
- Add unit test to verify iterator is closed if stream is closed
- Remove any characteristics to the returned stream

Signed-off-by: phymbert <pierrick.hymbert@gmail.com>

* Benchmark:
 - updated with latest factory methods
 - do not duplicate groupId

Signed-off-by: phymbert <pierrick.hymbert@gmail.com>
  • Loading branch information
phymbert authored and velo committed Mar 9, 2018
1 parent 9c72569 commit 861bb80
Show file tree
Hide file tree
Showing 11 changed files with 830 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
### Version 9.6 ### Version 9.6
* Feign builder now supports flag `doNotCloseAfterDecode` to support lazy iteration of responses. * Feign builder now supports flag `doNotCloseAfterDecode` to support lazy iteration of responses.
* Adds `JacksonIteratorDecoder` and `StreamDecoder` to decode responses as `java.util.Iterator` or `java.util.stream.Stream`.


### Version 9.5.1 ### Version 9.5.1
* When specified, Content-Type header is now included on OkHttp requests lacking a body. * When specified, Content-Type header is now included on OkHttp requests lacking a body.
Expand Down
27 changes: 20 additions & 7 deletions benchmark/pom.xml
Expand Up @@ -24,28 +24,41 @@
<version>9</version> <version>9</version>
</parent> </parent>


<!-- TODO: change group id when 9.0 is released --> <groupId>io.github.openfeign</groupId>
<groupId>com.netflix.feign</groupId>
<artifactId>feign-benchmark</artifactId> <artifactId>feign-benchmark</artifactId>
<packaging>jar</packaging> <version>9.6.0-SNAPSHOT</version>
<version>8.1.0-SNAPSHOT</version>
<name>Feign Benchmark (JMH)</name> <name>Feign Benchmark (JMH)</name>


<properties> <properties>
<jmh.version>1.11.2</jmh.version> <jmh.version>1.20</jmh.version>
<!-- override default bytecode version for src/main from parent pom -->
<main.java.version>1.8</main.java.version>
<main.signature.artifact>java18</main.signature.artifact>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties> </properties>


<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.netflix.feign</groupId> <groupId>${project.groupId}</groupId>
<artifactId>feign-core</artifactId> <artifactId>feign-core</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.netflix.feign</groupId> <groupId>${project.groupId}</groupId>
<artifactId>feign-okhttp</artifactId> <artifactId>feign-okhttp</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>feign-jackson</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>feign-java8</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.squareup.okhttp</groupId> <groupId>com.squareup.okhttp</groupId>
<artifactId>mockwebserver</artifactId> <artifactId>mockwebserver</artifactId>
Expand Down
124 changes: 124 additions & 0 deletions benchmark/src/main/java/feign/benchmark/DecoderIteratorsBenchmark.java
@@ -0,0 +1,124 @@
/**
* Copyright 2012-2018 The Feign Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package feign.benchmark;

import com.fasterxml.jackson.core.type.TypeReference;
import feign.Response;
import feign.Util;
import feign.codec.Decoder;
import feign.jackson.JacksonDecoder;
import feign.jackson.JacksonIteratorDecoder;
import feign.stream.StreamDecoder;
import org.openjdk.jmh.annotations.*;

import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
* This test shows up how fast different json array response processing implementations are.
*/
@State(Scope.Thread)
public class DecoderIteratorsBenchmark {

@Param({"list", "iterator", "stream"})
private String api;

@Param({"10", "100"})
private String size;

private Response response;

private Decoder decoder;
private Type type;

@Benchmark
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public void decode() throws Exception {
fetch(decoder.decode(response, type));
}

@SuppressWarnings("unchecked")
private void fetch(Object o) {
Iterator<Car> cars;

if (o instanceof Collection) {
cars = ((Collection<Car>) o).iterator();
} else if (o instanceof Stream) {
cars = ((Stream<Car>) o).iterator();
} else {
cars = (Iterator<Car>) o;
}

while (cars.hasNext()) {
cars.next();
}
}

@Setup(Level.Invocation)
public void buildResponse() {
response = Response.builder()
.status(200)
.reason("OK")
.headers(Collections.emptyMap())
.body(carsJson(Integer.valueOf(size)), Util.UTF_8)
.build();
}

@Setup(Level.Trial)
public void buildDecoder() {
switch (api) {
case "list":
decoder = new JacksonDecoder();
type = new TypeReference<List<Car>>() {
}.getType();
break;
case "iterator":
decoder = JacksonIteratorDecoder.create();
type = new TypeReference<Iterator<Car>>() {
}.getType();
break;
case "stream":
decoder = StreamDecoder.create(JacksonIteratorDecoder.create());
type = new TypeReference<Stream<Car>>() {
}.getType();
break;
default:
throw new IllegalStateException("Unknown api: " + api);
}
}

private String carsJson(int count) {
String car = "{\"name\":\"c4\",\"manufacturer\":\"Citroën\"}";
StringBuilder builder = new StringBuilder("[");
builder.append(car);
for (int i = 1; i < count; i++) {
builder.append(",").append(car);
}
return builder.append("]").toString();
}

static class Car {
public String name;
public String manufacturer;
}
}
Expand Up @@ -64,7 +64,7 @@ public rx.Observable handle(HttpServerRequest<ByteBuf> request,
}); });
server.start(); server.start();
client = new OkHttpClient(); client = new OkHttpClient();
client.setRetryOnConnectionFailure(false); client.retryOnConnectionFailure();
okFeign = Feign.builder() okFeign = Feign.builder()
.client(new feign.okhttp.OkHttpClient(client)) .client(new feign.okhttp.OkHttpClient(client))
.target(FeignTestInterface.class, "http://localhost:" + SERVER_PORT); .target(FeignTestInterface.class, "http://localhost:" + SERVER_PORT);
Expand All @@ -82,8 +82,8 @@ public void tearDown() throws InterruptedException {
* How fast can we execute get commands synchronously? * How fast can we execute get commands synchronously?
*/ */
@Benchmark @Benchmark
public com.squareup.okhttp.Response query_baseCaseUsingOkHttp() throws IOException { public okhttp3.Response query_baseCaseUsingOkHttp() throws IOException {
com.squareup.okhttp.Response result = client.newCall(queryRequest).execute(); okhttp3.Response result = client.newCall(queryRequest).execute();
result.body().close(); result.body().close();
return result; return result;
} }
Expand Down
164 changes: 164 additions & 0 deletions jackson/src/main/java/feign/jackson/JacksonIteratorDecoder.java
@@ -0,0 +1,164 @@
/**
* Copyright 2012-2018 The Feign Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package feign.jackson;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.*;
import feign.Response;
import feign.Util;
import feign.codec.DecodeException;
import feign.codec.Decoder;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Iterator;

import static feign.Util.ensureClosed;

/**
* Jackson decoder which return a closeable iterator.
* Returned iterator auto-close the {@code Response} when it reached json array end or failed to parse stream.
* If this iterator is not fetched till the end, it has to be casted to {@code Closeable} and explicity {@code Closeable#close} by the consumer.
* <p>
* <p>
* <p>Example: <br>
* <pre><code>
* Feign.builder()
* .decoder(JacksonIteratorDecoder.create())
* .doNotCloseAfterDecode() // Required to fetch the iterator after the response is processed, need to be close
* .target(GitHub.class, "https://api.github.com");
* interface GitHub {
* {@literal @}RequestLine("GET /repos/{owner}/{repo}/contributors")
* Iterator<Contributor> contributors(@Param("owner") String owner, @Param("repo") String repo);
* }</code></pre>
*/
public final class JacksonIteratorDecoder implements Decoder {

private final ObjectMapper mapper;

JacksonIteratorDecoder(ObjectMapper mapper) {
this.mapper = mapper;
}

@Override
public Object decode(Response response, Type type) throws IOException {
if (response.status() == 404) return Util.emptyValueOf(type);
if (response.body() == null) return null;
Reader reader = response.body().asReader();
if (!reader.markSupported()) {
reader = new BufferedReader(reader, 1);
}
try {
// Read the first byte to see if we have any data
reader.mark(1);
if (reader.read() == -1) {
return null; // Eagerly returning null avoids "No content to map due to end-of-input"
}
reader.reset();
return new JacksonIterator<Object>(actualIteratorTypeArgument(type), mapper, response, reader);
} catch (RuntimeJsonMappingException e) {
if (e.getCause() != null && e.getCause() instanceof IOException) {
throw IOException.class.cast(e.getCause());
}
throw e;
}
}

private static Type actualIteratorTypeArgument(Type type) {
if (!(type instanceof ParameterizedType)) {
throw new IllegalArgumentException("Not supported type " + type.toString());
}
ParameterizedType parameterizedType = (ParameterizedType) type;
if (!Iterator.class.equals(parameterizedType.getRawType())) {
throw new IllegalArgumentException("Not an iterator type " + parameterizedType.getRawType().toString());
}
return ((ParameterizedType) type).getActualTypeArguments()[0];
}

public static JacksonIteratorDecoder create() {
return create(Collections.<Module>emptyList());
}

public static JacksonIteratorDecoder create(Iterable<Module> modules) {
return new JacksonIteratorDecoder(new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.registerModules(modules));
}

public static JacksonIteratorDecoder create(ObjectMapper objectMapper) {
return new JacksonIteratorDecoder(objectMapper);
}

static final class JacksonIterator<T> implements Iterator<T>, Closeable {
private final Response response;
private final JsonParser parser;
private final ObjectReader objectReader;

private T current;

JacksonIterator(Type type, ObjectMapper mapper, Response response, Reader reader)
throws IOException {
this.response = response;
this.parser = mapper.getFactory().createParser(reader);
this.objectReader = mapper.reader().forType(mapper.constructType(type));
}

@Override
public boolean hasNext() {
try {
JsonToken jsonToken = parser.nextToken();
if (jsonToken == null) {
return false;
}

if (jsonToken == JsonToken.START_ARRAY) {
jsonToken = parser.nextToken();
}

if (jsonToken == JsonToken.END_ARRAY) {
current = null;
ensureClosed(this);
return false;
}

current = objectReader.readValue(parser);
} catch (IOException e) {
// Input Stream closed automatically by parser
throw new DecodeException(e.getMessage(), e);
}
return current != null;
}

@Override
public T next() {
return current;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public void close() throws IOException {
ensureClosed(this.response);
}
}
}

0 comments on commit 861bb80

Please sign in to comment.