diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java index 1d44ed5a1f8ef..40eb2e2ad294a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java @@ -14,6 +14,9 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; @@ -60,6 +63,7 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.transform.TransformField; @@ -76,10 +80,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.BiConsumer; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Place of all interactions with the internal transforms index. For configuration and mappings see @link{TransformInternalIndex} @@ -135,9 +139,7 @@ public void putTransformCheckpoint(TransformCheckpoint checkpoint, ActionListene .id(TransformCheckpoint.documentId(checkpoint.getTransformId(), checkpoint.getCheckpoint())) .source(source); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportIndexAction.TYPE, indexRequest, ActionListener.wrap(r -> { - listener.onResponse(true); - }, listener::onFailure)); + executeAsyncWithOrigin(TransportIndexAction.TYPE, indexRequest, listener.delegateFailureAndWrap((l, r) -> l.onResponse(true))); } catch (IOException e) { // not expected to happen but for the sake of completeness listener.onFailure(e); @@ -180,22 +182,16 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener< ) ); - executeAsyncWithOrigin( - client, - TRANSFORM_ORIGIN, - DeleteByQueryAction.INSTANCE, - deleteByQueryRequest, - ActionListener.wrap(response -> { - if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { - Tuple statusAndReason = getStatusAndReason(response); - listener.onFailure( - new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()) - ); - return; - } - listener.onResponse(true); - }, listener::onFailure) - ); + executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, response) -> { + if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { + Tuple statusAndReason = getStatusAndReason(response); + l.onFailure( + new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()) + ); + return; + } + l.onResponse(true); + })); } @Override @@ -212,22 +208,7 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener .filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId))) ) ); - executeAsyncWithOrigin( - client, - TRANSFORM_ORIGIN, - DeleteByQueryAction.INSTANCE, - deleteByQueryRequest, - ActionListener.wrap(response -> { - if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { - Tuple statusAndReason = getStatusAndReason(response); - listener.onFailure( - new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()) - ); - return; - } - listener.onResponse(response.getDeleted()); - }, listener::onFailure) - ); + deleteByQuery(listener, deleteByQueryRequest); } @Override @@ -247,22 +228,20 @@ public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow ) ); logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest()); - executeAsyncWithOrigin( - client, - TRANSFORM_ORIGIN, - DeleteByQueryAction.INSTANCE, - deleteByQueryRequest, - ActionListener.wrap(response -> { - if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { - Tuple statusAndReason = getStatusAndReason(response); - listener.onFailure( - new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()) - ); - return; - } - listener.onResponse(response.getDeleted()); - }, listener::onFailure) - ); + deleteByQuery(listener, deleteByQueryRequest); + } + + private void deleteByQuery(ActionListener listener, DeleteByQueryRequest deleteByQueryRequest) { + executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, response) -> { + if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { + Tuple statusAndReason = getStatusAndReason(response); + l.onFailure( + new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()) + ); + return; + } + l.onResponse(response.getDeleted()); + })); } @Override @@ -304,13 +283,13 @@ public void deleteOldIndices(ActionListener listener) { IndicesOptions.LENIENT_EXPAND_OPEN ); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportDeleteIndexAction.TYPE, deleteRequest, ActionListener.wrap(response -> { + executeAsyncWithOrigin(TransportDeleteIndexAction.TYPE, deleteRequest, listener.delegateFailureAndWrap((l, response) -> { if (response.isAcknowledged() == false) { - listener.onFailure(new ElasticsearchStatusException("Failed to delete internal indices", RestStatus.INTERNAL_SERVER_ERROR)); + l.onFailure(new ElasticsearchStatusException("Failed to delete internal indices", RestStatus.INTERNAL_SERVER_ERROR)); return; } - listener.onResponse(true); - }, listener::onFailure)); + l.onResponse(true); + })); } private void putTransformConfiguration( @@ -331,9 +310,7 @@ private void putTransformConfiguration( if (seqNoPrimaryTermAndIndex != null) { indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo()).setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm()); } - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportIndexAction.TYPE, indexRequest, ActionListener.wrap(r -> { - listener.onResponse(true); - }, e -> { + executeAsyncWithOrigin(TransportIndexAction.TYPE, indexRequest, ActionListener.wrap(r -> listener.onResponse(true), e -> { if (e instanceof VersionConflictEngineException) { if (DocWriteRequest.OpType.CREATE.equals(opType)) { // we want to create the transform but it already exists listener.onFailure( @@ -378,22 +355,16 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi .setAllowPartialSearchResults(false) .request(); - executeAsyncWithOrigin( - client, - TRANSFORM_ORIGIN, - TransportSearchAction.TYPE, - searchRequest, - ActionListener.wrap(searchResponse -> { - if (searchResponse.getHits().getHits().length == 0) { - // do not fail if checkpoint does not exist but return an empty checkpoint - logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint"); - resultListener.onResponse(TransformCheckpoint.EMPTY); - return; - } - BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); - parseCheckpointsLenientlyFromSource(source, transformId, resultListener); - }, resultListener::onFailure) - ); + executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, resultListener.delegateFailureAndWrap((l, searchResponse) -> { + if (searchResponse.getHits().getHits().length == 0) { + // do not fail if checkpoint does not exist but return an empty checkpoint + logger.trace("found no checkpoint for transform [{}], returning empty checkpoint", transformId); + l.onResponse(TransformCheckpoint.EMPTY); + return; + } + BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); + parseCheckpointsLenientlyFromSource(source, transformId, l); + })); } @Override @@ -416,14 +387,12 @@ public void getTransformCheckpointForUpdate( .request(); executeAsyncWithOrigin( - client, - TRANSFORM_ORIGIN, TransportSearchAction.TYPE, searchRequest, - ActionListener.wrap(searchResponse -> { + checkpointAndVersionListener.delegateFailureAndWrap((l, searchResponse) -> { if (searchResponse.getHits().getHits().length == 0) { // do not fail, this _must_ be handled by the caller - checkpointAndVersionListener.onResponse(null); + l.onResponse(null); return; } SearchHit hit = searchResponse.getHits().getHits()[0]; @@ -431,17 +400,16 @@ public void getTransformCheckpointForUpdate( parseCheckpointsLenientlyFromSource( source, transformId, - ActionListener.wrap( - parsedCheckpoint -> checkpointAndVersionListener.onResponse( + l.delegateFailureAndWrap( + (ll, parsedCheckpoint) -> ll.onResponse( Tuple.tuple( parsedCheckpoint, new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex()) ) - ), - checkpointAndVersionListener::onFailure + ) ) ); - }, checkpointAndVersionListener::onFailure) + }) ); } @@ -459,22 +427,16 @@ public void getTransformConfiguration(String transformId, ActionListenerwrap(searchResponse -> { - if (searchResponse.getHits().getHits().length == 0) { - resultListener.onFailure( - new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) - ); - return; - } - BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); - parseTransformLenientlyFromSource(source, transformId, resultListener); - }, resultListener::onFailure) - ); + executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, resultListener.delegateFailureAndWrap((l, searchResponse) -> { + if (searchResponse.getHits().getHits().length == 0) { + l.onFailure( + new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) + ); + return; + } + BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); + parseTransformLenientlyFromSource(source, transformId, l); + })); } @Override @@ -495,26 +457,29 @@ public void getTransformConfigurationForUpdate( .seqNoAndPrimaryTerm(true) .request(); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportSearchAction.TYPE, searchRequest, ActionListener.wrap(searchResponse -> { - if (searchResponse.getHits().getHits().length == 0) { - configAndVersionListener.onFailure( - new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) + executeAsyncWithOrigin( + TransportSearchAction.TYPE, + searchRequest, + configAndVersionListener.delegateFailureAndWrap((l, searchResponse) -> { + if (searchResponse.getHits().getHits().length == 0) { + l.onFailure( + new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) + ); + return; + } + SearchHit hit = searchResponse.getHits().getHits()[0]; + BytesReference source = hit.getSourceRef(); + parseTransformLenientlyFromSource( + source, + transformId, + l.delegateFailureAndWrap( + (ll, config) -> ll.onResponse( + Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex())) + ) + ) ); - return; - } - SearchHit hit = searchResponse.getHits().getHits()[0]; - BytesReference source = hit.getSourceRef(); - parseTransformLenientlyFromSource( - source, - transformId, - ActionListener.wrap( - config -> configAndVersionListener.onResponse( - Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex())) - ), - configAndVersionListener::onFailure - ) - ); - }, configAndVersionListener::onFailure)); + }) + ); } @Override @@ -543,48 +508,40 @@ public void expandTransformIds( final ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(idTokens, allowNoMatch); - executeAsyncWithOrigin( - client.threadPool().getThreadContext(), - TRANSFORM_ORIGIN, - request, - ActionListener.wrap(searchResponse -> { - long totalHits = searchResponse.getHits().getTotalHits().value; - // important: preserve order - Set ids = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length); - Set configs = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length); - for (SearchHit hit : searchResponse.getHits().getHits()) { - try (XContentParser parser = createParser(hit)) { - TransformConfig config = TransformConfig.fromXContent(parser, null, true); - if (ids.add(config.getId())) { - configs.add(config); - } - } catch (IOException e) { - foundConfigsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e)); - return; + executeAsyncWithOrigin(request, foundConfigsListener.delegateFailureAndWrap((l, searchResponse) -> { + long totalHits = searchResponse.getHits().getTotalHits().value; + // important: preserve order + Set ids = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length); + Set configs = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length); + for (SearchHit hit : searchResponse.getHits().getHits()) { + try (XContentParser parser = createParser(hit)) { + TransformConfig config = TransformConfig.fromXContent(parser, null, true); + if (ids.add(config.getId())) { + configs.add(config); } - } - requiredMatches.filterMatchedIds(ids); - if (requiredMatches.hasUnmatchedIds()) { - // some required Ids were not found - foundConfigsListener.onFailure( - new ResourceNotFoundException( - TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, requiredMatches.unmatchedIdsString()) - ) - ); + } catch (IOException e) { + l.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e)); return; } - // if only exact ids have been given, take the count from docs to avoid potential duplicates - // in versioned indexes (like transform) - if (requiredMatches.isOnlyExact()) { - foundConfigsListener.onResponse( - new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))) - ); - } else { - foundConfigsListener.onResponse(new Tuple<>(totalHits, Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs)))); - } - }, foundConfigsListener::onFailure), - client::search - ); + } + requiredMatches.filterMatchedIds(ids); + if (requiredMatches.hasUnmatchedIds()) { + // some required Ids were not found + l.onFailure( + new ResourceNotFoundException( + TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, requiredMatches.unmatchedIdsString()) + ) + ); + return; + } + // if only exact ids have been given, take the count from docs to avoid potential duplicates + // in versioned indexes (like transform) + if (requiredMatches.isOnlyExact()) { + l.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs)))); + } else { + l.onResponse(new Tuple<>(totalHits, Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs)))); + } + }), client::search); } private XContentParser createParser(BytesReference source) throws IOException { @@ -601,12 +558,7 @@ private XContentParser createParser(SearchHit hit) throws IOException { @Override public void getAllTransformIds(TimeValue timeout, ActionListener> listener) { - expandAllTransformIds( - false, - MAX_RESULTS_WINDOW, - timeout, - ActionListener.wrap(r -> listener.onResponse(r.v2()), listener::onFailure) - ); + expandAllTransformIds(false, MAX_RESULTS_WINDOW, timeout, listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.v2()))); } @Override @@ -616,7 +568,7 @@ public void getAllOutdatedTransformIds(TimeValue timeout, ActionListener listener) { - ActionListener deleteListener = ActionListener.wrap(dbqResponse -> { listener.onResponse(true); }, e -> { + ActionListener deleteListener = ActionListener.wrap(dbqResponse -> listener.onResponse(true), e -> { if (e.getClass() == IndexNotFoundException.class) { listener.onFailure( new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) @@ -636,7 +588,7 @@ public void resetTransform(String transformId, ActionListener listener) .query(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId)) .trackTotalHitsUpTo(1) ); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportSearchAction.TYPE, searchRequest, ActionListener.wrap(searchResponse -> { + executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, deleteListener.delegateFailureAndWrap((l, searchResponse) -> { if (searchResponse.getHits().getTotalHits().value == 0) { listener.onFailure( new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) @@ -655,8 +607,8 @@ public void resetTransform(String transformId, ActionListener listener) TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED ).setQuery(dbqQuery).setRefresh(true); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteByQueryAction.INSTANCE, dbqRequest, deleteListener); - }, deleteListener::onFailure)); + executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, dbqRequest, l); + })); } @Override @@ -668,7 +620,7 @@ public void deleteTransform(String transformId, ActionListener listener request.setQuery(query); request.setRefresh(true); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> { + executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> { if (deleteResponse.getDeleted() == 0) { listener.onFailure( new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) @@ -714,8 +666,6 @@ public void putOrUpdateTransformStoredDoc( } executeAsyncWithOrigin( - client, - TRANSFORM_ORIGIN, TransportIndexAction.TYPE, indexRequest, ActionListener.wrap( @@ -758,38 +708,30 @@ public void getTransformStoredDoc( .seqNoAndPrimaryTerm(true) .request(); - executeAsyncWithOrigin( - client, - TRANSFORM_ORIGIN, - TransportSearchAction.TYPE, - searchRequest, - ActionListener.wrap(searchResponse -> { - if (searchResponse.getHits().getHits().length == 0) { - if (allowNoMatch) { - resultListener.onResponse(null); - } else { - resultListener.onFailure( - new ResourceNotFoundException( - TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId) - ) - ); - } - return; - } - SearchHit searchHit = searchResponse.getHits().getHits()[0]; - try (XContentParser parser = createParser(searchHit)) { - resultListener.onResponse( - Tuple.tuple(TransformStoredDoc.fromXContent(parser), SeqNoPrimaryTermAndIndex.fromSearchHit(searchHit)) - ); - } catch (Exception e) { - logger.error( - TransformMessages.getMessage(TransformMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId), - e + executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, resultListener.delegateFailureAndWrap((l, searchResponse) -> { + if (searchResponse.getHits().getHits().length == 0) { + if (allowNoMatch) { + l.onResponse(null); + } else { + l.onFailure( + new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId)) ); - resultListener.onFailure(e); } - }, resultListener::onFailure) - ); + return; + } + SearchHit searchHit = searchResponse.getHits().getHits()[0]; + try (XContentParser parser = createParser(searchHit)) { + resultListener.onResponse( + Tuple.tuple(TransformStoredDoc.fromXContent(parser), SeqNoPrimaryTermAndIndex.fromSearchHit(searchHit)) + ); + } catch (Exception e) { + logger.error( + TransformMessages.getMessage(TransformMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId), + e + ); + resultListener.onFailure(e); + } + })); } @Override @@ -816,43 +758,50 @@ public void getTransformStoredDocs( .setTimeout(timeout) .request(); - executeAsyncWithOrigin( - client.threadPool().getThreadContext(), - TRANSFORM_ORIGIN, - searchRequest, - ActionListener.wrap(searchResponse -> { - List stats = new ArrayList<>(); - String previousId = null; - for (SearchHit hit : searchResponse.getHits().getHits()) { - // skip old versions - if (hit.getId().equals(previousId) == false) { - previousId = hit.getId(); - try (XContentParser parser = createParser(hit)) { - stats.add(TransformStoredDoc.fromXContent(parser)); - } catch (IOException e) { - listener.onFailure(new ElasticsearchParseException("failed to parse transform stats from search hit", e)); - return; - } + executeAsyncWithOrigin(searchRequest, listener.delegateFailureAndWrap((l, searchResponse) -> { + List stats = new ArrayList<>(); + String previousId = null; + for (SearchHit hit : searchResponse.getHits().getHits()) { + // skip old versions + if (hit.getId().equals(previousId) == false) { + previousId = hit.getId(); + try (XContentParser parser = createParser(hit)) { + stats.add(TransformStoredDoc.fromXContent(parser)); + } catch (IOException e) { + l.onFailure(new ElasticsearchParseException("failed to parse transform stats from search hit", e)); + return; } } - - listener.onResponse(stats); - }, listener::onFailure), - client::search - ); + } + l.onResponse(stats); + }), client::search); } @Override public void refresh(ActionListener listener) { executeAsyncWithOrigin( - client.threadPool().getThreadContext(), - TRANSFORM_ORIGIN, new RefreshRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME), - ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure), + listener.delegateFailureAndWrap((l, r) -> l.onResponse(true)), client.admin().indices()::refresh ); } + private void executeAsyncWithOrigin( + Request request, + ActionListener listener, + BiConsumer> consumer + ) { + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request, listener, consumer); + } + + private void executeAsyncWithOrigin( + ActionType action, + Request request, + ActionListener listener + ) { + ClientHelper.executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, action, request, listener); + } + private void parseTransformLenientlyFromSource( BytesReference source, String transformId, @@ -950,51 +899,45 @@ private void recursiveExpandAllTransformIds( ) .request(); - executeAsyncWithOrigin( - client.threadPool().getThreadContext(), - TRANSFORM_ORIGIN, - request, - ActionListener.wrap(searchResponse -> { - long totalHits = total; - String idOfLastHit = lastId; - - for (SearchHit hit : searchResponse.getHits().getHits()) { - String id = hit.field(TransformField.ID.getPreferredName()).getValue(); - - // paranoia - if (Strings.isNullOrEmpty(id)) { - continue; - } + executeAsyncWithOrigin(request, listener.delegateFailureAndWrap((l, searchResponse) -> { + long totalHits = total; + String idOfLastHit = lastId; - // only count hits if looking for outdated transforms - if (filterForOutdated && hit.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) { - ++totalHits; - } else if (id.equals(idOfLastHit) == false && collectedIds.add(id)) { - ++totalHits; - } - idOfLastHit = id; + for (SearchHit hit : searchResponse.getHits().getHits()) { + String id = hit.field(TransformField.ID.getPreferredName()).getValue(); + + // paranoia + if (Strings.isNullOrEmpty(id)) { + continue; } - if (searchResponse.getHits().getHits().length == page.getSize()) { - PageParams nextPage = new PageParams(page.getFrom() + page.getSize(), maxResultWindow); - - recursiveExpandAllTransformIds( - collectedIds, - totalHits, - filterForOutdated, - maxResultWindow, - idOfLastHit, - nextPage, - timeout, - listener - ); - return; + // only count hits if looking for outdated transforms + if (filterForOutdated && hit.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) { + ++totalHits; + } else if (id.equals(idOfLastHit) == false && collectedIds.add(id)) { + ++totalHits; } + idOfLastHit = id; + } - listener.onResponse(new Tuple<>(totalHits, collectedIds)); - }, listener::onFailure), - client::search - ); + if (searchResponse.getHits().getHits().length == page.getSize()) { + PageParams nextPage = new PageParams(page.getFrom() + page.getSize(), maxResultWindow); + + recursiveExpandAllTransformIds( + collectedIds, + totalHits, + filterForOutdated, + maxResultWindow, + idOfLastHit, + nextPage, + timeout, + l + ); + return; + } + + l.onResponse(new Tuple<>(totalHits, collectedIds)); + }), client::search); } private static Tuple getStatusAndReason(final BulkByScrollResponse response) {