Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void setup() {
mapping.put("field" + i, new EsField("field-" + i, TEXT, emptyMap(), true, EsField.TimeSeriesFieldType.NONE));
}

var esIndex = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD), Set.of());
var esIndex = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD), Map.of(), Map.of(), Set.of());

var functionRegistry = new EsqlFunctionRegistry();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,21 @@ public static List<String> getRemoteIndexExpressions(String... expressions) {
*/
public static String parseClusterAlias(String indexExpression) {
assert indexExpression != null : "Must not pass null indexExpression";
String[] parts = splitIndexName(indexExpression.trim());
if (parts[0] == null) {
return RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
} else {
return parts[0];
}
return getClusterAlias(splitIndexName(indexExpression.trim()));
}

/**
* @return the cluster alias or LOCAL_CLUSTER_GROUP_KEY if the split represents local index
*/
public static String getClusterAlias(String[] split) {
return split[0] == null ? LOCAL_CLUSTER_GROUP_KEY : split[0];
}

/**
* @return the local index name from the qualified index name split by RemoteClusterAware#splitIndexName
*/
public static String getLocalIndexName(String[] split) {
return split[1];
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9226000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
keystore_details_in_reload_secure_settings_response,9225000
esql_es_relation_add_split_indices,9226000
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.esql.core.util;

import java.util.function.Supplier;

/**
* Simply utility class used for setting a state, typically
* for closures (which require outside variables to be final).
Expand Down Expand Up @@ -36,7 +38,20 @@ public void setIfAbsent(T value) {
}
}

/**
* Sets a value in the holder, but only if none has already been set.
* @param value the new value to set.
*/
public void setOnce(T value) {
assert this.value == null : "Value has already been set to " + this.value;
this.value = value;
}

public T get() {
return value;
}

public T getOrDefault(Supplier<T> defaultValue) {
return value != null ? value : defaultValue.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public static EsRelation relation() {
}

public static EsRelation relation(IndexMode mode) {
return new EsRelation(EMPTY, randomIdentifier(), mode, Map.of(), List.of());
return new EsRelation(EMPTY, randomIdentifier(), mode, Map.of(), Map.of(), Map.of(), List.of());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private PhysicalPlan buildGreaterThanFilter(long value) {
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
);
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), List.of());
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of());
Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan);
return new FragmentExec(filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ private LogicalPlan resolveIndex(UnresolvedRelation plan, IndexResolution indexR
plan.source(),
esIndex.name(),
plan.indexMode(),
esIndex.originalIndices(),
esIndex.concreteIndices(),
esIndex.indexNameWithModes(),
attributes.isEmpty() ? NO_FIELDS : attributes
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,12 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
false,
false,
refs.acquire(indexResult -> {
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {
if (indexResult.isValid() && indexResult.get().concreteQualifiedIndices().size() == 1) {
EsIndex esIndex = indexResult.get();
var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteIndices(), 0));
var concreteIndices = Map.of(
request.clusterAlias,
Iterables.get(esIndex.concreteQualifiedIndices(), 0)
);
var resolved = new ResolvedEnrichPolicy(
p.getMatchField(),
p.getType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private static QueryRewriteContext queryRewriteContext(TransportActionServices s

private static Set<String> indexNames(LogicalPlan plan) {
Set<String> indexNames = new HashSet<>();
plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.concreteIndices()));
plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.concreteQualifiedIndices()));
return indexNames;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.type.EsField;

import java.util.List;
import java.util.Map;
import java.util.Set;

public record EsIndex(
String name,
Map<String, EsField> mapping, // keyed by field names
Map<String, IndexMode> indexNameWithModes,
Map<String, List<String>> originalIndices, // keyed by cluster alias
Map<String, List<String>> concreteIndices, // keyed by cluster alias
Set<String> partiallyUnmappedFields
) {

Expand All @@ -29,7 +32,7 @@ public boolean isPartiallyUnmappedField(String fieldName) {
return partiallyUnmappedFields.contains(fieldName);
}

public Set<String> concreteIndices() {
public Set<String> concreteQualifiedIndices() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
Will these all be qualified also in CPS? I mean, will local indices have an _origin: prefix? I'm just asking to learn, but I think concreteQualifiedIndices is a good method name even if they won't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe local indices will not have _origin prefix, only remote will have prefixes. I will confirm this once we are able to run CPS query.

return indexNameWithModes.keySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public static IndexResolution valid(EsIndex index, Set<String> resolvedIndices,
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
*/
public static IndexResolution valid(EsIndex index) {
return valid(index, index.concreteIndices(), Map.of());
return valid(index, index.concreteQualifiedIndices(), Map.of());
}

public static IndexResolution empty(String indexPattern) {
return valid(new EsIndex(indexPattern, Map.of(), Map.of(), Set.of()));
return valid(new EsIndex(indexPattern, Map.of(), Map.of(), Map.of(), Map.of(), Set.of()));
}

public static IndexResolution invalid(String invalid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ public final class SkipQueryOnEmptyMappings extends OptimizerRules.OptimizerRule

@Override
protected LogicalPlan rule(EsRelation plan) {
return plan.concreteIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY) : plan;
return plan.concreteQualifiedIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY) : plan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -25,6 +26,9 @@
import java.util.Set;

public class EsRelation extends LeafPlan {

private static final TransportVersion SPLIT_INDICES = TransportVersion.fromName("esql_es_relation_add_split_indices");

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"EsRelation",
Expand All @@ -33,19 +37,25 @@ public class EsRelation extends LeafPlan {

private final String indexPattern;
private final IndexMode indexMode;
private final Map<String, List<String>> originalIndices; // keyed by cluster alias
private final Map<String, List<String>> concreteIndices; // keyed by cluster alias
private final Map<String, IndexMode> indexNameWithModes;
private final List<Attribute> attrs;

public EsRelation(
Source source,
String indexPattern,
IndexMode indexMode,
Map<String, List<String>> originalIndices,
Map<String, List<String>> concreteIndices,
Map<String, IndexMode> indexNameWithModes,
List<Attribute> attributes
) {
super(source);
this.indexPattern = indexPattern;
this.indexMode = indexMode;
this.originalIndices = originalIndices;
this.concreteIndices = concreteIndices;
this.indexNameWithModes = indexNameWithModes;
this.attrs = attributes;
}
Expand All @@ -57,13 +67,22 @@ private static EsRelation readFrom(StreamInput in) throws IOException {
// this used to be part of EsIndex deserialization
in.readImmutableMap(StreamInput::readString, EsField::readFrom);
}
Map<String, List<String>> originalIndices;
Map<String, List<String>> concreteIndices;
if (in.getTransportVersion().supports(SPLIT_INDICES)) {
originalIndices = in.readMapOfLists(StreamInput::readString);
concreteIndices = in.readMapOfLists(StreamInput::readString);
} else {
originalIndices = Map.of();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This scares me a bit, but I think it's fine.

Will a remote need this information to execute the plan fragment?
Looking at the code I'd say no.

One day, when we start supporting nested sub-queries and maybe multi-step coordination (ie. when the remote cluster will become a "secondary" master, potentially sending a fragment to a third cluster), the primary master will still be unable to coordinate such queries if it's on an older version, so we'll always be sure that we have the two maps.

My conclusion is that it's all good, but I wanted to write it down, in case it rings a bell.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct. Remotes/data nodes do not need this information, at least today. I will add a note that this information is only available in recent versions.

concreteIndices = Map.of();
}
Map<String, IndexMode> indexNameWithModes = in.readMap(IndexMode::readFrom);
List<Attribute> attributes = in.readNamedWriteableCollectionAsList(Attribute.class);
IndexMode indexMode = IndexMode.fromString(in.readString());
if (in.getTransportVersion().supports(TransportVersions.V_8_18_0) == false) {
in.readBoolean();
}
return new EsRelation(source, indexPattern, indexMode, indexNameWithModes, attributes);
return new EsRelation(source, indexPattern, indexMode, originalIndices, concreteIndices, indexNameWithModes, attributes);
}

@Override
Expand All @@ -74,6 +93,10 @@ public void writeTo(StreamOutput out) throws IOException {
// this used to be part of EsIndex serialization
out.writeMap(Map.<String, EsField>of(), (o, x) -> x.writeTo(out));
}
if (out.getTransportVersion().supports(SPLIT_INDICES)) {
out.writeMap(originalIndices, StreamOutput::writeStringCollection);
out.writeMap(concreteIndices, StreamOutput::writeStringCollection);
}
out.writeMap(indexNameWithModes, (o, v) -> IndexMode.writeTo(v, out));
out.writeNamedWriteableCollection(attrs);
out.writeString(indexMode.getName());
Expand All @@ -89,7 +112,7 @@ public String getWriteableName() {

@Override
protected NodeInfo<EsRelation> info() {
return NodeInfo.create(this, EsRelation::new, indexPattern, indexMode, indexNameWithModes, attrs);
return NodeInfo.create(this, EsRelation::new, indexPattern, indexMode, originalIndices, concreteIndices, indexNameWithModes, attrs);
}

public String indexPattern() {
Expand All @@ -100,6 +123,14 @@ public IndexMode indexMode() {
return indexMode;
}

public Map<String, List<String>> originalIndices() {
return originalIndices;
}

public Map<String, List<String>> concreteIndices() {
return concreteIndices;
}

public Map<String, IndexMode> indexNameWithModes() {
return indexNameWithModes;
}
Expand All @@ -109,7 +140,7 @@ public List<Attribute> output() {
return attrs;
}

public Set<String> concreteIndices() {
public Set<String> concreteQualifiedIndices() {
return indexNameWithModes.keySet();
}

Expand All @@ -122,7 +153,7 @@ public boolean expressionsResolved() {

@Override
public int hashCode() {
return Objects.hash(indexPattern, indexMode, indexNameWithModes, attrs);
return Objects.hash(indexPattern, indexMode, originalIndices, concreteIndices, indexNameWithModes, attrs);
}

@Override
Expand All @@ -138,6 +169,8 @@ public boolean equals(Object obj) {
EsRelation other = (EsRelation) obj;
return Objects.equals(indexPattern, other.indexPattern)
&& Objects.equals(indexMode, other.indexMode)
&& Objects.equals(originalIndices, other.originalIndices)
&& Objects.equals(concreteIndices, other.concreteIndices)
&& Objects.equals(indexNameWithModes, other.indexNameWithModes)
&& Objects.equals(attrs, other.attrs);
}
Expand All @@ -153,10 +186,10 @@ public String nodeString() {
}

public EsRelation withAttributes(List<Attribute> newAttributes) {
return new EsRelation(source(), indexPattern, indexMode, indexNameWithModes, newAttributes);
return new EsRelation(source(), indexPattern, indexMode, originalIndices, concreteIndices, indexNameWithModes, newAttributes);
}

public EsRelation withIndexMode(IndexMode indexMode) {
return new EsRelation(source(), indexPattern, indexMode, indexNameWithModes, attrs);
return new EsRelation(source(), indexPattern, indexMode, originalIndices, concreteIndices, indexNameWithModes, attrs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public class EsSourceExec extends LeafExec {

protected static final TransportVersion REMOVE_NAME_WITH_MODS = TransportVersion.fromName("esql_es_source_remove_name_with_mods");
private static final TransportVersion REMOVE_NAME_WITH_MODS = TransportVersion.fromName("esql_es_source_remove_name_with_mods");

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.aggregation.AggregatorMode;
Expand Down Expand Up @@ -61,14 +60,12 @@
import org.elasticsearch.xpack.esql.stats.SearchStats;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.EXTRACT_SPATIAL_BOUNDS;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.NONE;
Expand Down Expand Up @@ -155,30 +152,6 @@ private static ReducedPlan getPhysicalPlanReduction(int estimatedRowSize, Physic
return new ReducedPlan(EstimatesRowSize.estimateRowSize(estimatedRowSize, plan));
}

/**
* Returns a set of concrete indices after resolving the original indices specified in the FROM command.
*/
public static Set<String> planConcreteIndices(PhysicalPlan plan) {
if (plan == null) {
return Set.of();
}
var indices = new LinkedHashSet<String>();
forEachRelation(plan, relation -> indices.addAll(relation.concreteIndices()));
return indices;
}

/**
* Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters.
*/
public static String[] planOriginalIndices(PhysicalPlan plan) {
if (plan == null) {
return Strings.EMPTY_ARRAY;
}
var indices = new LinkedHashSet<String>();
forEachRelation(plan, relation -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(relation.indexPattern()))));
return indices.toArray(String[]::new);
}

public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) {
return plan.anyMatch(e -> {
if (e instanceof FragmentExec f) {
Expand All @@ -188,7 +161,7 @@ public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) {
});
}

private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
public static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> {
if (r.indexMode() != IndexMode.LOOKUP) {
action.accept(r);
Expand Down
Loading