From fe1483b02d58b624b2797851f5475ae751307b26 Mon Sep 17 00:00:00 2001 From: Austin Littley Date: Thu, 26 Oct 2023 16:24:40 -0400 Subject: [PATCH 1/5] Implement LinkedEventIntake Signed-off-by: Austin Littley --- .../components/LinkedEventIntake.java | 226 ++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java new file mode 100644 index 000000000000..c66a7f6afe3b --- /dev/null +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java @@ -0,0 +1,226 @@ +/* + * Copyright (C) 2021-2023 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.platform.components; + +import static com.swirlds.logging.LogMarker.INTAKE_EVENT; +import static com.swirlds.logging.LogMarker.STALE_EVENTS; + +import com.swirlds.base.time.Time; +import com.swirlds.common.config.EventConfig; +import com.swirlds.common.context.PlatformContext; +import com.swirlds.common.threading.manager.ThreadManager; +import com.swirlds.platform.Consensus; +import com.swirlds.platform.eventhandling.ConsensusRoundHandler; +import com.swirlds.platform.gossip.IntakeEventCounter; +import com.swirlds.platform.gossip.shadowgraph.ShadowGraph; +import com.swirlds.platform.internal.ConsensusRound; +import com.swirlds.platform.internal.EventImpl; +import com.swirlds.platform.observers.EventObserverDispatcher; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * This class is responsible for adding events to {@link Consensus} and notifying event observers, including + * {@link ConsensusRoundHandler} and {@link com.swirlds.platform.eventhandling.PreConsensusEventHandler}. + *

+ * This class differs from {@link EventIntake} in that it accepts events that have already been linked with their + * parents. This version of event intake was written to be compatible with the new intake pipeline, whereas + * {@link EventIntake} works with the legacy intake monolith. + */ +public class LinkedEventIntake { + private static final Logger logger = LogManager.getLogger(LinkedEventIntake.class); + + /** + * A functor that provides access to a {@code Consensus} instance. + */ + private final Supplier consensusSupplier; + + /** + * An {@link EventObserverDispatcher} instance + */ + private final EventObserverDispatcher dispatcher; + + /** + * Stores events, expires them, provides event lookup methods + */ + private final ShadowGraph shadowGraph; + + private final ExecutorService prehandlePool; + private final Consumer prehandleEvent; + + private final EventIntakeMetrics metrics; + private final Time time; + + /** + * Tracks the number of events from each peer have been received, but aren't yet through the intake pipeline + */ + private final IntakeEventCounter intakeEventCounter; + + /** + * Constructor + * + * @param platformContext the platform context + * @param threadManager creates new threading resources + * @param time provides the wall clock time + * @param consensusSupplier provides the current consensus instance + * @param dispatcher invokes event related callbacks + * @param shadowGraph tracks events in the hashgraph + * @param prehandleEvent prehandles transactions in an event + * @param intakeEventCounter tracks the number of events from each peer that are currently in the intake pipeline + */ + public LinkedEventIntake( + @NonNull final PlatformContext platformContext, + @NonNull final ThreadManager threadManager, + @NonNull final Time time, + @NonNull final Supplier consensusSupplier, + @NonNull final EventObserverDispatcher dispatcher, + @NonNull final ShadowGraph shadowGraph, + @NonNull final Consumer prehandleEvent, + @NonNull final IntakeEventCounter intakeEventCounter) { + + this.time = Objects.requireNonNull(time); + this.consensusSupplier = Objects.requireNonNull(consensusSupplier); + this.dispatcher = Objects.requireNonNull(dispatcher); + this.shadowGraph = Objects.requireNonNull(shadowGraph); + this.prehandleEvent = Objects.requireNonNull(prehandleEvent); + this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter); + + final EventConfig eventConfig = platformContext.getConfiguration().getConfigData(EventConfig.class); + + final BlockingQueue prehandlePoolQueue = new LinkedBlockingQueue<>(); + + prehandlePool = new ThreadPoolExecutor( + eventConfig.prehandlePoolSize(), + eventConfig.prehandlePoolSize(), + 0L, + TimeUnit.MILLISECONDS, + prehandlePoolQueue, + threadManager.createThreadFactory("platform", "txn-prehandle")); + + metrics = new EventIntakeMetrics(platformContext, prehandlePoolQueue::size); + } + + /** + * Add an event to the hashgraph + * + * @param event an event to be added + */ + public void addEvent(final EventImpl event) { + try { + // an expired event will cause ShadowGraph to throw an exception, so we just to discard it + if (consensusSupplier.get().isExpired(event)) { + return; + } + + dispatcher.preConsensusEvent(event); + + logger.debug(INTAKE_EVENT.getMarker(), "Adding {} ", event::toShortString); + final long minGenNonAncientBeforeAdding = consensusSupplier.get().getMinGenerationNonAncient(); + + // Prehandle transactions on the thread pool. + prehandlePool.submit(buildPrehandleTask(event)); + + // record the event in the hashgraph, which results in the events in consEvent reaching consensus + final List consensusRounds = consensusSupplier.get().addEvent(event); + + dispatcher.eventAdded(event); + + if (consensusRounds != null) { + consensusRounds.forEach(this::handleConsensus); + } + + if (consensusSupplier.get().getMinGenerationNonAncient() > minGenNonAncientBeforeAdding) { + // consensus rounds can be null and the minNonAncient might change, this is probably because of a round + // with no consensus events, so we check the diff in generations to look for stale events + handleStale(minGenNonAncientBeforeAdding); + } + } finally { + intakeEventCounter.eventExitedIntakePipeline(event.getBaseEvent().getSenderId()); + } + } + + /** + * Build a task that will prehandle transactions in an event. Executed on a thread pool. + * + * @param event the event to prehandle + */ + @NonNull + private Runnable buildPrehandleTask(@NonNull final EventImpl event) { + return () -> { + prehandleEvent.accept(event); + event.signalPrehandleCompletion(); + }; + } + + /** + * Notify observer of stale events, of all event in the consensus stale event queue. + * + * @param previousGenerationNonAncient the previous minimum generation of non-ancient events + */ + private void handleStale(final long previousGenerationNonAncient) { + // find all events that just became ancient and did not reach consensus, these events will be considered stale + final Collection staleEvents = shadowGraph.findByGeneration( + previousGenerationNonAncient, + consensusSupplier.get().getMinGenerationNonAncient(), + LinkedEventIntake::isNotConsensus); + + for (final EventImpl staleEvent : staleEvents) { + staleEvent.setStale(true); + dispatcher.staleEvent(staleEvent); + logger.warn(STALE_EVENTS.getMarker(), "Stale event {}", staleEvent::toShortString); + } + } + + /** + * Returns true if the event has not reached consensus + * + * @param event the event to check + * @return true if the event has not reached consensus + */ + private static boolean isNotConsensus(final EventImpl event) { + return !event.isConsensus(); + } + + /** + * Notify observers that an event has reach consensus. + * + * @param consensusRound the new consensus round + */ + private void handleConsensus(final @NonNull ConsensusRound consensusRound) { + // We need to wait for prehandles to finish before proceeding. + // It is critically important that prehandle is always called prior to handleConsensusRound(). + + final long start = time.nanoTime(); + consensusRound.forEach(event -> ((EventImpl) event).awaitPrehandleCompletion()); + final long end = time.nanoTime(); + metrics.reportTimeWaitedForPrehandlingTransaction(end - start); + + dispatcher.consensusRound(consensusRound); + } +} From c016677880a86de2e5a8e6049aa81c9fc70e3551 Mon Sep 17 00:00:00 2001 From: Austin Littley Date: Thu, 26 Oct 2023 16:50:35 -0400 Subject: [PATCH 2/5] Apply spotless Signed-off-by: Austin Littley --- .../java/com/swirlds/platform/components/LinkedEventIntake.java | 1 - 1 file changed, 1 deletion(-) diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java index c66a7f6afe3b..7bc035bbefd4 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java @@ -31,7 +31,6 @@ import com.swirlds.platform.internal.EventImpl; import com.swirlds.platform.observers.EventObserverDispatcher; import edu.umd.cs.findbugs.annotations.NonNull; -import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Collection; import java.util.List; import java.util.Objects; From 765126ee09ada8e84b12e5c21ebadf9b22ef9ace Mon Sep 17 00:00:00 2001 From: Austin Littley Date: Thu, 2 Nov 2023 14:57:04 -0400 Subject: [PATCH 3/5] Fix various bugs Signed-off-by: Austin Littley --- .../components/LinkedEventIntake.java | 4 +- .../deduplication/EventDeduplicator.java | 9 ++-- .../platform/event/orphan/ParentIterator.java | 54 ++++++++++--------- 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java index 7bc035bbefd4..61f7482a494d 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java @@ -16,8 +16,8 @@ package com.swirlds.platform.components; -import static com.swirlds.logging.LogMarker.INTAKE_EVENT; -import static com.swirlds.logging.LogMarker.STALE_EVENTS; +import static com.swirlds.logging.legacy.LogMarker.INTAKE_EVENT; +import static com.swirlds.logging.legacy.LogMarker.STALE_EVENTS; import com.swirlds.base.time.Time; import com.swirlds.common.config.EventConfig; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/deduplication/EventDeduplicator.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/deduplication/EventDeduplicator.java index 1582b04050be..806649fa57b8 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/deduplication/EventDeduplicator.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/deduplication/EventDeduplicator.java @@ -26,6 +26,7 @@ import com.swirlds.platform.event.GossipEvent; import com.swirlds.platform.gossip.IntakeEventCounter; import edu.umd.cs.findbugs.annotations.NonNull; +import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Objects; import java.util.Set; @@ -42,7 +43,7 @@ public class EventDeduplicator { /** * Avoid the creation of lambdas for Map.computeIfAbsent() by reusing this lambda. */ - private static final Function> NEW_HASH_SET = ignored -> new HashSet<>(); + private static final Function> NEW_HASH_SET = ignored -> new HashSet<>(); /** * Initial capacity of {@link #observedEvents}. @@ -67,7 +68,7 @@ public class EventDeduplicator { /** * A map from event descriptor to a set of signatures that have been received for that event. */ - private final SequenceMap> observedEvents = + private final SequenceMap> observedEvents = new StandardSequenceMap<>(0, INITIAL_CAPACITY, true, EventDescriptor::getGeneration); private static final LongAccumulator.Config DUPLICATE_EVENT_CONFIG = new LongAccumulator.Config( @@ -117,8 +118,8 @@ public void handleEvent(@NonNull final GossipEvent event) { return; } - final Set signatures = observedEvents.computeIfAbsent(event.getDescriptor(), NEW_HASH_SET); - if (signatures.add(event.getUnhashedData().getSignature())) { + final Set signatures = observedEvents.computeIfAbsent(event.getDescriptor(), NEW_HASH_SET); + if (signatures.add(ByteBuffer.wrap(event.getUnhashedData().getSignature()))) { if (signatures.size() != 1) { // signature is unique, but descriptor is not disparateSignatureAccumulator.update(1); diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java index 3984107cc149..eb326893db60 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java @@ -19,7 +19,9 @@ import com.swirlds.common.system.events.EventDescriptor; import com.swirlds.platform.event.GossipEvent; import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; /** @@ -27,26 +29,37 @@ * for the current binary event parentage and the future n-ary event parentage. */ class ParentIterator implements Iterator { - - private final EventDescriptor selfParent; - private final EventDescriptor otherParent; + /** + * The number of parents that have been returned so far. + */ private int returnedEvents; + /** + * The parents of the event. + */ + private final List parents; + /** * Constructor. * * @param event the event whose parents we want to iterate over */ - public ParentIterator(@NonNull final GossipEvent event) { - selfParent = new EventDescriptor( - event.getHashedData().getSelfParentHash(), - event.getHashedData().getCreatorId(), - event.getHashedData().getSelfParentGen()); + ParentIterator(@NonNull final GossipEvent event) { + parents = new ArrayList<>(); - otherParent = new EventDescriptor( - event.getHashedData().getOtherParentHash(), - event.getUnhashedData().getOtherId(), - event.getHashedData().getOtherParentGen()); + if (event.getHashedData().hasSelfParent()) { + parents.add(new EventDescriptor( + event.getHashedData().getSelfParentHash(), + event.getHashedData().getCreatorId(), + event.getHashedData().getSelfParentGen())); + } + + if (event.getHashedData().hasOtherParent()) { + parents.add(new EventDescriptor( + event.getHashedData().getOtherParentHash(), + event.getUnhashedData().getOtherId(), + event.getHashedData().getOtherParentGen())); + } } /** @@ -54,7 +67,7 @@ public ParentIterator(@NonNull final GossipEvent event) { */ @Override public boolean hasNext() { - return returnedEvents < 2; + return returnedEvents < parents.size(); } /** @@ -67,16 +80,9 @@ public EventDescriptor next() { throw new NoSuchElementException(); } - switch (returnedEvents) { - case 0 -> { - returnedEvents++; - return selfParent; - } - case 1 -> { - returnedEvents++; - return otherParent; - } - default -> throw new NoSuchElementException(); - } + final int indexToReturn = returnedEvents; + returnedEvents++; + + return parents.get(indexToReturn); } } From 805b5e58958c6c02a1d95989eb860560e289ece0 Mon Sep 17 00:00:00 2001 From: Austin Littley Date: Thu, 2 Nov 2023 15:53:57 -0400 Subject: [PATCH 4/5] Fix unit test Signed-off-by: Austin Littley --- .../com/swirlds/platform/event/orphan/ParentIterator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java index eb326893db60..2fd45d7e6e42 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/ParentIterator.java @@ -47,14 +47,14 @@ class ParentIterator implements Iterator { ParentIterator(@NonNull final GossipEvent event) { parents = new ArrayList<>(); - if (event.getHashedData().hasSelfParent()) { + if (event.getHashedData().getSelfParentHash() != null) { parents.add(new EventDescriptor( event.getHashedData().getSelfParentHash(), event.getHashedData().getCreatorId(), event.getHashedData().getSelfParentGen())); } - if (event.getHashedData().hasOtherParent()) { + if (event.getHashedData().getOtherParentHash() != null) { parents.add(new EventDescriptor( event.getHashedData().getOtherParentHash(), event.getUnhashedData().getOtherId(), From 6fe9cd0106f8c521a7819ea76813aecca5a87dd9 Mon Sep 17 00:00:00 2001 From: Austin Littley Date: Fri, 3 Nov 2023 12:34:13 -0400 Subject: [PATCH 5/5] Address PR comments Signed-off-by: Austin Littley --- .../components/LinkedEventIntake.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java index 61f7482a494d..7bbce513ac5a 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java @@ -16,9 +16,6 @@ package com.swirlds.platform.components; -import static com.swirlds.logging.legacy.LogMarker.INTAKE_EVENT; -import static com.swirlds.logging.legacy.LogMarker.STALE_EVENTS; - import com.swirlds.base.time.Time; import com.swirlds.common.config.EventConfig; import com.swirlds.common.context.PlatformContext; @@ -41,8 +38,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * This class is responsible for adding events to {@link Consensus} and notifying event observers, including @@ -53,8 +48,6 @@ * {@link EventIntake} works with the legacy intake monolith. */ public class LinkedEventIntake { - private static final Logger logger = LogManager.getLogger(LinkedEventIntake.class); - /** * A functor that provides access to a {@code Consensus} instance. */ @@ -130,16 +123,17 @@ public LinkedEventIntake( * * @param event an event to be added */ - public void addEvent(final EventImpl event) { + public void addEvent(@NonNull final EventImpl event) { + Objects.requireNonNull(event); + try { - // an expired event will cause ShadowGraph to throw an exception, so we just to discard it - if (consensusSupplier.get().isExpired(event)) { + if (event.getGeneration() < consensusSupplier.get().getMinGenerationNonAncient()) { + // ancient events *may* be discarded, and stale events *must* be discarded return; } dispatcher.preConsensusEvent(event); - logger.debug(INTAKE_EVENT.getMarker(), "Adding {} ", event::toShortString); final long minGenNonAncientBeforeAdding = consensusSupplier.get().getMinGenerationNonAncient(); // Prehandle transactions on the thread pool. @@ -192,7 +186,6 @@ private void handleStale(final long previousGenerationNonAncient) { for (final EventImpl staleEvent : staleEvents) { staleEvent.setStale(true); dispatcher.staleEvent(staleEvent); - logger.warn(STALE_EVENTS.getMarker(), "Stale event {}", staleEvent::toShortString); } } @@ -202,7 +195,7 @@ private void handleStale(final long previousGenerationNonAncient) { * @param event the event to check * @return true if the event has not reached consensus */ - private static boolean isNotConsensus(final EventImpl event) { + private static boolean isNotConsensus(@NonNull final EventImpl event) { return !event.isConsensus(); }