Skip to content

Commit

Permalink
[RECOVERY] Increment Store refcount on RecoveryTarget
Browse files Browse the repository at this point in the history
We should make sure we have incremented the store refcount
before we start the recovery on the recovyer target.

Closes #6844
  • Loading branch information
s1monw committed Jul 14, 2014
1 parent 5b641d1 commit de1e7fe
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 91 deletions.
24 changes: 22 additions & 2 deletions src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,36 @@ public boolean suggestUseCompoundFile() {
*
* Note: Close can safely be called multiple times.
* @see #decRef
* @see #tryIncRef()
* @throws AlreadyClosedException iff the reference counter can not be incremented.
*/
public final void incRef() {
if (tryIncRef() == false) {
throw new AlreadyClosedException("Store is already closed can't increment refCount current count [" + refCount.get() + "]");
}
}

/**
* Tries to increment the refCount of this Store instance. This method will return <tt>true</tt> iff the refCount was
* incremented successfully otherwise <tt>false</tt>. RefCounts are used to determine when a
* Store can be closed safely, i.e. as soon as there are no more references. Be sure to always call a
* corresponding {@link #decRef}, in a finally clause; otherwise the store may never be closed. Note that
* {@link #close} simply calls decRef(), which means that the Store will not really be closed until {@link
* #decRef} has been called for all outstanding references.
*
* Note: Close can safely be called multiple times.
* @see #decRef()
* @see #incRef()
*/
public final boolean tryIncRef() {
do {
int i = refCount.get();
if (i > 0) {
if (refCount.compareAndSet(i, i + 1)) {
return;
return true;
}
} else {
throw new AlreadyClosedException("Store is already closed can't increment refCount current count [" + i + "]");
return false;
}
} while (true);
}
Expand Down
182 changes: 93 additions & 89 deletions src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void startRecovery(final StartRecoveryRequest request, final InternalInde
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
doRecovery(request, recoveryStatus, listener);
doRecovery(request, recoveryStatus, listener);
}
});
}
Expand All @@ -188,7 +188,6 @@ public void run() {

private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) {
assert request.sourceNode() != null : "can't do a recovery without a source node";

final InternalIndexShard shard = recoveryStatus.indexShard;
if (shard == null) {
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
Expand All @@ -205,106 +204,111 @@ private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus
}

recoveryStatus.recoveryThread = Thread.currentThread();
if (shard.store().tryIncRef()) {
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());

try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());

StopWatch stopWatch = new StopWatch().start();
RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
StopWatch stopWatch = new StopWatch().start();
RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet();
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
stopWatch.stop();
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("{} recovery completed from [{}], took [{}]", request.shardId(), request.sourceNode(), stopWatch.totalTime());
}
}).txGet();
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
stopWatch.stop();
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("{} recovery completed from [{}], took [{}]", request.shardId(), request.sourceNode(), stopWatch.totalTime());
}
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onRecoveryDone();
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
}
if (recoveryStatus.isCanceled()) {
// don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery");
return;
}
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
listener.onRecoveryDone();
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
}
if (recoveryStatus.isCanceled()) {
// don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery");
return;
}
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}

// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)

if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// if the target is not ready yet, retry
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
return;
}
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// if the target is not ready yet, retry
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
return;
}

if (cause instanceof DelayRecoveryException) {
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
return;
}
if (cause instanceof DelayRecoveryException) {
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
return;
}

// here, we check against ignore recovery options
// here, we check against ignore recovery options

// in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later
// it will get deleted in the IndicesStore if all are allocated and no shard exists on this node...
// in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later
// it will get deleted in the IndicesStore if all are allocated and no shard exists on this node...

removeAndCleanOnGoingRecovery(recoveryStatus);
removeAndCleanOnGoingRecovery(recoveryStatus);

if (cause instanceof ConnectTransportException) {
listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")");
return;
}
if (cause instanceof ConnectTransportException) {
listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")");
return;
}

if (cause instanceof IndexShardClosedException) {
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
return;
}
if (cause instanceof IndexShardClosedException) {
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
return;
}

if (cause instanceof AlreadyClosedException) {
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
return;
}
if (cause instanceof AlreadyClosedException) {
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
return;
}

logger.warn("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
logger.warn("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
} finally {
shard.store().decRef();
}
} else {
listener.onIgnoreRecovery(false, "local store closed, stop recovery");
}
}

Expand Down

0 comments on commit de1e7fe

Please sign in to comment.