Skip to content

Commit

Permalink
feat: use ordered soldering for PCES flush requests (#11451)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <austin@swirldslabs.com>
  • Loading branch information
alittley committed Feb 14, 2024
1 parent 4ae42e0 commit 601f92b
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 52 deletions.
Expand Up @@ -610,12 +610,8 @@ public class SwirldsPlatform implements Platform {
intakeEventCounter);
final OrphanBuffer orphanBuffer = new OrphanBuffer(platformContext, intakeEventCounter);
final InOrderLinker inOrderLinker = new InOrderLinker(platformContext, time, intakeEventCounter);
final LinkedEventIntake linkedEventIntake = new LinkedEventIntake(
consensusRef::get,
eventObserverDispatcher,
shadowGraph,
intakeEventCounter,
platformWiring.getKeystoneEventSequenceNumberOutput());
final LinkedEventIntake linkedEventIntake =
new LinkedEventIntake(consensusRef::get, eventObserverDispatcher, shadowGraph, intakeEventCounter);

final EventCreationManager eventCreationManager = buildEventCreationManager(
platformContext,
Expand Down
Expand Up @@ -16,7 +16,6 @@

package com.swirlds.platform.components;

import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import com.swirlds.platform.Consensus;
import com.swirlds.platform.gossip.IntakeEventCounter;
import com.swirlds.platform.gossip.shadowgraph.Shadowgraph;
Expand Down Expand Up @@ -53,31 +52,23 @@ public class LinkedEventIntake {
*/
private final IntakeEventCounter intakeEventCounter;

/**
* The secondary wire that outputs the keystone event sequence number
*/
private final StandardOutputWire<Long> keystoneEventSequenceNumberOutput;

/**
* Constructor
*
* @param consensusSupplier provides the current consensus instance
* @param dispatcher invokes event related callbacks
* @param shadowGraph tracks events in the hashgraph
* @param keystoneEventSequenceNumberOutput the secondary wire that outputs the keystone event sequence number
*/
public LinkedEventIntake(
@NonNull final Supplier<Consensus> consensusSupplier,
@NonNull final EventObserverDispatcher dispatcher,
@NonNull final Shadowgraph shadowGraph,
@NonNull final IntakeEventCounter intakeEventCounter,
@NonNull final StandardOutputWire<Long> keystoneEventSequenceNumberOutput) {
@NonNull final IntakeEventCounter intakeEventCounter) {

this.consensusSupplier = Objects.requireNonNull(consensusSupplier);
this.dispatcher = Objects.requireNonNull(dispatcher);
this.shadowGraph = Objects.requireNonNull(shadowGraph);
this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter);
this.keystoneEventSequenceNumberOutput = Objects.requireNonNull(keystoneEventSequenceNumberOutput);
}

/**
Expand Down Expand Up @@ -106,12 +97,6 @@ public List<ConsensusRound> addEvent(@NonNull final EventImpl event) {

if (consensusRounds != null) {
consensusRounds.forEach(round -> {
// it is important that a flush request for the keystone event is submitted before starting
// to handle the transactions in the round. Otherwise, the system could arrive at a place
// where the transaction handler is waiting for a given event to become durable, but the
// PCES writer hasn't been notified yet that the event should be flushed.
keystoneEventSequenceNumberOutput.forward(
round.getKeystoneEvent().getBaseEvent().getStreamSequenceNumber());
// Future work: this dispatcher now only handles metrics. Remove this and put the metrics where
// they belong
dispatcher.consensusRound(round);
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.swirlds.common.wiring.wires.input.BindableInputWire;
import com.swirlds.common.wiring.wires.input.InputWire;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import com.swirlds.platform.components.LinkedEventIntake;
import com.swirlds.platform.internal.ConsensusRound;
import com.swirlds.platform.internal.EventImpl;
Expand All @@ -30,20 +29,18 @@
/**
* Wiring for the {@link LinkedEventIntake}.
*
* @param eventInput the input wire for events to be added to the hashgraph
* @param consensusRoundOutput the output wire for consensus rounds
* @param consensusEventsOutput the output wire for consensus events, transformed from the consensus round
* output
* @param keystoneEventSequenceNumberOutput the output wire for the keystone event sequence number
* @param flushRunnable the runnable to flush the intake
* @param startSquelchingRunnable the runnable to start squelching
* @param stopSquelchingRunnable the runnable to stop squelching
* @param eventInput the input wire for events to be added to the hashgraph
* @param consensusRoundOutput the output wire for consensus rounds
* @param consensusEventsOutput the output wire for consensus events, transformed from the consensus round
* output
* @param flushRunnable the runnable to flush the intake
* @param startSquelchingRunnable the runnable to start squelching
* @param stopSquelchingRunnable the runnable to stop squelching
*/
public record LinkedEventIntakeWiring(
@NonNull InputWire<EventImpl> eventInput,
@NonNull OutputWire<ConsensusRound> consensusRoundOutput,
@NonNull OutputWire<List<EventImpl>> consensusEventsOutput,
@NonNull StandardOutputWire<Long> keystoneEventSequenceNumberOutput,
@NonNull Runnable flushRunnable,
@NonNull Runnable startSquelchingRunnable,
@NonNull Runnable stopSquelchingRunnable) {
Expand All @@ -62,7 +59,6 @@ public static LinkedEventIntakeWiring create(@NonNull final TaskScheduler<List<C
taskScheduler.buildInputWire("linked events"),
consensusRoundOutput,
consensusRoundOutput.buildTransformer("getEvents", "rounds", ConsensusRound::getConsensusEvents),
taskScheduler.buildSecondaryOutputWire(),
taskScheduler::flush,
taskScheduler::startSquelching,
taskScheduler::stopSquelching);
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.swirlds.common.wiring.counters.BackpressureObjectCounter;
import com.swirlds.common.wiring.counters.ObjectCounter;
import com.swirlds.common.wiring.model.WiringModel;
import com.swirlds.common.wiring.transformers.WireTransformer;
import com.swirlds.common.wiring.wires.input.InputWire;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
Expand Down Expand Up @@ -83,6 +84,7 @@
import com.swirlds.platform.wiring.components.StateSignatureCollectorWiring;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.LongSupplier;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -264,8 +266,26 @@ private void wire() {

pcesReplayerWiring.doneStreamingPcesOutputWire().solderTo(pcesWriterWiring.doneStreamingPcesInputWire());
pcesReplayerWiring.eventOutput().solderTo(eventHasherWiring.eventInput());
linkedEventIntakeWiring.keystoneEventSequenceNumberOutput().solderTo(pcesWriterWiring.flushRequestInputWire());
linkedEventIntakeWiring.consensusRoundOutput().solderTo(consensusRoundHandlerWiring.roundInput());

// Create the transformer that extracts keystone event sequence number from consensus rounds.
// This is done here instead of in LinkedEventIntake wiring, since the transformer needs to be soldered with
// specified ordering, relative to the wire carrying consensus rounds to the round handler
final WireTransformer<ConsensusRound, Long> keystoneEventSequenceNumberTransformer = new WireTransformer<>(
model, "getKeystoneEventSequenceNumber", "rounds", round -> round.getKeystoneEvent()
.getBaseEvent()
.getStreamSequenceNumber());
keystoneEventSequenceNumberTransformer.getOutputWire().solderTo(pcesWriterWiring.flushRequestInputWire());

// The request to flush the keystone event for a round must be sent to the PCES writer before the consensus
// round is passed to the round handler. This prevents a deadlock scenario where the consensus round
// handler has a full queue and won't accept additional rounds, and is waiting on a keystone event to be
// durably flushed to disk. Meanwhile, the PCES writer hasn't even received the flush request yet, so the
// necessary keystone event is *never* flushed.
linkedEventIntakeWiring
.consensusRoundOutput()
.orderedSolderTo(List.of(
keystoneEventSequenceNumberTransformer.getInputWire(),
consensusRoundHandlerWiring.roundInput()));
linkedEventIntakeWiring.consensusRoundOutput().solderTo(eventWindowManagerWiring.consensusRoundInput());
linkedEventIntakeWiring.consensusEventsOutput().solderTo(eventStreamManagerWiring.eventsInput());
pcesWriterWiring
Expand Down Expand Up @@ -515,17 +535,6 @@ public LongSupplier getIntakeQueueSizeSupplier() {
return postHashCollectorWiring.unprocessedTaskCountSupplier();
}

/**
* Get the output wire that {@link LinkedEventIntake} uses to pass keystone event sequence numbers to the
* {@link PcesWriter}, to be flushed.
*
* @return the output wire for keystone event sequence numbers
*/
@NonNull
public StandardOutputWire<Long> getKeystoneEventSequenceNumberOutput() {
return linkedEventIntakeWiring.keystoneEventSequenceNumberOutput();
}

/**
* Update the running hash for all components that need it.
*
Expand Down
Expand Up @@ -11,17 +11,17 @@ pcli diagram \
-s 'pcesReplayer:done streaming pces:@' \
-s 'inOrderLinker:events to gossip:g' \
-s 'runningHashUpdate:running hash update:§' \
-s 'linkedEventIntake:flush request:Ξ' \
-s 'getKeystoneEventSequenceNumber:flush request:Ξ' \
-g 'Event Validation:internalEventValidator,eventDeduplicator,eventSignatureValidator' \
-g 'Event Hashing:eventHasher,postHashCollector' \
-g 'Orphan Buffer:orphanBuffer,orphanBufferSplitter' \
-g 'Linked Event Intake:linkedEventIntake,linkedEventIntakeSplitter,eventWindowManager' \
-g 'Linked Event Intake:linkedEventIntake,linkedEventIntakeSplitter,eventWindowManager,getKeystoneEventSequenceNumber' \
-g 'State File Management:saveToDiskFilter,signedStateFileManager,extractOldestMinimumGenerationOnDisk,toStateWrittenToDiskAction' \
-g 'State Signature Collection:stateSignatureCollector,reservedStateSplitter,allStatesReserver,completeStateFilter,completeStatesReserver,extractConsensusSignatureTransactions,extractPreconsensusSignatureTransactions' \
-g 'Intake Pipeline:Event Validation,Orphan Buffer,Event Hashing' \
-g 'Preconsensus Event Stream:pcesSequencer,pcesWriter,eventDurabilityNexus' \
-g 'Consensus Event Stream:getEvents,eventStreamManager' \
-g 'Consensus Pipeline:inOrderLinker,Linked Event Intake,g' \
-g 'Consensus Pipeline:inOrderLinker,Linked Event Intake,g,ʘ,Ξ' \
-g 'Event Creation:futureEventBuffer,futureEventBufferSplitter,eventCreationManager' \
-g 'Gossip:gossip,shadowgraph' \
-c 'Consensus Event Stream' \
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.swirlds.common.test.fixtures.platform.TestPlatformContextBuilder;
import com.swirlds.common.wiring.counters.BackpressureObjectCounter;
import com.swirlds.common.wiring.model.WiringModel;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import com.swirlds.config.extensions.test.fixtures.TestConfigBuilder;
import com.swirlds.platform.Consensus;
import com.swirlds.platform.ConsensusImpl;
Expand Down Expand Up @@ -126,8 +125,8 @@ public TestIntake(@NonNull final AddressBook addressBook, @NonNull final Consens
final EventObserverDispatcher dispatcher =
new EventObserverDispatcher(new ShadowGraphEventObserver(shadowGraph), output);

final LinkedEventIntake linkedEventIntake = new LinkedEventIntake(
() -> consensus, dispatcher, shadowGraph, intakeEventCounter, mock(StandardOutputWire.class));
final LinkedEventIntake linkedEventIntake =
new LinkedEventIntake(() -> consensus, dispatcher, shadowGraph, intakeEventCounter);

linkedEventIntakeWiring = LinkedEventIntakeWiring.create(schedulers.linkedEventIntakeScheduler());
linkedEventIntakeWiring.bind(linkedEventIntake);
Expand Down

0 comments on commit 601f92b

Please sign in to comment.