Skip to content

Commit

Permalink
feat: migrate event creation to new wiring framework (#10236)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@swirldslabs.com>
  • Loading branch information
cody-littley committed Dec 11, 2023
1 parent 23a28ea commit 6353aeb
Show file tree
Hide file tree
Showing 19 changed files with 780 additions and 95 deletions.
2 changes: 1 addition & 1 deletion platform-sdk/sdk/settings.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ prometheus.endpointEnabled, false
# Misc. #
#############################

loadKeysFromPfxFiles, false
loadKeysFromPfxFiles, false
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@
* Contains configuration values for the platform schedulers.
*
* @param internalEventValidatorSchedulerType the internal event validator scheduler type
* @param internalEventValidatorUnhandledCapacity number of unhandled events allowed in the internal event validator scheduler
* @param internalEventValidatorUnhandledCapacity number of unhandled events allowed in the internal event validator
* scheduler
* @param eventDeduplicatorSchedulerType the event deduplicator scheduler type
* @param eventDeduplicatorUnhandledCapacity number of unhandled tasks allowed in the event deduplicator scheduler
* @param eventDeduplicatorUnhandledCapacity number of unhandled tasks allowed in the event deduplicator
* scheduler
* @param eventSignatureValidatorSchedulerType the event signature validator scheduler type
* @param eventSignatureValidatorUnhandledCapacity number of unhandled tasks allowed in the event signature validator scheduler
* @param eventSignatureValidatorUnhandledCapacity number of unhandled tasks allowed in the event signature validator
* scheduler
* @param orphanBufferSchedulerType the orphan buffer scheduler type
* @param orphanBufferUnhandledCapacity number of unhandled tasks allowed in the orphan buffer scheduler
* @param inOrderLinkerSchedulerType the in-order linker scheduler type
* @param inOrderLinkerUnhandledCapacity number of unhandled tasks allowed in the in-order linker scheduler
* @param linkedEventIntakeSchedulerType the linked event intake scheduler type
* @param linkedEventIntakeUnhandledCapacity number of unhandled tasks allowed in the linked event intake scheduler
* @param linkedEventIntakeUnhandledCapacity number of unhandled tasks allowed in the linked event intake
* scheduler
* @param eventCreationManagerSchedulerType the event creation manager scheduler type
* @param eventCreationManagerUnhandledCapacity number of unhandled tasks allowed in the event creation manager
* scheduler
* @param signedStateFileManagerSchedulerType the signed state file manager scheduler type
* @param signedStateFileManagerUnhandledCapacity number of unhandled tasks allowed in the signed state file manager scheduler
* @param signedStateFileManagerUnhandledCapacity number of unhandled tasks allowed in the signed state file manager
* scheduler
*/
@ConfigData("platformSchedulers")
public record PlatformSchedulersConfig(
Expand All @@ -53,6 +61,8 @@ public record PlatformSchedulersConfig(
@ConfigProperty(defaultValue = "500") int inOrderLinkerUnhandledCapacity,
@ConfigProperty(defaultValue = "SEQUENTIAL") String linkedEventIntakeSchedulerType,
@ConfigProperty(defaultValue = "500") int linkedEventIntakeUnhandledCapacity,
@ConfigProperty(defaultValue = "SEQUENTIAL") String eventCreationManagerSchedulerType,
@ConfigProperty(defaultValue = "500") int eventCreationManagerUnhandledCapacity,
@ConfigProperty(defaultValue = "SEQUENTIAL_THREAD") String signedStateFileManagerSchedulerType,
@ConfigProperty(defaultValue = "20") int signedStateFileManagerUnhandledCapacity) {

Expand All @@ -61,6 +71,7 @@ public record PlatformSchedulersConfig(
*
* @return the internal event validator scheduler type
*/
@NonNull
public TaskSchedulerType getInternalEventValidatorSchedulerType() {
return TaskSchedulerType.valueOf(internalEventValidatorSchedulerType);
}
Expand All @@ -70,6 +81,7 @@ public TaskSchedulerType getInternalEventValidatorSchedulerType() {
*
* @return the event deduplicator scheduler type
*/
@NonNull
public TaskSchedulerType getEventDeduplicatorSchedulerType() {
return TaskSchedulerType.valueOf(eventDeduplicatorSchedulerType);
}
Expand All @@ -79,6 +91,7 @@ public TaskSchedulerType getEventDeduplicatorSchedulerType() {
*
* @return the event signature validator scheduler type
*/
@NonNull
public TaskSchedulerType getEventSignatureValidatorSchedulerType() {
return TaskSchedulerType.valueOf(eventSignatureValidatorSchedulerType);
}
Expand All @@ -88,6 +101,7 @@ public TaskSchedulerType getEventSignatureValidatorSchedulerType() {
*
* @return the orphan buffer scheduler type
*/
@NonNull
public TaskSchedulerType getOrphanBufferSchedulerType() {
return TaskSchedulerType.valueOf(orphanBufferSchedulerType);
}
Expand All @@ -97,6 +111,7 @@ public TaskSchedulerType getOrphanBufferSchedulerType() {
*
* @return the in-order linker scheduler type
*/
@NonNull
public TaskSchedulerType getInOrderLinkerSchedulerType() {
return TaskSchedulerType.valueOf(inOrderLinkerSchedulerType);
}
Expand All @@ -106,10 +121,21 @@ public TaskSchedulerType getInOrderLinkerSchedulerType() {
*
* @return the linked event intake scheduler type
*/
@NonNull
public TaskSchedulerType getLinkedEventIntakeSchedulerType() {
return TaskSchedulerType.valueOf(linkedEventIntakeSchedulerType);
}

/**
* Get the event creation manager scheduler type
*
* @return the event creation manager scheduler type
*/
@NonNull
public TaskSchedulerType getEventCreationManagerSchedulerType() {
return TaskSchedulerType.valueOf(eventCreationManagerSchedulerType);
}

/**
* Get the signed state file manager scheduler type
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.swirlds.common.wiring.model.internal;

import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
import static com.swirlds.logging.legacy.LogMarker.STARTUP;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Set;
Expand All @@ -43,6 +44,7 @@ public static boolean checkForUnboundInputWires(
@NonNull final Set<InputWireDescriptor> inputWires,
@NonNull final Set<InputWireDescriptor> boundInputWires) {
if (inputWires.size() == boundInputWires.size()) {
logger.info(STARTUP.getMarker(), "All input wires have been bound.");
return false;
}

Expand All @@ -51,11 +53,11 @@ public static boolean checkForUnboundInputWires(
for (final InputWireDescriptor inputWire : inputWires) {
if (!boundInputWires.contains(inputWire)) {
sb.append(" - ")
.append("Input wire ")
.append("Input wire '")
.append(inputWire.name())
.append(" in scheduler ")
.append("' in scheduler '")
.append(inputWire.taskSchedulerName())
.append("\n");
.append("'\n");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.swirlds.logging.legacy.LogMarker.RECONNECT;
import static com.swirlds.logging.legacy.LogMarker.STARTUP;
import static com.swirlds.platform.event.creation.EventCreationManagerFactory.buildEventCreationManager;
import static com.swirlds.platform.event.creation.EventCreationManagerFactory.buildLegacyEventCreationManager;
import static com.swirlds.platform.state.address.AddressBookMetrics.registerAddressBookMetrics;
import static com.swirlds.platform.state.iss.ConsensusHashManager.DO_NOT_IGNORE_ROUNDS;
import static com.swirlds.platform.state.signed.SignedStateFileReader.getSavedStateFiles;
Expand Down Expand Up @@ -83,6 +84,7 @@
import com.swirlds.platform.event.EventUtils;
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.event.creation.AsyncEventCreationManager;
import com.swirlds.platform.event.creation.EventCreationManager;
import com.swirlds.platform.event.deduplication.EventDeduplicator;
import com.swirlds.platform.event.linking.EventLinker;
import com.swirlds.platform.event.linking.InOrderLinker;
Expand Down Expand Up @@ -503,8 +505,7 @@ public class SwirldsPlatform implements Platform {
final WiringModel model = WiringModel.create(platformContext, Time.getCurrent());
components.add(model);

platformWiring = new PlatformWiring(platformContext, time);
components.add(platformWiring);
platformWiring = components.add(new PlatformWiring(platformContext, time));
platformWiring.wireExternalComponents(
preconsensusEventWriter, platformStatusManager, appCommunicationComponent);

Expand Down Expand Up @@ -731,13 +732,25 @@ public class SwirldsPlatform implements Platform {
preConsensusEventHandler::preconsensusEvent,
intakeEventCounter);

final EventCreationManager eventCreationManager = buildEventCreationManager(
platformContext,
time,
this,
currentAddressBook,
selfId,
appVersion,
transactionPool,
platformStatusManager::getCurrentStatus,
latestReconnectRound::get);

platformWiring.bindIntake(
internalEventValidator,
eventDeduplicator,
eventSignatureValidator,
orphanBuffer,
inOrderLinker,
linkedEventIntake);
linkedEventIntake,
eventCreationManager);

intakeHandler = platformWiring.getEventInput()::put;
}
Expand All @@ -752,19 +765,23 @@ public class SwirldsPlatform implements Platform {
.setMetricsConfiguration(new QueueThreadMetricsConfiguration(metrics).enableMaxSizeMetric())
.build());

eventCreator = buildEventCreationManager(
platformContext,
threadManager,
time,
this,
currentAddressBook,
selfId,
appVersion,
transactionPool,
intakeQueue,
eventObserverDispatcher,
platformStatusManager::getCurrentStatus,
latestReconnectRound::get);
if (eventConfig.useLegacyIntake()) {
eventCreator = buildLegacyEventCreationManager(
platformContext,
threadManager,
time,
this,
currentAddressBook,
selfId,
appVersion,
transactionPool,
intakeQueue,
eventObserverDispatcher,
platformStatusManager::getCurrentStatus,
latestReconnectRound::get);
} else {
eventCreator = null;
}

transactionSubmitter = new SwirldTransactionSubmitter(
platformStatusManager::getCurrentStatus,
Expand Down Expand Up @@ -838,7 +855,12 @@ public class SwirldsPlatform implements Platform {
});
}

final Clearable pauseEventCreation = eventCreator::pauseEventCreation;
final Clearable pauseEventCreation;
if (eventCreator != null) {
pauseEventCreation = eventCreator::pauseEventCreation;
} else {
pauseEventCreation = () -> {};
}

if (eventConfig.useLegacyIntake()) {
clearAllPipelines = new LoggingClearables(
Expand All @@ -853,7 +875,6 @@ public class SwirldsPlatform implements Platform {
clearAllPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(
Pair.of(pauseEventCreation, "eventCreator"),
Pair.of(intakeQueue, "intakeQueue"),
Pair.of(platformWiring, "platformWiring"),
Pair.of(shadowGraph, "shadowGraph"),
Expand Down Expand Up @@ -963,6 +984,12 @@ private void initializeState(@NonNull final SignedState signedState) {
private void loadStateIntoEventCreator(@NonNull final SignedState signedState) {
Objects.requireNonNull(signedState);

if (eventCreator == null) {
// Event creator is null when using the new intake pipeline. New intake pipeline
// is not compatible with old states that contain events that need to be loaded.
return;
}

try {
eventCreator.setMinimumGenerationNonAncient(
signedState.getState().getPlatformState().getPlatformData().getMinimumGenerationNonAncient());
Expand Down Expand Up @@ -1078,10 +1105,9 @@ private void loadReconnectState(final SignedState signedState) {
// from the ones we had before the reconnect
intakeQueue.pause();
try {
if (platformContext
.getConfiguration()
.getConfigData(EventConfig.class)
.useLegacyIntake()) {
final EventConfig eventConfig =
platformContext.getConfiguration().getConfigData(EventConfig.class);
if (eventConfig.useLegacyIntake()) {
eventValidators.replaceValidator(
SignatureValidator.VALIDATOR_NAME,
new SignatureValidator(
Expand Down Expand Up @@ -1131,7 +1157,10 @@ private void loadReconnectState(final SignedState signedState) {
}

gossip.resetFallenBehind();
eventCreator.resumeEventCreation();

if (eventCreator != null) {
eventCreator.resumeEventCreation();
}
}

/**
Expand Down Expand Up @@ -1176,7 +1205,7 @@ private EventLinker buildEventLinker(
/**
* Build the preconsensus event file manager.
*
* @param startingRound the round number of the initial state being loaded into the system
* @param startingRound the round number of the initial state being loaded into the system
*/
@NonNull
private PreconsensusEventFileManager buildPreconsensusEventFileManager(final long startingRound) {
Expand Down Expand Up @@ -1218,10 +1247,12 @@ public void start() {

metrics.start();

// The event creator is intentionally started before replaying the preconsensus event stream.
// This prevents the event creator's intake queue from filling up and blocking. Note that
// this component won't actually create events until the platform has the appropriate status.
eventCreator.start();
if (eventCreator != null) {
// The event creator is intentionally started before replaying the preconsensus event stream.
// This prevents the event creator's intake queue from filling up and blocking. Note that
// this component won't actually create events until the platform has the appropriate status.
eventCreator.start();
}

replayPreconsensusEvents();
gossip.start();
Expand All @@ -1238,7 +1269,9 @@ public void start() {
*/
public void performPcesRecovery() {
components.start();
eventCreator.start();
if (eventCreator != null) {
eventCreator.start();
}
replayPreconsensusEvents();
stateManagementComponent.dumpLatestImmutableState(StateToDiskReason.PCES_RECOVERY_COMPLETE, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void setMinimumGenerationNonAncient(final long minimumGenerationNonAncien
* @param event the event to pass
*/
private void handleEvent(@NonNull final EventImpl event) {
eventCreator.registerEvent(event);
eventCreator.registerEvent(event.getBaseEvent());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
* for the entire network is equal to this value times the number of nodes. A
* value of 0 means that there is no limit to the number of events that can be
* created (as long as those events are legal to create).
* @param creationAttemptRate the rate (in hz) at which a node will attempt to create new events. If this
* value is higher than the max creation rate, it will still be constrained by the
* max creation rate. This being said, it is recommended to attempt event creation
* faster than the max creation rate in situations where creation rate is also
* throttled by the tipset algorithm (i.e. we are waiting for new events to use as
* parents).
* @param antiSelfishnessFactor the lower this number, the more likely it is that a new event will be created
* that reduces this node's selfishness score. Setting this too low may result in
* a suboptimal hashgraph topology. Setting this number too high may lead to some
Expand All @@ -45,6 +51,7 @@
@ConfigData("event.creation")
public record EventCreationConfig(
@ConfigProperty(defaultValue = "20") double maxCreationRate,
@ConfigProperty(defaultValue = "100") double creationAttemptRate,
@ConfigProperty(defaultValue = "10") double antiSelfishnessFactor,
@ConfigProperty(defaultValue = "10") int tipsetSnapshotHistorySize,
@ConfigProperty(defaultValue = "1024") int eventIntakeThrottle,
Expand Down
Loading

0 comments on commit 6353aeb

Please sign in to comment.