Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force Refresh Listeners when Acquiring all Operation Permits #36835

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,10 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer)
throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated";
Expand Down Expand Up @@ -639,6 +641,8 @@ public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer
// Fail primary relocation source and target shards.
failShard("timed out waiting for relocation hand-off to complete", null);
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
} finally {
forceRefreshes.close();
}
}

Expand Down Expand Up @@ -2339,7 +2343,24 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> {
forceRefreshes.close();
onPermitAcquired.onResponse(r);
}, e -> {
forceRefreshes.close();
onPermitAcquired.onFailure(e);
});
try {
indexShardOperationPermits.asyncBlockOperations(wrappedListener, timeout, timeUnit);
} catch (Exception e) {
forceRefreshes.close();
throw e;
}
}

private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
Expand All @@ -2349,7 +2370,7 @@ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onFailure(final Exception e) {
try {
Expand Down Expand Up @@ -2442,8 +2463,10 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true,
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
onPermitAcquired, true,
listener -> asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())
);
}

private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private Releasable acquire(Object debugInfo, StackTraceElement[] stackTrace) thr
/**
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
*
* @return the active operation count, or zero when all permits ar eheld
* @return the active operation count, or zero when all permits are held
*/
int getActiveOperationsCount() {
int availablePermits = semaphore.availablePermits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.translog.Translog;

Expand Down Expand Up @@ -53,6 +55,13 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
* Is this closed? If true then we won't add more listeners and have flushed all pending listeners.
*/
private volatile boolean closed = false;

/**
* Force-refreshes new refresh listeners that are added while {@code >= 0}. Used to prevent becoming blocked on operations waiting for
* refresh during relocation.
*/
private int refreshForcers;

/**
* List of refresh listeners. Defaults to null and built on demand because most refresh cycles won't need it. Entries are never removed
* from it, rather, it is nulled and rebuilt when needed again. The (hopefully) rare entries that didn't make the current refresh cycle
Expand All @@ -75,6 +84,31 @@ public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefres
this.threadContext = threadContext;
}

/**
* Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}.
*/
public Releasable forceRefreshes() {
synchronized (this) {
original-brownbear marked this conversation as resolved.
Show resolved Hide resolved
original-brownbear marked this conversation as resolved.
Show resolved Hide resolved
assert refreshForcers >= 0;
refreshForcers += 1;
}
final RunOnce runOnce = new RunOnce(() -> {
synchronized (RefreshListeners.this) {
assert refreshForcers > 0;
refreshForcers -= 1;
}
});
if (refreshNeeded()) {
try {
forceRefresh.run();
} catch (Exception e) {
runOnce.run();
throw e;
}
}
return () -> runOnce.run();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could assert before this line here that assert refreshListeners == null?

}

/**
* Add a listener for refreshes, calling it immediately if the location is already visible. If this runs out of listener slots then it
* forces a refresh and calls the listener immediately as well.
Expand Down Expand Up @@ -102,7 +136,7 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
listeners = new ArrayList<>();
refreshListeners = listeners;
}
if (listeners.size() < getMaxRefreshListeners.getAsInt()) {
if (refreshForcers == 0 && listeners.size() < getMaxRefreshListeners.getAsInt()) {
ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true);
Consumer<Boolean> contextPreservingListener = forced -> {
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -342,6 +343,40 @@ public void testLotsOfThreads() throws Exception {
refresher.cancel();
}

public void testDisallowAddListeners() throws Exception {
assertEquals(0, listeners.pendingCount());
DummyRefreshListener listener = new DummyRefreshListener();
assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
engine.refresh("I said so");
assertFalse(listener.forcedRefresh.get());
listener.assertNoError();

try (Releasable releaseable1 = listeners.forceRefreshes()) {
listener = new DummyRefreshListener();
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
assertTrue(listener.forcedRefresh.get());
listener.assertNoError();
assertEquals(0, listeners.pendingCount());

try (Releasable releaseable2 = listeners.forceRefreshes()) {
listener = new DummyRefreshListener();
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
assertTrue(listener.forcedRefresh.get());
listener.assertNoError();
assertEquals(0, listeners.pendingCount());
}

listener = new DummyRefreshListener();
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
assertTrue(listener.forcedRefresh.get());
listener.assertNoError();
assertEquals(0, listeners.pendingCount());
}

assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), new DummyRefreshListener()));
assertEquals(1, listeners.pendingCount());
}

private Engine.IndexResult index(String id) throws IOException {
return index(id, "test");
}
Expand Down
94 changes: 94 additions & 0 deletions server/src/test/java/org/elasticsearch/recovery/RelocationIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import com.carrotsearch.hppc.procedures.IntProcedure;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.English;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -506,6 +509,97 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr

}

public void testRelocateWhileWaitingForRefresh() {
logger.info("--> starting [node1] ...");
final String node1 = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", -1) // we want to control refreshes
).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have an actual index");
client().admin().indices().prepareFlush().execute().actionGet();
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i).execute();
}

logger.info("--> start another node");
final String node2 = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> relocate the shard from node1 to node2");
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand("test", 0, node1, node2))
.execute().actionGet();

clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> verifying count");
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
}

public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() {
logger.info("--> starting [node1] ...");
final String node1 = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", -1) // we want to control refreshes
).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have an actual index");
client().admin().indices().prepareFlush().execute().actionGet();
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i).execute();
}

logger.info("--> start another node");
final String node2 = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> relocate the shard from node1 to node2");
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand("test", 0, node1, node2))
.execute();
logger.info("--> index 100 docs while relocating");
for (int i = 20; i < 120; i++) {
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i).execute();
}
relocationListener.actionGet();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> verifying count");
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L));
}

class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {

private final CountDownLatch corruptionCount;
Expand Down