From 81508e8f99fef21d74b221024101e3ecab8dfab3 Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Wed, 19 Nov 2025 12:53:52 +0200 Subject: [PATCH 1/2] Fix: Downsample returns appropriate error when target index gets deleted unexpectedly. (#138228) (cherry picked from commit 14f5892ad658df85029d2853e5fd033476101d3f) # Conflicts: # x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java --- docs/changelog/138228.yaml | 6 + .../downsample/TransportDownsampleAction.java | 34 +++--- .../TransportDownsampleActionTests.java | 111 ++++++++++++++++-- 3 files changed, 128 insertions(+), 23 deletions(-) create mode 100644 docs/changelog/138228.yaml diff --git a/docs/changelog/138228.yaml b/docs/changelog/138228.yaml new file mode 100644 index 0000000000000..0136025ddaec8 --- /dev/null +++ b/docs/changelog/138228.yaml @@ -0,0 +1,6 @@ +pr: 138228 +summary: "Fix: Downsample returns appropriate error when target index gets deleted\ + \ unexpectedly." +area: Downsampling +type: bug +issues: [] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 4039a5bf526f3..bcc67585cacf0 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc /** * This is the cluster state task executor for cluster state update actions. + * Visible for testing */ - private static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = - new SimpleBatchedExecutor<>() { - @Override - public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) - throws Exception { - return Tuple.tuple(task.execute(clusterState), null); - } + static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), null); + } - @Override - public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { - task.listener.onResponse(AcknowledgedResponse.TRUE); - } - }; + @Override + public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { + task.listener.onResponse(AcknowledgedResponse.TRUE); + } + }; @Inject public TransportDownsampleAction( @@ -1110,7 +1109,6 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. listener.onFailure(e); } @@ -1165,6 +1163,11 @@ public ClusterState execute(ClusterState currentState) { logger.debug("Updating downsample index status for [{}]", downsampleIndexName); final ProjectMetadata project = currentState.metadata().getProject(projectId); final IndexMetadata downsampleIndex = project.index(downsampleIndexName); + if (downsampleIndex == null) { + throw new IllegalStateException( + "Failed to update downsample status because [" + downsampleIndexName + "] does not exist" + ); + } if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) { return currentState; } @@ -1186,7 +1189,6 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. actionListener.onFailure(e); } @@ -1260,8 +1262,8 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); - logger.debug("Downsampling measured successfully", e); + recordFailureMetrics(startTime); + logger.debug("Downsampling failure measured successfully", e); this.actionListener.onFailure(e); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index a968198098b39..9f6d85e13f93c 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -56,6 +57,7 @@ import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.mockito.Answers; @@ -188,11 +190,14 @@ public void setUp() throws Exception { }; doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any()); doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any()); + doAnswer(invocation -> { var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE); + updateTask.listener.onResponse(AcknowledgedResponse.TRUE); return null; - }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); + + // Mocks for mapping retrieval & merging when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); MappedFieldType timestampFieldMock = mock(MappedFieldType.class); when(timestampFieldMock.meta()).thenReturn(Map.of()); @@ -236,11 +241,6 @@ private void downsample(String mapping) throws IOException { when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); - doAnswer(invocation -> { - var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(AcknowledgedResponse.TRUE); - return null; - }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); Answer mockPersistentTask = invocation -> { ActionListener> listener = invocation.getArgument(4); PersistentTasksCustomMetadata.PersistentTask task = mock(PersistentTasksCustomMetadata.PersistentTask.class); @@ -260,6 +260,7 @@ private void downsample(String mapping) throws IOException { listener.onResponse(AcknowledgedResponse.TRUE); return null; }).when(indicesAdminClient).updateSettings(any(), any()); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -298,6 +299,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() { .build(); when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -359,6 +361,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx ) .build() ); + assertSuccessfulUpdateDownsampleStatus(clusterService.state()); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -377,6 +380,83 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); } + public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException { + String mapping = switch (randomIntBetween(0, 2)) { + case 0 -> NO_METADATA_MAPPING; + case 1 -> OTHER_METADATA_MAPPING; + default -> FORCE_MERGE_ENABLED_MAPPING; + }; + mockGetMapping(mapping); + mockMergedMapping(mapping); + var projectMetadata = ProjectMetadata.builder(projectId) + .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) + .build(); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(projectMetadata) + .blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + + Answer mockPersistentTask = invocation -> { + ActionListener> listener = invocation.getArgument(4); + PersistentTasksCustomMetadata.PersistentTask task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class); + when(task1.getId()).thenReturn(randomAlphaOfLength(10)); + DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.COMPLETED, + null + ); + when(task1.getState()).thenReturn(runningTaskState); + listener.onResponse(task1); + return null; + }; + doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any()); + doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any()); + doAnswer(invocation -> { + var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeHandlingResults( + clusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask), + task1 -> {}, + TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + IllegalStateException error = safeAwaitFailure( + IllegalStateException.class, + AcknowledgedResponse.class, + listener -> action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.ONE_HOUR, + new DownsampleConfig(new DateHistogramInterval("5m")) + ), + clusterState, + listener + ) + ); + assertThat( + error.getMessage(), + Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist") + ); + verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED)); + verify(indicesAdminClient).refresh(any(), any()); + verify(indicesAdminClient, never()).flush(any(), any()); + verify(indicesAdminClient, never()).forceMerge(any(), any()); + } + private void mockGetMapping(String mapping) { doAnswer(invocation -> { @SuppressWarnings("unchecked") @@ -532,4 +612,21 @@ public void testGetSupportedMetrics() { assertThat(supported.defaultMetric(), is("max")); assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs()))); } + + private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) { + var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId)) + .put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards)) + .build(); + + var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build(); + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + updatedClusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask) + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + } } From df0f87b8e8184096755665e101e04ba3a77b4414 Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Wed, 19 Nov 2025 18:39:49 +0200 Subject: [PATCH 2/2] Update TransportDownsampleActionTests.java --- .../xpack/downsample/TransportDownsampleActionTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index 9f6d85e13f93c..a644f6a7b353b 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -196,8 +196,6 @@ public void setUp() throws Exception { updateTask.listener.onResponse(AcknowledgedResponse.TRUE); return null; }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); - - // Mocks for mapping retrieval & merging when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); MappedFieldType timestampFieldMock = mock(MappedFieldType.class); when(timestampFieldMock.meta()).thenReturn(Map.of());