Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate retention leases to recovery from remote #38829

Merged
merged 40 commits into from
Feb 16, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
82d464d
Integrate retention leases to recovery from remote
jasontedor Feb 13, 2019
6138176
Fix accidental import
jasontedor Feb 13, 2019
9ca7526
Update x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/re…
jasontedor Feb 13, 2019
530b55b
Update x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/re…
jasontedor Feb 13, 2019
f6e129e
Force follower to fall behind
jasontedor Feb 13, 2019
d294e99
Add more tests
jasontedor Feb 13, 2019
11120d4
Merge branch 'master' into retention-lease-ccr
jasontedor Feb 13, 2019
b13c6c5
Make linter happy
jasontedor Feb 13, 2019
ab4d61c
Fix lost import
jasontedor Feb 13, 2019
30f1366
Finally
jasontedor Feb 13, 2019
e58bbbc
Merge branch 'master' into retention-lease-ccr
jasontedor Feb 13, 2019
d5dee9b
Tweak some log messages
jasontedor Feb 13, 2019
ae9cff2
wip
jasontedor Feb 14, 2019
d027302
Merge remote-tracking branch 'elastic/master' into retention-lease-ccr
jasontedor Feb 14, 2019
a42d229
Fix under security
jasontedor Feb 14, 2019
2e6e705
Shift log message
jasontedor Feb 14, 2019
cbe044c
More tests
jasontedor Feb 14, 2019
0ac5533
Fix comment
jasontedor Feb 14, 2019
6791827
Fix imports
jasontedor Feb 14, 2019
0dcf6c9
Remove newline
jasontedor Feb 14, 2019
b8b2537
Merge remote-tracking branch 'elastic/master' into retention-lease-ccr
jasontedor Feb 14, 2019
1633dfb
Merge branch 'master' into retention-lease-ccr
jasontedor Feb 14, 2019
4005309
Merge branch 'master' into retention-lease-ccr
jasontedor Feb 14, 2019
086e87e
Refactor to helper
jasontedor Feb 14, 2019
69d757c
Enhance logging messages
jasontedor Feb 15, 2019
cdf9bdc
Refactor
jasontedor Feb 15, 2019
fa7b4df
Merge remote-tracking branch 'elastic/master' into retention-lease-ccr
jasontedor Feb 15, 2019
5bbb07f
Fix imports
jasontedor Feb 15, 2019
6fc440b
Background syncs should be async
jasontedor Feb 15, 2019
30232bb
Remove leftover comment from previous iteration
jasontedor Feb 15, 2019
d45d829
Merge branch 'master' into retention-lease-ccr
jasontedor Feb 15, 2019
8fd90fe
Fix imports
jasontedor Feb 15, 2019
21edf1d
Fix mocking tests
jasontedor Feb 16, 2019
7e547ee
Refactor
jasontedor Feb 16, 2019
d471872
Update x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/re…
jasontedor Feb 16, 2019
fc9a300
Update x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/re…
jasontedor Feb 16, 2019
2c77ad4
Merge remote-tracking branch 'elastic/master' into retention-lease-ccr
jasontedor Feb 16, 2019
6da443a
Tweak logging message
jasontedor Feb 16, 2019
4262372
Merge branch 'master' into retention-lease-ccr
jasontedor Feb 16, 2019
dec3d28
Test awaits fix
jasontedor Feb 16, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse> {

public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/clear";
public static final String NAME = "internal:admin/ccr/restore/session/clear";

private ClearCcrRestoreSessionAction() {
super(NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -29,13 +32,17 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -55,6 +62,7 @@
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
Expand All @@ -77,11 +85,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;


Expand All @@ -91,6 +102,8 @@
*/
public class CcrRepository extends AbstractLifecycleComponent implements Repository {

private static final Logger logger = LogManager.getLogger(CcrRepository.class);

public static final String LATEST = "_latest_";
public static final String TYPE = "_ccr_";
public static final String NAME_PREFIX = "_ccr_";
Expand Down Expand Up @@ -291,12 +304,47 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
store.decRef();
}

Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);

Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());

Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
final String retentionLeaseId = indexShard.shardId().getIndex().getUUID() + "-following-" + leaderUUID;
logger.trace(
() -> new ParameterizedMessage("{} requesting leader primary to add retention lease [" + retentionLeaseId + "]", shardId));
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
final Optional<RetentionLeaseAlreadyExistsException> maybeAddAlready =
addRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
maybeAddAlready.ifPresent(addAlready -> {
logger.trace(() -> new ParameterizedMessage("{} retention lease already exists, requesting a renewal", shardId), addAlready);
final Optional<RetentionLeaseNotFoundException> maybeRenewNotFound =
renewRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
maybeRenewNotFound.ifPresent(renewNotFound -> {
logger.trace(() -> new ParameterizedMessage(
"{} retention lease not found while attempting to renew, attempting a final add", shardId), renewNotFound);
final Optional<RetentionLeaseAlreadyExistsException> maybeFallbackAddAlready =
addRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> {
/*
* At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the
* lease, it expired or was removed. We tried to add the lease again and it already exists? Bail.
*/
assert false : fallbackAddAlready;
throw fallbackAddAlready;
});
});
});

// schedule renewals to run during the restore
final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay(
() -> {
logger.info("{} background renewing retention lease during restore", shardId);
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
renewRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we except this to possibly throw an exception, should we log it here instead of bubbling it up to uncaught exception handler?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I took care of this. There was more to do here, including authorizing for security. See a42d229.

},
RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()),
Ccr.CCR_THREAD_POOL_NAME);

// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
String name = metadata.name();
Expand All @@ -305,9 +353,48 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index());
} catch (Exception e) {
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
} finally {
renewable.cancel();
}
}

private Optional<RetentionLeaseAlreadyExistsException> addRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient) {
try {
final RetentionLeaseActions.AddRequest request =
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout());
return Optional.empty();
} catch (final RetentionLeaseAlreadyExistsException e) {
return Optional.of(e);
}
}

private Optional<RetentionLeaseNotFoundException> renewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient) {
try {
final RetentionLeaseActions.RenewRequest request =
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout());
return Optional.empty();
} catch (final RetentionLeaseNotFoundException e) {
return Optional.of(e);
}
}

// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
Setting.timeSetting(
"index.ccr.retention_lease.renew_interval",
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic,
Setting.Property.IndexScope);

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
Expand All @@ -22,8 +23,11 @@
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand All @@ -35,6 +39,7 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -58,6 +63,9 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -99,10 +107,12 @@
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.empty;
Expand All @@ -114,6 +124,10 @@ public abstract class CcrIntegTestCase extends ESTestCase {

private static ClusterGroup clusterGroup;

protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.emptyList();
}

@Before
public final void startClusters() throws Exception {
if (clusterGroup != null && reuseClusters()) {
Expand Down Expand Up @@ -224,7 +238,10 @@ public Path nodeConfigPath(int nodeOrdinal) {

@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class);
return Stream.concat(
Stream.of(LocalStateCcr.class, CommonAnalysisPlugin.class),
CcrIntegTestCase.this.nodePlugins().stream())
.collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -639,6 +656,61 @@ public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTim
return lastKnownCount.get();
}

protected ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(
final ClusterService clusterService,
final ActionListener<RestoreInfo> listener) {
return new ActionListener<RestoreService.RestoreCompletionResponse>() {

@Override
public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
final String uuid = restoreCompletionResponse.getUuid();

final ClusterStateListener clusterStateListener = new ClusterStateListener() {

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
/*
* When there is a master failure after a restore has been started, this listener might not be registered
* on the current master and as such it might miss some intermediary cluster states due to batching.
* Clean up the listener in that case and acknowledge completion of restore operation to client.
*/
clusterService.removeListener(this);
listener.onResponse(null);
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
logger.debug("restore of [{}] completed", snapshot);
listener.onResponse(ri);
} else {
// restore not completed yet, wait for next cluster state update
}
}

};

clusterService.addListener(clusterStateListener);
} else {
listener.onResponse(restoreCompletionResponse.getRestoreInfo());
}
}

@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}

};
}

static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {
Expand Down
Loading