Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Dec 28, 2018
1 parent f7a5171 commit daa9fc7
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 76 deletions.
88 changes: 25 additions & 63 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -607,12 +607,10 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer)
throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
refreshListeners.disallowAdd();
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
try {
if (refreshListeners.refreshNeeded()) {
refresh("relocated");
}
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated";
Expand Down Expand Up @@ -644,7 +642,7 @@ public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer
failShard("timed out waiting for relocation hand-off to complete", null);
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
} finally {
refreshListeners.allowAdd();
forceRefreshes.close();
}
}

Expand Down Expand Up @@ -2345,15 +2343,22 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

refreshListeners.disallowAdd();
asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> {
forceRefreshes.close();
onPermitAcquired.onResponse(r);
}, e -> {
forceRefreshes.close();
onPermitAcquired.onFailure(e);
});
try {
if (refreshListeners.refreshNeeded()) {
refresh("acquire all primary operations permits");
}
indexShardOperationPermits.asyncBlockOperations(
ensuringAllowRefreshListeners(onPermitAcquired), timeout.duration(), timeout.timeUnit());
indexShardOperationPermits.asyncBlockOperations(wrappedListener, timeout, timeUnit);
} catch (Exception e) {
refreshListeners.allowAdd();
forceRefreshes.close();
throw e;
}
}
Expand All @@ -2365,7 +2370,7 @@ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
final ActionListener<Releasable> actionListener = new ActionListener<Releasable>() {
asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onFailure(final Exception e) {
try {
Expand Down Expand Up @@ -2411,41 +2416,9 @@ public void onResponse(final Releasable releasable) {
}
}
}
};
refreshListeners.disallowAdd();
try {
if (refreshListeners.refreshNeeded()) {
refresh("bump primary term");
}
indexShardOperationPermits.asyncBlockOperations(ensuringAllowRefreshListeners(actionListener), 30, TimeUnit.MINUTES);
pendingPrimaryTerm = newPrimaryTerm;
termUpdated.countDown();
} catch (Exception e) {
refreshListeners.allowAdd();
throw e;
}
}

/**
* Wraps an ActionListener so that {@link RefreshListeners#allowAdd()} is always called on {@link #refreshListeners}
* after the listeners actions have executed.
* @param actionListener ActionListener to wrap
* @return Wrapped ActionListener
*/
private ActionListener<Releasable> ensuringAllowRefreshListeners(ActionListener<Releasable> actionListener) {
return ActionListener.wrap(r -> {
try {
actionListener.onResponse(r);
} finally {
refreshListeners.allowAdd();
}
}, e -> {
try {
actionListener.onFailure(e);
} finally {
refreshListeners.allowAdd();
}
});
}, 30, TimeUnit.MINUTES);
pendingPrimaryTerm = newPrimaryTerm;
termUpdated.countDown();
}

/**
Expand Down Expand Up @@ -2490,21 +2463,10 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout) {
refreshListeners.disallowAdd();
try {
if (refreshListeners.refreshNeeded()) {
refresh("acquire all replica operations permits");
}
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
onPermitAcquired, true,
listener -> indexShardOperationPermits.asyncBlockOperations(
ensuringAllowRefreshListeners(listener), timeout.duration(), timeout.timeUnit()
)
);
} catch (Exception e) {
refreshListeners.allowAdd();
throw e;
}
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
onPermitAcquired, true,
listener -> asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())
);
}

private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.translog.Translog;

Expand Down Expand Up @@ -55,7 +57,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
private volatile boolean closed = false;

/**
* Prevents new refresh listeners from being registered while {@code >= 0}. Used to prevent becoming blocked on operations waiting for
* Force-refreshes new refresh listeners that are added while {@code >= 0}. Used to prevent becoming blocked on operations waiting for
* refresh during relocation.
*/
private int refreshForcers;
Expand Down Expand Up @@ -83,19 +85,28 @@ public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefres
}

/**
* Prohibit adding new refresh listeners. See {@link #refreshForcers}.
* Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}.
*/
public synchronized void disallowAdd() {
refreshForcers += 1;
assert refreshForcers >= 0;
}

/**
* Enable adding new refresh listeners. See {@link #refreshForcers}.
*/
public synchronized void allowAdd() {
refreshForcers -= 1;
assert refreshForcers >= 0;
public Releasable forceRefreshes() {
synchronized (this) {
assert refreshForcers >= 0;
refreshForcers += 1;
}
final RunOnce runOnce = new RunOnce(() -> {
synchronized (RefreshListeners.this) {
assert refreshForcers > 0;
refreshForcers -= 1;
}
});
if (refreshNeeded()) {
try {
forceRefresh.run();
} catch (Exception e) {
runOnce.run();
throw e;
}
}
return () -> runOnce.run();
}

/**
Expand Down

0 comments on commit daa9fc7

Please sign in to comment.