Skip to content

Commit

Permalink
Decouple recoveries from engine flush
Browse files Browse the repository at this point in the history
In order to safely complete recoveries / relocations we have to keep all operation done since the recovery start at available for replay. At the moment we do so by preventing the engine from flushing and thus making sure that the operations are kept in the translog. A side effect of this is that the translog keeps on growing until the recovery is done. This is not a problem as we do need these operations but if the another recovery starts concurrently it may have an unneededly long translog to replay. Also, if we shutdown the engine for some reason at this point (like when a node is restarted)  we have to recover a long translog when we come back.

To void this, the translog is changed to be based on multiple files instead of a single one. This allows recoveries to keep hold to the files they need while allowing the engine to flush and do a lucene commit (which will create a new translog files bellow the hood).

Change highlights:
- Refactor Translog file management to allow for multiple files.
- Translog maintains a list of referenced files, both by outstanding recoveries and files containing operations not yet committed to Lucene.
- A new Translog.View concept is introduced, allowing recoveries to get a reference to all currently uncommitted translog files plus all future translog files created until the view is closed. They can use this view to iterate over operations.
- Recovery phase3 is removed. That phase was replaying operations while preventing new writes to the engine. This is unneeded as standard indexing also send all operations from the start of the recovery  to the recovering shard. Replay all ops in the view acquired in recovery start is enough to guarantee no operation is lost.
- Opening and closing the translog is now the responsibility of the IndexShard. ShadowIndexShards do not open the translog.
- Moved the ownership of translog fsyncing to the translog it self, changing the responsible setting to `index.translog.sync_interval` (was `index.gateway.local.sync`)

Closes elastic#10624
  • Loading branch information
bleskes committed Apr 29, 2015
1 parent 351a4d3 commit 20d8cb7
Show file tree
Hide file tree
Showing 51 changed files with 2,191 additions and 1,736 deletions.
2 changes: 1 addition & 1 deletion docs/reference/index-modules/translog.asciidoc
Expand Up @@ -24,7 +24,7 @@ The period with no flush happening to force a flush. Defaults to `30m`.
How often to check if a flush is needed, randomized
between the interval value and 2x the interval value. Defaults to `5s`.

`index.gateway.local.sync`::
`index.translog.sync_interval`::

How often the translog is ++fsync++ed to disk. Defaults to `5s`.

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/elasticsearch/common/io/Channels.java
Expand Up @@ -87,6 +87,22 @@ public static int readFromFileChannel(FileChannel channel, long channelPosition,
return readFromFileChannel(channel, channelPosition, buffer);
}


/**
* read from a file channel into a byte buffer, starting at a certain position. An EOFException will be thrown if you
* attempt to read beyond the end of file.
*
* @param channel channel to read from
* @param channelPosition position to read from
* @param dest destination {@link java.nio.ByteBuffer} to put data in
*/
public static void readFromFileChannelWithEofException(FileChannel channel, long channelPosition, ByteBuffer dest) throws IOException {
int read = readFromFileChannel(channel, channelPosition, dest);
if (read < 0) {
throw new EOFException("read past EOF. pos [" + channelPosition + "] length: [" + dest.limit() + "] end: [" + channel.size() + "]");
}
}

/**
* read from a file channel into a byte buffer, starting at a certain position.
*
Expand Down
Expand Up @@ -73,5 +73,11 @@ public int refCount() {
return this.refCount.get();
}


/** gets the name of this instance */
public String getName() {
return name;
}

protected abstract void closeInternal();
}
Expand Up @@ -30,17 +30,48 @@
public class ReleasableLock implements Releasable {
private final Lock lock;

/* a per thread boolean indicating the lock is held by it. only works when assertions are enabled */
private final ThreadLocal<Boolean> holdingThreads;

public ReleasableLock(Lock lock) {
this.lock = lock;
boolean useHoldingThreads = false;
assert (useHoldingThreads = true);
if (useHoldingThreads) {
holdingThreads = new ThreadLocal<>();
} else {
holdingThreads = null;
}
}

@Override
public void close() {
lock.unlock();
assert removeCurrentThread();
}


public ReleasableLock acquire() throws EngineException {
lock.lock();
assert addCurrentThread();
return this;
}

private boolean addCurrentThread() {
holdingThreads.set(true);
return true;
}

private boolean removeCurrentThread() {
holdingThreads.remove();
return true;
}

public Boolean isHeldByCurrentThread() {
if (holdingThreads == null) {
throw new UnsupportedOperationException("asserts must be enabled");
}
Boolean b = holdingThreads.get();
return b != null && b.booleanValue();
}
}
38 changes: 24 additions & 14 deletions src/main/java/org/elasticsearch/index/IndexService.java
Expand Up @@ -65,8 +65,6 @@
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.suggest.SuggestShardModule;
import org.elasticsearch.index.termvectors.ShardTermVectorsModule;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -187,6 +185,7 @@ public IndexShard shard(int shardId) {
}
return null;
}

/**
* Return the shard with the provided id, or throw an exception if it doesn't exist.
*/
Expand Down Expand Up @@ -320,7 +319,6 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) {
modules.add(new ShardQueryCacheModule());
modules.add(new ShardBitsetFilterCacheModule());
modules.add(new ShardFieldDataModule());
modules.add(new TranslogModule(indexSettings));
modules.add(new IndexShardGatewayModule());
modules.add(new PercolatorShardModule());
modules.add(new ShardTermVectorsModule());
Expand Down Expand Up @@ -386,7 +384,8 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
}
}
// now we can close the translog service, we need to close it before the we close the shard
closeInjectorResource(sId, shardInjector, TranslogService.class);
// note the that the translog service is not there for shadow replicas
closeInjectorOptionalResource(sId, shardInjector, TranslogService.class);
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
Expand All @@ -402,7 +401,6 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
MergeSchedulerProvider.class,
MergePolicyProvider.class,
IndexShardGatewayService.class,
Translog.class,
PercolatorQueriesRegistry.class);

// call this before we close the store, so we can release resources for it
Expand All @@ -423,18 +421,30 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
*/
private void closeInjectorResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable>... toClose) {
for (Class<? extends Closeable> closeable : toClose) {
try {
final Closeable instance = shardInjector.getInstance(closeable);
if (instance == null) {
throw new NullPointerException("No instance available for " + closeable.getName());
}
IOUtils.close(instance);
} catch (Throwable t) {
logger.debug("{} failed to close {}", t, shardId, Strings.toUnderscoreCase(closeable.getSimpleName()));
if (closeInjectorOptionalResource(shardId, shardInjector, closeable) == false) {
logger.warn("[{}] no instance available for [{}], ignoring... ", shardId, closeable.getSimpleName());
}
}
}

/**
* Closes an optional resource. Returns true if the resource was found;
* NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log
*/
private boolean closeInjectorOptionalResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable> toClose) {
try {
final Closeable instance = shardInjector.getInstance(toClose);
if (instance == null) {
return false;
}
IOUtils.close(instance);
} catch (Throwable t) {
logger.debug("{} failed to close {}", t, shardId, Strings.toUnderscoreCase(toClose.getSimpleName()));
}
return true;
}


private void onShardClose(ShardLock lock, boolean ownsShard) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
try {
Expand Down Expand Up @@ -464,7 +474,7 @@ public StoreCloseListener(ShardId shardId, boolean ownsShard) {

@Override
public void handle(ShardLock lock) {
assert lock.getShardId().equals(shardId) : "shard Id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock, ownsShard);
}
}
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -82,7 +82,6 @@ public abstract class Engine implements Closeable {
protected Engine(EngineConfig engineConfig) {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");

this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
Expand Down Expand Up @@ -449,12 +448,12 @@ public void forceMerge(boolean flush) {
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;

/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract SnapshotIndexCommit snapshotIndex() throws EngineException;

public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;
public abstract SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException;

/** fail engine due to some error. the engine will also be closed. */
public void failEngine(String reason, Throwable failure) {
Expand Down Expand Up @@ -1048,7 +1047,7 @@ public void release() {
public void flushAndClose() throws IOException {
if (isClosed.get() == false) {
logger.trace("flushAndClose now acquire writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
try (ReleasableLock lock = writeLock.acquire()) {
logger.trace("flushAndClose now acquired writeLock");
try {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
Expand All @@ -1070,7 +1069,7 @@ public void flushAndClose() throws IOException {
public void close() throws IOException {
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
logger.debug("close now acquiring writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
try (ReleasableLock lock = writeLock.acquire()) {
logger.debug("close acquired writeLock");
closeNoLock("api");
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/elasticsearch/index/engine/EngineConfig.java
Expand Up @@ -67,13 +67,15 @@ public final class EngineConfig {
private final IndicesWarmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
@Nullable
private final Translog translog;
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
private final Engine.FailedEngineListener failedEngineListener;
private final boolean ignoreUnknownTranslog;

/**
* Index setting for index concurrency / number of threadstates in the indexwriter.
Expand Down Expand Up @@ -117,6 +119,11 @@ public final class EngineConfig {
*/
public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size";


/** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_IGNORE_UNKNOWN_TRANSLOG = "index.engine.ignore_unknown_translog";


public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final ByteSizeValue DEFAUTL_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
Expand Down Expand Up @@ -155,6 +162,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
updateVersionMapSize();
this.translogRecoveryPerformer = translogRecoveryPerformer;
this.ignoreUnknownTranslog = indexSettings.getAsBoolean(INDEX_IGNORE_UNKNOWN_TRANSLOG, false);
}

/** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */
Expand Down Expand Up @@ -182,6 +190,10 @@ public String getVersionMapSizeSetting() {
return versionMapSizeSetting;
}

/** if true the engine will start even if the translog id in the commit point can not be found */
public boolean getIgnoreUnknownTranslog() {
return ignoreUnknownTranslog;
}

/**
* returns the size of the version map that should trigger a refresh
Expand Down

0 comments on commit 20d8cb7

Please sign in to comment.