Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
use a separate threadpool/executor service for logs/events
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Oct 1, 2019
1 parent 188d8c0 commit c9ccecf
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
Expand Up @@ -279,4 +279,8 @@ public static void recordWorkerQueueSize(int val) {
public static void recordDiscardedIndexingCount() {
getCounter(Monitors.classQualifier, "discarded_index_count").increment();
}

public static void recordDiscardedLogIndexingCount() {
getCounter(Monitors.classQualifier, "discarded_log_index_count").increment();
}
}
Expand Up @@ -117,6 +117,7 @@ public class ElasticSearchDAOV5 implements IndexDAO {
private final ObjectMapper objectMapper;
private final Client elasticSearchClient;
private final ExecutorService executorService;
private final ExecutorService logExecutorService;
private final int archiveSearchBatchSize;
private ConcurrentHashMap<String, BulkRequestBuilder> bulkRequests;
private final int indexBatchSize;
Expand Down Expand Up @@ -149,6 +150,19 @@ public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration
logger.warn("Request {} to async dao discarded in executor {}", runnable, executor);
Monitors.recordDiscardedIndexingCount();
});

corePoolSize = 1;
maximumPoolSize = 2;
keepAliveTime = 30L;
this.logExecutorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn("Request {} to async log dao discarded in executor {}", runnable, executor);
Monitors.recordDiscardedLogIndexingCount();
});
}

@Override
Expand Down Expand Up @@ -396,7 +410,7 @@ public void addTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {

@Override
public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> logs) {
return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs), executorService);
return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs), logExecutorService);
}

@Override
Expand Down Expand Up @@ -484,7 +498,7 @@ public void addEventExecution(EventExecution eventExecution) {

@Override
public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
return CompletableFuture.runAsync(() -> addEventExecution(eventExecution), executorService);
return CompletableFuture.runAsync(() -> addEventExecution(eventExecution), logExecutorService);
}

private void indexObject(UpdateRequest req, String docType) {
Expand All @@ -500,18 +514,21 @@ private void indexObject(UpdateRequest req, String docType) {

private synchronized void updateWithRetry(BulkRequestBuilder request, String docType) {
try {
long startTime = Instant.now().toEpochMilli();
new RetryUtil<BulkResponse>().retryOnException(
() -> request.execute().actionGet(),
null,
BulkResponse::hasFailures,
RETRY_COUNT,
"Indexing all "+ docType + " task",
docType
"Bulk Indexing "+ docType,
"indexObject"
);
long endTime = Instant.now().toEpochMilli();
logger.debug("Time taken {} for indexing object of type: {}", endTime - startTime, docType);
Monitors.recordESIndexTime("index_object", docType, endTime - startTime);
} catch (Exception e) {
Monitors.error(className, "index");
logger.error("Failed to index {} for request docType: {}", request, docType,
e);
logger.error("Failed to index object of type: {}", docType, e);
}
}

Expand Down

0 comments on commit c9ccecf

Please sign in to comment.