Skip to content

Commit

Permalink
Prepare enrich plan to support multi clusters (#104355)
Browse files Browse the repository at this point in the history
This is a prerequisite for supporting enrich in cross-cluster queries. The
main change in this pull request is to replace esIndex in the logical and
physical Enrich plans with a map from each cluster to its concrete enrich
index, so that each cluster can select its own concrete enrich index
for performing lookups.
  • Loading branch information
dnhatn committed Jan 17, 2024
1 parent 32ace95 commit 876e701
Show file tree
Hide file tree
Showing 26 changed files with 387 additions and 289 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104355.yaml
@@ -0,0 +1,5 @@
pr: 104355
summary: Prepare enrich plan to support multi clusters
area: ES|QL
type: enhancement
issues: []
Expand Up @@ -185,6 +185,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_ENRICH_POLICY_CCQ_MODE = def(8_573_00_0);
public static final TransportVersion DATE_HISTOGRAM_SUPPORT_DOWNSAMPLED_TZ = def(8_574_00_0);
public static final TransportVersion PEERFINDER_REPORTS_PEERS_MASTERS = def(8_575_00_0);
public static final TransportVersion ESQL_MULTI_CLUSTERS_ENRICH = def(8_576_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Expand Up @@ -145,7 +145,7 @@ public static Map<String, EsField> loadMapping(String name) {
}

public static EnrichResolution emptyPolicyResolution() {
return new EnrichResolution(Set.of(), Set.of());
return new EnrichResolution();
}

public static SearchStats statsForMissingField(String... names) {
Expand Down
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolution;
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
Expand Down Expand Up @@ -43,7 +42,6 @@
import org.elasticsearch.xpack.ql.expression.function.UnresolvedFunction;
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.BinaryComparison;
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.index.IndexResolution;
import org.elasticsearch.xpack.ql.plan.TableIdentifier;
import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
Expand Down Expand Up @@ -209,52 +207,35 @@ protected LogicalPlan rule(Enrich plan, AnalyzerContext context) {
// the policy does not exist
return plan;
}
String policyName = (String) plan.policyName().fold();
EnrichPolicyResolution policyRes = context.enrichResolution()
.resolvedPolicies()
.stream()
.filter(x -> x.policyName().equals(policyName))
.findFirst()
.orElse(new EnrichPolicyResolution(policyName, null, null));

IndexResolution idx = policyRes.index();
EnrichPolicy policy = policyRes.policy();

var policyNameExp = policy == null || idx == null
? new UnresolvedAttribute(
plan.policyName().source(),
policyName,
null,
unresolvedPolicyError(policyName, context.enrichResolution())
)
: plan.policyName();

var matchField = policy != null && (plan.matchField() == null || plan.matchField() instanceof EmptyAttribute)
? new UnresolvedAttribute(plan.source(), policy.getMatchField())
: plan.matchField();

List<NamedExpression> enrichFields = policy == null || idx == null
? (plan.enrichFields() == null ? List.of() : plan.enrichFields())
: calculateEnrichFields(
final String policyName = (String) plan.policyName().fold();
final EnrichResolution.ResolvedPolicy resolvedPolicy = context.enrichResolution().getResolvedPolicy(policyName);
if (resolvedPolicy != null) {
EnrichPolicy policy = resolvedPolicy.policy();
var matchField = plan.matchField() == null || plan.matchField() instanceof EmptyAttribute
? new UnresolvedAttribute(plan.source(), policy.getMatchField())
: plan.matchField();
List<NamedExpression> enrichFields = calculateEnrichFields(
plan.source(),
policyName,
mappingAsAttributes(plan.source(), idx.get().mapping()),
mappingAsAttributes(plan.source(), resolvedPolicy.mapping()),
plan.enrichFields(),
policy
);

return new Enrich(plan.source(), plan.child(), plan.mode(), policyNameExp, matchField, policyRes, enrichFields);
}

private String unresolvedPolicyError(String policyName, EnrichResolution enrichResolution) {
List<String> potentialMatches = StringUtils.findSimilar(policyName, enrichResolution.existingPolicies());
String msg = "unresolved enrich policy [" + policyName + "]";
if (CollectionUtils.isEmpty(potentialMatches) == false) {
msg += ", did you mean "
+ (potentialMatches.size() == 1 ? "[" + potentialMatches.get(0) + "]" : "any of " + potentialMatches)
+ "?";
return new Enrich(
plan.source(),
plan.child(),
plan.mode(),
plan.policyName(),
matchField,
policy,
resolvedPolicy.concreteIndices(),
enrichFields
);
} else {
String error = context.enrichResolution().getError(policyName);
var policyNameExp = new UnresolvedAttribute(plan.policyName().source(), policyName, null, error);
return new Enrich(plan.source(), plan.child(), plan.mode(), policyNameExp, plan.matchField(), null, Map.of(), List.of());
}
return msg;
}

public static List<NamedExpression> calculateEnrichFields(
Expand Down Expand Up @@ -589,6 +570,7 @@ private LogicalPlan resolveEnrich(Enrich enrich, List<Attribute> childrenOutput)
enrich.policyName(),
resolved,
enrich.policy(),
enrich.concreteIndices(),
enrich.enrichFields()
);
}
Expand Down
Expand Up @@ -7,8 +7,73 @@

package org.elasticsearch.xpack.esql.analysis;

import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolution;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.ql.type.EsField;
import org.elasticsearch.xpack.ql.util.CollectionUtils;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

public record EnrichResolution(Set<EnrichPolicyResolution> resolvedPolicies, Set<String> existingPolicies) {}
/**
 * Collects the outcome of resolving enrich policies.
 * Successful resolutions are recorded with {@link #addResolvedPolicy} and failures with {@link #addError};
 * they are read back with {@link #getResolvedPolicy} and {@link #getError}. Names registered through
 * {@link #addExistingPolicies} are only used to suggest alternatives for a policy that has neither a
 * resolution nor a recorded error.
 */
public final class EnrichResolution {

    // policy name -> successfully resolved policy
    private final Map<String, ResolvedPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
    // policy name -> reason the policy could not be resolved
    private final Map<String, String> errors = ConcurrentCollections.newConcurrentMap();
    // known policy names, consulted only when building "did you mean" suggestions
    private final Set<String> existingPolicies = ConcurrentCollections.newConcurrentSet();

    /**
     * @return the resolution recorded for {@code policyName}, or {@code null} if none was recorded
     */
    public ResolvedPolicy getResolvedPolicy(String policyName) {
        return resolvedPolicies.get(policyName);
    }

    /**
     * @return the {@link EnrichPolicy} of every resolution recorded so far
     */
    public Collection<EnrichPolicy> resolvedEnrichPolicies() {
        return resolvedPolicies.values().stream().map(ResolvedPolicy::policy).toList();
    }

    /**
     * @return the error recorded for {@code policyName}; if there is none, an "unresolved enrich policy"
     *         message that may include suggestions taken from the registered existing policies
     */
    public String getError(String policyName) {
        final String recorded = errors.get(policyName);
        return recorded != null ? recorded : notFoundError(policyName);
    }

    /**
     * Records a successful resolution for {@code policyName}, replacing any earlier one for the same name.
     */
    public void addResolvedPolicy(
        String policyName,
        EnrichPolicy policy,
        Map<String, String> concreteIndices,
        Map<String, EsField> mapping
    ) {
        final ResolvedPolicy resolved = new ResolvedPolicy(policy, concreteIndices, mapping);
        resolvedPolicies.put(policyName, resolved);
    }

    /**
     * Records {@code reason} as the error for {@code policyName}, replacing any earlier one for the same name.
     */
    public void addError(String policyName, String reason) {
        errors.put(policyName, reason);
    }

    /**
     * Adds {@code policyNames} to the names offered as suggestions by {@link #getError}.
     */
    public void addExistingPolicies(Set<String> policyNames) {
        existingPolicies.addAll(policyNames);
    }

    // Builds the fallback message for a policy without a recorded error, appending similar existing names if any.
    private String notFoundError(String policyName) {
        final List<String> suggestions = StringUtils.findSimilar(policyName, existingPolicies);
        final StringBuilder message = new StringBuilder("unresolved enrich policy [").append(policyName).append("]");
        if (CollectionUtils.isEmpty(suggestions) == false) {
            message.append(", did you mean ");
            if (suggestions.size() == 1) {
                message.append("[").append(suggestions.get(0)).append("]");
            } else {
                message.append("any of ").append(suggestions);
            }
            message.append("?");
        }
        return message.toString();
    }

    /**
     * A successfully resolved policy.
     *
     * @param policy          the enrich policy
     * @param concreteIndices concrete enrich index per cluster, keyed by cluster
     * @param mapping         field mapping of the enrich index, keyed by field name
     */
    public record ResolvedPolicy(EnrichPolicy policy, Map<String, String> concreteIndices, Map<String, EsField> mapping) {

    }
}

This file was deleted.

Expand Up @@ -11,12 +11,16 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
Expand All @@ -25,10 +29,14 @@
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.index.IndexResolver;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -48,36 +56,69 @@ public EnrichPolicyResolver(ClusterService clusterService, TransportService tran
transportService.registerRequestHandler(
RESOLVE_ACTION_NAME,
threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME),
ResolveRequest::new,
LookupRequest::new,
new RequestHandler()
);
}

public void resolvePolicy(String policyName, ActionListener<EnrichPolicyResolution> listener) {
public void resolvePolicy(Collection<String> policyNames, ActionListener<EnrichResolution> listener) {
if (policyNames.isEmpty()) {
listener.onResponse(new EnrichResolution());
return;
}
transportService.sendRequest(
clusterService.localNode(),
RESOLVE_ACTION_NAME,
new ResolveRequest(policyName),
new ActionListenerResponseHandler<>(
listener.map(r -> r.resolution),
ResolveResponse::new,
threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME)
)
new LookupRequest(policyNames),
new ActionListenerResponseHandler<>(listener.delegateFailureAndWrap((l, lookup) -> {
final EnrichResolution resolution = new EnrichResolution();
resolution.addExistingPolicies(lookup.allPolicies);
try (RefCountingListener refs = new RefCountingListener(l.map(unused -> resolution))) {
for (Map.Entry<String, EnrichPolicy> e : lookup.policies.entrySet()) {
resolveOnePolicy(e.getKey(), e.getValue(), resolution, refs.acquire());
}
}
}), LookupResponse::new, threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME))
);
}

/**
 * Resolves the enrich index of a single policy and records the outcome in {@code resolution}.
 * <p>
 * The index name is derived with {@link EnrichPolicy#getBaseName(String)} and resolved through
 * {@code indexResolver.resolveAsMergedMapping}. A valid result is stored with
 * {@link EnrichResolution#addResolvedPolicy}; an invalid one is stored with {@link EnrichResolution#addError}.
 * In both cases the listener is completed with {@code null}, so the outcome is only observable via
 * {@code resolution}.
 *
 * @param policyName name of the policy being resolved; also the key under which the outcome is recorded
 * @param policy     the policy to attach to a successful resolution
 * @param resolution collector that receives either the resolved policy or the error
 * @param listener   completed once the index resolution has been processed
 */
private void resolveOnePolicy(String policyName, EnrichPolicy policy, EnrichResolution resolution, ActionListener<Void> listener) {
ThreadContext threadContext = threadPool.getThreadContext();
// Wrap the listener before the context is stashed below.
// NOTE(review): presumably this restores the caller's thread context when the listener completes - confirm.
listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
// Run the index resolution with the thread context stashed under ENRICH_ORIGIN; the stash is undone when the try block exits.
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
indexResolver.resolveAsMergedMapping(
EnrichPolicy.getBaseName(policyName),
IndexResolver.ALL_FIELDS,
false,
Map.of(),
listener.map(indexResult -> {
if (indexResult.isValid()) {
EsIndex esIndex = indexResult.get();
Set<String> indices = esIndex.concreteIndices();
// Only the local cluster is populated here, using the first concrete index returned by the set's iterator.
// NOTE(review): assumes a valid result always carries at least one concrete index - confirm.
var concreteIndices = Map.of(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, Iterables.get(indices, 0));
resolution.addResolvedPolicy(policyName, policy, concreteIndices, esIndex.mapping());
} else {
// An invalid resolution is recorded as a per-policy error rather than failing the listener.
resolution.addError(policyName, indexResult.toString());
}
// The listener is ActionListener<Void>; the outcome has already been written to resolution.
return null;
}),
EsqlSession::specificValidity
);
}
}

/**
 * Creates (does not throw) the exception that the {@code StreamInput} constructors of the request and
 * response classes in this file throw.
 * NOTE(review): presumably these messages are never read from the wire because the request is sent to the
 * local node - confirm.
 *
 * @return a new {@link UnsupportedOperationException} with the message "local node transport action"
 */
private static UnsupportedOperationException unsupported() {
return new UnsupportedOperationException("local node transport action");
}

private static class ResolveRequest extends TransportRequest {
private final String policyName;
private static class LookupRequest extends TransportRequest {
private final Collection<String> policyNames;

ResolveRequest(String policyName) {
this.policyName = policyName;
LookupRequest(Collection<String> policyNames) {
this.policyNames = policyNames;
}

ResolveRequest(StreamInput in) {
LookupRequest(StreamInput in) {
throw unsupported();
}

Expand All @@ -87,14 +128,16 @@ public void writeTo(StreamOutput out) {
}
}

private static class ResolveResponse extends TransportResponse {
private final EnrichPolicyResolution resolution;
private static class LookupResponse extends TransportResponse {
final Map<String, EnrichPolicy> policies;
final Set<String> allPolicies;

ResolveResponse(EnrichPolicyResolution resolution) {
this.resolution = resolution;
LookupResponse(Map<String, EnrichPolicy> policies, Set<String> allPolicies) {
this.policies = policies;
this.allPolicies = allPolicies;
}

ResolveResponse(StreamInput in) {
LookupResponse(StreamInput in) {
throw unsupported();
}

Expand All @@ -104,38 +147,19 @@ public void writeTo(StreamOutput out) {
}
}

private class RequestHandler implements TransportRequestHandler<ResolveRequest> {
private class RequestHandler implements TransportRequestHandler<LookupRequest> {
@Override
public void messageReceived(ResolveRequest request, TransportChannel channel, Task task) throws Exception {
String policyName = request.policyName;
EnrichPolicy policy = policies().get(policyName);
ThreadContext threadContext = threadPool.getThreadContext();
ActionListener<ResolveResponse> listener = new ChannelActionListener<>(channel);
listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
indexResolver.resolveAsMergedMapping(
EnrichPolicy.getBaseName(policyName),
IndexResolver.ALL_FIELDS,
false,
Map.of(),
listener.map(indexResult -> new ResolveResponse(new EnrichPolicyResolution(policyName, policy, indexResult))),
EsqlSession::specificValidity
);
public void messageReceived(LookupRequest request, TransportChannel channel, Task task) throws Exception {
final EnrichMetadata metadata = clusterService.state().metadata().custom(EnrichMetadata.TYPE);
final Map<String, EnrichPolicy> policies = metadata == null ? Map.of() : metadata.getPolicies();
final Map<String, EnrichPolicy> results = Maps.newMapWithExpectedSize(request.policyNames.size());
for (String policyName : request.policyNames) {
EnrichPolicy p = policies.get(policyName);
if (p != null) {
results.put(policyName, new EnrichPolicy(p.getType(), null, List.of(), p.getMatchField(), p.getEnrichFields()));
}
}
new ChannelActionListener<>(channel).onResponse(new LookupResponse(results, policies.keySet()));
}
}

public Set<String> allPolicyNames() {
// TODO: remove this suggestion as it exposes policy names without the right permission
return policies().keySet();
}

private Map<String, EnrichPolicy> policies() {
if (clusterService == null || clusterService.state() == null) {
return Map.of();
}
EnrichMetadata metadata = clusterService.state().metadata().custom(EnrichMetadata.TYPE);
return metadata == null ? Map.of() : metadata.getPolicies();
}

}

0 comments on commit 876e701

Please sign in to comment.