diff --git a/docs/changelog/108394.yaml b/docs/changelog/108394.yaml new file mode 100644 index 0000000000000..58f48fa548c6e --- /dev/null +++ b/docs/changelog/108394.yaml @@ -0,0 +1,6 @@ +pr: 108394 +summary: Handle `IndexNotFoundException` +area: Transform +type: bug +issues: + - 107263 diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index ed0f721f5f7f0..df8c3f62034e5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -193,7 +193,11 @@ protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener listener) { }, listener::onFailure); var deducedDestIndexMappings = new SetOnce>(); - var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0 - && TransformEffectiveSettings.isUnattended(transformConfig.getSettings()); + + // if the unattended transform had not created the destination index yet, or if the destination index was deleted for any + // type of transform during the last run, then we try to create the destination index. + // This is important to create the destination index explicitly before indexing documents. Otherwise, the destination + // index aliases may be missing. + var shouldMaybeCreateDestIndex = isFirstUnattendedRun() || context.shouldRecreateDestinationIndex(); ActionListener> fieldMappingsListener = ActionListener.wrap(destIndexMappings -> { if (destIndexMappings.isEmpty() == false) { @@ -359,11 +363,12 @@ protected void onStart(long now, ActionListener listener) { // ... otherwise we fall back to index mappings deduced based on source indices this.fieldMappings = deducedDestIndexMappings.get(); } - // Since the unattended transform could not have created the destination index yet, we do it here. - // This is important to create the destination index explicitly before indexing first documents. Otherwise, the destination - // index aliases may be missing. - if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndexForUnattended) { - doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener); + + if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndex) { + doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener.delegateFailure((delegate, response) -> { + context.setShouldRecreateDestinationIndex(false); + delegate.onResponse(response); + })); } else { configurationReadyListener.onResponse(null); } @@ -380,7 +385,7 @@ protected void onStart(long now, ActionListener listener) { deducedDestIndexMappings.set(validationResponse.getDestIndexMappings()); if (isContinuous()) { transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> { - if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndexForUnattended == false) { + if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndex == false) { logger.trace("[{}] transform config has not changed.", getJobId()); configurationReadyListener.onResponse(null); } else { @@ -415,7 +420,7 @@ protected void onStart(long now, ActionListener listener) { }, listener::onFailure); Instant instantOfTrigger = Instant.ofEpochMilli(now); - // If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on, + // If we are not on the initial batch checkpoint and it's the first pass of whatever continuous checkpoint we are on, // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit. if (context.getCheckpoint() > 0 && initialRun()) { checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> { @@ -436,8 +441,7 @@ protected void onStart(long now, ActionListener listener) { hasSourceChanged = true; listener.onFailure(failure); })); - } else if (context.getCheckpoint() == 0 && TransformEffectiveSettings.isUnattended(transformConfig.getSettings())) { - // this transform runs in unattended mode and has never run, to go on + } else if (shouldMaybeCreateDestIndex) { validate(changedSourceListener); } else { hasSourceChanged = true; @@ -447,6 +451,13 @@ protected void onStart(long now, ActionListener listener) { } } + /** + * Returns true if this transform runs in unattended mode and has never run. + */ + private boolean isFirstUnattendedRun() { + return context.getCheckpoint() == 0 && TransformEffectiveSettings.isUnattended(transformConfig.getSettings()); + } + protected void initializeFunction() { // create the function function = FunctionFactory.create(getConfig()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java index 8618b01a0440b..8bf859a020ba4 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.tasks.TaskCancelledException; @@ -63,7 +64,7 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti } if (unwrappedThrowable instanceof ElasticsearchException elasticsearchException) { - if (isExceptionIrrecoverable(elasticsearchException)) { + if (isExceptionIrrecoverable(elasticsearchException) && isNotIndexNotFoundException(elasticsearchException)) { return elasticsearchException; } } @@ -72,6 +73,15 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti return null; } + /** + * We can safely recover from IndexNotFoundExceptions on Bulk responses. + * If the transform is running, the next checkpoint will recreate the index. + * If the transform is not running, the next start request will recreate the index. + */ + private static boolean isNotIndexNotFoundException(ElasticsearchException elasticsearchException) { + return elasticsearchException instanceof IndexNotFoundException == false; + } + public static boolean isExceptionIrrecoverable(ElasticsearchException elasticsearchException) { if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index fe54847af0404..f39a4329f2bb1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -10,10 +10,13 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -27,6 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.script.ScriptException; @@ -75,6 +79,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; @@ -85,6 +90,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.matchesRegex; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -101,6 +107,10 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase { private Client client; private ThreadPool threadPool; + private static final Function EMPTY_BULK_RESPONSE = bulkRequest -> new BulkResponse( + new BulkItemResponse[0], + 100 + ); static class MockedTransformIndexer extends ClientTransformIndexer { @@ -110,6 +120,7 @@ static class MockedTransformIndexer extends ClientTransformIndexer { // used for synchronizing with the test private CountDownLatch latch; + private int doProcessCount; MockedTransformIndexer( ThreadPool threadPool, @@ -127,7 +138,8 @@ static class MockedTransformIndexer extends ClientTransformIndexer { TransformContext context, Function searchFunction, Function bulkFunction, - Function deleteByQueryFunction + Function deleteByQueryFunction, + int doProcessCount ) { super( threadPool, @@ -157,6 +169,7 @@ static class MockedTransformIndexer extends ClientTransformIndexer { this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.deleteByQueryFunction = deleteByQueryFunction; + this.doProcessCount = doProcessCount; } public void initialize() { @@ -278,6 +291,17 @@ void doGetFieldMappings(ActionListener> fieldMappingsListene protected void persistState(TransformState state, ActionListener listener) { listener.onResponse(null); } + + @Override + protected IterationResult doProcess(SearchResponse searchResponse) { + if (doProcessCount > 0) { + doProcessCount -= 1; + // pretend that we processed 10k documents for each call + getStats().incrementNumDocuments(10_000); + return new IterationResult<>(Stream.of(new IndexRequest()), new TransformIndexerPosition(null, null), false); + } + return super.doProcess(searchResponse); + } } @Before @@ -936,6 +960,152 @@ public void testHandleFailureAuditing() { auditor.assertAllExpectationsMatched(); } + /** + * Given no bulk upload errors + * When we run the indexer + * Then we should not fail or recreate the destination index + */ + public void testHandleBulkResponseWithNoFailures() throws Exception { + var indexer = runIndexer(createMockIndexer(returnHit(), EMPTY_BULK_RESPONSE)); + assertThat(indexer.getStats().getIndexFailures(), is(0L)); + assertFalse(indexer.context.shouldRecreateDestinationIndex()); + assertNull(indexer.context.getLastFailure()); + } + + private static TransformIndexer runIndexer(MockedTransformIndexer indexer) throws Exception { + var latch = indexer.newLatch(1); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + return indexer; + } + + private MockedTransformIndexer createMockIndexer( + Function searchFunction, + Function bulkFunction + ) { + return createMockIndexer(searchFunction, bulkFunction, mock(TransformContext.Listener.class)); + } + + private static Function returnHit() { + return request -> new SearchResponse( + new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), + // Simulate completely null aggs + null, + new Suggest(Collections.emptyList()), + false, + false, + new SearchProfileResults(Collections.emptyMap()), + 1, + "", + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + } + + /** + * Given an irrecoverable bulk upload error + * When we run the indexer + * Then we should fail without retries and not recreate the destination index + */ + public void testHandleBulkResponseWithIrrecoverableFailures() throws Exception { + var failCalled = new AtomicBoolean(); + var indexer = runIndexer( + createMockIndexer( + returnHit(), + bulkResponseWithError(new ResourceNotFoundException("resource not found error")), + createContextListener(failCalled, new AtomicReference<>()) + ) + ); + assertThat(indexer.getStats().getIndexFailures(), is(1L)); + assertFalse(indexer.context.shouldRecreateDestinationIndex()); + assertTrue(failCalled.get()); + } + + private MockedTransformIndexer createMockIndexer( + Function searchFunction, + Function bulkFunction, + TransformContext.Listener listener + ) { + return createMockIndexer( + new TransformConfig( + randomAlphaOfLength(10), + randomSourceConfig(), + randomDestConfig(), + null, + null, + null, + randomPivotConfig(), + null, + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), + new SettingsConfig.Builder().setMaxPageSearchSize(randomBoolean() ? null : randomIntBetween(500, 10_000)).build(), + null, + null, + null, + null + ), + new AtomicReference<>(IndexerState.STOPPED), + searchFunction, + bulkFunction, + null, + threadPool, + ThreadPool.Names.GENERIC, + mock(TransformAuditor.class), + new TransformContext(TransformTaskState.STARTED, "", 0, listener), + 1 + ); + } + + private static Function bulkResponseWithError(Exception e) { + return bulkRequest -> new BulkResponse( + new BulkItemResponse[] { + BulkItemResponse.failure(1, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("the_index", "id", e)) }, + 100 + ); + } + + /** + * Given an IndexNotFound bulk upload error + * When we run the indexer + * Then we should fail with retries and recreate the destination index + */ + public void testHandleBulkResponseWithIndexNotFound() throws Exception { + var indexer = runIndexerWithBulkResponseError(new IndexNotFoundException("Some Error")); + assertThat(indexer.getStats().getIndexFailures(), is(1L)); + assertTrue(indexer.context.shouldRecreateDestinationIndex()); + assertFalse(bulkIndexingException(indexer).isIrrecoverable()); + } + + private TransformIndexer runIndexerWithBulkResponseError(Exception e) throws Exception { + return runIndexer(createMockIndexer(returnHit(), bulkResponseWithError(e))); + } + + private static BulkIndexingException bulkIndexingException(TransformIndexer indexer) { + var lastFailure = indexer.context.getLastFailure(); + assertNotNull(lastFailure); + assertThat(lastFailure, instanceOf(BulkIndexingException.class)); + return (BulkIndexingException) lastFailure; + } + + /** + * Given a recoverable bulk upload error + * When we run the indexer + * Then we should fail with retries and not recreate the destination index + */ + public void testHandleBulkResponseWithNoIrrecoverableFailures() throws Exception { + var indexer = runIndexerWithBulkResponseError(new EsRejectedExecutionException("es rejected execution")); + assertThat(indexer.getStats().getIndexFailures(), is(1L)); + assertFalse(indexer.context.shouldRecreateDestinationIndex()); + assertFalse(bulkIndexingException(indexer).isIrrecoverable()); + } + public void testHandleFailure() { testHandleFailure(0, 5, 0, 0); testHandleFailure(5, 0, 5, 2); @@ -1042,11 +1212,36 @@ private MockedTransformIndexer createMockIndexer( String executorName, TransformAuditor auditor, TransformContext context + ) { + return createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + deleteByQueryFunction, + threadPool, + executorName, + auditor, + context, + 0 + ); + } + + private MockedTransformIndexer createMockIndexer( + TransformConfig config, + AtomicReference state, + Function searchFunction, + Function bulkFunction, + Function deleteByQueryFunction, + ThreadPool threadPool, + String executorName, + TransformAuditor auditor, + TransformContext context, + int doProcessCount ) { IndexBasedTransformConfigManager transformConfigManager = mock(IndexBasedTransformConfigManager.class); doAnswer(invocationOnMock -> { - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + ActionListener listener = invocationOnMock.getArgument(1); listener.onResponse(config); return null; }).when(transformConfigManager).getTransformConfiguration(any(), any()); @@ -1066,7 +1261,8 @@ private MockedTransformIndexer createMockIndexer( context, searchFunction, bulkFunction, - deleteByQueryFunction + deleteByQueryFunction, + doProcessCount ); indexer.initialize(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java index b71156cad5adf..9a0431d40a972 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.DocumentParsingException; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.shard.ShardId; @@ -27,116 +28,27 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentLocation; +import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; public class ExceptionRootCauseFinderTests extends ESTestCase { public void testGetFirstIrrecoverableExceptionFromBulkResponses() { - Map bulkItemResponses = new HashMap<>(); - - int id = 1; - // 1 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new DocumentParsingException(XContentLocation.UNKNOWN, "document parsing error") - ) - ) - ); - // 2 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new ResourceNotFoundException("resource not found error")) - ) - ); - // 3 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new IllegalArgumentException("illegal argument error")) - ) - ); - // 4 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new EsRejectedExecutionException("es rejected execution")) - ) - ); - // 5 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new TranslogException(new ShardId("the_index", "uid", 0), "translog error")) - ) - ); - // 6 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED) - ) - ) - ); - // 7 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN) - ) - ) - ); - // 8 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS) - ) - ) - ); - // 9 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR) - ) - ) + Map bulkItemResponses = bulkItemResponses( + new DocumentParsingException(XContentLocation.UNKNOWN, "document parsing error"), + new ResourceNotFoundException("resource not found error"), + new IllegalArgumentException("illegal argument error"), + new EsRejectedExecutionException("es rejected execution"), + new TranslogException(new ShardId("the_index", "uid", 0), "translog error"), + new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED), + new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN), + new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS), + new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR), + new IndexNotFoundException("some missing index") ); assertFirstException(bulkItemResponses.values(), DocumentParsingException.class, "document parsing error"); @@ -157,6 +69,14 @@ public void testGetFirstIrrecoverableExceptionFromBulkResponses() { assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values())); } + private static Map bulkItemResponses(Exception... exceptions) { + var id = new AtomicInteger(1); + return Arrays.stream(exceptions) + .map(exception -> new BulkItemResponse.Failure("the_index", "id", exception)) + .map(failure -> BulkItemResponse.failure(id.get(), OpType.INDEX, failure)) + .collect(Collectors.toMap(response -> id.getAndIncrement(), Function.identity())); + } + public void testIsIrrecoverable() { assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(new MapperException("mappings problem"))); assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(new TaskCancelledException("cancelled task"))); @@ -174,6 +94,7 @@ public void testIsIrrecoverable() { assertTrue( ExceptionRootCauseFinder.isExceptionIrrecoverable(new DocumentParsingException(new XContentLocation(1, 2), "parse error")) ); + assertTrue(ExceptionRootCauseFinder.isExceptionIrrecoverable(new IndexNotFoundException("some missing index"))); } private static void assertFirstException(Collection bulkItemResponses, Class expectedClass, String message) {