diff --git a/docs/changelog/138228.yaml b/docs/changelog/138228.yaml new file mode 100644 index 0000000000000..0136025ddaec8 --- /dev/null +++ b/docs/changelog/138228.yaml @@ -0,0 +1,6 @@ +pr: 138228 +summary: "Fix: Downsample returns appropriate error when target index gets deleted\ + \ unexpectedly." +area: Downsampling +type: bug +issues: [] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 801995b06d8f8..2ebd153adaab5 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -130,20 +130,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc /** * This is the cluster state task executor for cluster state update actions. + * Visible for testing */ - private static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = - new SimpleBatchedExecutor<>() { - @Override - public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) - throws Exception { - return Tuple.tuple(task.execute(clusterState), null); - } + static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), null); + } - @Override - public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { - task.listener.onResponse(AcknowledgedResponse.TRUE); - } - }; + @Override + public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { + task.listener.onResponse(AcknowledgedResponse.TRUE); + } + }; @Inject public TransportDownsampleAction( @@ -1086,7 +1085,6 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. listener.onFailure(e); } @@ -1137,7 +1135,12 @@ public void onResponse(final BroadcastResponse response) { public ClusterState execute(ClusterState currentState) { logger.debug("Updating downsample index status for [{}]", downsampleIndexName); final Metadata metadata = currentState.metadata(); - final IndexMetadata downsampleIndex = metadata.index(metadata.index(downsampleIndexName).getIndex()); + final IndexMetadata downsampleIndex = metadata.index(downsampleIndexName); + if (downsampleIndex == null) { + throw new IllegalStateException( + "Failed to update downsample status because [" + downsampleIndexName + "] does not exist" + ); + } if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) { return currentState; } @@ -1159,7 +1162,6 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. actionListener.onFailure(e); } @@ -1233,8 +1235,8 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); - logger.debug("Downsampling measured successfully", e); + recordFailureMetrics(startTime); + logger.debug("Downsampling failure measured successfully", e); this.actionListener.onFailure(e); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index 8d1c6812ff07b..e587101a9111d 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -54,6 +55,7 @@ import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.mockito.Answers; @@ -169,11 +171,13 @@ public void setUp() throws Exception { replicaShards = randomIntBetween(0, 3); task = new Task(1, "type", "action", "description", null, null); + // Initialise mocks for thread pool and cluster service var threadContext = new ThreadContext(Settings.EMPTY); when(threadPool.getThreadContext()).thenReturn(threadContext); when(clusterService.localNode()).thenReturn(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "node_name")); when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + // Mock refresh & force merge requests Answer mockBroadcastResponse = invocation -> { @SuppressWarnings("unchecked") var listener = (ActionListener) invocation.getArgument(1, ActionListener.class); @@ -184,9 +188,9 @@ public void setUp() throws Exception { doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any()); doAnswer(invocation -> { var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE); + updateTask.listener.onResponse(AcknowledgedResponse.TRUE); return null; - }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); MappedFieldType timestampFieldMock = mock(MappedFieldType.class); when(timestampFieldMock.meta()).thenReturn(Map.of()); @@ -224,11 +228,6 @@ private void downsample(String mapping) throws IOException { .blocks(ClusterBlocks.builder().addIndexBlock(sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) .build(); - doAnswer(invocation -> { - var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(AcknowledgedResponse.TRUE); - return null; - }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); Function> mockPersistentTask = i -> invocation -> { ActionListener> listener = invocation.getArgument(i); PersistentTasksCustomMetadata.PersistentTask task = mock(PersistentTasksCustomMetadata.PersistentTask.class); @@ -248,6 +247,7 @@ private void downsample(String mapping) throws IOException { listener.onResponse(AcknowledgedResponse.TRUE); return null; }).when(indicesAdminClient).updateSettings(any(), any()); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -283,6 +283,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() { ) .blocks(ClusterBlocks.builder().addIndexBlock(sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) .build(); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -338,6 +339,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx ) .build() ); + assertSuccessfulUpdateDownsampleStatus(clusterService.state()); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -356,6 +358,78 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); } + public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException { + String mapping = switch (randomIntBetween(0, 2)) { + case 0 -> NO_METADATA_MAPPING; + case 1 -> OTHER_METADATA_MAPPING; + default -> FORCE_MERGE_ENABLED_MAPPING; + }; + mockGetMapping(mapping); + mockMergedMapping(mapping); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder().put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))) + .blocks(ClusterBlocks.builder().addIndexBlock(sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + Function> mockPersistentTask = i -> invocation -> { + ActionListener> listener = invocation.getArgument(i); + PersistentTasksCustomMetadata.PersistentTask task = mock(PersistentTasksCustomMetadata.PersistentTask.class); + when(task.getId()).thenReturn(randomAlphaOfLength(10)); + DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.COMPLETED, + null + ); + when(task.getState()).thenReturn(runningTaskState); + listener.onResponse(task); + return null; + }; + doAnswer(mockPersistentTask.apply(4)).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any()); + doAnswer(mockPersistentTask.apply(3)).when(persistentTaskService).waitForPersistentTaskCondition(anyString(), any(), any(), any()); + doAnswer(invocation -> { + var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeHandlingResults( + clusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask), + task1 -> {}, + TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + IllegalStateException error = safeAwaitFailure( + IllegalStateException.class, + AcknowledgedResponse.class, + listener -> action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.timeValueHours(1), + new DownsampleConfig(new DateHistogramInterval("5m")) + ), + clusterState, + listener + ) + ); + assertThat( + error.getMessage(), + Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist") + ); + verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED)); + verify(indicesAdminClient).refresh(any(), any()); + verify(indicesAdminClient, never()).flush(any(), any()); + verify(indicesAdminClient, never()).forceMerge(any(), any()); + } + private void mockGetMapping(String mapping) { doAnswer(invocation -> { @SuppressWarnings("unchecked") @@ -511,4 +585,21 @@ public void testGetSupportedMetrics() { assertThat(supported.defaultMetric(), is("max")); assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs()))); } + + private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) { + var metadata = Metadata.builder(clusterState.metadata()) + .put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards)) + .build(); + + var updatedClusterState = ClusterState.builder(clusterState).metadata(metadata).build(); + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + updatedClusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask) + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + } }