Skip to content
Permalink
Browse files

Merge pull request #16257 from s1monw/no_fsync_on_every_operation

Remove the ability to fsync on every operation and only schedule fsync task if really needed
  • Loading branch information...
s1monw committed Jan 27, 2016
2 parents dd3bd6d + 98bc9de commit 1df7d4d5b559cf4627620feaca1c077b24b736e2
@@ -108,7 +108,7 @@
private final IndexingSlowLog slowLog;
private final IndexingOperationListener[] listeners;
private volatile AsyncRefreshTask refreshTask;
private final AsyncTranslogFSync fsyncTask;
private volatile AsyncTranslogFSync fsyncTask;
private final SearchSlowLog searchSlowLog;

public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
@@ -147,13 +147,9 @@ public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
this.listeners[0] = slowLog;
System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
// kick off async ops for the first shard in this index
if (this.indexSettings.getTranslogSyncInterval().millis() != 0) {
this.fsyncTask = new AsyncTranslogFSync(this);
} else {
this.fsyncTask = null;
}
this.refreshTask = new AsyncRefreshTask(this);
searchSlowLog = new SearchSlowLog(indexSettings);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

public int numberOfShards() {
@@ -565,6 +561,7 @@ public IndexMetaData getMetaData() {
}

public synchronized void updateMetaData(final IndexMetaData metadata) {
final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability();
if (indexSettings.updateIndexMetaData(metadata)) {
for (final IndexShard shard : this.shards.values()) {
try {
@@ -576,6 +573,20 @@ public synchronized void updateMetaData(final IndexMetaData metadata) {
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
rescheduleRefreshTasks();
}
final Translog.Durability durability = indexSettings.getTranslogDurability();
if (durability != oldTranslogDurability) {
rescheduleFsyncTask(durability);
}
}
}

/**
 * Replaces the current async translog fsync task according to the given durability.
 * <p>
 * Any existing task is closed first. Afterwards no task is installed when the durability is
 * {@link Translog.Durability#REQUEST}; for every other durability a fresh
 * {@link AsyncTranslogFSync} is created. The replacement is assigned in a <code>finally</code>
 * block so that it happens even if closing the previous task throws.
 *
 * @param durability the translog durability that decides whether an async fsync task is needed
 */
private void rescheduleFsyncTask(Translog.Durability durability) {
    final AsyncTranslogFSync previousTask = fsyncTask;
    try {
        if (previousTask != null) {
            previousTask.close();
        }
    } finally {
        if (durability == Translog.Durability.REQUEST) {
            // request durability: no background fsync task is installed
            fsyncTask = null;
        } else {
            fsyncTask = new AsyncTranslogFSync(this);
        }
    }
}

@@ -55,7 +55,7 @@
public static final Setting<Boolean> QUERY_STRING_ANALYZE_WILDCARD = Setting.boolSetting("indices.query.query_string.analyze_wildcard", false, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> QUERY_STRING_ALLOW_LEADING_WILDCARD = Setting.boolSetting("indices.query.query_string.allowLeadingWildcard", true, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> ALLOW_UNMAPPED = Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, false, Setting.Scope.INDEX);
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), false, Setting.Scope.INDEX);
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), false, Setting.Scope.INDEX);
public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING = new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), true, Setting.Scope.INDEX);
public static final Setting<Boolean> INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting("index.warmer.enabled", true, true, Setting.Scope.INDEX);
public static final Setting<Boolean> INDEX_TTL_DISABLE_PURGE_SETTING = Setting.boolSetting("index.ttl.disable_purge", false, true, Setting.Scope.INDEX);
@@ -429,9 +429,6 @@ public Location add(Operation operation) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes);
if (config.isSyncOnEachOperation()) {
current.sync();
}
assert assertBytesAtLocation(location, bytes);
return location;
}
@@ -65,13 +65,6 @@ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSet
this.bigArrays = bigArrays;
}

/**
 * Returns <code>true</code> iff each low level operation should be fsynced, which is the case
 * exactly when the translog sync interval of the index settings is zero milliseconds.
 */
public boolean isSyncOnEachOperation() {
    final long syncIntervalMillis = indexSettings.getTranslogSyncInterval().millis();
    return syncIntervalMillis == 0;
}

/**
* Returns the index indexSettings
*/
@@ -39,6 +39,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -264,7 +265,7 @@ public void testRefreshTaskIsUpdated() throws IOException {
}

public void testFsyncTaskIsRunning() throws IOException {
IndexService indexService = createIndex("test", Settings.EMPTY);
IndexService indexService = createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build());
IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask();
assertNotNull(fsyncTask);
assertEquals(5000, fsyncTask.getInterval().millis());
@@ -274,6 +275,9 @@ public void testFsyncTaskIsRunning() throws IOException {
indexService.close("simon says", false);
assertFalse(fsyncTask.isScheduled());
assertTrue(fsyncTask.isClosed());

indexService = createIndex("test1", Settings.EMPTY);
assertNull(indexService.getFsyncTask());
}

public void testRefreshActuallyWorks() throws Exception {
@@ -307,7 +311,7 @@ public void testRefreshActuallyWorks() throws Exception {

public void testAsyncFsyncActuallyWorks() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10ms") // very often :)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "100ms") // very often :)
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)
.build();
IndexService indexService = createIndex("test", settings);
@@ -320,11 +324,43 @@ public void testAsyncFsyncActuallyWorks() throws Exception {
});
}

public void testNoFsyncTaskIfDisabled() {
/**
 * Checks that the async fsync task follows changes of the translog durability setting:
 * it is absent while the durability is REQUEST, appears after an update to ASYNC,
 * disappears again after switching back to REQUEST and reappears on a second switch to ASYNC.
 */
public void testRescheduleAsyncFsync() throws Exception {
// NOTE(review): the sync interval key is set twice in a row below; presumably the later
// "100ms" value is the one that takes effect - confirm against Settings.Builder semantics.
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "100ms") // very often :)
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.build();
IndexService indexService = createIndex("test", settings);
ensureGreen("test");
// the index was created with REQUEST durability, so no fsync task is expected
assertNull(indexService.getFsyncTask());
// switch the durability to ASYNC through a metadata update; a fsync task must now exist
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)).build();
indexService.updateMetaData(metaData);
assertNotNull(indexService.getFsyncTask());
assertTrue(indexService.getRefreshTask().mustReschedule());
// index one document, then wait until the translog of shard 0 reports that no sync is needed
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get();
IndexShard shard = indexService.getShard(0);
assertBusy(() -> {
assertFalse(shard.getTranslog().syncNeeded());
});

// back to REQUEST durability: the fsync task must be gone again
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)).build();
indexService.updateMetaData(metaData);
assertNull(indexService.getFsyncTask());

// and once more to ASYNC: a fsync task must be present again
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)).build();
indexService.updateMetaData(metaData);
assertNotNull(indexService.getFsyncTask());

}

/**
 * Creating an index with a translog sync interval of <code>0ms</code> must be rejected with an
 * {@link IllegalArgumentException} whose message states that the value must be >= 100ms.
 */
public void testIllegalFsyncInterval() {
    final String syncIntervalKey = IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey();
    // 0ms is the value the expected error message below reports as rejected
    final Settings settings = Settings.builder().put(syncIntervalKey, "0ms").build();
    try {
        createIndex("test", settings);
        fail();
    } catch (IllegalArgumentException ex) {
        final String expectedMessage =
            "Failed to parse value [0ms] for setting [index.translog.sync_interval] must be >= 100ms";
        assertEquals(expectedMessage, ex.getMessage());
    }
}
}
@@ -1393,7 +1393,6 @@ public void testTragicEventCanBeAnyException() throws IOException {
Path tempDir = createTempDir();
final FailSwitch fail = new FailSwitch();
TranslogConfig config = getTranslogConfig(tempDir);
assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
Translog translog = getFailableTranslog(fail, config, false, true);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
@@ -37,8 +37,8 @@ The data in the transaction log is only persisted to disk when the translog is
++fsync++ed and committed. In the event of hardware failure, any data written
since the previous translog commit will be lost.

By default, Elasticsearch ++fsync++s and commits the translog every 5 seconds
and at the end of every <<docs-index_,index>>, <<docs-delete,delete>>,
Elasticsearch ++fsync++s and commits the translog every 5 seconds if `index.translog.durability` is set
to `async`, or, if it is set to `request` (the default), at the end of every <<docs-index_,index>>, <<docs-delete,delete>>,
<<docs-update,update>>, or <<docs-bulk,bulk>> request. In fact, Elasticsearch
will only report success of an index, delete, update, or bulk request to the
client after the transaction log has been successfully ++fsync++ed and committed
@@ -50,7 +50,7 @@ control the behaviour of the transaction log:
`index.translog.sync_interval`::

How often the translog is ++fsync++ed to disk and committed, regardless of
write operations. Defaults to `5s`.
write operations. Defaults to `5s`. Values less than `100ms` are not allowed.

`index.translog.durability`::
+
@@ -233,6 +233,10 @@ The `index.translog.flush_threshold_ops` setting is not supported anymore. In or
growth use `index.translog.flush_threshold_size` instead. Changing the translog type with `index.translog.fs.type` is not supported
anymore, the `buffered` implementation is now the only available option and uses a fixed `8kb` buffer.

The translog is fsynced on a per-request basis by default, so the ability to fsync on every operation is no longer necessary. In fact, it can
be a performance bottleneck, and it is trappy since it was enabled by a special value set on `index.translog.sync_interval`. `index.translog.sync_interval`
no longer accepts a value less than `100ms`, which prevents fsyncing too often if async durability is enabled. The special value `0` is not supported anymore.

==== Request Cache Settings

The deprecated settings `index.cache.query.enable` and `indices.cache.query.size` have been removed and are replaced with
@@ -527,11 +527,7 @@ public void randomIndexTemplate() throws IOException {
}

if (random.nextBoolean()) {
if (rarely(random)) {
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), 0); // 0 has special meaning to sync each op
} else {
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
}
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
}

return builder;

0 comments on commit 1df7d4d

Please sign in to comment.
You can’t perform that action at this time.