Skip to content

Commit

Permalink
Add retry logic to Titus API calls
Browse files Browse the repository at this point in the history
Will retry on 408 and 503 by default 3 times with exponential backoff. All beans are configurable/overridable
  • Loading branch information
tgianos committed Mar 17, 2021
1 parent 046eb6f commit f6273cc
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,19 @@
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.classify.Classifier;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
import org.springframework.retry.policy.NeverRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -84,6 +93,7 @@ public class TitusAgentLauncherImpl implements AgentLauncher {
.map(s -> placeholders.getOrDefault(s, s))
.collect(Collectors.toList());
private final RestTemplate restTemplate;
private final RetryTemplate retryTemplate;
private final Cache<String, String> healthIndicatorCache;
private final GenieHostInfo genieHostInfo;
private final TitusAgentLauncherProperties titusAgentLauncherProperties;
Expand All @@ -97,6 +107,7 @@ public class TitusAgentLauncherImpl implements AgentLauncher {
* Constructor.
*
* @param restTemplate the rest template
* @param retryTemplate The {@link RetryTemplate} to use when making Titus API calls
* @param jobRequestAdapter The implementation of {@link TitusJobRequestAdapter} to use
* @param healthIndicatorCache a cache to store metadata about recently launched jobs
* @param genieHostInfo the metadata about the local server and host
Expand All @@ -106,6 +117,7 @@ public class TitusAgentLauncherImpl implements AgentLauncher {
*/
public TitusAgentLauncherImpl(
final RestTemplate restTemplate,
final RetryTemplate retryTemplate,
final TitusJobRequestAdapter jobRequestAdapter,
final Cache<String, String> healthIndicatorCache,
final GenieHostInfo genieHostInfo,
Expand All @@ -114,6 +126,7 @@ public TitusAgentLauncherImpl(
final MeterRegistry registry
) {
this.restTemplate = restTemplate;
this.retryTemplate = retryTemplate;
this.healthIndicatorCache = healthIndicatorCache;
this.genieHostInfo = genieHostInfo;
this.titusAgentLauncherProperties = titusAgentLauncherProperties;
Expand Down Expand Up @@ -150,10 +163,12 @@ public Optional<JsonNode> launchAgent(

try {
final TitusBatchJobRequest titusJobRequest = this.createJobRequest(resolvedJob);
final TitusBatchJobResponse titusResponse = this.restTemplate.postForObject(
this.titusAgentLauncherProperties.getEndpoint().toString() + TITUS_API_JOB_PATH,
titusJobRequest,
TitusBatchJobResponse.class
final TitusBatchJobResponse titusResponse = this.retryTemplate.execute(
(RetryCallback<TitusBatchJobResponse, Throwable>) context -> restTemplate.postForObject(
titusAgentLauncherProperties.getEndpoint().toString() + TITUS_API_JOB_PATH,
titusJobRequest,
TitusBatchJobResponse.class
)
);

if (titusResponse == null) {
Expand Down Expand Up @@ -183,10 +198,10 @@ public Optional<JsonNode> launchAgent(
.putPOJO(TITUS_JOB_REQUEST_EXT_FIELD, titusJobRequest)
.putPOJO(TITUS_JOB_RESPONSE_EXT_FIELD, titusResponse)
);
} catch (Exception e) {
log.error("Failed to launch job on Titus", e);
MetricsUtils.addFailureTagsWithException(tags, e);
throw new AgentLaunchException("Failed to create titus job for job " + jobId, e);
} catch (Throwable t) {
log.error("Failed to launch job on Titus", t);
MetricsUtils.addFailureTagsWithException(tags, t);
throw new AgentLaunchException("Failed to create titus job for job " + jobId, t);
} finally {
this.registry.timer(LAUNCH_TIMER, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
this.healthIndicatorCache.put(jobId, StringUtils.isBlank(titusJobId) ? "-" : titusJobId);
Expand Down Expand Up @@ -475,4 +490,41 @@ default void modifyJobRequest(
) throws AgentLaunchException {
}
}

/**
* A retry policy that has different behavior based on the type of exception thrown by the rest client during
* calls to the Titus API.
*
* @author tgianos
* @since 4.0.0
*/
public static class TitusAPIRetryPolicy extends ExceptionClassifierRetryPolicy {

private static final long serialVersionUID = -7978685711081275362L;

/**
* Constructor.
*
* @param retryCodes The {@link HttpStatus} codes which should be retried if an API call to Titus fails
* @param maxAttempts The maximum number of retry attempts that should be made upon call failure
*/
public TitusAPIRetryPolicy(final Set<HttpStatus> retryCodes, final int maxAttempts) {
final NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(maxAttempts);

this.setExceptionClassifier(
(Classifier<Throwable, RetryPolicy>) classifiable -> {
if (classifiable instanceof HttpStatusCodeException) {
final HttpStatusCodeException httpException = (HttpStatusCodeException) classifiable;
final HttpStatus status = httpException.getStatusCode();
if (retryCodes.contains(status)) {
return simpleRetryPolicy;
}
}

return neverRetryPolicy;
}
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,27 @@
import com.netflix.genie.web.util.ExecutorFactory;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.web.client.RestTemplateAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.RestTemplate;

import java.util.EnumSet;
import java.util.concurrent.TimeUnit;

/**
* Auto configuration for beans responsible ofor launching Genie Agent instances.
* Auto configuration for beans responsible for launching Genie Agent instances.
*
* @author tgianos
* @since 4.0.0
Expand All @@ -54,8 +63,26 @@
TitusAgentLauncherProperties.class
}
)
@AutoConfigureAfter(
{
RestTemplateAutoConfiguration.class
}
)
public class AgentLaunchersAutoConfiguration {

/**
* Provide a {@link RestTemplate} instance used for calling the Titus REST API if no other instance is provided.
*
* @param restTemplateBuilder The Spring {@link RestTemplateBuilder} instance to use
* @return The rest template to use
*/
@Bean
@ConditionalOnProperty(name = TitusAgentLauncherProperties.ENABLE_PROPERTY, havingValue = "true")
@ConditionalOnMissingBean(name = "titusRestTemplate")
public RestTemplate titusRestTemplate(final RestTemplateBuilder restTemplateBuilder) {
return restTemplateBuilder.build();
}

/**
* Provides a default implementation of {@link TitusAgentLauncherImpl.TitusJobRequestAdapter} that is a no-op
* if no other implementation has been provided elsewhere.
Expand All @@ -71,11 +98,63 @@ public TitusAgentLauncherImpl.TitusJobRequestAdapter titusJobRequestAdapter() {
};
}

/**
* Provides a default implementation of {@link org.springframework.retry.RetryPolicy} that retries based on a set
* of HTTP status codes. Currently just {@link org.springframework.http.HttpStatus#SERVICE_UNAVAILABLE} and
* {@link org.springframework.http.HttpStatus#REQUEST_TIMEOUT}. Max retries set to 3.
*
* @return A {@link TitusAgentLauncherImpl.TitusAPIRetryPolicy} instance with the default settings applied
*/
@Bean
@ConditionalOnProperty(name = TitusAgentLauncherProperties.ENABLE_PROPERTY, havingValue = "true")
@ConditionalOnMissingBean(name = "titusAPIRetryPolicy", value = RetryPolicy.class)
public TitusAgentLauncherImpl.TitusAPIRetryPolicy titusAPIRetryPolicy() {
return new TitusAgentLauncherImpl.TitusAPIRetryPolicy(
EnumSet.of(HttpStatus.SERVICE_UNAVAILABLE, HttpStatus.REQUEST_TIMEOUT),
3
);
}

/**
* Provides a default implementation of {@link org.springframework.retry.backoff.BackOffPolicy} if no other has
* been defined in the context.
*
* @return A default {@link ExponentialBackOffPolicy} instance
*/
@Bean
@ConditionalOnProperty(name = TitusAgentLauncherProperties.ENABLE_PROPERTY, havingValue = "true")
@ConditionalOnMissingBean(name = "titusAPIBackoffPolicy", value = BackOffPolicy.class)
public ExponentialBackOffPolicy titusAPIBackoffPolicy() {
return new ExponentialBackOffPolicy();
}

/**
* Provides a default implementation of {@link RetryTemplate} that will be used to retry failed Titus api calls
* based on the retry policy and backoff policies defined in the application context.
*
* @param retryPolicy The {@link RetryPolicy} to use for Titus API call failures
* @param backOffPolicy The {@link BackOffPolicy} to use for Titus API call failures
* @return A {@link RetryTemplate} instance configured with the supplied retry and backoff policies
*/
@Bean
@ConditionalOnProperty(name = TitusAgentLauncherProperties.ENABLE_PROPERTY, havingValue = "true")
@ConditionalOnMissingBean(name = "titusAPIRetryTemplate", value = RetryTemplate.class)
public RetryTemplate titusAPIRetryTemplate(
@Qualifier("titusAPIRetryPolicy") final RetryPolicy retryPolicy,
@Qualifier("titusAPIBackoffPolicy") final BackOffPolicy backOffPolicy
) {
final RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}

/**
* Provide a {@link TitusAgentLauncherImpl} implementation which launches agent processes in a dedicated Titus
* container if enabled via property.
*
* @param restTemplate the rest template
* @param retryTemplate The {@link RetryTemplate} instance to use to retry failed Titus API calls
* @param titusJobRequestAdapter The {@link TitusAgentLauncherImpl.TitusJobRequestAdapter} implementation to
* use
* @param genieHostInfo the metadata about the local server and host
Expand All @@ -89,6 +168,7 @@ public TitusAgentLauncherImpl.TitusJobRequestAdapter titusJobRequestAdapter() {
@ConditionalOnProperty(name = TitusAgentLauncherProperties.ENABLE_PROPERTY, havingValue = "true")
public TitusAgentLauncherImpl titusAgentLauncher(
@Qualifier("titusRestTemplate") final RestTemplate restTemplate,
@Qualifier("titusAPIRetryTemplate") final RetryTemplate retryTemplate,
final TitusAgentLauncherImpl.TitusJobRequestAdapter titusJobRequestAdapter,
final GenieHostInfo genieHostInfo,
final TitusAgentLauncherProperties titusAgentLauncherProperties,
Expand All @@ -105,6 +185,7 @@ public TitusAgentLauncherImpl titusAgentLauncher(

return new TitusAgentLauncherImpl(
restTemplate,
retryTemplate,
titusJobRequestAdapter,
healthIndicatorCache,
genieHostInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ import com.netflix.genie.web.dtos.ResolvedJob
import com.netflix.genie.web.exceptions.checked.AgentLaunchException
import com.netflix.genie.web.properties.TitusAgentLauncherProperties
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.springframework.http.HttpStatus
import org.springframework.mock.env.MockEnvironment
import org.springframework.retry.backoff.FixedBackOffPolicy
import org.springframework.retry.policy.NeverRetryPolicy
import org.springframework.retry.support.RetryTemplate
import org.springframework.util.unit.DataSize
import org.springframework.web.client.HttpClientErrorException
import org.springframework.web.client.HttpServerErrorException
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate
import spock.lang.Specification
Expand Down Expand Up @@ -96,8 +102,18 @@ class TitusAgentLauncherImplSpec extends Specification {
this.registry = new SimpleMeterRegistry()
this.environment = new MockEnvironment()

/*
* Note: The retry template used here is "transparent" in the sense it doesn't do
* Anything. We don't really want to test how retries work as that is up to
* the configured retry policy and backoff not so much the business logic
* of this class. For now this is fine
*/
def retryTemplate = new RetryTemplate()
retryTemplate.setRetryPolicy(new NeverRetryPolicy())

this.launcher = new TitusAgentLauncherImpl(
this.restTemplate,
retryTemplate,
this.adapter,
this.cache,
this.genieHostInfo,
Expand Down Expand Up @@ -607,4 +623,79 @@ class TitusAgentLauncherImplSpec extends Specification {
requestCapture.getContainer().getAttributes().get(prop1Key) == prop1Value
requestCapture.getContainer().getAttributes().get(prop2Key) == prop2Value
}

def "Retry policy works as expected"() {
def retryCodes = EnumSet.of(HttpStatus.REQUEST_TIMEOUT, HttpStatus.SERVICE_UNAVAILABLE)
def maxRetries = 2
def policy = new TitusAgentLauncherImpl.TitusAPIRetryPolicy(retryCodes, maxRetries)
def retryTemplate = new RetryTemplate()
def backoffPolicy = new FixedBackOffPolicy()
backoffPolicy.setBackOffPeriod(1L)
retryTemplate.setRetryPolicy(policy)
retryTemplate.setBackOffPolicy(backoffPolicy)
def mockResponse = Mock(TitusBatchJobResponse)

when:
retryTemplate.execute(
{ arg ->
this.restTemplate.postForObject(TITUS_ENDPOINT, Mock(TitusBatchJobRequest), TitusBatchJobResponse.class)
}
)

then:
1 * this.restTemplate.postForObject(TITUS_ENDPOINT, _ as TitusBatchJobRequest, TitusBatchJobResponse.class) >> {
throw new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR)
}
thrown(HttpServerErrorException)

when:
retryTemplate.execute(
{ arg ->
this.restTemplate.postForObject(TITUS_ENDPOINT, Mock(TitusBatchJobRequest), TitusBatchJobResponse.class)
}
)

then:
2 * this.restTemplate.postForObject(TITUS_ENDPOINT, _ as TitusBatchJobRequest, TitusBatchJobResponse.class) >> {
throw new HttpClientErrorException(HttpStatus.REQUEST_TIMEOUT)
}
thrown(HttpClientErrorException)

when:
def response = retryTemplate.execute(
{ arg ->
this.restTemplate.postForObject(TITUS_ENDPOINT, Mock(TitusBatchJobRequest), TitusBatchJobResponse.class)
}
)

then:
2 * this.restTemplate.postForObject(
TITUS_ENDPOINT,
_ as TitusBatchJobRequest,
TitusBatchJobResponse.class
) >>> [] >> { throw new HttpClientErrorException(HttpStatus.REQUEST_TIMEOUT) } >> mockResponse
response == mockResponse
noExceptionThrown()

when:
retryTemplate.execute(
{ arg ->
this.restTemplate.postForObject(TITUS_ENDPOINT, Mock(TitusBatchJobRequest), TitusBatchJobResponse.class)
}
)

then:
2 * this.restTemplate.postForObject(
TITUS_ENDPOINT,
_ as TitusBatchJobRequest,
TitusBatchJobResponse.class
) >>> [

] >> {
throw new HttpServerErrorException(HttpStatus.SERVICE_UNAVAILABLE)
} >> {
throw new HttpClientErrorException(HttpStatus.REQUEST_TIMEOUT)
}
thrown(HttpClientErrorException)
}
}
Loading

0 comments on commit f6273cc

Please sign in to comment.