1.2: Add a way to disable thread interruption on timeout (#3)
antoinemeyer committed Sep 4, 2022
1 parent 04f747d commit 45f944d
Showing 9 changed files with 214 additions and 24 deletions.
15 changes: 7 additions & 8 deletions README.md
@@ -1,7 +1,4 @@




# Spring Boot Async Health Indicator

Async Health Indicator for [spring-boot-actuator](https://docs.spring.io/spring-boot/docs/current/reference/html/actuator.html) >=2.2.0 gives [Health Indicator](https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/actuate/health/HealthIndicator.html) the ability to get refreshed asynchronously on a background ThreadPoolExecutor using the annotation `@AsyncHealth`.
@@ -35,18 +32,20 @@ This module is auto-configured.
<dependency>
<groupId>com.teketik</groupId>
<artifactId>async-health-indicator</artifactId>
<version>boot2-v1.1</version>
<version>boot2-v1.2</version>
</dependency>
```
- Annotate any `HealthIndicator` with `@AsyncHealth(refreshRate = $REFRESH_RATE, timeout = $TIMEOUT)`
- Annotate any `HealthIndicator` with `@AsyncHealth(refreshRate = $REFRESH_RATE, timeout = $TIMEOUT, interruptOnTimeout = $INTERRUPT_ON_TIMEOUT)`

`$REFRESH_RATE` = Fixed delay in seconds between the termination of the `health()` execution and the commencement of the next (default `1`).

`$REFRESH_RATE` = Fixed delay in seconds between the termination of the `health()` execution and the commencement of the next (default 1).
`$TIMEOUT`= The maximum time in seconds that the `health()` execution can take before being considered `DOWN` (default `10`).

`$TIMEOUT`= The maximum time in seconds that the `health()` execution can take before being considered `DOWN` (default 10).
`$INTERRUPT_ON_TIMEOUT` = Whether the thread should be interrupted when a timeout occurs. (if `false`, the next check will only be scheduled after the current execution terminates naturally). (default `true`)

## Regarding Timeout

When a `health()` method duration exceeds the configured `timeout`, the thread running it is `interrupted`with the hope that the method will fail with an exception (causing it to be `DOWN`) and free up the thread.
When a `health()` method duration exceeds the configured `timeout`, the thread running it is `interrupted` if `interruptOnTimeout` is `true` with the hope that the method will fail with an exception (causing it to be `DOWN`) and free up the thread.
Unfortunately, most I/O calls are not interruptible and the thread may continue to execute the method until it times out (according to the libraries and configuration used).
If that happens, you will observe the `timeout` error message printed for each `/health` hit until that method times out like:
```
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.teketik</groupId>
<artifactId>async-health-indicator</artifactId>
<version>boot2-v1.1</version>
<version>boot2-v1.2</version>
<name>Spring Boot Async Health Indicator</name>
<description>Asynchronous Health Indicator for Spring Boot</description>
<url>https://github.com/antoinemeyer/spring-boot-async-health-indicator</url>
13 changes: 9 additions & 4 deletions src/main/java/com/teketik/spring/health/AsyncHealth.java
@@ -15,8 +15,8 @@
* <p>The {@link HealthIndicator#health()} method:
* <ul>
* <li>is executed with the given delay configured by {@link AsyncHealth#refreshRate() refreshRate()} between the termination of one execution and the commencement of the next.</li>
* <li>may not execute in more than the time configured by {@link AsyncHealth#timeout() timeout()}. Passed this delay, the thread running the {@link HealthIndicator} will be {@link Thread#interrupt() interrupted}
* and the {@link Health} will be set as {@link Status#DOWN} until the next execution.</li>
* <li>may not execute in more than the time configured by {@link AsyncHealth#timeout() timeout()}. Passed this delay, the {@link Health} will be set as {@link Status#DOWN} until the next execution
* and the thread running the {@link HealthIndicator} will be {@link Thread#interrupt() interrupted} if {@link #interruptOnTimeout()} is {@code true}.</li>
* </ul>
* <p>The `/health` endpoint will not invoke the {@link HealthIndicator#health() health method} but return the last {@link Health} calculated asynchronously.
* <p><i>Note that the {@link HealthIndicator} may return {@link Status#UNKNOWN} on application startup if the `/health` endpoint
@@ -37,8 +37,8 @@
* <hr>
* <h3>Regarding Timeout</h3>
* <p>
* When a {@link HealthIndicator#health() health method} duration exceeds the configured {@link #timeout()}, the thread running it is <strong>interrupted</strong> with the hope that the method will fail with an exception (causing it to be {@link Status#DOWN})
* and free up the thread.<br>
* When a {@link HealthIndicator#health() health method} duration exceeds the configured {@link #timeout()}, the thread running it is <strong>interrupted</strong> if {@link #interruptOnTimeout()} is {@code true}
* with the hope that the method will fail with an exception (causing it to be {@link Status#DOWN}) and free up the thread.<br>
* Unfortunately, most I/O calls are not interruptible and the thread may continue to execute the method until it times out (according to the libraries and configuration used).
* <p>
* If that happens, you will observe the 'timeout' error message printed for each `/health` hit until that method times out like:
@@ -67,5 +67,10 @@
*/
int timeout() default 10;

/**
* @return whether the thread should be interrupted when a timeout occurs. (if {@code false}, the next check will only be scheduled after the current execution terminates naturally).
*/
boolean interruptOnTimeout() default true;

}
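
To make the defaults of the new attribute concrete, here is a minimal sketch that reads the three attributes reflectively. `AsyncHealthSketch` is a stand-in declared only for this example: it copies the attribute names and defaults shown in this commit (`refreshRate` 1, `timeout` 10, `interruptOnTimeout` `true`), while its retention, target, and the two annotated classes are assumptions, not the library's actual declarations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Stand-in annotation (assumption): same attribute names and defaults as in the diff. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface AsyncHealthSketch {
    int refreshRate() default 1;
    int timeout() default 10;
    boolean interruptOnTimeout() default true;
}

/** Hypothetical indicator that opts out of interruption. */
@AsyncHealthSketch(timeout = 5, interruptOnTimeout = false)
class SlowIndicatorSketch { }

/** Hypothetical indicator that relies on every default. */
@AsyncHealthSketch
class DefaultIndicatorSketch { }

class AnnotationDefaultsSketch {

    /** Formats the attribute values found on the given annotated type. */
    static String describe(Class<?> type) {
        AsyncHealthSketch a = type.getAnnotation(AsyncHealthSketch.class);
        return "refreshRate=" + a.refreshRate()
            + ", timeout=" + a.timeout()
            + ", interruptOnTimeout=" + a.interruptOnTimeout();
    }

    public static void main(String[] args) {
        // prints "refreshRate=1, timeout=5, interruptOnTimeout=false"
        System.out.println(describe(SlowIndicatorSketch.class));
        // prints "refreshRate=1, timeout=10, interruptOnTimeout=true"
        System.out.println(describe(DefaultIndicatorSketch.class));
    }
}
```

Because `interruptOnTimeout` defaults to `true`, indicators that do not set it behave as they did before this change.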

@@ -28,15 +28,17 @@ class AsyncHealthIndicator implements HealthIndicator, Schedulable {
private final String name;
private final int refreshRateInSeconds;
private final int timeoutInSeconds;
private final boolean interruptOnTimeout;

private volatile Health lastHealth;
private volatile long healthStartTimeMillis = -1;

public AsyncHealthIndicator(HealthIndicator originalHealthIndicator, String name, int refreshRateInSeconds, int timeoutInSeconds) {
public AsyncHealthIndicator(HealthIndicator originalHealthIndicator, String name, int refreshRateInSeconds, int timeoutInSeconds, boolean interruptOnTimeout) {
this.originalHealthIndicator = originalHealthIndicator;
this.name = name;
this.refreshRateInSeconds = refreshRateInSeconds;
this.timeoutInSeconds = timeoutInSeconds;
this.interruptOnTimeout = interruptOnTimeout;
}

@Override
@@ -115,8 +117,10 @@ public int getRefreshRateInSeconds() {
}

@Override
public int getTimeoutInSeconds() {
return timeoutInSeconds;
public Optional<Integer> getTimeoutInSeconds() {
return interruptOnTimeout
? Optional.of(timeoutInSeconds)
: Optional.empty();
}

@Override
@@ -53,7 +53,8 @@ public void afterPropertiesSet() throws Exception {
indicator,
contributorName,
annotation.refreshRate(),
annotation.timeout()
annotation.timeout(),
annotation.interruptOnTimeout()
);
healthContributorRegistry.registerContributor(contributorName, asyncHealthIndicator);
asyncHealthIndicators.add(asyncHealthIndicator);
13 changes: 12 additions & 1 deletion src/main/java/com/teketik/utils/Schedulable.java
@@ -1,6 +1,17 @@
package com.teketik.utils;

import java.util.Optional;

public interface Schedulable extends Runnable {

/**
* @return time in seconds between the termination of one execution and the commencement of the next.
*/
int getRefreshRateInSeconds();
int getTimeoutInSeconds();

/**
* @return maximum time in seconds this can run before being interrupted ({@link Optional#empty() empty} if never to be interrupted).
*/
Optional<Integer> getTimeoutInSeconds();

}
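
The new contract can be illustrated with a small sketch: an implementation returns `Optional.empty()` when it must never be interrupted, and callers branch on presence instead of checking a sentinel value. The interface below repeats the shape shown in the diff; `SketchSchedulable` and `SchedulableSketch` are hypothetical names introduced only for this example.

```java
import java.util.Optional;

/** Same shape as the interface in the diff (Javadoc omitted). */
interface Schedulable extends Runnable {
    int getRefreshRateInSeconds();
    Optional<Integer> getTimeoutInSeconds();
}

/** Hypothetical implementation, for illustration only. */
class SketchSchedulable implements Schedulable {
    private final int refreshRateInSeconds;
    private final int timeoutInSeconds;
    private final boolean interruptOnTimeout;

    SketchSchedulable(int refreshRateInSeconds, int timeoutInSeconds, boolean interruptOnTimeout) {
        this.refreshRateInSeconds = refreshRateInSeconds;
        this.timeoutInSeconds = timeoutInSeconds;
        this.interruptOnTimeout = interruptOnTimeout;
    }

    @Override
    public void run() {
        // the actual work (for example a health check) would run here
    }

    @Override
    public int getRefreshRateInSeconds() {
        return refreshRateInSeconds;
    }

    @Override
    public Optional<Integer> getTimeoutInSeconds() {
        // an empty Optional means "never interrupt"
        return interruptOnTimeout ? Optional.of(timeoutInSeconds) : Optional.empty();
    }
}

class SchedulableSketch {

    /** Shows how a caller can branch on the presence of a timeout. */
    static String describe(Schedulable schedulable) {
        return schedulable.getTimeoutInSeconds()
            .map(timeout -> "interrupt after " + timeout + "s")
            .orElse("never interrupt");
    }

    public static void main(String[] args) {
        System.out.println(describe(new SketchSchedulable(1, 10, true)));  // prints "interrupt after 10s"
        System.out.println(describe(new SketchSchedulable(1, 10, false))); // prints "never interrupt"
    }
}
```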
17 changes: 11 additions & 6 deletions src/main/java/com/teketik/utils/SchedulingThreadPoolExecutor.java
@@ -62,12 +62,17 @@ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
TaskDecorator decorator = (TaskDecorator) runnable;
final ScheduledFuture<?> interrupterScheduledFuture = coordinatorExecutorService.schedule(
() -> thread.interrupt(),
decorator.schedulable.getTimeoutInSeconds(),
TimeUnit.SECONDS
);
runningTasks.put(runnable, interrupterScheduledFuture);
decorator
.schedulable
.getTimeoutInSeconds()
.ifPresent(timeoutInSeconds -> {
final ScheduledFuture<?> interrupterScheduledFuture = coordinatorExecutorService.schedule(
() -> thread.interrupt(),
timeoutInSeconds,
TimeUnit.SECONDS
);
runningTasks.put(runnable, interrupterScheduledFuture);
});
}

@Override
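
The `beforeExecute` change above schedules the interrupter only when a timeout is present. The following is a simplified, self-contained sketch of that pattern and not the project's `SchedulingThreadPoolExecutor`: `TimedTask`, `InterruptingExecutor`, and `InterruptingExecutorDemo` are hypothetical names, timeouts are in milliseconds to keep the demo short, and tasks are passed to `execute` directly instead of being wrapped in a task decorator.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical task type: a Runnable that may declare a timeout. */
interface TimedTask extends Runnable {
    Optional<Long> timeoutMillis();
}

/** Interrupts the worker thread only for tasks that declare a timeout. */
class InterruptingExecutor extends ThreadPoolExecutor {

    private final ScheduledExecutorService coordinator = Executors.newSingleThreadScheduledExecutor();
    private final Map<Runnable, ScheduledFuture<?>> interrupters = new ConcurrentHashMap<>();

    InterruptingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        super.beforeExecute(thread, runnable);
        if (runnable instanceof TimedTask) {
            // no timeout present: nothing is scheduled, so the task is never interrupted
            ((TimedTask) runnable).timeoutMillis().ifPresent(timeout ->
                interrupters.put(runnable,
                    coordinator.schedule(() -> thread.interrupt(), timeout, TimeUnit.MILLISECONDS)));
        }
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        // cancel the pending interrupter once the task has finished on its own
        ScheduledFuture<?> interrupter = interrupters.remove(runnable);
        if (interrupter != null) {
            interrupter.cancel(false);
        }
        super.afterExecute(runnable, throwable);
    }

    @Override
    protected void terminated() {
        coordinator.shutdownNow();
        super.terminated();
    }
}

class InterruptingExecutorDemo {

    /** Runs a task sleeping 400 ms and reports whether it completed or was interrupted. */
    static String run(Optional<Long> timeout) {
        InterruptingExecutor executor = new InterruptingExecutor(1);
        AtomicReference<String> outcome = new AtomicReference<>("not finished");
        executor.execute(new TimedTask() {
            @Override
            public Optional<Long> timeoutMillis() {
                return timeout;
            }

            @Override
            public void run() {
                try {
                    Thread.sleep(400);
                    outcome.set("completed");
                } catch (InterruptedException e) {
                    outcome.set("interrupted");
                }
            }
        });
        executor.shutdown(); // lets the running task finish, then terminates
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(run(Optional.of(100L))); // prints "interrupted"
        System.out.println(run(Optional.empty()));  // prints "completed"
    }
}
```

Keying the map on the `Runnable` works here because `execute` hands the same object to both hooks; with `submit` the task would be wrapped in a `FutureTask`, which is why this sketch avoids it.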
@@ -0,0 +1,135 @@
package com.teketik.spring.health;

import org.assertj.core.matcher.AssertionMatcher;
import org.awaitility.Awaitility;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;

import java.time.Duration;
import java.time.LocalDateTime;

@ActiveProfiles("with-timing-out-indicator-interruption-disabled")
public class TimeoutInterruptibleDisabledITest extends BaseITest {

@Test
public void test() throws Exception {
final LocalDateTime[] upIndicator1FirstLastChecked = new LocalDateTime[1];
final LocalDateTime[] timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked = new LocalDateTime[1];
Awaitility.await().untilAsserted(() -> {
mockMvc
.perform(MockMvcRequestBuilders.get("/actuator/health"))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value(CoreMatchers.not("UNKNOWN")));
});
mockMvc
.perform(MockMvcRequestBuilders.get("/actuator/health"))
.andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.status").value("UP"))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value("DOWN"))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.error").doesNotExist())
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.reason").value(Matchers.equalTo("Timeout")))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastDuration").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
Assert.assertEquals(6, actual.length());
Assert.assertTrue(actual.endsWith("ms"));
Assert.assertThat(actual, CoreMatchers.startsWith("1"));
}
}
))
.andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.details.lastChecked").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
upIndicator1FirstLastChecked[0] = LocalDateTime.parse(actual);
}
}
))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastChecked").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked[0] = LocalDateTime.parse(actual);
}
}
));
Thread.sleep(1000);
//still the same
mockMvc
.perform(MockMvcRequestBuilders.get("/actuator/health"))
.andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.status").value("UP"))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value("DOWN"))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.error").doesNotExist())
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.reason").value(Matchers.equalTo("Timeout")))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastDuration").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
Assert.assertEquals(6, actual.length());
Assert.assertTrue(actual.endsWith("ms"));
Assert.assertThat(actual, CoreMatchers.startsWith("2"));
}
}
))
.andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.details.lastChecked").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
final int difference = (int) Duration.between(upIndicator1FirstLastChecked[0], LocalDateTime.parse(actual)).toMillis();
Assert.assertThat(difference, Matchers.greaterThan(0));
}
}
))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastChecked").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
final int difference = (int) Duration.between(timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked[0], LocalDateTime.parse(actual)).toMillis();
Assert.assertThat(difference, Matchers.equalTo(0));
}
}
));
Thread.sleep(2000);
//then update last check
mockMvc
.perform(MockMvcRequestBuilders.get("/actuator/health"))
.andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.status").value("UP"))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.status").value("DOWN"))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.error").doesNotExist())
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.reason").value(Matchers.equalTo("Timeout")))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastDuration").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
Assert.assertEquals(6, actual.length());
Assert.assertTrue(actual.endsWith("ms"));
Assert.assertThat(actual, CoreMatchers.startsWith("1"));
}
}
))
.andExpect(MockMvcResultMatchers.jsonPath("components.upIndicator1.details.lastChecked").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
final int difference = (int) Duration.between(upIndicator1FirstLastChecked[0], LocalDateTime.parse(actual)).toMillis();
Assert.assertThat(difference, Matchers.greaterThan(0));
}
}
))
.andExpect(MockMvcResultMatchers.jsonPath("components.timingOutSleepingIndicatorInterruptionDisabled.details.lastChecked").value(
new AssertionMatcher<String>() {
@Override
public void assertion(String actual) throws AssertionError {
final int difference = (int) Duration.between(timingOutSleepingIndicatorInterruptionDisabledFirstLastChecked[0], LocalDateTime.parse(actual)).toMillis();
Assert.assertThat(difference, Matchers.greaterThan(0));
}
}
));

}

}
@@ -0,0 +1,30 @@
package com.teketik.spring.health.indicators;

import com.teketik.spring.health.AsyncHealth;

import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Profile("with-timing-out-indicator-interruption-disabled")
@Component
@AsyncHealth(refreshRate = 1, timeout = 1, interruptOnTimeout = false)
public class TimingOutSleepingIndicatorInterruptionDisabled implements HealthIndicator {

public static final int SLEEP_DURATION = 2000;

@Override
public Health health() {
try {
Thread.sleep(SLEEP_DURATION);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return Health
.up()
.withDetail("detailKey", "detailValue")
.build();
}

}
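
The indicator above sleeps for longer than its timeout, so what happens next depends on `interruptOnTimeout`. The sketch below reproduces that difference with plain threads and shorter durations. `InterruptSketch` is a hypothetical helper, not part of the project: a sleeping thread that is interrupted at the timeout fails with `InterruptedException`, while one left alone runs to completion.

```java
import java.util.concurrent.atomic.AtomicReference;

class InterruptSketch {

    /**
     * Starts a thread that sleeps for sleepMillis, waits up to timeoutMillis for it,
     * optionally interrupts it, then reports how the sleep ended.
     */
    static String runSleepingTask(long sleepMillis, long timeoutMillis, boolean interruptOnTimeout) {
        AtomicReference<String> outcome = new AtomicReference<>("not finished");
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(sleepMillis);
                outcome.set("completed");
            } catch (InterruptedException e) {
                outcome.set("interrupted");
            }
        });
        worker.start();
        try {
            worker.join(timeoutMillis); // wait at most the timeout
            if (worker.isAlive() && interruptOnTimeout) {
                worker.interrupt(); // Thread.sleep is interruptible, so this ends it early
            }
            worker.join(); // wait for the worker to finish either way
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("caller was interrupted", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runSleepingTask(400, 100, true));  // prints "interrupted"
        System.out.println(runSleepingTask(400, 100, false)); // prints "completed"
    }
}
```

This matches what the integration test in this commit asserts for the disabled case: the status is `DOWN` with reason `Timeout`, but no `details.error` entry appears, since the sleep is never interrupted.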
