Skip to content

Commit

Permalink
06516 flatten build event handlers (#6517)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@swirldslabs.com>
  • Loading branch information
cody-littley committed May 11, 2023
1 parent d4c3217 commit 7160e00
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 113 deletions.
Expand Up @@ -30,7 +30,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -92,10 +91,6 @@ public class EventStreamManager<T extends StreamAligned & Timestamped & RunningH
* a predicate which checks whether this event is the last event before restart
* @param threadManager
* responsible for managing thread lifecycles
* @throws NoSuchAlgorithmException
* is thrown when fails to get required MessageDigest instance
* @throws IOException
* is thrown when fails to create directory for event streaming
*/
public EventStreamManager(
final ThreadManager threadManager,
Expand All @@ -106,13 +101,16 @@ public EventStreamManager(
final String eventsLogDir,
final long eventsLogPeriod,
final int eventStreamQueueCapacity,
final Predicate<T> isLastEventInFreezeCheck)
throws NoSuchAlgorithmException, IOException {
final Predicate<T> isLastEventInFreezeCheck) {

if (enableEventStreaming) {
// the directory to which event stream files are written
final String eventStreamDir = eventsLogDir + "/events_" + nodeName;
Files.createDirectories(Paths.get(eventStreamDir));
try {
Files.createDirectories(Paths.get(eventStreamDir));
} catch (final IOException e) {
throw new IllegalStateException("Can not create directory for event stream", e);
}

streamFileWriter = new TimestampStreamFileWriter<>(
eventStreamDir,
Expand Down
Expand Up @@ -219,7 +219,6 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -236,7 +235,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -256,7 +254,7 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
*/
private final NodeId selfId;
/** tell which pairs of members should establish connections */
final NetworkTopology topology;
private final NetworkTopology topology;
/**
* This object is responsible for rate limiting reconnect attempts (in the role of sender)
*/
Expand Down Expand Up @@ -323,7 +321,7 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
/** a long name including (app, swirld, member id, member self name) */
private final String platformName;
/** is used for calculating runningHash of all consensus events and writing consensus events to file */
private EventStreamManager<EventImpl> eventStreamManager;
private final EventStreamManager<EventImpl> eventStreamManager;
/**
* True if this node started from genesis.
*/
Expand Down Expand Up @@ -359,11 +357,11 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
*/
private ShadowGraphSynchronizer syncShadowgraphSynchronizer;
/** Stores and passes pre-consensus events to {@link SwirldStateManager} for handling */
private PreConsensusEventHandler preConsensusEventHandler;
private final PreConsensusEventHandler preConsensusEventHandler;
/** Stores and processes consensus events including sending them to {@link SwirldStateManager} for handling */
private ConsensusRoundHandler consensusRoundHandler;
private final ConsensusRoundHandler consensusRoundHandler;
/** Handles all interaction with {@link SwirldState} */
private SwirldStateManager swirldStateManager;
private final SwirldStateManager swirldStateManager;
/** Checks the validity of transactions and submits valid ones to the event transaction pool */
private final SwirldTransactionSubmitter transactionSubmitter;
/** clears all pipelines to prepare for a reconnect */
Expand Down Expand Up @@ -538,7 +536,6 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,

this.shadowGraph = new ShadowGraph(syncMetrics, initialAddressBook.getSize());

this.consensusRoundHandler = null;
this.swirldId = swirldId.clone();
this.crypto = crypto;

Expand Down Expand Up @@ -614,13 +611,24 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
logger.info(STARTUP.getMarker(), "startUpEventFrozenEndTime: {}", () -> startUpEventFrozenEndTime);
}

// initializes EventStreamManager instance
final Address address = getSelfAddress();
final String eventStreamManagerName;
if (address.getMemo() != null && !address.getMemo().isEmpty()) {
initEventStreamManager(address.getMemo());
eventStreamManagerName = address.getMemo();
} else {
initEventStreamManager(String.valueOf(selfId));
eventStreamManagerName = String.valueOf(selfId);
}
logger.info(STARTUP.getMarker(), "initialize eventStreamManager");
eventStreamManager = new EventStreamManager<>(
threadManager,
getSelfId(),
this,
eventStreamManagerName,
settings.isEnableEventStreaming(),
settings.getEventsLogDir(),
settings.getEventsLogPeriod(),
settings.getEventStreamQueueCapacity(),
this::isLastEventBeforeRestart);

if (chatterConfig.useChatter()) {
criticalQuorum = new CriticalQuorumImpl(initialAddressBook, false, chatterConfig.criticalQuorumSoftening());
Expand All @@ -632,7 +640,66 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
try (loadedState.signedStateFromDisk) {
final SignedState signedStateFromDisk = loadedState.signedStateFromDisk.getNullable();

buildEventHandlers(signedStateFromDisk, loadedState.initialState, genesisStateBuilder);
// Queue thread that stores and handles signed states that need to be hashed and have signatures collected.
final QueueThread<ReservedSignedState> stateHashSignQueueThread = PlatformConstructor.stateHashSignQueue(
threadManager, selfId.getId(), stateManagementComponent::newSignedStateFromTransactions);
stateHashSignQueueThread.start();

final State stateToLoad;
if (signedStateFromDisk != null) {
logger.debug(STARTUP.getMarker(), () -> new SavedStateLoadedPayload(
signedStateFromDisk.getRound(),
signedStateFromDisk.getConsensusTimestamp(),
startUpEventFrozenManager.getStartUpEventFrozenEndTime())
.toString());

stateToLoad = loadedState.initialState;

} else {
stateToLoad = buildGenesisState(this, initialAddressBook, appVersion, genesisStateBuilder);

// if we are not starting from a saved state, don't freeze on startup
startUpEventFrozenManager.setStartUpEventFrozenEndTime(null);
}

if (stateToLoad == null) {
// this should be impossible
throw new IllegalStateException("stateToLoad is null");
}

swirldStateManager = PlatformConstructor.swirldStateManager(
platformContext,
initialAddressBook,
selfId,
preConsensusSystemTransactionManager,
postConsensusSystemTransactionManager,
metrics,
PlatformConstructor.settingsProvider(),
freezeManager::isFreezeStarted,
stateToLoad);

// SwirldStateManager will get a copy of the state loaded, that copy will become stateCons.
// The original state will be saved in the SignedStateMgr and will be deleted when it becomes old

preConsensusEventHandler = components.add(PlatformConstructor.preConsensusEventHandler(
threadManager, selfId, swirldStateManager, consensusMetrics));
consensusRoundHandler = components.add(PlatformConstructor.consensusHandler(
platformContext,
threadManager,
selfId.getId(),
PlatformConstructor.settingsProvider(),
swirldStateManager,
new ConsensusHandlingMetrics(metrics, time),
eventStreamManager,
stateHashSignQueueThread,
preConsensusEventWriter::waitUntilDurable,
freezeManager::freezeStarted,
stateManagementComponent::roundAppliedToState,
appVersion));

if (signedStateFromDisk != null) {
consensusRoundHandler.loadDataFromSignedState(signedStateFromDisk, false);
}

final AddedEventMetrics addedEventMetrics = new AddedEventMetrics(this.selfId, metrics);
final PreconsensusEventStreamSequencer sequencer = new PreconsensusEventStreamSequencer();
Expand Down Expand Up @@ -1160,72 +1227,6 @@ private PreConsensusEventWriter buildPreConsensusEventWriter() {
return new AsyncPreConsensusEventWriter(platformContext, threadManager, syncWriter);
}

/**
* Build all the classes required for events and transactions to flow through the system
*/
private void buildEventHandlers(
@Nullable final SignedState signedStateFromDisk,
@Nullable final State initialState,
@NonNull final Supplier<SwirldState> genesisStateBuilder) {

// Queue thread that stores and handles signed states that need to be hashed and have signatures collected.
final QueueThread<ReservedSignedState> stateHashSignQueueThread = PlatformConstructor.stateHashSignQueue(
threadManager, selfId.getId(), stateManagementComponent::newSignedStateFromTransactions);
stateHashSignQueueThread.start();

if (signedStateFromDisk != null) {
logger.debug(STARTUP.getMarker(), () -> new SavedStateLoadedPayload(
signedStateFromDisk.getRound(),
signedStateFromDisk.getConsensusTimestamp(),
startUpEventFrozenManager.getStartUpEventFrozenEndTime())
.toString());

buildEventHandlersFromState(initialState, stateHashSignQueueThread);

consensusRoundHandler.loadDataFromSignedState(signedStateFromDisk, false);
} else {
final State state = buildGenesisState(this, initialAddressBook, appVersion, genesisStateBuilder);
buildEventHandlersFromState(state, stateHashSignQueueThread);

// if we are not starting from a saved state, don't freeze on startup
startUpEventFrozenManager.setStartUpEventFrozenEndTime(null);
}
}

private void buildEventHandlersFromState(
final State state, final QueueThread<ReservedSignedState> stateHashSignQueueThread) {

swirldStateManager = PlatformConstructor.swirldStateManager(
platformContext,
initialAddressBook,
selfId,
preConsensusSystemTransactionManager,
postConsensusSystemTransactionManager,
metrics,
PlatformConstructor.settingsProvider(),
freezeManager::isFreezeStarted,
state);

// SwirldStateManager will get a copy of the state loaded, that copy will become stateCons.
// The original state will be saved in the SignedStateMgr and will be deleted when it becomes old

preConsensusEventHandler = components.add(PlatformConstructor.preConsensusEventHandler(
threadManager, selfId, swirldStateManager, consensusMetrics));
consensusRoundHandler = components.add(PlatformConstructor.consensusHandler(
platformContext,
threadManager,
selfId.getId(),
PlatformConstructor.settingsProvider(),
swirldStateManager,
new ConsensusHandlingMetrics(metrics, time),
eventStreamManager,
stateHashSignQueueThread,
preConsensusEventWriter::waitUntilDurable,
freezeManager::freezeStarted,
stateManagementComponent::roundAppliedToState,
appVersion));
}

/**
* Start this platform.
*/
Expand Down Expand Up @@ -2114,33 +2115,6 @@ StartUpEventFrozenManager getStartUpEventFrozenManager() {
return startUpEventFrozenManager;
}

/**
* Initializes EventStreamManager instance, which will start threads for calculating RunningHash, and writing event
* stream files when event streaming is enabled
*
* @param name name of this node
*/
void initEventStreamManager(final String name) {
try {
logger.info(STARTUP.getMarker(), "initialize eventStreamManager");
eventStreamManager = new EventStreamManager<>(
threadManager,
getSelfId(),
this,
name,
settings.isEnableEventStreaming(),
settings.getEventsLogDir(),
settings.getEventsLogPeriod(),
settings.getEventStreamQueueCapacity(),
this::isLastEventBeforeRestart);
} catch (final NoSuchAlgorithmException | IOException e) {
logger.error(
EXCEPTION.getMarker(),
"Fail to initialize eventStreamHelper. Exception: {}",
ExceptionUtils.getStackTrace(e));
}
}

/**
* check whether the given event is the last event in its round, and the platform enters freeze period
*
Expand Down

0 comments on commit 7160e00

Please sign in to comment.