Skip to content

Commit

Permalink
[Infra] remove indicesLifecycle.Listener from IndexingMemoryController
Browse files Browse the repository at this point in the history
The IndexingMemoryController determines the amount of indexing buffer size and translog buffer size each shard should have. It takes memory from inactive shards (indexing wise) and assigns it to other shards. To do so it needs to know about the addition and closing of shards. The current implementation hooks into the indicesService.indicesLifecycle() mechanism to receive call backs, such shard entered the POST_RECOVERY state. Those call backs are typically run on the thread that actually made the change. A mutex was used to synchronize those callbacks with IndexingMemoryController's background thread, which updates the internal engines memory usage on a regular interval. This introduced a dependency between those threads and the locks of the internal engines hosted on the node. In a *very* rare situation (two tests runs locally) this can cause recovery time outs where two nodes are recovering replicas from each other.

 This commit introduces a a lock free approach that updates the internal data structures during iterations in the background thread.

Closes elastic#6892
  • Loading branch information
bleskes committed Jul 17, 2014
1 parent 92b5ada commit bba9726
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 212 deletions.
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.indices.memory;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -37,16 +36,12 @@
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
Expand All @@ -66,16 +61,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin

private final TimeValue inactiveTime;
private final TimeValue interval;
private final AtomicBoolean shardsRecoveredOrDeleted = new AtomicBoolean();

private final Listener listener = new Listener();

private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = Maps.newHashMap();

private volatile ScheduledFuture scheduler;

private final Object mutex = new Object();

private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);

@Inject
Expand Down Expand Up @@ -137,14 +125,12 @@ public IndexingMemoryController(Settings settings, ThreadPool threadPool, Indice

@Override
protected void doStart() throws ElasticsearchException {
indicesService.indicesLifecycle().addListener(listener);
// its fine to run it on the scheduler thread, no busy work
this.scheduler = threadPool.scheduleWithFixedDelay(new ShardsIndicesStatusChecker(), interval);
}

@Override
protected void doStop() throws ElasticsearchException {
indicesService.indicesLifecycle().removeListener(listener);
if (scheduler != null) {
scheduler.cancel(false);
scheduler = null;
Expand All @@ -164,153 +150,185 @@ public ByteSizeValue indexingBufferSize() {
}

class ShardsIndicesStatusChecker implements Runnable {

private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = new HashMap<>();


@Override
public void run() {
synchronized (mutex) {
boolean activeInactiveStatusChanges = false;
List<IndexShard> activeToInactiveIndexingShards = Lists.newArrayList();
List<IndexShard> inactiveToActiveIndexingShards = Lists.newArrayList();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
long time = threadPool.estimatedTimeInMillis();
Translog translog = ((InternalIndexShard) indexShard).translog();
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null) { // not added yet
continue;
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);

changes.addAll(purgeDeletedAndClosedShards());

final List<IndexShard> activeToInactiveIndexingShards = Lists.newArrayList();
final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards);
for (IndexShard indexShard : activeToInactiveIndexingShards) {
// update inactive indexing buffer size
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
// ignore
}
}
if (!changes.isEmpty()) {
calcAndSetShardBuffers(activeShards, "[" + changes + "]");
}
}

/**
* goes through all existing shards and check whether the changes their active status
*
* @return the current count of active shards
*/
private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<IndexShard> activeToInactiveIndexingShards) {
int activeShards = 0;
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {

if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) {
// not ready to be updated yet.
continue;
}

final long time = threadPool.estimatedTimeInMillis();

Translog translog = ((InternalIndexShard) indexShard).translog();
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null) {
status = new ShardIndexingStatus();
shardsIndicesStatus.put(indexShard.shardId(), status);
changes.add(ShardStatusChangeType.ADDED);
}
// check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time)
if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) {
if (status.time == -1) { // first time
status.time = time;
}
// check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time)
if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) {
if (status.time == -1) { // first time
status.time = time;
}
// inactive?
if (!status.inactiveIndexing) {
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) {
// inactive for this amount of time, mark it
activeToInactiveIndexingShards.add(indexShard);
status.inactiveIndexing = true;
activeInactiveStatusChanges = true;
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
}
// inactive?
if (status.activeIndexing) {
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) {
// inactive for this amount of time, mark it
activeToInactiveIndexingShards.add(indexShard);
status.activeIndexing = false;
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
}
} else {
if (status.inactiveIndexing) {
inactiveToActiveIndexingShards.add(indexShard);
status.inactiveIndexing = false;
activeInactiveStatusChanges = true;
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
}
status.time = -1;
}
status.translogId = translog.currentId();
status.translogNumberOfOperations = translog.estimatedNumberOfOperations();
} else {
if (!status.activeIndexing) {
status.activeIndexing = true;
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
}
status.time = -1;
}
}
for (IndexShard indexShard : activeToInactiveIndexingShards) {
// update inactive indexing buffer size
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
// ignore
status.translogId = translog.currentId();
status.translogNumberOfOperations = translog.estimatedNumberOfOperations();

if (status.activeIndexing) {
activeShards++;
}
}
boolean shardsRecoveredOrDeleted = IndexingMemoryController.this.shardsRecoveredOrDeleted.compareAndSet(true, false);
if (shardsRecoveredOrDeleted || activeInactiveStatusChanges) {
calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] recovered/deleted[" + shardsRecoveredOrDeleted + "]");
}
}
return activeShards;
}
}

class Listener extends IndicesLifecycle.Listener {
/**
* purge any existing statuses that are no longer updated
*
* @return true if any change
*/
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);

Iterator<ShardId> statusShardIdIterator = shardsIndicesStatus.keySet().iterator();
while (statusShardIdIterator.hasNext()) {
ShardId statusShardId = statusShardIdIterator.next();
IndexService indexService = indicesService.indexService(statusShardId.getIndex());
boolean remove = false;
try {
if (indexService == null) {
remove = true;
continue;
}
IndexShard indexShard = indexService.shard(statusShardId.id());
if (indexShard == null) {
remove = true;
continue;
}
remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state());

@Override
public void afterIndexShardPostRecovery(IndexShard indexShard) {
synchronized (mutex) {
shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus());
shardsRecoveredOrDeleted.set(true);
} finally {
if (remove) {
changes.add(ShardStatusChangeType.DELETED);
statusShardIdIterator.remove();
}
}
}
return changes;
}

@Override
public void afterIndexShardClosed(ShardId shardId) {
synchronized (mutex) {
shardsIndicesStatus.remove(shardId);
shardsRecoveredOrDeleted.set(true);
private void calcAndSetShardBuffers(int activeShards, String reason) {
if (activeShards == 0) {
return;
}
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards);
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = minShardIndexBufferSize;
}
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = maxShardIndexBufferSize;
}
}
}


private void calcAndSetShardBuffers(String reason) {
int shardsCount = countActiveShards();
if (shardsCount == 0) {
return;
}
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = minShardIndexBufferSize;
}
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = maxShardIndexBufferSize;
}

ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / shardsCount);
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = minShardTranslogBufferSize;
}
if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = maxShardTranslogBufferSize;
}
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShards);
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = minShardTranslogBufferSize;
}
if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = maxShardTranslogBufferSize;
}

logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
IndexShardState state = indexShard.state();
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) {
logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state);
continue;
}
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null || !status.inactiveIndexing) {
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize);
} catch (EngineClosedException e) {
// ignore
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
IndexShardState state = indexShard.state();
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) {
logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state);
continue;
} catch (FlushNotAllowedEngineException e) {
// ignore
continue;
} catch (Exception e) {
logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize);
}
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null || status.activeIndexing) {
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize);
} catch (EngineClosedException e) {
// ignore
continue;
} catch (FlushNotAllowedEngineException e) {
// ignore
continue;
} catch (Exception e) {
logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize);
}
}
}
}
}
}

private int countActiveShards() {
int shardsCount = 0;
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null || !status.inactiveIndexing) {
shardsCount++;
}
}
}
return shardsCount;
private static enum ShardStatusChangeType {
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
}


static class ShardIndexingStatus {
long translogId = -1;
int translogNumberOfOperations = -1;
boolean inactiveIndexing = false;
boolean activeIndexing = true;
long time = -1; // contains the first time we saw this shard with no operations done on it
}
}

0 comments on commit bba9726

Please sign in to comment.