Skip to content

Commit

Permalink
Indexify CCR internal actions for RCS 2.0 (#95264)
Browse files Browse the repository at this point in the history
This PR adds CCR support for RCS 2.0 by pairing its internal actions
with indices actions which are safe to use across different clusters.
  • Loading branch information
ywangd committed Apr 28, 2023
1 parent 8fa9550 commit a2c9c0b
Show file tree
Hide file tree
Showing 28 changed files with 1,689 additions and 89 deletions.
Expand Up @@ -345,8 +345,8 @@ public void testIndividualActionsTimeout() throws Exception {
MockTransportService mockTransportService = (MockTransportService) transportService;
transportServices.add(mockTransportService);
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false
&& action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) {
if (action.equals(GetCcrRestoreFileChunkAction.INTERNAL_NAME) == false
&& action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.INTERNAL_NAME)) == false) {
connection.sendRequest(requestId, action, request, options);
}
});
Expand Down Expand Up @@ -454,7 +454,7 @@ public void testFollowerMappingIsUpdated() throws IOException {
MockTransportService mockTransportService = (MockTransportService) transportService;
transportServices.add(mockTransportService);
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PutCcrRestoreSessionAction.NAME)) {
if (action.equals(PutCcrRestoreSessionAction.INTERNAL_NAME)) {
updateMappings.run();
connection.sendRequest(requestId, action, request, options);
} else {
Expand Down
Expand Up @@ -235,8 +235,8 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
senderNode.getName()
);
senderTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (ClearCcrRestoreSessionAction.NAME.equals(action)
|| TransportActionProxy.getProxyAction(ClearCcrRestoreSessionAction.NAME).equals(action)) {
if (ClearCcrRestoreSessionAction.INTERNAL_NAME.equals(action)
|| TransportActionProxy.getProxyAction(ClearCcrRestoreSessionAction.INTERNAL_NAME).equals(action)) {
try {
latch.await();
} catch (final InterruptedException e) {
Expand Down
Expand Up @@ -244,15 +244,12 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
DeleteInternalCcrRepositoryAction.INSTANCE,
DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class
),
new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE, PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
new ActionHandler<>(
ClearCcrRestoreSessionAction.INSTANCE,
ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class
),
new ActionHandler<>(
GetCcrRestoreFileChunkAction.INSTANCE,
GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class
),
new ActionHandler<>(PutCcrRestoreSessionAction.INTERNAL_INSTANCE, PutCcrRestoreSessionAction.InternalTransportAction.class),
new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE, PutCcrRestoreSessionAction.TransportAction.class),
new ActionHandler<>(ClearCcrRestoreSessionAction.INTERNAL_INSTANCE, ClearCcrRestoreSessionAction.InternalTransportAction.class),
new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE, ClearCcrRestoreSessionAction.TransportAction.class),
new ActionHandler<>(GetCcrRestoreFileChunkAction.INTERNAL_INSTANCE, GetCcrRestoreFileChunkAction.InternalTransportAction.class),
new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE, GetCcrRestoreFileChunkAction.TransportAction.class),
// stats action
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
Expand Down
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
Expand All @@ -21,34 +22,69 @@

public class ClearCcrRestoreSessionAction extends ActionType<ActionResponse.Empty> {

public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
public static final String NAME = "internal:admin/ccr/restore/session/clear";
public static final ClearCcrRestoreSessionAction INTERNAL_INSTANCE = new ClearCcrRestoreSessionAction();
public static final String INTERNAL_NAME = "internal:admin/ccr/restore/session/clear";
public static final String NAME = "indices:internal/admin/ccr/restore/session/clear";
public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(NAME);

private ClearCcrRestoreSessionAction() {
super(NAME, in -> ActionResponse.Empty.INSTANCE);
this(INTERNAL_NAME);
}

public static class TransportDeleteCcrRestoreSessionAction extends HandledTransportAction<
private ClearCcrRestoreSessionAction(String name) {
super(name, in -> ActionResponse.Empty.INSTANCE);
}

abstract static class TransportDeleteCcrRestoreSessionAction extends HandledTransportAction<
ClearCcrRestoreSessionRequest,
ActionResponse.Empty> {

private final CcrRestoreSourceService ccrRestoreService;
protected final CcrRestoreSourceService ccrRestoreService;

@Inject
public TransportDeleteCcrRestoreSessionAction(
private TransportDeleteCcrRestoreSessionAction(
String actionName,
ActionFilters actionFilters,
TransportService transportService,
CcrRestoreSourceService ccrRestoreService
) {
super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, NAME, false, in -> ActionResponse.Empty.INSTANCE);
super(actionName, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, actionName, false, in -> ActionResponse.Empty.INSTANCE);
this.ccrRestoreService = ccrRestoreService;
}

@Override
protected void doExecute(Task task, ClearCcrRestoreSessionRequest request, ActionListener<ActionResponse.Empty> listener) {
validate(request);
ccrRestoreService.closeSession(request.getSessionUUID());
listener.onResponse(ActionResponse.Empty.INSTANCE);
}

// We don't enforce any validation by default so that the internal action stays the same for BWC reasons
protected void validate(ClearCcrRestoreSessionRequest request) {}
}

public static class InternalTransportAction extends TransportDeleteCcrRestoreSessionAction {
@Inject
public InternalTransportAction(
ActionFilters actionFilters,
TransportService transportService,
CcrRestoreSourceService ccrRestoreService
) {
super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService);
}
}

public static class TransportAction extends TransportDeleteCcrRestoreSessionAction {
@Inject
public TransportAction(ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, actionFilters, transportService, ccrRestoreService);
}

@Override
protected void validate(ClearCcrRestoreSessionRequest request) {
final ShardId shardId = request.getShardId();
assert shardId != null : "shardId must be specified for the request";
ccrRestoreService.ensureSessionShardIdConsistency(request.getSessionUUID(), shardId);
}
}
}
Expand Up @@ -7,28 +7,39 @@

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.RemoteClusterAwareRequest;

import java.io.IOException;

public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest {
public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest, IndicesRequest {

private DiscoveryNode node;
private final String sessionUUID;
private final ShardId shardId;

ClearCcrRestoreSessionRequest(StreamInput in) throws IOException {
super(in);
sessionUUID = in.readString();
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_9_0)) {
shardId = new ShardId(in);
} else {
shardId = null;
}
}

public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node) {
public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node, ShardId shardId) {
this.sessionUUID = sessionUUID;
this.node = node;
this.shardId = shardId;
}

@Override
Expand All @@ -40,14 +51,35 @@ public ActionRequestValidationException validate() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_9_0)) {
shardId.writeTo(out);
}
}

String getSessionUUID() {
return sessionUUID;
}

ShardId getShardId() {
return shardId;
}

@Override
public DiscoveryNode getPreferredTargetNode() {
return node;
}

@Override
public String[] indices() {
if (shardId == null) {
return null;
} else {
return new String[] { shardId.getIndexName() };
}
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
}
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
Expand All @@ -29,29 +30,35 @@

public class GetCcrRestoreFileChunkAction extends ActionType<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {

public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction();
public static final String NAME = "internal:admin/ccr/restore/file_chunk/get";
public static final GetCcrRestoreFileChunkAction INTERNAL_INSTANCE = new GetCcrRestoreFileChunkAction();
public static final String INTERNAL_NAME = "internal:admin/ccr/restore/file_chunk/get";
public static final String NAME = "indices:internal/admin/ccr/restore/file_chunk/get";
public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction(NAME);

private GetCcrRestoreFileChunkAction() {
super(NAME, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse::new);
this(INTERNAL_NAME);
}

public static class TransportGetCcrRestoreFileChunkAction extends HandledTransportAction<
private GetCcrRestoreFileChunkAction(String name) {
super(name, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse::new);
}

abstract static class TransportGetCcrRestoreFileChunkAction extends HandledTransportAction<
GetCcrRestoreFileChunkRequest,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {

private final CcrRestoreSourceService restoreSourceService;
protected final CcrRestoreSourceService restoreSourceService;
private final BigArrays bigArrays;

@Inject
public TransportGetCcrRestoreFileChunkAction(
private TransportGetCcrRestoreFileChunkAction(
String actionName,
BigArrays bigArrays,
TransportService transportService,
ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService
) {
super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, NAME, false, GetCcrRestoreFileChunkResponse::new);
super(actionName, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, actionName, false, GetCcrRestoreFileChunkResponse::new);
this.restoreSourceService = restoreSourceService;
this.bigArrays = bigArrays;
}
Expand All @@ -62,6 +69,7 @@ protected void doExecute(
GetCcrRestoreFileChunkRequest request,
ActionListener<GetCcrRestoreFileChunkResponse> listener
) {
validate(request);
int bytesRequested = request.getSize();
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
String fileName = request.getFileName();
Expand All @@ -77,6 +85,41 @@ protected void doExecute(
listener.onFailure(e);
}
}

// We don't enforce any validation by default so that the internal action stays the same for BWC reasons
protected void validate(GetCcrRestoreFileChunkRequest request) {}
}

public static class InternalTransportAction extends TransportGetCcrRestoreFileChunkAction {
@Inject
public InternalTransportAction(
BigArrays bigArrays,
TransportService transportService,
ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService
) {
super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService);
}
}

public static class TransportAction extends TransportGetCcrRestoreFileChunkAction {
@Inject
public TransportAction(
BigArrays bigArrays,
TransportService transportService,
ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService
) {
super(NAME, bigArrays, transportService, actionFilters, restoreSourceService);
}

@Override
protected void validate(GetCcrRestoreFileChunkRequest request) {
final ShardId shardId = request.getShardId();
assert shardId != null : "shardId must be specified for the request";
restoreSourceService.ensureSessionShardIdConsistency(request.getSessionUUID(), shardId);
restoreSourceService.ensureFileNameIsKnownToSession(request.getSessionUUID(), request.getFileName());
}
}

public static class GetCcrRestoreFileChunkResponse extends ActionResponse {
Expand Down

0 comments on commit a2c9c0b

Please sign in to comment.