Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/index-modules/translog.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ The period with no flush happening to force a flush. Defaults to `30m`.
How often to check if a flush is needed, randomized
between the interval value and 2x the interval value. Defaults to `5s`.

`index.gateway.local.sync`::
`index.translog.sync_interval`::

How often the translog is ++fsync++ed to disk. Defaults to `5s`.

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/elasticsearch/common/io/Channels.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ public static int readFromFileChannel(FileChannel channel, long channelPosition,
return readFromFileChannel(channel, channelPosition, buffer);
}


/**
* read from a file channel into a byte buffer, starting at a certain position. An EOFException will be thrown if you
* attempt to read beyond the end of file.
*
* @param channel channel to read from
* @param channelPosition position to read from
* @param dest destination {@link java.nio.ByteBuffer} to put data in
*/
public static void readFromFileChannelWithEofException(FileChannel channel, long channelPosition, ByteBuffer dest) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a shorter name here maybe forceReadFromFileChannel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already have readFromFileChannelWithEofException . agreed it's long but rather do it in another change..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we should make such a big deal out of it

int read = readFromFileChannel(channel, channelPosition, dest);
if (read < 0) {
throw new EOFException("read past EOF. pos [" + channelPosition + "] length: [" + dest.limit() + "] end: [" + channel.size() + "]");
}
}

/**
* read from a file channel into a byte buffer, starting at a certain position.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,11 @@ public int refCount() {
return this.refCount.get();
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs?

/** gets the name of this instance */
public String getName() {
return name;
}

protected abstract void closeInternal();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,48 @@
public class ReleasableLock implements Releasable {
private final Lock lock;

/* a per thread boolean indicating the lock is held by it. only works when assertions are enabled */
private final ThreadLocal<Boolean> holdingThreads;

public ReleasableLock(Lock lock) {
this.lock = lock;
boolean useHoldingThreads = false;
assert (useHoldingThreads = true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can leave a comment here about what the holdingThreads ThreadLocal is used for, that way it doesn't get accidentally changed during a cleanup at a later time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added java docs on the holdingThreads field

if (useHoldingThreads) {
holdingThreads = new ThreadLocal<>();
} else {
holdingThreads = null;
}
}

/**
* Unlocks the wrapped lock. When assertions are enabled, the calling thread's entry in
* {@code holdingThreads} is cleared as well; that bookkeeping sits inside an {@code assert}
* and is therefore skipped entirely when assertions are disabled.
* NOTE(review): the marker is cleared on every close, so if the wrapped lock is reentrant and
* this thread acquired it more than once, the marker is gone after the first close while the
* lock is still held - confirm this is acceptable.
*/
@Override
public void close() {
lock.unlock();
// bookkeeping only; always evaluates to true
assert removeCurrentThread();
}


/**
* Locks the wrapped lock and returns this instance, which allows it to be used as the
* resource of a try-with-resources statement and released through {@link #close()}.
* When assertions are enabled the calling thread is also marked as holding the lock.
*
* @return this instance
*/
public ReleasableLock acquire() throws EngineException {
lock.lock();
// bookkeeping only; always evaluates to true
assert addCurrentThread();
return this;
}

/**
* Marks the calling thread as holding the lock. Must only be invoked from within an
* {@code assert}: {@code holdingThreads} is null when assertions are disabled.
*
* @return always true, so the call can be used as an assert condition
*/
private boolean addCurrentThread() {
holdingThreads.set(true);
return true;
}

/**
* Clears the calling thread's "holding" marker. Must only be invoked from within an
* {@code assert}: {@code holdingThreads} is null when assertions are disabled.
*
* @return always true, so the call can be used as an assert condition
*/
private boolean removeCurrentThread() {
holdingThreads.remove();
return true;
}

public Boolean isHeldByCurrentThread() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have to make this a public API? and if so can we return boolean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the whole point here is to be able to ask a lock if it's held by the current thread so it has to be public. I made it a Boolean because it may not know (if assertions are not enabled). can change to assertIsHeldByCurrentThread in the hope it makes it clearer, but then what do we return when the info is not available , true?

if (holdingThreads == null) {
throw new UnsupportedOperationException("asserts must be enabled");
}
Boolean b = holdingThreads.get();
return b != null && b.booleanValue();
}
}
38 changes: 24 additions & 14 deletions src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.suggest.SuggestShardModule;
import org.elasticsearch.index.termvectors.ShardTermVectorsModule;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -187,6 +185,7 @@ public IndexShard shard(int shardId) {
}
return null;
}

/**
* Return the shard with the provided id, or throw an exception if it doesn't exist.
*/
Expand Down Expand Up @@ -320,7 +319,6 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) {
modules.add(new ShardQueryCacheModule());
modules.add(new ShardBitsetFilterCacheModule());
modules.add(new ShardFieldDataModule());
modules.add(new TranslogModule(indexSettings));
modules.add(new IndexShardGatewayModule());
modules.add(new PercolatorShardModule());
modules.add(new ShardTermVectorsModule());
Expand Down Expand Up @@ -386,7 +384,8 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
}
}
// now we can close the translog service, we need to close it before we close the shard
closeInjectorResource(sId, shardInjector, TranslogService.class);
// note that the translog service is not there for shadow replicas
closeInjectorOptionalResource(sId, shardInjector, TranslogService.class);
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
Expand All @@ -402,7 +401,6 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
MergeSchedulerProvider.class,
MergePolicyProvider.class,
IndexShardGatewayService.class,
Translog.class,
PercolatorQueriesRegistry.class);

// call this before we close the store, so we can release resources for it
Expand All @@ -423,18 +421,30 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
*/
private void closeInjectorResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable>... toClose) {
for (Class<? extends Closeable> closeable : toClose) {
try {
final Closeable instance = shardInjector.getInstance(closeable);
if (instance == null) {
throw new NullPointerException("No instance available for " + closeable.getName());
}
IOUtils.close(instance);
} catch (Throwable t) {
logger.debug("{} failed to close {}", t, shardId, Strings.toUnderscoreCase(closeable.getSimpleName()));
if (closeInjectorOptionalResource(shardId, shardInjector, closeable) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm really? NPE was good no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, it was caught anyway and logged under debug? I made it a warn now..

logger.warn("[{}] no instance available for [{}], ignoring... ", shardId, closeable.getSimpleName());
}
}
}

/**
 * Looks up an optional resource in the shard injector and closes it if one is returned.
 * NOTE: anything thrown while looking up or closing the resource is swallowed and logged at debug level.
 *
 * @return false only when the injector returned null for the requested type; true in every other case,
 *         including when an exception was swallowed
 */
private boolean closeInjectorOptionalResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable> toClose) {
    boolean found = true;
    try {
        final Closeable resource = shardInjector.getInstance(toClose);
        if (resource != null) {
            IOUtils.close(resource);
        } else {
            found = false;
        }
    } catch (Throwable failure) {
        // best effort: a failed lookup or close must not abort the rest of the shard shutdown
        logger.debug("{} failed to close {}", failure, shardId, Strings.toUnderscoreCase(toClose.getSimpleName()));
    }
    return found;
}


private void onShardClose(ShardLock lock, boolean ownsShard) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
try {
Expand Down Expand Up @@ -464,7 +474,7 @@ public StoreCloseListener(ShardId shardId, boolean ownsShard) {

@Override
public void handle(ShardLock lock) {
assert lock.getShardId().equals(shardId) : "shard Id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock, ownsShard);
}
}
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public abstract class Engine implements Closeable {
protected Engine(EngineConfig engineConfig) {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");

this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
Expand Down Expand Up @@ -278,6 +277,9 @@ public final Searcher acquireSearcher(String source) throws EngineException {
}
}

/** returns the translog for this engine */
public abstract Translog translog();

protected void ensureOpen() {
if (isClosed.get()) {
throw new EngineClosedException(shardId, failedEngine);
Expand Down Expand Up @@ -449,12 +451,12 @@ public void forceMerge(boolean flush) {
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;

/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract SnapshotIndexCommit snapshotIndex() throws EngineException;

public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;
public abstract SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we separate the flush and make this a SnapshotIndexCommit snapshotIndex()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I see it's to opt out...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I can unbundle this and change those who use true to make a separate flush call, but we'll need a way to not commit/open a new translog. I think we can probably always commit the translog now (one parameter less) but didn't want to go on that adventure now. Rather do it in another iteration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


/** fail engine due to some error. the engine will also be closed. */
public void failEngine(String reason, Throwable failure) {
Expand Down Expand Up @@ -1048,7 +1050,7 @@ public void release() {
public void flushAndClose() throws IOException {
if (isClosed.get() == false) {
logger.trace("flushAndClose now acquire writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
try (ReleasableLock lock = writeLock.acquire()) {
logger.trace("flushAndClose now acquired writeLock");
try {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
Expand All @@ -1070,7 +1072,7 @@ public void flushAndClose() throws IOException {
public void close() throws IOException {
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
logger.debug("close now acquiring writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
try (ReleasableLock lock = writeLock.acquire()) {
logger.debug("close acquired writeLock");
closeNoLock("api");
}
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/org/elasticsearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -67,13 +66,13 @@ public final class EngineConfig {
private final IndicesWarmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final Translog translog;
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
private final Engine.FailedEngineListener failedEngineListener;
private final boolean ignoreUnknownTranslog;

/**
* Index setting for index concurrency / number of threadstates in the indexwriter.
Expand Down Expand Up @@ -117,6 +116,11 @@ public final class EngineConfig {
*/
public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size";


/** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_IGNORE_UNKNOWN_TRANSLOG = "index.engine.ignore_unknown_translog";


public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final ByteSizeValue DEFAUTL_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
Expand All @@ -130,15 +134,14 @@ public final class EngineConfig {
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService, IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, TranslogRecoveryPerformer translogRecoveryPerformer) {
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService, IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, TranslogRecoveryPerformer translogRecoveryPerformer) {
this.shardId = shardId;
this.threadPool = threadPool;
this.indexingService = indexingService;
this.indexSettingsService = indexSettingsService;
this.warmer = warmer;
this.store = store;
this.deletionPolicy = deletionPolicy;
this.translog = translog;
this.mergePolicyProvider = mergePolicyProvider;
this.mergeScheduler = mergeScheduler;
this.analyzer = analyzer;
Expand All @@ -155,6 +158,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
updateVersionMapSize();
this.translogRecoveryPerformer = translogRecoveryPerformer;
this.ignoreUnknownTranslog = indexSettings.getAsBoolean(INDEX_IGNORE_UNKNOWN_TRANSLOG, false);
}

/** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */
Expand Down Expand Up @@ -182,6 +186,10 @@ public String getVersionMapSizeSetting() {
return versionMapSizeSetting;
}

/** if true the engine will start even if the translog id in the commit point can not be found */
public boolean getIgnoreUnknownTranslog() {
return ignoreUnknownTranslog;
}

/**
* returns the size of the version map that should trigger a refresh
Expand Down Expand Up @@ -318,13 +326,6 @@ public SnapshotDeletionPolicy getDeletionPolicy() {
return deletionPolicy;
}

/**
* Returns a {@link Translog instance}
*/
public Translog getTranslog() {
return translog;
}

/**
* Returns the {@link org.elasticsearch.index.merge.policy.MergePolicyProvider} used to obtain
* a {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.elasticsearch.index.engine;

import org.elasticsearch.index.translog.fs.FsTranslog;

/**
* Simple Engine Factory
*/
public interface EngineFactory {

public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery);
public Engine newReadWriteEngine(EngineConfig config, FsTranslog translog, boolean skipTranslogRecovery);

public Engine newReadOnlyEngine(EngineConfig config);
}
Loading