diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
index d85431edd5..82dba4f100 100644
--- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
+++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
@@ -279,4 +279,8 @@ public static void recordWorkerQueueSize(int val) {
     public static void recordDiscardedIndexingCount() {
         getCounter(Monitors.classQualifier, "discarded_index_count").increment();
     }
+
+    public static void recordDiscardedLogIndexingCount() {
+        getCounter(Monitors.classQualifier, "discarded_log_index_count").increment();
+    }
 }
diff --git a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java
index 55a651fe34..364f51fc31 100644
--- a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java
+++ b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java
@@ -117,6 +117,7 @@ public class ElasticSearchDAOV5 implements IndexDAO {
     private final ObjectMapper objectMapper;
     private final Client elasticSearchClient;
     private final ExecutorService executorService;
+    private final ExecutorService logExecutorService;
     private final int archiveSearchBatchSize;
     private ConcurrentHashMap<String, BulkRequests> bulkRequests;
     private final int indexBatchSize;
@@ -149,6 +150,19 @@ public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration
                 logger.warn("Request {} to async dao discarded in executor {}", runnable, executor);
                 Monitors.recordDiscardedIndexingCount();
             });
+
+        corePoolSize = 1;
+        maximumPoolSize = 2;
+        keepAliveTime = 30L;
+        this.logExecutorService = new ThreadPoolExecutor(corePoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(workerQueueSize),
+            (runnable, executor) -> {
+                logger.warn("Request {} to async log dao discarded in executor {}", runnable, executor);
+                Monitors.recordDiscardedLogIndexingCount();
+            });
     }
 
     @Override
@@ -396,7 +410,7 @@ public void addTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {
 
     @Override
     public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> logs) {
-        return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs), executorService);
+        return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs), logExecutorService);
     }
 
     @Override
@@ -484,7 +498,7 @@ public void addEventExecution(EventExecution eventExecution) {
 
     @Override
     public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
-        return CompletableFuture.runAsync(() -> addEventExecution(eventExecution), executorService);
+        return CompletableFuture.runAsync(() -> addEventExecution(eventExecution), logExecutorService);
     }
 
     private void indexObject(UpdateRequest req, String docType) {
@@ -500,18 +514,21 @@ private void indexObject(UpdateRequest req, String docType) {
 
     private synchronized void updateWithRetry(BulkRequestBuilder request, String docType) {
         try {
+            long startTime = Instant.now().toEpochMilli();
             new RetryUtil<BulkResponse>().retryOnException(
                 () -> request.execute().actionGet(),
                 null,
                 BulkResponse::hasFailures,
                 RETRY_COUNT,
-                "Indexing all "+ docType + " task",
-                docType
+                "Bulk Indexing "+ docType,
+                "indexObject"
             );
+            long endTime = Instant.now().toEpochMilli();
+            logger.debug("Time taken {} for indexing object of type: {}", endTime - startTime, docType);
+            Monitors.recordESIndexTime("index_object", docType, endTime - startTime);
         } catch (Exception e) {
             Monitors.error(className, "index");
-            logger.error("Failed to index {} for request docType: {}", request, docType,
-                e);
+            logger.error("Failed to index object of type: {}", docType, e);
         }
     }
 