Be more resilient to partial network partitions #8720

Closed

@bleskes
Member
bleskes commented Dec 1, 2014

When a node experiences network issues, the master detects it and removes the node from the cluster. That causes all ongoing recoveries from and to that node to be stopped, and a new location is found for the relevant shards. However, in the case of a partial network partition, where there are connectivity issues between the source and target node of a recovery but not between those nodes and the current master, things may go wrong. While the nodes successfully restore the connection, the ongoing recoveries may have encountered issues.

This PR adds a test that simulates disconnecting nodes and dropping requests during the various stages of recovery, and solves all the issues that were raised by it. In short:

  1. Ongoing recoveries will be scheduled for retry upon network disconnect. The default retry period is 5s (cross-node connections are checked every 10s by default). A rough sketch of this retry scheduling follows the list.
  2. Sometimes the disconnect happens after the target engine has started (but the shard is still in recovery). For simplicity, I opted to restart the recovery from scratch (where little to no files will be copied again, because they were just synced).
  3. To protect against dropped requests, a Recovery Monitor was added that fails a recovery if no progress has been made in the last 30m (by default), which is equivalent to the long timeouts we use in recovery requests.
  4. When a shard fails on a node, we try to assign it to another node. If no such node is available, the shard will remain unassigned, causing the target node to clean any in-memory state for it (files on disk remain). At the moment the shard will remain unassigned until another cluster state change happens, which will reassign it to the node in question, but if no such change happens the shard will remain stuck as unassigned. This commit adds an extra delayed reroute in such cases to make sure the shard will be reassigned.
  5. Moved all recovery-related settings to RecoverySettings.
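
For illustration, a hedged sketch of the retry-on-disconnect idea from point 1, in plain Java - the class, fields, and the `isNetworkDisconnect` check are assumptions, not the actual PR code:

```java
import java.net.ConnectException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hedged sketch only: a network-level failure does not fail the recovery; it
// schedules a retry after a configurable delay (5s by default in this PR).
// Anything else still fails the shard so the master can pick a new location.
class RecoveryRetrySketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long retryDelayNetworkMillis = 5_000; // maps to indices.recovery.retry_delay_network

    void onRecoveryFailure(Runnable restartRecovery, Runnable failShard, Throwable cause) {
        if (isNetworkDisconnect(cause)) {
            scheduler.schedule(restartRecovery, retryDelayNetworkMillis, TimeUnit.MILLISECONDS);
        } else {
            failShard.run();
        }
    }

    private boolean isNetworkDisconnect(Throwable cause) {
        // the real code checks for transport-level disconnect exceptions
        return cause instanceof ConnectException;
    }
}
```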

I'd love it if extra focus was given to the engine changes while reviewing - I'm not 100% familiar with the implications of the code for the underlying Lucene state.

There is also one nocommit regarding the Java serializability of a Version object (used by DiscoveryNode). We rely on Java serialization for exceptions, and this makes the ConnectTransportException unserializable because of its DiscoveryNode field. This can be fixed in another change, but we need to discuss how.
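
The general pattern under discussion looks roughly like the sketch below - a Serializable holder whose non-serializable field (standing in for org.apache.lucene.util.Version) is written as a stable surrogate and rebuilt on read. This is illustrative only; the class and field names are assumptions, not the PR's code:

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hedged sketch: custom writeObject/readObject replace the non-serializable
// field with a surrogate (its string form) during Java serialization.
class NodeVersionHolderSketch implements Serializable {
    private String nodeId;
    private transient Object luceneVersion; // not Serializable, handled by hand

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(String.valueOf(luceneVersion)); // write a surrogate instead of the object
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String versionString = in.readUTF();
        // a real implementation would parse this back into the version object
        this.luceneVersion = versionString;
    }
}
```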

One more TODO left - add a reference to the resiliency page.

@s1monw s1monw and 1 other commented on an outdated diff Dec 1, 2014
src/main/java/org/elasticsearch/Version.java
@@ -587,4 +589,16 @@ protected void configure() {
bind(Version.class).toInstance(version);
}
}
+
+ // we need custom serialization logic because org.apache.lucene.util.Version is not serializable
+ // nocommit - do we want to push this down to lucene?
+ private void writeObject(java.io.ObjectOutputStream out)
@s1monw
s1monw Dec 1, 2014 Contributor

I don't think this is going to happen - Lucene opted out of Serializable a long time ago, and I don't think we should add it back. I'd rather drop our dependency on it, to be honest!

@bleskes
bleskes Dec 2, 2014 Member

I'm all for controlling the serialization better (i.e., having versioning support), but that's a hard thing. At the moment we can't serialize transport-related exceptions, so imho we should fix this first. Since the version's variables are final, I had to move this to the disco node level.

@s1monw s1monw and 1 other commented on an outdated diff Dec 1, 2014
...asticsearch/index/engine/internal/InternalEngine.java
@@ -354,6 +316,39 @@ public void start() throws EngineException {
}
}
+ @Override
+ public void stop() {
@s1monw
s1monw Dec 1, 2014 Contributor

I see what you want to do here - honestly, I'd really want to get away from the fact that this thing needs to be "started" and can be stopped. I think what we should do is factor this logic out into a separate class, which could be called InternalEngineDelegate and be used in all the lifecycle shenanigans. We'd have our InternalEngine as we have it today, but it initializes everything at construction time, everything that can be final is final, etc. We only implement Closeable and the like. That way we can just create a new InternalEngine when we call start() and trash it entirely when we call stop(). IMO that is the right way to go here.

@bleskes
bleskes Dec 2, 2014 Member

I moved to re-creating the engine, but based on Guice injectors instead of a delegate engine. I think it's simpler
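
For context, the "create a fresh instance instead of start/stop" idea can be sketched roughly as below. This is illustrative only - neither the delegate class proposed above nor the Guice-injector variant that ended up in the PR:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hedged sketch: an immutable engine instance is built fully in its constructor
// and swapped out wholesale when a restart is needed.
final class EngineHolderSketch {
    private final AtomicReference<Closeable> current = new AtomicReference<>();

    void restart() throws IOException {
        Closeable fresh = createEngine();              // fully initialized, fields final
        Closeable previous = current.getAndSet(fresh); // swap atomically
        if (previous != null) {
            previous.close();                          // tear the old instance down entirely
        }
    }

    private Closeable createEngine() {
        return () -> { /* release engine resources here */ };
    }
}
```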

@s1monw s1monw commented on an outdated diff Dec 1, 2014
...ticsearch/index/shard/service/InternalIndexShard.java
@@ -749,6 +728,19 @@ public void performRecoveryPrepareForTranslog() throws ElasticsearchException {
engine.start();
}
+ /** called if recovery has to be restarted after network error / delay ** */
+ public void performRecoveryRestart() {
+ if (state != IndexShardState.RECOVERING) {
+ throw new IndexShardNotRecoveringException(shardId, state);
+ }
+ if (engine.isStarted()) {
+ engine.stop();
+ // back to defaults..
+ engine.enableGcDeletes(true);
@s1monw
s1monw Dec 1, 2014 Contributor

why do we set this to true?

@s1monw s1monw commented on an outdated diff Dec 1, 2014
...sticsearch/indices/recovery/RecoveriesCollection.java
@@ -181,5 +189,40 @@ public RecoveryStatus status() {
return status;
}
}
+
+ private class RecoveryMonitor implements Runnable {
+ final private long recoveryId;
@s1monw
s1monw Dec 1, 2014 Contributor

can we make this private final :)

@s1monw s1monw commented on the diff Dec 1, 2014
...rg/elasticsearch/indices/recovery/RecoveryStatus.java
@@ -115,6 +120,16 @@ public Store store() {
return store;
}
+ /** return the last time this RecoveryStatus was used (based on System.nanoTime() */
+ public long lastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /** sets the lasAccessTime flag to now */
+ public void setLastAccessTime() {
@s1monw
s1monw Dec 1, 2014 Contributor

can you make this final? you should not call non-final methods from ctors

@bleskes
bleskes Dec 2, 2014 Member

removed it from the constructor instead

@s1monw s1monw and 1 other commented on an outdated diff Dec 1, 2014
...rg/elasticsearch/indices/recovery/RecoveryStatus.java
@@ -180,6 +196,11 @@ public void fail(RecoveryFailedException e, boolean sendShardFailure) {
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
+ final Thread thread = waitingRecoveryThread.get();
@s1monw
s1monw Dec 1, 2014 Contributor

can we maybe factor out the CancelableThreads from ShardRecoveryHandler and impl that here too?

@bleskes
bleskes Dec 2, 2014 Member

will do.

@s1monw s1monw and 1 other commented on an outdated diff Dec 1, 2014
...rg/elasticsearch/indices/recovery/RecoveryStatus.java
@@ -239,28 +260,37 @@ public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData meta
return indexOutput;
}
+ public void resetRecovery() {
@s1monw
s1monw Dec 1, 2014 Contributor

try / finally ?

@bleskes
bleskes Dec 2, 2014 Member

I'm doubtful - which exceptions are safe to ignore?

@s1monw
s1monw Dec 15, 2014 Contributor

I don't know - why are we doing it in #closeInternal()?

@bleskes
bleskes Dec 15, 2014 Member

the intent of closeInternal is to clean up as much as possible in case the recovery is cancelled / closed. In this case the openIndexOutputs will be reused - I'm not sure what the safe option here is. I was opting for throwing exceptions, which will cause the recovery to be cancelled.

@s1monw s1monw commented on the diff Dec 1, 2014
...rg/elasticsearch/indices/recovery/RecoveryStatus.java
} finally {
// free store. increment happens in constructor
store.decRef();
}
}
+ protected void cleanOpenFiles() {
+ // clean open index outputs
+ Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
@s1monw
s1monw Dec 1, 2014 Contributor

maybe

IOUtils.closeWhileHandlingException(openIndexOutputs.values());
openIndexOutputs.clear();
@bleskes
bleskes Dec 2, 2014 Member

I'm not sure that's safe under concurrency. Another change?

@s1monw
s1monw Dec 15, 2014 Contributor

ok
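
For reference, one reading of the concurrency concern, as a hedged sketch with assumed names (not the PR's code): draining the concurrent map entry by entry removes each output from the map before closing it, so another thread cannot fetch an output that is already closed but still mapped.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch: remove each entry from the map first, then close it best-effort.
class OpenOutputsSketch {
    private final ConcurrentHashMap<String, Closeable> openIndexOutputs = new ConcurrentHashMap<>();

    void cleanOpenFiles() {
        Iterator<Map.Entry<String, Closeable>> iterator = openIndexOutputs.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Closeable> entry = iterator.next();
            iterator.remove();              // unmap first
            try {
                entry.getValue().close();   // then close, ignoring failures
            } catch (IOException e) {
                // keep closing the remaining outputs
            }
        }
    }
}
```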

@s1monw
Contributor
s1monw commented Dec 1, 2014

left a bunch of comments - I love the test :)

@bleskes
Member
bleskes commented Dec 2, 2014

@s1monw I pushed an update based on feedback. Note that I also modified the CancelableThreads a bit when extracting it. It's not identical.

@s1monw
Contributor
s1monw commented Dec 2, 2014

@bleskes I looked at the two commits. I like the second one but not the first one. I think your changes to the creation model make things even more complex and error prone than they already are. The engine should not be started / not started / stopped, with some state in between that might or might not be cleaned up. I think we should have a dedicated class, which we might even be able to use without all the guice stuff, that initializes everything it needs in the constructor. It might even create a new instance if we update settings etc. and tear everything down during that time. That is much cleaner, and then you can do the start/stop logic cleanup on top.

@bleskes
Member
bleskes commented Dec 10, 2014

@s1monw I rebased this against master and adapted it to the changes in #8784. I also moved all recovery settings to one place - i.e., RecoverySettings.

@s1monw s1monw commented on the diff Dec 10, 2014
src/main/java/org/elasticsearch/Version.java
/**
*/
@SuppressWarnings("deprecation")
-public class Version implements Serializable {
@s1monw
s1monw Dec 10, 2014 Contributor

is this still needed?

@bleskes
bleskes Dec 10, 2014 Member

Do you mean removing Serializable? Yeah, because it isn't serializable - it depends on org.apache.lucene.util.Version.

@s1monw s1monw and 1 other commented on an outdated diff Dec 10, 2014
...org/elasticsearch/cluster/routing/RoutingService.java
@@ -153,8 +158,8 @@ public void onNoLongerMaster(String source) {
@Override
public void onFailure(String source, Throwable t) {
- ClusterState state = clusterService.state();
- logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
+ ClusterState state = clusterService.state();
+ logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
@s1monw
s1monw Dec 10, 2014 Contributor

can this state be large potentially?

@bleskes
bleskes Dec 10, 2014 Member

Yeah, I can trim it at 1024KB?

@s1monw
s1monw Dec 15, 2014 Contributor

I don't think we should do this in a log. This can be massive, and just the printing can hang up a node, I guess. I wonder if we can print this state only if trace is enabled?
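
A hedged sketch of the trace-gated logging being suggested, using java.util.logging purely for illustration (Elasticsearch has its own logger abstraction): always log the failure, but only append the potentially huge cluster state when trace-level logging is enabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hedged sketch: full cluster state only at trace level, short message otherwise.
class RerouteFailureLoggerSketch {
    private final Logger logger = Logger.getLogger("cluster.routing");

    void onFailure(String source, Throwable t, String prettyPrintedState) {
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.SEVERE,
                    "unexpected failure during [" + source + "], current state:\n" + prettyPrintedState, t);
        } else {
            logger.log(Level.SEVERE, "unexpected failure during [" + source + "]", t);
        }
    }
}
```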

@s1monw s1monw and 1 other commented on an outdated diff Dec 10, 2014
.../org/elasticsearch/common/util/CancelableThreads.java
+ } finally {
+ remove();
+ }
+ if (failIfCanceled) {
+ failIfCanceled();
+ }
+ }
+
+
+ private synchronized void remove() {
+ threads.remove(Thread.currentThread());
+ failIfCanceled();
+ }
+
+ /** cancel all current running operations. Future calls to {@link #failIfCanceled()} will be failed with the given reason */
+ public synchronized void cancel(String reason) {
@s1monw
s1monw Dec 10, 2014 Contributor

I think we should barf if we cancel twice?

@bleskes
bleskes Dec 10, 2014 Member

I'm not sure it adds value. The idea of the class is to make this simple and safe to use - so you can cancel from anywhere at any time without thinking about synchronisation. Potentially we want to accumulate the reasons?

@s1monw
s1monw Dec 15, 2014 Contributor

yeah, ok. I think we should only do it once and then barf - to be honest, I think executing this twice is a bug.
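
For illustration, a hedged sketch of a cancellable-threads helper that barfs on a second cancel, as suggested above. Names and details are assumptions, not the class extracted in the PR:

```java
import java.util.HashSet;
import java.util.Set;

// Hedged sketch: operations register their thread, cancel() interrupts them all,
// and cancelling twice throws, treating the double cancel as a bug.
class CancellableThreadsSketch {
    private final Set<Thread> threads = new HashSet<>();
    private String cancelReason;

    synchronized void failIfCancelled() {
        if (cancelReason != null) {
            throw new IllegalStateException("operation was cancelled: " + cancelReason);
        }
    }

    synchronized void register() {
        failIfCancelled();
        threads.add(Thread.currentThread());
    }

    synchronized void unregister() {
        threads.remove(Thread.currentThread());
        failIfCancelled();
    }

    synchronized void cancel(String reason) {
        if (cancelReason != null) {
            throw new IllegalStateException("cancel called twice; first reason: " + cancelReason);
        }
        cancelReason = reason;
        for (Thread thread : threads) {
            thread.interrupt(); // wake up blocked operations so they observe the cancellation
        }
    }
}
```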

@s1monw s1monw self-assigned this Dec 15, 2014
@s1monw s1monw and 1 other commented on an outdated diff Dec 15, 2014
...ticsearch/index/shard/service/InternalIndexShard.java
@@ -754,6 +733,16 @@ public void performRecoveryPrepareForTranslog() throws ElasticsearchException {
engine.start();
}
+ /** called if recovery has to be restarted after network error / delay ** */
+ public void performRecoveryRestart() {
@s1monw
s1monw Dec 15, 2014 Contributor

I still wonder if it is needed to stop the engine - can't we just replay the translog more than once?

@bleskes
bleskes Dec 15, 2014 Member

Just replaying the translog is possible, but that would mean we'd have to make sure the primary doesn't flush in the meantime (the 10s before we retry). Right now, a disconnect will release all resources on the primary and restart from scratch. I still feel this is the right way to go, but happy to hear alternatives.

@s1monw s1monw commented on an outdated diff Dec 15, 2014
...sticsearch/indices/recovery/RecoveriesCollection.java
@@ -181,5 +189,40 @@ public RecoveryStatus status() {
return status;
}
}
+
+ private class RecoveryMonitor implements Runnable {
+ private final long recoveryId;
+ private final TimeValue checkInterval;
+
+ private long lastSeenAccessTime;
+
+ private RecoveryMonitor(long recoveryId, long lastSeenAccessTime, TimeValue checkInterval) {
+ this.recoveryId = recoveryId;
+ this.checkInterval = checkInterval;
+ this.lastSeenAccessTime = lastSeenAccessTime;
+ }
+
+ @Override
+ public void run() {
@s1monw
s1monw Dec 15, 2014 Contributor

how do we deal with failures here? we just let it barf and escape?
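
One defensive option for the question above - an assumption for illustration, not necessarily what the PR ended up doing: run the staleness check inside a try/catch so an unexpected exception cannot silently kill the watchdog, and only reschedule while monitoring should continue.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hedged sketch: guard the monitor's own check and reschedule it explicitly.
class GuardedMonitorSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier checkProgress; // returns false once the recovery is done or failed
    private final long checkIntervalSeconds;

    GuardedMonitorSketch(ScheduledExecutorService scheduler, BooleanSupplier checkProgress, long checkIntervalSeconds) {
        this.scheduler = scheduler;
        this.checkProgress = checkProgress;
        this.checkIntervalSeconds = checkIntervalSeconds;
    }

    @Override
    public void run() {
        boolean keepMonitoring = true;
        try {
            keepMonitoring = checkProgress.getAsBoolean();
        } catch (Exception e) {
            System.err.println("recovery monitor check failed: " + e); // log and keep going
        }
        if (keepMonitoring) {
            scheduler.schedule(this, checkIntervalSeconds, TimeUnit.SECONDS);
        }
    }
}
```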

@s1monw s1monw commented on an outdated diff Dec 15, 2014
.../elasticsearch/indices/recovery/RecoverySettings.java
@@ -46,7 +46,20 @@
public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams";
public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams";
public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec";
- public static final String INDICES_RECOVERY_RETRY_DELAY = "indices.recovery.retry_delay";
+ // how long to wait before retrying after issues cause by cluster state syncing between nodes
+ // i.e., local node is not yet known on remote node, remote shard not yet started etc.
+ public static final String INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC = "indices.recovery.retry_delay_state_sync";
+
+ // how long to wait before retrying after network related issues
+ public static final String INDICES_RECOVERY_RETRY_DELAY_NETWORK = "indices.recovery.retry_delay_network";
+
@s1monw
s1monw Dec 15, 2014 Contributor

it would be awesome to have some doc-strings on these settings

@s1monw s1monw and 1 other commented on an outdated diff Dec 15, 2014
...rg/elasticsearch/indices/recovery/RecoveryTarget.java
@Override
- public RecoveryResponse newInstance() {
- return new RecoveryResponse();
+ public void run() throws InterruptedException {
+ recoveryResponse[0] = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
+ @Override
+ public RecoveryResponse newInstance() {
+ return new RecoveryResponse();
@s1monw
s1monw Dec 15, 2014 Contributor

doing this via reflection would be nice too :)

@bleskes
bleskes Dec 15, 2014 Member

can you clarify?

@s1monw s1monw commented on an outdated diff Dec 15, 2014
...sticsearch/indices/recovery/ShardRecoveryHandler.java
@@ -95,23 +90,20 @@ protected void fail(String reason) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
} else {
- throw new ElasticsearchException("recovery was canceled reason [" + reason + "]");
+ throw new ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
@s1monw
s1monw Dec 15, 2014 Contributor

g00d!

@s1monw s1monw commented on an outdated diff Dec 15, 2014
...sticsearch/indices/recovery/ShardRecoveryHandler.java
@@ -320,7 +312,7 @@ public void run() throws InterruptedException {
fileIndex++;
}
- cancelableThreads.run(new Interruptable() {
+ cancelableThreads.run(new CancelableThreads.Interruptable() {
@s1monw
s1monw Dec 15, 2014 Contributor

maybe add an import instead of these inner class references?

@s1monw s1monw commented on an outdated diff Dec 15, 2014
...lasticsearch/indices/recovery/IndexRecoveryTests.java
@@ -412,4 +428,117 @@ private void validateIndexRecoveryState(RecoveryState.Index indexState) {
assertThat(indexState.percentBytesRecovered(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.percentBytesRecovered(), lessThanOrEqualTo(100.0f));
}
+
+ @Test
+ @TestLogging("indices.recovery:TRACE")
@s1monw
s1monw Dec 15, 2014 Contributor

is this needed?

@s1monw
Contributor
s1monw commented Dec 15, 2014

I left some more comments

@bleskes
Member
bleskes commented Dec 15, 2014

@s1monw I pushed another update. Responded to some comments as well.

@bleskes bleskes Recovery: be more resilient to partial network partitions
This commit adds a test that simulates disconnecting nodes and dropping requests during the various stages of recovery and solves all the issues that were raised by it. In short:

1) Ongoing recoveries will be scheduled for retry upon network disconnect. The default retry period is 5s (cross-node connections are checked every 10s by default).
2) Sometimes the disconnect happens after the target engine has started (but the shard is still in recovery). For simplicity, I opted to restart the recovery from scratch (where little to no files will be copied again, because they were just synced). To do so I had to add a stop method to the internal engine, which doesn't free the underlying store (putting the engine back to its pre-start status).
3) To protect against dropped requests, a Recovery Monitor was added that fails a recovery if no progress has been made in the last 30m (by default), which is equivalent to the long timeouts we use in recovery requests.
4) When a shard fails on a node, we try to assign it to another node. If no such node is available, the shard will remain unassigned, causing the target node to clean any in-memory state for it (files on disk remain). At the moment the shard will remain unassigned until another cluster state change happens, which will reassign it to the node in question, but if no such change happens the shard will remain stuck as unassigned. This commit adds an extra delayed reroute in such cases to make sure the shard will be reassigned.
ba64f52
@bleskes
Member
bleskes commented Jan 9, 2015

@s1monw I rebased and squashed against the latest master. Would be great if you can give it another round.

@s1monw
Contributor
s1monw commented Jan 9, 2015

cool stuff LGTM

@bleskes bleskes added a commit that closed this pull request Jan 10, 2015
@bleskes bleskes Recovery: be more resilient to partial network partitions
This commit adds a test that simulates disconnecting nodes and dropping requests during the various stages of recovery and solves all the issues that were raised by it. In short:

1) Ongoing recoveries will be scheduled for retry upon network disconnect. The default retry period is 5s (cross-node connections are checked every 10s by default).
2) Sometimes the disconnect happens after the target engine has started (but the shard is still in recovery). For simplicity, I opted to restart the recovery from scratch (where little to no files will be copied again, because they were just synced).
3) To protect against dropped requests, a Recovery Monitor was added that fails a recovery if no progress has been made in the last 30m (by default), which is equivalent to the long timeouts we use in recovery requests.
4) When a shard fails on a node, we try to assign it to another node. If no such node is available, the shard will remain unassigned, causing the target node to clean any in-memory state for it (files on disk remain). At the moment the shard will remain unassigned until another cluster state change happens, which will reassign it to the node in question, but if no such change happens the shard will remain stuck as unassigned. This commit adds an extra delayed reroute in such cases to make sure the shard will be reassigned.
5) Moved all recovery-related settings to RecoverySettings.

Closes #8720
102e7ad
@bleskes bleskes closed this in 102e7ad Jan 10, 2015
@bleskes bleskes added a commit to bleskes/elasticsearch that referenced this pull request Jan 12, 2015
@bleskes bleskes Recovery: be more resilient to partial network partitions
This commit adds a test that simulates disconnecting nodes and dropping requests during the various stages of recovery and solves all the issues that were raised by it. In short:

1) Ongoing recoveries will be scheduled for retry upon network disconnect. The default retry period is 5s (cross-node connections are checked every 10s by default).
2) Sometimes the disconnect happens after the target engine has started (but the shard is still in recovery). For simplicity, I opted to restart the recovery from scratch (where little to no files will be copied again, because they were just synced).
3) To protect against dropped requests, a Recovery Monitor was added that fails a recovery if no progress has been made in the last 30m (by default), which is equivalent to the long timeouts we use in recovery requests.
4) When a shard fails on a node, we try to assign it to another node. If no such node is available, the shard will remain unassigned, causing the target node to clean any in-memory state for it (files on disk remain). At the moment the shard will remain unassigned until another cluster state change happens, which will reassign it to the node in question, but if no such change happens the shard will remain stuck as unassigned. This commit adds an extra delayed reroute in such cases to make sure the shard will be reassigned.
5) Moved all recovery-related settings to RecoverySettings.

Closes #8720
8c24cef
@bleskes bleskes added a commit to bleskes/elasticsearch that referenced this pull request Jan 13, 2015
@bleskes bleskes Add #8720 to the resiliency page 636d503
@bleskes bleskes deleted the bleskes:recovery_network_disconnect branch Jan 14, 2015
@bleskes bleskes added a commit that referenced this pull request Jan 16, 2015
@bleskes bleskes Add #8720 to the resiliency page
Closes #9277
fb2e4da
@bleskes bleskes added a commit to bleskes/elasticsearch that referenced this pull request Jan 30, 2015
@bleskes bleskes Recovery: update access time of ongoing recoveries
#8720 introduced a timeout mechanism for ongoing recoveries, based on a last access time variable. In the many iterations on that PR the update of the access time was lost. This adds it back, including a test that should have been there in the first place.
2d32bdb
@bleskes bleskes added a commit that referenced this pull request Jan 30, 2015
@bleskes bleskes Recovery: update access time of ongoing recoveries
#8720 introduced a timeout mechanism for ongoing recoveries, based on a last access time variable. In the many iterations on that PR the update of the access time was lost. This adds it back, including a test that should have been there in the first place.

Closes #9506
06d5db4
@bleskes bleskes added a commit that referenced this pull request Jan 30, 2015
@bleskes bleskes Recovery: update access time of ongoing recoveries
#8720 introduced a timeout mechanism for ongoing recoveries, based on a last access time variable. In the many iterations on that PR the update of the access time was lost. This adds it back, including a test that should have been there in the first place.

Closes #9506
eabc3cd
@bleskes bleskes added a commit that referenced this pull request Feb 5, 2015
@bleskes bleskes Test: add awaitFix to SearchWithRandomExceptionsTests
disabling this until further discussion. Recent failures probably relate to #9211 & #8720 (+ friends)
fa76268
@bleskes bleskes added a commit that referenced this pull request Feb 5, 2015
@bleskes bleskes Test: add awaitFix to SearchWithRandomExceptionsTests
disabling this until further discussion. Recent failures probably relate to #9211 & #8720 (+ friends)
97ac2f5
@clintongormley clintongormley added enhancement and removed review labels Mar 19, 2015
@clintongormley clintongormley changed the title from Recovery: be more resilient to partial network partitions to Be more resilient to partial network partitions Jun 7, 2015