diff --git a/docs/changelog/106793.yaml b/docs/changelog/106793.yaml new file mode 100644 index 0000000000000..cf44f5a74d621 --- /dev/null +++ b/docs/changelog/106793.yaml @@ -0,0 +1,7 @@ +pr: 106793 +summary: Fail checkpoint on missing clusters +area: Transform +type: bug +issues: + - 104533 + - 106790 diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java index 7cf1f3b9adb0f..7726ec056863d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java @@ -10,6 +10,10 @@ import org.elasticsearch.ElasticsearchException; class CheckpointException extends ElasticsearchException { + CheckpointException(String msg, Object... params) { + super(msg, params); + } + CheckpointException(String msg, Throwable cause, Object... params) { super(msg, cause, params); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index f60429f954b78..106ff2fe57192 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -116,6 +116,12 @@ protected void getIndexCheckpoints(TimeValue timeout, ActionListener> groupedListener = listener; + if (resolvedIndexes.numClusters() == 0) { + var indices = String.join(",", transformConfig.getSource().getIndex()); + listener.onFailure(new CheckpointException("No clusters exist for [{}]", indices)); + return; + } + if (resolvedIndexes.numClusters() > 1) { ActionListener>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> { listener.onResponse( @@ -228,10 +234,7 @@ private static void getCheckpointsFromOneClusterV2( ); ActionListener checkpointListener; if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) { - checkpointListener = ActionListener.wrap( - checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()), - listener::onFailure - ); + checkpointListener = listener.safeMap(GetCheckpointAction.Response::getCheckpoints); } else { checkpointListener = ActionListener.wrap( checkpointResponse -> listener.onResponse( @@ -395,12 +398,12 @@ public void getCheckpointingInfo( long timestamp = clock.millis(); - getIndexCheckpoints(timeout, ActionListener.wrap(checkpointsByIndex -> { + getIndexCheckpoints(timeout, listener.delegateFailure((l, checkpointsByIndex) -> { TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L); checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint); checkpointingInfoBuilder.setOperationsBehind(TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint)); - listener.onResponse(checkpointingInfoBuilder); - }, listener::onFailure)); + l.onResponse(checkpointingInfoBuilder); + })); } @Override diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 6535afebdd2f9..3107aa3c9f06c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.MockLogAppender.LoggingExpectation; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -67,7 +68,7 @@ public class DefaultCheckpointProviderTests extends ESTestCase { - private static Logger checkpointProviderLogger = LogManager.getLogger(DefaultCheckpointProvider.class); + private static final Logger checkpointProviderLogger = LogManager.getLogger(DefaultCheckpointProvider.class); private Clock clock; private Client client; @@ -96,7 +97,7 @@ public void setUpMocks() { transformAuditor = MockTransformAuditor.createMockAuditor(); } - public void testReportSourceIndexChangesRunsEmpty() throws Exception { + public void testReportSourceIndexChangesRunsEmpty() { String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig); @@ -138,7 +139,7 @@ public void testReportSourceIndexChangesRunsEmpty() throws Exception { ); } - public void testReportSourceIndexChangesAddDelete() throws Exception { + public void testReportSourceIndexChangesAddDelete() { String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig); @@ -197,7 +198,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception { ); } - public void testReportSourceIndexChangesAddDeleteMany() throws Exception { + public void testReportSourceIndexChangesAddDeleteMany() { String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig); @@ -231,20 +232,56 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception { } public void testHandlingShardFailures() throws Exception { - String transformId = getTestName(); - String indexName = "some-index"; + var transformId = getTestName(); + var indexName = "some-index"; TransformConfig transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSource( new SourceConfig(indexName) ).build(); - RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); + var remoteClusterResolver = mock(RemoteClusterResolver.class); doReturn(new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(indexName))).when( remoteClusterResolver ).resolve(transformConfig.getSource().getIndex()); + mockGetIndexResponse(indexName); + mockIndicesStatsResponse(indexName); + mockGetCheckpointAction(); + + var provider = new DefaultCheckpointProvider( + clock, + parentTaskClient, + remoteClusterResolver, + transformConfigManager, + transformAuditor, + transformConfig + ); + + var latch = new CountDownLatch(1); + provider.createNextCheckpoint( + null, + new LatchedActionListener<>( + ActionListener.wrap( + response -> fail("This test case must fail"), + e -> assertThat( + e.getMessage(), + startsWith( + "Source has [7] failed shards, first shard failure: [some-index][3] failed, " + + "reason [java.lang.Exception: something's wrong" + ) + ) + ), + latch + ) + ); + assertTrue(latch.await(1, TimeUnit.MILLISECONDS)); + } + + private void mockGetIndexResponse(String indexName) { GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { indexName }, null, null, null, null, null); doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); + } + private void mockIndicesStatsResponse(String indexName) { IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); doReturn(7).when(indicesStatsResponse).getFailedShards(); doReturn( @@ -252,8 +289,31 @@ public void testHandlingShardFailures() throws Exception { new DefaultShardOperationFailedException(indexName, 3, new Exception("something's wrong")) } ).when(indicesStatsResponse).getShardFailures(); doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); + } - DefaultCheckpointProvider provider = new DefaultCheckpointProvider( + private void mockGetCheckpointAction() { + doAnswer(invocationOnMock -> { + ActionListener listener = invocationOnMock.getArgument(2); + listener.onFailure(new ActionNotFoundTransportException("This should fail.")); + return null; + }).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); + } + + public void testHandlingNoClusters() throws Exception { + var transformId = getTestName(); + var indexName = "some-missing-index"; + var transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSource( + new SourceConfig(indexName) + ).build(); + + var remoteClusterResolver = mock(RemoteClusterResolver.class); + doReturn(new RemoteClusterResolver.ResolvedIndices(Map.of(), List.of())).when(remoteClusterResolver) + .resolve(transformConfig.getSource().getIndex()); + + mockGetIndexResponse(indexName); + mockIndicesStatsResponse(indexName); + + var provider = new DefaultCheckpointProvider( clock, parentTaskClient, remoteClusterResolver, @@ -262,24 +322,18 @@ public void testHandlingShardFailures() throws Exception { transformConfig ); - CountDownLatch latch = new CountDownLatch(1); + var latch = new CountDownLatch(1); provider.createNextCheckpoint( null, new LatchedActionListener<>( ActionListener.wrap( response -> fail("This test case must fail"), - e -> assertThat( - e.getMessage(), - startsWith( - "Source has [7] failed shards, first shard failure: [some-index][3] failed, " - + "reason [java.lang.Exception: something's wrong" - ) - ) + e -> assertThat(e.getMessage(), equalTo("No clusters exist for [some-missing-index]")) ), latch ) ); - latch.await(10, TimeUnit.SECONDS); + assertTrue(latch.await(1, TimeUnit.MILLISECONDS)); } public void testSourceHasChanged() throws InterruptedException { @@ -407,8 +461,7 @@ private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transfor ); } - private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock) - throws IllegalAccessException { + private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock) { MockLogAppender mockLogAppender = new MockLogAppender(); mockLogAppender.start(); @@ -429,10 +482,9 @@ private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpec } } - @SuppressWarnings("unchecked") private static Answer withResponse(Response response) { return invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + ActionListener listener = invocationOnMock.getArgument(2); listener.onResponse(response); return null; };