Skip to content

Commit

Permalink
Don't shut down asynchronous executor in Jersey
Browse files Browse the repository at this point in the history
If a user customized a Jersey client, Jersey creates a new copy
of the instance. Jersey has a mechanism for detecting leaks, and Jetty
will invoke the `dispose` method on the old client. Unfortunately,
during a copy the executor is shared. Therefore, the user will not be
able to perform async requests anymore.

The executor is managed by the Dropwizard environment, so we can leave
with the fact that it's closed by a kill signal.

Fixes #2218
  • Loading branch information
arteam committed Nov 24, 2017
1 parent 44efc04 commit 87e05dc
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 107 deletions.
@@ -1,52 +1,21 @@
package io.dropwizard.client;

import com.google.common.util.concurrent.ForwardingExecutorService;
import io.dropwizard.util.Duration;
import org.glassfish.jersey.client.ClientAsyncExecutor;
import org.glassfish.jersey.spi.ExecutorServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;

/**
* An {@link ExecutorServiceProvider} implementation for use within
* Dropwizard.
*
* With {DropwizardExecutorProvider.DisposableExecutorService}, one
* can signal that an {ExecutorService} is to be gracefully shut down
* upon its disposal by the Jersey runtime. It is used as a means of
* signaling to {@link DropwizardExecutorProvider} that the executor
* is not shared.
*/
@ClientAsyncExecutor
class DropwizardExecutorProvider implements ExecutorServiceProvider {
/**
* An {@link ExecutorService} decorator used as a marker by
* {@link DropwizardExecutorProvider#dispose} to induce service
* shutdown.
*/
static class DisposableExecutorService extends ForwardingExecutorService {
private final ExecutorService delegate;

public DisposableExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}

@Override
protected ExecutorService delegate() {
return delegate;
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(DropwizardExecutorProvider.class);

private final ExecutorService executor;
private final Duration shutdownGracePeriod;

DropwizardExecutorProvider(ExecutorService executor, Duration shutdownGracePeriod) {
DropwizardExecutorProvider(ExecutorService executor) {
this.executor = executor;
this.shutdownGracePeriod = shutdownGracePeriod;
}

@Override
Expand All @@ -56,15 +25,5 @@ public ExecutorService getExecutorService() {

@Override
public void dispose(ExecutorService executorService) {
if (executorService instanceof DisposableExecutorService) {
executorService.shutdown();

try {
executorService.awaitTermination(
shutdownGracePeriod.getQuantity(), shutdownGracePeriod.getUnit());
} catch (InterruptedException err) {
LOGGER.warn("Interrupted while waiting for ExecutorService shutdown", err);
}
}
}
}
Expand Up @@ -68,7 +68,6 @@ public class JerseyClientBuilder {
private ObjectMapper objectMapper;
private ExecutorService executorService;
private ConnectorProvider connectorProvider;
private Duration shutdownGracePeriod = Duration.seconds(5);

public JerseyClientBuilder(Environment environment) {
this.apacheHttpClientBuilder = new HttpClientBuilder(environment);
Expand Down Expand Up @@ -122,18 +121,6 @@ public JerseyClientBuilder withProperty(String propertyName, Object propertyValu
return this;
}

/**
* Sets the shutdown grace period.
*
* @param shutdownGracePeriod a period of time to await shutdown of the
* configured {ExecutorService}.
* @return {@code this}
*/
public JerseyClientBuilder withShutdownGracePeriod(Duration shutdownGracePeriod) {
this.shutdownGracePeriod = shutdownGracePeriod;
return this;
}

/**
* Uses the given {@link JerseyClientConfiguration}.
*
Expand Down Expand Up @@ -338,14 +325,12 @@ public Client build(String name) {
// configuration. The DisposableExecutorService decorator
// is used to ensure that the service is shut down if the
// Jersey client disposes of it.
executorService = new DropwizardExecutorProvider.DisposableExecutorService(
environment.lifecycle()
.executorService("jersey-client-" + name + "-%d")
.minThreads(configuration.getMinThreads())
.maxThreads(configuration.getMaxThreads())
.workQueue(new ArrayBlockingQueue<>(configuration.getWorkQueueSize()))
.build()
);
executorService = environment.lifecycle()
.executorService("jersey-client-" + name + "-%d")
.minThreads(configuration.getMinThreads())
.maxThreads(configuration.getMaxThreads())
.workQueue(new ArrayBlockingQueue<>(configuration.getWorkQueueSize()))
.build();
}

if (objectMapper == null) {
Expand Down Expand Up @@ -410,7 +395,7 @@ private Configuration buildConfig(final String name, final ExecutorService threa
config.property(property.getKey(), property.getValue());
}

config.register(new DropwizardExecutorProvider(threadPool, shutdownGracePeriod));
config.register(new DropwizardExecutorProvider(threadPool));
if (connectorProvider == null) {
final ConfiguredCloseableHttpClient apacheHttpClient =
apacheHttpClientBuilder.buildWithDefaultRequestConfiguration(name);
Expand Down

This file was deleted.

Expand Up @@ -10,13 +10,15 @@
import io.dropwizard.jackson.Jackson;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.protocol.HttpContext;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.glassfish.jersey.logging.LoggingFeature;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
Expand All @@ -26,8 +28,10 @@
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

Expand Down Expand Up @@ -351,6 +355,50 @@ public void testFilterOnAWebTarget() {
jersey.close();
}

@Test
public void testAsyncWithCustomized() throws Exception {
httpServer.createContext("/test", httpExchange -> {
try {
httpExchange.getResponseHeaders().add(HttpHeaders.CONTENT_TYPE, TEXT_PLAIN);
byte[] body = "Hello World!".getBytes(StandardCharsets.UTF_8);
httpExchange.sendResponseHeaders(200, body.length);
httpExchange.getResponseBody().write(body);
} finally {
httpExchange.close();
}
});
httpServer.start();

ExecutorService executor = Executors.newSingleThreadExecutor();
Client jersey = new JerseyClientBuilder(new MetricRegistry())
.using(executor, JSON_MAPPER)
.build("test-jersey-client");
String uri = "http://127.0.0.1:" + httpServer.getAddress().getPort() + "/test";
CountDownLatch countDownLatch = new CountDownLatch(25); // Big enough that a `dispose` call shutdowns the executor
for (int i = 0; i < 25; i++) {
jersey.target(uri)
.register(HttpAuthenticationFeature.basic("scott", "t1ger")).request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String s) {
assertThat(s).isEqualTo("Hello World!");
countDownLatch.countDown();
}

@Override
public void failed(Throwable t) {
t.printStackTrace();
}
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
assertThat(countDownLatch.getCount()).isEqualTo(0);

executor.shutdown();
jersey.close();
}

static class Person {

@JsonProperty("email")
Expand Down

0 comments on commit 87e05dc

Please sign in to comment.