Remove TranslogService and fold it into synchronous IndexShard API
This commit moves the size- and ops-based flush into a synchronous API on
IndexShard and removes the time-based flush altogether, since it's essentially
covered by the inactive async flush API we have today. The functionality doesn't
need scheduled tasks and async APIs when we can make all the decisions
synchronously, which is far easier to control and to test.

Closes #13707
s1monw committed Sep 23, 2015
1 parent 4fb6386 commit 75e8164
Showing 15 changed files with 204 additions and 297 deletions.
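
The heart of the change: the flush decision becomes a cheap synchronous predicate called inline on the write path, while the flush itself still runs asynchronously, guarded so that at most one is in flight per shard. The following is a minimal, self-contained sketch of that pattern; the names and types are simplified stand-ins, not the actual IndexShard API.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the sync-decision / async-execution flush pattern this commit adopts.
// FlushPolicy is a hypothetical stand-in, not the real IndexShard.
class FlushPolicy {
    private final AtomicBoolean flushRunning = new AtomicBoolean();
    private final ExecutorService flushPool = Executors.newSingleThreadExecutor();
    private volatile int thresholdOps = Integer.MAX_VALUE;     // flush_threshold_ops
    private volatile long thresholdBytes = 512L * 1024 * 1024; // flush_threshold_size (512mb)

    // Synchronous decision: trivial to call after every write, and to unit-test.
    boolean shouldFlush(int translogOps, long translogBytes) {
        return translogOps > thresholdOps || translogBytes > thresholdBytes;
    }

    // Asynchronous execution: the CAS guard ensures at most one flush in flight.
    boolean maybeFlush(int translogOps, long translogBytes, Runnable flush) {
        if (shouldFlush(translogOps, translogBytes) && flushRunning.compareAndSet(false, true)) {
            flushPool.execute(() -> {
                try {
                    flush.run();
                } finally {
                    flushRunning.set(false); // allow the next flush to be scheduled
                }
            });
            return true;
        }
        return false;
    }
}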
@@ -304,7 +304,7 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Clu
assert preVersionTypes[requestIndex] != null;
}

processAfter(request, indexShard, location);
processAfter(request.refresh(), indexShard, location);
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
@@ -500,21 +500,7 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request
}
}

processAfter(request, indexShard, location);
}

private void processAfter(BulkShardRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_bulk");
} catch (Throwable e) {
// ignore
}
}

if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
processAfter(request.refresh(), indexShard, location);
}

private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {
@@ -138,8 +138,7 @@ protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterSt
request.version(delete.version());

assert request.versionType().validateVersionForWrites(request.version());

processAfter(request, indexShard, delete.getTranslogLocation());
processAfter(request.refresh(), indexShard, delete.getTranslogLocation());

DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found());
return new Tuple<>(response, shardRequest.request);
@@ -151,26 +150,12 @@ protected void shardOperationOnReplica(ShardId shardId, DeleteRequest request) {
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);

indexShard.delete(delete);
processAfter(request, indexShard, delete.getTranslogLocation());
processAfter(request.refresh(), indexShard, delete.getTranslogLocation());
}

@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterService.operationRouting()
.deleteShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
}

private void processAfter(DeleteRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_delete");
} catch (Throwable e) {
// ignore
}
}

if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
}
}
@@ -170,7 +170,7 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard);
final IndexResponse response = result.response;
final Translog.Location location = result.location;
processAfter(request, indexShard, location);
processAfter(request.refresh(), indexShard, location);
return new Tuple<>(response, shardRequest.request);
}

@@ -193,20 +193,7 @@ protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
processAfter(request, indexShard, operation.getTranslogLocation());
processAfter(request.refresh(), indexShard, operation.getTranslogLocation());
}

private void processAfter(IndexRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");
} catch (Throwable e) {
// ignore
}
}

if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
}
}
@@ -1082,4 +1082,18 @@ protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShar

return new WriteResult(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
}

protected final void processAfter(boolean refresh, IndexShard indexShard, Translog.Location location) {
if (refresh) {
try {
indexShard.refresh("refresh_flag_index");
} catch (Throwable e) {
// ignore
}
}
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
indexShard.maybeFlush();
}
}
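
For contrast, the TranslogService this commit deletes made the same decision from a scheduled task, roughly like the sketch below. It is reconstructed from the settings the service registered rather than from its source, so treat every name and the polling interval as assumptions:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

// Hedged sketch of the removed polling approach: a timer thread, not the
// write path, decides when to flush. All names here are illustrative.
class PollingFlushSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void start(IntSupplier translogOps, int thresholdOps, Runnable asyncFlush) {
        scheduler.scheduleWithFixedDelay(() -> {
            if (translogOps.getAsInt() > thresholdOps) {
                asyncFlush.run(); // fire-and-forget; awkward to control and to test
            }
        }, 5, 5, TimeUnit.SECONDS); // assumed check interval, not the real default
    }
}

The shared processAfter() above removes that indirection: the check now piggybacks on the very write that made a flush necessary.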
@@ -81,7 +81,6 @@
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
@@ -259,11 +258,9 @@ private void registerBuiltinIndexSettings() {
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_INTERVAL, Validator.TIME);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, Validator.EMPTY);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, Validator.EMPTY);
registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);
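
The thresholds stay dynamically updatable; they simply resolve to IndexShard constants now, and the time-based flush_threshold_period setting is gone along with TranslogService. A hedged sketch of updating the relocated settings at runtime via the Java client; the client instance, the index name, and the values are illustrative assumptions:

// Assumes an existing org.elasticsearch.client.Client named 'client' and an
// index called "my_index"; Settings is org.elasticsearch.common.settings.Settings.
client.admin().indices().prepareUpdateSettings("my_index")
        .setSettings(Settings.builder()
                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 10000)
                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, "256mb")
                .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, false))
        .get();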
4 changes: 0 additions & 4 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -59,7 +59,6 @@
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle;
@@ -435,9 +434,6 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable);
}
}
// now we can close the translog service, we need to close it before the we close the shard
// note the that the translog service is not there for shadow replicas
closeInjectorOptionalResource(sId, shardInjector, TranslogService.class);
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
@@ -55,4 +55,6 @@ public Settings indexSettings() {
public String nodeName() {
return indexSettings.get("name", "");
}


}
86 changes: 85 additions & 1 deletion core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -46,10 +46,12 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.IndexService;
@@ -113,7 +115,10 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class IndexShard extends AbstractIndexShardComponent {

@@ -175,12 +180,20 @@ public class IndexShard extends AbstractIndexShardComponent {

private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
private volatile boolean disableFlush;

/**
* Index setting to control if a flush is executed before engine is closed
* This setting is realtime updateable.
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";


private final ShardPath path;

private final IndexShardOperationCounter indexShardOperationCounter;
@@ -250,8 +263,10 @@ public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, In
cachingPolicy = new UsageTrackingQueryCachingPolicy();
}
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.flushThresholdOperations = indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE));
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);

}

public Store store() {
@@ -1050,6 +1065,25 @@ public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryLi
storeRecoveryService.recover(this, shouldExist, recoveryListener);
}

/**
* Returns <code>true</code> iff this shard needs to be flushed due to too many translog operations
* or a transaction log that is too large. Otherwise <code>false</code>.
*/
boolean shouldFlush() {
if (disableFlush == false) {
Engine engine = engineUnsafe();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.totalOperations() > flushThresholdOperations || translog.sizeInBytes() > flushThresholdSize.bytes();
} catch (AlreadyClosedException ex) {
// that's fine, we are already closed - no need to flush
}
}
}
return false;
}

private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
@@ -1058,6 +1092,22 @@ public void onRefreshSettings(Settings settings) {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, IndexShard.this.flushThresholdOperations);
if (flushThresholdOperations != IndexShard.this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", IndexShard.this.flushThresholdOperations, flushThresholdOperations);
IndexShard.this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, IndexShard.this.flushThresholdSize);
if (!flushThresholdSize.equals(IndexShard.this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", IndexShard.this.flushThresholdSize, flushThresholdSize);
IndexShard.this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, IndexShard.this.disableFlush);
if (disableFlush != IndexShard.this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", IndexShard.this.disableFlush, disableFlush);
IndexShard.this.disableFlush = disableFlush;
}

final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose);
if (flushOnClose != IndexShard.this.flushOnClose) {
@@ -1437,4 +1487,38 @@ private static Translog.Durabilty getFromSettings(ESLogger logger, Settings sett
}
}

private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();

/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
* @return <code>true</code> if a new flush is scheduled, otherwise <code>false</code>.
*/
public boolean maybeFlush() {
if (shouldFlush()) {
if (asyncFlushRunning.compareAndSet(false, true)) {
final AbstractRunnable abstractRunnable = new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", t);
}
}
@Override
protected void doRun() throws Exception {
flush(new FlushRequest());
}

@Override
public void onAfter() {
asyncFlushRunning.compareAndSet(true, false);
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable);
return true;
}
}
return false;
}

}
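
This is the testability win the commit message points at: shouldFlush() is a plain synchronous method, so the flush decision can be asserted on directly instead of waiting out a timer. A hedged sketch of such a test; createShard() and indexDoc() are assumed fixture helpers, and since shouldFlush() is package-private the test would live in org.elasticsearch.index.shard:

package org.elasticsearch.index.shard;

import org.elasticsearch.common.settings.Settings;

// Hedged test sketch: the abstract helpers stand in for whatever fixture the
// real test suite provides; only the assertions are the point.
public abstract class IndexShardFlushTests {
    abstract IndexShard createShard(Settings settings) throws Exception;
    abstract void indexDoc(IndexShard shard, String id) throws Exception;

    public void testSyncFlushDecision() throws Exception {
        IndexShard shard = createShard(Settings.builder()
                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 2)
                .build());
        assert shard.shouldFlush() == false; // empty translog, below the ops threshold
        indexDoc(shard, "1");
        indexDoc(shard, "2");
        indexDoc(shard, "3");                // 3 ops > threshold of 2
        assert shard.shouldFlush();          // synchronous check, no timer involved
        assert shard.maybeFlush();           // schedules exactly one async flush
    }
}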
@@ -20,18 +20,15 @@
package org.elasticsearch.index.shard;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Classes;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.query.index.IndexQueryCache;
import org.elasticsearch.index.engine.IndexSearcherWrapper;
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
import org.elasticsearch.index.translog.TranslogService;

/**
* The {@code IndexShardModule} module is responsible for binding the correct
@@ -68,7 +65,6 @@ protected void configure() {
bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton();
} else {
bind(IndexShard.class).asEagerSingleton();
bind(TranslogService.class).asEagerSingleton();
}

bind(EngineFactory.class).to(engineFactoryImpl);
@@ -108,6 +108,12 @@ protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig con
return engineFactory.newReadOnlyEngine(config);
}

@Override
public boolean shouldFlush() {
// we don't need to flush since we don't write - all dominated by the primary
return false;
}

public boolean allowsPrimaryPromotion() {
return false;
}
