Core: trigger refresh when the live version map is using too much RAM
When refresh_interval is long or disabled, and the indexing rate is high,
it's possible for the live version map to use non-trivial amounts of RAM.
With this change we now trigger a refresh in such cases to clear the
version map so we don't use unbounded RAM.

Closes #6443
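
In other words, every index/create operation now checks whether the version map's reclaimable RAM exceeds 25% of the IndexWriter RAM buffer, and if so schedules a single asynchronous refresh. Below is a minimal standalone sketch of that heuristic; the VersionMapRefreshGuard class and its versionMapRamBytes/iwRamBufferMB/asyncRefresh parameters are hypothetical stand-ins for the engine's versionMap.ramBytesUsedForRefresh(), IndexWriterConfig.getRAMBufferSizeMB(), and the REFRESH thread pool, not the actual API:

import java.util.concurrent.atomic.AtomicBoolean;

class VersionMapRefreshGuard {
    // At most one version-map-triggered refresh may be pending at a time.
    private final AtomicBoolean refreshPending = new AtomicBoolean();

    // Called after each index/create operation.
    void maybeTriggerRefresh(long versionMapRamBytes, double iwRamBufferMB, Runnable asyncRefresh) {
        double usedMB = versionMapRamBytes / 1024. / 1024.;
        // Same 25% threshold the commit uses; getAndSet guarantees only one pending refresh.
        if (usedMB > 0.25 * iwRamBufferMB && refreshPending.getAndSet(true) == false) {
            asyncRefresh.run(); // the engine clears the flag again once the refresh completes
        }
    }
}

The flag is cleared at the end of refresh() (see versionMapRefreshPending.set(false) in the diff below), so a sustained indexing burst triggers at most one outstanding refresh at a time.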
mikemccand committed Jul 8, 2014
1 parent 94c19c4 commit 85fe9a1
Showing 8 changed files with 472 additions and 80 deletions.
src/main/java/org/elasticsearch/index/engine/internal/DeleteVersionValue.java
@@ -19,6 +19,7 @@

package org.elasticsearch.index.engine.internal;

+ import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.index.translog.Translog;

/** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */
@@ -40,4 +41,9 @@ public long time() {
public boolean delete() {
return true;
}

+ @Override
+ public long ramBytesUsed() {
+ return super.ramBytesUsed() + RamUsageEstimator.NUM_BYTES_LONG;
+ }
}
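
The override above is the per-tombstone half of the new RAM accounting: a delete tombstone costs whatever the base version value costs, plus one extra long for the expiry timestamp. A hedged sketch of how the two classes compose — the *Sketch class names and the base-class estimate are illustrative, not the actual VersionValue implementation:

import org.apache.lucene.util.RamUsageEstimator;

class VersionValueSketch {
    private final long version;
    VersionValueSketch(long version) { this.version = version; }
    long ramBytesUsed() {
        // object header + the version field; the real class also accounts for its translog location
        return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_LONG;
    }
}

class DeleteVersionValueSketch extends VersionValueSketch {
    private final long time; // when the delete happened, used to expire the tombstone
    DeleteVersionValueSketch(long version, long time) { super(version); this.time = time; }
    @Override
    long ramBytesUsed() {
        return super.ramBytesUsed() + RamUsageEstimator.NUM_BYTES_LONG; // + the time field
    }
}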
src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -20,6 +20,7 @@
package org.elasticsearch.index.engine.internal;

import com.google.common.collect.Lists;

import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.IndexSearcher;
@@ -97,6 +98,10 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private volatile boolean compoundOnFlush = true;

private long gcDeletesInMillis;

+ /** When we last pruned expired tombstones from versionMap.deletes: */
+ private volatile long lastDeleteVersionPruneTimeMSec;

private volatile boolean enableGcDeletes = true;
private volatile String codecName;
private final boolean optimizeAutoGenerateId;
@@ -159,6 +164,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();

private final AtomicLong translogIdGenerator = new AtomicLong();
+ private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();

private SegmentInfos lastCommittedSegmentInfos;

@@ -180,6 +186,7 @@ public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, Th
this.codecName = indexSettings.get(INDEX_CODEC, "default");

this.threadPool = threadPool;
+ this.lastDeleteVersionPruneTimeMSec = threadPool.estimatedTimeInMillis();
this.indexSettingsService = indexSettingsService;
this.indexingService = indexingService;
this.warmer = (InternalIndicesWarmer) warmer;
@@ -393,6 +400,7 @@ public void create(Create create) throws EngineException {
maybeFailEngine(t);
throw new CreateFailedEngineException(shardId, create, t);
}
+ checkVersionMapRefresh();
}

private void maybeFailEngine(Throwable t) {
@@ -487,6 +495,20 @@ public void index(Index index) throws EngineException {
maybeFailEngine(t);
throw new IndexFailedEngineException(shardId, index, t);
}
+ checkVersionMapRefresh();
}

+ /** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). */
+ private void checkVersionMapRefresh() {
+ // TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
+ if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25*this.indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) {
+ // Now refresh to clear versionMap:
+ threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
+ public void run() {
+ refresh(new Refresh("version_table_full"));
+ }
+ });
+ }
+ }

private void innerIndex(Index index, IndexWriter writer) throws IOException {
@@ -514,7 +536,6 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException {
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);


index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// document does not exist, we can optimize for create
@@ -557,6 +578,16 @@ public void delete(Delete delete) throws EngineException {
maybeFailEngine(t);
throw new DeleteFailedEngineException(shardId, delete, t);
}

+ maybePruneDeletedTombstones();
}

+ private void maybePruneDeletedTombstones() {
+ // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
+ // every 1/4 of gcDeletesInMillis:
+ if (enableGcDeletes && threadPool.estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > gcDeletesInMillis*0.25) {
+ pruneDeletedTombstones();
+ }
+ }

private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
@@ -583,7 +614,6 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
}
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);

final boolean found;
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exist and no prior deletes
@@ -599,7 +629,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {

delete.updateVersion(updatedVersion, found);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
- versionMap.putDeleteUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));
+ versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));

indexingService.postDeleteUnderLock(delete);
}
@@ -633,8 +663,10 @@ public void delete(DeleteByQuery delete) throws EngineException {
maybeFailEngine(t);
throw new DeleteByQueryFailedEngineException(shardId, delete, t);
}
- //TODO: This is heavy, since we refresh, but we really have to...
- pruneDeletedVersions(System.currentTimeMillis());

+ // TODO: This is heavy, since we refresh, but we must do this because we don't know which documents were in fact deleted (i.e., our
+ // versionMap isn't updated), so we must force a cutover to a new reader to "see" the deletions:
+ refresh(new Refresh("delete_by_query").force(true));
}

@Override
@@ -726,6 +758,12 @@ public void refresh(Refresh refresh) throws EngineException {
failEngine("refresh failed", t);
throw new RefreshFailedEngineException(shardId, t);
}

+ // TODO: maybe we should just put a scheduled job in threadPool?
+ // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
+ // for a long time:
+ maybePruneDeletedTombstones();
+ versionMapRefreshPending.set(false);
}

@Override
@@ -780,7 +818,9 @@ public void flush(Flush flush) throws EngineException {
} catch (Throwable t) {
logger.warn("Failed to close current SearcherManager", t);
}
- pruneDeletedVersions(threadPool.estimatedTimeInMillis());

+ maybePruneDeletedTombstones();

} catch (Throwable t) {
throw new FlushFailedEngineException(shardId, t);
}
@@ -799,17 +839,26 @@ public void flush(Flush flush) throws EngineException {
translog.newTransientTranslog(translogId);
indexWriter.setCommitData(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
indexWriter.commit();
- pruneDeletedVersions(threadPool.estimatedTimeInMillis());
+ // we need to refresh in order to clear older version values
+ refresh(new Refresh("version_table_flush").force(true));
// we need to move transient to current only after we refresh
// so items added to current will still be around for realtime get
// when trans overrides it
translog.makeTransientCurrent();

} catch (Throwable e) {
translog.revertTransient();
throw new FlushFailedEngineException(shardId, e);
}
}
}

+ // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
+ // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
+ if (enableGcDeletes) {
+ pruneDeletedTombstones();
+ }

} else if (flush.type() == Flush.Type.COMMIT) {
// note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
// translog on an index that was opened on a committed point in time that is "in the future"
@@ -827,6 +876,13 @@ public void flush(Flush flush) throws EngineException {
throw new FlushFailedEngineException(shardId, e);
}
}

+ // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
+ // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
+ if (enableGcDeletes) {
+ pruneDeletedTombstones();
+ }

} else {
throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
}
@@ -869,32 +925,27 @@ private IndexWriter currentIndexWriter() {
return writer;
}

- private void pruneDeletedVersions(long time) {
- // we need to refresh in order to clear older version values
- refresh(new Refresh("version_table").force(true));
+ private void pruneDeletedTombstones() {
+ long timeMSec = threadPool.estimatedTimeInMillis();

// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

- // we only need to prune deletes; the adds/updates are cleared whenever reader is refreshed:
- for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllDeletes()) {
+ // we only need to prune the deletes map; the current/old version maps are cleared on refresh:
+ for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllTombstones()) {
BytesRef uid = entry.getKey();
synchronized (dirtyLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?

- // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
- VersionValue versionValue = versionMap.getDeleteUnderLock(uid);
- if (versionValue == null) {
- // another thread has re-added this uid since we started refreshing:
- continue;
- }
- if (time - versionValue.time() <= 0) {
- continue; // its a newer value, from after/during we refreshed, don't clear it
- }
- assert versionValue.delete();
- if (enableGcDeletes && (time - versionValue.time()) > gcDeletesInMillis) {
- versionMap.removeDeleteUnderLock(uid);
+ // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
+ VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
+ if (versionValue != null) {
+ if (timeMSec - versionValue.time() > gcDeletesInMillis) {
+ versionMap.removeTombstoneUnderLock(uid);
+ }
+ }
}
}

+ lastDeleteVersionPruneTimeMSec = timeMSec;
}

@Override
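One last detail worth pulling out of the diff above: tombstone pruning is now throttled by wall-clock time (at most once per gcDeletesInMillis/4) instead of being tied to every refresh, since each prune walks the tombstone map taking a per-uid dirty lock. A rough standalone sketch of that throttle — TombstonePruneThrottle and its nowMillis/prune parameters are hypothetical stand-ins for threadPool.estimatedTimeInMillis() and pruneDeletedTombstones():

class TombstonePruneThrottle {
    private final long gcDeletesInMillis;
    private volatile long lastPruneTimeMSec;

    TombstonePruneThrottle(long gcDeletesInMillis, long nowMillis) {
        this.gcDeletesInMillis = gcDeletesInMillis;
        this.lastPruneTimeMSec = nowMillis;
    }

    // Called on each delete and on each refresh; prunes only if at least
    // 1/4 of gcDeletesInMillis has elapsed since the last prune.
    void maybePrune(long nowMillis, Runnable prune) {
        if (nowMillis - lastPruneTimeMSec > gcDeletesInMillis * 0.25) {
            prune.run(); // drops tombstones older than gcDeletesInMillis
            lastPruneTimeMSec = nowMillis;
        }
    }
}

In the commit itself the timestamp is updated inside pruneDeletedTombstones(), and flush() additionally prunes unconditionally as a defense against a misbehaving wall clock; the sketch only shows the common throttled path.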
