From 0d6cbf92e5903ad97d1d5c823bb8ba6545581ef1 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 31 May 2019 15:01:12 +0100 Subject: [PATCH] [ML Data Frame] Refactor stop logic (#42644) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Revert "invalid test" This reverts commit 9dd8b52c13c716918ff97e6527aaf43aefc4695d. * Testing * mend * Revert "[ML Data Frame] Mute Data Frame tests" This reverts commit 5d837fa312b0e41a77a65462667a2d92d1114567. * Call onStop and onAbort outside atomic update * Don’t update CS * Tidying up * Remove invalid test that asserted logic that has been removed * Add stopped event * Revert "Add stopped event" This reverts commit 02ba992f4818bebd838e1c7678bd2e1cc090bfab. * Adding check for STOPPED in saveState --- .../core/indexing/AsyncTwoPhaseIndexer.java | 29 ++++---- .../indexing/AsyncTwoPhaseIndexerTests.java | 19 ------ .../integration/DataFrameTransformIT.java | 1 - .../integration/DataFrameAuditorIT.java | 2 - .../DataFrameConfigurationIndexIT.java | 2 - .../DataFrameGetAndGetStatsIT.java | 2 - .../integration/DataFrameMetaDataIT.java | 2 - .../integration/DataFramePivotRestIT.java | 2 - .../DataFrameTaskFailedStateIT.java | 2 - .../integration/DataFrameUsageIT.java | 2 - .../transforms/DataFrameTransformTask.java | 66 ++++++++----------- .../test/data_frame/transforms_start_stop.yml | 45 +++++++++++++ 12 files changed, 87 insertions(+), 87 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 0c4477b6b700e..f9bbf890fe6ce 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -90,28 +90,21 @@ public synchronized IndexerState start() { * Sets the internal state to {@link IndexerState#STOPPING} if an async job is * running in the background, {@link #onStop()} will be called when the background job * detects that the indexer is stopped. - * If there is no job running when this function is called - * the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly. + * If there is no job running when this function is called the returned + * state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called. * * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ public synchronized IndexerState stop() { - AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false); - IndexerState currentState = state.updateAndGet(previousState -> { + return state.updateAndGet(previousState -> { if (previousState == IndexerState.INDEXING) { return IndexerState.STOPPING; } else if (previousState == IndexerState.STARTED) { - wasStartedAndSetStopped.set(true); return IndexerState.STOPPED; } else { return previousState; } }); - - if (wasStartedAndSetStopped.get()) { - onStop(); - } - return currentState; } /** @@ -288,20 +281,22 @@ private void finishWithIndexingFailure(Exception exc) { } private IndexerState finishAndSetState() { - return state.updateAndGet(prev -> { + AtomicBoolean callOnStop = new AtomicBoolean(false); + AtomicBoolean callOnAbort = new AtomicBoolean(false); + IndexerState updatedState = state.updateAndGet(prev -> { switch (prev) { case INDEXING: // ready for another job return IndexerState.STARTED; case STOPPING: + callOnStop.set(true); // must be started again - onStop(); return IndexerState.STOPPED; case ABORTING: + callOnAbort.set(true); // abort and exit - onAbort(); return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first case STOPPED: @@ -316,6 +311,14 @@ private IndexerState finishAndSetState() { throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]"); } }); + + if (callOnStop.get()) { + onStop(); + } else if (callOnAbort.get()) { + onAbort(); + } + + return updatedState; } private void onSearchResponse(SearchResponse searchResponse) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index fc86a9554880f..053e41d9b2a63 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -268,25 +268,6 @@ public void testStateMachineBrokenSearch() throws InterruptedException { } } - public void testStop_AfterIndexerIsFinished() throws InterruptedException { - AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { - CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); - indexer.start(); - assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - countDownLatch.countDown(); - assertTrue(awaitBusy(() -> isFinished.get())); - - indexer.stop(); - assertTrue(isStopped.get()); - assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); - } finally { - executor.shutdownNow(); - } - } - public void testStop_WhileIndexing() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java index 1ec425c641693..69fb980871dce 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -30,7 +30,6 @@ public void cleanTransforms() throws IOException { cleanUp(); } - @AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public void testDataFrameTransformCrud() throws Exception { createReviewsIndex(); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java index 7dc79c1ae8fbe..9884c9bb6793b 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.junit.Before; @@ -23,7 +22,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameAuditorIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_admin_plus_data"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java index d7e12cf2bee4d..681599331c8af 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java @@ -8,7 +8,6 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -23,7 +22,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase { /** diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 9bac6ca0b4049..d9927cd09ed8f 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.xpack.core.dataframe.DataFrameField; @@ -22,7 +21,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_user"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java index 5b95d1daead53..26a957ea055c2 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -16,7 +15,6 @@ import java.io.IOException; import java.util.Map; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameMetaDataIT extends DataFrameRestTestCase { private boolean indicesCreated = false; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 36f95e599ff73..933fcc6c8e5c4 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -22,7 +21,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFramePivotRestIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_admin_plus_data"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 7b63644dd34ad..96aeeda8755f4 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; @@ -20,7 +19,6 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { public void testDummy() { diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java index f98fa6a271365..4f209c5a9f3f4 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -23,7 +22,6 @@ import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE; import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameUsageIT extends DataFrameRestTestCase { private boolean indicesCreated = false; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 13deab6748c94..575cd4c15bd67 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final Map initialPosition; private final IndexerState initialIndexerState; - private final SetOnce indexer = new SetOnce<>(); + private final SetOnce indexer = new SetOnce<>(); private final AtomicReference taskState; private final AtomicReference stateReason; @@ -125,7 +125,7 @@ public Status getStatus() { return getState(); } - private DataFrameIndexer getIndexer() { + private ClientDataFrameIndexer getIndexer() { return indexer.get(); } @@ -236,7 +236,10 @@ public synchronized void stop() { return; } - getIndexer().stop(); + IndexerState state = getIndexer().stop(); + if (state == IndexerState.STOPPED) { + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); + } } @Override @@ -530,11 +533,17 @@ protected void doSaveState(IndexerState indexerState, Map positi next.run(); return; } + // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` + // OR we called `doSaveState` manually as the indexer was not actively running. + // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state + if (indexerState.equals(IndexerState.STOPPED)) { + transformTask.setTaskStateStopped(); + } final DataFrameTransformState state = new DataFrameTransformState( transformTask.taskState.get(), indexerState, - getPosition(), + position, transformTask.currentCheckpoint.get(), transformTask.stateReason.get(), getProgress()); @@ -542,28 +551,18 @@ protected void doSaveState(IndexerState indexerState, Map positi // Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and // only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity - ActionListener> updateClusterStateListener = ActionListener.wrap( - task -> { - transformsConfigManager.putOrUpdateTransformStats( - new DataFrameTransformStateAndStats(transformId, state, getStats(), - DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null - ActionListener.wrap( - r -> { - next.run(); - }, - statsExc -> { - logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); - next.run(); - } - )); - }, - exc -> { - logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc); - next.run(); - } - ); - - transformTask.persistStateToClusterState(state, updateClusterStateListener); + transformsConfigManager.putOrUpdateTransformStats( + new DataFrameTransformStateAndStats(transformId, state, getStats(), + DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null + ActionListener.wrap( + r -> { + next.run(); + }, + statsExc -> { + logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); + next.run(); + } + )); } @Override @@ -602,20 +601,7 @@ protected void onFinish(ActionListener listener) { protected void onStop() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); - - transformTask.setTaskStateStopped(); - transformsConfigManager.putOrUpdateTransformStats( - new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(), - DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null - ActionListener.wrap( - r -> { - transformTask.shutdown(); - }, - statsExc -> { - transformTask.shutdown(); - logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc); - } - )); + transformTask.shutdown(); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 31f80033e7bdb..2686c57fd06ac 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -190,8 +190,10 @@ teardown: - do: data_frame.stop_data_frame_transform: transform_id: "airline-transform-start-stop" + wait_for_completion: true - match: { acknowledged: true } + - do: data_frame.get_data_frame_transform_stats: transform_id: "airline-transform-start-later" @@ -209,3 +211,46 @@ teardown: - do: data_frame.delete_data_frame_transform: transform_id: "airline-transform-start-later" + +--- +"Test stop all": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform-stop-all" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-start-later" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-stop-all" + - match: { acknowledged: true } + + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { acknowledged: true } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "_all" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "*" + - match: { count: 2 } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + - match: { transforms.1.state.indexer_state: "stopped" } + - match: { transforms.1.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "airline-transform-stop-all"