Skip to content

Commit

Permalink
[Transform] Improve robustness of checkpointing (#80984)
Browse files Browse the repository at this point in the history
rewrites checkpointing as internal actions, reducing several sub-calls to
only 1 per data node that has at least 1 primary shard of the indexes of
interest.

Robustness: The current checkpointing sends a request to every shard
 - primary and replica - and collects the results. If 1 request fails, even 
for a replica, checkpointing fails. See #75780 for details.

Performance: The current checkpointing is wasteful, it uses get index 
and get index stats which results in a lot more calls and executes a 
lot more code which produces results we are not interested in.

Number of requests before and after:
before: 1 + #shards * #indices * (#replicas + 1)
after: #data_nodes_holding_gt1_shard

Fixes #75780
  • Loading branch information
Hendrik Muhs committed Feb 17, 2022
1 parent 6121477 commit 48e562a
Show file tree
Hide file tree
Showing 18 changed files with 1,401 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.security.support.Automatons;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -99,7 +100,8 @@ public final class IndexPrivilege extends Privilege {
GetDataStreamAction.NAME,
ResolveIndexAction.NAME,
FieldCapabilitiesAction.NAME + "*",
GetRollupIndexCapsAction.NAME + "*"
GetRollupIndexCapsAction.NAME + "*",
GetCheckpointAction.NAME + "*" // transform internal action
);
private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(
PutFollowAction.NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

/**
* Transform internal API (no REST layer) to retrieve index checkpoints.
*/
public class GetCheckpointAction extends ActionType<GetCheckpointAction.Response> {

public static final GetCheckpointAction INSTANCE = new GetCheckpointAction();

// note: this is an index action and requires `view_index_metadata`
public static final String NAME = "indices:internal/transform/checkpoint";

private GetCheckpointAction() {
super(NAME, GetCheckpointAction.Response::new);
}

public static class Request extends ActionRequest implements IndicesRequest.Replaceable {

private String[] indices;
private final IndicesOptions indicesOptions;

public Request(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

public Request(String[] indices, IndicesOptions indicesOptions) {
this.indices = indices != null ? indices : Strings.EMPTY_ARRAY;
this.indicesOptions = indicesOptions;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request that = (Request) obj;

return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
}

@Override
public IndicesRequest indices(String... indices) {
this.indices = indices;
return this;
}

// this action does not allow remote indices, but they have to be resolved upfront, see {@link DefaultCheckpointProvider}
@Override
public boolean allowsRemoteIndices() {
return false;
}
}

public static class Response extends ActionResponse {

private final Map<String, long[]> checkpoints;

public Response(Map<String, long[]> checkpoints) {
this.checkpoints = checkpoints;
}

public Response(StreamInput in) throws IOException {
this.checkpoints = in.readOrderedMap(StreamInput::readString, StreamInput::readLongArray);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(getCheckpoints(), StreamOutput::writeString, StreamOutput::writeLongArray);
}

public Map<String, long[]> getCheckpoints() {
return Collections.unmodifiableMap(checkpoints);
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Response that = (Response) obj;

return this.checkpoints.size() == that.checkpoints.size()
&& this.checkpoints.entrySet().stream().allMatch(e -> Arrays.equals(e.getValue(), that.checkpoints.get(e.getKey())));
}

@Override
public int hashCode() {
int hash = 1;

for (Entry<String, long[]> e : checkpoints.entrySet()) {
hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue()));
}

return hash;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

public class GetCheckpointNodeAction extends ActionType<GetCheckpointNodeAction.Response> {

public static final GetCheckpointNodeAction INSTANCE = new GetCheckpointNodeAction();

// note: this is an index action and requires `view_index_metadata`
public static final String NAME = GetCheckpointAction.NAME + "[n]";

private GetCheckpointNodeAction() {
super(NAME, GetCheckpointNodeAction.Response::new);
}

public static class Response extends ActionResponse {
private final Map<String, long[]> checkpoints;

public Response(Map<String, long[]> checkpoints) {
this.checkpoints = checkpoints;
}

public Response(StreamInput in) throws IOException {
this.checkpoints = in.readOrderedMap(StreamInput::readString, StreamInput::readLongArray);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(getCheckpoints(), StreamOutput::writeString, StreamOutput::writeLongArray);
}

public Map<String, long[]> getCheckpoints() {
return checkpoints;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Response that = (Response) obj;

return this.checkpoints.size() == that.checkpoints.size()
&& this.checkpoints.entrySet().stream().allMatch(e -> Arrays.equals(e.getValue(), that.checkpoints.get(e.getKey())));
}

@Override
public int hashCode() {
int hash = 1;

for (Entry<String, long[]> e : checkpoints.entrySet()) {
hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue()));
}

return hash;
}
}

public static class Request extends ActionRequest implements IndicesRequest {

private final Set<ShardId> shards;
private final OriginalIndices originalIndices;

public Request(Set<ShardId> shards, OriginalIndices originalIndices) {
this.shards = shards;
this.originalIndices = originalIndices;
}

public Request(StreamInput in) throws IOException {
super(in);
this.shards = Collections.unmodifiableSet(in.readSet(ShardId::new));
this.originalIndices = OriginalIndices.readOriginalIndices(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeCollection(shards);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}

public Set<ShardId> getShards() {
return shards;
}

public OriginalIndices getOriginalIndices() {
return originalIndices;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request that = (Request) obj;

return Objects.equals(shards, that.shards) && Objects.equals(originalIndices, that.originalIndices);
}

@Override
public int hashCode() {
return Objects.hash(shards, originalIndices);
}

@Override
public String[] indices() {
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class GetCheckpointActionRequestTests extends AbstractWireSerializingTestCase<Request> {

@Override
protected Request createTestInstance() {
return new Request(
randomBoolean() ? null : generateRandomStringArray(10, 10, false, false),
IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
SearchRequest.DEFAULT_INDICES_OPTIONS
)
);
}

@Override
protected Reader<Request> instanceReader() {
return Request::new;
}

@Override
protected Request mutateInstance(Request instance) throws IOException {
List<String> indices = instance.indices() != null ? new ArrayList<>(Arrays.asList(instance.indices())) : new ArrayList<>();
IndicesOptions indicesOptions = instance.indicesOptions();

switch (between(0, 1)) {
case 0:
indices.add(randomAlphaOfLengthBetween(1, 20));
break;
case 1:
indicesOptions = IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(instance.indicesOptions().ignoreUnavailable() == false),
Boolean.toString(instance.indicesOptions().allowNoIndices() == false),
Boolean.toString(instance.indicesOptions().ignoreThrottled() == false),
SearchRequest.DEFAULT_INDICES_OPTIONS
);
break;
default:
throw new AssertionError("Illegal randomization branch");
}

return new Request(indices.toArray(new String[0]), indicesOptions);
}
}

0 comments on commit 48e562a

Please sign in to comment.