Force refresh when versionMap is using too much RAM #6443

Closed · wants to merge 18 commits
Changes from 6 commits
@@ -19,6 +19,7 @@

package org.elasticsearch.index.engine.internal;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.index.translog.Translog;

/** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */
@@ -40,4 +41,9 @@ public long time() {
public boolean delete() {
return true;
}

@Override
public long ramBytesUsed() {
return super.ramBytesUsed() + RamUsageEstimator.NUM_BYTES_LONG;
}
}
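For context, the override above assumes the base VersionValue now reports its own footprint via ramBytesUsed(); a minimal sketch of what that base accounting could look like follows (the exact field layout is an assumption for illustration, not the PR's actual code):

// Hypothetical base-class accounting that DeleteVersionValue's override builds on.
class VersionValue {
    private final long version;
    private final Translog.Location translogLocation;

    VersionValue(long version, Translog.Location translogLocation) {
        this.version = version;
        this.translogLocation = translogLocation;
    }

    public long ramBytesUsed() {
        // object header, the version long, and one reference to the translog location
        return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
                + RamUsageEstimator.NUM_BYTES_LONG
                + RamUsageEstimator.NUM_BYTES_OBJECT_REF;
    }
}

DeleteVersionValue then adds one more NUM_BYTES_LONG for its time field, which is exactly what the override above returns.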
@@ -21,7 +21,6 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +39,6 @@
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
@@ -53,7 +51,6 @@
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.SegmentReaderUtils;
@@ -64,7 +61,6 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
@@ -101,6 +97,10 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private volatile boolean compoundOnFlush = true;

private long gcDeletesInMillis;

/** When we last pruned expired tombstones from versionMap.deletes: */
private volatile long lastDeleteVersionPruneTimeMSec;

private volatile boolean enableGcDeletes = true;
private volatile String codecName;
private final boolean optimizeAutoGenerateId;
@@ -184,6 +184,7 @@ public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, Th
this.codecName = indexSettings.get(INDEX_CODEC, "default");

this.threadPool = threadPool;
this.lastDeleteVersionPruneTimeMSec = threadPool.estimatedTimeInMillis();
this.indexSettingsService = indexSettingsService;
this.indexingService = indexingService;
this.warmer = (InternalIndicesWarmer) warmer;
@@ -392,6 +393,7 @@ public void create(Create create) throws EngineException {
maybeFailEngine(t);
throw new CreateFailedEngineException(shardId, create, t);
}
checkVersionMapRefresh();
}

private void maybeFailEngine(Throwable t) {
@@ -482,6 +484,17 @@ public void index(Index index) throws EngineException {
maybeFailEngine(t);
throw new IndexFailedEngineException(shardId, index, t);
}
checkVersionMapRefresh();
}

/** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). */
private void checkVersionMapRefresh() {
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
if (versionMap.ramBytesUsed() / 1024 / 1024. > 0.25 * this.indexWriter.getConfig().getRAMBufferSizeMB()) {
// Now refresh to clear versionMap adds:
// TODO: should we instead ask refresh threadPool to do this?
refresh(new Refresh("version_table_full"));
}
}

Member: I think that this is an important TODO; would love to have it happen in this change, since otherwise we Lucene-flush on an indexing thread. Should be relatively simple to fork to a REFRESH thread pool using the threadPool variable.

Contributor (author): OK, I changed this to ask the threadPool to do it, but please confirm I did that correctly! I also added an AtomicBoolean so that only one refresh is triggered when the versionMap is using too much RAM (otherwise many refreshes could easily happen).
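As a rough illustration of the rework the author describes above (fork the refresh to the REFRESH pool, guarded so only one refresh is pending at a time), the method might end up looking something like this; the versionMapRefreshPending field name and the exact reset point are assumptions, not necessarily what was committed:

private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();

private void checkVersionMapRefresh() {
    // 25% of IndexWriter's RAM buffer, expressed in bytes:
    if (versionMap.ramBytesUsed() > 0.25 * indexWriter.getConfig().getRAMBufferSizeMB() * 1024 * 1024
            && versionMapRefreshPending.compareAndSet(false, true)) {
        // Fork so the Lucene flush cost is not paid on an indexing thread:
        threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
            @Override
            public void run() {
                try {
                    refresh(new Refresh("version_table_full"));
                } finally {
                    // Allow a later over-budget check to trigger the next refresh:
                    versionMapRefreshPending.set(false);
                }
            }
        });
    }
}

The compareAndSet guard is what prevents the "many refreshes" problem the author mentions: while one refresh is in flight, subsequent index/create operations see the flag already set and skip scheduling another.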

private void innerIndex(Index index, IndexWriter writer) throws IOException {
@@ -509,7 +522,6 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException {
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);


index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
@@ -531,6 +543,7 @@
}
Translog.Location translogLocation = translog.add(new Translog.Index(index));

// TODO: expose versionMap's RAM usage in ShardStats?
Contributor: I think we can remove this now that we have the ticket? #6483

Contributor (author): OK, I'll remove.
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));

indexingService.postIndexUnderLock(index);
@@ -552,6 +565,12 @@ public void delete(Delete delete) throws EngineException {
maybeFailEngine(t);
throw new DeleteFailedEngineException(shardId, delete, t);
}

// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid, so we only do it
// every 1/4 of gcDeletesInMillis:
if (enableGcDeletes && threadPool.estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > gcDeletesInMillis * 0.25) {
pruneDeletedTombstones();
}
}
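With the default 60-second index.gc_deletes interval, for example, this throttle means a steady stream of deletes triggers a prune at most about once every 15 seconds.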

private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
Expand All @@ -578,7 +597,6 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
}
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);

final boolean found;
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exist and no prior deletes
@@ -594,7 +612,7 @@

delete.updateVersion(updatedVersion, found);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.putDeleteUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));

indexingService.postDeleteUnderLock(delete);
}
@@ -628,8 +646,10 @@ public void delete(DeleteByQuery delete) throws EngineException {
maybeFailEngine(t);
throw new DeleteByQueryFailedEngineException(shardId, delete, t);
}
//TODO: This is heavy, since we refresh, but we really have to...
pruneDeletedVersions(System.currentTimeMillis());

// TODO: This is heavy, since we refresh, but we must do this because we don't know which documents were in fact deleted (i.e., our
// versionMap isn't updated), so we must force a cutover to a new reader to "see" the deletions:
refresh(new Refresh("delete_by_query").force(true));
}

@Override
@@ -721,6 +741,12 @@ public void refresh(Refresh refresh) throws EngineException {
failEngine("refresh failed", t);
throw new RefreshFailedEngineException(shardId, t);
}

// We don't need to prune here, but it's an "opportune" time to do it, in case for example the wall clock time
// has shifted and we are never pruning in delete:
if (enableGcDeletes) {
pruneDeletedTombstones();
}

Contributor: I think we do need to do this in order to make sure a delete "burst" is cleaned up; otherwise it may stay here forever (or until another delete comes in). It also keeps the system more or less in line with the GC delete interval, as refresh happens every 1s.

Contributor (author): Good point, I'll fix the comment.

Contributor: Thinking about this more, we didn't use to do it and left the delete/version pruning to flushes. Maybe we want to put this under the same gc delete sanity check that we have in the delete operation, and only call this if 25% of the gc delete interval has passed?
}
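If the reviewer's last suggestion were adopted, the unconditional call above would presumably be replaced by the same 25% throttle used on the delete path, along these lines (a sketch, not committed code):

if (enableGcDeletes && threadPool.estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > gcDeletesInMillis * 0.25) {
    pruneDeletedTombstones();
}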

@Override
@@ -775,7 +801,7 @@ public void flush(Flush flush) throws EngineException {
} catch (Throwable t) {
logger.warn("Failed to close current SearcherManager", t);
}
pruneDeletedVersions(threadPool.estimatedTimeInMillis());

} catch (Throwable t) {
throw new FlushFailedEngineException(shardId, t);
}
@@ -794,7 +820,8 @@
translog.newTransientTranslog(translogId);
indexWriter.setCommitData(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
indexWriter.commit();
pruneDeletedVersions(threadPool.estimatedTimeInMillis());
// we need to refresh in order to clear older version values
refresh(new Refresh("version_table").force(true));
Contributor: Maybe change "version_table" to something mentioning flush?

// we need to move transient to current only after we refresh
// so items added to current will still be around for realtime get
// when trans overrides it
@@ -864,32 +891,25 @@ private IndexWriter currentIndexWriter() {
return writer;
}

private void pruneDeletedVersions(long time) {
// we need to refresh in order to clear older version values
refresh(new Refresh("version_table").force(true));
private void pruneDeletedTombstones() {
long timeMSec = threadPool.estimatedTimeInMillis();

// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

// we only need to prune deletes; the adds/updates are cleared whenever reader is refreshed:
for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllDeletes()) {
// we only need to prune the deletes map; the current/old version maps are cleared on refresh:
for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllTombstones()) {
BytesRef uid = entry.getKey();
synchronized (dirtyLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?

// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
VersionValue versionValue = versionMap.getDeleteUnderLock(uid);
if (versionValue == null) {
// another thread has re-added this uid since we started refreshing:
continue;
}
if (time - versionValue.time() <= 0) {
continue; // its a newer value, from after/during we refreshed, don't clear it
}
assert versionValue.delete();
if (enableGcDeletes && (time - versionValue.time()) > gcDeletesInMillis) {
versionMap.removeDeleteUnderLock(uid);
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
if (timeMSec - versionValue.time() > gcDeletesInMillis) {
versionMap.removeTombstoneUnderLock(uid);
}
}
}

lastDeleteVersionPruneTimeMSec = timeMSec;
}
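Note that timeMSec is read once up front and lastDeleteVersionPruneTimeMSec is only updated after the sweep finishes, so the 25% throttle in delete() is measured from the end of the previous prune rather than from its start.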

@Override