Skip to content

Commit

Permalink
Fix metrics for InstrumentedEE10Handler (#3928)
Browse files Browse the repository at this point in the history
Fixes #3917
  • Loading branch information
zUniQueX committed Jun 9, 2024
1 parent 9e0bedb commit 607e6d1
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 120 deletions.
7 changes: 7 additions & 0 deletions metrics-jetty12-ee10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.ee10</groupId>
Expand Down Expand Up @@ -134,5 +135,11 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package io.dropwizard.metrics.jetty12.ee10;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.annotation.ResponseMeteredLevel;
import io.dropwizard.metrics.jetty12.AbstractInstrumentedHandler;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import org.eclipse.jetty.ee10.servlet.AsyncContextState;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletRequestEvent;
import jakarta.servlet.ServletRequestListener;
import org.eclipse.jetty.ee10.servlet.ServletApiRequest;
import org.eclipse.jetty.ee10.servlet.ServletApiResponse;
import org.eclipse.jetty.ee10.servlet.ServletChannelState;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletContextRequest;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static com.codahale.metrics.annotation.ResponseMeteredLevel.COARSE;

/**
* A Jetty {@link Handler} which records various metrics about an underlying {@link Handler}
Expand All @@ -28,15 +27,15 @@
* {@link org.eclipse.jetty.ee10.servlet.ServletContextHandler#insertHandler(Singleton)}.
*/
public class InstrumentedEE10Handler extends AbstractInstrumentedHandler {
private AsyncListener listener;
private AsyncDispatchesAwareServletRequestListener asyncDispatchesAwareServletRequestListener;

/**
* Create a new instrumented handler using a given metrics registry.
*
* @param registry the registry for the metrics
*/
public InstrumentedEE10Handler(MetricRegistry registry) {
super(registry, null);
super(registry);
}

/**
Expand All @@ -46,7 +45,7 @@ public InstrumentedEE10Handler(MetricRegistry registry) {
* @param prefix the prefix to use for the metrics names
*/
public InstrumentedEE10Handler(MetricRegistry registry, String prefix) {
super(registry, prefix, COARSE);
super(registry, prefix);
}

/**
Expand All @@ -63,8 +62,7 @@ public InstrumentedEE10Handler(MetricRegistry registry, String prefix, ResponseM
@Override
protected void doStart() throws Exception {
super.doStart();

this.listener = new AsyncAttachingListener();
asyncDispatchesAwareServletRequestListener = new AsyncDispatchesAwareServletRequestListener(getAsyncDispatches());
}

@Override
Expand All @@ -73,104 +71,84 @@ protected void doStop() throws Exception {
}

@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception {
protected void setupServletListeners(Request request, Response response) {
ServletContextRequest servletContextRequest = Request.as(request, ServletContextRequest.class);

// only handle servlet requests with the InstrumentedHandler
// because it depends on the ServletRequestState
if (servletContextRequest == null) {
return super.handle(request, response, callback);
}

activeDispatches.inc();

final long start;
final ServletChannelState state = servletContextRequest.getServletRequestState();
if (state.isInitial()) {
// new request
activeRequests.inc();
start = Request.getTimeStamp(request);
state.addListener(listener);
} else {
// resumed request
start = System.currentTimeMillis();
activeSuspended.dec();
if (state.getState() == ServletChannelState.State.HANDLING) {
asyncDispatches.mark();
}
return;
}

boolean handled = false;
ServletChannelState servletChannelState = servletContextRequest.getServletRequestState();
// the ServletChannelState gets recycled after handling, so add a new listener for every request
servletChannelState.addListener(new InstrumentedAsyncListener(getAsyncTimeouts()));

try {
handled = super.handle(request, response, callback);
} finally {
final long now = System.currentTimeMillis();
final long dispatched = now - start;
ServletContextHandler servletContextHandler = servletContextRequest.getServletContextHandler();
// addEventListener checks for duplicates, so we can try to add the listener for every request
servletContextHandler.addEventListener(asyncDispatchesAwareServletRequestListener);
}

activeDispatches.dec();
dispatches.update(dispatched, TimeUnit.MILLISECONDS);
@Override
protected boolean isSuspended(Request request, Response response) {
ServletContextRequest servletContextRequest = Request.as(request, ServletContextRequest.class);
if (servletContextRequest == null) {
return false;
}

if (state.isSuspended()) {
activeSuspended.inc();
} else if (state.isInitial()) {
updateResponses(request, response, start, handled);
}
// else onCompletion will handle it.
ServletChannelState servletChannelState = servletContextRequest.getServletRequestState();
if (servletChannelState == null) {
return false;
}

return handled;
return servletChannelState.isSuspended();
}

private class AsyncAttachingListener implements AsyncListener {

@Override
public void onTimeout(AsyncEvent event) throws IOException {}
private static class AsyncDispatchesAwareServletRequestListener implements ServletRequestListener {
private final Meter asyncDispatches;

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
event.getAsyncContext().addListener(new InstrumentedAsyncListener());
private AsyncDispatchesAwareServletRequestListener(Meter asyncDispatches) {
this.asyncDispatches = asyncDispatches;
}

@Override
public void onError(AsyncEvent event) throws IOException {}
public void requestInitialized(ServletRequestEvent sre) {
ServletRequest servletRequest = sre.getServletRequest();
if (!(servletRequest instanceof ServletApiRequest)) {
return;
}

@Override
public void onComplete(AsyncEvent event) throws IOException {}
ServletApiRequest servletApiRequest = (ServletApiRequest) servletRequest;

ServletContextHandler.ServletRequestInfo servletRequestInfo = servletApiRequest.getServletRequestInfo();

ServletChannelState servletChannelState = servletRequestInfo.getServletRequestState();

// if the request isn't 'initial', the request was re-dispatched
if (servletChannelState.isAsync() && !servletChannelState.isInitial()) {
asyncDispatches.mark();
}
}
}

private class InstrumentedAsyncListener implements AsyncListener {
private final long startTime;
private static class InstrumentedAsyncListener implements AsyncListener {
private final Meter asyncTimeouts;

InstrumentedAsyncListener() {
this.startTime = System.currentTimeMillis();
private InstrumentedAsyncListener(Meter asyncTimeouts) {
this.asyncTimeouts = asyncTimeouts;
}

@Override
public void onTimeout(AsyncEvent event) throws IOException {
asyncTimeouts.mark();
}
public void onComplete(AsyncEvent event) throws IOException {}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
public void onTimeout(AsyncEvent event) throws IOException {
asyncTimeouts.mark();
}

@Override
public void onError(AsyncEvent event) throws IOException {
}
public void onError(AsyncEvent event) throws IOException {}

@Override
public void onComplete(AsyncEvent event) throws IOException {
final AsyncContextState state = (AsyncContextState) event.getAsyncContext();
final ServletApiRequest request = (ServletApiRequest) state.getRequest();
final ServletApiResponse response = (ServletApiResponse) state.getResponse();
updateResponses(request.getRequest(), response.getResponse(), startTime, true);

final ServletContextRequest servletContextRequest = Request.as(request.getRequest(), ServletContextRequest.class);
final ServletChannelState servletRequestState = servletContextRequest.getServletRequestState();
if (!servletRequestState.isSuspended()) {
activeSuspended.dec();
}
public void onStartAsync(AsyncEvent event) throws IOException {
event.getAsyncContext().addListener(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.dropwizard.metrics.jetty12.ee10;

import com.codahale.metrics.MetricRegistry;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.junit.After;
import org.junit.Before;

import static com.codahale.metrics.annotation.ResponseMeteredLevel.ALL;

abstract class AbstractIntegrationTest {

protected final HttpClient client = new HttpClient();
protected final MetricRegistry registry = new MetricRegistry();
protected final Server server = new Server();
protected final ServerConnector connector = new ServerConnector(server);
protected final InstrumentedEE10Handler handler = new InstrumentedEE10Handler(registry, null, ALL);
protected final ServletContextHandler servletContextHandler = new ServletContextHandler();

@Before
public void setUp() throws Exception {
handler.setName("handler");

// builds the following handler chain:
// ServletContextHandler -> InstrumentedHandler -> TestHandler
// the ServletContextHandler is needed to utilize servlet related classes
servletContextHandler.setHandler(getHandler());
servletContextHandler.insertHandler(handler);
server.setHandler(servletContextHandler);

server.addConnector(connector);
server.start();
client.start();
}

@After
public void tearDown() throws Exception {
server.stop();
client.stop();
}

protected String uri(String path) {
return "http://localhost:" + connector.getLocalPort() + path;
}

protected abstract Handler getHandler();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.dropwizard.metrics.jetty12.ee10;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.ee10.servlet.ServletHandler;
import org.eclipse.jetty.server.Handler;
import org.junit.Test;

import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.awaitility.Awaitility.await;

public class AsyncTest extends AbstractIntegrationTest {

@Override
protected Handler getHandler() {
return new ServletHandler();
}

@Test
public void testAsyncTimeout() throws Exception {
servletContextHandler.addFilter((request, response, chain) -> {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(1);
}, "/*", EnumSet.allOf(DispatcherType.class));

client.GET(uri("/"));
Meter asyncTimeouts = registry.meter(MetricRegistry.name(ServletHandler.class, "handler.async-timeouts"));
assertThat(asyncTimeouts.getCount()).isEqualTo(1L);

client.GET(uri("/"));
assertThat(asyncTimeouts.getCount()).isEqualTo(2L);
}

@Test
public void testActiveSuspended() {
servletContextHandler.addFilter((request, response, chain) -> {
AsyncContext asyncContext = request.startAsync();
asyncContext.start(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
}
asyncContext.complete();
});
}, "/*", EnumSet.allOf(DispatcherType.class));

Counter activeSuspended = registry.counter(MetricRegistry.name(ServletHandler.class, "handler.active-suspended"));
Request request = client.POST(uri("/"));
CompletableResponseListener completableResponseListener = new CompletableResponseListener(request);
CompletableFuture<ContentResponse> asyncResponse = completableResponseListener.send();
assertThatNoException().isThrownBy(() -> {
await()
.atMost(750, TimeUnit.MILLISECONDS)
.until(() -> activeSuspended.getCount() == 1L);
asyncResponse.get();
});
assertThat(activeSuspended.getCount()).isEqualTo(0L);
}

@Test
public void testAsyncDispatches() throws Exception {
servletContextHandler.addFilter((request, response, chain) -> {
if (!(request instanceof HttpServletRequest)) {
throw new IllegalStateException("Expecting ServletRequest to be an instance of HttpServletRequest");
}
HttpServletRequest httpServletRequest = (HttpServletRequest) request;
if ("/".equals(httpServletRequest.getRequestURI())) {
AsyncContext asyncContext = request.startAsync();
asyncContext.dispatch("/dispatch");
return;
}
if ("/dispatch".equals(httpServletRequest.getRequestURI())) {
AsyncContext asyncContext = request.startAsync();
if (!(response instanceof HttpServletResponse)) {
throw new IllegalStateException("Expecting ServletResponse to be an instance of HttpServletResponse");
}
HttpServletResponse httpServletResponse = (HttpServletResponse) response;
httpServletResponse.setStatus(204);
asyncContext.complete();
return;
}
throw new UnsupportedOperationException("Only '/' and '/dispatch' are valid paths");
}, "/*", EnumSet.allOf(DispatcherType.class));

ContentResponse contentResponse = client.GET(uri("/"));
assertThat(contentResponse).isNotNull().extracting(Response::getStatus).isEqualTo(204);
Meter asyncDispatches = registry.meter(MetricRegistry.name(ServletHandler.class, "handler.async-dispatches"));
assertThat(asyncDispatches.getCount()).isEqualTo(1L);
}
}
Loading

0 comments on commit 607e6d1

Please sign in to comment.