Skip to content

Commit

Permalink
Add refresh stats tracking for realtime get (#25052)
Browse files Browse the repository at this point in the history
Passes a `LongConsumer` into the `Engine` during GETs which the engine
calls if it refreshed to perform the get.

Closes #24806
  • Loading branch information
PnPie authored and nik9000 committed Jun 6, 2017
1 parent fb5a402 commit e9919ca
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 38 deletions.
7 changes: 2 additions & 5 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -87,6 +87,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.LongConsumer;

public abstract class Engine implements Closeable {

Expand Down Expand Up @@ -449,11 +450,7 @@ protected final GetResult getFromSearcher(Get get, Function<String, Searcher> se
}
}

public final GetResult get(Get get) throws EngineException {
return get(get, this::acquireSearcher);
}

public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException;
public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException;

/**
* Returns a new searcher instance. The consumer of this
Expand Down
Expand Up @@ -86,6 +86,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongConsumer;

public class InternalEngine extends Engine {
/**
Expand Down Expand Up @@ -346,7 +347,7 @@ private SearcherManager createSearcherManager() throws EngineException {
}

@Override
public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
public GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException {
assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Expand All @@ -360,7 +361,9 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.getVersion(), get.version()));
}
long time = System.nanoTime();
refresh("realtime_get");
onRefresh.accept(System.nanoTime() - time);
}
}

Expand Down
Expand Up @@ -36,6 +36,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongConsumer;

/**
* ShadowEngine is a specialized engine that only allows read-only operations
Expand All @@ -52,8 +53,8 @@
* regular primary (which uses {@link org.elasticsearch.index.engine.InternalEngine})
*
* Notice that since this Engine does not deal with the translog, any
* {@link #get(Get get)} request goes directly to the searcher, meaning it is
* non-realtime.
* {@link #get(Get, Function, LongConsumer)} request goes directly to the searcher,
* meaning it is non-realtime.
*/
public class ShadowEngine extends Engine {

Expand Down Expand Up @@ -160,8 +161,9 @@ public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDel
}

@Override
public GetResult get(Get get, Function<String, Searcher> searcherFacotry) throws EngineException {
public GetResult get(Get get, Function<String, Searcher> searcherFacotry, LongConsumer onRefresh) throws EngineException {
// There is no translog, so we can get it directly from the searcher
// Since we never refresh we just drop the onRefresh parameter on the floor
return getFromSearcher(get, searcherFacotry);
}

Expand Down
Expand Up @@ -612,7 +612,7 @@ private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws I
}
public Engine.GetResult get(Engine.Get get) {
readAllowed();
return getEngine().get(get, this::acquireSearcher);
return getEngine().get(get, this::acquireSearcher, (timeElapsed) -> refreshMetric.inc(timeElapsed));
}

/**
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -147,6 +148,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -746,7 +748,10 @@ public void testConcurrentGetAndFlush() throws Exception {
engine.index(indexForDoc(doc));

final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
latestGetResult.set(engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc))));
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final AtomicBoolean refreshed = new AtomicBoolean(false);
latestGetResult.set(engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, refreshTook -> refreshed.set(true)));
assertTrue("failed to refresh", refreshed.get());
final AtomicBoolean flushFinished = new AtomicBoolean(false);
final CyclicBarrier barrier = new CyclicBarrier(2);
Thread getThread = new Thread(() -> {
Expand All @@ -760,7 +765,7 @@ public void testConcurrentGetAndFlush() throws Exception {
if (previousGetResult != null) {
previousGetResult.release();
}
latestGetResult.set(engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc))));
latestGetResult.set(engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done")));
if (latestGetResult.get().exists() == false) {
break;
}
Expand All @@ -780,6 +785,9 @@ public void testSimpleOperations() throws Exception {
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();

final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final AtomicBoolean refreshed = new AtomicBoolean(false);

// create a document
Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
Expand All @@ -793,12 +801,13 @@ public void testSimpleOperations() throws Exception {
searchResult.close();

// but, not there non realtime
Engine.GetResult getResult = engine.get(new Engine.Get(false, doc.type(), doc.id(), newUid(doc)));
Engine.GetResult getResult = engine.get(new Engine.Get(false, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
assertThat(getResult.exists(), equalTo(false));
getResult.release();

// but, we can still get it (in realtime)
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)));
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> refreshed.set(true));
assertTrue("failed to refresh", refreshed.getAndSet(false));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
Expand All @@ -813,7 +822,7 @@ public void testSimpleOperations() throws Exception {
searchResult.close();

// also in non realtime
getResult = engine.get(new Engine.Get(false, doc.type(), doc.id(), newUid(doc)));
getResult = engine.get(new Engine.Get(false, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
Expand All @@ -833,7 +842,8 @@ public void testSimpleOperations() throws Exception {
searchResult.close();

// but, we can still get it (in realtime)
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)));
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> refreshed.set(true));
assertTrue("failed to refresh", refreshed.get());
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
Expand All @@ -858,7 +868,7 @@ public void testSimpleOperations() throws Exception {
searchResult.close();

// but, get should not see it (in realtime)
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)));
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
assertThat(getResult.exists(), equalTo(false));
getResult.release();

Expand Down Expand Up @@ -898,7 +908,7 @@ public void testSimpleOperations() throws Exception {
engine.flush();

// and, verify get (in real time)
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)));
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done"));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
Expand Down Expand Up @@ -1644,6 +1654,8 @@ class OpAndVersion {
ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1L, testDocument(), bytesArray(""), null);
final Term uidTerm = newUid(doc);
engine.index(indexForDoc(doc));
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final AtomicBoolean refreshed = new AtomicBoolean(false);
for (int i = 0; i < thread.length; i++) {
thread[i] = new Thread(() -> {
startGun.countDown();
Expand All @@ -1653,7 +1665,7 @@ class OpAndVersion {
throw new AssertionError(e);
}
for (int op = 0; op < opsPerThread; op++) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
Expand All @@ -1680,6 +1692,7 @@ class OpAndVersion {
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
assertTrue("failed to refresh", refreshed.getAndSet(false));
List<OpAndVersion> sortedHistory = new ArrayList<>(history);
sortedHistory.sort(Comparator.comparing(o -> o.version));
Set<String> currentValues = new HashSet<>();
Expand All @@ -1694,7 +1707,8 @@ class OpAndVersion {
assertTrue(op.added + " should not exist", exists);
}

try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
assertTrue("failed to refresh", refreshed.get());
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString()));
Expand Down Expand Up @@ -1830,6 +1844,8 @@ public void testEnableGcDeletes() throws Exception {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
engine.config().setEnableGcDeletes(false);

final Function<String, Searcher> searcherFactory = engine::acquireSearcher;

// Add document
Document document = testDocument();
document.add(new TextField("value", "test1", Field.Store.YES));
Expand All @@ -1841,7 +1857,7 @@ public void testEnableGcDeletes() throws Exception {
engine.delete(new Engine.Delete("test", "1", newUid(doc), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));

// Get should not find the document
Engine.GetResult getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)));
Engine.GetResult getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
assertThat(getResult.exists(), equalTo(false));

// Give the gc pruning logic a chance to kick in
Expand All @@ -1856,7 +1872,7 @@ public void testEnableGcDeletes() throws Exception {
engine.delete(new Engine.Delete("test", "2", newUid(doc2), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));

// Get should not find the document (we never indexed uid=2):
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc2)));
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc2)), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
assertThat(getResult.exists(), equalTo(false));

// Try to index uid=1 with a too-old version, should fail:
Expand All @@ -1865,7 +1881,7 @@ public void testEnableGcDeletes() throws Exception {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));

// Get should still not find the document
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)));
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
assertThat(getResult.exists(), equalTo(false));

// Try to index uid=2 with a too-old version, should fail:
Expand All @@ -1874,7 +1890,7 @@ public void testEnableGcDeletes() throws Exception {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));

// Get should not find the document
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)));
getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), newUid(doc)), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
assertThat(getResult.exists(), equalTo(false));
}
}
Expand Down

0 comments on commit e9919ca

Please sign in to comment.