Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop filling monitoring queues when processor fails #4697

Merged
merged 2 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,14 +20,14 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import javax.ws.rs.ProcessingException;

import javax.annotation.Priority;
import javax.inject.Inject;
import javax.ws.rs.ProcessingException;

import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.server.internal.LocalizationMessages;
Expand Down Expand Up @@ -68,6 +68,8 @@ public final class MonitoringEventListener implements ApplicationEventListener {
private final Queue<Integer> responseStatuses = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private final Queue<RequestEvent> exceptionMapperEvents = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private volatile MonitoringStatisticsProcessor monitoringStatisticsProcessor;
// By default new events can arrive before MonitoringStatisticsProcessor is running.
private final AtomicBoolean processorFailed = new AtomicBoolean(false);

/**
* Time statistics.
Expand Down Expand Up @@ -185,6 +187,7 @@ public void onEvent(final ApplicationEvent event) {
case RELOAD_FINISHED:
case INITIALIZATION_FINISHED:
this.monitoringStatisticsProcessor = new MonitoringStatisticsProcessor(injectionManager, this);
processorFailed.set(false);
this.monitoringStatisticsProcessor.startMonitoringWorker();
break;
case DESTROY_FINISHED:
Expand Down Expand Up @@ -238,13 +241,13 @@ public void onEvent(final RequestEvent event) {
methodStats = new MethodStats(method, methodTimeStart, now - methodTimeStart);
break;
case EXCEPTION_MAPPING_FINISHED:
if (!exceptionMapperEvents.offer(event)) {
if (!offer(exceptionMapperEvents, event)) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_MAPPER());
}
break;
case FINISHED:
if (event.isResponseWritten()) {
if (!responseStatuses.offer(event.getContainerResponse().getStatus())) {
if (!offer(responseStatuses, event.getContainerResponse().getStatus())) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_RESPONSE());
}
}
Expand All @@ -264,8 +267,7 @@ public void onEvent(final RequestEvent event) {
}
sb.setLength(sb.length() - 1);
}

if (!requestQueuedItems.offer(new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
if (!offer(requestQueuedItems, new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
methodStats, sb.toString()))) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_REQUEST());
}
Expand All @@ -274,6 +276,21 @@ public void onEvent(final RequestEvent event) {
}
}

private <T> boolean offer(Queue<T> queue, T event) {
if (!processorFailed.get()) {
return queue.offer(event);
}
// Don't need to warn that the event was not queued because an Exception was thrown by MonitoringStatisticsProcessor
return true;
}

/**
* Invoked by {@link MonitoringStatisticsProcessor} when there is one exception consuming from queues.
*/
void processorFailed() {
processorFailed.set(true);
}

/**
* Get the exception mapper event queue.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.glassfish.jersey.server.ExtendedResourceContext;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.LocalizationMessages;
import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener.RequestStats;
import org.glassfish.jersey.server.model.ResourceMethod;
import org.glassfish.jersey.server.model.ResourceModel;
import org.glassfish.jersey.server.monitoring.MonitoringStatisticsListener;
Expand Down Expand Up @@ -94,6 +95,7 @@ public void run() {
processResponseCodeEvents();
processExceptionMapperEvents();
} catch (final Throwable t) {
monitoringEventListener.processorFailed();
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
// rethrowing exception stops further task execution
throw new ProcessingException(LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
Expand All @@ -120,11 +122,9 @@ public void run() {
private void processExceptionMapperEvents() {
final Queue<RequestEvent> eventQueue = monitoringEventListener.getExceptionMapperEvents();
final FloodingLogger floodingLogger = new FloodingLogger(eventQueue);

while (!eventQueue.isEmpty()) {
RequestEvent event = null;
while ((event = eventQueue.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final RequestEvent event = eventQueue.remove();
final ExceptionMapperStatisticsImpl.Builder mapperStats = statisticsBuilder.getExceptionMapperStatisticsBuilder();

if (event.getExceptionMapper() != null) {
Expand All @@ -138,12 +138,9 @@ private void processExceptionMapperEvents() {
private void processRequestItems() {
final Queue<MonitoringEventListener.RequestStats> requestQueuedItems = monitoringEventListener.getRequestQueuedItems();
final FloodingLogger floodingLogger = new FloodingLogger(requestQueuedItems);

while (!requestQueuedItems.isEmpty()) {
RequestStats event = null;
while ((event = requestQueuedItems.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final MonitoringEventListener.RequestStats event = requestQueuedItems.remove();

final MonitoringEventListener.TimeStats requestStats = event.getRequestStats();
statisticsBuilder.addRequestExecution(requestStats.getStartTime(), requestStats.getDuration());

Expand All @@ -160,11 +157,9 @@ private void processRequestItems() {
private void processResponseCodeEvents() {
final Queue<Integer> responseEvents = monitoringEventListener.getResponseStatuses();
final FloodingLogger floodingLogger = new FloodingLogger(responseEvents);

while (!responseEvents.isEmpty()) {
Integer code = null;
while ((code = responseEvents.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final Integer code = responseEvents.remove();
statisticsBuilder.addResponseCode(code);
}

Expand Down
45 changes: 45 additions & 0 deletions tests/integration/jersey-4697/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.

This program and the accompanying materials are made available under the
terms of the Eclipse Public License v. 2.0, which is available at
http://www.eclipse.org/legal/epl-2.0.

This Source Code may also be made available under the following Secondary
Licenses when the conditions for such availability set forth in the
Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
version 2 with the GNU Classpath Exception, which is available at
https://www.gnu.org/software/classpath/license.html.

SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>project</artifactId>
<groupId>org.glassfish.jersey.tests.integration</groupId>
<version>2.34-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>jersey-4697</artifactId>

<dependencies>
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
<artifactId>jersey-test-framework-provider-bundle</artifactId>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.tests.integration.jersey4697;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;

import javax.inject.Inject;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeDataSupport;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.ExceptionMapperMXBean;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class MonitoringEventListenerTest extends JerseyTest {

private static final long TIMEOUT = 500;
private static final String MBEAN_EXCEPTION =
"org.glassfish.jersey:type=MonitoringEventListenerTest,subType=Global,exceptions=ExceptionMapper";

@Path("/example")
public static class ExampleResource {
@Inject
private InjectionManager injectionManager;
@GET
@Path("/error")
public Response error() {
throw new RuntimeException("Any exception to be counted in ExceptionMapper");
}
@GET
@Path("/poison")
public Response poison() {
MonitoringEventListener monitoringEventListener = listener();
RequestEvent requestEvent = mock(RequestEvent.class);
when(requestEvent.getType()).thenReturn(RequestEvent.Type.START);
RequestEventListener eventListener = monitoringEventListener.onRequest(requestEvent);
RequestEvent poisonEvent = mock(RequestEvent.class);
when(poisonEvent.getType()).thenReturn(RequestEvent.Type.EXCEPTION_MAPPING_FINISHED);
when(poisonEvent.getExceptionMapper())
.thenThrow(new IllegalStateException("This causes the scheduler to stop working"));
eventListener.onEvent(poisonEvent);
return Response.ok().build();
}
@GET
@Path("/queueSize")
public Response queueSize() throws Exception {
MonitoringEventListener monitoringEventListener = listener();
Method method = MonitoringEventListener.class.getDeclaredMethod("getExceptionMapperEvents");
method.setAccessible(true);
Collection<?> queue = (Collection<?>) method.invoke(monitoringEventListener);
return Response.ok(queue.size()).build();
}
private MonitoringEventListener listener() {
Iterable<ApplicationEventListener> listeners =
Providers.getAllProviders(injectionManager, ApplicationEventListener.class);
for (ApplicationEventListener listener : listeners) {
if (listener instanceof MonitoringEventListener) {
return (MonitoringEventListener) listener;
}
}
throw new IllegalStateException("MonitoringEventListener was not found");
}
}

@Provider
public static class RuntimeExceptionMapper implements ExceptionMapper<RuntimeException> {
@Override
public Response toResponse(RuntimeException e) {
return Response.status(500).entity("RuntimeExceptionMapper: " + e.getMessage()).build();
}
}

@Override
protected Application configure() {
ResourceConfig resourceConfig = new ResourceConfig(ExampleResource.class);
// Need to map the exception to be counted by ExceptionMapper
resourceConfig.register(RuntimeExceptionMapper.class);
resourceConfig.property(ServerProperties.MONITORING_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_REFRESH_INTERVAL, 1);
resourceConfig.setApplicationName("MonitoringEventListenerTest");
return resourceConfig;
}

@Test
public void exceptionInScheduler() throws Exception {
final Long ERRORS_BEFORE_FAIL = 10L;
// Send some requests to process some statistics.
request(ERRORS_BEFORE_FAIL);
// Give some time to the scheduler to collect data.
Thread.sleep(TIMEOUT);
// All events were consumed by scheduler
queueIsEmpty();
// Make the scheduler to fail. No more statistics are collected.
makeFailure();
// Sending again requests
request(20);
Thread.sleep(TIMEOUT);
// No new events should be accepted because scheduler is not working.
queueIsEmpty();
Long monitoredErrors = mappedErrorsFromJMX(MBEAN_EXCEPTION);
assertEquals(ERRORS_BEFORE_FAIL, monitoredErrors);
}

private void makeFailure() {
Response response = target("/example/poison").request().get();
assertEquals(200, response.getStatus());
}

private void queueIsEmpty() {
Response response = target("/example/queueSize").request().get();
assertEquals(200, response.getStatus());
assertEquals(Integer.valueOf(0), response.readEntity(Integer.class));
}

private Long mappedErrorsFromJMX(String name) throws Exception {
Long monitoredErrors = null;
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(name);
ExceptionMapperMXBean bean = JMX.newMBeanProxy(mbs, objectName, ExceptionMapperMXBean.class);
Map<?, ?> counter = bean.getExceptionMapperCount();
CompositeDataSupport value = (CompositeDataSupport) counter.entrySet().iterator().next().getValue();
for (Object obj : value.values()) {
if (obj instanceof Long) {
// Messy way to get the errors, but generic types doesn't match and there is no nice way
monitoredErrors = (Long) obj;
break;
}
}
return monitoredErrors;
}

private void request(long requests) {
for (long i = 0; i < requests; i++) {
Response response = target("/example/error").request().get();
assertEquals(500, response.getStatus());
}
}
}
1 change: 1 addition & 0 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<module>jersey-4321</module>
<module>jersey-4507</module>
<module>jersey-4542</module>
<module>jersey-4697</module>
<module>jersey-4722</module>
<module>jetty-response-close</module>
<module>microprofile</module>
Expand Down