From d8f5a3d647ac94c7c15280145a64bd54701cc5b4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 13 Oct 2019 12:48:05 -0400 Subject: [PATCH] Avoid unneeded refresh with concurrent realtime gets (#47895) This change should reduce refreshes for a use-case where we perform multiple realtime gets at the same time on an active index. Currently, we only call refresh if the index operation is still on the versionMap. However, at the time we call refresh, that operation might be already or will be included in the latest reader. Hence, we do not need to refresh. Adding another lock here is not an issue as the refresh is already sequential. --- .../index/engine/InternalEngine.java | 3 +- .../index/engine/InternalEngineTests.java | 55 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3766317b05756..fd23e89b8b0a8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -671,7 +671,8 @@ public GetResult get(Get get, BiFunction trackTranslogLocation.set(true); } } - refresh("realtime_get", SearcherScope.INTERNAL, true); + assert versionValue.seqNo >= 0 : versionValue; + refreshIfNeeded("realtime_get", versionValue.seqNo); } scope = SearcherScope.INTERNAL; } else { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b09f10f5f9bf0..4b4dba0188e4f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -165,6 +165,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -6102,4 +6103,58 @@ private void runTestDeleteFailure( } } + public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception { + final AtomicInteger refreshCount = new AtomicInteger(); + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() { + + } + + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + refreshCount.incrementAndGet(); + } + } + }; + try (Store store = createStore()) { + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, + refreshListener, null, null, engine.config().getCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + int numDocs = randomIntBetween(10, 100); + Set ids = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + engine.index(indexForDoc(createParsedDoc(id, null))); + ids.add(id); + } + final int refreshCountBeforeGet = refreshCount.get(); + Thread[] getters = new Thread[randomIntBetween(1, 4)]; + Phaser phaser = new Phaser(getters.length + 1); + for (int t = 0; t < getters.length; t++) { + getters[t] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + int iters = randomIntBetween(1, 10); + for (int i = 0; i < iters; i++) { + ParsedDocument doc = createParsedDoc(randomFrom(ids), null); + try (Engine.GetResult getResult = engine.get(newGet(true, doc), engine::acquireSearcher)) { + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.docIdAndVersion(), notNullValue()); + } + } + }); + getters[t].start(); + } + phaser.arriveAndAwaitAdvance(); + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc("more-" + i, null))); + } + for (Thread getter : getters) { + getter.join(); + } + assertThat(refreshCount.get(), lessThanOrEqualTo(refreshCountBeforeGet + 1)); + } + } + } }