Shards with heavy indexing should get more of the indexing buffer #14121

Merged
merged 24 commits into elastic:master on Jan 12, 2016


6 participants

@mikemccand
Contributor

Today we take the total indexing buffer (default: 10% of heap) and divide it equally across all active shards.

But this is a sub-optimal usage of RAM for indexing: maybe the node has a bunch of small shards (e.g. Marvel) which require hardly any indexing heap, but were assigned a big chunk of heap (which typically goes mostly unused), while other heavy indexing shards were assigned the same indexing buffer but could effectively make use of much more.

This problem is very nearly the same issue IndexWriter faces, being told it has an X MB overall indexing buffer to use and then having to manage the N separate in-memory segments (one per thread).

I think we (ES) should take the same approach as IndexWriter, except across shards on the node: tell Lucene each shard has an effectively unlimited indexing buffer, but then periodically sum up the actual bytes used across all and when it's over the node's budget, ask the most-heap-consuming shard(s) to refresh to clear the heap.

This should also reduce merge pressure across the node, since we'd typically be flushing fewer, larger segments, and should help smooth out IO pressure somewhat (instead of N shards trying to write at once, we stage it over time).

I also removed all configuration associated with the translog buffer (index.translog.fs.buffer_size): it's now hardwired to 32 KB. I don't understand why we need this buffer to be tunable: let the OS manage RAM assigned for IO write buffering / dirty pages?
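For concreteness, the node-level accounting described above can be sketched as a toy model — hypothetical names (`ShardBuf`, `checkBudget`), not the actual IndexingMemoryController code, which works against IndexShard and asks for refreshes asynchronously:

```java
import java.util.List;
import java.util.PriorityQueue;

// Toy model of the node-level check: sum actual indexing-buffer bytes across
// shards; if over budget, ask the largest consumer(s) to refresh until the
// total is back under the node's limit.
class IndexingBudgetSketch {

    static class ShardBuf implements Comparable<ShardBuf> {
        final String name;
        final long bytesUsed;
        boolean refreshRequested;

        ShardBuf(String name, long bytesUsed) {
            this.name = name;
            this.bytesUsed = bytesUsed;
        }

        @Override
        public int compareTo(ShardBuf other) {
            // sort larger shards first
            return Long.compare(other.bytesUsed, bytesUsed);
        }
    }

    /** Marks which shards should refresh; returns how many were asked. */
    static int checkBudget(List<ShardBuf> shards, long budgetBytes) {
        long total = 0;
        for (ShardBuf shard : shards) {
            total += shard.bytesUsed;
        }
        int asked = 0;
        if (total > budgetBytes) {
            PriorityQueue<ShardBuf> queue = new PriorityQueue<>(shards);
            while (total > budgetBytes && queue.isEmpty() == false) {
                ShardBuf biggest = queue.poll();
                biggest.refreshRequested = true; // the real code asks the shard to refresh asynchronously
                total -= biggest.bytesUsed;
                asked++;
            }
        }
        return asked;
    }
}
```

Note how a tiny shard (e.g. a Marvel index) never gets asked to refresh as long as heavier shards are the ones blowing the budget.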

@mikemccand mikemccand self-assigned this Oct 14, 2015
@nik9000 nik9000 and 2 others commented on an outdated diff Oct 14, 2015
...in/java/org/elasticsearch/index/shard/IndexShard.java
}
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
- * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
- * if the shard is inactive. */
- public boolean checkIdle(long inactiveTimeNS) {
+ * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. */
+ public void checkIdle(long inactiveTimeNS) {
@nik9000
nik9000 Oct 14, 2015 Contributor

Is idle really needed any more?

@mikemccand
mikemccand Oct 14, 2015 Contributor

Alas it's still needed. I removed it at first! And was so happy about the simplification :) But then it broke sync'd flush, which is triggered by that onShardInactive call.

@s1monw
s1monw Oct 17, 2015 Contributor

can we keep it internally? I mean we could store it alongside the active boolean?

@nik9000 nik9000 and 1 other commented on an outdated diff Oct 14, 2015
...icsearch/indices/memory/IndexingMemoryController.java
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
- // we need to have this relatively small to move a shard from inactive to active fast (enough)
- this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30));
+ // we need to have this relatively small to free up heap quickly enough
+ this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(1));
@nik9000
nik9000 Oct 14, 2015 Contributor

I wonder how safe this is. Probably quite safe in all but the highest throughput environments....

@mikemccand
mikemccand Oct 14, 2015 Contributor

I suspect it's plenty fast for even the craziest fast nodes for today ... one thing I thought about, if we really are worried about this, is maybe forcing a check from InternalEngine "every N bytes" of indexed docs, e.g. we could measure the bytes of the original request, or what we appended to the translog, sum that into an AtomicLong and force a check every N bytes ... but I think this is not really necessary.

@nik9000
nik9000 Oct 14, 2015 Contributor

Makes sense to me.

@mikemccand
mikemccand Oct 15, 2015 Contributor

Thinking about this more, I think it's important we force a check based on total bytes coming into the engine, in case there is thread starvation and the IMC thread fails to run frequently enough. I think I can maybe use Translog.Location for this ...

@nik9000
Contributor
nik9000 commented Oct 14, 2015

I wonder if we should ever ask for more refreshes than we need to get below the buffer. I think the old scheme would keep the heap usage under or at the buffer's size, but this scheme will keep the buffer closer to full all the time. In the most degenerate case of a bunch of shards (n) being written to at the same rate, this implementation will end up with more bytes "floating".

@mikemccand
Contributor

I think the old buffer would keep the heap usage of the buffer under or at the buffer's size but this scheme will keep the buffer closer to full all the time.

Actually Lucene's IndexWriter works just like this change, riding the buffer near full, i.e. it waits for the heap used to cross the limit, then it picks the biggest in-memory segment(s) to write and writes it until heap used is under the budget again.

If we wanted we could add some hysteresis, e.g. when we cross full, we drive the buffer back down to maybe 75% of full, to give it a sharper sawtooth pattern ... but I don't think this is really necessary.

Typically the scheduled refresh (default: every 1s) is going to free the RAM well before IndexingMemoryController logic here kicks in.
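The optional hysteresis mentioned above is just watermark arithmetic: once usage crosses the budget, free enough to drop to a lower watermark (e.g. 75% of the budget), giving a sharper sawtooth. A hypothetical helper, not part of the PR:

```java
// Toy arithmetic for hysteresis: stop freeing at lowWatermark * budget rather
// than right at the budget, so we don't immediately cross the limit again.
class HysteresisSketch {
    static long bytesToFree(long bytesUsed, long budgetBytes, double lowWatermark) {
        if (bytesUsed <= budgetBytes) {
            return 0;
        }
        return bytesUsed - (long) (budgetBytes * lowWatermark);
    }
}
```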

@nik9000
Contributor
nik9000 commented Oct 14, 2015

Typically the scheduled refresh (default: every 1s) is going to free the RAM well before IndexingMemoryController logic here kicks in.

Yeah - I suspect so. I just thought about it and figured it was worth mentioning.

@s1monw s1monw commented on an outdated diff Oct 17, 2015
...ain/java/org/elasticsearch/cluster/ClusterModule.java
@@ -231,7 +231,6 @@ private void registerBuiltinIndexSettings() {
registerIndexDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
- registerIndexDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE);
@s1monw
s1monw Oct 17, 2015 Contributor

good! :)

@s1monw s1monw commented on an outdated diff Oct 17, 2015
...va/org/elasticsearch/index/engine/InternalEngine.java
@@ -783,8 +758,12 @@ protected final void writerSegmentStats(SegmentsStats stats) {
}
@Override
- public long indexWriterRAMBytesUsed() {
- return indexWriter.ramBytesUsed();
+ public long indexBufferRAMBytesUsed() {
+ if (refreshing) {
+ return 0;
+ } else {
+ return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
@s1monw
s1monw Oct 17, 2015 Contributor

watch out, this can throw an AlreadyClosedException! I really wonder why we need the refreshing invariant... can you explain and add a comment?

@s1monw s1monw commented on an outdated diff Oct 17, 2015
...in/java/org/elasticsearch/index/shard/IndexShard.java
}
-
- engine.getTranslog().updateBuffer(shardTranslogBufferSize);
+ return engine.indexBufferRAMBytesUsed();
@s1monw
s1monw Oct 17, 2015 Contributor

this can throw AlreadyClosedException you might wanna catch it and return 0 instead?

@s1monw s1monw commented on an outdated diff Oct 17, 2015
...in/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1230,6 +1176,23 @@ public PercolateStats percolateStats() {
return percolatorQueriesRegistry.stats();
}
+ /**
+ * Asynchronously refreshes the engine for new search operations to reflect the latest
+ * changes.
+ */
+ public void refreshAsync(final String reason) {
+ engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
@s1monw
s1monw Oct 17, 2015 Contributor

we do this already in EngineRefresher in this class, maybe we can factor it out and put that here?

@s1monw s1monw commented on an outdated diff Oct 17, 2015
...icsearch/indices/memory/IndexingMemoryController.java
@@ -224,12 +159,39 @@ protected boolean shardAvailable(ShardId shardId) {
return shardAvailable(getShard(shardId));
}
+ /** returns how much heap this shard is using for its indexing buffer */
+ protected long getIndexBufferRAMBytesUsed(ShardId shardId) {
+ IndexShard shard = getShard(shardId);
+ if (shard == null) {
+ return 0;
+ }
+
+ return shard.getIndexBufferRAMBytesUsed();
+ }
+
+ /** ask this shard to refresh, in the background, to free up heap */
+ protected void refreshShardAsync(ShardId shardId) {
+ IndexShard shard = getShard(shardId);
+ if (shard != null) {
+ shard.refreshAsync("memory");
@s1monw
s1monw Oct 17, 2015 Contributor

I wonder if we should keep IndexShard clean and just call refresh from a thread here?

@s1monw s1monw commented on an outdated diff Oct 17, 2015
...icsearch/indices/memory/IndexingMemoryController.java
/** check if any shards active status changed, now. */
public void forceCheck() {
statusChecker.run();
}
- class ShardsIndicesStatusChecker implements Runnable {
+ static class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
@s1monw
s1monw Oct 17, 2015 Contributor

can this class be final?

@s1monw s1monw commented on an outdated diff Oct 17, 2015
...icsearch/indices/memory/IndexingMemoryController.java
- // Shard became active itself, since we last checked (due to new indexing op arriving)
- changes.add(ShardStatusChangeType.BECAME_ACTIVE);
- logger.debug("marking shard {} as active indexing wise", shardId);
- shardWasActive.put(shardId, true);
- } else if (checkIdle(shardId, inactiveTime.nanos()) == Boolean.TRUE) {
- // Make shard inactive now
- changes.add(ShardStatusChangeType.BECAME_INACTIVE);
- logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
- shardId,
- inactiveTime);
- shardWasActive.put(shardId, false);
+ if (totalBytesUsed > indexingBuffer.bytes()) {
+ // OK we are using too much; make a queue and ask largest shard(s) to refresh:
+ logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);
+ PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
+ for (ShardId shardId : availableShards()) {
@s1monw
s1monw Oct 17, 2015 Contributor

is it possible that if we can't refresh quickly enough we schedule a large number of refreshes even if one is still pending? if the node is busy it can pile up quite a bit, no? I think we should have at most one refresh in flight per index shard, otherwise we might refresh the same shard over and over again?

When we worked on DWPT you pushed hard on having some stall control that blocks indexing threads if we can't flush fast enough, do we need something like this here as well? over paranoid? ;)

@mikemccand
Contributor

I chatted with @s1monw about this ... I think we can add an API to IndexWriter to give us more granular control (write just the biggest segment to disk), and more specific control (just write the segment to disk, don't refresh it) to move our dirty bytes to the OS so it can move them to disk. I opened https://issues.apache.org/jira/browse/LUCENE-6849 to work on this.

I agree the stalling issue is important, so just pretending "0 bytes heap used" as soon as we start moving bytes to disk is dangerous. I'll change the PR to track "pending dirty bytes moving to disk", and if that pending bytes is too large vs our budget, we need to throttle incoming indexing, hopefully just tapping into the index throttling we already have for when merges are falling behind. The OS will do its own scary back-pressure here (blocking any thread, or maybe/probably the whole process, that's attempting to write to disk) when its efforts to move dirty bytes to disk are falling behind.
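A minimal sketch of the stall-control idea just described, assuming a hypothetical `pendingWriteBytes` counter (the merged PR instead reuses the existing single-threaded merge-throttling machinery rather than a dedicated class):

```java
import java.util.concurrent.atomic.AtomicLong;

// Track "pending dirty bytes" being moved to disk; signal throttling while
// that total exceeds the node's indexing-buffer budget. Illustrative only.
class WriteBackPressureSketch {
    private final AtomicLong pendingWriteBytes = new AtomicLong();
    private final long budgetBytes;

    WriteBackPressureSketch(long budgetBytes) {
        this.budgetBytes = budgetBytes;
    }

    void onWriteStart(long bytes) { pendingWriteBytes.addAndGet(bytes); }
    void onWriteDone(long bytes)  { pendingWriteBytes.addAndGet(-bytes); }

    /** true while incoming indexing should be throttled */
    boolean shouldThrottle() {
        return pendingWriteBytes.get() > budgetBytes;
    }
}
```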

@clintongormley clintongormley removed the v2.1.0 label Nov 20, 2015
@mikemccand
Contributor

I finally got back to this PR (merged master = painful!) and addressing feedback from @nik9000 and @s1monw:

  • Now we sometimes use the (new) IndexWriter.flush API to move indexing buffers to disk, which is cheaper than a refresh since that must also apply all buffered deletes and then open SegmentReaders.
  • I added back-pressure for when the incoming indexing rate exceeds how quickly we can move the indexing buffer to disk: I just tap into the existing single-threaded index throttling we already do when merges are falling behind on a single shard.

I think this is ready ... it's a big change, but I think an important one for having ES use its allotted (default 10% of JVM's heap) indexing buffer more efficiently when there are many shards with different indexing rates on one node.

@mikemccand mikemccand added the review label Dec 17, 2015
@s1monw s1monw commented on the diff Dec 21, 2015
...va/org/elasticsearch/index/engine/InternalEngine.java
@@ -537,6 +513,43 @@ public void refresh(String source) throws EngineException {
}
@Override
+ public void writeIndexingBuffer() throws EngineException {
+
+ // we obtain a read lock here, since we don't want a flush to happen while we are writing
+ // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
+ try (ReleasableLock lock = readLock.acquire()) {
+ ensureOpen();
+
+ // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
@s1monw
s1monw Dec 21, 2015 Contributor

I agree with this TODO maybe we should open an issue for this?

@mikemccand
mikemccand Jan 5, 2016 Contributor

I opened #15768 for this.

@s1monw s1monw commented on an outdated diff Dec 21, 2015
...va/org/elasticsearch/index/engine/InternalEngine.java
@@ -537,6 +513,43 @@ public void refresh(String source) throws EngineException {
}
@Override
+ public void writeIndexingBuffer() throws EngineException {
+
+ // we obtain a read lock here, since we don't want a flush to happen while we are writing
+ // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
+ try (ReleasableLock lock = readLock.acquire()) {
+ ensureOpen();
+
+ // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
+ // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
+ // refresh API), and another for version map interactions:
+ long versionMapBytes = versionMap.ramBytesUsedForRefresh();
@s1monw
s1monw Dec 21, 2015 Contributor

can we make all the vars final?

@s1monw s1monw commented on an outdated diff Dec 21, 2015
...va/org/elasticsearch/index/engine/InternalEngine.java
@@ -810,8 +823,8 @@ protected final void writerSegmentStats(SegmentsStats stats) {
}
@Override
- public long indexWriterRAMBytesUsed() {
- return indexWriter.ramBytesUsed();
+ public long indexBufferRAMBytesUsed() {
+ return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
@s1monw
s1monw Dec 21, 2015 Contributor

oh awesome... should we rename the method to getIndexBufferRAMBytesUsed?

@s1monw s1monw commented on an outdated diff Dec 21, 2015
...va/org/elasticsearch/index/engine/InternalEngine.java
public void activateThrottling() {
- throttle.activate();
+ int count = throttleRequestCount.incrementAndGet();
+ assert count >= 1;
@s1monw
s1monw Dec 21, 2015 Contributor

can you add the count to the message?
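The reference-counted throttling visible in the diff above could look roughly like this — illustrative names only; the real code delegates to the engine's throttle:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Several callers may request throttling concurrently; it engages on the
// first request and releases on the last.
class ThrottleRefCountSketch {
    private final AtomicInteger throttleRequestCount = new AtomicInteger();
    volatile boolean throttling;

    void activateThrottling() {
        int count = throttleRequestCount.incrementAndGet();
        assert count >= 1 : "invalid throttle request count: " + count;
        if (count == 1) {
            throttling = true; // the real code activates the engine's throttle here
        }
    }

    void deactivateThrottling() {
        int count = throttleRequestCount.decrementAndGet();
        assert count >= 0 : "invalid throttle request count: " + count;
        if (count == 0) {
            throttling = false; // and deactivates it here
        }
    }
}
```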

@s1monw s1monw commented on the diff Dec 21, 2015
...org/elasticsearch/index/shard/IndexEventListener.java
@@ -90,13 +89,6 @@ default void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardS
default void onShardInactive(IndexShard indexShard) {}
/**
- * Called when a shard is marked as active ie. was previously inactive and is now active again.
@s1monw
s1monw Dec 21, 2015 Contributor

awesome

@s1monw s1monw and 1 other commented on an outdated diff Dec 21, 2015
...icsearch/indices/memory/IndexingMemoryController.java
@@ -211,93 +179,181 @@ public ByteSizeValue translogBufferSize() {
return availableShards;
}
- /** returns true if shard exists and is availabe for updates */
- protected boolean shardAvailable(IndexShard shard) {
- // shadow replica doesn't have an indexing buffer
- return shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
+ /** returns how much heap this shard is using for its indexing buffer */
+ protected long getIndexBufferRAMBytesUsed(IndexShard shard) {
+ return shard.getIndexBufferRAMBytesUsed();
@s1monw
s1monw Dec 21, 2015 Contributor

this is protected just for testing right?

@mikemccand
mikemccand Jan 5, 2016 Contributor

yes

@s1monw s1monw commented on an outdated diff Dec 21, 2015
...icsearch/indices/memory/IndexingMemoryController.java
class ShardsIndicesStatusChecker implements Runnable {
+
+ long bytesWrittenSinceCheck;
+
+ /** Shard calls this on each indexing/delete op */
+ public synchronized void bytesWritten(int bytes) {
+ bytesWrittenSinceCheck += bytes;
@s1monw
s1monw Dec 21, 2015 Contributor

hmm, I am not sure if this is relevant, but do we want to run this under a lock all the time? I mean we will block for the entire check, which can be longish? We could use an AtomicLong here instead and protect the run method with a ReentrantLock, and only do it if we can acquire the lock with tryLock. then just do the check in a while loop and we are good?

@s1monw s1monw and 1 other commented on an outdated diff Dec 21, 2015
...icsearch/indices/memory/IndexingMemoryController.java
+ static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
+ final long bytesUsed;
+ final IndexShard shard;
+
+ public ShardAndBytesUsed(long bytesUsed, IndexShard shard) {
+ this.bytesUsed = bytesUsed;
+ this.shard = shard;
+ }
+
+ @Override
+ public int compareTo(ShardAndBytesUsed other) {
+ // Sort larger shards first:
+ return Long.compare(other.bytesUsed, bytesUsed);
+ }
+ }
+
class ShardsIndicesStatusChecker implements Runnable {
@s1monw
s1monw Dec 21, 2015 Contributor

I wonder if we wanna pass the actual parts this one uses in a ctor to make it more clear what it uses instead of an non-static inner class

@mikemccand
mikemccand Jan 5, 2016 Contributor

The problem is this inner class is actually using a lot from its containing class: indexingBuffer, availableShards(), inactiveTime, writingBytes, getIndexBufferRAMBytesUsed(), logger, etc.

@s1monw
s1monw Jan 5, 2016 Contributor

fair enough...

@s1monw s1monw commented on an outdated diff Dec 21, 2015
...icsearch/indices/memory/IndexingMemoryController.java
@Override
public synchronized void run() {
- List<IndexShard> availableShards = availableShards();
- List<IndexShard> activeShards = new ArrayList<>();
- for (IndexShard shard : availableShards) {
- if (!checkIdle(shard)) {
- activeShards.add(shard);
+
@s1monw
s1monw Dec 21, 2015 Contributor

phew, nice! I mean it's pretty much the same as what IW, or rather DocumentsWriter, does on a per-doc basis, but it's a pretty large method with lots of loops. I really wonder if we wanna only do it in a tryLock fashion if possible?

@s1monw
Contributor
s1monw commented Dec 21, 2015

@mikemccand this looks awesome - I left some comments. I wonder if we should put this in a feature branch and let CI chew on it for a while?

@s1monw s1monw commented on an outdated diff Dec 21, 2015
...in/java/org/elasticsearch/index/shard/IndexShard.java
+ } else {
+ if (state != IndexShardState.CLOSED) {
+ logger.warn("Failed to perform engine refresh", e);
+ }
+ }
+ } else {
+ if (state != IndexShardState.CLOSED) {
+ logger.warn("Failed to perform engine refresh", e);
+ }
+ }
+ }
+
+ /**
+ * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
+ */
+ public void writeIndexingBufferAsync() {
@s1monw
s1monw Dec 21, 2015 Contributor

I wonder if we should really add more async logic to IndexShard, or if we should just add the sync method in here and make the caller call it in an async fashion?

@s1monw s1monw commented on an outdated diff Dec 21, 2015
...icsearch/indices/memory/IndexingMemoryController.java
this.statusChecker = new ShardsIndicesStatusChecker();
- logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}]",
- this.indexingBuffer,
- MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
- MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize,
- SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval);
+ logger.debug("using indexing buffer size [{}] with {} [{}], {} [{}]",
+ this.indexingBuffer,
+ SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
+ SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
+ }
+
+ /** Shard calls this when it starts writing its indexing buffer to disk to notify us */
+ public void addWritingBytes(IndexShard shard, long numBytes) {
+ writingBytes.put(shard, numBytes);
@s1monw
s1monw Dec 21, 2015 Contributor

@mikemccand if we just do a put here, don't we override if there is already something in the map? I think we need to add / maybe use an atomic long and use putIfAbsent?

@s1monw
s1monw Dec 21, 2015 Contributor

I wonder if we should just keep an AtomicLong on the IndexShard and just query it instead?
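The accounting concern above in miniature: `put()` silently overwrites bytes already recorded for a shard, while `merge()` accumulates them. Illustrative only — the merged PR ended up tracking writing bytes on the shard itself:

```java
import java.util.concurrent.ConcurrentHashMap;

// Accumulate per-shard "writing" bytes instead of overwriting a concurrent
// writer's entry.
class WritingBytesSketch {
    final ConcurrentHashMap<String, Long> writingBytes = new ConcurrentHashMap<>();

    void addWritingBytes(String shardId, long numBytes) {
        // merge accumulates; put would drop the old value
        writingBytes.merge(shardId, numBytes, Long::sum);
    }

    void removeWritingBytes(String shardId, long numBytes) {
        // a fuller version would remove the entry once it reaches zero
        writingBytes.merge(shardId, -numBytes, Long::sum);
    }
}
```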

@s1monw s1monw and 1 other commented on an outdated diff Dec 21, 2015
...in/java/org/elasticsearch/index/shard/IndexShard.java
@@ -531,6 +528,9 @@ public void delete(Engine.Delete delete) {
throw ex;
}
indexingService.postDelete(delete);
@s1monw
s1monw Dec 21, 2015 Contributor

if we remove the removeWritingBytes / addWritingBytes and just use an AtomicLong on IndexShard, we can move indexingMemoryController.bytesWritten into indexingService.postDelete / indexingService.postIndex and therefore remove the dependency between IndexShard and IndexingMemoryController, which I think we should not have. What do you think @mikemccand?

@mikemccand
mikemccand Jan 5, 2016 Contributor

can remove the dependency between IndexShard and IndexMemoryController

I like this in principle but does this mean I should have IMC call .addListener for each shard so it's invoked on each indexing op? This seems tricky (I somehow need to do this as soon as the shard is created before any indexing ops are sent to it). Or can I just call IMC.bytesWritten directly inside ShardIndexingService.postIndex,Delete?

@spinscale spinscale added v2.3.0 and removed v2.2.0 labels Dec 23, 2015
@mikemccand
Contributor

OK I merged master and folded in all feedback, except removing IMC dependency from IndexShard (@s1monw has some pre-cursor cleanup ideas here)... I'm going to push to a feature branch so Jenkins can chew on it. I'll also run indexing perf tests.

@mikemccand
Contributor

I ran an indexing benchmark and it looks like there's no real change to indexing throughput, which is good!

Master got 26.0 K docs/sec and 26.5 K docs/sec (2nd try), and this PR got 26.9 K docs/sec and 26.3 K docs/sec ... seems like any change is in the noise.

This is really just a "first do no harm" test, because this indexing test is indexing full speed / uniformly into 5 shards, whereas the PR should do better when indexing heavily into one set of shards and slowly into another ...

@dakrone
Member
dakrone commented Jan 6, 2016
@mikemccand
Contributor

I'm curious what the indexing throughput change is if the
Marvel plugin is used to have slowly indexing shards while the benchmark
is running.

I tested this and the effect looks minor. At first I ran the Marvel plugin at defaults, but this only creates 2 lightweight shards (plus my 6 heavily indexing shards) ... so then I set a custom template to have Marvel use 5 shards, and saw maybe a smallish impact:

iwbuffer.12.marvel.fast.2.log:Total docs/sec: 32240.0
iwbuffer.12.marvel.fast.log:Total docs/sec: 30976.7
master.12.marvel.fast.2.log:Total docs/sec: 31118.2
master.12.marvel.fast.log:Total docs/sec: 30037.0

The iwbuffer log is this PR, the 12 is using 12 client-side indexing threads. So maybe a minor gain in this test, or maybe within measurement noise. This was using the same settings as "Fast" from https://benchmarks.elastic.co : refresh every 30 sec, 4 gb heap, 4 gb xlog flush threshold.

@mikemccand
Contributor

I merged master and fixed IMC to "just" be another IndexOperationListener ... I think this is ready.

However, a number of the CI jobs have been failing: https://build-us-01.elastic.co/view/Elasticsearch/job/elastic+elasticsearch+fair_indexing_buffers+periodic/

The failures are weird, like something is interrupting the build (Thread.interrupt) vs true test failures caused by this change, I think... but I'll dig.

@s1monw s1monw commented on an outdated diff Jan 11, 2016
...c/main/java/org/elasticsearch/index/IndexService.java
@@ -113,7 +115,8 @@ public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
IndexStore indexStore,
IndexEventListener eventListener,
IndexModule.IndexSearcherWrapperFactory wrapperFactory,
- MapperRegistry mapperRegistry) throws IOException {
+ MapperRegistry mapperRegistry,
+ IndexingMemoryController indexingMemoryController) throws IOException {
@s1monw
s1monw Jan 11, 2016 Contributor

Can this be an IndexingOperationListener... listeners parameter? That way we don't introduce a hard dependency on IMC

@s1monw s1monw commented on an outdated diff Jan 11, 2016
...rc/main/java/org/elasticsearch/index/IndexModule.java
@@ -241,7 +242,8 @@ public boolean match(String setting) {
IndexSearcherWrapper newWrapper(final IndexService indexService);
}
- public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry) throws IOException {
+ public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry,
+ IndexingMemoryController indexingMemoryController) throws IOException {
@s1monw
s1monw Jan 11, 2016 Contributor

can this also be IndexingOperationListener... listeners please

@s1monw s1monw commented on the diff Jan 11, 2016
...in/java/org/elasticsearch/index/shard/IndexShard.java
public void refresh(String source) {
verifyNotClosed();
- if (logger.isTraceEnabled()) {
- logger.trace("refresh with source: {}", source);
+ if (canIndex()) {
+ long bytes = getEngine().getIndexBufferRAMBytesUsed();
+ writingBytes.addAndGet(bytes);
+ try {
+ logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
+ long time = System.nanoTime();
+ getEngine().refresh(source);
+ refreshMetric.inc(System.nanoTime() - time);
+ } finally {
+ logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
@s1monw
s1monw Jan 11, 2016 Contributor

can we make this a trace or should be keep using debug for this?

@mikemccand
mikemccand Jan 11, 2016 Contributor

I like debug for this? It should be quite infrequent, since we are asking the most-memory-consuming shards to write, compared to our typical frequency at trace level?

@s1monw s1monw commented on an outdated diff Jan 11, 2016
...in/java/org/elasticsearch/index/shard/IndexShard.java
@@ -954,11 +974,9 @@ public void readAllowed() throws IllegalIndexShardStateException {
}
}
- /** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
+ /** Sets {@code active} to true if we were inactive. */
private void markLastWrite() {
@s1monw
s1monw Jan 11, 2016 Contributor

do we need a method here or can we just set the value directly?

@s1monw s1monw commented on the diff Jan 11, 2016
...in/java/org/elasticsearch/index/shard/IndexShard.java
- // our allowed buffer was changed to less than we are currently using; we ask IW to refresh
- // so it clears its buffers (otherwise it won't clear until the next indexing/delete op)
- logger.debug(message + "; now refresh to clear IndexWriter memory");
-
- // TODO: should IW have an API to move segments to disk, but not refresh? Its flush method is protected...
- try {
- refresh("update index buffer");
- } catch (Throwable e) {
- logger.warn("failed to refresh after decreasing index buffer", e);
- }
- } else {
- logger.debug(message);
- }
+ try {
+ return engine.getIndexBufferRAMBytesUsed();
+ } catch (AlreadyClosedException ex) {
@s1monw
s1monw Jan 11, 2016 Contributor

does this also throw EngineClosedException ?

@s1monw s1monw and 1 other commented on an outdated diff Jan 11, 2016
...g/elasticsearch/indices/IndexingMemoryController.java
}
protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
// it's fine to run it on the scheduler thread, no busy work
- return threadPool.scheduleWithFixedDelay(statusChecker, interval);
+ if (threadPool != null) {
+ return threadPool.scheduleWithFixedDelay(statusChecker, interval);
+ } else {
+ // tests pass null for threadPool --> no periodic checking
@s1monw
s1monw Jan 11, 2016 Contributor

wait, I added this method so that tests can just override it and pass null for threadPool?

@mikemccand
mikemccand Jan 11, 2016 Contributor

LOL, woops :)

@s1monw s1monw commented on an outdated diff Jan 11, 2016
...g/elasticsearch/indices/IndexingMemoryController.java
- protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize) {
- try {
- shard.updateBufferSize(shardIndexingBufferSize);
- } catch (EngineClosedException | FlushNotAllowedEngineException e) {
- // ignore
- } catch (Exception e) {
- logger.warn("failed to set shard {} index buffer to [{}]", e, shard.shardId(), shardIndexingBufferSize);
- }
+ /** returns how many bytes this shard is currently writing to disk */
+ protected long getShardWritingBytes(IndexShard shard) {
+ return shard.getWritingBytes();
+ }
+
+ /** ask this shard to refresh, in the background, to free up heap */
+ protected void writeIndexingBufferAsync(IndexShard shard) {
+ threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
@s1monw
s1monw Jan 11, 2016 Contributor

can we please use abstract runnable and override onReject() to ensure we don't barf if that threadpool shuts down.
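The concern above in generic form: executing on a pool that is shutting down throws RejectedExecutionException, so the submit needs a guard (ES's AbstractRunnable provides a rejection hook for exactly this; the sketch below is a plain-JDK equivalent, not the ES class):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Guard a submit against a shutting-down executor instead of letting the
// RejectedExecutionException propagate.
class SafeSubmitSketch {
    static boolean trySubmit(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // pool is shutting down; drop the write-buffer request quietly
            return false;
        }
    }
}
```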

@s1monw s1monw commented on the diff Jan 11, 2016
...g/elasticsearch/indices/IndexingMemoryController.java
+ public void bytesWritten(int bytes) {
+ statusChecker.bytesWritten(bytes);
+ }
+
+ /** Asks this shard to throttle indexing to one thread */
+ protected void activateThrottling(IndexShard shard) {
+ shard.activateThrottling();
+ }
+
+ /** Asks this shard to stop throttling indexing to one thread */
+ protected void deactivateThrottling(IndexShard shard) {
+ shard.deactivateThrottling();
+ }
+
+ @Override
+ public void postIndex(Engine.Index index) {
@s1monw
s1monw Jan 11, 2016 Contributor

nice! :)

@s1monw s1monw and 1 other commented on an outdated diff Jan 11, 2016
...g/elasticsearch/indices/IndexingMemoryController.java
+ public int compareTo(ShardAndBytesUsed other) {
+ // Sort larger shards first:
+ return Long.compare(other.bytesUsed, bytesUsed);
+ }
+ }
+
+ /** not static because we need access to many fields/methods from our containing class (IMC): */
+ final class ShardsIndicesStatusChecker implements Runnable {
+
+ final AtomicLong bytesWrittenSinceCheck = new AtomicLong();
+ final ReentrantLock runLock = new ReentrantLock();
+
+ /** Shard calls this on each indexing/delete op */
+ public void bytesWritten(int bytes) {
+ long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
+ if (totalBytes > indexingBuffer.bytes()/30) {
@s1monw
s1monw Jan 11, 2016 Contributor

I think we have to make this a while (totalBytes > indexingBuffer.bytes()/30), and then if we can't acquire the lock we step out. Otherwise we might miss a required run?

@mikemccand
mikemccand Jan 11, 2016 Contributor

Nice (tricky concurrency) catch!

@s1monw
s1monw Jan 11, 2016 Contributor

those are hard to debug I can tell u 👯
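The fix being discussed, sketched out: loop on the threshold check so bytes that arrive while another thread holds the lock still trigger a run, instead of being silently missed. Hypothetical names; `runs` stands in for the real check that runUnlocked() performs:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// tryLock-guarded periodic check driven by bytes written, with a while loop
// so a required run is never missed.
class BytesWrittenCheckerSketch {
    final AtomicLong bytesWrittenSinceCheck = new AtomicLong();
    final ReentrantLock runLock = new ReentrantLock();
    final long checkThresholdBytes;
    int runs;

    BytesWrittenCheckerSketch(long checkThresholdBytes) {
        this.checkThresholdBytes = checkThresholdBytes;
    }

    void bytesWritten(int bytes) {
        long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
        // while, not if: re-check after each run and step out only when under
        // the threshold or when another thread owns the check
        while (totalBytes > checkThresholdBytes) {
            if (runLock.tryLock() == false) {
                // another thread is running the check; it will see our bytes
                break;
            }
            try {
                bytesWrittenSinceCheck.addAndGet(-totalBytes);
                runs++; // the real code runs the full memory check here
            } finally {
                runLock.unlock();
            }
            totalBytes = bytesWrittenSinceCheck.get();
        }
    }
}
```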

@s1monw s1monw commented on an outdated diff Jan 11, 2016
...g/elasticsearch/indices/IndexingMemoryController.java
+
+ final AtomicLong bytesWrittenSinceCheck = new AtomicLong();
+ final ReentrantLock runLock = new ReentrantLock();
+
+ /** Shard calls this on each indexing/delete op */
+ public void bytesWritten(int bytes) {
+ long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
+ if (totalBytes > indexingBuffer.bytes()/30) {
+ if (runLock.tryLock()) {
+ try {
+ bytesWrittenSinceCheck.addAndGet(-totalBytes);
+ // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
+ // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against
+ // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
+ // processed by indexing:
+ run();
@s1monw
s1monw Jan 11, 2016 Contributor

can we call runUnlocked instead?

@s1monw s1monw commented on the diff Jan 11, 2016
...in/java/org/elasticsearch/indices/IndicesService.java
@@ -293,14 +293,13 @@ public synchronized IndexService createIndex(final NodeServicesProvider nodeServ
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
- indexModule.addIndexEventListener(indexingMemoryController);
@s1monw
s1monw Jan 11, 2016 Contributor

nice :)

@s1monw s1monw commented on an outdated diff Jan 11, 2016
...st/java/org/elasticsearch/index/IndexModuleTests.java
@@ -248,7 +249,7 @@ public void testSetupUnknownSimilarity() throws IOException {
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(new Index("foo"), indexSettings), null, new AnalysisRegistry(null, environment));
try {
- module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
+ module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
@s1monw
s1monw Jan 11, 2016 Contributor

these can go away if we cut over to an IndexingRequestListener...

@s1monw s1monw commented on the diff Jan 11, 2016
docs/reference/modules/indices/indexing_buffer.asciidoc
@@ -23,10 +23,3 @@ in the cluster:
If the `index_buffer_size` is specified as a percentage, then this
setting can be used to specify an absolute maximum. Defaults to unbounded.
-
-`indices.memory.min_shard_index_buffer_size`::
@s1monw
s1monw Jan 11, 2016 Contributor

can we add to the migration doc that this has been removed without a replacement?

@s1monw
Contributor
s1monw commented Jan 11, 2016

@mikemccand awesome! I left a bunch of comments but this looks fantastic

@s1monw
Contributor
s1monw commented Jan 12, 2016

LGTM

@mikemccand mikemccand merged commit b4a095d into elastic:master Jan 12, 2016

1 check passed

CLA Commit author is a member of Elasticsearch
@mikemccand mikemccand removed the v2.3.0 label Jan 12, 2016
@mikemccand
Contributor

I removed 2.3.0 from this ... it's a big change, and its precursors haven't been ported to 2.3.0, so I think it should be in our next major release only.
