Skip to content

Commit

Permalink
[ENGINE] Fix updates dynamic settings in InternalEngineHolder
Browse files Browse the repository at this point in the history
After the refactoring in #8784 some settings didn't get passed to the
actual engine and there exists a race if the settings are updated while
the engine is started such that the actual starting engine doesn't see
the latest settings. This commit fixes the concurrency issue as well as
adds tests to ensure the settings are reflected.
  • Loading branch information
s1monw committed Dec 10, 2014
1 parent 788d7cb commit f308049
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 44 deletions.
7 changes: 7 additions & 0 deletions src/main/java/org/elasticsearch/index/codec/CodecService.java
Expand Up @@ -99,4 +99,11 @@ public Codec codec(String name) throws ElasticsearchIllegalArgumentException {
}
return codec;
}

/**
* Returns all registered available codec names
*/
public String[] availableCodecs() {
return codecs.keySet().toArray(new String[0]);
}
}
Expand Up @@ -1752,4 +1752,29 @@ private static SegmentReader segmentReader(LeafReader reader) {
// hard fail - we can't get a SegmentReader
throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
}

long getGcDeletesInMillis() {
return gcDeletesInMillis;
}

String getCodecName() {
return codecName;
}

boolean isCompoundOnFlush() {
return compoundOnFlush;
}

int getIndexConcurrency() {
return indexConcurrency;
}

boolean isFailEngineOnCorruption() {
return failEngineOnCorruption;
}

LiveIndexWriterConfig getCurrentIndexWriterConfig() {
IndexWriter writer = currentIndexWriter();
return writer == null ? null : writer.getConfig();
}
}
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -64,7 +65,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
private final ApplySettings settingsListener;
private final MergeScheduleListener mergeSchedulerListener;
private volatile Boolean failOnMergeFailure;
protected volatile Boolean failOnMergeFailure;
protected volatile boolean failEngineOnCorruption;
protected volatile ByteSizeValue indexingBufferSize;
protected volatile int indexConcurrency;
Expand Down Expand Up @@ -142,7 +143,7 @@ public InternalEngineHolder(ShardId shardId, @IndexSettings Settings indexSettin
this.mergeSchedulerListener = new MergeScheduleListener();
this.mergeScheduler.addListener(mergeSchedulerListener);

this.settingsListener = new ApplySettings();
this.settingsListener = new ApplySettings(logger, this);
this.indexSettingsService.addListener(this.settingsListener);
store.incRef();
}
Expand Down Expand Up @@ -214,20 +215,19 @@ public synchronized void stop() throws EngineException {

@Override
public synchronized void close() throws ElasticsearchException {
if (closed) {
return;
}
closed = true;
try {
InternalEngine currentEngine = this.currentEngine.getAndSet(null);
if (currentEngine != null) {
currentEngine.close();
if (closed == false) {
closed = true;
try {
InternalEngine currentEngine = this.currentEngine.getAndSet(null);
if (currentEngine != null) {
currentEngine.close();
}
mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
mergeScheduler.removeListener(mergeSchedulerListener);
indexSettingsService.removeListener(settingsListener);
} finally {
store.decRef();
}
mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
mergeScheduler.removeListener(mergeSchedulerListener);
indexSettingsService.removeListener(settingsListener);
} finally {
store.decRef();
}
}

Expand Down Expand Up @@ -356,55 +356,76 @@ public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable f
}
}

class ApplySettings implements IndexSettingsService.Listener {
static class ApplySettings implements IndexSettingsService.Listener {

private final ESLogger logger;
private final InternalEngineHolder holder;

ApplySettings(ESLogger logger, InternalEngineHolder holder) {
this.logger = logger;
this.holder = holder;
}

@Override
public void onRefreshSettings(Settings settings) {
InternalEngine currentEngine = InternalEngineHolder.this.currentEngine.get();
boolean change = false;
long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis)).millis();
if (gcDeletesInMillis != InternalEngineHolder.this.gcDeletesInMillis) {
logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
InternalEngineHolder.this.gcDeletesInMillis = gcDeletesInMillis;
long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(holder.gcDeletesInMillis)).millis();
if (gcDeletesInMillis != holder.gcDeletesInMillis) {
logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(holder.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
holder.gcDeletesInMillis = gcDeletesInMillis;
change = true;
}

final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush);
if (compoundOnFlush != InternalEngineHolder.this.compoundOnFlush) {
logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush, compoundOnFlush);
InternalEngineHolder.this.compoundOnFlush = compoundOnFlush;
final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush);
if (compoundOnFlush != holder.compoundOnFlush) {
logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush, compoundOnFlush);
holder.compoundOnFlush = compoundOnFlush;
change = true;
}

final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption);
if (failEngineOnCorruption != InternalEngineHolder.this.failEngineOnCorruption) {
logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption, failEngineOnCorruption);
InternalEngineHolder.this.failEngineOnCorruption = failEngineOnCorruption;
final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption);
if (failEngineOnCorruption != holder.failEngineOnCorruption) {
logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption, failEngineOnCorruption);
holder.failEngineOnCorruption = failEngineOnCorruption;
change = true;
}
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngineHolder.this.indexConcurrency);
if (indexConcurrency != InternalEngineHolder.this.indexConcurrency) {
logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngineHolder.this.indexConcurrency, indexConcurrency);
InternalEngineHolder.this.indexConcurrency = indexConcurrency;
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, holder.indexConcurrency);
if (indexConcurrency != holder.indexConcurrency) {
logger.info("updating index.index_concurrency from [{}] to [{}]", holder.indexConcurrency, indexConcurrency);
holder.indexConcurrency = indexConcurrency;
// we have to flush in this case, since it only applies on a new index writer
change = true;
}
if (!codecName.equals(InternalEngineHolder.this.codecName)) {
logger.info("updating index.codec from [{}] to [{}]", InternalEngineHolder.this.codecName, codecName);
InternalEngineHolder.this.codecName = codecName;
final String codecName = settings.get(INDEX_CODEC, holder.codecName);
if (!codecName.equals(holder.codecName)) {
logger.info("updating index.codec from [{}] to [{}]", holder.codecName, codecName);
holder.codecName = codecName;
// we want to flush in this case, so the new codec will be reflected right away...
change = true;
}
if (failOnMergeFailure != InternalEngineHolder.this.failOnMergeFailure) {
logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, InternalEngineHolder.this.failOnMergeFailure, failOnMergeFailure);
InternalEngineHolder.this.failOnMergeFailure = failOnMergeFailure;
final boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure);
if (failOnMergeFailure != holder.failOnMergeFailure) {
logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure, failOnMergeFailure);
holder.failOnMergeFailure = failOnMergeFailure;
}
if (change && currentEngine != null) {
currentEngine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName);


if (change) {
holder.updateSettings();
}
}
}

synchronized void updateSettings() {
// we need to make sure that we wait for the engine to be fully initialized
// the start method sets the current engine once it's done but samples the settings
// at construction time.
final InternalEngine engine = currentEngine.get();
if (engine != null) {
engine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName);
}
}

class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@Override
public void onFailedMerge(MergePolicy.MergeException e) {
Expand Down
Expand Up @@ -134,4 +134,11 @@ public void addDynamicSetting(String setting, Validator validator) {
protected void configure() {
bind(DynamicSettings.class).annotatedWith(IndexDynamicSettings.class).toInstance(indexDynamicSettings);
}

/**
* Returns <code>true</code> iff the given setting is in the dynamic settings map. Otherwise <code>false</code>.
*/
public boolean containsSetting(String setting) {
return indexDynamicSettings.hasDynamicSetting(setting);
}
}
Expand Up @@ -26,18 +26,21 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -63,6 +66,8 @@
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexDynamicSettings;
import org.elasticsearch.index.settings.IndexDynamicSettingsModule;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
Expand All @@ -89,13 +94,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy;
import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*;

Expand Down Expand Up @@ -1502,4 +1506,61 @@ public void testFailStart() throws IOException {
}
}
}

@Test
public void testSettings() {
final InternalEngineHolder holder = (InternalEngineHolder) engine;
IndexDynamicSettingsModule settings = new IndexDynamicSettingsModule();
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_GC_DELETES));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_CODEC));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_INDEX_CONCURRENCY));

final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean();
boolean failOnCorruption = randomBoolean();
boolean failOnMerge = randomBoolean();
long gcDeletes = Math.max(0, randomLong());
int indexConcurrency = randomIntBetween(1, 20);
String codecName = randomFrom(holder.codecService.availableCodecs());

Settings build = ImmutableSettings.builder()
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, failOnCorruption)
.put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(InternalEngineHolder.INDEX_GC_DELETES, gcDeletes)
.put(InternalEngineHolder.INDEX_CODEC, codecName)
.put(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE, failOnMerge)
.put(InternalEngineHolder.INDEX_INDEX_CONCURRENCY, indexConcurrency)
.build();

engineSettingsService.refreshSettings(build);
LiveIndexWriterConfig currentIndexWriterConfig = holder.engineSafe().getCurrentIndexWriterConfig();
assertEquals(holder.compoundOnFlush, compoundOnFlush);
assertEquals(holder.engineSafe().isCompoundOnFlush(), compoundOnFlush);
assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush);


assertEquals(holder.gcDeletesInMillis, gcDeletes);
assertEquals(holder.engineSafe().getGcDeletesInMillis(), gcDeletes);

assertEquals(holder.codecName, codecName);
assertEquals(holder.engineSafe().getCodecName(), codecName);
assertEquals(currentIndexWriterConfig.getCodec(), holder.codecService.codec(codecName));


assertEquals(holder.failEngineOnCorruption, failOnCorruption);
assertEquals(holder.engineSafe().isFailEngineOnCorruption(), failOnCorruption);

assertEquals(holder.failOnMergeFailure, failOnMerge); // only on the holder

assertEquals(holder.indexConcurrency, indexConcurrency);
assertEquals(holder.engineSafe().getIndexConcurrency(), indexConcurrency);
assertEquals(currentIndexWriterConfig.getMaxThreadStates(), indexConcurrency);


}
}
}

0 comments on commit f308049

Please sign in to comment.