From 45f944dd8a003aa814c968a7ca63a9cd743a3473 Mon Sep 17 00:00:00 2001 From: Antoine Date: Sun, 4 Sep 2022 13:37:23 -0400 Subject: [PATCH] 1.2: Add a way to disable thread interruption on timeout (#3) --- README.md | 15 +- pom.xml | 2 +- .../teketik/spring/health/AsyncHealth.java | 13 +- .../spring/health/AsyncHealthIndicator.java | 10 +- ...AsyncHealthIndicatorAutoConfiguration.java | 3 +- .../java/com/teketik/utils/Schedulable.java | 13 +- .../utils/SchedulingThreadPoolExecutor.java | 17 ++- .../TimeoutInterruptibleDisabledITest.java | 135 ++++++++++++++++++ ...SleepingIndicatorInterruptionDisabled.java | 30 ++++ 9 files changed, 214 insertions(+), 24 deletions(-) create mode 100644 src/test/java/com/teketik/spring/health/TimeoutInterruptibleDisabledITest.java create mode 100644 src/test/java/com/teketik/spring/health/indicators/TimingOutSleepingIndicatorInterruptionDisabled.java diff --git a/README.md b/README.md index e93ae1b..916ceb6 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,4 @@ - - - # Spring Boot Async Health Indicator Async Health Indicator for [spring-boot-actuator](https://docs.spring.io/spring-boot/docs/current/reference/html/actuator.html) >=2.2.0 gives [Health Indicator](https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/actuate/health/HealthIndicator.html) the ability to get refreshed asynchronously on a background ThreadPoolExecutor using the annotation `@AsyncHealth`. @@ -35,18 +32,20 @@ This module is auto-configured. com.teketik async-health-indicator - boot2-v1.1 + boot2-v1.2 ``` - - Annotate any `HealthIndicator` with `@AsyncHealth(refreshRate = $REFRESH_RATE, timeout = $TIMEOUT)` + - Annotate any `HealthIndicator` with `@AsyncHealth(refreshRate = $REFRESH_RATE, timeout = $TIMEOUT, interruptOnTimeout = $INTERRUPT_ON_TIMEOUT)` + +`$REFRESH_RATE` = Fixed delay in seconds between the termination of the `health()` execution and the commencement of the next (default `1`). -`$REFRESH_RATE` = Fixed delay in seconds between the termination of the `health()` execution and the commencement of the next (default 1). +`$TIMEOUT`= The maximum time in seconds that the `health()` execution can take before being considered `DOWN` (default `10`). -`$TIMEOUT`= The maximum time in seconds that the `health()` execution can take before being considered `DOWN` (default 10). +`$INTERRUPT_ON_TIMEOUT` = Whether the thread should be interrupted when a timeout occurs. (if `false`, the next check will only be scheduled after the current execution terminates naturally). (default `true`) ## Regarding Timeout -When a `health()` method duration exceeds the configured `timeout`, the thread running it is `interrupted`with the hope that the method will fail with an exception (causing it to be `DOWN`) and free up the thread. +When a `health()` method duration exceeds the configured `timeout`, the thread running it is `interrupted` if `interruptOnTimeout` is `true` with the hope that the method will fail with an exception (causing it to be `DOWN`) and free up the thread. Unfortunately, most I/O calls are not interruptible and the thread may continue to execute the method until it times out (according to the libraries and configuration used). If that happens, you will observe the `timeout` error message printed for each `/health` hit until that method times out like: ``` diff --git a/pom.xml b/pom.xml index e7f48cf..e0f1f54 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.teketik async-health-indicator - boot2-v1.1 + boot2-v1.2 Spring Boot Async Health Indicator Asynchronous Health Indicator for Spring Boot https://github.com/antoinemeyer/spring-boot-async-health-indicator diff --git a/src/main/java/com/teketik/spring/health/AsyncHealth.java b/src/main/java/com/teketik/spring/health/AsyncHealth.java index 39273f6..a19c9c1 100644 --- a/src/main/java/com/teketik/spring/health/AsyncHealth.java +++ b/src/main/java/com/teketik/spring/health/AsyncHealth.java @@ -15,8 +15,8 @@ *

The {@link HealthIndicator#health()} method: *

*

The `/health` endpoint will not invoke the {@link HealthIndicator#health() health method} but return the last {@link Health} calculated asynchronously. *

Note that the {@link HealthIndicator} may return {@link Status#UNKNOWN} on application startup if the `/health` endpoint @@ -37,8 +37,8 @@ *


*

Regarding Timeout

*

- * When a {@link HealthIndicator#health() health method} duration exceeds the configured {@link #timeout()}, the thread running it is interrupted with the hope that the method will fail with an exception (causing it to be {@link Status#DOWN}) - * and free up the thread.
+ * When a {@link HealthIndicator#health() health method} duration exceeds the configured {@link #timeout()}, the thread running it is interrupted if {@link #interruptOnTimeout()} is {@code true} + * with the hope that the method will fail with an exception (causing it to be {@link Status#DOWN}) and free up the thread.
* Unfortunately, most I/O calls are not interruptible and the thread may continue to execute the method until it times out (according to the libraries and configuration used). *

* If that happens, you will observe the 'timeout' error message printed for each `/health` hit until that method times out like: @@ -67,5 +67,10 @@ */ int timeout() default 10; + /** + * @return whether the thread should be interrupted when a timeout occurs. (if {@code false}, the next check will only be scheduled after the current execution terminates naturally). + */ + boolean interruptOnTimeout() default true; + } diff --git a/src/main/java/com/teketik/spring/health/AsyncHealthIndicator.java b/src/main/java/com/teketik/spring/health/AsyncHealthIndicator.java index 803569e..6731076 100644 --- a/src/main/java/com/teketik/spring/health/AsyncHealthIndicator.java +++ b/src/main/java/com/teketik/spring/health/AsyncHealthIndicator.java @@ -28,15 +28,17 @@ class AsyncHealthIndicator implements HealthIndicator, Schedulable { private final String name; private final int refreshRateInSeconds; private final int timeoutInSeconds; + private final boolean interruptOnTimeout; private volatile Health lastHealth; private volatile long healthStartTimeMillis = -1; - public AsyncHealthIndicator(HealthIndicator originalHealthIndicator, String name, int refreshRateInSeconds, int timeoutInSeconds) { + public AsyncHealthIndicator(HealthIndicator originalHealthIndicator, String name, int refreshRateInSeconds, int timeoutInSeconds, boolean interruptOnTimeout) { this.originalHealthIndicator = originalHealthIndicator; this.name = name; this.refreshRateInSeconds = refreshRateInSeconds; this.timeoutInSeconds = timeoutInSeconds; + this.interruptOnTimeout = interruptOnTimeout; } @Override @@ -115,8 +117,10 @@ public int getRefreshRateInSeconds() { } @Override - public int getTimeoutInSeconds() { - return timeoutInSeconds; + public Optional getTimeoutInSeconds() { + return interruptOnTimeout + ? Optional.of(timeoutInSeconds) + : Optional.empty(); } @Override diff --git a/src/main/java/com/teketik/spring/health/AsyncHealthIndicatorAutoConfiguration.java b/src/main/java/com/teketik/spring/health/AsyncHealthIndicatorAutoConfiguration.java index 9850430..e55f83e 100644 --- a/src/main/java/com/teketik/spring/health/AsyncHealthIndicatorAutoConfiguration.java +++ b/src/main/java/com/teketik/spring/health/AsyncHealthIndicatorAutoConfiguration.java @@ -53,7 +53,8 @@ public void afterPropertiesSet() throws Exception { indicator, contributorName, annotation.refreshRate(), - annotation.timeout() + annotation.timeout(), + annotation.interruptOnTimeout() ); healthContributorRegistry.registerContributor(contributorName, asyncHealthIndicator); asyncHealthIndicators.add(asyncHealthIndicator); diff --git a/src/main/java/com/teketik/utils/Schedulable.java b/src/main/java/com/teketik/utils/Schedulable.java index 2e588ce..45267b7 100644 --- a/src/main/java/com/teketik/utils/Schedulable.java +++ b/src/main/java/com/teketik/utils/Schedulable.java @@ -1,6 +1,17 @@ package com.teketik.utils; +import java.util.Optional; + public interface Schedulable extends Runnable { + + /** + * @return time in seconds between the termination of one execution and the commencement of the next. + */ int getRefreshRateInSeconds(); - int getTimeoutInSeconds(); + + /** + * @return maximum time in seconds this can run before being interrupted ({@link Optional#empty() empty} if never to be interrupted). + */ + Optional getTimeoutInSeconds(); + } diff --git a/src/main/java/com/teketik/utils/SchedulingThreadPoolExecutor.java b/src/main/java/com/teketik/utils/SchedulingThreadPoolExecutor.java index 7c859dc..5bb7e1a 100644 --- a/src/main/java/com/teketik/utils/SchedulingThreadPoolExecutor.java +++ b/src/main/java/com/teketik/utils/SchedulingThreadPoolExecutor.java @@ -62,12 +62,17 @@ protected RunnableFuture newTaskFor(Runnable runnable, T value) { @Override protected void beforeExecute(Thread thread, Runnable runnable) { TaskDecorator decorator = (TaskDecorator) runnable; - final ScheduledFuture interrupterScheduledFuture = coordinatorExecutorService.schedule( - () -> thread.interrupt(), - decorator.schedulable.getTimeoutInSeconds(), - TimeUnit.SECONDS - ); - runningTasks.put(runnable, interrupterScheduledFuture); + decorator + .schedulable + .getTimeoutInSeconds() + .ifPresent(timeoutInSeconds -> { + final ScheduledFuture interrupterScheduledFuture = coordinatorExecutorService.schedule( + () -> thread.interrupt(), + timeoutInSeconds, + TimeUnit.SECONDS + ); + runningTasks.put(runnable, interrupterScheduledFuture); + }); } @Override diff --git a/src/test/java/com/teketik/spring/health/TimeoutInterruptibleDisabledITest.java b/src/test/java/com/teketik/spring/health/TimeoutInterruptibleDisabledITest.java new file mode 100644 index 0000000..85a00d4 --- /dev/null +++ b/src/test/java/com/teketik/spring/health/TimeoutInterruptibleDisabledITest.java @@ -0,0 +1,135 @@ +package com.teketik.spring.health; + +import org.assertj.core.matcher.AssertionMatcher; +import org.awaitility.Awaitility; +import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.result.MockMvcResultMatchers; + +import java.time.Duration; +import java.time.LocalDateTime; + +@ActiveProfiles("with-timing-out-indicator-interruption-disabled") +public class TimeoutInterruptibleDisabledITest extends BaseITest { + + @Test + public void test() throws Exception { + final LocalDateTime[] upIndicator1FirstLastChecked = new LocalDateTime[1]; + final LocalDateTime[] timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked = new LocalDateTime[1]; + Awaitility.await().untilAsserted(() -> { + mockMvc + .perform(MockMvcRequestBuilders.get("/actuator/health")) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value(CoreMatchers.not("UNKNOWN"))); + }); + mockMvc + .perform(MockMvcRequestBuilders.get("/actuator/health")) + .andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.status").value("UP")) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value("DOWN")) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.error").doesNotExist()) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.reason").value(Matchers.equalTo("Timeout"))) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastDuration").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + Assert.assertEquals(6, actual.length()); + Assert.assertTrue(actual.endsWith("ms")); + Assert.assertThat(actual, CoreMatchers.startsWith("1")); + } + } + )) + .andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.details.lastChecked").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + upIndicator1FirstLastChecked[0] = LocalDateTime.parse(actual); + } + } + )) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastChecked").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked[0] = LocalDateTime.parse(actual); + } + } + )); + Thread.sleep(1000); + //still the same + mockMvc + .perform(MockMvcRequestBuilders.get("/actuator/health")) + .andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.status").value("UP")) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value("DOWN")) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.error").doesNotExist()) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.reason").value(Matchers.equalTo("Timeout"))) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastDuration").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + Assert.assertEquals(6, actual.length()); + Assert.assertTrue(actual.endsWith("ms")); + Assert.assertThat(actual, CoreMatchers.startsWith("2")); + } + } + )) + .andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.details.lastChecked").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + final int difference = (int) Duration.between(upIndicator1FirstLastChecked[0], LocalDateTime.parse(actual)).toMillis(); + Assert.assertThat(difference, Matchers.greaterThan(0)); + } + } + )) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastChecked").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + final int difference = (int) Duration.between(timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked[0], LocalDateTime.parse(actual)).toMillis(); + Assert.assertThat(difference, Matchers.equalTo(0)); + } + } + )); + Thread.sleep(2000); + //then update last check + mockMvc + .perform(MockMvcRequestBuilders.get("/actuator/health")) + .andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.status").value("UP")) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value("DOWN")) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.error").doesNotExist()) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.reason").value(Matchers.equalTo("Timeout"))) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastDuration").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + Assert.assertEquals(6, actual.length()); + Assert.assertTrue(actual.endsWith("ms")); + Assert.assertThat(actual, CoreMatchers.startsWith("1")); + } + } + )) + .andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.details.lastChecked").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + final int difference = (int) Duration.between(upIndicator1FirstLastChecked[0], LocalDateTime.parse(actual)).toMillis(); + Assert.assertThat(difference, Matchers.greaterThan(0)); + } + } + )) + .andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastChecked").value( + new AssertionMatcher() { + @Override + public void assertion(String actual) throws AssertionError { + final int difference = (int) Duration.between(timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked[0], LocalDateTime.parse(actual)).toMillis(); + Assert.assertThat(difference, Matchers.greaterThan(0)); + } + } + )); + + } + +} diff --git a/src/test/java/com/teketik/spring/health/indicators/TimingOutSleepingIndicatorInterruptionDisabled.java b/src/test/java/com/teketik/spring/health/indicators/TimingOutSleepingIndicatorInterruptionDisabled.java new file mode 100644 index 0000000..35f0800 --- /dev/null +++ b/src/test/java/com/teketik/spring/health/indicators/TimingOutSleepingIndicatorInterruptionDisabled.java @@ -0,0 +1,30 @@ +package com.teketik.spring.health.indicators; + +import com.teketik.spring.health.AsyncHealth; + +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + +@Profile("with-timing-out-indicator-interruption-disabled") +@Component +@AsyncHealth(refreshRate = 1, timeout = 1, interruptOnTimeout = false) +public class TimingOutSleepingIndicatorInterruptionDisabled implements HealthIndicator { + + public static final int SLEEP_DURATION = 2000; + + @Override + public Health health() { + try { + Thread.sleep(SLEEP_DURATION); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return Health + .up() + .withDetail("detailKey", "detailValue") + .build(); + } + +}