Skip to content

Commit

Permalink
Use retention lease in peer recovery of closed indices (elastic#48430)
Browse files Browse the repository at this point in the history
Today we do not use retention leases in peer recovery for closed indices
because we can't sync retention leases on closed indices. This change
allows that ability and adjusts peer recovery to use retention leases
for all indices with soft-deletes enabled.

Relates elastic#45136

Co-authored-by: David Turner <david.turner@elastic.co>
  • Loading branch information
dnhatn and DaveCTurner committed Nov 24, 2019
1 parent 777f6d5 commit 94c7f57
Show file tree
Hide file tree
Showing 16 changed files with 258 additions and 623 deletions.
Expand Up @@ -91,7 +91,7 @@ public synchronized void rescheduleIfNecessary() {
if (logger.isTraceEnabled()) {
logger.trace("scheduling {} every {}", toString(), interval);
}
cancellable = threadPool.schedule(this, interval, getThreadPool());
cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool());
isScheduledOrRunning = true;
} else {
logger.trace("scheduled {} disabled", toString());
Expand Down
Expand Up @@ -838,10 +838,7 @@ private boolean invariant() {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}

if (primaryMode
&& indexSettings.isSoftDeleteEnabled()
&& indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& hasAllPeerRecoveryRetentionLeases) {
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
Expand Down Expand Up @@ -909,7 +906,9 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
Expand Down Expand Up @@ -1022,34 +1021,32 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() {
assert primaryMode;
assert Thread.holdsLock(this);

if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
hasAllPeerRecoveryRetentionLeases = true;
}
}

Expand Down Expand Up @@ -1371,10 +1368,7 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() {
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
*/
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
if (indexSettings().isSoftDeleteEnabled()
&& indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& hasAllPeerRecoveryRetentionLeases == false) {

if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) {
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
setHasAllPeerRecoveryRetentionLeases();
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -37,17 +38,21 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -61,8 +66,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
RetentionLeaseBackgroundSyncAction.Request,
ReplicationResponse> {

public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";

public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

protected Logger getLogger() {
Expand Down Expand Up @@ -94,37 +98,50 @@ public RetentionLeaseBackgroundSyncAction(
ThreadPool.Names.MANAGEMENT);
}

/**
* Background sync the specified retention leases for the specified shard.
*
* @param shardId the shard to sync
* @param retentionLeases the retention leases to sync
*/
public void backgroundSync(
final ShardId shardId,
final RetentionLeases retentionLeases) {
Objects.requireNonNull(shardId);
Objects.requireNonNull(retentionLeases);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
execute(
new Request(shardId, retentionLeases),
ActionListener.wrap(
r -> {},
e -> {
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
// the shard is closed
return;
}
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
}));
}
@Override
protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
assert false : "use RetentionLeaseBackgroundSyncAction#backgroundSync";
}

final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ReplicationResponse response) {
task.setPhase("finished");
taskManager.unregister(task);
}

@Override
public void handleException(TransportException e) {
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
// the shard is closed
return;
}
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
}
});
}

@Override
Expand Down Expand Up @@ -174,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException {
retentionLeases.writeTo(out);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ReplicationTask(id, type, action, "retention_lease_background_sync shardId=" + shardId, parentTaskId, headers);
}

@Override
public String toString() {
return "RetentionLeaseBackgroundSyncAction.Request{" +
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -39,16 +40,20 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -58,8 +63,7 @@
public class RetentionLeaseSyncAction extends
TransportWriteAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> {

public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";

public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

protected Logger getLogger() {
Expand Down Expand Up @@ -91,35 +95,47 @@ public RetentionLeaseSyncAction(
ThreadPool.Names.MANAGEMENT, false);
}

/**
* Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails.
*
* @param shardId the shard to sync
* @param retentionLeases the retention leases to sync
* @param listener the callback to invoke when the sync completes normally or abnormally
*/
public void sync(
final ShardId shardId,
final RetentionLeases retentionLeases,
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(shardId);
Objects.requireNonNull(retentionLeases);
Objects.requireNonNull(listener);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
execute(
new RetentionLeaseSyncAction.Request(shardId, retentionLeases),
ActionListener.wrap(
listener::onResponse,
e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
}
listener.onFailure(e);
}));
}
@Override
protected void doExecute(Task parentTask, Request request, ActionListener<Response> listener) {
assert false : "use RetentionLeaseSyncAction#sync";
}

final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener) {
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ReplicationResponse response) {
task.setPhase("finished");
taskManager.unregister(task);
listener.onResponse(response);
}

@Override
public void handleException(TransportException e) {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
}
task.setPhase("finished");
taskManager.unregister(task);
listener.onFailure(e);
}
});
}

@Override
Expand Down Expand Up @@ -175,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException {
retentionLeases.writeTo(out);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ReplicationTask(id, type, action, "retention_lease_sync shardId=" + shardId, parentTaskId, headers);
}

@Override
public String toString() {
return "RetentionLeaseSyncAction.Request{" +
Expand Down

0 comments on commit 94c7f57

Please sign in to comment.