Skip to content

Commit

Permalink
09674 Rework intake components for framework compatibility (#9706)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <austin@swirldslabs.com>
Signed-off-by: Ivo Yankov <ivo@devlabs.bg>
  • Loading branch information
alittley authored and Ivo-Yankov committed Nov 9, 2023
1 parent f47f20b commit a0dd88a
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.gossip.IntakeEventCounter;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand All @@ -50,11 +50,6 @@ public class EventDeduplicator {
*/
private static final int INITIAL_CAPACITY = 1024;

/**
* Deduplicated events are passed to this consumer.
*/
private final Consumer<GossipEvent> eventConsumer;

/**
* The current minimum generation required for an event to be non-ancient.
*/
Expand Down Expand Up @@ -88,15 +83,11 @@ public class EventDeduplicator {
* Constructor
*
* @param platformContext the platform context
* @param eventConsumer deduplicated events are passed to this consumer
* @param intakeEventCounter keeps track of the number of events in the intake pipeline from each peer
*/
public EventDeduplicator(
@NonNull final PlatformContext platformContext,
@NonNull final Consumer<GossipEvent> eventConsumer,
@NonNull final IntakeEventCounter intakeEventCounter) {
@NonNull final PlatformContext platformContext, @NonNull final IntakeEventCounter intakeEventCounter) {

this.eventConsumer = Objects.requireNonNull(eventConsumer);
this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter);

this.duplicateEventAccumulator = platformContext.getMetrics().getOrCreate(DUPLICATE_EVENT_CONFIG);
Expand All @@ -106,16 +97,17 @@ public EventDeduplicator(
/**
* Handle a potentially duplicate event
* <p>
* Ancient events are ignored. If the input event has not already been observed by this deduplicator, it is passed
* to the event consumer.
* Ancient events are ignored. If the input event has not already been observed by this deduplicator, it is returned.
*
* @param event the event to handle
* @return the event if it is not a duplicate, or null if it is a duplicate
*/
public void handleEvent(@NonNull final GossipEvent event) {
@Nullable
public GossipEvent handleEvent(@NonNull final GossipEvent event) {
if (event.getGeneration() < minimumGenerationNonAncient) {
// Ancient events can be safely ignored.
intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());
return;
return null;
}

final Set<ByteBuffer> signatures = observedEvents.computeIfAbsent(event.getDescriptor(), NEW_HASH_SET);
Expand All @@ -125,11 +117,13 @@ public void handleEvent(@NonNull final GossipEvent event) {
disparateSignatureAccumulator.update(1);
}

eventConsumer.accept(event);
return event;
} else {
// duplicate descriptor and signature
duplicateEventAccumulator.update(1);
intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());

return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import com.swirlds.platform.gossip.IntakeEventCounter;
import com.swirlds.platform.internal.EventImpl;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -68,11 +68,6 @@ public class InOrderLinker {
*/
private final Map<Hash, EventImpl> parentHashMap = new HashMap<>(INITIAL_CAPACITY);

/**
* Linked events are passed to this consumer.
*/
private final Consumer<EventImpl> eventConsumer;

/**
* The current minimum generation required for an event to be non-ancient.
*/
Expand All @@ -86,26 +81,24 @@ public class InOrderLinker {
/**
* Constructor
*
* @param eventConsumer the consumer that successfully linked events are passed to
* @param intakeEventCounter keeps track of the number of events in the intake pipeline from each peer
*/
public InOrderLinker(
@NonNull final Consumer<EventImpl> eventConsumer, @NonNull final IntakeEventCounter intakeEventCounter) {

this.eventConsumer = Objects.requireNonNull(eventConsumer);
public InOrderLinker(@NonNull final IntakeEventCounter intakeEventCounter) {
this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter);
}

/**
* Find and link the parents of the given event.
*
* @param event the event to link
* @return the linked event, or null if linking fails
*/
public void linkEvent(@NonNull final GossipEvent event) {
@Nullable
public EventImpl linkEvent(@NonNull final GossipEvent event) {
if (event.getGeneration() < minimumGenerationNonAncient) {
// This event is ancient, so we don't need to link it.
this.intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());
return;
return null;
}

final BaseEventHashedData hashedData = event.getHashedData();
Expand All @@ -124,7 +117,7 @@ public void linkEvent(@NonNull final GossipEvent event) {
selfParentHash,
selfParentGen);
this.intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());
return;
return null;
}
} else {
// ancient parents don't need to be linked
Expand All @@ -145,7 +138,7 @@ public void linkEvent(@NonNull final GossipEvent event) {
otherParentHash,
otherParentGen);
this.intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());
return;
return null;
}
} else {
// ancient parents don't need to be linked
Expand All @@ -159,7 +152,7 @@ public void linkEvent(@NonNull final GossipEvent event) {
parentDescriptorMap.put(eventDescriptor, linkedEvent);
parentHashMap.put(eventDescriptor.getHash(), linkedEvent);

eventConsumer.accept(linkedEvent);
return linkedEvent;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.security.PublicKey;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -59,11 +58,6 @@ public class EventSignatureValidator {
*/
private final SignatureVerifier signatureVerifier;

/**
* Events with valid signature are passed to this consumer.
*/
private final Consumer<GossipEvent> eventConsumer;

/**
* The previous address book. May be null.
*/
Expand Down Expand Up @@ -109,7 +103,6 @@ public class EventSignatureValidator {
* @param currentSoftwareVersion the current software version
* @param previousAddressBook the previous address book
* @param currentAddressBook the current address book
* @param eventConsumer validated events are passed to this consumer
* @param intakeEventCounter keeps track of the number of events in the intake pipeline from each peer
*/
public EventSignatureValidator(
Expand All @@ -119,7 +112,6 @@ public EventSignatureValidator(
@NonNull final SoftwareVersion currentSoftwareVersion,
@Nullable final AddressBook previousAddressBook,
@NonNull final AddressBook currentAddressBook,
@NonNull final Consumer<GossipEvent> eventConsumer,
@NonNull final IntakeEventCounter intakeEventCounter) {

Objects.requireNonNull(time);
Expand All @@ -128,7 +120,6 @@ public EventSignatureValidator(
this.currentSoftwareVersion = Objects.requireNonNull(currentSoftwareVersion);
this.previousAddressBook = previousAddressBook;
this.currentAddressBook = Objects.requireNonNull(currentAddressBook);
this.eventConsumer = Objects.requireNonNull(eventConsumer);
this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter);

this.rateLimitedLogger = new RateLimitedLogger(logger, time, MINIMUM_LOG_PERIOD);
Expand Down Expand Up @@ -224,22 +215,26 @@ private boolean isSignatureValid(@NonNull final GossipEvent event) {
}

/**
* Verify event signature
* Validate event signature
*
* @param event the event to verify the signature of
* @return the event if the signature is valid, otherwise null
*/
public void validateEventSignature(@NonNull final GossipEvent event) {
@Nullable
public GossipEvent validateSignature(@NonNull final GossipEvent event) {
if (event.getGeneration() < minimumGenerationNonAncient) {
// ancient events can be safely ignored
intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());
return;
return null;
}

if (isSignatureValid(event)) {
eventConsumer.accept(event);
return event;
} else {
intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());
validationFailedAccumulator.update(1);

return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.gossip.IntakeEventCounter;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -53,11 +53,6 @@ public class InternalEventValidator {
*/
private final boolean singleNodeNetwork;

/**
* Valid events are passed to this consumer.
*/
private final Consumer<GossipEvent> eventConsumer;

/**
* Keeps track of the number of events in the intake pipeline from each peer
*/
Expand Down Expand Up @@ -87,20 +82,17 @@ public class InternalEventValidator {
* @param platformContext the platform context
* @param time a time object, for rate limiting logging
* @param singleNodeNetwork true if this node is in a single-node network, otherwise false
* @param eventConsumer validated events are passed to this consumer
* @param intakeEventCounter keeps track of the number of events in the intake pipeline from each peer
*/
public InternalEventValidator(
@NonNull final PlatformContext platformContext,
@NonNull final Time time,
final boolean singleNodeNetwork,
@NonNull final Consumer<GossipEvent> eventConsumer,
@NonNull final IntakeEventCounter intakeEventCounter) {

Objects.requireNonNull(time);

this.singleNodeNetwork = singleNodeNetwork;
this.eventConsumer = Objects.requireNonNull(eventConsumer);
this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter);

this.transactionConfig = platformContext.getConfiguration().getConfigData(TransactionConfig.class);
Expand Down Expand Up @@ -271,18 +263,22 @@ private boolean isEventGenerationValid(@NonNull final GossipEvent event) {
/**
* Validate the internal data integrity of an event.
* <p>
* If the event is determined to be valid, it is passed to the event consumer.
* If the event is determined to be valid, it is returned.
*
* @param event the event to validate
* @return the event if it is valid, otherwise null
*/
public void handleEvent(@NonNull final GossipEvent event) {
@Nullable
public GossipEvent validateEvent(@NonNull final GossipEvent event) {
if (areRequiredFieldsNonNull(event)
&& isTransactionByteCountValid(event)
&& areParentsInternallyConsistent(event)
&& isEventGenerationValid(event)) {
eventConsumer.accept(event);
return event;
} else {
intakeEventCounter.eventExitedIntakePipeline(event.getSenderId());

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public OutputWire<GossipEvent> getEventOutput() {
* @param eventSignatureValidator the event signature validator to bind
*/
public void bind(@NonNull final EventSignatureValidator eventSignatureValidator) {
eventInput.bind(eventSignatureValidator::validateEventSignature);
eventInput.bind(eventSignatureValidator::validateSignature);
minimumGenerationNonAncientInput.bind(eventSignatureValidator::setMinimumGenerationNonAncient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public OutputWire<GossipEvent> getEventOutput() {
* @param internalEventValidator the validator to bind
*/
public void bind(@NonNull final InternalEventValidator internalEventValidator) {
eventInput.bind(internalEventValidator::handleEvent);
eventInput.bind(internalEventValidator::validateEvent);
}
}

0 comments on commit a0dd88a

Please sign in to comment.