Skip to content

Commit

Permalink
fix: Modify where components look to indicate overloaded intake (#11369)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <austin@swirldslabs.com>
  • Loading branch information
alittley committed Feb 6, 2024
1 parent 3cf9f8c commit 4324340
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
Expand Up @@ -625,7 +625,7 @@ public class SwirldsPlatform implements Platform {
selfId,
appVersion,
transactionPool,
platformWiring.getHasherUnprocessedTaskCountSupplier(),
platformWiring.getIntakeQueueSizeSupplier(),
platformStatusManager::getCurrentStatus,
latestReconnectRound::get);

Expand Down Expand Up @@ -702,7 +702,7 @@ public class SwirldsPlatform implements Platform {
emergencyRecoveryManager,
consensusRef,
platformWiring.getGossipEventInput()::put,
platformWiring.getHasherUnprocessedTaskCountSupplier(),
platformWiring.getIntakeQueueSizeSupplier(),
swirldStateManager,
latestCompleteState,
syncMetrics,
Expand Down
Expand Up @@ -502,13 +502,17 @@ public InputWire<Long> getPcesWriterRegisterDiscontinuityInput() {
}

/**
* Get a supplier for the number of unprocessed tasks in the hasher.
* Get a supplier for the number of unprocessed tasks at the front of the intake pipeline. This is for the purpose
* of applying backpressure to the event creator and gossip when the intake pipeline is overloaded.
* <p>
* Technically, the first component of the intake pipeline is the hasher, but tasks to be passed along actually
* accumulate in the post hash collector. This is due to how the concurrent hasher handles backpressure.
*
* @return a supplier for the number of unprocessed tasks in the hasher
* @return a supplier for the number of unprocessed tasks in the PostHashCollector
*/
@NonNull
public LongSupplier getHasherUnprocessedTaskCountSupplier() {
return eventHasherWiring.unprocessedTaskCountSupplier();
public LongSupplier getIntakeQueueSizeSupplier() {
return postHashCollectorWiring.unprocessedTaskCountSupplier();
}

/**
Expand Down
Expand Up @@ -23,30 +23,23 @@
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.event.hashing.EventHasher;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.function.LongSupplier;

/**
* Wiring for the {@link EventHasher}.
*
* @param eventInput the input wire for events to be hashed
* @param eventOutput the output wire for hashed events
* @param unprocessedTaskCountSupplier the supplier for the number of unprocessed tasks
* @param eventInput the input wire for events to be hashed
* @param eventOutput the output wire for hashed events
*/
public record EventHasherWiring(
@NonNull InputWire<GossipEvent> eventInput,
@NonNull OutputWire<GossipEvent> eventOutput,
@NonNull LongSupplier unprocessedTaskCountSupplier) {
@NonNull InputWire<GossipEvent> eventInput, @NonNull OutputWire<GossipEvent> eventOutput) {
/**
* Create a new instance of this wiring.
*
* @param taskScheduler the task scheduler for this wiring
* @return the new wiring instance
*/
public static EventHasherWiring create(@NonNull final TaskScheduler<GossipEvent> taskScheduler) {
return new EventHasherWiring(
taskScheduler.buildInputWire("events to hash"),
taskScheduler.getOutputWire(),
taskScheduler::getUnprocessedTaskCount);
return new EventHasherWiring(taskScheduler.buildInputWire("events to hash"), taskScheduler.getOutputWire());
}

/**
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.platform.event.GossipEvent;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.function.LongSupplier;

/**
* Wiring object that allows for the staging of events that have been hashed, but haven't been passed further down the
Expand All @@ -41,11 +42,14 @@
* The concurrent scheduler will refuse to accept additional work based on the number of tasks that are waiting in the
* sequential scheduler's queue.
*
* @param eventInput the input wire for events that have been hashed
* @param eventOutput the output wire for events to be passed further along the pipeline
* @param eventInput the input wire for events that have been hashed
* @param eventOutput the output wire for events to be passed further along the pipeline
* @param unprocessedTaskCountSupplier the supplier for the number of unprocessed tasks
*/
public record PostHashCollectorWiring(
@NonNull InputWire<GossipEvent> eventInput, @NonNull OutputWire<GossipEvent> eventOutput) {
@NonNull InputWire<GossipEvent> eventInput,
@NonNull OutputWire<GossipEvent> eventOutput,
@NonNull LongSupplier unprocessedTaskCountSupplier) {

/**
* Create a new instance of this wiring.
Expand All @@ -60,6 +64,7 @@ public static PostHashCollectorWiring create(@NonNull final TaskScheduler<Gossip
// component in the pipeline is ready to receive them
inputWire.bind(hashedEvent -> hashedEvent);

return new PostHashCollectorWiring(inputWire, taskScheduler.getOutputWire());
return new PostHashCollectorWiring(
inputWire, taskScheduler.getOutputWire(), taskScheduler::getUnprocessedTaskCount);
}
}

0 comments on commit 4324340

Please sign in to comment.