From c65528b9f68701e8106ea119ec354e67246b531a Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 30 Jan 2018 10:28:48 +0100 Subject: [PATCH] Watcher: Ensure state is cleaned properly in watcher life cycle service (elastic/x-pack-elasticsearch#3770) The WatcherLifeCycleService is responsible for deciding if watches need to be reloaded while running. In order to do this, the service stores the currently local shard allocation ids in a List. This data structure however was not properly updated all the time, when it should have been - for example when a master node is not available. This lead to unintended reloads, even though there was no change in the allocated shards. This in turn lead to unwanted executions and unwanted loading of triggered watches. This commit should also fix one of the more nasty ongoing test failures, where the test returns with an exception that only parts of watcher have been started. The AbstractWatcherIntegrationTestCase now properly waits until watcher is started before starting the test case itself. Original commit: elastic/x-pack-elasticsearch@097f12a900b879d3a9fab5aeaf57f5b4be10def0 --- .../watcher/WatcherLifeCycleService.java | 138 +++++++++++------- .../stats/TransportWatcherStatsAction.java | 16 +- .../watcher/WatcherLifeCycleServiceTests.java | 44 +++++- .../AbstractWatcherIntegrationTestCase.java | 34 ++--- .../test/integration/BootStrapTests.java | 4 +- .../transform/TransformIntegrationTests.java | 3 - 6 files changed, 154 insertions(+), 85 deletions(-) diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 3e512305f3034..f0fd4ba1575b9 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -21,11 +21,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.upgrade.UpgradeField; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.core.upgrade.UpgradeField; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; @@ -43,7 +43,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste private final WatcherService watcherService; private final ExecutorService executor; private AtomicReference> previousAllocationIds = new AtomicReference<>(Collections.emptyList()); - private volatile WatcherMetaData watcherMetaData; private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. @@ -61,7 +60,6 @@ public void beforeStop() { shutDown(); } }); - watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("xpack.watcher.start_immediately", true)); } public synchronized void stop(String reason) { @@ -73,7 +71,7 @@ synchronized void shutDown() { stop("shutdown initiated"); } - private synchronized void start(ClusterState state, boolean manual) { + private synchronized void start(ClusterState state) { if (shutDown) { return; } @@ -86,7 +84,8 @@ private synchronized void start(ClusterState state, boolean manual) { // If we start from a cluster state update we need to check if previously we stopped manually // otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run - if (!manual && watcherMetaData != null && watcherMetaData.manuallyStopped()) { + WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE); + if (watcherMetaData != null && watcherMetaData.manuallyStopped()) { logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-started"); return; } @@ -99,8 +98,10 @@ private synchronized void start(ClusterState state, boolean manual) { } if (watcherService.validate(state)) { - logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual); + logger.trace("starting... (based on cluster state version [{}])", state.getVersion()); try { + // we need to populate the allocation ids before the next cluster state listener comes in + checkAndSetAllocationIds(state, false); watcherService.start(state); } catch (Exception e) { logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e); @@ -120,68 +121,30 @@ private synchronized void start(ClusterState state, boolean manual) { @Override public void clusterChanged(ClusterChangedEvent event) { if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) || shutDown) { + clearAllocationIds(); // wait until the gateway has recovered from disk, otherwise we think may not have .watches and // a .triggered_watches index, but they may not have been restored from the cluster state on disk return; } if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) { + clearAllocationIds(); executor.execute(() -> this.stop("no master node")); return; } if (event.state().getBlocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) { + clearAllocationIds(); executor.execute(() -> this.stop("write level cluster block")); return; } - // find out if watcher was stopped or started manually due to this cluster state change - WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE); - - if (watcherMetaData != null) { - this.watcherMetaData = watcherMetaData; - } - - boolean currentWatcherStopped = watcherMetaData != null && watcherMetaData.manuallyStopped() == true; - if (currentWatcherStopped) { + if (isWatcherStoppedManually(event.state())) { + clearAllocationIds(); executor.execute(() -> this.stop("watcher manually marked to shutdown by cluster state update")); } else { if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) { - DiscoveryNode localNode = event.state().nodes().getLocalNode(); - RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId()); - IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); - - // no watcher index, time to pause, as there are for sure no shards on this node - if (watcherIndexMetaData == null) { - if (previousAllocationIds.get().isEmpty() == false) { - previousAllocationIds.set(Collections.emptyList()); - executor.execute(() -> watcherService.pauseExecution("no watcher index found")); - } - return; - } - - String watchIndex = watcherIndexMetaData.getIndex().getName(); - List localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED); - - // no local shards, empty out watcher and not waste resources! - if (localShards.isEmpty()) { - if (previousAllocationIds.get().isEmpty() == false) { - executor.execute(() -> watcherService.pauseExecution("no local watcher shards")); - previousAllocationIds.set(Collections.emptyList()); - } - return; - } - - List currentAllocationIds = localShards.stream() - .map(ShardRouting::allocationId) - .map(AllocationId::getId) - .collect(Collectors.toList()); - Collections.sort(currentAllocationIds); - - if (previousAllocationIds.get().equals(currentAllocationIds) == false) { - previousAllocationIds.set(currentAllocationIds); - executor.execute(() -> watcherService.reload(event.state(), "different shards allocated on this node")); - } + checkAndSetAllocationIds(event.state(), true); } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) { IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, @@ -191,7 +154,8 @@ public void clusterChanged(ClusterChangedEvent event) { boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null || UpgradeField.checkInternalIndexFormat(triggeredWatchesIndexMetaData); if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) { - executor.execute(() -> start(event.state(), false)); + checkAndSetAllocationIds(event.state(), false); + executor.execute(() -> start(event.state())); } else { logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]", isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex); @@ -200,7 +164,75 @@ public void clusterChanged(ClusterChangedEvent event) { } } - public WatcherMetaData watcherMetaData() { - return watcherMetaData; + /** + * check if watcher has been stopped manually via the stop API + */ + private boolean isWatcherStoppedManually(ClusterState state) { + WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE); + return watcherMetaData != null && watcherMetaData.manuallyStopped(); + } + + /** + * check and optionally set the current allocation ids + * + * @param state the current cluster state + * @param callWatcherService should the watcher service be called for starting/stopping/reloading or should this be treated as a + * dryrun so that the caller is responsible for this + */ + private void checkAndSetAllocationIds(ClusterState state, boolean callWatcherService) { + IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData()); + if (watcherIndexMetaData == null) { + if (clearAllocationIds() && callWatcherService) { + executor.execute(() -> watcherService.pauseExecution("no watcher index found")); + } + return; + } + + DiscoveryNode localNode = state.nodes().getLocalNode(); + RoutingNode routingNode = state.getRoutingNodes().node(localNode.getId()); + // this can happen if the node does not hold any data + if (routingNode == null) { + if (clearAllocationIds() && callWatcherService) { + executor.execute(() -> watcherService.pauseExecution("no routing node for local node found, network issue?")); + } + return; + } + + String watchIndex = watcherIndexMetaData.getIndex().getName(); + List localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED); + // no local shards, empty out watcher and dont waste resources! + if (localShards.isEmpty()) { + if (clearAllocationIds() && callWatcherService) { + executor.execute(() -> watcherService.pauseExecution("no local watcher shards found")); + } + return; + } + + List currentAllocationIds = localShards.stream() + .map(ShardRouting::allocationId) + .map(AllocationId::getId) + .collect(Collectors.toList()); + Collections.sort(currentAllocationIds); + + if (previousAllocationIds.get().equals(currentAllocationIds) == false) { + previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds)); + if (callWatcherService) { + executor.execute(() -> watcherService.reload(state, "new local watcher shard allocation ids")); + } + } + } + + /** + * clear out current allocation ids if not already happened + * @return true, if existing allocation ids were cleaned out, false otherwise + */ + private boolean clearAllocationIds() { + List previousIds = previousAllocationIds.getAndSet(Collections.emptyList()); + return previousIds.equals(Collections.emptyList()) == false; + } + + // for testing purposes only + List allocationIds() { + return previousAllocationIds.get(); } } diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java index 5c966291caec4..6d8aa29e6467e 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java @@ -14,10 +14,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; -import org.elasticsearch.xpack.watcher.WatcherLifeCycleService; import org.elasticsearch.xpack.watcher.WatcherService; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.trigger.TriggerService; @@ -33,27 +33,24 @@ public class TransportWatcherStatsAction extends TransportNodesAction nodes, List failures) { - return new WatcherStatsResponse(clusterService.getClusterName(), lifeCycleService.watcherMetaData(), nodes, failures); + return new WatcherStatsResponse(clusterService.getClusterName(), getWatcherMetaData(), nodes, failures); } @Override @@ -83,4 +80,11 @@ protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node reque return statsResponse; } + private WatcherMetaData getWatcherMetaData() { + WatcherMetaData watcherMetaData = clusterService.state().getMetaData().custom(WatcherMetaData.TYPE); + if (watcherMetaData == null) { + watcherMetaData = new WatcherMetaData(false); + } + return watcherMetaData; + } } \ No newline at end of file diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index a9c597ec714c5..639c05a25a6c1 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -20,6 +20,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -50,6 +52,7 @@ import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME; import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME; import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME; +import static org.hamcrest.Matchers.hasSize; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; @@ -278,15 +281,15 @@ public void testNoLocalShards() throws Exception { // set current allocation ids lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithLocalShards, clusterStateWithoutLocalShards)); - verify(watcherService, times(0)).pauseExecution(eq("no local watcher shards")); + verify(watcherService, times(0)).pauseExecution(eq("no local watcher shards found")); // no more local hards, lets pause execution lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithLocalShards)); - verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards")); + verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found")); // no further invocations should happen if the cluster state does not change in regard to local shards lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithoutLocalShards)); - verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards")); + verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found")); } public void testReplicaWasAddedOrRemoved() throws Exception { @@ -539,6 +542,41 @@ public void testWatcherStopsOnClusterLevelBlock() { verify(watcherService, times(1)).stop(eq("write level cluster block")); } + public void testStateIsSetImmediately() throws Exception { + Index index = new Index(Watch.INDEX, "foo"); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addShard( + TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED)); + IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required + .numberOfShards(1).numberOfReplicas(0); + ClusterState state = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") + .add(newNode("node_1"))) + .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()) + .metaData(MetaData.builder() + .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns())) + .put(indexMetaDataBuilder) + .build()) + .build(); + when(watcherService.validate(state)).thenReturn(true); + when(watcherService.state()).thenReturn(WatcherState.STOPPED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state)); + verify(watcherService, times(1)).start(eq(state)); + assertThat(lifeCycleService.allocationIds(), hasSize(1)); + + // now do any cluster state upgrade, see that reload gets triggers, but should not + when(watcherService.state()).thenReturn(WatcherState.STARTED); + lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state)); + verify(watcherService, never()).pauseExecution(anyString()); + + verify(watcherService, never()).reload(eq(state), anyString()); + assertThat(lifeCycleService.allocationIds(), hasSize(1)); + } + private List randomIndexPatterns() { return IntStream.range(0, between(1, 10)) .mapToObj(n -> randomAlphaOfLengthBetween(1, 100)) diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index d7e99c0c00d03..1f37a8f8e8bec 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -185,20 +185,10 @@ public void _setup() throws Exception { timeWarp = new TimeWarp(internalCluster().getInstances(ScheduleTriggerEngineMock.class), (ClockMock)getInstanceFromMaster(Clock.class)); } - startWatcherIfNodesExist(); - createWatcherIndicesOrAliases(); - } - @After - public void _cleanup() throws Exception { - // Clear all internal watcher state for the next test method: - logger.info("[#{}]: clearing watcher state", getTestName()); - stopWatcher(); - } - - private void startWatcherIfNodesExist() throws Exception { if (internalCluster().size() > 0) { ensureLicenseEnabled(); + if (timeWarped()) { // now that the license is enabled and valid we can freeze all nodes clocks logger.info("[{}#{}]: freezing time on nodes", getTestClass().getSimpleName(), getTestName()); @@ -206,10 +196,19 @@ private void startWatcherIfNodesExist() throws Exception { internalCluster().setDisruptionScheme(ice); ice.startDisrupting(); } - assertAcked(watcherClient().prepareWatchService().start().get()); + + createWatcherIndicesOrAliases(); + startWatcher(); } } + @After + public void _cleanup() throws Exception { + // Clear all internal watcher state for the next test method: + logger.info("[#{}]: clearing watcher state", getTestName()); + stopWatcher(); + } + /** * In order to test, that .watches and .triggered-watches indices can also point to an alias, we will rarely create those * after starting watcher @@ -237,6 +236,7 @@ private void createWatcherIndicesOrAliases() throws Exception { assertAcked(response); ensureGreen(newIndex); } + logger.info("set alias for .watches index to [{}]", newIndex); } else { Settings.Builder builder = Settings.builder(); if (randomBoolean()) { @@ -270,6 +270,7 @@ private void createWatcherIndicesOrAliases() throws Exception { assertAcked(response); ensureGreen(newIndex); } + logger.info("set alias for .triggered-watches index to [{}]", newIndex); } else { assertAcked(client().admin().indices().prepareCreate(TriggeredWatchStoreField.INDEX_NAME)); ensureGreen(TriggeredWatchStoreField.INDEX_NAME); @@ -277,9 +278,8 @@ private void createWatcherIndicesOrAliases() throws Exception { String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(DateTime.now(DateTimeZone.UTC)); assertAcked(client().admin().indices().prepareCreate(historyIndex)); + logger.info("creating watch history index [{}]", historyIndex); ensureGreen(historyIndex); - - ensureWatcherStarted(); } } @@ -364,7 +364,7 @@ protected void assertWatchWithMinimumPerformedActionsCount(final String watchNam ExecutionState.EXECUTED.id()))) .get(); lastResponse.set(searchResponse); - assertThat("could not find executed watch record", searchResponse.getHits().getTotalHits(), + assertThat("could not find executed watch record for watch " + watchName, searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedWatchActionsWithActionPerformed)); if (assertConditionMet) { assertThat((Integer) XContentMapValues.extractValue("result.input.payload.hits.total", @@ -508,12 +508,12 @@ protected void ensureWatcherStopped() throws Exception { } protected void startWatcher() throws Exception { - watcherClient().prepareWatchService().start().get(); + assertAcked(watcherClient().prepareWatchService().start().get()); ensureWatcherStarted(); } protected void stopWatcher() throws Exception { - watcherClient().prepareWatchService().stop().get(); + assertAcked(watcherClient().prepareWatchService().stop().get()); ensureWatcherStopped(); } diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index f39e7028ed03c..2615c40ad49ba 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition; import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; @@ -56,8 +55,6 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.joda.time.DateTimeZone.UTC; -@TestLogging("org.elasticsearch.xpack.watcher:DEBUG," + - "org.elasticsearch.xpack.watcher.execution:TRACE") public class BootStrapTests extends AbstractWatcherIntegrationTestCase { @Override @@ -192,6 +189,7 @@ public void testMixedTriggeredWatchLoading() throws Exception { WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index"); + ensureGreen("output", "my-index"); int numWatches = 8; for (int i = 0; i < numWatches; i++) { String watchId = "_id" + i; diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/TransformIntegrationTests.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/TransformIntegrationTests.java index ef5edc7a8337f..1f7335aef0df8 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/TransformIntegrationTests.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/TransformIntegrationTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; @@ -51,7 +50,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; -@TestLogging("org.elasticsearch.xpack.watcher:DEBUG,org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE") public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCase { @Override @@ -247,5 +245,4 @@ public void testChainTransform() throws Exception { assertThat(response.getHits().getAt(0).getSourceAsMap().size(), equalTo(1)); assertThat(response.getHits().getAt(0).getSourceAsMap().get("key4").toString(), equalTo("30")); } - }