From e0c236b940760faa9f9b9b36ea7bd49de5dcacde Mon Sep 17 00:00:00 2001 From: Austin Littley <102969658+alittley@users.noreply.github.com> Date: Wed, 14 Feb 2024 16:06:23 -0500 Subject: [PATCH] feat: use ordered soldering for PCES flush requests (#11451) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Austin Littley Signed-off-by: Timo Brandstätter --- .../com/swirlds/platform/SwirldsPlatform.java | 8 ++--- .../components/LinkedEventIntake.java | 17 +-------- .../wiring/LinkedEventIntakeWiring.java | 18 ++++------ .../platform/wiring/PlatformWiring.java | 35 ++++++++++++------- .../wiring/generate-platform-diagram.sh | 6 ++-- .../platform/test/consensus/TestIntake.java | 5 ++- 6 files changed, 37 insertions(+), 52 deletions(-) diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java index 06f4d2c1a600..4a9e9e38a224 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java @@ -610,12 +610,8 @@ public class SwirldsPlatform implements Platform { intakeEventCounter); final OrphanBuffer orphanBuffer = new OrphanBuffer(platformContext, intakeEventCounter); final InOrderLinker inOrderLinker = new InOrderLinker(platformContext, time, intakeEventCounter); - final LinkedEventIntake linkedEventIntake = new LinkedEventIntake( - consensusRef::get, - eventObserverDispatcher, - shadowGraph, - intakeEventCounter, - platformWiring.getKeystoneEventSequenceNumberOutput()); + final LinkedEventIntake linkedEventIntake = + new LinkedEventIntake(consensusRef::get, eventObserverDispatcher, shadowGraph, intakeEventCounter); final EventCreationManager eventCreationManager = buildEventCreationManager( platformContext, diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java index d975da5f8323..144d38775ce1 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java @@ -16,7 +16,6 @@ package com.swirlds.platform.components; -import com.swirlds.common.wiring.wires.output.StandardOutputWire; import com.swirlds.platform.Consensus; import com.swirlds.platform.gossip.IntakeEventCounter; import com.swirlds.platform.gossip.shadowgraph.Shadowgraph; @@ -53,31 +52,23 @@ public class LinkedEventIntake { */ private final IntakeEventCounter intakeEventCounter; - /** - * The secondary wire that outputs the keystone event sequence number - */ - private final StandardOutputWire keystoneEventSequenceNumberOutput; - /** * Constructor * * @param consensusSupplier provides the current consensus instance * @param dispatcher invokes event related callbacks * @param shadowGraph tracks events in the hashgraph - * @param keystoneEventSequenceNumberOutput the secondary wire that outputs the keystone event sequence number */ public LinkedEventIntake( @NonNull final Supplier consensusSupplier, @NonNull final EventObserverDispatcher dispatcher, @NonNull final Shadowgraph shadowGraph, - @NonNull final IntakeEventCounter intakeEventCounter, - @NonNull final StandardOutputWire keystoneEventSequenceNumberOutput) { + @NonNull final IntakeEventCounter intakeEventCounter) { this.consensusSupplier = Objects.requireNonNull(consensusSupplier); this.dispatcher = Objects.requireNonNull(dispatcher); this.shadowGraph = Objects.requireNonNull(shadowGraph); this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter); - this.keystoneEventSequenceNumberOutput = Objects.requireNonNull(keystoneEventSequenceNumberOutput); } /** @@ -106,12 +97,6 @@ public List addEvent(@NonNull final EventImpl event) { if (consensusRounds != null) { consensusRounds.forEach(round -> { - // it is important that a flush request for the keystone event is submitted before starting - // to handle the transactions in the round. Otherwise, the system could arrive at a place - // where the transaction handler is waiting for a given event to become durable, but the - // PCES writer hasn't been notified yet that the event should be flushed. - keystoneEventSequenceNumberOutput.forward( - round.getKeystoneEvent().getBaseEvent().getStreamSequenceNumber()); // Future work: this dispatcher now only handles metrics. Remove this and put the metrics where // they belong dispatcher.consensusRound(round); diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeWiring.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeWiring.java index ac032b3bd41a..9cf2a145d580 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeWiring.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeWiring.java @@ -20,7 +20,6 @@ import com.swirlds.common.wiring.wires.input.BindableInputWire; import com.swirlds.common.wiring.wires.input.InputWire; import com.swirlds.common.wiring.wires.output.OutputWire; -import com.swirlds.common.wiring.wires.output.StandardOutputWire; import com.swirlds.platform.components.LinkedEventIntake; import com.swirlds.platform.internal.ConsensusRound; import com.swirlds.platform.internal.EventImpl; @@ -30,20 +29,18 @@ /** * Wiring for the {@link LinkedEventIntake}. * - * @param eventInput the input wire for events to be added to the hashgraph - * @param consensusRoundOutput the output wire for consensus rounds - * @param consensusEventsOutput the output wire for consensus events, transformed from the consensus round - * output - * @param keystoneEventSequenceNumberOutput the output wire for the keystone event sequence number - * @param flushRunnable the runnable to flush the intake - * @param startSquelchingRunnable the runnable to start squelching - * @param stopSquelchingRunnable the runnable to stop squelching + * @param eventInput the input wire for events to be added to the hashgraph + * @param consensusRoundOutput the output wire for consensus rounds + * @param consensusEventsOutput the output wire for consensus events, transformed from the consensus round + * output + * @param flushRunnable the runnable to flush the intake + * @param startSquelchingRunnable the runnable to start squelching + * @param stopSquelchingRunnable the runnable to stop squelching */ public record LinkedEventIntakeWiring( @NonNull InputWire eventInput, @NonNull OutputWire consensusRoundOutput, @NonNull OutputWire> consensusEventsOutput, - @NonNull StandardOutputWire keystoneEventSequenceNumberOutput, @NonNull Runnable flushRunnable, @NonNull Runnable startSquelchingRunnable, @NonNull Runnable stopSquelchingRunnable) { @@ -62,7 +59,6 @@ public static LinkedEventIntakeWiring create(@NonNull final TaskScheduler keystoneEventSequenceNumberTransformer = new WireTransformer<>( + model, "getKeystoneEventSequenceNumber", "rounds", round -> round.getKeystoneEvent() + .getBaseEvent() + .getStreamSequenceNumber()); + keystoneEventSequenceNumberTransformer.getOutputWire().solderTo(pcesWriterWiring.flushRequestInputWire()); + + // The request to flush the keystone event for a round must be sent to the PCES writer before the consensus + // round is passed to the round handler. This prevents a deadlock scenario where the consensus round + // handler has a full queue and won't accept additional rounds, and is waiting on a keystone event to be + // durably flushed to disk. Meanwhile, the PCES writer hasn't even received the flush request yet, so the + // necessary keystone event is *never* flushed. + linkedEventIntakeWiring + .consensusRoundOutput() + .orderedSolderTo(List.of( + keystoneEventSequenceNumberTransformer.getInputWire(), + consensusRoundHandlerWiring.roundInput())); linkedEventIntakeWiring.consensusRoundOutput().solderTo(eventWindowManagerWiring.consensusRoundInput()); linkedEventIntakeWiring.consensusEventsOutput().solderTo(eventStreamManagerWiring.eventsInput()); pcesWriterWiring @@ -515,17 +535,6 @@ public LongSupplier getIntakeQueueSizeSupplier() { return postHashCollectorWiring.unprocessedTaskCountSupplier(); } - /** - * Get the output wire that {@link LinkedEventIntake} uses to pass keystone event sequence numbers to the - * {@link PcesWriter}, to be flushed. - * - * @return the output wire for keystone event sequence numbers - */ - @NonNull - public StandardOutputWire getKeystoneEventSequenceNumberOutput() { - return linkedEventIntakeWiring.keystoneEventSequenceNumberOutput(); - } - /** * Update the running hash for all components that need it. * diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh index bcfb05c61b2a..d6f84d5afbb7 100755 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh @@ -11,17 +11,17 @@ pcli diagram \ -s 'pcesReplayer:done streaming pces:@' \ -s 'inOrderLinker:events to gossip:g' \ -s 'runningHashUpdate:running hash update:§' \ - -s 'linkedEventIntake:flush request:Ξ' \ + -s 'getKeystoneEventSequenceNumber:flush request:Ξ' \ -g 'Event Validation:internalEventValidator,eventDeduplicator,eventSignatureValidator' \ -g 'Event Hashing:eventHasher,postHashCollector' \ -g 'Orphan Buffer:orphanBuffer,orphanBufferSplitter' \ - -g 'Linked Event Intake:linkedEventIntake,linkedEventIntakeSplitter,eventWindowManager' \ + -g 'Linked Event Intake:linkedEventIntake,linkedEventIntakeSplitter,eventWindowManager,getKeystoneEventSequenceNumber' \ -g 'State File Management:saveToDiskFilter,signedStateFileManager,extractOldestMinimumGenerationOnDisk,toStateWrittenToDiskAction' \ -g 'State Signature Collection:stateSignatureCollector,reservedStateSplitter,allStatesReserver,completeStateFilter,completeStatesReserver,extractConsensusSignatureTransactions,extractPreconsensusSignatureTransactions' \ -g 'Intake Pipeline:Event Validation,Orphan Buffer,Event Hashing' \ -g 'Preconsensus Event Stream:pcesSequencer,pcesWriter,eventDurabilityNexus' \ -g 'Consensus Event Stream:getEvents,eventStreamManager' \ - -g 'Consensus Pipeline:inOrderLinker,Linked Event Intake,g' \ + -g 'Consensus Pipeline:inOrderLinker,Linked Event Intake,g,ʘ,Ξ' \ -g 'Event Creation:futureEventBuffer,futureEventBufferSplitter,eventCreationManager' \ -g 'Gossip:gossip,shadowgraph' \ -c 'Consensus Event Stream' \ diff --git a/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/main/java/com/swirlds/platform/test/consensus/TestIntake.java b/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/main/java/com/swirlds/platform/test/consensus/TestIntake.java index 8d73cf5765ca..da6760a27903 100644 --- a/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/main/java/com/swirlds/platform/test/consensus/TestIntake.java +++ b/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/main/java/com/swirlds/platform/test/consensus/TestIntake.java @@ -24,7 +24,6 @@ import com.swirlds.common.test.fixtures.platform.TestPlatformContextBuilder; import com.swirlds.common.wiring.counters.BackpressureObjectCounter; import com.swirlds.common.wiring.model.WiringModel; -import com.swirlds.common.wiring.wires.output.StandardOutputWire; import com.swirlds.config.extensions.test.fixtures.TestConfigBuilder; import com.swirlds.platform.Consensus; import com.swirlds.platform.ConsensusImpl; @@ -126,8 +125,8 @@ public TestIntake(@NonNull final AddressBook addressBook, @NonNull final Consens final EventObserverDispatcher dispatcher = new EventObserverDispatcher(new ShadowGraphEventObserver(shadowGraph), output); - final LinkedEventIntake linkedEventIntake = new LinkedEventIntake( - () -> consensus, dispatcher, shadowGraph, intakeEventCounter, mock(StandardOutputWire.class)); + final LinkedEventIntake linkedEventIntake = + new LinkedEventIntake(() -> consensus, dispatcher, shadowGraph, intakeEventCounter); linkedEventIntakeWiring = LinkedEventIntakeWiring.create(schedulers.linkedEventIntakeScheduler()); linkedEventIntakeWiring.bind(linkedEventIntake);