diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9e3af1f8f1f8c..eae25b7903ff1 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -304,7 +304,7 @@ protected Tuple shardOperationOnPrimary(Clu assert preVersionTypes[requestIndex] != null; } - processAfter(request, indexShard, location); + processAfter(request.refresh(), indexShard, location); BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; BulkItemRequest[] items = request.items(); for (int i = 0; i < items.length; i++) { @@ -500,21 +500,7 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request } } - processAfter(request, indexShard, location); - } - - private void processAfter(BulkShardRequest request, IndexShard indexShard, Translog.Location location) { - if (request.refresh()) { - try { - indexShard.refresh("refresh_flag_bulk"); - } catch (Throwable e) { - // ignore - } - } - - if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) { - indexShard.sync(location); - } + processAfter(request.refresh(), indexShard, location); } private void applyVersion(BulkItemRequest item, long version, VersionType versionType) { diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index c105fe189f423..0bbc37fa654ac 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -138,8 +138,7 @@ protected Tuple shardOperationOnPrimary(ClusterSt request.version(delete.version()); assert request.versionType().validateVersionForWrites(request.version()); - - 
processAfter(request, indexShard, delete.getTranslogLocation()); + processAfter(request.refresh(), indexShard, delete.getTranslogLocation()); DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found()); return new Tuple<>(response, shardRequest.request); @@ -151,7 +150,7 @@ protected void shardOperationOnReplica(ShardId shardId, DeleteRequest request) { Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); indexShard.delete(delete); - processAfter(request, indexShard, delete.getTranslogLocation()); + processAfter(request.refresh(), indexShard, delete.getTranslogLocation()); } @Override @@ -159,18 +158,4 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques return clusterService.operationRouting() .deleteShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing()); } - - private void processAfter(DeleteRequest request, IndexShard indexShard, Translog.Location location) { - if (request.refresh()) { - try { - indexShard.refresh("refresh_flag_delete"); - } catch (Throwable e) { - // ignore - } - } - - if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) { - indexShard.sync(location); - } - } } diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 60dfa9ee73dc9..348dae85a2d68 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -170,7 +170,7 @@ protected Tuple shardOperationOnPrimary(ClusterStat final WriteResult result = executeIndexRequestOnPrimary(null, request, indexShard); final IndexResponse response = 
result.response; final Translog.Location location = result.location; - processAfter(request, indexShard, location); + processAfter(request.refresh(), indexShard, location); return new Tuple<>(response, shardRequest.request); } @@ -193,20 +193,7 @@ protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) { throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); } operation.execute(indexShard); - processAfter(request, indexShard, operation.getTranslogLocation()); + processAfter(request.refresh(), indexShard, operation.getTranslogLocation()); } - private void processAfter(IndexRequest request, IndexShard indexShard, Translog.Location location) { - if (request.refresh()) { - try { - indexShard.refresh("refresh_flag_index"); - } catch (Throwable e) { - // ignore - } - } - - if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) { - indexShard.sync(location); - } - } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 6e2ea505fe601..dc6a89f81f08e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1082,4 +1082,18 @@ protected final WriteResult executeIndexRequestOnPrimary(BulkShar return new WriteResult(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation()); } + + protected final void processAfter(boolean refresh, IndexShard indexShard, Translog.Location location) { + if (refresh) { + try { + indexShard.refresh("refresh_flag_index"); + } catch (Throwable e) { + // ignore + } + } + if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && 
location != null) { + indexShard.sync(location); + } + indexShard.maybeFlush(); + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 097ad61fe0f6e..581bee103692f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -81,7 +81,6 @@ import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.cache.request.IndicesRequestCache; @@ -259,11 +258,9 @@ private void registerBuiltinIndexSettings() { registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2); registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE); registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY); - registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_INTERVAL, Validator.TIME); - registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER); - registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE); - registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME); - registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, Validator.EMPTY); + registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER); + registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE); + registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, 
Validator.EMPTY); registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY); registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY); registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN); diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index cdfe7b0153e78..cf43e6c2e8cca 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -59,7 +59,6 @@ import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreModule; -import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesLifecycle; @@ -435,9 +434,6 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable); } } - // now we can close the translog service, we need to close it before the we close the shard - // note the that the translog service is not there for shadow replicas - closeInjectorOptionalResource(sId, shardInjector, TranslogService.class); // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it if (indexShard != null) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java b/core/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java index bb19dd4908041..76ae65522c4d7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java +++ b/core/src/main/java/org/elasticsearch/index/shard/AbstractIndexShardComponent.java @@ -55,4 +55,6 
@@ public Settings indexSettings() { public String nodeName() { return indexSettings.get("name", ""); } + + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index abf98391cfbe2..6b61f6955e805 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -46,10 +46,12 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.index.IndexService; @@ -113,7 +115,10 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class IndexShard extends AbstractIndexShardComponent { @@ -175,12 +180,20 @@ public class IndexShard extends AbstractIndexShardComponent { private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener(); private volatile boolean flushOnClose = true; + private volatile int flushThresholdOperations; + private volatile ByteSizeValue flushThresholdSize; + private volatile boolean disableFlush; /** * Index setting to control if a flush is executed before engine is closed * This setting is realtime updateable. 
*/ public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close"; + public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops"; + public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size"; + public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush"; + + private final ShardPath path; private final IndexShardOperationCounter indexShardOperationCounter; @@ -250,8 +263,10 @@ public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, In cachingPolicy = new UsageTrackingQueryCachingPolicy(); } this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); + this.flushThresholdOperations = indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE)); + this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)); + this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false); this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId); - } public Store store() { @@ -1050,6 +1065,25 @@ public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryLi storeRecoveryService.recover(this, shouldExist, recoveryListener); } + /** + * Returns true iff this shard needs to be flushed due to too many translog operations or a too large transaction log. + * Otherwise false. 
+ */ + boolean shouldFlush() { + if (disableFlush == false) { + Engine engine = engineUnsafe(); + if (engine != null) { + try { + Translog translog = engine.getTranslog(); + return translog.totalOperations() > flushThresholdOperations || translog.sizeInBytes() > flushThresholdSize.bytes(); + } catch (AlreadyClosedException ex) { + // that's fine we are already closed - no need to flush + } + } + } + return false; + } + private class ApplyRefreshSettings implements IndexSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { @@ -1058,6 +1092,22 @@ public void onRefreshSettings(Settings settings) { if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed return; } + int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, IndexShard.this.flushThresholdOperations); + if (flushThresholdOperations != IndexShard.this.flushThresholdOperations) { + logger.info("updating flush_threshold_ops from [{}] to [{}]", IndexShard.this.flushThresholdOperations, flushThresholdOperations); + IndexShard.this.flushThresholdOperations = flushThresholdOperations; + } + ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, IndexShard.this.flushThresholdSize); + if (!flushThresholdSize.equals(IndexShard.this.flushThresholdSize)) { + logger.info("updating flush_threshold_size from [{}] to [{}]", IndexShard.this.flushThresholdSize, flushThresholdSize); + IndexShard.this.flushThresholdSize = flushThresholdSize; + } + boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, IndexShard.this.disableFlush); + if (disableFlush != IndexShard.this.disableFlush) { + logger.info("updating disable_flush from [{}] to [{}]", IndexShard.this.disableFlush, disableFlush); + IndexShard.this.disableFlush = disableFlush; + } + final EngineConfig config = engineConfig; final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, 
IndexShard.this.flushOnClose); if (flushOnClose != IndexShard.this.flushOnClose) { @@ -1437,4 +1487,38 @@ private static Translog.Durabilty getFromSettings(ESLogger logger, Settings sett } } + private final AtomicBoolean asyncFlushRunning = new AtomicBoolean(); + + /** + * Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the + * Flush thread-pool asynchronously. + * @return true if a new flush is scheduled otherwise false. + */ + public boolean maybeFlush() { + if (shouldFlush()) { + if (asyncFlushRunning.compareAndSet(false, true)) { + final AbstractRunnable abstractRunnable = new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + if (state != IndexShardState.CLOSED) { + logger.warn("failed to flush index", t); + } + } + @Override + protected void doRun() throws Exception { + flush(new FlushRequest()); + } + + @Override + public void onAfter() { + asyncFlushRunning.compareAndSet(true, false); + } + }; + threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable); + return true; + } + } + return false; + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index 0851b1c0e18c0..08cf9b27ab866 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -20,18 +20,15 @@ package org.elasticsearch.index.shard; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.Classes; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.cache.query.index.IndexQueryCache; import org.elasticsearch.index.engine.IndexSearcherWrapper; import org.elasticsearch.index.engine.IndexSearcherWrappingService; 
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.termvectors.ShardTermVectorsService; -import org.elasticsearch.index.translog.TranslogService; /** * The {@code IndexShardModule} module is responsible for binding the correct @@ -68,7 +65,6 @@ protected void configure() { bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton(); } else { bind(IndexShard.class).asEagerSingleton(); - bind(TranslogService.class).asEagerSingleton(); } bind(EngineFactory.class).to(engineFactoryImpl); diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 9e8776d1b1e97..502724e461c30 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -108,6 +108,12 @@ protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig con return engineFactory.newReadOnlyEngine(config); } + @Override + public boolean shouldFlush() { + // we don't need to flush since we don't write - all dominated by the primary + return false; + } + public boolean allowsPrimaryPromotion() { return false; } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogService.java deleted file mode 100644 index 8897227e991e2..0000000000000 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. 
Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.index.engine.FlushNotAllowedEngineException; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.*; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.Closeable; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadLocalRandom; - -import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; - -/** - * - */ -public class TranslogService extends AbstractIndexShardComponent implements Closeable { - - - public static final String INDEX_TRANSLOG_FLUSH_INTERVAL = "index.translog.interval"; - public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops"; - public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size"; - public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD = 
"index.translog.flush_threshold_period"; - public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush"; - - private final ThreadPool threadPool; - private final IndexSettingsService indexSettingsService; - private final IndexShard indexShard; - - private volatile TimeValue interval; - private volatile int flushThresholdOperations; - private volatile ByteSizeValue flushThresholdSize; - private volatile TimeValue flushThresholdPeriod; - private volatile boolean disableFlush; - private volatile ScheduledFuture future; - - private final ApplySettings applySettings = new ApplySettings(); - - @Inject - public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, ThreadPool threadPool, IndexShard indexShard) { - super(shardId, indexSettings); - this.threadPool = threadPool; - this.indexSettingsService = indexSettingsService; - this.indexShard = indexShard; - this.flushThresholdOperations = indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE)); - this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)); - this.flushThresholdPeriod = indexSettings.getAsTime(INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TimeValue.timeValueMinutes(30)); - this.interval = indexSettings.getAsTime(INDEX_TRANSLOG_FLUSH_INTERVAL, timeValueMillis(5000)); - this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false); - logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod); - - this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush()); - - indexSettingsService.addListener(applySettings); - } - - @Override - public void close() { - 
indexSettingsService.removeListener(applySettings); - FutureUtils.cancel(this.future); - } - - - - class ApplySettings implements IndexSettingsService.Listener { - @Override - public void onRefreshSettings(Settings settings) { - int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, TranslogService.this.flushThresholdOperations); - if (flushThresholdOperations != TranslogService.this.flushThresholdOperations) { - logger.info("updating flush_threshold_ops from [{}] to [{}]", TranslogService.this.flushThresholdOperations, flushThresholdOperations); - TranslogService.this.flushThresholdOperations = flushThresholdOperations; - } - ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, TranslogService.this.flushThresholdSize); - if (!flushThresholdSize.equals(TranslogService.this.flushThresholdSize)) { - logger.info("updating flush_threshold_size from [{}] to [{}]", TranslogService.this.flushThresholdSize, flushThresholdSize); - TranslogService.this.flushThresholdSize = flushThresholdSize; - } - TimeValue flushThresholdPeriod = settings.getAsTime(INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TranslogService.this.flushThresholdPeriod); - if (!flushThresholdPeriod.equals(TranslogService.this.flushThresholdPeriod)) { - logger.info("updating flush_threshold_period from [{}] to [{}]", TranslogService.this.flushThresholdPeriod, flushThresholdPeriod); - TranslogService.this.flushThresholdPeriod = flushThresholdPeriod; - } - TimeValue interval = settings.getAsTime(INDEX_TRANSLOG_FLUSH_INTERVAL, TranslogService.this.interval); - if (!interval.equals(TranslogService.this.interval)) { - logger.info("updating interval from [{}] to [{}]", TranslogService.this.interval, interval); - TranslogService.this.interval = interval; - } - boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, TranslogService.this.disableFlush); - if (disableFlush != TranslogService.this.disableFlush) { - logger.info("updating 
disable_flush from [{}] to [{}]", TranslogService.this.disableFlush, disableFlush); - TranslogService.this.disableFlush = disableFlush; - } - } - } - - private TimeValue computeNextInterval() { - return new TimeValue(interval.millis() + (ThreadLocalRandom.current().nextLong(interval.millis()))); - } - - private class TranslogBasedFlush implements Runnable { - - private volatile long lastFlushTime = System.currentTimeMillis(); - - @Override - public void run() { - if (indexShard.state() == IndexShardState.CLOSED) { - return; - } - - // flush is disabled, but still reschedule - if (disableFlush) { - reschedule(); - return; - } - Translog translog = indexShard.engine().getTranslog(); - if (translog == null) { - reschedule(); - return; - } - int currentNumberOfOperations = translog.totalOperations(); - if (currentNumberOfOperations == 0) { - reschedule(); - return; - } - - if (flushThresholdOperations > 0) { - if (currentNumberOfOperations > flushThresholdOperations) { - logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations); - asyncFlushAndReschedule(); - return; - } - } - - if (flushThresholdSize.bytes() > 0) { - long sizeInBytes = translog.sizeInBytes(); - if (sizeInBytes > flushThresholdSize.bytes()) { - logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize); - asyncFlushAndReschedule(); - return; - } - } - - if (flushThresholdPeriod.millis() > 0) { - if ((threadPool.estimatedTimeInMillis() - lastFlushTime) > flushThresholdPeriod.millis()) { - logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod); - asyncFlushAndReschedule(); - return; - } - } - - reschedule(); - } - - private void reschedule() { - future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, this); - } - - private void asyncFlushAndReschedule() { - threadPool.executor(ThreadPool.Names.FLUSH).execute(new 
Runnable() { - @Override - public void run() { - try { - indexShard.flush(new FlushRequest()); - } catch (IllegalIndexShardStateException e) { - // we are being closed, or in created state, ignore - } catch (FlushNotAllowedEngineException e) { - // ignore this exception, we are not allowed to perform flush - } catch (Throwable e) { - logger.warn("failed to flush shard on translog threshold", e); - } - lastFlushTime = threadPool.estimatedTimeInMillis(); - - if (indexShard.state() != IndexShardState.CLOSED) { - future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, TranslogBasedFlush.this); - } - } - }); - } - } -} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8fd7821d2f684..b586a18807d20 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -46,6 +46,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; @@ -53,6 +55,7 @@ import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.indexing.IndexingOperationListener; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.Mapping; @@ -77,6 +80,8 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -685,4 +690,77 @@ public void postIndex(Engine.Index index, Throwable ex) { assertTrue(postIndexWithExceptionCalled.get()); } + + public void testMaybeFlush() throws Exception { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + IndexShard shard = test.shard(0); + assertFalse(shard.shouldFlush()); + client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get(); + client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get(); + assertFalse(shard.shouldFlush()); + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); + Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); + shard.index(index); + assertTrue(shard.shouldFlush()); + assertEquals(2, shard.engine().getTranslog().totalOperations()); + client().prepareIndex("test", "test", "2").setSource("{}").setRefresh(randomBoolean()).get(); + assertBusy(() -> { // this is async + assertFalse(shard.shouldFlush()); + }); + assertEquals(0, shard.engine().getTranslog().totalOperations()); + long size = shard.engine().getTranslog().sizeInBytes(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1000) + .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES)).build()).get(); + client().prepareDelete("test", "test", "2").get(); + assertBusy(() -> { // this is async + assertFalse(shard.shouldFlush()); + }); + assertEquals(0, shard.engine().getTranslog().totalOperations()); + } + + public void testStressMaybeFlush() throws Exception { + createIndex("test"); + 
ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.shard(0); + assertFalse(shard.shouldFlush()); + client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get(); + client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get(); + assertFalse(shard.shouldFlush()); + final AtomicBoolean running = new AtomicBoolean(true); + final int numThreads = randomIntBetween(2, 4); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread() { + public void run() { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + while (running.get()) { + shard.maybeFlush(); + } + } + }; + threads[i].start(); + } + barrier.await(); + FlushStats flushStats = shard.flushStats(); + long total = flushStats.getTotal(); + client().prepareIndex("test", "test", "1").setSource("{}").get(); + assertBusy(() -> { + assertEquals(total + 1, shard.flushStats().getTotal()); + }); + running.set(false); + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + } + } diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index 354903fdb31d8..77b7c37a9cadf 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -62,7 +62,6 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.TranslogService; import 
org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; @@ -157,7 +156,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1") .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose - .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files + .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); ensureGreen(); @@ -263,7 +262,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose - .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files + .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); ensureGreen(); @@ -491,7 +490,7 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose - .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files + .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change 
the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); ensureGreen(); @@ -547,7 +546,7 @@ public void testReplicaCorruption() throws Exception { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose - .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files + .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); ensureGreen(); diff --git a/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java b/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java index 40b902589c891..14b65265e5dba 100644 --- a/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java @@ -101,11 +101,11 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType.Loading; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.TranslogWriter; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cache.request.IndicesRequestCache; @@ -496,19 +496,13 @@ private static Settings.Builder setRandomIndexNormsLoading(Random random, Settin private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) { if (random.nextBoolean()) { - 
builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, RandomInts.randomIntBetween(random, 1, 10000)); + builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, RandomInts.randomIntBetween(random, 1, 10000)); } if (random.nextBoolean()) { - builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB)); + builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB)); } if (random.nextBoolean()) { - builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TimeValue.timeValueMinutes(RandomInts.randomIntBetween(random, 1, 60))); - } - if (random.nextBoolean()) { - builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_INTERVAL, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 1, 10000))); - } - if (random.nextBoolean()) { - builder.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, random.nextBoolean()); + builder.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, random.nextBoolean()); } if (random.nextBoolean()) { builder.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values())); @@ -1444,13 +1438,13 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma /** Disables translog flushing for the specified index */ public static void disableTranslogFlush(String index) { - Settings settings = Settings.builder().put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true).build(); + Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true).build(); client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); } /** Enables translog flushing for the specified index */ public static void enableTranslogFlush(String index) { - Settings settings = Settings.builder().put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, false).build(); + Settings settings = 
Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, false).build(); client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); } diff --git a/docs/reference/index-modules/translog.asciidoc b/docs/reference/index-modules/translog.asciidoc index 31cc4f7bf9240..5cb2e4b65a388 100644 --- a/docs/reference/index-modules/translog.asciidoc +++ b/docs/reference/index-modules/translog.asciidoc @@ -35,16 +35,6 @@ Once the translog hits this size, a flush will happen. Defaults to `512mb`. After how many operations to flush. Defaults to `unlimited`. -`index.translog.flush_threshold_period`:: - -How long to wait before triggering a flush regardless of translog size. Defaults to `30m`. - -`index.translog.interval`:: - -How often to check if a flush is needed, randomized between the interval value -and 2x the interval value. Defaults to `5s`. - - [float] === Translog settings