Skip to content

Commit

Permalink
fix(hc-improvement): await for HC call to finish before call another one
Browse files Browse the repository at this point in the history
  • Loading branch information
wbabyte committed Mar 7, 2024
1 parent 7600066 commit 067598b
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 31 deletions.
Expand Up @@ -59,8 +59,7 @@ protected RequestOptions prepareHttpClientRequest(URL request, HealthCheckStep s

// Set timeout on request
if (rule.endpoint().getHttpClientOptions() != null) {
// We want to ensure that the read/write timeout will expire BEFORE an other HC will be executed
options.setTimeout(Math.min(getDelayMillis(), rule.endpoint().getHttpClientOptions().getReadTimeout()));
options.setTimeout(rule.endpoint().getHttpClientOptions().getReadTimeout());
}

return options;
Expand All @@ -82,8 +81,7 @@ protected HttpClientOptions createHttpClientOptions(final HttpEndpoint endpoint,
.setKeepAlive(endpoint.getHttpClientOptions().isKeepAlive())
.setTcpKeepAlive(endpoint.getHttpClientOptions().isKeepAlive())
.setIdleTimeout((int) (endpoint.getHttpClientOptions().getIdleTimeout() / 1000))
// We want to ensure that the connect timeout will expire BEFORE an other HC will be executed
.setConnectTimeout((int) Math.min(getDelayMillis(), endpoint.getHttpClientOptions().getConnectTimeout()))
.setConnectTimeout((int) endpoint.getHttpClientOptions().getConnectTimeout())
.setTryUseCompression(endpoint.getHttpClientOptions().isUseCompression());

if (endpoint.getHttpClientOptions().getVersion() == ProtocolVersion.HTTP_2) {
Expand Down
Expand Up @@ -48,8 +48,7 @@ public EndpointRuleCronHandler<T> schedule(EndpointRuleHandler<T> handler) {

@Override
public void handle(final Long timerId) {
this.timerId = vertx.setTimer(handler.getDelayMillis(), this);
handler.handle(timerId);
handler.handle(hcResponseHandler -> this.timerId = vertx.setTimer(handler.getDelayMillis(), this));
}

public void cancel() {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import io.gravitee.reporter.api.health.EndpointStatus;
import io.gravitee.reporter.api.health.Step;
import io.netty.channel.ConnectTimeoutException;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand All @@ -62,7 +63,7 @@
* @author Azize ELAMRANI (azize.elamrani at graviteesource.com)
* @author GraviteeSource Team
*/
public abstract class EndpointRuleHandler<T extends Endpoint> implements Handler<Long> {
public abstract class EndpointRuleHandler<T extends Endpoint> implements Handler<Handler<AsyncResult<HttpClientResponse>>> {

private final Logger logger = LoggerFactory.getLogger(EndpointRuleHandler.class);

Expand Down Expand Up @@ -117,16 +118,14 @@ public EndpointRuleHandler(Vertx vertx, EndpointRule<T> rule, TemplateEngine tem
}

@Override
public void handle(Long timer) {
public void handle(Handler<AsyncResult<HttpClientResponse>> healthCheckResponseHandler) {
try {
MDC.put("api", rule.api().getId());
T endpoint = rule.endpoint();
logger.debug("Running health-check for endpoint: {} [{}]", endpoint.getName(), endpoint.getTarget());

// Run request for each step
for (HealthCheckStep step : rule.steps()) {
runStep(endpoint, step);
}
// We only allow one step per rule. To support more than one step implement healthCheckResponseHandler accordingly
runStep(endpoint, rule.steps().get(0), healthCheckResponseHandler);
} finally {
MDC.remove("api");
}
Expand Down Expand Up @@ -199,7 +198,7 @@ protected URL createRequest(T endpoint, HealthCheckStep step) throws MalformedUR
return resultURL;
}

protected void runStep(T endpoint, HealthCheckStep step) {
protected void runStep(T endpoint, HealthCheckStep step, Handler<AsyncResult<HttpClientResponse>> healthCheckResponseHandler) {
try {
URL hcRequestUrl = createRequest(endpoint, step);
Future<HttpClientRequest> healthRequestPromise = createHttpClientRequest(httpClient, hcRequestUrl, step);
Expand Down Expand Up @@ -242,6 +241,7 @@ protected void runStep(T endpoint, HealthCheckStep step) {
logger.error("An error has occurred during Health check request", healthRequestEvent.cause());
reportThrowable(healthRequestEvent.cause(), step, healthBuilder, startTime, request);
}
healthCheckResponseHandler.handle(healthRequestEvent);
});

healthRequest.exceptionHandler(throwable -> {
Expand Down
Expand Up @@ -37,18 +37,23 @@
import io.gravitee.gateway.services.healthcheck.EndpointRule;
import io.gravitee.reporter.api.health.EndpointStatus;
import io.gravitee.reporter.api.health.Step;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.net.ProxyOptions;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxTestContext;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junitpioneer.jupiter.RetryingTest;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.SimpleTriggerContext;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
Expand Down Expand Up @@ -79,7 +84,8 @@ void setup() {
void shouldNotValidate_invalidEndpoint(Vertx vertx, VertxTestContext context) throws Throwable {
// Prepare HTTP endpoint
wm.stubFor(get(urlEqualTo("/")).willReturn(notFound()));
final Checkpoint checkpoint = context.checkpoint();
final Checkpoint statusCheckpoint = context.checkpoint();
final Checkpoint responseCheckpoint = context.checkpoint();

EndpointRule rule = createEndpointRule();

Expand All @@ -100,20 +106,26 @@ void shouldNotValidate_invalidEndpoint(Vertx vertx, VertxTestContext context) th
(Handler<EndpointStatus>) status -> {
assertFalse(status.isSuccess());
wm.verify(getRequestedFor(urlEqualTo("/")));
checkpoint.flag();
statusCheckpoint.flag();
}
);

// Run
runner.handle(null);
runner.handle(
(Handler<AsyncResult<HttpClientResponse>>) hcResponseHandler -> {
assertTrue(hcResponseHandler.succeeded());
responseCheckpoint.flag();
}
);
}

@Test
void shouldValidate(Vertx vertx, VertxTestContext context) throws Throwable {
// Prepare HTTP endpoint
wm.stubFor(get(urlEqualTo("/")).willReturn(ok("{\"status\": \"green\"}")));

final Checkpoint checkpoint = context.checkpoint();
final Checkpoint statusCheckpoint = context.checkpoint();
final Checkpoint responseCheckpoint = context.checkpoint();

// Prepare
EndpointRule rule = createEndpointRule();
Expand All @@ -134,20 +146,26 @@ void shouldValidate(Vertx vertx, VertxTestContext context) throws Throwable {
(Handler<EndpointStatus>) status -> {
assertTrue(status.isSuccess());
wm.verify(getRequestedFor(urlEqualTo("/")));
checkpoint.flag();
statusCheckpoint.flag();
}
);

// Run
runner.handle(null);
runner.handle(
(Handler<AsyncResult<HttpClientResponse>>) hcResponseHandler -> {
assertTrue(hcResponseHandler.succeeded());
responseCheckpoint.flag();
}
);
}

@Test
void shouldValidateWithEL(Vertx vertx, VertxTestContext context) throws Throwable {
// Prepare HTTP endpoint
wm.stubFor(get(urlEqualTo("/withProperties/")).willReturn(ok("{\"status\": \"green\"}")));

final Checkpoint checkpoint = context.checkpoint();
final Checkpoint statusCheckpoint = context.checkpoint();
final Checkpoint responseCheckpoint = context.checkpoint();

// Prepare
EndpointRule rule = createEndpointRule("{#properties['backend’]}");
Expand All @@ -170,20 +188,26 @@ void shouldValidateWithEL(Vertx vertx, VertxTestContext context) throws Throwabl
(Handler<EndpointStatus>) status -> {
assertTrue(status.isSuccess());
wm.verify(getRequestedFor(urlEqualTo("/withProperties/")));
checkpoint.flag();
statusCheckpoint.flag();
}
);

// Run
runner.handle(null);
runner.handle(
(Handler<AsyncResult<HttpClientResponse>>) hcResponseHandler -> {
assertTrue(hcResponseHandler.succeeded());
responseCheckpoint.flag();
}
);
}

@Test
void shouldNotValidate_invalidResponseBody(Vertx vertx, VertxTestContext context) throws Throwable {
// Prepare HTTP endpoint
wm.stubFor(get(urlEqualTo("/")).willReturn(ok("{\"status\": \"yellow\"}")));

final Checkpoint checkpoint = context.checkpoint();
final Checkpoint statusCheckpoint = context.checkpoint();
final Checkpoint responseCheckpoint = context.checkpoint();

// Prepare
EndpointRule rule = createEndpointRule();
Expand All @@ -210,12 +234,17 @@ void shouldNotValidate_invalidResponseBody(Vertx vertx, VertxTestContext context
assertEquals(HttpMethod.GET, result.getRequest().getMethod());
assertNotNull(result.getResponse().getBody());

checkpoint.flag();
statusCheckpoint.flag();
}
);

// Run
runner.handle(null);
runner.handle(
(Handler<AsyncResult<HttpClientResponse>>) hcResponseHandler -> {
assertTrue(hcResponseHandler.succeeded());
responseCheckpoint.flag();
}
);

// Wait until completion
assertTrue(context.awaitCompletion(5, TimeUnit.SECONDS));
Expand All @@ -226,7 +255,8 @@ void shouldNotValidate_invalidResponseBody(Vertx vertx, VertxTestContext context
void shouldValidateFromRoot(Vertx vertx, VertxTestContext context) throws Throwable {
// Prepare HTTP endpoint
wm.stubFor(get(urlEqualTo("/")).willReturn(ok()));
final Checkpoint checkpoint = context.checkpoint();
final Checkpoint statusCheckpoint = context.checkpoint();
final Checkpoint responseCheckpoint = context.checkpoint();

// Prepare
EndpointRule rule = createEndpointRule("/additional-but-unused-path-for-hc");
Expand All @@ -249,20 +279,26 @@ void shouldValidateFromRoot(Vertx vertx, VertxTestContext context) throws Throwa
wm.verify(getRequestedFor(urlEqualTo("/")));
wm.verify(0, getRequestedFor(urlEqualTo("/additional-but-unused-path-for-hc")));
assertTrue(status.isSuccess());
checkpoint.flag();
statusCheckpoint.flag();
}
);

// Run
runner.handle(null);
runner.handle(
(Handler<AsyncResult<HttpClientResponse>>) hcResponseHandler -> {
assertTrue(hcResponseHandler.succeeded());
responseCheckpoint.flag();
}
);
}

@Test
void shouldValidateWithUnderscoreInHostname(Vertx vertx, VertxTestContext context) throws Throwable {
// Prepare HTTP endpoint
wm.stubFor(get(urlEqualTo("/")).withHost(equalTo("my_local_host")).willReturn(ok()));

final Checkpoint checkpoint = context.checkpoint();
final Checkpoint statusCheckpoint = context.checkpoint();
final Checkpoint responseCheckpoint = context.checkpoint();

// Prepare
EndpointRule rule = createEndpointRule("http://my_local_host", null, true);
Expand All @@ -283,12 +319,64 @@ void shouldValidateWithUnderscoreInHostname(Vertx vertx, VertxTestContext contex
(Handler<EndpointStatus>) status -> {
assertTrue(status.isSuccess());
wm.verify(getRequestedFor(urlEqualTo("/")));
checkpoint.flag();
statusCheckpoint.flag();
}
);

// Run
runner.handle(null);
runner.handle(
(Handler<AsyncResult<HttpClientResponse>>) hcResponseHandler -> {
assertTrue(hcResponseHandler.succeeded());
responseCheckpoint.flag();
}
);
}

@Test
void shouldValidateWithFixedDelayed(Vertx vertx, VertxTestContext context) throws Throwable {
// Prepare HTTP endpoint
wm.stubFor(get(urlEqualTo("/")).willReturn(ok("{\"status\": \"green\"}").withFixedDelay(3500)));

final Checkpoint statusCheckpoint = context.checkpoint();
final Checkpoint responseCheckpoint = context.checkpoint();

// Prepare
EndpointRule rule = createEndpointRule();
when(rule.schedule()).thenReturn("*/1 * * * * *");

HealthCheckStep step = new HealthCheckStep();
HealthCheckRequest request = new HealthCheckRequest("/", HttpMethod.GET);

step.setRequest(request);
HealthCheckResponse response = new HealthCheckResponse();
response.setAssertions(Collections.singletonList(HealthCheckResponse.DEFAULT_ASSERTION));
step.setResponse(response);
when(rule.steps()).thenReturn(Collections.singletonList(step));

HttpEndpointRuleHandler runner = new HttpEndpointRuleHandler(vertx, rule, templateEngine, environment);

// Verify
runner.setStatusHandler(
(Handler<EndpointStatus>) status -> {
assertTrue(status.isSuccess());
wm.verify(getRequestedFor(urlEqualTo("/")));
statusCheckpoint.flag();
}
);

Date nextExecutionDate = new CronTrigger(rule.schedule()).nextExecutionTime(new SimpleTriggerContext());
// Run
runner.handle(
(Handler<AsyncResult<HttpClientResponse>>) hcResponseHandler -> {
assertTrue(hcResponseHandler.succeeded());
Date nextExecutionDateAfterDelayedRequest = new CronTrigger(rule.schedule()).nextExecutionTime(new SimpleTriggerContext());
//at least 3 cron schedules should be ignored
assertTrue((nextExecutionDateAfterDelayedRequest.getTime() - nextExecutionDate.getTime()) / 1000 > 2);
responseCheckpoint.flag();
}
);
assertTrue(context.awaitCompletion(5, TimeUnit.SECONDS));
assertTrue(context.completed());
}

private Endpoint createEndpoint(String baseUrl, String targetPath, boolean useSystemProxy) {
Expand Down

0 comments on commit 067598b

Please sign in to comment.