diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 0b53f29122bc6..3edfadc4fb21e 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -345,8 +345,8 @@ public void testIndividualActionsTimeout() throws Exception { MockTransportService mockTransportService = (MockTransportService) transportService; transportServices.add(mockTransportService); mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false - && action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) { + if (action.equals(GetCcrRestoreFileChunkAction.INTERNAL_NAME) == false + && action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.INTERNAL_NAME)) == false) { connection.sendRequest(requestId, action, request, options); } }); @@ -454,7 +454,7 @@ public void testFollowerMappingIsUpdated() throws IOException { MockTransportService mockTransportService = (MockTransportService) transportService; transportServices.add(mockTransportService); mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.equals(PutCcrRestoreSessionAction.NAME)) { + if (action.equals(PutCcrRestoreSessionAction.INTERNAL_NAME)) { updateMappings.run(); connection.sendRequest(requestId, action, request, options); } else { diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index c9c03ee527c4d..5a0ae0f9d3db2 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -235,8 +235,8 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { senderNode.getName() ); senderTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (ClearCcrRestoreSessionAction.NAME.equals(action) - || TransportActionProxy.getProxyAction(ClearCcrRestoreSessionAction.NAME).equals(action)) { + if (ClearCcrRestoreSessionAction.INTERNAL_NAME.equals(action) + || TransportActionProxy.getProxyAction(ClearCcrRestoreSessionAction.INTERNAL_NAME).equals(action)) { try { latch.await(); } catch (final InterruptedException e) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index cbc40e0e21ea2..ec504ce6c96dc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -244,15 +244,12 @@ public List> getPersistentTasksExecutor( DeleteInternalCcrRepositoryAction.INSTANCE, DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class ), - new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE, PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class), - new ActionHandler<>( - ClearCcrRestoreSessionAction.INSTANCE, - ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class - ), - new ActionHandler<>( - GetCcrRestoreFileChunkAction.INSTANCE, - GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class - ), + new ActionHandler<>(PutCcrRestoreSessionAction.INTERNAL_INSTANCE, PutCcrRestoreSessionAction.InternalTransportAction.class), + new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE, PutCcrRestoreSessionAction.TransportAction.class), + new ActionHandler<>(ClearCcrRestoreSessionAction.INTERNAL_INSTANCE, ClearCcrRestoreSessionAction.InternalTransportAction.class), + new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE, ClearCcrRestoreSessionAction.TransportAction.class), + new ActionHandler<>(GetCcrRestoreFileChunkAction.INTERNAL_INSTANCE, GetCcrRestoreFileChunkAction.InternalTransportAction.class), + new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE, GetCcrRestoreFileChunkAction.TransportAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 547e01398391d..e13f6fd5cac80 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportActionProxy; @@ -21,34 +22,69 @@ public class ClearCcrRestoreSessionAction extends ActionType { - public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(); - public static final String NAME = "internal:admin/ccr/restore/session/clear"; + public static final ClearCcrRestoreSessionAction INTERNAL_INSTANCE = new ClearCcrRestoreSessionAction(); + public static final String INTERNAL_NAME = "internal:admin/ccr/restore/session/clear"; + public static final String NAME = "indices:internal/admin/ccr/restore/session/clear"; + public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(NAME); private ClearCcrRestoreSessionAction() { - super(NAME, in -> ActionResponse.Empty.INSTANCE); + this(INTERNAL_NAME); } - public static class TransportDeleteCcrRestoreSessionAction extends HandledTransportAction< + private ClearCcrRestoreSessionAction(String name) { + super(name, in -> ActionResponse.Empty.INSTANCE); + } + + abstract static class TransportDeleteCcrRestoreSessionAction extends HandledTransportAction< ClearCcrRestoreSessionRequest, ActionResponse.Empty> { - private final CcrRestoreSourceService ccrRestoreService; + protected final CcrRestoreSourceService ccrRestoreService; - @Inject - public TransportDeleteCcrRestoreSessionAction( + private TransportDeleteCcrRestoreSessionAction( + String actionName, ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService ) { - super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ThreadPool.Names.GENERIC); - TransportActionProxy.registerProxyAction(transportService, NAME, false, in -> ActionResponse.Empty.INSTANCE); + super(actionName, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ThreadPool.Names.GENERIC); + TransportActionProxy.registerProxyAction(transportService, actionName, false, in -> ActionResponse.Empty.INSTANCE); this.ccrRestoreService = ccrRestoreService; } @Override protected void doExecute(Task task, ClearCcrRestoreSessionRequest request, ActionListener listener) { + validate(request); ccrRestoreService.closeSession(request.getSessionUUID()); listener.onResponse(ActionResponse.Empty.INSTANCE); } + + // We don't enforce any validation by default so that the internal action stays the same for BWC reasons + protected void validate(ClearCcrRestoreSessionRequest request) {} + } + + public static class InternalTransportAction extends TransportDeleteCcrRestoreSessionAction { + @Inject + public InternalTransportAction( + ActionFilters actionFilters, + TransportService transportService, + CcrRestoreSourceService ccrRestoreService + ) { + super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService); + } + } + + public static class TransportAction extends TransportDeleteCcrRestoreSessionAction { + @Inject + public TransportAction(ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService) { + super(NAME, actionFilters, transportService, ccrRestoreService); + } + + @Override + protected void validate(ClearCcrRestoreSessionRequest request) { + final ShardId shardId = request.getShardId(); + assert shardId != null : "shardId must be specified for the request"; + ccrRestoreService.ensureSessionShardIdConsistency(request.getSessionUUID(), shardId); + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index 14ce8062a5334..3513a6eab2bb5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -7,28 +7,39 @@ package org.elasticsearch.xpack.ccr.action.repositories; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.RemoteClusterAwareRequest; import java.io.IOException; -public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest { +public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest, IndicesRequest { private DiscoveryNode node; private final String sessionUUID; + private final ShardId shardId; ClearCcrRestoreSessionRequest(StreamInput in) throws IOException { super(in); sessionUUID = in.readString(); + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_9_0)) { + shardId = new ShardId(in); + } else { + shardId = null; + } } - public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node) { + public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node, ShardId shardId) { this.sessionUUID = sessionUUID; this.node = node; + this.shardId = shardId; } @Override @@ -40,14 +51,35 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(sessionUUID); + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_9_0)) { + shardId.writeTo(out); + } } String getSessionUUID() { return sessionUUID; } + ShardId getShardId() { + return shardId; + } + @Override public DiscoveryNode getPreferredTargetNode() { return node; } + + @Override + public String[] indices() { + if (shardId == null) { + return null; + } else { + return new String[] { shardId.getIndexName() }; + } + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index 2698a3dfccb6a..be61026a0e1d2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportActionProxy; @@ -29,29 +30,35 @@ public class GetCcrRestoreFileChunkAction extends ActionType { - public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction(); - public static final String NAME = "internal:admin/ccr/restore/file_chunk/get"; + public static final GetCcrRestoreFileChunkAction INTERNAL_INSTANCE = new GetCcrRestoreFileChunkAction(); + public static final String INTERNAL_NAME = "internal:admin/ccr/restore/file_chunk/get"; + public static final String NAME = "indices:internal/admin/ccr/restore/file_chunk/get"; + public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction(NAME); private GetCcrRestoreFileChunkAction() { - super(NAME, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse::new); + this(INTERNAL_NAME); } - public static class TransportGetCcrRestoreFileChunkAction extends HandledTransportAction< + private GetCcrRestoreFileChunkAction(String name) { + super(name, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse::new); + } + + abstract static class TransportGetCcrRestoreFileChunkAction extends HandledTransportAction< GetCcrRestoreFileChunkRequest, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> { - private final CcrRestoreSourceService restoreSourceService; + protected final CcrRestoreSourceService restoreSourceService; private final BigArrays bigArrays; - @Inject - public TransportGetCcrRestoreFileChunkAction( + private TransportGetCcrRestoreFileChunkAction( + String actionName, BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, CcrRestoreSourceService restoreSourceService ) { - super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC); - TransportActionProxy.registerProxyAction(transportService, NAME, false, GetCcrRestoreFileChunkResponse::new); + super(actionName, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC); + TransportActionProxy.registerProxyAction(transportService, actionName, false, GetCcrRestoreFileChunkResponse::new); this.restoreSourceService = restoreSourceService; this.bigArrays = bigArrays; } @@ -62,6 +69,7 @@ protected void doExecute( GetCcrRestoreFileChunkRequest request, ActionListener listener ) { + validate(request); int bytesRequested = request.getSize(); ByteArray array = bigArrays.newByteArray(bytesRequested, false); String fileName = request.getFileName(); @@ -77,6 +85,41 @@ protected void doExecute( listener.onFailure(e); } } + + // We don't enforce any validation by default so that the internal action stays the same for BWC reasons + protected void validate(GetCcrRestoreFileChunkRequest request) {} + } + + public static class InternalTransportAction extends TransportGetCcrRestoreFileChunkAction { + @Inject + public InternalTransportAction( + BigArrays bigArrays, + TransportService transportService, + ActionFilters actionFilters, + CcrRestoreSourceService restoreSourceService + ) { + super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService); + } + } + + public static class TransportAction extends TransportGetCcrRestoreFileChunkAction { + @Inject + public TransportAction( + BigArrays bigArrays, + TransportService transportService, + ActionFilters actionFilters, + CcrRestoreSourceService restoreSourceService + ) { + super(NAME, bigArrays, transportService, actionFilters, restoreSourceService); + } + + @Override + protected void validate(GetCcrRestoreFileChunkRequest request) { + final ShardId shardId = request.getShardId(); + assert shardId != null : "shardId must be specified for the request"; + restoreSourceService.ensureSessionShardIdConsistency(request.getSessionUUID(), shardId); + restoreSourceService.ensureFileNameIsKnownToSession(request.getSessionUUID(), request.getFileName()); + } } public static class GetCcrRestoreFileChunkResponse extends ActionResponse { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java index 6fd372bc13e0f..cfd062675e586 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java @@ -7,33 +7,39 @@ package org.elasticsearch.xpack.ccr.action.repositories; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.RemoteClusterAwareRequest; import java.io.IOException; -public class GetCcrRestoreFileChunkRequest extends ActionRequest implements RemoteClusterAwareRequest { +public class GetCcrRestoreFileChunkRequest extends ActionRequest implements RemoteClusterAwareRequest, IndicesRequest { private final DiscoveryNode node; private final String sessionUUID; private final String fileName; private final int size; + private final ShardId shardId; @Override public ActionRequestValidationException validate() { return null; } - public GetCcrRestoreFileChunkRequest(DiscoveryNode node, String sessionUUID, String fileName, int size) { + public GetCcrRestoreFileChunkRequest(DiscoveryNode node, String sessionUUID, String fileName, int size, ShardId shardId) { this.node = node; this.sessionUUID = sessionUUID; this.fileName = fileName; this.size = size; assert size > -1 : "The file chunk request size must be positive. Found: [" + size + "]."; + this.shardId = shardId; } GetCcrRestoreFileChunkRequest(StreamInput in) throws IOException { @@ -42,6 +48,11 @@ public GetCcrRestoreFileChunkRequest(DiscoveryNode node, String sessionUUID, Str sessionUUID = in.readString(); fileName = in.readString(); size = in.readVInt(); + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_9_0)) { + shardId = new ShardId(in); + } else { + shardId = null; + } } @Override @@ -50,6 +61,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(sessionUUID); out.writeString(fileName); out.writeVInt(size); + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_9_0)) { + shardId.writeTo(out); + } } String getSessionUUID() { @@ -64,9 +78,27 @@ int getSize() { return size; } + ShardId getShardId() { + return shardId; + } + @Override public DiscoveryNode getPreferredTargetNode() { assert node != null : "Target node is null"; return node; } + + @Override + public String[] indices() { + if (shardId == null) { + return null; + } else { + return new String[] { shardId.getIndexName() }; + } + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 2218726252051..0e50d18143f2f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -33,22 +33,28 @@ public class PutCcrRestoreSessionAction extends ActionType { - public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction(); - public static final String NAME = "internal:admin/ccr/restore/session/put"; + public static final PutCcrRestoreSessionAction INTERNAL_INSTANCE = new PutCcrRestoreSessionAction(); + public static final String INTERNAL_NAME = "internal:admin/ccr/restore/session/put"; + public static final String NAME = "indices:internal/admin/ccr/restore/session/put"; + public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction(NAME); private PutCcrRestoreSessionAction() { - super(NAME, PutCcrRestoreSessionResponse::new); + super(INTERNAL_NAME, PutCcrRestoreSessionResponse::new); } - public static class TransportPutCcrRestoreSessionAction extends TransportSingleShardAction< + private PutCcrRestoreSessionAction(String name) { + super(name, PutCcrRestoreSessionResponse::new); + } + + abstract static class TransportPutCcrRestoreSessionAction extends TransportSingleShardAction< PutCcrRestoreSessionRequest, PutCcrRestoreSessionResponse> { private final IndicesService indicesService; private final CcrRestoreSourceService ccrRestoreService; - @Inject - public TransportPutCcrRestoreSessionAction( + private TransportPutCcrRestoreSessionAction( + String actionName, ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, @@ -58,7 +64,7 @@ public TransportPutCcrRestoreSessionAction( CcrRestoreSourceService ccrRestoreService ) { super( - NAME, + actionName, threadPool, clusterService, transportService, @@ -99,6 +105,36 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { } } + public static class InternalTransportAction extends TransportPutCcrRestoreSessionAction { + @Inject + public InternalTransportAction( + ThreadPool threadPool, + ClusterService clusterService, + ActionFilters actionFilters, + IndexNameExpressionResolver resolver, + TransportService transportService, + IndicesService indicesService, + CcrRestoreSourceService ccrRestoreService + ) { + super(INTERNAL_NAME, threadPool, clusterService, actionFilters, resolver, transportService, indicesService, ccrRestoreService); + } + } + + public static class TransportAction extends TransportPutCcrRestoreSessionAction { + @Inject + public TransportAction( + ThreadPool threadPool, + ClusterService clusterService, + ActionFilters actionFilters, + IndexNameExpressionResolver resolver, + TransportService transportService, + IndicesService indicesService, + CcrRestoreSourceService ccrRestoreService + ) { + super(NAME, threadPool, clusterService, actionFilters, resolver, transportService, indicesService, ccrRestoreService); + } + } + public static class PutCcrRestoreSessionResponse extends ActionResponse { private final DiscoveryNode node; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index df7ad846ec0aa..64a4843573ce2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -565,7 +565,7 @@ RestoreSession openSession( ) { String sessionUUID = UUIDs.randomBase64UUID(); PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute( - PutCcrRestoreSessionAction.INSTANCE, + PutCcrRestoreSessionAction.INTERNAL_INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId) ).actionGet(ccrSettings.getRecoveryActionTimeout()); return new RestoreSession( @@ -579,7 +579,8 @@ RestoreSession openSession( response.getMappingVersion(), threadPool, ccrSettings, - throttledTime::inc + throttledTime::inc, + leaderShardId ); } @@ -593,6 +594,7 @@ private static class RestoreSession extends FileRestoreContext implements Closea private final CcrSettings ccrSettings; private final LongConsumer throttleListener; private final ThreadPool threadPool; + private final ShardId leaderShardId; RestoreSession( String repositoryName, @@ -605,7 +607,8 @@ private static class RestoreSession extends FileRestoreContext implements Closea long mappingVersion, ThreadPool threadPool, CcrSettings ccrSettings, - LongConsumer throttleListener + LongConsumer throttleListener, + ShardId leaderShardId ) { super(repositoryName, shardId, SNAPSHOT_ID, recoveryState); this.remoteClient = remoteClient; @@ -616,6 +619,8 @@ private static class RestoreSession extends FileRestoreContext implements Closea this.threadPool = threadPool; this.ccrSettings = ccrSettings; this.throttleListener = throttleListener; + this.leaderShardId = leaderShardId; + } void restoreFiles(Store store, ActionListener listener) { @@ -658,8 +663,8 @@ protected FileChunk nextChunkRequest(StoreFileMetadata md) { @Override protected void executeChunkRequest(FileChunk request, ActionListener listener) { remoteClient.execute( - GetCcrRestoreFileChunkAction.INSTANCE, - new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested), + GetCcrRestoreFileChunkAction.INTERNAL_INSTANCE, + new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested, leaderShardId), ListenerTimeouts.wrapWithTimeout(threadPool, new ActionListener<>() { @Override public void onResponse( @@ -691,7 +696,7 @@ public void onFailure(Exception e) { } }); } - }, ccrSettings.getRecoveryActionTimeout(), ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME) + }, ccrSettings.getRecoveryActionTimeout(), ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.INTERNAL_NAME) ); } @@ -742,8 +747,8 @@ public void close() { @Override public void close() { - ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); - ActionResponse.Empty response = remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest) + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node, leaderShardId); + ActionResponse.Empty response = remoteClient.execute(ClearCcrRestoreSessionAction.INTERNAL_INSTANCE, clearRequest) .actionGet(ccrSettings.getRecoveryActionTimeout()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index ceb71bda53f16..2442a8fb1c6af 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -42,6 +42,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.LongConsumer; @@ -103,7 +104,9 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index if (indexShard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed"); } - restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit(), scheduleTimeout(sessionUUID)); + final Engine.IndexCommitRef commitRef = indexShard.acquireSafeIndexCommit(); + final Set fileNames = Set.copyOf(commitRef.getIndexCommit().getFileNames()); + restore = new RestoreSession(sessionUUID, indexShard, commitRef, fileNames, scheduleTimeout(sessionUUID)); onGoingRestores.put(sessionUUID, restore); HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); sessions.add(sessionUUID); @@ -121,6 +124,32 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index } } + public void ensureSessionShardIdConsistency(String sessionUUID, ShardId shardId) { + final RestoreSession restore = onGoingRestores.get(sessionUUID); + if (restore == null) { + logger.debug("could not get session [{}] because session not found", sessionUUID); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + } + final ShardId sessionShardId = restore.indexShard.shardId(); + if (false == sessionShardId.equals(shardId)) { + throw new IllegalArgumentException( + "session [" + sessionUUID + "] shardId [" + sessionShardId + "] does not match requested shardId [" + shardId + "]" + ); + } + } + + public void ensureFileNameIsKnownToSession(String sessionUUID, String fileName) { + final RestoreSession restore = onGoingRestores.get(sessionUUID); + if (restore == null) { + logger.debug("could not get session [{}] because session not found", sessionUUID); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + } + // Ensure no file system traversal is possible by only allowing file names known to the restore session + if (false == restore.fileNames.contains(fileName)) { + throw new IllegalArgumentException("invalid file name [" + fileName + "]"); + } + } + public void closeSession(String sessionUUID) { internalCloseSession(sessionUUID, true); } @@ -185,6 +214,7 @@ private static class RestoreSession extends AbstractRefCounted { private final String sessionUUID; private final IndexShard indexShard; private final Engine.IndexCommitRef commitRef; + private final Set fileNames; private final Scheduler.Cancellable timeoutTask; private final KeyedLock keyedLock = new KeyedLock<>(); private final Map cachedInputs = new ConcurrentHashMap<>(); @@ -194,11 +224,13 @@ private RestoreSession( String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef, + Set fileNames, Scheduler.Cancellable timeoutTask ) { this.sessionUUID = sessionUUID; this.indexShard = indexShard; this.commitRef = commitRef; + this.fileNames = fileNames; this.timeoutTask = timeoutTask; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java new file mode 100644 index 0000000000000..95ab1c2323f7d --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; +import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper; +import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; +import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ClearCcrRestoreSessionActionTests extends ESTestCase { + + public void testPrivilegeForActions() { + // Indices action is granted by the all privilege + assertThat(IndexPrivilege.MANAGE.predicate().test(ClearCcrRestoreSessionAction.NAME), is(false)); + assertThat(IndexPrivilege.READ.predicate().test(ClearCcrRestoreSessionAction.NAME), is(false)); + assertThat(IndexPrivilege.ALL.predicate().test(ClearCcrRestoreSessionAction.NAME), is(true)); + + // Internal action is granted by neither regular indices nor cluster privileges + assertThat(IndexPrivilege.ALL.predicate().test(ClearCcrRestoreSessionAction.INTERNAL_NAME), is(false)); + assertThat( + ClusterPrivilegeResolver.ALL.permission() + .check( + ClearCcrRestoreSessionAction.INTERNAL_NAME, + mock(TransportRequest.class), + AuthenticationTestHelper.builder().build() + ), + is(false) + ); + } + + public void testActionNames() { + final ActionFilters actionFilters = mock(ActionFilters.class); + final TransportService transportService = mock(TransportService.class); + final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); + + final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + assertThat(action.actionName, equalTo(ClearCcrRestoreSessionAction.NAME)); + + final var internalAction = new ClearCcrRestoreSessionAction.InternalTransportAction( + actionFilters, + transportService, + ccrRestoreSourceService + ); + assertThat(internalAction.actionName, equalTo(ClearCcrRestoreSessionAction.INTERNAL_NAME)); + } + + public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + final ActionFilters actionFilters = mock(ActionFilters.class); + final TransportService transportService = mock(TransportService.class); + final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); + + final ShardId expectedShardId = mock(ShardId.class); + final String indexName = randomAlphaOfLengthBetween(3, 8); + when(expectedShardId.getIndexName()).thenReturn(indexName); + + final IllegalArgumentException e = new IllegalArgumentException("inconsistent shard"); + doAnswer(invocation -> { + final ShardId requestedShardId = invocation.getArgument(1); + if (expectedShardId != requestedShardId) { + throw e; + } else { + return null; + } + }).when(ccrRestoreSourceService).ensureSessionShardIdConsistency(anyString(), any()); + + final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + + final String sessionUUID = UUIDs.randomBase64UUID(); + + // 1. Requested ShardId is consistent + final var request1 = new ClearCcrRestoreSessionRequest(sessionUUID, mock(DiscoveryNode.class), expectedShardId); + assertThat(request1.indices(), arrayContaining(indexName)); + final PlainActionFuture future1 = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request1, future1); + assertThat(future1.actionGet(), is(ActionResponse.Empty.INSTANCE)); + + // 2. Inconsistent requested ShardId + final var request3 = new ClearCcrRestoreSessionRequest(sessionUUID, mock(DiscoveryNode.class), mock(ShardId.class)); + assertThat( + expectThrows(IllegalArgumentException.class, () -> action.doExecute(mock(Task.class), request3, new PlainActionFuture<>())), + is(e) + ); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java new file mode 100644 index 0000000000000..7357873bba0f2 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; +import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper; +import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; +import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GetCcrRestoreFileChunkActionTests extends ESTestCase { + + public void testPrivilegeForActions() { + // Indices action is granted by the all privilege + assertThat(IndexPrivilege.MANAGE.predicate().test(GetCcrRestoreFileChunkAction.NAME), is(false)); + assertThat(IndexPrivilege.READ.predicate().test(GetCcrRestoreFileChunkAction.NAME), is(false)); + assertThat(IndexPrivilege.ALL.predicate().test(GetCcrRestoreFileChunkAction.NAME), is(true)); + + // Internal action is granted by neither regular indices nor cluster privileges + assertThat(IndexPrivilege.ALL.predicate().test(GetCcrRestoreFileChunkAction.INTERNAL_NAME), is(false)); + assertThat( + ClusterPrivilegeResolver.ALL.permission() + .check( + GetCcrRestoreFileChunkAction.INTERNAL_NAME, + mock(TransportRequest.class), + AuthenticationTestHelper.builder().build() + ), + is(false) + ); + } + + public void testActionNames() { + final ActionFilters actionFilters = mock(ActionFilters.class); + final BigArrays bigArrays = mock(BigArrays.class); + final TransportService transportService = mock(TransportService.class); + final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); + + final var action = new GetCcrRestoreFileChunkAction.TransportAction( + bigArrays, + transportService, + actionFilters, + ccrRestoreSourceService + ); + assertThat(action.actionName, equalTo(GetCcrRestoreFileChunkAction.NAME)); + + final var internalAction = new GetCcrRestoreFileChunkAction.InternalTransportAction( + bigArrays, + transportService, + actionFilters, + ccrRestoreSourceService + ); + assertThat(internalAction.actionName, equalTo(GetCcrRestoreFileChunkAction.INTERNAL_NAME)); + } + + public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + final ActionFilters actionFilters = mock(ActionFilters.class); + final BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), ByteSizeValue.ofBytes(1024)); + final TransportService transportService = mock(TransportService.class); + final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); + + final String sessionUUID = UUIDs.randomBase64UUID(); + final CcrRestoreSourceService.SessionReader sessionReader = mock(CcrRestoreSourceService.SessionReader.class); + when(ccrRestoreSourceService.getSessionReader(sessionUUID)).thenReturn(sessionReader); + + final ShardId expectedShardId = mock(ShardId.class); + final String indexName = randomAlphaOfLengthBetween(3, 8); + when(expectedShardId.getIndexName()).thenReturn(indexName); + + final IllegalArgumentException e = new IllegalArgumentException("inconsistent shard"); + doAnswer(invocation -> { + final ShardId requestedShardId = invocation.getArgument(1); + if (expectedShardId != requestedShardId) { + throw e; + } else { + return null; + } + }).when(ccrRestoreSourceService).ensureSessionShardIdConsistency(anyString(), any()); + + final var action = new GetCcrRestoreFileChunkAction.TransportAction( + bigArrays, + transportService, + actionFilters, + ccrRestoreSourceService + ); + + final String expectedFileName = randomAlphaOfLengthBetween(3, 12); + final int size = randomIntBetween(1, 42); + + // 1. Requested ShardId is consistent + final var request1 = new GetCcrRestoreFileChunkRequest( + mock(DiscoveryNode.class), + sessionUUID, + expectedFileName, + size, + expectedShardId + ); + assertThat(request1.indices(), arrayContaining(indexName)); + final PlainActionFuture future1 = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request1, future1); + // The actual response content does not matter as long as the future executes without any error + future1.actionGet().decRef(); + + // 2. Inconsistent requested ShardId + final var request2 = new GetCcrRestoreFileChunkRequest( + mock(DiscoveryNode.class), + sessionUUID, + expectedFileName, + size, + mock(ShardId.class) + ); + assertThat( + expectThrows(IllegalArgumentException.class, () -> action.doExecute(mock(Task.class), request2, new PlainActionFuture<>())), + is(e) + ); + + // 3. Unknown file name + final IllegalArgumentException e3 = new IllegalArgumentException("invalid fileName"); + doAnswer(invocation -> { + final String requestedFileName = invocation.getArgument(1); + if (false == expectedFileName.equals(requestedFileName)) { + throw e3; + } else { + return null; + } + }).when(ccrRestoreSourceService).ensureFileNameIsKnownToSession(anyString(), anyString()); + final var request3 = new GetCcrRestoreFileChunkRequest( + mock(DiscoveryNode.class), + sessionUUID, + randomValueOtherThan(expectedFileName, () -> randomAlphaOfLengthBetween(3, 12)), + size, + expectedShardId + ); + assertThat( + expectThrows(IllegalArgumentException.class, () -> action.doExecute(mock(Task.class), request3, new PlainActionFuture<>())), + is(e3) + ); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionActionTests.java new file mode 100644 index 0000000000000..4b654221460f2 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionActionTests.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; +import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper; +import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; +import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +public class PutCcrRestoreSessionActionTests extends ESTestCase { + public void testPrivilegeForActions() { + // Indices action is granted by the all privilege + assertThat(IndexPrivilege.MANAGE.predicate().test(PutCcrRestoreSessionAction.NAME), is(false)); + assertThat(IndexPrivilege.READ.predicate().test(PutCcrRestoreSessionAction.NAME), is(false)); + assertThat(IndexPrivilege.ALL.predicate().test(PutCcrRestoreSessionAction.NAME), is(true)); + + // Internal action is granted by neither regular indices nor cluster privileges + assertThat(IndexPrivilege.ALL.predicate().test(PutCcrRestoreSessionAction.INTERNAL_NAME), is(false)); + assertThat( + ClusterPrivilegeResolver.ALL.permission() + .check(PutCcrRestoreSessionAction.INTERNAL_NAME, mock(TransportRequest.class), AuthenticationTestHelper.builder().build()), + is(false) + ); + } + + public void testTransportActionNames() { + final ThreadPool threadPool = mock(ThreadPool.class); + final ClusterService clusterService = mock(ClusterService.class); + final ActionFilters actionFilters = mock(ActionFilters.class); + final IndexNameExpressionResolver indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); + final TransportService transportService = mock(TransportService.class); + final IndicesService indicesService = mock(IndicesService.class); + final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); + + final PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction action1 = new PutCcrRestoreSessionAction.TransportAction( + threadPool, + clusterService, + actionFilters, + indexNameExpressionResolver, + transportService, + indicesService, + ccrRestoreSourceService + ); + assertThat(action1.actionName, equalTo(PutCcrRestoreSessionAction.NAME)); + + final PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction action2 = + new PutCcrRestoreSessionAction.InternalTransportAction( + threadPool, + clusterService, + actionFilters, + indexNameExpressionResolver, + transportService, + indicesService, + ccrRestoreSourceService + ); + assertThat(action2.actionName, equalTo(PutCcrRestoreSessionAction.INTERNAL_NAME)); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 3612285334ae5..e8badfbee1e3e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -19,14 +19,19 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.xpack.ccr.CcrSettings; import org.junit.Before; import java.io.IOException; import java.util.ArrayList; +import java.util.Set; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.containsString; + public class CcrRestoreSourceServiceTests extends IndexShardTestCase { private CcrRestoreSourceService restoreSourceService; @@ -251,4 +256,55 @@ public void testSessionCanTimeout() throws Exception { closeShards(indexShard); } + + public void testConsistencyBetweenRequestAndSession() throws IOException { + IndexShard indexShard = newStartedShard(true); + + final String sessionUUID = UUIDs.randomBase64UUID(); + final Store.MetadataSnapshot metadata = restoreSourceService.openSession(sessionUUID, indexShard); + final Set knownFileNames = metadata.fileMetadataMap().keySet(); + + final String anotherSessionUUID = UUIDs.randomBase64UUID(); + + final IllegalArgumentException e1 = expectThrows( + IllegalArgumentException.class, + () -> restoreSourceService.ensureSessionShardIdConsistency(anotherSessionUUID, indexShard.shardId()) + ); + assertThat(e1.getMessage(), containsString("session [" + anotherSessionUUID + "] not found")); + + final IllegalArgumentException e2 = expectThrows( + IllegalArgumentException.class, + () -> restoreSourceService.ensureFileNameIsKnownToSession(anotherSessionUUID, randomFrom(knownFileNames)) + ); + assertThat(e2.getMessage(), containsString("session [" + anotherSessionUUID + "] not found")); + + final ShardId anotherShardId = new ShardId(randomAlphaOfLengthBetween(3, 12), randomAlphaOfLength(20), randomIntBetween(0, 3)); + final IllegalArgumentException e3 = expectThrows( + IllegalArgumentException.class, + () -> restoreSourceService.ensureSessionShardIdConsistency(sessionUUID, anotherShardId) + ); + assertThat(e3.getMessage(), containsString("does not match requested shardId [" + anotherShardId + "]")); + + final String anotherFileName = randomValueOtherThanMany(knownFileNames::contains, () -> randomAlphaOfLengthBetween(3, 12)); + final IllegalArgumentException e4 = expectThrows( + IllegalArgumentException.class, + () -> restoreSourceService.ensureFileNameIsKnownToSession(sessionUUID, anotherFileName) + ); + assertThat(e4.getMessage(), containsString("invalid file name [" + anotherFileName + "]")); + + try { + restoreSourceService.ensureSessionShardIdConsistency(sessionUUID, indexShard.shardId()); + } catch (Exception e) { + fail("should have succeeded, but got [" + e + "]"); + } + + try { + restoreSourceService.ensureFileNameIsKnownToSession(sessionUUID, randomFrom(knownFileNames)); + } catch (Exception e) { + fail("should have succeeded, but got [" + e + "]"); + } + + restoreSourceService.closeSession(sessionUUID); + closeShards(indexShard); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Authentication.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Authentication.java index e4b5ece58fcc3..b48043978e66c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Authentication.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Authentication.java @@ -732,6 +732,16 @@ public void toXContentFragment(XContentBuilder builder) throws IOException { } } + public static Authentication getAuthenticationFromCrossClusterAccessMetadata(Authentication authentication) { + if (authentication.isCrossClusterAccess()) { + return (Authentication) authentication.getAuthenticatingSubject().getMetadata().get(CROSS_CLUSTER_ACCESS_AUTHENTICATION_KEY); + } else { + String message = "authentication is not cross_cluster_access"; + assert false : message; + throw new IllegalArgumentException(message); + } + } + private static final Map> METADATA_VALUE_READER = Map.of( CROSS_CLUSTER_ACCESS_AUTHENTICATION_KEY, Authentication::new, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/CrossClusterAccessUser.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/CrossClusterAccessUser.java index 6084655f5eb87..f786fffcaded1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/CrossClusterAccessUser.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/CrossClusterAccessUser.java @@ -8,7 +8,9 @@ package org.elasticsearch.xpack.core.security.user; import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.common.Strings; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo; import org.elasticsearch.xpack.core.security.authc.Subject; @@ -23,8 +25,27 @@ public class CrossClusterAccessUser extends User { public static final RoleDescriptor ROLE_DESCRIPTOR = new RoleDescriptor( UsernamesField.CROSS_CLUSTER_ACCESS_ROLE, - new String[] { "cross_cluster_access" }, - null, + new String[] { + "cross_cluster_access", + // TODO: add a named cluster privilege to cover the CCR cluster actions + ClusterStateAction.NAME }, + // Needed for CCR background jobs (with system user) + new RoleDescriptor.IndicesPrivileges[] { + RoleDescriptor.IndicesPrivileges.builder() + .indices("*") + .privileges( + RetentionLeaseActions.Add.ACTION_NAME, + RetentionLeaseActions.Remove.ACTION_NAME, + RetentionLeaseActions.Renew.ACTION_NAME, + "indices:monitor/stats", + "indices:internal/admin/ccr/restore/session/put", + "indices:internal/admin/ccr/restore/session/clear", + "internal:transport/proxy/indices:internal/admin/ccr/restore/session/clear", + "indices:internal/admin/ccr/restore/file_chunk/get", + "internal:transport/proxy/indices:internal/admin/ccr/restore/file_chunk/get" + ) + .allowRestrictedIndices(true) + .build() }, null, null, null, diff --git a/x-pack/plugin/security/qa/multi-cluster/build.gradle b/x-pack/plugin/security/qa/multi-cluster/build.gradle index 31f187bd163ad..003958c01fd95 100644 --- a/x-pack/plugin/security/qa/multi-cluster/build.gradle +++ b/x-pack/plugin/security/qa/multi-cluster/build.gradle @@ -13,7 +13,11 @@ apply plugin: 'elasticsearch.rest-resources' apply plugin: 'elasticsearch.bwc-test' dependencies { + javaRestTestImplementation project(':x-pack:plugin:core') + javaRestTestImplementation project(':x-pack:plugin:security') + javaRestTestImplementation project(':x-pack:plugin:ccr') clusterModules(project(":modules:analysis-common")) + clusterModules project(':x-pack:plugin:ccr') clusterModules(project(":modules:reindex")) // need for deleting transform jobs clusterModules(project(":x-pack:plugin:transform")) } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrFcActionAuthorizationIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrFcActionAuthorizationIT.java new file mode 100644 index 0000000000000..da88206f4f2f9 --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrFcActionAuthorizationIT.java @@ -0,0 +1,323 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Strings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.FeatureFlag; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteConnectionInfo; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; +import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; +import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkRequest; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionRequest; +import org.elasticsearch.xpack.core.security.user.CrossClusterAccessUser; +import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.xpack.remotecluster.AbstractRemoteClusterSecurityTestCase.PASS; +import static org.elasticsearch.xpack.remotecluster.AbstractRemoteClusterSecurityTestCase.USER; +import static org.elasticsearch.xpack.remotecluster.AbstractRemoteClusterSecurityTestCase.createCrossClusterAccessApiKey; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestCase { + + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + + @ClassRule + public static ElasticsearchCluster testCluster = ElasticsearchCluster.local() + .name("test-cluster") + .feature(FeatureFlag.NEW_RCS_MODE) + .module("analysis-common") + .module("x-pack-ccr") + .setting("xpack.license.self_generated.type", "trial") + .setting("xpack.security.enabled", "true") + .setting("xpack.security.http.ssl.enabled", "false") + .setting("xpack.security.transport.ssl.enabled", "false") + .setting("remote_cluster_server.enabled", "true") + .setting("remote_cluster.port", "0") + .setting("xpack.security.remote_cluster_server.ssl.enabled", "false") + .user(USER, PASS.toString()) + .build(); + + // Create an API Key specifically for CCR access + private static Map createCrossClusterAccessCcrApiKey() { + return createCrossClusterAccessApiKey(adminClient(), """ + { + "role": { + "cluster": [ + "cross_cluster_access", + "cluster:monitor/state", + "cluster:monitor/xpack/info" + ], + "index": [ + { + "names": ["leader-index*"], + "privileges": ["manage", "read", + "indices:internal/admin/ccr/restore/*", + "internal:transport/proxy/indices:internal/admin/ccr/restore/*"] + } + ] + } + }"""); + } + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void setUp() throws Exception { + super.setUp(); + initClient(); + API_KEY_MAP_REF.updateAndGet(v -> v != null ? v : createCrossClusterAccessCcrApiKey()); + } + + @Override + protected String getTestRestCluster() { + return testCluster.getHttpAddresses(); + } + + @Override + protected Settings restClientSettings() { + final String token = basicAuthHeaderValue(USER, PASS); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testIndicesPrivilegesAreEnforcedForCcrRestoreSessionActions() throws IOException { + final String leaderIndex1UUID; + final String leaderIndex2UUID; + final String privateIndexUUID; + + // Create indices on the leader cluster + { + final Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(Strings.format(""" + { "index": { "_index": "leader-index-1" } } + { "name": "doc-1" } + { "index": { "_index": "leader-index-2" } } + { "name": "doc-2" } + { "index": { "_index": "private-index" } } + { "name": "doc-3" } + """)); + assertOK(adminClient().performRequest(bulkRequest)); + + final ObjectPath indexSettings = assertOKAndCreateObjectPath( + adminClient().performRequest(new Request("GET", "/leader-index*,private-index/_settings")) + ); + leaderIndex1UUID = indexSettings.evaluate("leader-index-1.settings.index.uuid"); + leaderIndex2UUID = indexSettings.evaluate("leader-index-2.settings.index.uuid"); + privateIndexUUID = indexSettings.evaluate("private-index.settings.index.uuid"); + } + + // Simulate QC behaviours by directly connecting to the FC using a transport service + try (MockTransportService service = startTransport("node", threadPool)) { + final RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + final List remoteConnectionInfos = remoteClusterService.getRemoteConnectionInfos().toList(); + assertThat(remoteConnectionInfos, hasSize(1)); + assertThat(remoteConnectionInfos.get(0).isConnected(), is(true)); + + final Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, "my_remote_cluster"); + + // Creating a restore session fails if index is not accessible + final ShardId privateShardId = new ShardId("private-index", privateIndexUUID, 0); + final PutCcrRestoreSessionRequest request = new PutCcrRestoreSessionRequest(UUIDs.randomBase64UUID(), privateShardId); + final ElasticsearchSecurityException e = expectThrows( + ElasticsearchSecurityException.class, + () -> remoteClusterClient.execute(PutCcrRestoreSessionAction.INSTANCE, request).actionGet() + ); + assertThat( + e.getMessage(), + containsString( + "action [indices:internal/admin/ccr/restore/session/put] towards remote cluster is unauthorized " + + "for user [_cross_cluster_access] with assigned roles [] authenticated by API key id [" + + API_KEY_MAP_REF.get().get("id") + + "] of user [test_user] on indices [private-index], this action is granted by the index privileges [all]" + ) + ); + + // Creating restore sessions succeed when indices are accessible + final String sessionUUID1 = UUIDs.randomBase64UUID(); + final ShardId shardId1 = new ShardId("leader-index-1", leaderIndex1UUID, 0); + final PutCcrRestoreSessionRequest request1 = new PutCcrRestoreSessionRequest(sessionUUID1, shardId1); + final PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response1 = remoteClusterClient.execute( + PutCcrRestoreSessionAction.INSTANCE, + request1 + ).actionGet(); + assertThat(response1.getStoreFileMetadata().fileMetadataMap().keySet(), hasSize(greaterThanOrEqualTo(1))); + final String leaderIndex1FileName = response1.getStoreFileMetadata().fileMetadataMap().keySet().iterator().next(); + + final String sessionUUID2 = UUIDs.randomBase64UUID(); + final ShardId shardId2 = new ShardId("leader-index-2", leaderIndex2UUID, 0); + final PutCcrRestoreSessionRequest request2 = new PutCcrRestoreSessionRequest(sessionUUID2, shardId2); + final PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response2 = remoteClusterClient.execute( + PutCcrRestoreSessionAction.INSTANCE, + request2 + ).actionGet(); + assertThat(response2.getStoreFileMetadata().fileMetadataMap().keySet(), hasSize(greaterThanOrEqualTo(1))); + final String leaderIndex2FileName = response2.getStoreFileMetadata().fileMetadataMap().keySet().iterator().next(); + + // Get file chuck fails if requested index is not authorized + final var e1 = expectThrows( + ElasticsearchSecurityException.class, + () -> remoteClusterClient.execute( + GetCcrRestoreFileChunkAction.INSTANCE, + new GetCcrRestoreFileChunkRequest(response1.getNode(), sessionUUID1, leaderIndex1FileName, 1, privateShardId) + ).actionGet() + ); + assertThat( + e1.getMessage(), + containsString("action [indices:internal/admin/ccr/restore/file_chunk/get] towards remote cluster is unauthorized") + ); + + // Get file chunk fails if requested index does not match session index + final var e2 = expectThrows( + IllegalArgumentException.class, + () -> remoteClusterClient.execute( + GetCcrRestoreFileChunkAction.INSTANCE, + new GetCcrRestoreFileChunkRequest(response1.getNode(), sessionUUID1, leaderIndex1FileName, 1, shardId2) + ).actionGet() + ); + assertThat(e2.getMessage(), containsString("does not match requested shardId")); + + // Get file chunk fails if requested file is not part of the session + final var e3 = expectThrows( + IllegalArgumentException.class, + () -> remoteClusterClient.execute( + GetCcrRestoreFileChunkAction.INSTANCE, + new GetCcrRestoreFileChunkRequest( + response1.getNode(), + sessionUUID1, + randomValueOtherThan(leaderIndex1FileName, () -> randomAlphaOfLengthBetween(3, 20)), + 1, + shardId1 + ) + ).actionGet() + ); + assertThat(e3.getMessage(), containsString("invalid file name")); + + // Get file chunk succeeds + final GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse getChunkResponse = remoteClusterClient.execute( + GetCcrRestoreFileChunkAction.INSTANCE, + new GetCcrRestoreFileChunkRequest(response2.getNode(), sessionUUID2, leaderIndex2FileName, 1, shardId2) + ).actionGet(); + assertThat(getChunkResponse.getChunk().length(), equalTo(1)); + + // Clear restore session fails if index is unauthorized + final var e4 = expectThrows( + ElasticsearchSecurityException.class, + () -> remoteClusterClient.execute( + ClearCcrRestoreSessionAction.INSTANCE, + new ClearCcrRestoreSessionRequest(sessionUUID1, response1.getNode(), privateShardId) + ).actionGet() + ); + assertThat( + e4.getMessage(), + containsString("action [indices:internal/admin/ccr/restore/session/clear] towards remote cluster is unauthorized") + ); + + // Clear restore session fails if requested index does not match session index + final var e5 = expectThrows( + IllegalArgumentException.class, + () -> remoteClusterClient.execute( + ClearCcrRestoreSessionAction.INSTANCE, + new ClearCcrRestoreSessionRequest(sessionUUID1, response1.getNode(), shardId2) + ).actionGet() + ); + assertThat(e5.getMessage(), containsString("does not match requested shardId")); + + // Clear restore sessions succeed + remoteClusterClient.execute( + ClearCcrRestoreSessionAction.INSTANCE, + new ClearCcrRestoreSessionRequest(sessionUUID1, response1.getNode(), shardId1) + ).actionGet(); + remoteClusterClient.execute( + ClearCcrRestoreSessionAction.INSTANCE, + new ClearCcrRestoreSessionRequest(sessionUUID2, response2.getNode(), shardId2) + ).actionGet(); + } + } + + private static MockTransportService startTransport(final String nodeName, final ThreadPool threadPool) { + final String remoteClusterServerEndpoint = testCluster.getRemoteClusterServerEndpoint(0); + + final Settings.Builder builder = Settings.builder() + .put("node.name", nodeName) + .put("xpack.security.remote_cluster_client.ssl.enabled", "false"); + + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote.my_remote_cluster.credentials", (String) API_KEY_MAP_REF.get().get("encoded")); + builder.setSecureSettings(secureSettings); + if (randomBoolean()) { + builder.put("cluster.remote.my_remote_cluster.mode", "sniff") + .put("cluster.remote.my_remote_cluster.seeds", remoteClusterServerEndpoint); + } else { + builder.put("cluster.remote.my_remote_cluster.mode", "proxy") + .put("cluster.remote.my_remote_cluster.proxy_address", remoteClusterServerEndpoint); + } + + final MockTransportService service = MockTransportService.createNewService( + builder.build(), + Version.CURRENT, + TransportVersion.CURRENT, + threadPool, + null + ); + boolean success = false; + try { + service.addSendBehavior((connection, requestId, action, request, options) -> { + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + new CrossClusterAccessHeaders( + "ApiKey " + API_KEY_MAP_REF.get().get("encoded"), + CrossClusterAccessUser.subjectInfo(TransportVersion.CURRENT, nodeName) + ).writeToContext(threadContext); + connection.sendRequest(requestId, action, request, options); + } + }); + service.start(); + success = true; + } finally { + if (success == false) { + service.close(); + } + } + return service; + } +} diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrIT.java new file mode 100644 index 0000000000000..d60366b2b919c --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrIT.java @@ -0,0 +1,364 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.core.Strings; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.util.resource.Resource; +import org.elasticsearch.test.rest.ObjectPath; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.Strings.arrayToCommaDelimitedString; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +public class RemoteClusterSecurityCcrIT extends AbstractRemoteClusterSecurityTestCase { + + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + + static { + fulfillingCluster = ElasticsearchCluster.local() + .name("fulfilling-cluster") + .apply(commonClusterConfig) + .module("x-pack-ccr") + .setting("remote_cluster_server.enabled", "true") + .setting("remote_cluster.port", "0") + .setting("xpack.security.remote_cluster_server.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key") + .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt") + .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password") + .build(); + + queryCluster = ElasticsearchCluster.local() + .name("query-cluster") + .apply(commonClusterConfig) + .rolesFile(Resource.fromClasspath("roles.yml")) + .module("x-pack-ccr") + .setting("xpack.security.remote_cluster_client.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt") + .keystore("cluster.remote.my_remote_cluster.credentials", () -> { + API_KEY_MAP_REF.updateAndGet(v -> v != null ? v : createCrossClusterAccessCcrApiKey()); + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) + .user("ccr_user", PASS.toString(), "ccr_user_role") + .build(); + } + + // Create an API Key specifically for CCR access + private static Map createCrossClusterAccessCcrApiKey() { + initFulfillingClusterClient(); + final var createApiKeyRequest = new Request("POST", "/_security/api_key"); + createApiKeyRequest.setJsonEntity(Strings.format(""" + { + "name": "cross_cluster_access_key", + "role_descriptors": { + "role": { + "cluster": [ + "cross_cluster_access", + "cluster:monitor/state", + "cluster:monitor/xpack/info" + ], + "index": [ + { + "names": ["leader-index", "leader-alias", "metrics-*"], + "privileges": ["manage", "read", + "indices:internal/admin/ccr/restore/*", + "internal:transport/proxy/indices:internal/admin/ccr/restore/*"] + } + ] + } + } + }""")); + try { + final Response createApiKeyResponse = performRequestWithAdminUser(fulfillingClusterClient, createApiKeyRequest); + assertOK(createApiKeyResponse); + return responseAsMap(createApiKeyResponse); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @ClassRule + // Use a RuleChain to ensure that fulfilling cluster is started before query cluster + public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); + + public void testFollow() throws Exception { + configureRemoteCluster(); + + // fulfilling cluster + { + final Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(Strings.format(""" + { "index": { "_index": "leader-index" } } + { "name": "doc-1" } + { "index": { "_index": "leader-index" } } + { "name": "doc-2" } + { "index": { "_index": "leader-index" } } + { "name": "doc-3" } + { "index": { "_index": "leader-index" } } + { "name": "doc-4" } + { "index": { "_index": "private-index" } } + { "name": "doc-5" } + """)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + + final Request putIndexRequest = new Request("PUT", "/shared-index"); + putIndexRequest.setJsonEntity(""" + { + "aliases": { + "shared-alias": {} + } + } + """); + assertOK(performRequestAgainstFulfillingCluster(putIndexRequest)); + } + + // query cluster + { + final String followIndexName = "follower-index"; + final Request putCcrRequest = new Request("PUT", "/" + followIndexName + "/_ccr/follow?wait_for_active_shards=1"); + putCcrRequest.setJsonEntity(""" + { + "remote_cluster": "my_remote_cluster", + "leader_index": "leader-index" + }"""); + + final Response putCcrResponse = performRequestWithCcrUser(putCcrRequest); + assertOK(putCcrResponse); + + final Map responseMap = responseAsMap(putCcrResponse); + responseMap.forEach((k, v) -> assertThat(k, v, is(true))); + + // Ensure data is replicated + verifyReplicatedDocuments(4L, followIndexName); + + assertFollowerInfo(followIndexName, "leader-index", "active"); + assertFollowerStats(followIndexName); + + // unfollow and then follow and then index a few docs in leader index: + pauseFollow(followIndexName); + assertFollowerInfo(followIndexName, "leader-index", "paused"); + resumeFollow(followIndexName); + final Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(Strings.format(""" + { "index": { "_index": "leader-index" } } + { "name": "doc-5" } + { "index": { "_index": "leader-index" } } + { "name": "doc-6" } + """)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + verifyReplicatedDocuments(6L, followIndexName); + + pauseFollow(followIndexName); + closeIndex(followIndexName); + unfollow(followIndexName); + assertNoFollowerInfo(followIndexName); + final var e = expectThrows(ResponseException.class, () -> resumeFollow(followIndexName)); + assertThat(e.getMessage(), containsString("follow index [" + followIndexName + "] does not have ccr metadata")); + } + + // query cluster error cases - no privileges + { + + final Request putCcrRequest = new Request("PUT", "/follower-index-2/_ccr/follow?wait_for_active_shards=1"); + putCcrRequest.setJsonEntity(""" + { + "remote_cluster": "my_remote_cluster", + "leader_index": "private-index" + }"""); + final ResponseException e = expectThrows(ResponseException.class, () -> performRequestWithCcrUser(putCcrRequest)); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(403)); + assertThat(e.getMessage(), containsString("insufficient privileges to follow index [private-index]")); + } + + // query cluster error cases - aliases not supported + { + final Request putCcrRequest = new Request("PUT", "/follower-index-3/_ccr/follow?wait_for_active_shards=1"); + putCcrRequest.setJsonEntity(""" + { + "remote_cluster": "my_remote_cluster", + "leader_index": "shared-alias" + }"""); + final ResponseException e = expectThrows(ResponseException.class, () -> performRequestWithCcrUser(putCcrRequest)); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(e.getMessage(), containsString("cannot follow [shared-alias], because it is a ALIAS")); + } + } + + public void testAutoFollow() throws Exception { + configureRemoteCluster(); + + // follow cluster + { + final var putAllowFollowRequest = new Request("PUT", "/_ccr/auto_follow/my_auto_follow_pattern"); + putAllowFollowRequest.setJsonEntity(""" + { + "remote_cluster" : "my_remote_cluster", + "leader_index_patterns" : [ "metrics-*" ], + "leader_index_exclusion_patterns": [ "metrics-001" ] + }"""); + + final Response putAutoFollowResponse = performRequestWithCcrUser(putAllowFollowRequest); + assertOK(putAutoFollowResponse); + } + + // leader cluster + { + final Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(Strings.format(""" + { "index": { "_index": "metrics-000" } } + { "name": "doc-1" } + { "index": { "_index": "metrics-000" } } + { "name": "doc-2" } + { "index": { "_index": "metrics-001" } } + { "name": "doc-3" } + { "index": { "_index": "metrics-002" } } + { "name": "doc-4" }\n""")); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + } + + // follow cluster + { + assertBusy(() -> { + ensureHealth("", request -> { + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_active_shards", "1"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("wait_for_no_initializing_shards", "true"); + request.addParameter("timeout", "5s"); + request.addParameter("level", "shards"); + }); + }); + verifyReplicatedDocuments(3L, "metrics-000", "metrics-002"); + + final Response statsResponse = performRequestWithCcrUser(new Request("GET", "/_ccr/stats")); + assertOK(statsResponse); + assertThat( + ObjectPath.createFromResponse(statsResponse).evaluate("auto_follow_stats.number_of_successful_follow_indices"), + equalTo(2) + ); + assertFollowerInfo("metrics-000", "metrics-000", "active"); + assertFollowerInfo("metrics-002", "metrics-002", "active"); + + // Pause and resume + pauseAutoFollow("my_auto_follow_pattern"); + resumeAutoFollow("my_auto_follow_pattern"); + final Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(Strings.format(""" + { "index": { "_index": "metrics-000" } } + { "name": "doc-5" } + { "index": { "_index": "metrics-002" } } + { "name": "doc-6" }\n""")); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + verifyReplicatedDocuments(5L, "metrics-000", "metrics-002"); + + // Delete + deleteAutoFollow("my_auto_follow_pattern"); + final ResponseException e = expectThrows( + ResponseException.class, + () -> performRequestWithCcrUser(new Request("GET", "/_ccr/auto_follow/my_auto_follow_pattern")) + ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); + } + } + + private Response performRequestWithCcrUser(final Request request) throws IOException { + request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue("ccr_user", PASS))); + return client().performRequest(request); + } + + private void verifyReplicatedDocuments(long numberOfDocs, String... indices) throws Exception { + final Request searchRequest = new Request("GET", "/" + arrayToCommaDelimitedString(indices) + "/_search"); + assertBusy(() -> { + final Response response = performRequestWithCcrUser(searchRequest); + assertOK(response); + final SearchResponse searchResponse = SearchResponse.fromXContent(responseAsParser(response)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(numberOfDocs)); + assertThat( + Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getIndex).collect(Collectors.toUnmodifiableSet()), + equalTo(Set.of(indices)) + ); + }); + } + + private void assertFollowerInfo(String followIndexName, String leadIndexName, String status) throws IOException { + final Response response = performRequestWithCcrUser(new Request("GET", "/" + followIndexName + "/_ccr/info")); + assertOK(response); + final List> followerIndices = ObjectPath.createFromResponse(response).evaluate("follower_indices"); + assertThat(followerIndices, hasSize(1)); + + final Map follower = followerIndices.get(0); + assertThat(ObjectPath.evaluate(follower, "follower_index"), equalTo(followIndexName)); + assertThat(ObjectPath.evaluate(follower, "leader_index"), equalTo(leadIndexName)); + assertThat(ObjectPath.evaluate(follower, "remote_cluster"), equalTo("my_remote_cluster")); + assertThat(ObjectPath.evaluate(follower, "status"), equalTo(status)); + } + + private void assertNoFollowerInfo(String followIndexName) throws IOException { + final Response response = performRequestWithCcrUser(new Request("GET", "/" + followIndexName + "/_ccr/info")); + assertOK(response); + final List> followerIndices = ObjectPath.createFromResponse(response).evaluate("follower_indices"); + assertThat(followerIndices, empty()); + } + + private void assertFollowerStats(String followIndexName) throws IOException { + final Response response = performRequestWithCcrUser(new Request("GET", "/" + followIndexName + "/_ccr/stats")); + assertOK(response); + final List> followerIndices = ObjectPath.createFromResponse(response).evaluate("indices"); + assertThat(followerIndices, hasSize(1)); + + final Map follower = followerIndices.get(0); + assertThat(ObjectPath.evaluate(follower, "index"), equalTo(followIndexName)); + } + + private void pauseFollow(String followIndexName) throws IOException { + assertOK(performRequestWithCcrUser(new Request("POST", "/" + followIndexName + "/_ccr/pause_follow"))); + } + + private void resumeFollow(String followIndexName) throws IOException { + final Request resumeFollowRequest = new Request("POST", "/" + followIndexName + "/_ccr/resume_follow"); + resumeFollowRequest.setJsonEntity("{\"read_poll_timeout\": \"10ms\"}"); + assertOK(performRequestWithCcrUser(resumeFollowRequest)); + } + + private void unfollow(String followIndexName) throws IOException { + assertOK(performRequestWithCcrUser(new Request("POST", "/" + followIndexName + "/_ccr/unfollow"))); + } + + private void pauseAutoFollow(String name) throws IOException { + assertOK(performRequestWithCcrUser(new Request("POST", "/_ccr/auto_follow/" + name + "/pause"))); + } + + private void resumeAutoFollow(String name) throws IOException { + assertOK(performRequestWithCcrUser(new Request("POST", "/_ccr/auto_follow/" + name + "/resume"))); + } + + private void deleteAutoFollow(String name) throws IOException { + assertOK(performRequestWithCcrUser(new Request("DELETE", "/_ccr/auto_follow/" + name))); + } +} diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/resources/roles.yml b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/resources/roles.yml index 682cbceb40601..1577565e163a3 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/resources/roles.yml +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/resources/roles.yml @@ -13,7 +13,7 @@ read_remote_shared_metrics: transform_remote_shared_index: indices: - names: [ 'simple-remote-transform' ] - privileges: ['create_index', 'index', 'read'] + privileges: [ 'create_index', 'index', 'read' ] remote_indices: - names: [ 'shared-transform-index' ] privileges: [ 'read', 'read_cross_cluster', 'view_index_metadata' ] @@ -28,3 +28,13 @@ ml_jobs_shared_airline_data: - names: [ 'shared-airline-data' ] privileges: [ 'read', 'read_cross_cluster', 'view_index_metadata' ] clusters: [ 'my_*' ] + +ccr_user_role: + cluster: [ 'manage_ccr', 'monitor' ] + indices: + - names: [ 'follower*', 'shared-*', 'metrics-*' ] + privileges: [ 'monitor', 'read', 'write', 'manage_follow_index' ] + remote_indices: + - names: [ 'leader-index', 'shared-*', 'metrics-*' ] + privileges: [ 'monitor', 'read' ] + clusters: [ "*" ] diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 6dfc8eb3b83d6..e26942a713ff9 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -391,6 +391,9 @@ public class Constants { "indices:admin/block/add", "indices:admin/block/add[s]", "indices:admin/cache/clear", + "indices:internal/admin/ccr/restore/file_chunk/get", + "indices:internal/admin/ccr/restore/session/clear", + "indices:internal/admin/ccr/restore/session/put", "indices:admin/close", "indices:admin/close[s]", "indices:admin/create", diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesAction.java index ca8d5b2c90935..06eb4472a9abb 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse; +import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.Subject; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; @@ -30,6 +31,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.security.authc.Authentication.getAuthenticationFromCrossClusterAccessMetadata; + /** * Transport action that tests whether the currently authenticated user has the specified * {@link AuthorizationEngine.PrivilegesToCheck privileges} @@ -57,8 +60,9 @@ public TransportHasPrivilegesAction( @Override protected void doExecute(Task task, HasPrivilegesRequest request, ActionListener listener) { final String username = request.username(); - final Subject subject = securityContext.getAuthentication().getEffectiveSubject(); - if (subject.getUser().principal().equals(username) == false) { + final Authentication authentication = securityContext.getAuthentication(); + + if (isSameUser(authentication, username) == false) { listener.onFailure(new IllegalArgumentException("users may only check the privileges of their own account")); return; } @@ -67,7 +71,7 @@ protected void doExecute(Task task, HasPrivilegesRequest request, ActionListener request, ActionListener.wrap( applicationPrivilegeDescriptors -> authorizationService.checkPrivileges( - subject, + authentication.getEffectiveSubject(), request.getPrivilegesToCheck(), applicationPrivilegeDescriptors, listener.map(privilegesCheckResult -> { @@ -100,4 +104,14 @@ public static Set getApplicationNames(HasPrivilegesRequest request) { .map(RoleDescriptor.ApplicationResourcePrivileges::getApplication) .collect(Collectors.toSet()); } + + private static boolean isSameUser(Authentication authentication, String username) { + final Subject subjectToCheck; + if (authentication.isCrossClusterAccess()) { + subjectToCheck = getAuthenticationFromCrossClusterAccessMetadata(authentication).getEffectiveSubject(); + } else { + subjectToCheck = authentication.getEffectiveSubject(); + } + return subjectToCheck.getUser().principal().equals(username); + } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 9ec9cd7504872..43068e4c87ee7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesResponse; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; import org.elasticsearch.xpack.core.security.action.user.UserRequest; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.Authentication.AuthenticationType; @@ -103,6 +104,7 @@ import static org.elasticsearch.common.Strings.arrayToCommaDelimitedString; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.xpack.core.security.authc.Authentication.getAuthenticationFromCrossClusterAccessMetadata; import static org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail.PRINCIPAL_ROLES_FIELD_NAME; public class RBACEngine implements AuthorizationEngine { @@ -203,6 +205,15 @@ static boolean checkSameUserPermissions(String action, TransportRequest request, return false; } final String username = usernames[0]; + // Cross cluster access user can perform has privilege check + if (authentication.isCrossClusterAccess() && HasPrivilegesAction.NAME.equals(action)) { + assert request instanceof HasPrivilegesRequest; + return getAuthenticationFromCrossClusterAccessMetadata(authentication).getEffectiveSubject() + .getUser() + .principal() + .equals(username); + } + final boolean sameUsername = authentication.getEffectiveSubject().getUser().principal().equals(username); if (sameUsername && ChangePasswordAction.NAME.equals(action)) { return checkChangePasswordAction(authentication); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java index b4eb3ef123c6e..b00b2cafaae34 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java @@ -14,12 +14,14 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; @@ -27,6 +29,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.security.Security; @@ -78,7 +81,11 @@ final class CrossClusterAccessServerTransportFilter extends ServerTransportFilte SearchTransportService.FETCH_ID_ACTION_NAME, SearchTransportService.QUERY_CAN_MATCH_NAME, SearchTransportService.QUERY_CAN_MATCH_NODE_NAME, - TransportOpenPointInTimeAction.OPEN_SHARD_READER_CONTEXT_NAME + TransportOpenPointInTimeAction.OPEN_SHARD_READER_CONTEXT_NAME, + // CCR actions + "indices:data/read/xpack/ccr/shard_changes", + "indices:internal/admin/ccr/restore/session/clear", + "indices:internal/admin/ccr/restore/file_chunk/get" ).flatMap(name -> Stream.of(name, TransportActionProxy.getProxyAction(name))), // These actions don't have proxy equivalents Stream.of( @@ -91,9 +98,15 @@ final class CrossClusterAccessServerTransportFilter extends ServerTransportFilte FieldCapabilitiesAction.NAME, FieldCapabilitiesAction.NAME + "[n]", "indices:data/read/eql", - // transform XPackInfoAction.NAME, - GetCheckpointAction.NAME + GetCheckpointAction.NAME, + // CCR actions + HasPrivilegesAction.NAME, + IndicesStatsAction.NAME, + RetentionLeaseActions.Add.ACTION_NAME, + RetentionLeaseActions.Remove.ACTION_NAME, + RetentionLeaseActions.Renew.ACTION_NAME, + "indices:internal/admin/ccr/restore/session/put" ) ).collect(Collectors.toUnmodifiableSet()); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index b6de4527b7c49..02788c575a3b7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.ssl.SslConfiguration; @@ -66,6 +67,14 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor { private static final Logger logger = LogManager.getLogger(SecurityServerTransportInterceptor.class); + private static final Map RCS_INTERNAL_ACTIONS_REPLACEMENTS = Map.of( + "internal:admin/ccr/restore/session/put", + "indices:internal/admin/ccr/restore/session/put", + "internal:admin/ccr/restore/session/clear", + "indices:internal/admin/ccr/restore/session/clear", + "internal:admin/ccr/restore/file_chunk/get", + "indices:internal/admin/ccr/restore/file_chunk/get" + ); private final AuthenticationService authcService; private final AuthorizationService authzService; @@ -326,14 +335,30 @@ private void sendWithCrossClusterAccessHeaders( assert authentication != null : "authentication must be present in security context"; final User user = authentication.getEffectiveSubject().getUser(); - if (SystemUser.is(user)) { - logger.trace( - "Request [{}] for action [{}] towards [{}] initiated by the system user. " - + "Sending request with internal cross cluster access user headers", - request.getClass(), - action, - remoteClusterAlias - ); + if (User.isInternal(user) && false == SystemUser.is(user)) { + final String message = "Internal user [" + user.principal() + "] should not be used for cross cluster requests"; + assert false : message; + throw illegalArgumentExceptionWithDebugLog(message); + } else if (SystemUser.is(user) || action.equals(ClusterStateAction.NAME)) { + if (SystemUser.is(user)) { + logger.trace( + "Request [{}] for action [{}] towards [{}] initiated by the system user. " + + "Sending request with internal cross cluster access user headers", + request.getClass(), + action, + remoteClusterAlias + ); + } else { + // Use system user for cluster state requests (CCR has many calls of cluster state with end-user context) + logger.trace( + () -> format( + "Switching to internal cross cluster access user for cluster state action towards [{}]. " + + "Original user is [%s]", + remoteClusterAlias, + user + ) + ); + } final var crossClusterAccessHeaders = new CrossClusterAccessHeaders( remoteClusterCredentials.credentials(), CrossClusterAccessUser.subjectInfo( @@ -341,12 +366,16 @@ private void sendWithCrossClusterAccessHeaders( authentication.getEffectiveSubject().getRealm().getNodeName() ) ); - sendWithCrossClusterAccessHeaders(crossClusterAccessHeaders, connection, action, request, options, handler); - } else if (User.isInternal(user)) { - final String message = "Internal user [" + user.principal() + "] should not be used for cross cluster requests"; - assert false : message; - throw illegalArgumentExceptionWithDebugLog(message); + // To be able to enforce index-level privileges under the new remote cluster security model, + // we switch from old-style internal actions to their new equivalent indices actions so that + // they will be checked for index privileges against the index specified in the requests + final String effectiveAction = RCS_INTERNAL_ACTIONS_REPLACEMENTS.getOrDefault(action, action); + if (false == effectiveAction.equals(action)) { + logger.trace("switching internal action from [{}] to [{}]", action, effectiveAction); + } + sendWithCrossClusterAccessHeaders(crossClusterAccessHeaders, connection, effectiveAction, request, options, handler); } else { + assert false == action.startsWith("internal:") : "internal action must be sent with system user"; authzService.getRoleDescriptorsIntersectionForRemoteCluster( remoteClusterAlias, authentication.getEffectiveSubject(), diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesActionTests.java index 28fdf26134979..b4a79e070e927 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportHasPrivilegesActionTests.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.security.action.user; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.bytes.BytesArray; @@ -20,16 +21,28 @@ import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.AuthenticationField; import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper; +import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; +import org.elasticsearch.xpack.core.security.authz.privilege.ApplicationPrivilegeDescriptor; import org.elasticsearch.xpack.core.security.user.User; import org.elasticsearch.xpack.security.authz.AuthorizationService; import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.Set; +import static org.elasticsearch.test.ActionListenerUtils.anyActionListener; +import static org.elasticsearch.test.ActionListenerUtils.anyCollection; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class TransportHasPrivilegesActionTests extends ESTestCase { @@ -73,4 +86,94 @@ public void testHasPrivilegesRequestDoesNotAllowDLSRoleQueryBasedIndicesPrivileg assertThat(ile, notNullValue()); assertThat(ile.getMessage(), containsString("may only check index privileges without any DLS query")); } + + public void testRequiresSameUser() { + final SecurityContext context = mock(SecurityContext.class); + + final NativePrivilegeStore privilegeStore = mock(NativePrivilegeStore.class); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + final var listener = (ActionListener>) invocation.getArgument(2); + listener.onResponse(List.of()); + return null; + }).when(privilegeStore).getPrivileges(any(), any(), anyActionListener()); + final AuthorizationService authorizationService = mock(AuthorizationService.class); + final boolean isCompleteMatch = randomBoolean(); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + final var listener = (ActionListener) invocation.getArgument(3); + listener.onResponse( + new AuthorizationEngine.PrivilegesCheckResult( + isCompleteMatch, + new AuthorizationEngine.PrivilegesCheckResult.Details(Map.of(), Map.of(), Map.of()) + ) + ); + return null; + }).when(authorizationService).checkPrivileges(any(), any(), anyCollection(), anyActionListener()); + final var action = new TransportHasPrivilegesAction( + mock(TransportService.class), + new ActionFilters(Set.of()), + authorizationService, + privilegeStore, + context + ); + + final String username = randomAlphaOfLengthBetween(5, 10); + final User user = new User(username); + + // Scenario 1 - regular authentication with wrong username + final Authentication authentication1 = AuthenticationTestHelper.builder().user(user).realm().build(); + when(context.getAuthentication()).thenReturn(authentication1); + final HasPrivilegesRequest request1 = new HasPrivilegesRequest(); + request1.clusterPrivileges("all"); + request1.indexPrivileges(new RoleDescriptor.IndicesPrivileges[0]); + request1.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); + request1.username(randomValueOtherThan(username, () -> randomAlphaOfLengthBetween(5, 10))); + final PlainActionFuture future1 = new PlainActionFuture<>(); + action.execute(mock(Task.class), request1, future1); + final IllegalArgumentException e1 = expectThrows(IllegalArgumentException.class, future1::actionGet); + assertThat(e1.getMessage(), containsString("users may only check the privileges of their own account")); + verifyNoInteractions(privilegeStore, authorizationService); + + // Scenario 2 - cross cluster access authentication with right name on the API key but wrong name on the inner authentication + final String requestedUsername = randomValueOtherThan(username, () -> randomAlphaOfLengthBetween(5, 10)); + final Authentication authentication2 = AuthenticationTestHelper.builder() + .user(new User(requestedUsername)) + .apiKey() + .build() + .toCrossClusterAccess(AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo(authentication1)); + when(context.getAuthentication()).thenReturn(authentication2); + final HasPrivilegesRequest request2 = new HasPrivilegesRequest(); + request2.clusterPrivileges("all"); + request2.indexPrivileges(new RoleDescriptor.IndicesPrivileges[0]); + request2.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); + request2.username(requestedUsername); + final PlainActionFuture future2 = new PlainActionFuture<>(); + action.execute(mock(Task.class), request2, future2); + final IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, future2::actionGet); + assertThat(e2.getMessage(), containsString("users may only check the privileges of their own account")); + verifyNoInteractions(privilegeStore, authorizationService); + + // Scenario 3 (success) - cross cluster access authentication with right name on the inner authentication + final Authentication authentication3 = AuthenticationTestHelper.builder() + .crossClusterAccess( + randomAlphaOfLength(20), + AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo( + AuthenticationTestHelper.builder().user(new User(requestedUsername)).build() + ) + ) + .build(); + when(context.getAuthentication()).thenReturn(authentication3); + final HasPrivilegesRequest request3 = new HasPrivilegesRequest(); + request3.clusterPrivileges("all"); + request3.indexPrivileges(new RoleDescriptor.IndicesPrivileges[0]); + request3.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); + request3.username(requestedUsername); + + final PlainActionFuture future3 = new PlainActionFuture<>(); + action.execute(mock(Task.class), request3, future3); + final HasPrivilegesResponse hasPrivilegesResponse = future3.actionGet(); + assertThat(hasPrivilegesResponse.getUsername(), equalTo(requestedUsername)); + assertThat(hasPrivilegesResponse.isCompleteMatch(), is(isCompleteMatch)); + } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java index 60d9b704255d3..579d889042a48 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ElasticsearchClient; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexAbstraction; @@ -45,6 +46,9 @@ import org.elasticsearch.xpack.core.security.action.user.ChangePasswordRequest; import org.elasticsearch.xpack.core.security.action.user.ChangePasswordRequestBuilder; import org.elasticsearch.xpack.core.security.action.user.DeleteUserAction; +import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesAction; +import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesRequest; +import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesRequestBuilder; import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesResponse; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -54,6 +58,7 @@ import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper; import org.elasticsearch.xpack.core.security.authc.AuthenticationTests; +import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo; import org.elasticsearch.xpack.core.security.authc.esnative.NativeRealmSettings; import org.elasticsearch.xpack.core.security.authc.file.FileRealmSettings; import org.elasticsearch.xpack.core.security.authc.ldap.LdapRealmSettings; @@ -414,6 +419,22 @@ public void testSameUserPermissionDeniesApiKeyInfoRetrievalWhenLookedupByIsPrese assertFalse(RBACEngine.checkSameUserPermissions(GetApiKeyAction.NAME, request, authentication)); } + public void testSameUserPermissionForCrossClusterAccess() { + final CrossClusterAccessSubjectInfo ccaSubjectInfo = AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo(); + final Authentication authentication = AuthenticationTestHelper.builder().apiKey().build().toCrossClusterAccess(ccaSubjectInfo); + + // HasPrivileges is allowed + final HasPrivilegesRequest hasPrivilegesRequest = new HasPrivilegesRequest(); + hasPrivilegesRequest.username(ccaSubjectInfo.getAuthentication().getEffectiveSubject().getUser().principal()); + assertTrue(RBACEngine.checkSameUserPermissions(HasPrivilegesAction.NAME, hasPrivilegesRequest, authentication)); + + // Other actions, e.g. GetUserPrivilegesAction, are not allowed even if they are allowed when performing within a single cluster + final GetUserPrivilegesRequest getUserPrivilegesRequest = new GetUserPrivilegesRequestBuilder(mock(ElasticsearchClient.class)) + .username(ccaSubjectInfo.getAuthentication().getEffectiveSubject().getUser().principal()) + .request(); + assertFalse(RBACEngine.checkSameUserPermissions(GetUserPrivilegesAction.NAME, getUserPrivilegesRequest, authentication)); + } + /** * This tests that action names in the request are considered "matched" by the relevant named privilege * (in this case that {@link DeleteAction} and {@link IndexAction} are satisfied by {@link IndexPrivilege#WRITE}). diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index abedb3d443482..de14c99a089c8 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -10,6 +10,8 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.support.DestructiveOperations; @@ -661,19 +663,66 @@ private RemoteClusterCredentialsResolver mockRemoteClusterCredentialsResolver(St return remoteClusterCredentialsResolver; } - public void testSendWithCrossClusterAccessHeaders() throws Exception { + public void testSendWithCrossClusterAccessHeadersForSystemUserRegularAction() throws Exception { assumeTrue("untrusted remote cluster feature flag must be enabled", TcpTransport.isUntrustedRemoteClusterEnabled()); + final String action; + final TransportRequest request; + if (randomBoolean()) { + action = randomAlphaOfLengthBetween(5, 30); + request = mock(TransportRequest.class); + } else { + action = ClusterStateAction.NAME; + request = mock(ClusterStateRequest.class); + } + doTestSendWithCrossClusterAccessHeaders( + true, + action, + request, + AuthenticationTestHelper.builder().internal(SystemUser.INSTANCE).build() + ); + } - final String authType = randomFrom("internal", "user", "apikey"); - final Authentication authentication = switch (authType) { - case "internal" -> AuthenticationTestHelper.builder().internal(SystemUser.INSTANCE).build(); - case "user" -> AuthenticationTestHelper.builder() - .user(new User(randomAlphaOfLengthBetween(3, 10), randomRoles())) - .realm() - .build(); - case "apikey" -> AuthenticationTestHelper.builder().apiKey().build(); - default -> throw new IllegalStateException("unexpected case"); - }; + public void testSendWithCrossClusterAccessHeadersForSystemUserCcrInternalAction() throws Exception { + final String action = randomFrom( + "internal:admin/ccr/restore/session/put", + "internal:admin/ccr/restore/session/clear", + "internal:admin/ccr/restore/file_chunk/get" + ); + final TransportRequest request = mock(TransportRequest.class); + doTestSendWithCrossClusterAccessHeaders( + true, + action, + request, + AuthenticationTestHelper.builder().internal(SystemUser.INSTANCE).build() + ); + } + + public void testSendWithCrossClusterAccessHeadersForRegularUserRegularAction() throws Exception { + final Authentication authentication = randomValueOtherThanMany( + authc -> authc.getAuthenticationType() == Authentication.AuthenticationType.INTERNAL, + () -> AuthenticationTestHelper.builder().build() + ); + final String action = randomAlphaOfLengthBetween(5, 30); + final TransportRequest request = mock(TransportRequest.class); + doTestSendWithCrossClusterAccessHeaders(false, action, request, authentication); + } + + public void testSendWithCrossClusterAccessHeadersForRegularUserClusterStateAction() throws Exception { + final Authentication authentication = randomValueOtherThanMany( + authc -> authc.getAuthenticationType() == Authentication.AuthenticationType.INTERNAL, + () -> AuthenticationTestHelper.builder().build() + ); + final String action = ClusterStateAction.NAME; + final TransportRequest request = mock(ClusterStateRequest.class); + doTestSendWithCrossClusterAccessHeaders(true, action, request, authentication); + } + + private void doTestSendWithCrossClusterAccessHeaders( + boolean shouldAssertForSystemUser, + String action, + TransportRequest request, + Authentication authentication + ) throws IOException { authentication.writeToContext(threadContext); final String expectedRequestId = AuditUtil.getOrGenerateRequestId(threadContext); final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class); @@ -706,12 +755,13 @@ public void testSendWithCrossClusterAccessHeaders() throws Exception { ); final AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + final AtomicReference sentAction = new AtomicReference<>(); final AtomicReference sentCredential = new AtomicReference<>(); final AtomicReference sentCrossClusterAccessSubjectInfo = new AtomicReference<>(); final AsyncSender sender = interceptor.interceptSender(new AsyncSender() { @Override public void sendRequest( - Transport.Connection connection, + Connection connection, String action, TransportRequest request, TransportRequestOptions options, @@ -722,6 +772,7 @@ public void sendRequest( } assertThat(securityContext.getAuthentication(), nullValue()); assertThat(AuditUtil.extractRequestId(securityContext.getThreadContext()), equalTo(expectedRequestId)); + sentAction.set(action); sentCredential.set(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY)); try { sentCrossClusterAccessSubjectInfo.set( @@ -733,9 +784,10 @@ public void sendRequest( handler.handleResponse(null); } }); - final Transport.Connection connection = mock(Transport.Connection.class); + final Connection connection = mock(Connection.class); when(connection.getTransportVersion()).thenReturn(TransportVersion.CURRENT); - sender.sendRequest(connection, "action", mock(TransportRequest.class), null, new TransportResponseHandler<>() { + + sender.sendRequest(connection, action, request, null, new TransportResponseHandler<>() { @Override public void handleResponse(TransportResponse response) { // Headers should get restored before handle response is called @@ -754,7 +806,7 @@ public TransportResponse read(StreamInput in) { return null; } }); - if (authType.equals("internal")) { + if (shouldAssertForSystemUser) { assertThat( sentCrossClusterAccessSubjectInfo.get(), equalTo( @@ -786,6 +838,11 @@ public TransportResponse read(StreamInput in) { ); } assertTrue(calledWrappedSender.get()); + if (action.startsWith("internal:")) { + assertThat(sentAction.get(), equalTo("indices:internal/" + action.substring("internal:".length()))); + } else { + assertThat(sentAction.get(), equalTo(action)); + } assertThat(sentCredential.get(), equalTo(remoteClusterCredential)); verify(securityContext, never()).executeAsInternalUser(any(), any(), anyConsumer()); verify(remoteClusterCredentialsResolver, times(1)).resolve(eq(remoteClusterAlias));