Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
move retry template creation to configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Apr 18, 2022
1 parent 9e63f4e commit 61425b6
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 68 deletions.
3 changes: 2 additions & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {
implementation project(':conductor-common')
compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.springframework.boot:spring-boot-starter-validation'
compileOnly "org.springframework.retry:spring-retry"
compileOnly 'org.springframework.retry:spring-retry'
compileOnly 'org.springframework.boot:spring-boot-starter-aop'

implementation "com.fasterxml.jackson.core:jackson-annotations"
Expand Down Expand Up @@ -51,6 +51,7 @@ dependencies {
testImplementation "org.glassfish.jaxb:jaxb-runtime:${revJAXB}"

testImplementation 'org.springframework.boot:spring-boot-starter-validation'
testImplementation 'org.springframework.retry:spring-retry'
testImplementation project(':conductor-common').sourceSets.test.output

testImplementation "org.codehaus.groovy:groovy-all:${revGroovy}"
Expand Down
8 changes: 8 additions & 0 deletions core/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,9 @@
"org.springframework.boot:spring-boot-starter-test"
]
},
"org.springframework.retry:spring-retry": {
"locked": "1.2.5.RELEASE"
},
"org.springframework:spring-aop": {
"locked": "5.2.15.RELEASE",
"transitive": [
Expand All @@ -1121,6 +1124,7 @@
"org.springframework.boot:spring-boot",
"org.springframework.boot:spring-boot-starter",
"org.springframework.boot:spring-boot-starter-test",
"org.springframework.retry:spring-retry",
"org.springframework:spring-aop",
"org.springframework:spring-beans",
"org.springframework:spring-context",
Expand Down Expand Up @@ -1870,6 +1874,9 @@
"org.springframework.boot:spring-boot-starter-test"
]
},
"org.springframework.retry:spring-retry": {
"locked": "1.2.5.RELEASE"
},
"org.springframework:spring-aop": {
"locked": "5.2.15.RELEASE",
"transitive": [
Expand All @@ -1895,6 +1902,7 @@
"org.springframework.boot:spring-boot",
"org.springframework.boot:spring-boot-starter",
"org.springframework.boot:spring-boot-starter-test",
"org.springframework.retry:spring-retry",
"org.springframework:spring-aop",
"org.springframework:spring-beans",
"org.springframework:spring-context",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
Expand All @@ -48,7 +47,6 @@
import static java.util.function.Function.identity;

@Configuration(proxyBeanMethods = false)
@EnableRetry
@EnableConfigurationProperties(ConductorProperties.class)
public class ConductorCoreConfiguration {

Expand Down
3 changes: 2 additions & 1 deletion es6-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {
implementation project(':conductor-core')

compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly "org.springframework.retry:spring-retry"
compileOnly 'org.springframework.retry:spring-retry'
compileOnly 'org.springframework.boot:spring-boot-starter-aop'

implementation "commons-io:commons-io:${revCommonsIo}"
Expand All @@ -28,6 +28,7 @@ dependencies {
implementation "org.elasticsearch.client:elasticsearch-rest-client"
implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client"

testImplementation 'org.springframework.retry:spring-retry'
testImplementation "org.awaitility:awaitility:${revAwaitility}"
testImplementation "org.testcontainers:elasticsearch:${revTestContainer}"
testImplementation project(':conductor-common').sourceSets.test.output
Expand Down
8 changes: 8 additions & 0 deletions es6-persistence/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,9 @@
"org.springframework.boot:spring-boot-starter-test"
]
},
"org.springframework.retry:spring-retry": {
"locked": "1.2.5.RELEASE"
},
"org.springframework:spring-aop": {
"locked": "5.2.15.RELEASE",
"transitive": [
Expand All @@ -1827,6 +1830,7 @@
"org.springframework.boot:spring-boot",
"org.springframework.boot:spring-boot-starter",
"org.springframework.boot:spring-boot-starter-test",
"org.springframework.retry:spring-retry",
"org.springframework:spring-aop",
"org.springframework:spring-beans",
"org.springframework:spring-context",
Expand Down Expand Up @@ -2688,6 +2692,9 @@
"org.springframework.boot:spring-boot-starter-test"
]
},
"org.springframework.retry:spring-retry": {
"locked": "1.2.5.RELEASE"
},
"org.springframework:spring-aop": {
"locked": "5.2.15.RELEASE",
"transitive": [
Expand All @@ -2713,6 +2720,7 @@
"org.springframework.boot:spring-boot",
"org.springframework.boot:spring-boot-starter",
"org.springframework.boot:spring-boot-starter-test",
"org.springframework.retry:spring-retry",
"org.springframework:spring-aop",
"org.springframework:spring-beans",
"org.springframework:spring-context",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.es6.dao.index.ElasticSearchDAOV6;
Expand All @@ -47,7 +49,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration(proxyBeanMethods = false)
@EnableRetry
@EnableConfigurationProperties(ElasticSearchProperties.class)
@Conditional(ElasticSearchConditions.ElasticSearchV6Enabled.class)
public class ElasticSearchV6Configuration {
Expand Down Expand Up @@ -124,15 +125,30 @@ public RestClientBuilder restClientBuilder(ElasticSearchProperties properties) {
public IndexDAO es6IndexRestDAO(
RestClientBuilder restClientBuilder,
ElasticSearchProperties properties,
@Qualifier("es6RetryTemplate") RetryTemplate retryTemplate,
ObjectMapper objectMapper) {
return new ElasticSearchRestDAOV6(restClientBuilder, properties, objectMapper);
return new ElasticSearchRestDAOV6(
restClientBuilder, retryTemplate, properties, objectMapper);
}

@Bean
@Conditional(IsTcpProtocol.class)
public IndexDAO es6IndexDAO(
Client client, ElasticSearchProperties properties, ObjectMapper objectMapper) {
return new ElasticSearchDAOV6(client, properties, objectMapper);
Client client,
@Qualifier("es6RetryTemplate") RetryTemplate retryTemplate,
ElasticSearchProperties properties,
ObjectMapper objectMapper) {
return new ElasticSearchDAOV6(client, retryTemplate, properties, objectMapper);
}

@Bean
@Qualifier("es6RetryTemplate")
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
return retryTemplate;
}

private HttpHost[] convertToHttpHosts(List<URL> hosts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.annotations.Trace;
Expand Down Expand Up @@ -117,14 +115,15 @@ public class ElasticSearchDAOV6 extends ElasticSearchBaseDAO implements IndexDAO
private final long asyncBufferFlushTimeout;
private final ElasticSearchProperties properties;

private final RetryTemplate retryTemplate = createRetryTemplate();
private final RetryTemplate retryTemplate;

static {
SIMPLE_DATE_FORMAT.setTimeZone(GMT);
}

public ElasticSearchDAOV6(
Client elasticSearchClient,
RetryTemplate retryTemplate,
ElasticSearchProperties properties,
ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
Expand Down Expand Up @@ -184,6 +183,7 @@ public ElasticSearchDAOV6(

Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this::flushBulkRequests, 60, 30, TimeUnit.SECONDS);
this.retryTemplate = retryTemplate;
}

@PreDestroy
Expand Down Expand Up @@ -907,20 +907,6 @@ private void flushBulkRequests() {
});
}

private RetryTemplate createRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();

FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);

return retryTemplate;
}

private static class BulkRequests {

private long lastFlushTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.annotations.Trace;
Expand Down Expand Up @@ -85,7 +83,6 @@ public class ElasticSearchRestDAOV6 extends ElasticSearchBaseDAO implements Inde

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchRestDAOV6.class);

private static final int RETRY_COUNT = 3;
private static final int CORE_POOL_SIZE = 6;
private static final long KEEP_ALIVE_TIME = 1L;

Expand Down Expand Up @@ -129,14 +126,15 @@ public class ElasticSearchRestDAOV6 extends ElasticSearchBaseDAO implements Inde
private final long asyncBufferFlushTimeout;
private final ElasticSearchProperties properties;

private final RetryTemplate retryTemplate = createRetryTemplate();
private final RetryTemplate retryTemplate;

static {
SIMPLE_DATE_FORMAT.setTimeZone(GMT);
}

public ElasticSearchRestDAOV6(
RestClientBuilder restClientBuilder,
RetryTemplate retryTemplate,
ElasticSearchProperties properties,
ObjectMapper objectMapper) {

Expand Down Expand Up @@ -203,6 +201,7 @@ public ElasticSearchRestDAOV6(

Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this::flushBulkRequests, 60, 30, TimeUnit.SECONDS);
this.retryTemplate = retryTemplate;
}

@PreDestroy
Expand Down Expand Up @@ -1067,20 +1066,6 @@ private void flushBulkRequests() {
});
}

private RetryTemplate createRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();

FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);

return retryTemplate;
}

private static class BulkRequests {

private final long lastFlushTime;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -25,6 +25,7 @@
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.springframework.retry.support.RetryTemplate;

abstract class ElasticSearchDaoBaseTest extends ElasticSearchTest {

Expand All @@ -45,7 +46,9 @@ public void setup() throws Exception {
new TransportAddress(
InetAddress.getByName("localhost"), mappedPort));

indexDAO = new ElasticSearchDAOV6(elasticSearchClient, properties, objectMapper);
indexDAO =
new ElasticSearchDAOV6(
elasticSearchClient, new RetryTemplate(), properties, objectMapper);
indexDAO.setup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.client.RestClientBuilder;
import org.junit.After;
import org.junit.Before;
import org.springframework.retry.support.RetryTemplate;

abstract class ElasticSearchRestDaoBaseTest extends ElasticSearchTest {

Expand All @@ -40,7 +41,9 @@ public void setup() throws Exception {
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(host, port, "http"));
restClient = restClientBuilder.build();

indexDAO = new ElasticSearchRestDAOV6(restClientBuilder, properties, objectMapper);
indexDAO =
new ElasticSearchRestDAOV6(
restClientBuilder, new RetryTemplate(), properties, objectMapper);
indexDAO.setup();
}

Expand Down
1 change: 1 addition & 0 deletions es7-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
implementation "org.elasticsearch.client:elasticsearch-rest-client"
implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client"

testImplementation 'org.springframework.retry:spring-retry'
testImplementation "org.awaitility:awaitility:${revAwaitility}"
testImplementation "org.testcontainers:elasticsearch:${revTestContainer}"
testImplementation project(':conductor-common').sourceSets.test.output
Expand Down
8 changes: 8 additions & 0 deletions es7-persistence/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,9 @@
"org.springframework.boot:spring-boot-starter-test"
]
},
"org.springframework.retry:spring-retry": {
"locked": "1.2.5.RELEASE"
},
"org.springframework:spring-aop": {
"locked": "5.2.15.RELEASE",
"transitive": [
Expand All @@ -1731,6 +1734,7 @@
"org.springframework.boot:spring-boot",
"org.springframework.boot:spring-boot-starter",
"org.springframework.boot:spring-boot-starter-test",
"org.springframework.retry:spring-retry",
"org.springframework:spring-aop",
"org.springframework:spring-beans",
"org.springframework:spring-context",
Expand Down Expand Up @@ -2632,6 +2636,9 @@
"org.springframework.boot:spring-boot-starter-test"
]
},
"org.springframework.retry:spring-retry": {
"locked": "1.2.5.RELEASE"
},
"org.springframework:spring-aop": {
"locked": "5.2.15.RELEASE",
"transitive": [
Expand All @@ -2657,6 +2664,7 @@
"org.springframework.boot:spring-boot",
"org.springframework.boot:spring-boot-starter",
"org.springframework.boot:spring-boot-starter-test",
"org.springframework.retry:spring-retry",
"org.springframework:spring-aop",
"org.springframework:spring-beans",
"org.springframework:spring-context",
Expand Down
Loading

0 comments on commit 61425b6

Please sign in to comment.