Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove retention leases when unfollowing #39088

Merged
merged 11 commits into from
Feb 20, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,20 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.ShardId;

import java.util.Locale;
import java.util.Optional;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;

public class CcrRetentionLeases {

Expand Down Expand Up @@ -37,4 +48,113 @@ public static String retentionLeaseId(
leaderIndex.getUUID());
}

/**
* Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease already exists
*/
public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseAlreadyExistsException e) {
return Optional.of(e);
}
}

/**
* Asynchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response
* or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.AddRequest request =
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
}

/**
* Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease already exists
*/
public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseNotFoundException e) {
return Optional.of(e);
}
}

/**
* Asynchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the
* given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
* response or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.RenewRequest request =
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
}

/**
* Asynchronously requests to remove a retention lease with the specified retention lease ID on the specified leader shard using the
* given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
* response or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncRemoveRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.RemoveRequest request = new RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId);
remoteClient.execute(RetentionLeaseActions.Remove.INSTANCE, request, listener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@

package org.elasticsearch.xpack.ccr.action;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -20,22 +26,46 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class TransportUnfollowAction extends TransportMasterNodeAction<UnfollowAction.Request, AcknowledgedResponse> {

private final Client client;

@Inject
public TransportUnfollowAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
UnfollowAction.Request::new, indexNameExpressionResolver);
public TransportUnfollowAction(
final TransportService transportService,
final ClusterService clusterService,
final ThreadPool threadPool,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Client client) {
super(
UnfollowAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
UnfollowAction.Request::new,
indexNameExpressionResolver);
this.client = Objects.requireNonNull(client);
}

@Override
Expand All @@ -49,26 +79,128 @@ protected AcknowledgedResponse newResponse() {
}

@Override
protected void masterOperation(UnfollowAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
protected void masterOperation(
final UnfollowAction.Request request,
final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState current) throws Exception {
public ClusterState execute(final ClusterState current) {
String followerIndex = request.getFollowerIndex();
return unfollow(followerIndex, current);
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
final IndexMetaData indexMetaData = oldState.metaData().index(request.getFollowerIndex());
final Map<String, String> ccrCustomMetaData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final String remoteClusterName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY);
final Client remoteClient = client.getRemoteClusterClient(remoteClusterName);
final String leaderIndexName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
final String leaderIndexUuid = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
final Index leaderIndex = new Index(leaderIndexName, leaderIndexUuid);
final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
oldState.getClusterName().value(),
indexMetaData.getIndex(),
remoteClusterName,
leaderIndex);
final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings());

final GroupedActionListener<RetentionLeaseActions.Response> groupListener = new GroupedActionListener<>(
new ActionListener<Collection<RetentionLeaseActions.Response>>() {

@Override
public void onResponse(final Collection<RetentionLeaseActions.Response> responses) {
logger.trace(
"[{}] removed retention lease [{}] on all leader primary shards",
indexMetaData.getIndex(),
retentionLeaseId);
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(final Exception e) {
logger.warn(new ParameterizedMessage(
"[{}] failure while removing retention lease [{}] on leader primary shards",
indexMetaData.getIndex(),
retentionLeaseId),
e);
final ElasticsearchException wrapper = new ElasticsearchException(e);
wrapper.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId);
listener.onFailure(wrapper);
}

},
numberOfShards,
Collections.emptyList());
for (int i = 0; i < numberOfShards; i++) {
final ShardId followerShardId = new ShardId(indexMetaData.getIndex(), i);
final ShardId leaderShardId = new ShardId(leaderIndex, i);
removeRetentionLeaseForShard(
followerShardId,
leaderShardId,
retentionLeaseId,
remoteClient,
ActionListener.wrap(
groupListener::onResponse,
e -> handleException(
followerShardId,
retentionLeaseId,
leaderShardId,
groupListener,
e)));
}
}

private void removeRetentionLeaseForShard(
final ShardId followerShardId,
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
// we have to execute under the system context so that if security is enabled the removal is authorized
threadContext.markAsSystemContext();
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener);
}
}

private void handleException(
final ShardId followerShardId,
final String retentionLeaseId,
final ShardId leaderShardId,
final ActionListener<RetentionLeaseActions.Response> listener,
final Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
assert cause instanceof ElasticsearchSecurityException == false : e;
if (cause instanceof RetentionLeaseNotFoundException) {
// treat as success
logger.trace(new ParameterizedMessage(
"{} retention lease [{}] not found on {} while unfollowing",
followerShardId,
retentionLeaseId,
leaderShardId),
e);
listener.onResponse(new RetentionLeaseActions.Response());
} else {
logger.warn(new ParameterizedMessage(
"{} failed to remove retention lease [{}] on {} while unfollowing",
followerShardId,
retentionLeaseId,
leaderShardId),
e);
listener.onFailure(e);
}
}

});
}

Expand Down
Loading