Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public final class EnrichMetadata extends AbstractNamedDiffable<Metadata.Custom>

static final ParseField POLICIES = new ParseField("policies");

public static final EnrichMetadata EMPTY = new EnrichMetadata(Collections.emptyMap());

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<EnrichMetadata, Void> PARSER = new ConstructingObjectParser<>(
"enrich_metadata",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public static void putPolicy(
}

updateClusterState(clusterService, handler, current -> {
final Map<String, EnrichPolicy> originalPolicies = getPolicies(current);
if (originalPolicies.containsKey(name)) {
throw new ResourceAlreadyExistsException("policy [{}] already exists", name);
}
for (String indexExpression : policy.getIndices()) {
// indices field in policy can contain wildcards, aliases etc.
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(
Expand All @@ -101,12 +105,9 @@ public static void putPolicy(
}
}

final Map<String, EnrichPolicy> policies = getPolicies(current);
EnrichPolicy existing = policies.putIfAbsent(name, policy);
if (existing != null) {
throw new ResourceAlreadyExistsException("policy [{}] already exists", name);
}
return policies;
final Map<String, EnrichPolicy> updatedPolicies = new HashMap<>(originalPolicies);
updatedPolicies.put(name, policy);
return updatedPolicies;
});
}

Expand All @@ -125,13 +126,14 @@ public static void deletePolicy(String name, ClusterService clusterService, Cons
}

updateClusterState(clusterService, handler, current -> {
final Map<String, EnrichPolicy> policies = getPolicies(current);
if (policies.containsKey(name) == false) {
final Map<String, EnrichPolicy> originalPolicies = getPolicies(current);
if (originalPolicies.containsKey(name) == false) {
throw new ResourceNotFoundException("policy [{}] not found", name);
}

policies.remove(name);
return policies;
final Map<String, EnrichPolicy> updatedPolicies = new HashMap<>(originalPolicies);
updatedPolicies.remove(name);
return updatedPolicies;
});
}

Expand All @@ -153,18 +155,11 @@ public static EnrichPolicy getPolicy(String name, ClusterState state) {
* Gets all policies in the cluster.
*
* @param state the cluster state
* @return a Map of <code>policyName, EnrichPolicy</code> of the policies
* @return a read-only Map of <code>policyName, EnrichPolicy</code> of the policies
*/
public static Map<String, EnrichPolicy> getPolicies(ClusterState state) {
final Map<String, EnrichPolicy> policies;
final EnrichMetadata enrichMetadata = state.metadata().custom(EnrichMetadata.TYPE);
if (enrichMetadata != null) {
// Make a copy, because policies map inside custom metadata is read only:
policies = new HashMap<>(enrichMetadata.getPolicies());
} else {
policies = new HashMap<>();
}
return policies;
final EnrichMetadata metadata = state.metadata().custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
return metadata.getPolicies();
}

private static void updateClusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
}

protected Map<String, EnrichPolicy> availablePolicies() {
final EnrichMetadata metadata = clusterService.state().metadata().custom(EnrichMetadata.TYPE);
return metadata == null ? Map.of() : metadata.getPolicies();
final EnrichMetadata metadata = clusterService.state().metadata().custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
return metadata.getPolicies();
}

protected void getRemoteConnection(String cluster, ActionListener<Transport.Connection> listener) {
Expand Down