Commit 9f88029

Engine: back port elastic#9648 - Fix deadlock problems when API flush and finish recovery happen concurrently

elastic#9648 fixes a potential deadlock between two concurrent flushes - one issued at the end of recovery and one through the API or the background flush. This backports that logic to 1.4. The backport is slightly more contrived because we still use the write lock in the flush code.

If we have concerns about this approach, we can instead move the recovery flush to a generic thread.
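
The heart of the fix is a lock-ordering rule: every flush path now acquires the engine's read or write lock first and the flush lock second, so no two threads can ever take the pair in opposite orders. Below is a minimal, self-contained Java sketch of that rule. The AutoLock wrapper is modeled on the patch's InternalLock, but the class and both flush methods are illustrative stand-ins, not the engine's actual code; the unsafe ordering is included only for contrast.

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical AutoCloseable lock wrapper in the spirit of the patch's
    // InternalLock: acquire() blocks until the lock is held and returns the
    // wrapper so try-with-resources unlocks it on scope exit.
    final class AutoLock implements AutoCloseable {
        private final Lock lock;

        AutoLock(Lock lock) {
            this.lock = lock;
        }

        AutoLock acquire() {
            lock.lock();
            return this;
        }

        @Override
        public void close() {
            lock.unlock();
        }
    }

    final class LockOrderingDemo {
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private final AutoLock readLock = new AutoLock(rwl.readLock());
        private final AutoLock writeLock = new AutoLock(rwl.writeLock());
        private final AutoLock flushLock = new AutoLock(new ReentrantLock());

        // Deadlock-prone ordering: we hold flushLock and then wait for the read
        // lock, while a recovery thread holds the write lock and waits for
        // flushLock - the classic ABBA deadlock this commit fixes.
        void flushUnsafe() {
            try (AutoLock ignored = flushLock.acquire();
                 AutoLock ignored2 = readLock.acquire()) {
                // ... commit the index writer ...
            }
        }

        // Fixed ordering, as in the patch: read/write lock first, flush lock
        // second, so every thread acquires the two locks in the same order.
        void flushSafe(boolean needsWriteLock) {
            AutoLock lockNeeded = needsWriteLock ? writeLock : readLock;
            try (AutoLock ignored = lockNeeded.acquire();
                 AutoLock ignored2 = flushLock.acquire()) {
                // ... commit the index writer ...
            }
        }
    }

Because both underlying locks are reentrant, a caller that already holds the write lock (as the recovery-finalization flush may here) can still enter flushSafe without deadlocking itself, which is why the backport can keep taking the write lock in the flush code.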

Closes elastic#9942
bleskes committed Mar 2, 2015
1 parent fb327f8 commit 9f88029
Showing 1 changed file with 86 additions and 91 deletions.
@@ -150,7 +150,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine
     // will not really happen, and then the commitUserData and the new translog will not be reflected
     private volatile boolean flushNeeded = false;
     private final AtomicInteger flushing = new AtomicInteger();
-    private final Lock flushLock = new ReentrantLock();
+    private final InternalLock flushLock = new InternalLock(new ReentrantLock());
 
     private final RecoveryCounter onGoingRecoveries = new RecoveryCounter();
 
@@ -853,118 +853,108 @@ public void flush(Flush flush) throws EngineException {
             throw new FlushNotAllowedEngineException(shardId, "already flushing...");
         }
 
-        flushLock.lock();
-        try {
+        final InternalLock lockNeeded;
+        switch (flush.type()) {
+            case NEW_WRITER:
+                lockNeeded = writeLock;
+                break;
+            case COMMIT:
+            case COMMIT_TRANSLOG:
+                lockNeeded = readLock;
+                break;
+            default:
+                throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
+        }
+
+        /*
+         we have to acquire the flush lock second to prevent dead locks and keep the locking order identical.
+         callers may already have acquired the read-write lock so we have to be consistent and always lock it first.
+        */
+        try (InternalLock _ = lockNeeded.acquire(); InternalLock flock = flushLock.acquire()) {
+            if (onGoingRecoveries.get() > 0) {
+                throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
+            }
+            ensureOpen();
             if (flush.type() == Flush.Type.NEW_WRITER) {
-                try (InternalLock _ = writeLock.acquire()) {
-                    if (onGoingRecoveries.get() > 0) {
-                        throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
-                    }
-                    // disable refreshing, not dirty
-                    dirty = false;
-                    try {
-                        { // commit and close the current writer - we write the current tanslog ID just in case
-                            final long translogId = translog.currentId();
-                            indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
-                            indexWriter.commit();
-                            indexWriter.rollback();
-                        }
-                        indexWriter = createWriter();
-                        mergeScheduler.removeListener(this.throttle);
-
-                        this.throttle = new IndexThrottle(mergeScheduler, this.logger, indexingService);
-                        mergeScheduler.addListener(throttle);
-                        // commit on a just opened writer will commit even if there are no changes done to it
-                        // we rely on that for the commit data translog id key
-                        if (flushNeeded || flush.force()) {
-                            flushNeeded = false;
-                            long translogId = translogIdGenerator.incrementAndGet();
-                            indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
-                            indexWriter.commit();
-                            translog.newTranslog(translogId);
-                        }
-
-                        SearcherManager current = this.searcherManager;
-                        this.searcherManager = buildSearchManager(indexWriter);
-                        versionMap.setManager(searcherManager);
-
-                        try {
-                            IOUtils.close(current);
-                        } catch (Throwable t) {
-                            logger.warn("Failed to close current SearcherManager", t);
-                        }
-
-                        maybePruneDeletedTombstones();
-                    } catch (Throwable t) {
-                        throw new FlushFailedEngineException(shardId, t);
-                    }
-                }
+                // disable refreshing, not dirty
+                dirty = false;
+                try {
+                    { // commit and close the current writer - we write the current tanslog ID just in case
+                        final long translogId = translog.currentId();
+                        indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
+                        indexWriter.commit();
+                        indexWriter.rollback();
+                    }
+                    indexWriter = createWriter();
+                    mergeScheduler.removeListener(this.throttle);
+
+                    this.throttle = new IndexThrottle(mergeScheduler, this.logger, indexingService);
+                    mergeScheduler.addListener(throttle);
+                    // commit on a just opened writer will commit even if there are no changes done to it
+                    // we rely on that for the commit data translog id key
+                    if (flushNeeded || flush.force()) {
+                        flushNeeded = false;
+                        long translogId = translogIdGenerator.incrementAndGet();
+                        indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
+                        indexWriter.commit();
+                        translog.newTranslog(translogId);
+                    }
+
+                    SearcherManager current = this.searcherManager;
+                    this.searcherManager = buildSearchManager(indexWriter);
+                    versionMap.setManager(searcherManager);
+
+                    try {
+                        IOUtils.close(current);
+                    } catch (Throwable t) {
+                        logger.warn("Failed to close current SearcherManager", t);
+                    }
+                } catch (Throwable t) {
+                    throw new FlushFailedEngineException(shardId, t);
+                }
             } else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
-                try (InternalLock _ = readLock.acquire()) {
-                    final IndexWriter indexWriter = currentIndexWriter();
-                    if (onGoingRecoveries.get() > 0) {
-                        throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
-                    }
-
-                    if (flushNeeded || flush.force()) {
-                        flushNeeded = false;
-                        try {
-                            long translogId = translogIdGenerator.incrementAndGet();
-                            translog.newTransientTranslog(translogId);
-                            indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
-                            indexWriter.commit();
-                            // we need to refresh in order to clear older version values
-                            refresh(new Refresh("version_table_flush").force(true));
-                            // we need to move transient to current only after we refresh
-                            // so items added to current will still be around for realtime get
-                            // when tans overrides it
-                            translog.makeTransientCurrent();
-
-                        } catch (Throwable e) {
-                            translog.revertTransient();
-                            throw new FlushFailedEngineException(shardId, e);
-                        }
-                    }
-                }
-
-                // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
-                // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
-                if (enableGcDeletes) {
-                    pruneDeletedTombstones();
-                }
-
+                final IndexWriter indexWriter = currentIndexWriter();
+                if (flushNeeded || flush.force()) {
+                    flushNeeded = false;
+                    try {
+                        long translogId = translogIdGenerator.incrementAndGet();
+                        translog.newTransientTranslog(translogId);
+                        indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
+                        indexWriter.commit();
+                        // we need to refresh in order to clear older version values
+                        refresh(new Refresh("version_table_flush").force(true));
+                        // we need to move transient to current only after we refresh
+                        // so items added to current will still be around for realtime get
+                        // when tans overrides it
+                        translog.makeTransientCurrent();
+
+                    } catch (Throwable e) {
+                        translog.revertTransient();
+                        throw new FlushFailedEngineException(shardId, e);
+                    }
+                }
             } else if (flush.type() == Flush.Type.COMMIT) {
                 // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
                 // translog on an index that was opened on a committed point in time that is "in the future"
                 // of that translog
-                try (InternalLock _ = readLock.acquire()) {
-                    final IndexWriter indexWriter = currentIndexWriter();
-                    // we allow to *just* commit if there is an ongoing recovery happening...
-                    // its ok to use this, only a flush will cause a new translogId, and we are locked here from
-                    // other flushes use flushLock
-                    try {
-                        long translogId = translog.currentId();
-                        indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
-                        indexWriter.commit();
-                    } catch (Throwable e) {
-                        throw new FlushFailedEngineException(shardId, e);
-                    }
-                }
-
+                final IndexWriter indexWriter = currentIndexWriter();
+                // we allow to *just* commit if there is an ongoing recovery happening...
+                // its ok to use this, only a flush will cause a new translogId, and we are locked here from
+                // other flushes use flushLock
+                try {
+                    long translogId = translog.currentId();
+                    indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
+                    indexWriter.commit();
+                } catch (Throwable e) {
+                    throw new FlushFailedEngineException(shardId, e);
+                }
+
             } else {
                 throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
             }
 
             // reread the last committed segment infos
             try (InternalLock _ = readLock.acquire()) {
                 ensureOpen();
                 try {
                     readLastCommittedSegmentsInfo();
                 } catch (Throwable e) {
                     if (closedOrFailed == false) {
@@ -978,9 +968,14 @@ public void flush(Flush flush) throws EngineException {
             maybeFailEngine(ex, "flush");
             throw ex;
         } finally {
-            flushLock.unlock();
             flushing.decrementAndGet();
         }
+
+        // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
+        // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
+        if (enableGcDeletes) {
+            pruneDeletedTombstones();
+        }
     }
 
     private void ensureOpen() {
