Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1138 from naveenchlsn/DISCARD_POLICY
Browse files Browse the repository at this point in the history
Adds pool configuration for index dao and adds discarding policy to e…
  • Loading branch information
apanicker-nflx committed May 17, 2019
2 parents 8699bc8 + 0185374 commit 7d3e1df
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 9 deletions.
4 changes: 3 additions & 1 deletion es5-persistence/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ If using the `http` or `https`, then conductor will use the REST transport proto
Defaults to `conductor`
* `workflow.elasticsearch.tasklog.index.name` - The name of the task log index.
Defaults to `task_log`
* `workflow.elasticsearch.async.dao.worker.queue.size=100` - Worker Queue size used in executor service for async methods in IndexDao
* `workflow.elasticsearch.async.dao.worker.queue.size` - Worker Queue size used in executor service for async methods in IndexDao
Defaults to `100`
* `workflow.elasticsearch.async.dao.max.pool.size` - Maximum thread pool size in executor service for async methods in IndexDao
Defaults to `12`

### Embedded Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,17 @@ public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration
this.archiveSearchBatchSize = config.getArchiveSearchBatchSize();

int corePoolSize = 6;
int maximumPoolSize = 12;
int maximumPoolSize = config.getAsyncMaxPoolSize();
long keepAliveTime = 1L;
int workerQueueSize = config.getAsyncWorkerQueueSize();
this.executorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(workerQueueSize));
new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn("Request {} to async dao discarded in executor {}", runnable, executor);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.es5.index.query.parser.Expression;
import com.netflix.conductor.elasticsearch.query.parser.ParserException;
import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration;
import com.netflix.conductor.metrics.Monitors;
import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -122,14 +121,17 @@ public ElasticSearchRestDAOV5(RestClient lowLevelRestClient, ElasticSearchConfig

// Set up a workerpool for performing async operations.
int corePoolSize = 6;
int maximumPoolSize = 12;
int maximumPoolSize = config.getAsyncMaxPoolSize();
long keepAliveTime = 1L;
int workerQueueSize = config.getAsyncWorkerQueueSize();
this.executorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(workerQueueSize));
new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn("Request {} to async dao discarded in executor {}", runnable, executor);
});

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface ElasticSearchConfiguration extends Configuration {
String ELASTIC_SEARCH_ASYNC_DAO_WORKER_QUEUE_SIZE = "workflow.elasticsearch.async.dao.worker.queue.size";
int DEFAULT_ASYNC_WORKER_QUEUE_SIZE = 100;

String ELASTIC_SEARCH_ASYNC_DAO_MAX_POOL_SIZE = "workflow.elasticsearch.async.dao.max.pool.size";
int DEFAULT_ASYNC_MAX_POOL_SIZE = 12;

default String getURL() {
return getProperty(ELASTIC_SEARCH_URL_PROPERTY_NAME, ELASTIC_SEARCH_URL_DEFAULT_VALUE);
}
Expand Down Expand Up @@ -122,4 +125,8 @@ default int getArchiveSearchBatchSize() {
default int getAsyncWorkerQueueSize() {
return getIntProperty(ELASTIC_SEARCH_ASYNC_DAO_WORKER_QUEUE_SIZE, DEFAULT_ASYNC_WORKER_QUEUE_SIZE);
}

default int getAsyncMaxPoolSize() {
return getIntProperty(ELASTIC_SEARCH_ASYNC_DAO_MAX_POOL_SIZE, DEFAULT_ASYNC_MAX_POOL_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,11 @@ public void testAsyncWorkerQueueSize() {
int workerQueueSize = es.getAsyncWorkerQueueSize();
Assert.assertEquals(workerQueueSize, 100);
}

@Test
public void testAsyncMaxPoolSize() {
ElasticSearchConfiguration es = new SystemPropertiesElasticSearchConfiguration();
int poolSize = es.getAsyncMaxPoolSize();
Assert.assertEquals(poolSize, 12);
}
}
2 changes: 2 additions & 0 deletions es6-persistence/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ Defaults to `conductor`
Defaults to `task_log`
* `workflow.elasticsearch.async.dao.worker.queue.size` - Worker Queue size used in executor service for async methods in IndexDao
Defaults to `100`
* `workflow.elasticsearch.async.dao.max.pool.size` - Maximum thread pool size in executor service for async methods in IndexDao
Defaults to `12`

### Embedded Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ public ElasticSearchDAOV6(Client elasticSearchClient, ElasticSearchConfiguration
this.messageIndexPrefix = this.indexPrefix + "_" + MSG_DOC_TYPE;
this.eventIndexPrefix = this.indexPrefix + "_" + EVENT_DOC_TYPE;
int workerQueueSize = config.getAsyncWorkerQueueSize();
int maximumPoolSize = config.getAsyncMaxPoolSize();

this.executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, new LinkedBlockingQueue<>(workerQueueSize));
this.executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, maximumPoolSize, KEEP_ALIVE_TIME, TimeUnit.MINUTES, new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn("Request {} to async dao discarded in executor {}", runnable, executor);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public ElasticSearchRestDAOV6(RestClientBuilder restClientBuilder, ElasticSearch

this.objectMapper = objectMapper;
this.elasticSearchAdminClient = restClientBuilder.build();
this.elasticSearchClient = new RestHighLevelClient(restClientBuilder);
this.elasticSearchClient = new RestHighLevelClient(restClientBuilder.build());
this.clusterHealthColor = config.getClusterHealthColor();

this.indexPrefix = config.getIndexName();
Expand All @@ -151,9 +151,13 @@ public ElasticSearchRestDAOV6(RestClientBuilder restClientBuilder, ElasticSearch
this.messageIndexPrefix = this.indexPrefix + "_" + MSG_DOC_TYPE;
this.eventIndexPrefix = this.indexPrefix + "_" + EVENT_DOC_TYPE;
int workerQueueSize = config.getAsyncWorkerQueueSize();
int maximumPoolSize = config.getAsyncMaxPoolSize();

// Set up a workerpool for performing async operations.
this.executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, new LinkedBlockingQueue<>(workerQueueSize));
this.executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, maximumPoolSize, KEEP_ALIVE_TIME, TimeUnit.MINUTES, new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn("Request {} to async dao discarded in executor {}", runnable, executor);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface ElasticSearchConfiguration extends Configuration {
String ELASTIC_SEARCH_ASYNC_DAO_WORKER_QUEUE_SIZE = "workflow.elasticsearch.async.dao.worker.queue.size";
int DEFAULT_ASYNC_WORKER_QUEUE_SIZE = 100;

String ELASTIC_SEARCH_ASYNC_DAO_MAX_POOL_SIZE = "workflow.elasticsearch.async.dao.max.pool.size";
int DEFAULT_ASYNC_MAX_POOL_SIZE = 12;

default String getURL() {
return getProperty(ELASTIC_SEARCH_URL_PROPERTY_NAME, ELASTIC_SEARCH_URL_DEFAULT_VALUE);
}
Expand Down Expand Up @@ -122,4 +125,8 @@ default int getArchiveSearchBatchSize() {
default int getAsyncWorkerQueueSize() {
return getIntProperty(ELASTIC_SEARCH_ASYNC_DAO_WORKER_QUEUE_SIZE, DEFAULT_ASYNC_WORKER_QUEUE_SIZE);
}

default int getAsyncMaxPoolSize() {
return getIntProperty(ELASTIC_SEARCH_ASYNC_DAO_MAX_POOL_SIZE, DEFAULT_ASYNC_MAX_POOL_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,11 @@ public void testAsyncWorkerQueueSize() {
int workerQueueSize = es.getAsyncWorkerQueueSize();
Assert.assertEquals(workerQueueSize, 100);
}

@Test
public void testAsyncMaxPoolSize() {
ElasticSearchConfiguration es = new SystemPropertiesElasticSearchConfiguration();
int poolSize = es.getAsyncMaxPoolSize();
Assert.assertEquals(poolSize, 12);
}
}

0 comments on commit 7d3e1df

Please sign in to comment.