Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137966.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137966
summary: Allows PIT to be cross project
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.action.ResolvedIndexExpressions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -40,11 +41,16 @@ public final class OpenPointInTimeRequest extends LegacyActionRequest implements
@Nullable
private String preference;

private ResolvedIndexExpressions resolvedIndexExpressions;
@Nullable
private String projectRouting;

private QueryBuilder indexFilter;

private boolean allowPartialSearchResults = false;

public static final IndicesOptions DEFAULT_INDICES_OPTIONS = SearchRequest.DEFAULT_INDICES_OPTIONS;
public static final IndicesOptions DEFAULT_CPS_INDICES_OPTIONS = SearchRequest.DEFAULT_CPS_INDICES_OPTIONS;

public OpenPointInTimeRequest(String... indices) {
this.indices = Objects.requireNonNull(indices, "[index] is not specified");
Expand Down Expand Up @@ -186,6 +192,33 @@ public boolean allowsRemoteIndices() {
return true;
}

@Override
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
this.resolvedIndexExpressions = expressions;
}

@Override
public ResolvedIndexExpressions getResolvedIndexExpressions() {
return resolvedIndexExpressions;
}

@Override
public boolean allowsCrossProject() {
return true;
}

@Override
public String getProjectRouting() {
return projectRouting;
}

public void projectRouting(@Nullable String projectRouting) {
if (this.projectRouting != null) {
throw new IllegalArgumentException("project_routing is already set to [" + this.projectRouting + "]");
}
this.projectRouting = projectRouting;
}

@Override
public boolean includeDataStreams() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;

Expand All @@ -31,10 +32,10 @@
@ServerlessScope(Scope.PUBLIC)
public class RestOpenPointInTimeAction extends BaseRestHandler {

private final Settings settings;
private final CrossProjectModeDecider crossProjectModeDecider;

public RestOpenPointInTimeAction(Settings settings) {
this.settings = settings;
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
}

@Override
Expand All @@ -49,14 +50,16 @@ public List<Route> routes() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
}

final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices);
openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS));
final boolean crossProjectEnabled = crossProjectModeDecider.crossProjectEnabled();
if (crossProjectEnabled) {
openRequest.projectRouting(request.param("project_routing", null));
openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_CPS_INDICES_OPTIONS));
} else {
openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS));
}
openRequest.routing(request.param("routing"));
openRequest.preference(request.param("preference"));
openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"));
Expand All @@ -80,8 +83,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

private static final ObjectParser<OpenPointInTimeRequest, Void> PARSER = new ObjectParser<>("open_point_in_time_request");
private static final ParseField INDEX_FILTER_FIELD = new ParseField("index_filter");
private static final ParseField PROJECT_ROUTING = new ParseField("project_routing");

static {
PARSER.declareObject(OpenPointInTimeRequest::indexFilter, (p, c) -> parseTopLevelQuery(p), INDEX_FILTER_FIELD);
PARSER.declareString(OpenPointInTimeRequest::projectRouting, PROJECT_ROUTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest
private boolean ccsMinimizeRoundtrips;

public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
public static final IndicesOptions DEFAULT_CPS_INDICES_OPTIONS = IndicesOptions.cpsStrictExpandOpenAndForbidClosedIgnoreThrottled();

private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;

Expand Down Expand Up @@ -380,9 +381,13 @@ public ActionRequestValidationException validate() {
validationException
);
}
if (indicesOptions().equals(DEFAULT_INDICES_OPTIONS) == false) {
if (indicesOptions().equals(DEFAULT_INDICES_OPTIONS) == false
&& indicesOptions().equals(DEFAULT_CPS_INDICES_OPTIONS) == false) {
validationException = addValidationError("[indicesOptions] cannot be used with point in time", validationException);
}
if (getProjectRouting() != null) {
validationException = addValidationError("[projectRouting] cannot be used with point in time", validationException);
}
if (routing() != null) {
validationException = addValidationError("[routing] cannot be used with point in time", validationException);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.ResolvedIndexExpression;
import org.elasticsearch.action.ResolvedIndexExpressions;
import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,25 +43,36 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout;
import static org.elasticsearch.transport.RemoteClusterAware.buildRemoteIndexName;

public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenPointInTimeRequest, OpenPointInTimeResponse> {

Expand All @@ -72,6 +88,8 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
private final SearchService searchService;
private final ClusterService clusterService;
private final SearchResponseMetrics searchResponseMetrics;
private final CrossProjectModeDecider crossProjectModeDecider;
private final TimeValue forceConnectTimeoutSecs;

@Inject
public TransportOpenPointInTimeAction(
Expand All @@ -92,6 +110,9 @@ public TransportOpenPointInTimeAction(
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterService = clusterService;
this.searchResponseMetrics = searchResponseMetrics;
this.crossProjectModeDecider = new CrossProjectModeDecider(clusterService.getSettings());
this.forceConnectTimeoutSecs = clusterService.getSettings()
.getAsTime("search.ccs.force_connect_timeout", TimeValue.timeValueSeconds(3L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(asking to learn) - When is this set vs. taking the default of 3 seconds? I think for CPS we want to always ensure it is 3 seconds, so is search.ccs.force_connect_timeout always defined (and set to 3 seconds) for CPS or should we be using the CrossProjectModeDecider here to determine what timeout setting to use?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only being used by CPS because we only call it in executeOpenPitCrossProject, so I don't think we need to have another if statement for CrossProjectModeDecider.
On when/if is set I think we (ES eng) are the only one that can override/set this setting through gitops repo

Copy link
Contributor

@pawankartik-elastic pawankartik-elastic Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this set

When we manually "inject" the corresponding value from our end for Serverless CPS. I had a brief chat with Matteo about this, and there's scope for improvement/refactoring here: we'll abstract this away and move it elsewhere so that we won't have to:

  1. Hardcode the setting in n number of places across the codebase,
  2. Repeatedly check for the existence of this setting, and,
  3. Repeatedly declare the fallback values.

Currently, it's being used in Search, Field Caps, and PIT (this).

transportService.registerRequestHandler(
OPEN_SHARD_READER_CONTEXT_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
Expand Down Expand Up @@ -123,6 +144,146 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
);
return;
}

final boolean resolveCrossProject = crossProjectModeDecider.resolvesCrossProject(request);
if (resolveCrossProject) {
executeOpenPitCrossProject((SearchTask) task, request, listener);
} else {
executeOpenPit((SearchTask) task, request, listener);
}
}

private void executeOpenPitCrossProject(
SearchTask task,
OpenPointInTimeRequest request,
ActionListener<OpenPointInTimeResponse> listener
) {
String[] indices = request.indices();
IndicesOptions originalIndicesOptions = request.indicesOptions();
// in CPS before executing the open pit request we need to get index resolution and possibly throw based on merged project view
// rules. This should happen only if either ignore_unavailable or allow_no_indices is set to false (strict).
// If instead both are true we can continue with the "normal" pit execution.
if (originalIndicesOptions.ignoreUnavailable() && originalIndicesOptions.allowNoIndices()) {
// lenient indicesOptions thus execute standard pit
executeOpenPit(task, request, listener);
return;
}

// ResolvedIndexExpression for the origin cluster (only) as determined by the Security Action Filter
final ResolvedIndexExpressions localResolvedIndexExpressions = request.getResolvedIndexExpressions();

RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService();
final Map<String, OriginalIndices> indicesPerCluster = remoteClusterService.groupIndices(
indicesOptionsForCrossProjectFanout(originalIndicesOptions),
indices
);
// local indices resolution was already taken care of by the Security Action Filter
indicesPerCluster.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

if (indicesPerCluster.isEmpty()) {
// for CPS requests that are targeting origin only, could be because of project_routing or other reasons, execute standard pit.
final Exception ex = CrossProjectIndexResolutionValidator.validate(
originalIndicesOptions,
request.getProjectRouting(),
localResolvedIndexExpressions,
Map.of()
);
if (ex != null) {
listener.onFailure(ex);
return;
}
executeOpenPit(task, request, listener);
return;
}

// CPS
final int linkedProjectsToQuery = indicesPerCluster.size();
ActionListener<Collection<Map.Entry<String, ResolveIndexAction.Response>>> responsesListener = listener.delegateFailureAndWrap(
(l, responses) -> {
Map<String, ResolvedIndexExpressions> resolvedRemoteExpressions = responses.stream()
.filter(e -> e.getValue().getResolvedIndexExpressions() != null)
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().getResolvedIndexExpressions()

)
);
final Exception ex = CrossProjectIndexResolutionValidator.validate(
originalIndicesOptions,
request.getProjectRouting(),
localResolvedIndexExpressions,
resolvedRemoteExpressions
);
if (ex != null) {
listener.onFailure(ex);
return;
}
Set<String> collectedIndices = new HashSet<>(indices.length);

for (Map.Entry<String, ResolvedIndexExpressions> resolvedRemoteExpressionEntry : resolvedRemoteExpressions.entrySet()) {
String remoteAlias = resolvedRemoteExpressionEntry.getKey();
for (ResolvedIndexExpression expression : resolvedRemoteExpressionEntry.getValue().expressions()) {
ResolvedIndexExpression.LocalExpressions oneRemoteExpression = expression.localExpressions();
if (false == oneRemoteExpression.indices().isEmpty()
&& oneRemoteExpression
.localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS) {
collectedIndices.addAll(
oneRemoteExpression.indices()
.stream()
.map(i -> buildRemoteIndexName(remoteAlias, i))
.collect(Collectors.toSet())
);
}
}
}
if (localResolvedIndexExpressions != null) { // this should never be null in CPS
collectedIndices.addAll(localResolvedIndexExpressions.getLocalIndicesList());
}
request.indices(collectedIndices.toArray(String[]::new));
executeOpenPit(task, request, listener);
}
);
ActionListener<Map.Entry<String, ResolveIndexAction.Response>> groupedListener = new GroupedActionListener<>(
linkedProjectsToQuery,
responsesListener
);

// make CPS calls
for (Map.Entry<String, OriginalIndices> remoteClusterIndices : indicesPerCluster.entrySet()) {
String clusterAlias = remoteClusterIndices.getKey();
OriginalIndices originalIndices = remoteClusterIndices.getValue();
IndicesOptions relaxedFanoutIdxOptions = originalIndices.indicesOptions(); // from indicesOptionsForCrossProjectFanout
ResolveIndexAction.Request remoteRequest = new ResolveIndexAction.Request(originalIndices.indices(), relaxedFanoutIdxOptions);

SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();
connectionListener.addTimeout(forceConnectTimeoutSecs, transportService.getThreadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);

connectionListener.addListener(groupedListener.delegateResponse((l, failure) -> {
logger.info("failed to resolve indices on remote cluster [" + clusterAlias + "]", failure);
l.onFailure(failure);
})
.delegateFailure(
(ignored, connection) -> transportService.sendRequest(
connection,
ResolveIndexAction.REMOTE_TYPE.name(),
remoteRequest,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(groupedListener.delegateResponse((l, failure) -> {
logger.info("Error occurred on remote cluster [" + clusterAlias + "]", failure);
l.onFailure(failure);
}).map(resolveIndexResponse -> Map.entry(clusterAlias, resolveIndexResponse)),
ResolveIndexAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
)
)
));

remoteClusterService.maybeEnsureConnectedAndGetConnection(clusterAlias, true, connectionListener);
}
}

private void executeOpenPit(SearchTask task, OpenPointInTimeRequest request, ActionListener<OpenPointInTimeResponse> listener) {
final SearchRequest searchRequest = new SearchRequest().indices(request.indices())
.indicesOptions(request.indicesOptions())
.preference(request.preference())
Expand All @@ -132,7 +293,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
searchRequest.setCcsMinimizeRoundtrips(false);

transportSearchAction.executeOpenPit((SearchTask) task, searchRequest, listener.map(r -> {
transportSearchAction.executeOpenPit(task, searchRequest, listener.map(r -> {
assert r.pointInTimeId() != null : r;
return new OpenPointInTimeResponse(
r.pointInTimeId(),
Expand Down
Loading