Skip to content

Commit

Permalink
Out of Order Gossip Simulator (#6535)
Browse files Browse the repository at this point in the history
Signed-off-by: Kelly Greco <kelly@swirldslabs.com>
  • Loading branch information
poulok authored and iwsimon committed May 19, 2023
1 parent 83afe58 commit 9f1a8e0
Show file tree
Hide file tree
Showing 51 changed files with 3,279 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.swirlds.common.utility;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;

/**
Expand All @@ -25,24 +26,29 @@ public final class DurationUtils {
private DurationUtils() {}

/**
* @param a
* the first duration to compare
* @param b
* the second duration to compare
* @param a the first duration to compare
* @param b the second duration to compare
* @return true if 'a' is longer than 'b', false if 'a' is shorter or equal
*/
public static boolean isLonger(final Duration a, final Duration b) {
public static boolean isLonger(@NonNull final Duration a, @NonNull final Duration b) {
return a.compareTo(b) > 0;
}

/**
* @param a
* the first duration
* @param b
* the second duration
* @param a the first duration to compare
* @param b the second duration to compare
* @return true if 'a' is shorter than 'b', false if 'a' is shorter or equal
*/
public static boolean isShorter(@NonNull final Duration a, @NonNull final Duration b) {
return a.compareTo(b) < 0;
}

/**
* @param a the first duration
* @param b the second duration
* @return the longer of the two durations
*/
public static Duration max(final Duration a, final Duration b) {
public static Duration max(@NonNull final Duration a, @NonNull final Duration b) {
return isLonger(a, b) ? a : b;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
import com.swirlds.platform.gossip.chatter.communication.ChatterProtocol;
import com.swirlds.platform.gossip.chatter.config.ChatterConfig;
import com.swirlds.platform.gossip.chatter.protocol.ChatterCore;
import com.swirlds.platform.gossip.chatter.protocol.messages.ChatterEventDescriptor;
import com.swirlds.platform.gossip.chatter.protocol.messages.EventDescriptor;
import com.swirlds.platform.gossip.chatter.protocol.peer.PeerInstance;
import com.swirlds.platform.gossip.shadowgraph.ShadowGraph;
import com.swirlds.platform.gossip.shadowgraph.ShadowGraphEventObserver;
Expand Down Expand Up @@ -709,7 +709,7 @@ public class SwirldsPlatform implements Platform, ConnectionTracker, Startable {

final ParentFinder parentFinder = new ParentFinder(shadowGraph::hashgraphEvent);

final List<Predicate<ChatterEventDescriptor>> isDuplicateChecks = new ArrayList<>();
final List<Predicate<EventDescriptor>> isDuplicateChecks = new ArrayList<>();
isDuplicateChecks.add(d -> shadowGraph.isHashInGraph(d.getHash()));
if (chatterConfig.useChatter()) {
final OrphanBufferingLinker orphanBuffer = new OrphanBufferingLinker(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022-2023 Hedera Hashgraph, LLC
* Copyright (C) 2016-2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import com.swirlds.platform.EventStrings;
import com.swirlds.platform.gossip.chatter.protocol.messages.ChatterEvent;
import com.swirlds.platform.gossip.chatter.protocol.messages.ChatterEventDescriptor;
import com.swirlds.platform.gossip.chatter.protocol.messages.EventDescriptor;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
Expand All @@ -44,10 +45,8 @@ public class GossipEvent implements EventIntakeTask, BaseEvent, ChatterEvent {
public GossipEvent() {}

/**
* @param hashedData
* the hashed data for the event
* @param unhashedData
* the unhashed data for the event
* @param hashedData the hashed data for the event
* @param unhashedData the unhashed data for the event
*/
public GossipEvent(final BaseEventHashedData hashedData, final BaseEventUnhashedData unhashedData) {
this.hashedData = hashedData;
Expand Down Expand Up @@ -96,7 +95,7 @@ public BaseEventUnhashedData getUnhashedData() {
* {@inheritDoc}
*/
@Override
public ChatterEventDescriptor getDescriptor() {
public EventDescriptor getDescriptor() {
return descriptor;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022-2023 Hedera Hashgraph, LLC
* Copyright (C) 2016-2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,7 @@
import com.swirlds.logging.LogMarker;
import com.swirlds.platform.consensus.GraphGenerations;
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.gossip.chatter.protocol.messages.ChatterEventDescriptor;
import com.swirlds.platform.gossip.chatter.protocol.messages.EventDescriptor;
import com.swirlds.platform.internal.EventImpl;
import com.swirlds.platform.state.signed.SignedState;
import java.util.ArrayDeque;
Expand All @@ -44,25 +44,22 @@ public class OrphanBufferingLinker extends AbstractEventLinker {
private final Queue<EventImpl> eventOutput;
private final Queue<EventImpl> newlyLinkedEvents;
private final SequenceMap<ParentDescriptor, Set<ChildEvent>> missingParents;
private final SequenceMap<ChatterEventDescriptor, ChildEvent> orphanMap;
private final SequenceMap<EventDescriptor, ChildEvent> orphanMap;

/**
* Create a new orphan buffer.
*
* @param config
* consensus configuration
* @param parentFinder
* responsible for finding parents of an event
* @param futureGenerationLimit
* the maximum number of future generations we are willing to store
* @param config consensus configuration
* @param parentFinder responsible for finding parents of an event
* @param futureGenerationLimit the maximum number of future generations we are willing to store
*/
public OrphanBufferingLinker(
final ConsensusConfig config, final ParentFinder parentFinder, final int futureGenerationLimit) {
super(config);
this.parentFinder = parentFinder;
this.eventOutput = new ArrayDeque<>();
this.newlyLinkedEvents = new ArrayDeque<>();
this.orphanMap = new StandardSequenceMap<>(0, futureGenerationLimit, ChatterEventDescriptor::getGeneration);
this.orphanMap = new StandardSequenceMap<>(0, futureGenerationLimit, EventDescriptor::getGeneration);
this.missingParents = new StandardSequenceMap<>(0, futureGenerationLimit, ParentDescriptor::generation);
}

Expand All @@ -79,7 +76,7 @@ private static void parentNoLongerMissing(final ChildEvent child, final Hash par
}
}

private static void orphanPurged(final ChatterEventDescriptor key, final ChildEvent orphan) {
private static void orphanPurged(final EventDescriptor key, final ChildEvent orphan) {
// this should never happen. an events parents should become ancient and at that point it will no longer be an
// orphan
if (orphan == null) {
Expand Down Expand Up @@ -200,11 +197,10 @@ private void parentPurged(final ParentDescriptor purgedParent, final Set<ChildEv
/**
* Is the event described an orphan we are keeping in the buffer
*
* @param descriptor
* the event descriptor
* @param descriptor the event descriptor
* @return true if the event is an orphan this linker is buffering
*/
public boolean isOrphan(final ChatterEventDescriptor descriptor) {
public boolean isOrphan(final EventDescriptor descriptor) {
return orphanMap.get(descriptor) != null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.swirlds.platform.event.validation;

import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.gossip.chatter.protocol.messages.ChatterEventDescriptor;
import com.swirlds.platform.gossip.chatter.protocol.messages.EventDescriptor;
import com.swirlds.platform.metrics.EventIntakeMetrics;
import java.util.List;
import java.util.function.Predicate;
Expand All @@ -26,18 +26,19 @@
* A {@link GossipEventValidator} that checks a list of predicates to see if this event is a duplicate
*/
public class EventDeduplication implements GossipEventValidator {
private final Predicate<ChatterEventDescriptor> isDuplicate;

private final Predicate<EventDescriptor> isDuplicate;
private final EventIntakeMetrics stats;

public EventDeduplication(
final Predicate<ChatterEventDescriptor> isDuplicateCheck, final EventIntakeMetrics stats) {
public EventDeduplication(final Predicate<EventDescriptor> isDuplicateCheck, final EventIntakeMetrics stats) {
this(List.of(isDuplicateCheck), stats);
}

public EventDeduplication(
final List<Predicate<ChatterEventDescriptor>> isDuplicateChecks, final EventIntakeMetrics stats) {
Predicate<ChatterEventDescriptor> chain = null;
for (final Predicate<ChatterEventDescriptor> check : isDuplicateChecks) {
final List<Predicate<EventDescriptor>> isDuplicateChecks, final EventIntakeMetrics stats) {
Predicate<EventDescriptor> chain = null;
for (final Predicate<EventDescriptor> check : isDuplicateChecks) {

if (chain == null) {
chain = check;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.swirlds.platform.gossip.chatter.protocol.input.InputDelegateBuilder;
import com.swirlds.platform.gossip.chatter.protocol.input.MessageTypeHandlerBuilder;
import com.swirlds.platform.gossip.chatter.protocol.messages.ChatterEvent;
import com.swirlds.platform.gossip.chatter.protocol.messages.ChatterEventDescriptor;
import com.swirlds.platform.gossip.chatter.protocol.messages.EventDescriptor;
import com.swirlds.platform.gossip.chatter.protocol.output.MessageOutput;
import com.swirlds.platform.gossip.chatter.protocol.output.OtherEventDelay;
import com.swirlds.platform.gossip.chatter.protocol.output.PriorityOutputAggregator;
Expand All @@ -45,7 +45,6 @@
import com.swirlds.platform.state.signed.SignedState;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -71,7 +70,7 @@ public class ChatterCore<E extends ChatterEvent> implements Shiftable, LoadableF
private final BiConsumer<NodeId, Long> pingConsumer;
private final MessageOutput<E> selfEventOutput;
private final MessageOutput<E> otherEventOutput;
private final MessageOutput<ChatterEventDescriptor> hashOutput;
private final MessageOutput<EventDescriptor> hashOutput;
private final Map<Long, PeerInstance> peerInstances;

private final CountPerSecond msgsPerSecRead;
Expand Down Expand Up @@ -128,6 +127,7 @@ public ChatterCore(

/**
* Creates an instance that will handle all communication with a peer
* <p>
*
* @param peerId the peer's ID
* @param eventHandler a handler that will send the event outside of chatter
Expand Down Expand Up @@ -156,7 +156,7 @@ public void newPeerInstance(final long peerId, final MessageHandler<E> eventHand
processingTimeSendReceive::getPeerProcessingTime,
config.otherEventDelay())::getOtherEventDelay,
state,
Instant::now));
time::now));
final PriorityOutputAggregator outputAggregator = new PriorityOutputAggregator(
List.of(
// heartbeat is first so that responses are not delayed
Expand All @@ -172,7 +172,7 @@ public void newPeerInstance(final long peerId, final MessageHandler<E> eventHand
.addHandler(state::handleEvent)
.addHandler(eventHandler)
.build())
.addHandler(MessageTypeHandlerBuilder.builder(ChatterEventDescriptor.class)
.addHandler(MessageTypeHandlerBuilder.builder(EventDescriptor.class)
.addHandler(state::handleDescriptor)
.build())
.addHandler(MessageTypeHandlerBuilder.builder(HeartbeatMessage.class)
Expand Down Expand Up @@ -204,6 +204,7 @@ public Collection<PeerInstance> getPeerInstances() {

/**
* Notify chatter that a new event has been created
* <p>
*
* @param event the new event
*/
Expand All @@ -214,6 +215,7 @@ public void eventCreated(final E event) {

/**
* Notify chatter that an event has been received and validated
* <p>
*
* @param event the event received
*/
Expand All @@ -224,7 +226,7 @@ public void eventReceived(final E event) {
}

private void recordProcessingTime(final E event) {
selfProcessingTime.set(Duration.between(event.getTimeReceived(), Instant.now()));
selfProcessingTime.set(Duration.between(event.getTimeReceived(), time.now()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ private HeartbeatMessage(final long heartbeatId, final boolean response) {
/**
* Create a new heartbeat request
*
* @param heartbeatId
* the ID of the request
* @param heartbeatId the ID of the request
* @return the request message
*/
public static HeartbeatMessage request(final long heartbeatId) {
Expand All @@ -55,8 +54,7 @@ public static HeartbeatMessage request(final long heartbeatId) {
/**
* Create a new heartbeat response
*
* @param heartbeatId
* the ID of the response
* @param heartbeatId the ID of the response
* @return the response message
*/
public static HeartbeatMessage response(final long heartbeatId) {
Expand Down Expand Up @@ -98,4 +96,9 @@ public long getClassId() {
public int getVersion() {
return ClassVersion.ORIGINAL;
}

@Override
public String toString() {
return "HeartbeatMsg(" + heartbeatId + ")";
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2018-2023 Hedera Hashgraph, LLC
* Copyright (C) 2016-2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,7 @@ public interface ChatterEvent extends SelfSerializable {
*
* @return the descriptor
*/
ChatterEventDescriptor getDescriptor();
EventDescriptor getDescriptor();

/**
* @return the time at which the event has been received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;

import com.swirlds.common.crypto.Hash;
import com.swirlds.common.io.SelfSerializable;
import com.swirlds.common.io.streams.SerializableDataInputStream;
import com.swirlds.common.io.streams.SerializableDataOutputStream;
import com.swirlds.common.utility.CommonUtils;
Expand All @@ -28,9 +27,9 @@
import org.apache.commons.lang3.builder.ToStringBuilder;

/**
* A stripped down description of an event.
* A stripped down description of a chatter event.
*/
public class ChatterEventDescriptor implements SelfSerializable {
public class ChatterEventDescriptor implements EventDescriptor {

public static final long CLASS_ID = 0x825e17f25c6e2566L;

Expand All @@ -49,12 +48,9 @@ public ChatterEventDescriptor() {}
/**
* Create a new gossip event descriptor.
*
* @param hash
* the hash of the event
* @param creator
* the creator of the event
* @param generation
* the age of an event, smaller is older
* @param hash the hash of the event
* @param creator the creator of the event
* @param generation the age of an event, smaller is older
*/
public ChatterEventDescriptor(final Hash hash, final long creator, final long generation) {
this.hash = Objects.requireNonNull(hash);
Expand Down Expand Up @@ -103,27 +99,21 @@ public int getVersion() {
}

/**
* Get the hash of the event.
*
* @return the event's hash
* {@inheritDoc}
*/
public Hash getHash() {
return hash;
}

/**
* Get the node ID of the event's creator.
*
* @return a node ID
* {@inheritDoc}
*/
public long getCreator() {
return creator;
}

/**
* Get the generation of the event described
*
* @return the generation of the event described
* {@inheritDoc}
*/
public long getGeneration() {
return generation;
Expand Down

0 comments on commit 9f1a8e0

Please sign in to comment.