From 4cf1d1ad059eeffbe3d7bf9d9782fbaeec0e3880 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 24 Mar 2020 12:09:25 +0200 Subject: [PATCH] [ML] No refresh on indexing DFA stats (#53977) When we index data frame analytics stats docs we do not need to refresh immediately. --- .../process/AnalyticsProcessManager.java | 16 ++++++++++------ .../xpack/ml/dataframe/stats/StatsPersister.java | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index c1f09ff3fb34f..cc3852f32f051 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -178,7 +179,7 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont processContext.setFailureReason(resultProcessor.getFailure()); refreshDest(config); - refreshStateIndex(config.getId()); + refreshIndices(config.getId()); LOGGER.info("[{}] Result processor has completed", config.getId()); } catch (Exception e) { if (task.isStopping()) { @@ -316,12 +317,15 @@ private void refreshDest(DataFrameAnalyticsConfig config) { () -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet()); } - private void refreshStateIndex(String jobId) { - String indexName = AnomalyDetectorsIndex.jobStateIndexPattern(); - LOGGER.debug("[{}] Refresh index {}", jobId, indexName); - - RefreshRequest refreshRequest = new RefreshRequest(indexName); + private void refreshIndices(String jobId) { + RefreshRequest refreshRequest = new RefreshRequest( + AnomalyDetectorsIndex.jobStateIndexPattern(), + MlStatsIndex.indexPattern() + ); refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + + LOGGER.debug("[{}] Refreshing indices {}", jobId, Arrays.toString(refreshRequest.indices())); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { client.admin().indices().refresh(refreshRequest).actionGet(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java index eeb8924928ce4..e553ac1810729 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java @@ -47,7 +47,7 @@ public void persistWithRetry(ToXContentObject result, Function d MlStatsIndex.writeAlias(), result, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")), - WriteRequest.RefreshPolicy.IMMEDIATE, + WriteRequest.RefreshPolicy.NONE, docIdSupplier.apply(jobId), () -> isCancelled == false, errorMsg -> auditor.error(jobId,