Skip to content

Commit

Permalink
Operator/slm policy (#89567)
Browse files Browse the repository at this point in the history
Add file based settings handler for SLM
  • Loading branch information
grcevski committed Sep 20, 2022
1 parent ce82323 commit f0e498b
Show file tree
Hide file tree
Showing 9 changed files with 1,124 additions and 80 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/89567.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 89567
summary: Operator/slm policy
area: Infra/Core
type: enhancement
issues: []

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.elasticsearch.xpack.slm.SnapshotLifecycleTask;
import org.elasticsearch.xpack.slm.SnapshotRetentionService;
import org.elasticsearch.xpack.slm.SnapshotRetentionTask;
import org.elasticsearch.xpack.slm.action.ReservedSnapshotAction;
import org.elasticsearch.xpack.slm.action.RestDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotRetentionAction;
Expand Down Expand Up @@ -420,7 +421,7 @@ public List<RestHandler> getRestHandlers(
}

List<ReservedClusterStateHandler<?>> reservedClusterStateHandlers() {
return List.of(reservedLifecycleAction.get());
return List.of(reservedLifecycleAction.get(), new ReservedSnapshotAction());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,9 @@ public List<LifecyclePolicy> fromXContent(XContentParser parser) throws IOExcept

return result;
}

@Override
public Collection<String> optionalDependencies() {
return List.of("snapshot_repositories");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.slm.action;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.slm.action.PutSnapshotLifecycleAction;
import org.elasticsearch.xpack.slm.SnapshotLifecycleService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentHelper.mapToXContentParser;

/**
* This {@link ReservedClusterStateHandler} is responsible for reserved state
* CRUD operations on SLM policies in, e.g. file based settings.
* <p>
* Internally it uses {@link TransportPutSnapshotLifecycleAction} and
* {@link TransportDeleteSnapshotLifecycleAction} to add, update and delete ILM policies.
*/
public class ReservedSnapshotAction implements ReservedClusterStateHandler<List<SnapshotLifecyclePolicy>> {

public static final String NAME = "slm";

public ReservedSnapshotAction() {}

@Override
public String name() {
return NAME;
}

private Collection<PutSnapshotLifecycleAction.Request> prepare(List<SnapshotLifecyclePolicy> policies, ClusterState state) {
List<PutSnapshotLifecycleAction.Request> result = new ArrayList<>();

List<Exception> exceptions = new ArrayList<>();

for (var policy : policies) {
PutSnapshotLifecycleAction.Request request = new PutSnapshotLifecycleAction.Request(policy.getId(), policy);
try {
validate(request);
SnapshotLifecycleService.validateRepositoryExists(request.getLifecycle().getRepository(), state);
SnapshotLifecycleService.validateMinimumInterval(request.getLifecycle(), state);
result.add(request);
} catch (Exception e) {
exceptions.add(e);
}
}

if (exceptions.isEmpty() == false) {
var illegalArgumentException = new IllegalArgumentException("Error on validating SLM requests");
exceptions.forEach(illegalArgumentException::addSuppressed);
throw illegalArgumentException;
}

return result;
}

@Override
public TransformState transform(Object source, TransformState prevState) throws Exception {
@SuppressWarnings("unchecked")
var requests = prepare((List<SnapshotLifecyclePolicy>) source, prevState.state());

ClusterState state = prevState.state();

for (var request : requests) {
TransportPutSnapshotLifecycleAction.UpdateSnapshotPolicyTask task =
new TransportPutSnapshotLifecycleAction.UpdateSnapshotPolicyTask(request);

state = task.execute(state);
}

Set<String> entities = requests.stream().map(r -> r.getLifecycle().getId()).collect(Collectors.toSet());

Set<String> toDelete = new HashSet<>(prevState.keys());
toDelete.removeAll(entities);

for (var policyToDelete : toDelete) {
var task = new TransportDeleteSnapshotLifecycleAction.DeleteSnapshotPolicyTask(policyToDelete);
state = task.execute(state);
}

return new TransformState(state, entities);
}

@Override
public List<SnapshotLifecyclePolicy> fromXContent(XContentParser parser) throws IOException {
List<SnapshotLifecyclePolicy> result = new ArrayList<>();

Map<String, ?> source = parser.map();

for (String name : source.keySet()) {
@SuppressWarnings("unchecked")
Map<String, ?> content = (Map<String, ?>) source.get(name);
try (XContentParser policyParser = mapToXContentParser(XContentParserConfiguration.EMPTY, content)) {
result.add(SnapshotLifecyclePolicy.parse(policyParser, name));
}
}

return result;
}

@Override
public Collection<String> optionalDependencies() {
return List.of("snapshot_repositories");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -29,6 +30,8 @@
import org.elasticsearch.xpack.core.slm.action.DeleteSnapshotLifecycleAction;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class TransportDeleteSnapshotLifecycleAction extends TransportMasterNodeAction<
Expand Down Expand Up @@ -63,48 +66,72 @@ protected void masterOperation(
ClusterState state,
ActionListener<DeleteSnapshotLifecycleAction.Response> listener
) throws Exception {
submitUnbatchedTask("delete-snapshot-lifecycle-" + request.getLifecycleId(), new AckedClusterStateUpdateTask(request, listener) {
submitUnbatchedTask("delete-snapshot-lifecycle-" + request.getLifecycleId(), new DeleteSnapshotPolicyTask(request, listener) {
@Override
protected DeleteSnapshotLifecycleAction.Response newResponse(boolean acknowledged) {
return new DeleteSnapshotLifecycleAction.Response(acknowledged);
}
});
}

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotLifecycleMetadata snapMeta = currentState.metadata().custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta == null) {
throw new ResourceNotFoundException("snapshot lifecycle policy not found: {}", request.getLifecycleId());
}
// Check that the policy exists in the first place
snapMeta.getSnapshotConfigurations()
.entrySet()
.stream()
.filter(e -> e.getValue().getPolicy().getId().equals(request.getLifecycleId()))
.findAny()
.orElseThrow(() -> new ResourceNotFoundException("snapshot lifecycle policy not found: {}", request.getLifecycleId()));

Map<String, SnapshotLifecyclePolicyMetadata> newConfigs = snapMeta.getSnapshotConfigurations()
.entrySet()
.stream()
.filter(e -> e.getKey().equals(request.getLifecycleId()) == false)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Metadata metadata = currentState.metadata();
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(metadata)
.putCustom(
SnapshotLifecycleMetadata.TYPE,
new SnapshotLifecycleMetadata(
newConfigs,
snapMeta.getOperationMode(),
snapMeta.getStats().removePolicy(request.getLifecycleId())
)
)
)
.build();
/**
* Extracted extension of {@link AckedClusterStateUpdateTask} with only the execute method
* implementation, so that the execute() transformation can be reused for {@link ReservedSnapshotAction}
*/
public static class DeleteSnapshotPolicyTask extends AckedClusterStateUpdateTask {
private final DeleteSnapshotLifecycleAction.Request request;

DeleteSnapshotPolicyTask(
DeleteSnapshotLifecycleAction.Request request,
ActionListener<DeleteSnapshotLifecycleAction.Response> listener
) {
super(request, listener);
this.request = request;
}

/**
* Used by the {@link ReservedClusterStateHandler} for SLM
* {@link ReservedSnapshotAction}
*/
DeleteSnapshotPolicyTask(String policyId) {
this(new DeleteSnapshotLifecycleAction.Request(policyId), null);
}

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotLifecycleMetadata snapMeta = currentState.metadata().custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta == null) {
throw new ResourceNotFoundException("snapshot lifecycle policy not found: {}", request.getLifecycleId());
}
});
// Check that the policy exists in the first place
snapMeta.getSnapshotConfigurations()
.entrySet()
.stream()
.filter(e -> e.getValue().getPolicy().getId().equals(request.getLifecycleId()))
.findAny()
.orElseThrow(() -> new ResourceNotFoundException("snapshot lifecycle policy not found: {}", request.getLifecycleId()));

Map<String, SnapshotLifecyclePolicyMetadata> newConfigs = snapMeta.getSnapshotConfigurations()
.entrySet()
.stream()
.filter(e -> e.getKey().equals(request.getLifecycleId()) == false)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Metadata metadata = currentState.metadata();
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(metadata)
.putCustom(
SnapshotLifecycleMetadata.TYPE,
new SnapshotLifecycleMetadata(
newConfigs,
snapMeta.getOperationMode(),
snapMeta.getStats().removePolicy(request.getLifecycleId())
)
)
)
.build();
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
Expand All @@ -116,4 +143,14 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
protected ClusterBlockException checkBlock(DeleteSnapshotLifecycleAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected Optional<String> reservedStateHandlerName() {
return Optional.of(ReservedSnapshotAction.NAME);
}

@Override
protected Set<String> modifiedKeys(DeleteSnapshotLifecycleAction.Request request) {
return Set.of(request.getLifecycleId());
}
}

0 comments on commit f0e498b

Please sign in to comment.