diff --git a/docs/changelog/138228.yaml b/docs/changelog/138228.yaml new file mode 100644 index 0000000000000..0136025ddaec8 --- /dev/null +++ b/docs/changelog/138228.yaml @@ -0,0 +1,6 @@ +pr: 138228 +summary: "Fix: Downsample returns appropriate error when target index gets deleted\ + \ unexpectedly." +area: Downsampling +type: bug +issues: [] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 4039a5bf526f3..bcc67585cacf0 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc /** * This is the cluster state task executor for cluster state update actions. 
+ * Visible for testing: package-private so that tests can execute update tasks through it directly. */ - private static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = - new SimpleBatchedExecutor<>() { - @Override - public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) - throws Exception { - return Tuple.tuple(task.execute(clusterState), null); - } + static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), null); /* the per-task result is Void (see taskSucceeded), so the second tuple element is always null */ + } - @Override - public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { - task.listener.onResponse(AcknowledgedResponse.TRUE); - } - }; + @Override + public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { + task.listener.onResponse(AcknowledgedResponse.TRUE); + } + }; @Inject public TransportDownsampleAction( @@ -1110,7 +1109,6 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. 
listener.onFailure(e); } @@ -1165,6 +1163,11 @@ public ClusterState execute(ClusterState currentState) { logger.debug("Updating downsample index status for [{}]", downsampleIndexName); final ProjectMetadata project = currentState.metadata().getProject(projectId); final IndexMetadata downsampleIndex = project.index(downsampleIndexName); + if (downsampleIndex == null) { /* the target index may have been deleted before this status update runs; fail with a descriptive error instead of dereferencing null in the settings lookup below */ + throw new IllegalStateException( + "Failed to update downsample status because [" + downsampleIndexName + "] does not exist" + ); + } if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) { return currentState; } @@ -1186,7 +1189,6 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. actionListener.onFailure(e); } @@ -1260,8 +1262,8 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); - logger.debug("Downsampling measured successfully", e); + recordFailureMetrics(startTime); + logger.debug("Downsampling failure measured successfully", e); this.actionListener.onFailure(e); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index a968198098b39..a644f6a7b353b 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.service.ClusterService; +import 
org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -56,6 +57,7 @@ import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.mockito.Answers; @@ -188,11 +190,12 @@ public void setUp() throws Exception { }; doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any()); doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any()); + doAnswer(invocation -> { var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(randomBoolean() ? 
AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE); + updateTask.listener.onResponse(AcknowledgedResponse.TRUE); return null; - }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); /* acknowledges every task whose source starts with "create-downsample-index" */ when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); MappedFieldType timestampFieldMock = mock(MappedFieldType.class); when(timestampFieldMock.meta()).thenReturn(Map.of()); @@ -236,11 +239,6 @@ private void downsample(String mapping) throws IOException { when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); - doAnswer(invocation -> { - var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(AcknowledgedResponse.TRUE); - return null; - }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); Answer mockPersistentTask = invocation -> { ActionListener> listener = invocation.getArgument(4); PersistentTasksCustomMetadata.PersistentTask task = mock(PersistentTasksCustomMetadata.PersistentTask.class); @@ -260,6 +258,7 @@ private void downsample(String mapping) throws IOException { listener.onResponse(AcknowledgedResponse.TRUE); return null; }).when(indicesAdminClient).updateSettings(any(), any()); + assertSuccessfulUpdateDownsampleStatus(clusterState); /* only installs the "update-downsample-metadata" stub; the stub executes the task via executeAndAssertSuccessful once the action submits it */ PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -298,6 +297,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() { .build(); when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -359,6 +359,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx ) .build() ); + 
assertSuccessfulUpdateDownsampleStatus(clusterService.state()); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -377,6 +378,83 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); } + /** Runs a downsample against a cluster state that never contains the target index and expects the action to fail with an {@link IllegalStateException}, to record a FAILED metric and no SUCCESS metric, to refresh the index, and to skip flush and force-merge. */ public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException { + String mapping = switch (randomIntBetween(0, 2)) { + case 0 -> NO_METADATA_MAPPING; + case 1 -> OTHER_METADATA_MAPPING; + default -> FORCE_MERGE_ENABLED_MAPPING; + }; + mockGetMapping(mapping); + mockMergedMapping(mapping); + var projectMetadata = ProjectMetadata.builder(projectId) + .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) /* only the source index is registered; the target index is deliberately absent */ + .build(); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(projectMetadata) + .blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + + Answer mockPersistentTask = invocation -> { + ActionListener> listener = invocation.getArgument(4); + PersistentTasksCustomMetadata.PersistentTask task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class); + when(task1.getId()).thenReturn(randomAlphaOfLength(10)); + DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.COMPLETED, /* NOTE(review): the variable is named "running" but the reported status is COMPLETED */ + null + ); + when(task1.getState()).thenReturn(runningTaskState); + listener.onResponse(task1); + return null; + }; + doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any()); + doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any()); + doAnswer(invocation -> { + var listener = invocation.getArgument(1, 
TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeHandlingResults( + clusterState, /* the state built above without the target index, so the status update is expected to fail */ + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask), + task1 -> {}, + TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + IllegalStateException error = safeAwaitFailure( + IllegalStateException.class, + AcknowledgedResponse.class, + listener -> action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.ONE_HOUR, + new DownsampleConfig(new DateHistogramInterval("5m")) + ), + clusterState, + listener + ) + ); + assertThat( + error.getMessage(), + Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist") + ); + verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED)); + verify(indicesAdminClient).refresh(any(), any()); + verify(indicesAdminClient, never()).flush(any(), any()); + verify(indicesAdminClient, never()).forceMerge(any(), any()); + } + private void mockGetMapping(String mapping) { doAnswer(invocation -> { @SuppressWarnings("unchecked") @@ -532,4 +610,21 @@ public void testGetSupportedMetrics() { assertThat(supported.defaultMetric(), is("max")); assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs()))); } + + /** Stubs submission of "update-downsample-metadata" tasks: each submitted task is executed through STATE_UPDATE_TASK_EXECUTOR via ClusterStateTaskExecutorUtils.executeAndAssertSuccessful against a copy of {@code clusterState} that additionally contains the target index. Nothing is asserted at call time; the check runs when the action submits the task. */ private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) { + var 
projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId)) + .put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards)) /* reuses the source-index metadata factory to register the target index */ + .build(); + + var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build(); + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + updatedClusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask) + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + } }