Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When shard becomes active again, immediately increase its indexing buffer #13918

Merged
merged 9 commits into from Oct 8, 2015

Conversation

Projects
None yet
3 participants
@mikemccand
Copy link
Contributor

mikemccand commented Oct 3, 2015

Spinoff from #13802.

Today, when an index goes from inactive to active, because indexing ops suddenly arrive to an inactive index, we (IndexingMemoryController) take up to 30 seconds to notice this and then increase the indexing buffer from the tiny idle 512KB to "its fair share". This is somewhat dangerous because it could write many, many segments in those 30 seconds...

This PR changes that so the inactive -> active transition instead causes us to immediately re-visit the indexing buffer for all shards.

It also shifts responsibility of tracking active/inactive into IndexShard, and no longer uses the translog ID/ops count to check for changes (I think this is more error prone? E.g. #13802), simplifying it instead to timestamp (System.nanoTime) of the last indexing op.

mikemccand added some commits Oct 3, 2015

@@ -975,6 +999,7 @@ public void addFailedEngineListener(Engine.FailedEngineListener failedEngineList
this.failedEngineListener.delegates.add(failedEngineListener);
}

/** Returns true if the indexing buffer size did change */
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {

This comment has been minimized.

Copy link
@nik9000

nik9000 Oct 5, 2015

Contributor

I don't think this doc is correct - the return type is void

}
return currentTimeInNanos() - shard.getLastWriteNS() >= inactiveTimeNS;
}


/** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. */

This comment has been minimized.

Copy link
@nik9000

nik9000 Oct 5, 2015

Contributor

Out of date comment?

@@ -255,49 +251,43 @@ protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBu
}
}

protected boolean isShardInactive(ShardId shardId, long inactiveTimeNS) {

This comment has been minimized.

Copy link
@nik9000

nik9000 Oct 5, 2015

Contributor

Would isShardActive be simpler to understand just because it tests a positive?

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

Good point about using negative name ... I think I'll rename to isShardIdle?

mikemccand added some commits Oct 6, 2015

Merge branch 'master' into immediate_shard_active
Conflicts:
	core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
	core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
	core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
@mikemccand

This comment has been minimized.

Copy link
Contributor Author

mikemccand commented Oct 6, 2015

Thanks @nik9000 I folded in the feedback.

@Override
public long indexWriterRAMBytesUsed() {
// No IndexWriter
return 0L;

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

I wonder if UnsupportedOperationException is a better choice here- we don't have an indexwriter...

@@ -190,6 +192,10 @@

private final IndexSearcherWrapper searcherWrapper;

private final AtomicBoolean active = new AtomicBoolean();

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

can we have some java docs here?


private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
lastWriteNS = op.startTime();
if (active.getAndSet(true) == false) {

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

can we do this once we check the write is indeed allowed? Also I think it will be clearer if we have this in a dedicated method (markLastWrite or something like that)

this.searcherWrapper = provider.getIndexSearcherWrapper();
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, mapperService, indexFieldDataService);
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
percolatorQueriesRegistry.enableRealTimePercolator();
}

// TODO: can we somehow call IMC.forceCheck here? Since we just became active, it can divvy up the RAM

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

I assume you tried? what broke?

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

I assume you tried? what broke?

I didn't try! It just looked not simple because here we are in this IndexShard ctor and if we invoke IMC at this point, probably the caller/Guice (creating this shard, IndicesService maybe?) won't even have this shard listed in its state?

Maybe up above somewhere I could call it ... I'll try.

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

OK I added a line at the end of IndexService.createShard to invoke forceCheck ... but are we worried about the O(N^2) cost here? N should be smallish (number of nodes on the shard)?

// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, shardIndexingBufferSize);
// TODO: should IW have an API to move segments to disk, but not refresh?

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

I'm curious about this one - I thought the segments are always (once made) on disk - but not fsynced?

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

I thought the segments are always (once made) on disk - but not fsynced?

IW writes new segments to disk and only fsyncs once IW.commit (ES flush) is called.

But from ES the only way we have to get IW to move in-memory segments to disk is via refresh, but that's overkill (it writes the segment and also opens new SegmentReader) and if the ES user e.g. has high or disabled refresh_interval this is wasted work.

IW doesn't give a public method to do this today ... I suppose we could addIndexes with nothing :)

@@ -993,28 +1019,40 @@ public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValu
if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
// so we push changes these changes down to IndexWriter:
engine.onSettingsChanged();
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize);

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

can we make this log include the decision about whether to refresh (and also the current IW memory consumption, while at it)

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

can we make this log include the decision about whether to refresh (and also the current IW memory consumption, while at it)

You mean have a single debug log line instead of two? OK ...

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

meh - missed the addition of the other one (just saw the removal of the log in the if), but yeah - one is nicer..

updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, TranslogConfig.INACTIVE_SHARD_TRANSLOG_BUFFER);
indicesLifecycle.onShardInactive(this);
if (active.getAndSet(false)) {
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, TranslogConfig.INACTIVE_SHARD_TRANSLOG_BUFFER);

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

I know it's not part of this change, but it bugs me (for a while now) that the indexing memory controller owns the buffer size for active indices but inactivity is controlled by the engine/shard. I wonder if we should move these settings to the memory controller.

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

Good idea, I'll move both translog and indexing inactive buffer constants to IMC.

}

/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
* IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

I wonder if it now makes more sense to have this setting on a shard level (since shard owns the inactivity). We can fall back to the old, node level settings (or just remove it if this is 3.0 change only and deprecate on 2.x)

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

I wonder if it now makes more sense to have this setting on a shard level (since shard owns the inactivity).

So users could change the inactive time per shard instead of globally for all shards on the node?

I don't think that's a good idea:) I think there are too many knobs already! Taking one knob and turning it into 20 seems not great...

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

fare enough. It would be a settings per index, but I get your point.

* {@code Boolean.FALSE} if they have, and {@code null} if the shard is unknown */
protected boolean isShardIdle(ShardId shardId, long inactiveTimeNS) {
final IndexShard shard = getShard(shardId);
if (shard == null) {

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

this isn't what the comment says? (return null)


@Override
public void run() {
public synchronized void run() {

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

good catch!

if (shard == null) {
return false;
}
return currentTimeInNanos() - shard.getLastWriteNS() >= inactiveTimeNS;

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

Shall we push this into IndexShard as well? have a method like

IndexShard.checkIfIdle() {
   if (System.currentTime - lastWriteNs() > inactiveTimeNs) {
         active.set(false);
  }
  return active.get() == false;
}

so all inactive/active logic is in IndexShard and you just ask for a check from the outside.

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

This is a good idea! I think I'll take it further and fold it into a single method that checks for idle, and becomes idle (dropping the buffers) if so.

@@ -364,12 +360,15 @@ private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<Sha
return changes;
}

private void calcAndSetShardBuffers(int activeShards, String reason) {
if (activeShards == 0) {
private void calcAndSetShardBuffers(int activeShardCount, String reason) {

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

I think things will be simpler if this method is driven by the current shardWasActive map. This means we don't need to count activeShardCount and also not worry about whether availableShards() changed it's mind (see loop at the end of this method)

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

Hmm, confused: seems like we still need activeShardCount up front since we then divide total allowed heap by that?

I agree the loop at the end of the method should just use shardWasActive ... I'll fix!

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

Hmm, confused: seems like we still need activeShardCount up front since we then divide total allowed heap by that?

what I mean that we should just loop on shardWasActive twice here, once to calculate activeShardCount and one to apply it. I think it will be clearer than passing it as an argument and trying to make sure it's correct in another functionn

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

Ahh gotchya, will do!

long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();

public MockController(Settings settings) {
super(Settings.builder()
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
.put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate
.put(SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

wondering - why did you change this?

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 6, 2015

Author Contributor

wondering - why did you change this?

Oh it's because I changed the inactive time setting to be >= not >, i.e. an inactive time of 60s means "if you've been idle for 60s or more, mark yourself inactive".

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 7, 2015

Member

OK. I'll brace for the beast which will break this because it runs faster than 1ms :)

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 7, 2015

Author Contributor

Wait, this test does not actually rely on wall clock time :) We advance the clock on our own, deterministically ... the fastest beast in the world should not break it!

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 7, 2015

Member

Oei, I forgot about the virtual time I added - smart move :)

controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K

// add another shard
final ShardId shard2 = new ShardId("test", 2);
controller.setTranslog(shard2, randomInt(10), randomInt(10));
controller.simulateIndexing(shard2);

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 6, 2015

Member

I wonder if simulate indexing should also call forceCheck (like real indexing would do)..

@bleskes

This comment has been minimized.

Copy link
Member

bleskes commented Oct 6, 2015

I love how this turned out, especially around moving away from the translog checks. I left some suggestions here and there. I also think we should move the controller to work on IndexShard objects rather than shard ids (sorry for the scope creep - can be done in a different change). At some point I got worried that a new shard will not be seen as "added" if the original shard was destroyed and a new one created. I think it's OK now (because we only use current state when allocating memory) but it's more correct to use physical instances.

@mikemccand

This comment has been minimized.

Copy link
Contributor Author

mikemccand commented Oct 6, 2015

I also think we should move the controller to work on IndexShard objects rather than shard ids

++

But I thought we used ShardId all over to make this more easily unit-tested? But anyway let's do this on a separate change ... this one is big already :)

@mikemccand

This comment has been minimized.

Copy link
Contributor Author

mikemccand commented Oct 6, 2015

OK I folded feedback @bleskes, thank you!

@@ -288,6 +288,7 @@ public synchronized IndexShard createShard(int sShardId, ShardRouting routing) {
indicesLifecycle.afterIndexShardCreated(indexShard);
settingsService.addListener(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
indexServicesProvider.getIndexingMemoryController().forceCheck();

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 7, 2015

Member

I was thinking about it and I think we should start the shard as inactive (it is :)) and only mark it as active + the claiming of memory in IndexShard#skipTranslogRecovery / IndexShard#performTranslogRecovery when we actually open the engine for indexing. We also make sure call IndexingMemoryController.forceCheck before we create the engine, so we always have an up to date buffer size ...

Re concerns about n^2 problem, it is there but since we limit the amount of concurrent recoveries a node can do, it means it will be done during a longer process. Also because of the limit it is hard to do anything around batching. All in all, I think we're OK.

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 7, 2015

Author Contributor

Actually, why not simply start up as inactive, init the buffers to the INACTIVE values, and then the first indexing op that arrives will make it active like normal?

OK that's a good point, that concurrent recoveries are limited anyway, so the O(N^2) cost is paid over a long time.

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 7, 2015

Member

Actually, why not simply start up as inactive, init the buffers to the INACTIVE values, and then the first indexing op that arrives will make it active like normal?

That was my original thought as well, but then I remembered you were very worried about indexing into a small buffer. When one creates an index by indexing into it that index will likely get many indexing requests at once, the first will trigger a forced check and the others will fly through into the engine. Figured we better not have to worry about it but If you're good with that, I'm good :)

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 7, 2015

Author Contributor

If you're good with that, I'm good :)

I'm gonna try :)

@@ -108,7 +108,6 @@
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final ByteSizeValue DEFAULT_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 7, 2015

Member

awesome

return true;
}

return false;

This comment has been minimized.

Copy link
@bleskes

bleskes Oct 7, 2015

Member

I think we should return active.get() == false and also - set it if we became idle...

@bleskes

This comment has been minimized.

Copy link
Member

bleskes commented Oct 7, 2015

This is getting great. I left some final comments.

if (iwBytesUsed > shardIndexingBufferSize.bytes()) {
// our allowed buffer was changed to less than we are currently using; we ask IW to refresh
// so it clears its buffers (otherwise it won't clear until the next indexing/delete op)
logger.debug(message + "; now refresh to clear IndexWriter memory");

This comment has been minimized.

Copy link
@nik9000

nik9000 Oct 7, 2015

Contributor

I'm used to wrapping debug logging stuff in if (logger.isDebugEnabled()) tests to prevent the message construction in the (very common) case that debug is disabled. Here it probably doesn't matter because message construction is dwarfed by the refresh call.

* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
* if the shard did in fact become inactive, else false. */
public boolean checkIdle(long inactiveTimeNS) {
if (System.nanoTime() - lastWriteNS >= inactiveTimeNS && active.getAndSet(false)) {

This comment has been minimized.

Copy link
@nik9000

nik9000 Oct 7, 2015

Contributor

Can you move the getAndSet to its own if statement? It has a side effect and its mixed with stuff that doesn't and when I first read the code I didn't notice it.


// TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
// is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
// get the same indexing buffer as large indices. But it quickly gets tricky...

This comment has been minimized.

Copy link
@nik9000

nik9000 Oct 7, 2015

Contributor

Indeed! This sounds like a fun thing to think about!

This comment has been minimized.

Copy link
@mikemccand

mikemccand Oct 7, 2015

Author Contributor

Indeed! This sounds like a fun thing to think about!

I'll open a followon issue for this ... I think often there are very lopsided shards in a single node, and evenly dividing the indexing buffer seems crazy.

It would be better if the whole system acted like IW acts with N threads using up heap: as soon as the sum of all heap used by these threads exceeds the global budget, we ask the largest one to write to disk.

@mikemccand

This comment has been minimized.

Copy link
Contributor Author

mikemccand commented Oct 7, 2015

OK thanks @nik9000 and @bleskes, I folded in the last round of feedback.

@bleskes

This comment has been minimized.

Copy link
Member

bleskes commented Oct 7, 2015

LGTM. Awesome mike.

Merge branch 'master' into immediate_shard_active
Conflicts:
	core/src/main/java/org/elasticsearch/index/engine/Engine.java
	core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

mikemccand pushed a commit that referenced this pull request Oct 8, 2015

Michael McCandless
Merge pull request #13918 from mikemccand/immediate_shard_active
When shard becomes active again, immediately increase its indexing buffer instead of waiting for up to 30 seconds while indexing with a tiny (500 KB) indexing buffer.

@mikemccand mikemccand merged commit 9688e86 into elastic:master Oct 8, 2015

1 check passed

CLA Commit author is a member of Elasticsearch
Details

mikemccand added a commit that referenced this pull request Oct 8, 2015

Merge pull request #13918 from mikemccand/immediate_shard_active
When shard becomes active again, immediately increase its indexing buffer instead of waiting for up to 30 seconds while indexing with a tiny (500 KB) indexing buffer.
Conflicts:
	core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java
	core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
	core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
	core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
	core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
	core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java

mikemccand added a commit that referenced this pull request Oct 8, 2015

Merge pull request #13918 from mikemccand/immediate_shard_active
When shard becomes active again, immediately increase its indexing buffer instead of waiting for up to 30 seconds while indexing with a tiny (500 KB) indexing buffer.
Conflicts:
	core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java
	core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
	core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
	core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
	core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
	core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.