Skip to content

Commit

Permalink
Engine.newChangesSnapshot may cause unneeded refreshes if called conc…
Browse files Browse the repository at this point in the history
…urrently (#35169)

When the engine is asked for historical operations, we check if some of the requested operations
are not yet refreshed and if so we refresh before returning the operations. The refresh check is
based on capturing the local checkpoint before each refresh and comparing that value to the one
requested when `newChangesSnapshot` was called. If the requested range is above the captured
local checkpoint we issue a refresh.

This can currently cause unneeded extra refreshes if the method is called concurrently which may cause unwanted degradation in indexing performance. This is especially relevant for CCR where we always ask for a range below the global checkpoint. That range is guaranteed to be below the local
checkpoint of the shard and one refresh is enough to serve multiple changes requests.

This commit fixes this by introducing a dedicated mutex to make sure the test for whether a refresh
is needed actually wait for concurrents for concurrent refreshes that were caused by another
change refresh. 

Note that this is not a big change in semantics as refreshes are serialized by lucene anyway. I also
opted not to keep the synchronization to the changes snapshot request only even if in theory we
can apply it to all refreshes, not matter where they come from.
  • Loading branch information
bleskes committed Nov 4, 2018
1 parent bedaee3 commit 85177c7
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2643,12 +2643,19 @@ final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}


private final Object refreshIfNeededMutex = new Object();

/**
* Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint.
*/
protected final void refreshIfNeeded(String source, long requestingSeqNo) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL);
synchronized (refreshIfNeededMutex) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -5079,6 +5078,60 @@ public void testLastRefreshCheckpoint() throws Exception {
assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getLocalCheckpoint()));
}

public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception {
final MapperService mapperService = createMapperService("test");
final long maxSeqNo = randomLongBetween(10, 50);
final AtomicLong refreshCounter = new AtomicLong();
try (Store store = createStore();
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
null,
new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() throws IOException {
refreshCounter.incrementAndGet();
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {

}
}, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) {
for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) {
final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean()));
}

final long initialRefreshCount = refreshCounter.get();
final Thread[] snapshotThreads = new Thread[between(1, 3)];
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < snapshotThreads.length; i++) {
final long min = randomLongBetween(0, maxSeqNo - 5);
final long max = randomLongBetween(min, maxSeqNo);
snapshotThreads[i] = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}

@Override
protected void doRun() throws Exception {
latch.await();
Translog.Snapshot changes = engine.newChangesSnapshot("test", mapperService, min, max, true);
changes.close();
}
});
snapshotThreads[i].start();
}
latch.countDown();
for (Thread thread : snapshotThreads) {
thread.join();
}
assertThat(refreshCounter.get(), equalTo(initialRefreshCount + 1L));
assertThat(engine.lastRefreshedCheckpoint(), equalTo(maxSeqNo));
}
}

public void testAcquireSearcherOnClosingEngine() throws Exception {
engine.close();
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,20 +573,29 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) {
IndexWriterConfig iwc = newIndexWriterConfig();
return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier);
}

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener externalRefreshListener,
ReferenceManager.RefreshListener internalRefreshListener,
Sort indexSort, LongSupplier globalCheckpointSupplier) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
Engine.EventListener listener = new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, @Nullable Exception e) {
// we don't need to notify anybody in this test
}
};
final List<ReferenceManager.RefreshListener> refreshListenerList =
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
final List<ReferenceManager.RefreshListener> extRefreshListenerList =
externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener);
final List<ReferenceManager.RefreshListener> intRefreshListenerList =
internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener);
EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort,
TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort,
new NoneCircuitBreakerService(),
globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
Expand Down

0 comments on commit 85177c7

Please sign in to comment.