diff --git a/docs/changelog/137966.yaml b/docs/changelog/137966.yaml new file mode 100644 index 0000000000000..fa2df8e351511 --- /dev/null +++ b/docs/changelog/137966.yaml @@ -0,0 +1,5 @@ +pr: 137966 +summary: Allows PIT to be cross project +area: Search +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java index aa707d72bc6f1..3b8947e28dfc7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.LegacyActionRequest; +import org.elasticsearch.action.ResolvedIndexExpressions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -40,11 +41,16 @@ public final class OpenPointInTimeRequest extends LegacyActionRequest implements @Nullable private String preference; + private ResolvedIndexExpressions resolvedIndexExpressions; + @Nullable + private String projectRouting; + private QueryBuilder indexFilter; private boolean allowPartialSearchResults = false; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = SearchRequest.DEFAULT_INDICES_OPTIONS; + public static final IndicesOptions DEFAULT_CPS_INDICES_OPTIONS = SearchRequest.DEFAULT_CPS_INDICES_OPTIONS; public OpenPointInTimeRequest(String... indices) { this.indices = Objects.requireNonNull(indices, "[index] is not specified"); @@ -186,6 +192,33 @@ public boolean allowsRemoteIndices() { return true; } + @Override + public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) { + this.resolvedIndexExpressions = expressions; + } + + @Override + public ResolvedIndexExpressions getResolvedIndexExpressions() { + return resolvedIndexExpressions; + } + + @Override + public boolean allowsCrossProject() { + return true; + } + + @Override + public String getProjectRouting() { + return projectRouting; + } + + public void projectRouting(@Nullable String projectRouting) { + if (this.projectRouting != null) { + throw new IllegalArgumentException("project_routing is already set to [" + this.projectRouting + "]"); + } + this.projectRouting = projectRouting; + } + @Override public boolean includeDataStreams() { return true; diff --git a/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java index b76e5a549a0d0..5d50f4ce58eb9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; @@ -31,10 +32,10 @@ @ServerlessScope(Scope.PUBLIC) public class RestOpenPointInTimeAction extends BaseRestHandler { - private final Settings settings; + private final CrossProjectModeDecider crossProjectModeDecider; public RestOpenPointInTimeAction(Settings settings) { - this.settings = settings; + this.crossProjectModeDecider = new CrossProjectModeDecider(settings); } @Override @@ -49,14 +50,16 @@ public List routes() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) { - // accept but drop project_routing param until fully supported - request.param("project_routing"); - } final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices); - openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS)); + final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled(); + if (crossProjectEnabled) { + openRequest.projectRouting(request.param("project_routing", null)); + openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_CPS_INDICES_OPTIONS)); + } else { + openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS)); + } openRequest.routing(request.param("routing")); openRequest.preference(request.param("preference")); openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive")); @@ -80,8 +83,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC private static final ObjectParser PARSER = new ObjectParser<>("open_point_in_time_request"); private static final ParseField INDEX_FILTER_FIELD = new ParseField("index_filter"); + private static final ParseField PROJECT_ROUTING = new ParseField("project_routing"); static { PARSER.declareObject(OpenPointInTimeRequest::indexFilter, (p, c) -> parseTopLevelQuery(p), INDEX_FILTER_FIELD); + PARSER.declareString(OpenPointInTimeRequest::projectRouting, PROJECT_ROUTING); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index c611462079f63..3b0d4b758e6a5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -102,6 +102,7 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest private boolean ccsMinimizeRoundtrips; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); + public static final IndicesOptions DEFAULT_CPS_INDICES_OPTIONS = IndicesOptions.cpsStrictExpandOpenAndForbidClosedIgnoreThrottled(); private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; @@ -380,9 +381,13 @@ public ActionRequestValidationException validate() { validationException ); } - if (indicesOptions().equals(DEFAULT_INDICES_OPTIONS) == false) { + if (indicesOptions().equals(DEFAULT_INDICES_OPTIONS) == false + && indicesOptions().equals(DEFAULT_CPS_INDICES_OPTIONS) == false) { validationException = addValidationError("[indicesOptions] cannot be used with point in time", validationException); } + if (getProjectRouting() != null) { + validationException = addValidationError("[projectRouting] cannot be used with point in time", validationException); + } if (routing() != null) { validationException = addValidationError("[routing] cannot be used with point in time", validationException); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index d2497048d376b..23e0f15fa2202 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -19,10 +19,15 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.ResolvedIndexExpression; +import org.elasticsearch.action.ResolvedIndexExpressions; +import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; @@ -38,25 +43,36 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; +import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout; +import static org.elasticsearch.transport.RemoteClusterAware.buildRemoteIndexName; public class TransportOpenPointInTimeAction extends HandledTransportAction { @@ -72,6 +88,8 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction listener + ) { + String[] indices = request.indices(); + IndicesOptions originalIndicesOptions = request.indicesOptions(); + // in CPS before executing the open pit request we need to get index resolution and possibly throw based on merged project view + // rules. This should happen only if either ignore_unavailable or allow_no_indices is set to false (strict). + // If instead both are true we can continue with the "normal" pit execution. + if (originalIndicesOptions.ignoreUnavailable() && originalIndicesOptions.allowNoIndices()) { + // lenient indicesOptions thus execute standard pit + executeOpenPit(task, request, listener); + return; + } + + // ResolvedIndexExpression for the origin cluster (only) as determined by the Security Action Filter + final ResolvedIndexExpressions localResolvedIndexExpressions = request.getResolvedIndexExpressions(); + + RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService(); + final Map indicesPerCluster = remoteClusterService.groupIndices( + indicesOptionsForCrossProjectFanout(originalIndicesOptions), + indices + ); + // local indices resolution was already taken care of by the Security Action Filter + indicesPerCluster.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + + if (indicesPerCluster.isEmpty()) { + // for CPS requests that are targeting origin only, could be because of project_routing or other reasons, execute standard pit. + final Exception ex = CrossProjectIndexResolutionValidator.validate( + originalIndicesOptions, + request.getProjectRouting(), + localResolvedIndexExpressions, + Map.of() + ); + if (ex != null) { + listener.onFailure(ex); + return; + } + executeOpenPit(task, request, listener); + return; + } + + // CPS + final int linkedProjectsToQuery = indicesPerCluster.size(); + ActionListener>> responsesListener = listener.delegateFailureAndWrap( + (l, responses) -> { + Map resolvedRemoteExpressions = responses.stream() + .filter(e -> e.getValue().getResolvedIndexExpressions() != null) + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().getResolvedIndexExpressions() + + ) + ); + final Exception ex = CrossProjectIndexResolutionValidator.validate( + originalIndicesOptions, + request.getProjectRouting(), + localResolvedIndexExpressions, + resolvedRemoteExpressions + ); + if (ex != null) { + listener.onFailure(ex); + return; + } + Set collectedIndices = new HashSet<>(indices.length); + + for (Map.Entry resolvedRemoteExpressionEntry : resolvedRemoteExpressions.entrySet()) { + String remoteAlias = resolvedRemoteExpressionEntry.getKey(); + for (ResolvedIndexExpression expression : resolvedRemoteExpressionEntry.getValue().expressions()) { + ResolvedIndexExpression.LocalExpressions oneRemoteExpression = expression.localExpressions(); + if (false == oneRemoteExpression.indices().isEmpty() + && oneRemoteExpression + .localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS) { + collectedIndices.addAll( + oneRemoteExpression.indices() + .stream() + .map(i -> buildRemoteIndexName(remoteAlias, i)) + .collect(Collectors.toSet()) + ); + } + } + } + if (localResolvedIndexExpressions != null) { // this should never be null in CPS + collectedIndices.addAll(localResolvedIndexExpressions.getLocalIndicesList()); + } + request.indices(collectedIndices.toArray(String[]::new)); + executeOpenPit(task, request, listener); + } + ); + ActionListener> groupedListener = new GroupedActionListener<>( + linkedProjectsToQuery, + responsesListener + ); + + // make CPS calls + for (Map.Entry remoteClusterIndices : indicesPerCluster.entrySet()) { + String clusterAlias = remoteClusterIndices.getKey(); + OriginalIndices originalIndices = remoteClusterIndices.getValue(); + IndicesOptions relaxedFanoutIdxOptions = originalIndices.indicesOptions(); // from indicesOptionsForCrossProjectFanout + ResolveIndexAction.Request remoteRequest = new ResolveIndexAction.Request(originalIndices.indices(), relaxedFanoutIdxOptions); + + SubscribableListener connectionListener = new SubscribableListener<>(); + connectionListener.addTimeout(forceConnectTimeoutSecs, transportService.getThreadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE); + + connectionListener.addListener(groupedListener.delegateResponse((l, failure) -> { + logger.info("failed to resolve indices on remote cluster [" + clusterAlias + "]", failure); + l.onFailure(failure); + }) + .delegateFailure( + (ignored, connection) -> transportService.sendRequest( + connection, + ResolveIndexAction.REMOTE_TYPE.name(), + remoteRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(groupedListener.delegateResponse((l, failure) -> { + logger.info("Error occurred on remote cluster [" + clusterAlias + "]", failure); + l.onFailure(failure); + }).map(resolveIndexResponse -> Map.entry(clusterAlias, resolveIndexResponse)), + ResolveIndexAction.Response::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ) + ) + )); + + remoteClusterService.maybeEnsureConnectedAndGetConnection(clusterAlias, true, connectionListener); + } + } + + private void executeOpenPit(SearchTask task, OpenPointInTimeRequest request, ActionListener listener) { final SearchRequest searchRequest = new SearchRequest().indices(request.indices()) .indicesOptions(request.indicesOptions()) .preference(request.preference()) @@ -132,7 +293,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests()); searchRequest.setCcsMinimizeRoundtrips(false); - transportSearchAction.executeOpenPit((SearchTask) task, searchRequest, listener.map(r -> { + transportSearchAction.executeOpenPit(task, searchRequest, listener.map(r -> { assert r.pointInTimeId() != null : r; return new OpenPointInTimeResponse( r.pointInTimeId(), diff --git a/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java b/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java index f10a5092ea4b2..289331c0ab423 100644 --- a/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java +++ b/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java @@ -750,6 +750,25 @@ private enum Option { .allowAliasToMultipleIndices(true) ) .build(); + public static final IndicesOptions CPS_STRICT_EXPAND_OPEN_FORBID_CLOSED_IGNORE_THROTTLED = IndicesOptions.builder() + .concreteTargetOptions(ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS) + .wildcardOptions( + WildcardOptions.builder() + .matchOpen(true) + .matchClosed(false) + .includeHidden(false) + .allowEmptyExpressions(true) + .resolveAliases(true) + ) + .gatekeeperOptions( + GatekeeperOptions.builder() + .ignoreThrottled(true) + .allowClosedIndices(false) + .allowSelectors(true) + .allowAliasToMultipleIndices(true) + ) + .crossProjectModeOptions(new CrossProjectModeOptions(true)) + .build(); public static final IndicesOptions STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED = IndicesOptions.builder() .concreteTargetOptions(ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS) .wildcardOptions( @@ -1393,6 +1412,16 @@ public static IndicesOptions strictExpandOpenAndForbidClosedIgnoreThrottled() { return STRICT_EXPAND_OPEN_FORBID_CLOSED_IGNORE_THROTTLED; } + /** + * @return indices options that requires every specified index to exist, expands wildcards only to open indices, + * allows that no indices are resolved from wildcard expressions (not returning an error), + * forbids the use of closed indices by throwing an error and ignores indices that are throttled, + * and has CrossProjectModeOptions set to true. + */ + public static IndicesOptions cpsStrictExpandOpenAndForbidClosedIgnoreThrottled() { + return CPS_STRICT_EXPAND_OPEN_FORBID_CLOSED_IGNORE_THROTTLED; + } + /** * @return indices option that requires every specified index to exist, expands wildcards to both open and closed * indices and allows that no indices are resolved from wildcard expressions (not returning an error). diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 287e42f31dcf2..0c51f4a886a0e 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -262,7 +262,7 @@ public static void parseSearchRequest( searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, searchRequest.indicesOptions()); - if (crossProjectEnabled.orElse(false) && searchRequest.allowsCrossProject() && searchRequest.pointInTimeBuilder() == null) { + if (crossProjectEnabled.orElse(false) && searchRequest.allowsCrossProject()) { indicesOptions = IndicesOptions.builder(indicesOptions) .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) .build(); diff --git a/server/src/main/java/org/elasticsearch/search/crossproject/CrossProjectIndexResolutionValidator.java b/server/src/main/java/org/elasticsearch/search/crossproject/CrossProjectIndexResolutionValidator.java index 9e9291335da17..23eadce73afaf 100644 --- a/server/src/main/java/org/elasticsearch/search/crossproject/CrossProjectIndexResolutionValidator.java +++ b/server/src/main/java/org/elasticsearch/search/crossproject/CrossProjectIndexResolutionValidator.java @@ -204,7 +204,7 @@ private static ElasticsearchException checkSingleRemoteExpression( ); } - private static String[] splitQualifiedResource(String resource) { + public static String[] splitQualifiedResource(String resource) { String[] splitResource = RemoteClusterAware.splitIndexName(resource); assert splitResource.length == 2 : "Expected two strings (project and indexExpression) for a qualified resource ["