Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove reconnect abort #9741

Merged
merged 10 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -87,13 +86,6 @@ public class TeachingSynchronizer {

protected final ReconnectConfig reconnectConfig;

/**
* A mechanism to check if teaching should be stopped, e.g. when the teacher itself has
* fallen behind network.
*/
@Nullable
private final BooleanSupplier requestToStopTeaching;

private final Time time;

/**
Expand All @@ -113,8 +105,6 @@ public class TeachingSynchronizer {
* if there is a thread stuck on a blocking IO
* operation that will never finish due to a
* failure.
* @param requestToStopTeaching
* a function to check periodically if teaching should be stopped
* @param reconnectConfig
* reconnect configuration from platform
*/
Expand All @@ -125,7 +115,6 @@ public TeachingSynchronizer(
@NonNull final MerkleDataOutputStream out,
@NonNull final MerkleNode root,
@Nullable final Runnable breakConnection,
@Nullable final BooleanSupplier requestToStopTeaching,
@NonNull final ReconnectConfig reconnectConfig) {

this.time = Objects.requireNonNull(time);
Expand All @@ -137,7 +126,6 @@ public TeachingSynchronizer(
subtrees.add(new TeacherSubtree(root));

this.breakConnection = breakConnection;
this.requestToStopTeaching = requestToStopTeaching;
this.reconnectConfig = Objects.requireNonNull(reconnectConfig, "reconnectConfig must not be null");
}

Expand Down Expand Up @@ -202,16 +190,7 @@ private <T> void sendTree(final MerkleNode root, final TeacherTreeView<T> view)

final AtomicBoolean senderIsFinished = new AtomicBoolean(false);

new TeacherSendingThread<T>(
time,
reconnectConfig,
workGroup,
in,
out,
subtrees,
view,
requestToStopTeaching,
senderIsFinished)
new TeacherSendingThread<T>(time, reconnectConfig, workGroup, in, out, subtrees, view, senderIsFinished)
.start();
new TeacherReceivingThread<>(workGroup, in, view, senderIsFinished).start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
import com.swirlds.common.threading.pool.StandardWorkGroup;
import com.swirlds.common.utility.throttle.RateLimiter;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -63,9 +61,6 @@ public class TeacherSendingThread<T> {
private final Queue<TeacherSubtree> subtrees;
private final TeacherTreeView<T> view;

@Nullable
private final BooleanSupplier requestToStopTeaching;

private final AtomicBoolean senderIsFinished;

private final RateLimiter rateLimiter;
Expand All @@ -82,8 +77,6 @@ public class TeacherSendingThread<T> {
* @param subtrees a queue containing roots of subtrees to send, may have more roots added by this
* class
* @param view an object that interfaces with the subtree
* @param requestToStopTeaching a function to check periodically if teaching should be stopped, e.g. because of the
* teacher has fallen behind network
* @param senderIsFinished set to true when this thread has finished
*/
public TeacherSendingThread(
Expand All @@ -94,14 +87,12 @@ public TeacherSendingThread(
final AsyncOutputStream<Lesson<T>> out,
final Queue<TeacherSubtree> subtrees,
final TeacherTreeView<T> view,
@Nullable final BooleanSupplier requestToStopTeaching,
final AtomicBoolean senderIsFinished) {
this.workGroup = workGroup;
this.in = in;
this.out = out;
this.subtrees = subtrees;
this.view = view;
this.requestToStopTeaching = requestToStopTeaching;
this.senderIsFinished = senderIsFinished;

final int maxRate = reconnectConfig.teacherMaxNodesPerSecond();
Expand Down Expand Up @@ -212,13 +203,6 @@ private void run() {

while (view.areThereNodesToHandle()) {
rateLimit();

if ((requestToStopTeaching != null) && requestToStopTeaching.getAsBoolean()) {
logger.info(
RECONNECT.getMarker(),
"Teacher's sending thread is requested to stop teaching (fallen behind?)");
break;
}
final T node = view.getNextNodeToHandle();
sendLesson(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public boolean acquireProvidePermit() {
return providePermit.tryAcquire();
}

/**
* Try to acquire the provide permit bypassing the check to see if the consumer is waiting for the resource, this
* will block the providers until {@link #releaseProvidePermit()} is called
*
* @return true if the permit has been acquired
*/
public boolean tryBlockProvidePermit() {
return providePermit.tryAcquire();
}

/**
* Release a previously acquired provide permit
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -247,23 +246,6 @@ void fullTeacherSingleLeafLearner2() {
assertDoesNotThrow(this::reconnect, "Should not throw a Exception");
}

@Test
@Tags({@Tag("VirtualMerkle"), @Tag("Reconnect")})
@DisplayName("Teacher is requested to stop teaching after a few attempts")
void simulateTeacherFallenBehind() {
teacherMap.put(A_KEY, APPLE);
teacherMap.put(B_KEY, BANANA);
teacherMap.put(C_KEY, CHERRY);
teacherMap.put(D_KEY, DATE);
teacherMap.put(E_KEY, EGGPLANT);
teacherMap.put(F_KEY, FIG);

final AtomicInteger counter = new AtomicInteger(0);
requestTeacherToStop = () -> counter.incrementAndGet() == 4;

reconnectMultipleTimes(2);
}

/**
* This test simulates some divergence from the teacher and the learner. At the time both the teacher and learner
* had diverged, both had simple integer values for the key and value. At the time of divergence, the teacher had
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -96,7 +95,6 @@ public class VirtualMapReconnectTestBase {
protected VirtualMap<TestKey, TestValue> learnerMap;
protected BrokenBuilder teacherBuilder;
protected BrokenBuilder learnerBuilder;
protected BooleanSupplier requestTeacherToStop;

VirtualDataSourceBuilder<TestKey, TestValue> createBuilder() throws IOException {
// The tests create maps with identical names. They would conflict with each other in the default
Expand Down Expand Up @@ -132,7 +130,6 @@ void setupEach() throws Exception {
learnerBuilder = createBrokenBuilder(dataSourceBuilder);
teacherMap = new VirtualMap<>("Teacher", teacherBuilder);
learnerMap = new VirtualMap<>("Learner", learnerBuilder);
requestTeacherToStop = () -> false; // don't interrupt teaching by default
}

@BeforeAll
Expand Down Expand Up @@ -221,10 +218,7 @@ protected void reconnectMultipleTimes(

try {
final MerkleNode node = MerkleTestUtils.hashAndTestSynchronization(
learnerTree,
failureExpected ? brokenTeacherTree : teacherTree,
requestTeacherToStop,
reconnectConfig);
learnerTree, failureExpected ? brokenTeacherTree : teacherTree, reconnectConfig);
node.release();
assertFalse(failureExpected, "We should only succeed on the last try");
final VirtualRoot root = learnerMap.getRight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public boolean acquireLearnerPermit() {
return connectionProvider.acquireProvidePermit();
}

/**
* Try to block the learner permit for reconnect. The method {@link #cancelLearnerPermit()} should be called
* to unblock the permit.
*
* @return true if the permit has been blocked
*/
public boolean blockLearnerPermit() {
return connectionProvider.tryBlockProvidePermit();
}

/**
* Releases a previously acquired permit for reconnect
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public boolean shouldAccept() {
RECONNECT.getMarker(),
"Rejecting reconnect request from node {} because this node has fallen behind",
peerId);
reconnectMetrics.recordReconnectRejection(peerId);
reconnectRejected();
return false;
}

Expand All @@ -169,7 +169,7 @@ public boolean shouldAccept() {
RECONNECT.getMarker(),
"Rejecting reconnect request from node {} because this node isn't ACTIVE",
peerId);
reconnectMetrics.recordReconnectRejection(peerId);
reconnectRejected();
return false;
}

Expand All @@ -181,35 +181,52 @@ public boolean shouldAccept() {
RECONNECT.getMarker(),
"Rejecting reconnect request from node {} due to lack of a fully signed state",
peerId);
reconnectMetrics.recordReconnectRejection(peerId);
reconnectRejected();
return false;
}

if (!teacherState.get().isComplete()) {
// this is only possible if signed state manager violates its contractual obligations
teacherState.close();
teacherState = null;
stateIncompleteLogger.error(
RECONNECT.getMarker(),
"Rejecting reconnect request from node {} due to lack of a fully signed state."
+ " The signed state manager attempted to provide a state that was not"
+ " fully signed, which should not be possible.",
peerId);
reconnectMetrics.recordReconnectRejection(peerId);
reconnectRejected();
return false;
}

// we should not become a learner while we are teaching
// this can happen if we fall behind while we are teaching
// in this case, we want to finish teaching before we start learning
// so we acquire the learner permit and release it when we are done teaching
if (!reconnectController.blockLearnerPermit()) {
reconnectRejected();
return false;
}

// Check if a reconnect with the learner is permitted by the throttle.
final boolean reconnectPermittedByThrottle = teacherThrottle.initiateReconnect(peerId);
if (reconnectPermittedByThrottle) {
initiatedBy = InitiatedBy.PEER;
return true;
} else {
if (!reconnectPermittedByThrottle) {
reconnectRejected();
reconnectController.cancelLearnerPermit();
return false;
}

initiatedBy = InitiatedBy.PEER;
return true;
}

/**
* Called when we reject a reconnect as a teacher
*/
private void reconnectRejected() {
if (teacherState != null) {
teacherState.close();
teacherState = null;
reconnectMetrics.recordReconnectRejection(peerId);
return false;
}
reconnectMetrics.recordReconnectRejection(peerId);
}

/** {@inheritDoc} */
Expand All @@ -218,6 +235,8 @@ public void acceptFailed() {
teacherState.close();
teacherState = null;
teacherThrottle.reconnectAttemptFinished();
// cancel the permit acquired in shouldAccept() so that we can start learning if we need to
reconnectController.cancelLearnerPermit();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -269,13 +288,14 @@ private void teacher(final Connection connection) {
connection.getSelfId(),
connection.getOtherId(),
state.get().getRound(),
fallenBehindManager::hasFallenBehind,
reconnectMetrics,
configuration)
.execute(state.get());
} finally {
teacherThrottle.reconnectAttemptFinished();
teacherState = null;
// cancel the permit acquired in shouldAccept() so that we can start learning if we need to
reconnectController.cancelLearnerPermit();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import com.swirlds.platform.network.Connection;
import com.swirlds.platform.state.signed.SignedState;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.net.SocketException;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -70,12 +68,6 @@ public class ReconnectTeacher {
private final ThreadManager threadManager;
private final Time time;

/**
* A function to check periodically if teaching should be stopped, e.g. when the teacher has fallen behind.
*/
@Nullable
private final BooleanSupplier requestToStopTeaching;

/**
* @param threadManager responsible for managing thread lifecycles
* @param connection the connection to be used for the reconnect
Expand All @@ -94,7 +86,6 @@ public ReconnectTeacher(
@NonNull final NodeId selfId,
@NonNull final NodeId otherId,
final long lastRoundReceived,
@Nullable final BooleanSupplier requestToStopTeaching,
@NonNull final ReconnectMetrics statistics,
@NonNull final Configuration configuration) {

Expand All @@ -106,7 +97,6 @@ public ReconnectTeacher(
this.selfId = Objects.requireNonNull(selfId);
this.otherId = Objects.requireNonNull(otherId);
this.lastRoundReceived = lastRoundReceived;
this.requestToStopTeaching = requestToStopTeaching;
this.statistics = Objects.requireNonNull(statistics);
this.configuration = Objects.requireNonNull(configuration);
}
Expand Down Expand Up @@ -231,7 +221,6 @@ private void reconnect(final SignedState signedState) throws InterruptedExceptio
new MerkleDataOutputStream(connection.getDos()),
signedState.getState(),
connection::disconnect,
requestToStopTeaching,
reconnectConfig);

synchronizer.synchronize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,7 @@ public void runProtocol(final Connection connection)
private void teacher(final Connection connection) {
try {
new EmergencyReconnectTeacher(
time,
threadManager,
stateFinder,
reconnectSocketTimeout,
fallenBehindManager::hasFallenBehind,
reconnectMetrics,
configuration)
time, threadManager, stateFinder, reconnectSocketTimeout, reconnectMetrics, configuration)
.execute(connection);
} finally {
teacherThrottle.reconnectAttemptFinished();
Expand Down