From f0ba732a7cd095c70e7cb039c38006c049950647 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 18 Nov 2025 13:03:41 +0200 Subject: [PATCH 1/3] Create test that captures the NPE --- .../downsample/TransportDownsampleAction.java | 23 ++-- .../TransportDownsampleActionTests.java | 105 ++++++++++++++++-- 2 files changed, 106 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index b8e03a295e75d..50233fd4cb083 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc /** * This is the cluster state task executor for cluster state update actions. + * Visible for testing */ - private static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = - new SimpleBatchedExecutor<>() { - @Override - public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) - throws Exception { - return Tuple.tuple(task.execute(clusterState), null); - } + static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), null); + } - @Override - public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { - task.listener.onResponse(AcknowledgedResponse.TRUE); - } - }; + @Override + public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { + task.listener.onResponse(AcknowledgedResponse.TRUE); + } + }; @Inject public TransportDownsampleAction( diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index e7236f756f9dd..12f6219f680bb 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -56,6 +57,7 @@ import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.mockito.Answers; @@ -63,7 +65,6 @@ import org.mockito.MockitoAnnotations; import org.mockito.stubbing.Answer; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -173,12 +174,11 @@ public void setUp() throws Exception { doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any()); doAnswer(mockBroadcastResponse).when(indicesAdminClient).flush(any(), any()); - // Mocks for updating downsampling metadata doAnswer(invocation -> { var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE); + updateTask.listener.onResponse(AcknowledgedResponse.TRUE); return null; - }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); // Mocks for mapping retrieval & merging when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); @@ -219,11 +219,6 @@ public void testDownsampling() { when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); - doAnswer(invocation -> { - var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(AcknowledgedResponse.TRUE); - return null; - }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); Answer mockPersistentTask = invocation -> { ActionListener> listener = invocation.getArgument(4); PersistentTasksCustomMetadata.PersistentTask task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class); @@ -243,6 +238,7 @@ public void testDownsampling() { listener.onResponse(AcknowledgedResponse.TRUE); return null; }).when(indicesAdminClient).updateSettings(any(), any()); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -273,6 +269,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() { .build(); when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -291,7 +288,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() { verifyIndexFinalisation(); } - public void testDownsamplingWithShortCircuitDuringCreation() throws IOException { + public void testDownsamplingWithShortCircuitDuringCreation() { var projectMetadata = ProjectMetadata.builder(projectId) .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) .build(); @@ -315,6 +312,7 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException ) .build() ); + assertSuccessfulUpdateDownsampleStatus(clusterService.state()); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -333,6 +331,76 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException verifyIndexFinalisation(); } + public void testDownsamplingWhenTargetIndexGetsDeleted() { + var projectMetadata = ProjectMetadata.builder(projectId) + .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) + .build(); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(projectMetadata) + .blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + + Answer mockPersistentTask = invocation -> { + ActionListener> listener = invocation.getArgument(4); + PersistentTasksCustomMetadata.PersistentTask task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class); + when(task1.getId()).thenReturn(randomAlphaOfLength(10)); + DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.COMPLETED, + null + ); + when(task1.getState()).thenReturn(runningTaskState); + listener.onResponse(task1); + return null; + }; + doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any()); + doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any()); + doAnswer(invocation -> { + var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeHandlingResults( + clusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask), + task1 -> {}, + TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + IllegalStateException error = safeAwaitFailure( + IllegalStateException.class, + AcknowledgedResponse.class, + listener -> action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.ONE_HOUR, + new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod()) + ), + clusterState, + listener + ) + ); + assertThat( + error.getMessage(), + Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist") + ); + verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED)); + verify(indicesAdminClient).refresh(any(), any()); + verify(indicesAdminClient, never()).flush(any(), any()); + verify(indicesAdminClient, never()).forceMerge(any(), any()); + } + private void verifyIndexFinalisation() { verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); verify(indicesAdminClient).refresh(any(), any()); @@ -476,4 +544,21 @@ public void testGetSupportedMetrics() { assertThat(supported.defaultMetric(), is("max")); assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs()))); } + + private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) { + var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId)) + .put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards)) + .build(); + + var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build(); + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + updatedClusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask) + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + } } From d34d91cbac58e538244b272b7ca54516b54fa51b Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 18 Nov 2025 14:45:45 +0200 Subject: [PATCH 2/3] Throw a relevant error when the index is missing --- .../xpack/downsample/TransportDownsampleAction.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 50233fd4cb083..ea6f33c903009 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -1105,7 +1105,6 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. listener.onFailure(e); } @@ -1160,6 +1159,11 @@ public ClusterState execute(ClusterState currentState) { logger.debug("Updating downsample index status for [{}]", downsampleIndexName); final ProjectMetadata project = currentState.metadata().getProject(projectId); final IndexMetadata downsampleIndex = project.index(downsampleIndexName); + if (downsampleIndex == null) { + throw new IllegalStateException( + "Failed to update downsample status because [" + downsampleIndexName + "] does not exist" + ); + } if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) { return currentState; } @@ -1181,7 +1185,6 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. actionListener.onFailure(e); } @@ -1254,8 +1257,8 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); - logger.debug("Downsampling measured successfully", e); + recordFailureMetrics(startTime); + logger.debug("Downsampling failure measured successfully", e); this.actionListener.onFailure(e); } From 3fbf1a8975721ebf1eb5f68818a2e00a52d7a593 Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Wed, 19 Nov 2025 11:25:13 +0200 Subject: [PATCH 3/3] Update docs/changelog/138228.yaml --- docs/changelog/138228.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/138228.yaml diff --git a/docs/changelog/138228.yaml b/docs/changelog/138228.yaml new file mode 100644 index 0000000000000..0136025ddaec8 --- /dev/null +++ b/docs/changelog/138228.yaml @@ -0,0 +1,6 @@ +pr: 138228 +summary: "Fix: Downsample returns appropriate error when target index gets deleted\ + \ unexpectedly." +area: Downsampling +type: bug +issues: []