Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

06516 flatten build event handlers #6517

Merged
merged 4 commits into from May 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,9 +28,9 @@
import com.swirlds.common.system.NodeId;
import com.swirlds.common.threading.manager.ThreadManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -92,10 +92,6 @@ public class EventStreamManager<T extends StreamAligned & Timestamped & RunningH
* a predicate which checks whether this event is the last event before restart
* @param threadManager
* responsible for managing thread lifecycles
* @throws NoSuchAlgorithmException
* is thrown when fails to get required MessageDigest instance
* @throws IOException
* is thrown when fails to create directory for event streaming
*/
public EventStreamManager(
final ThreadManager threadManager,
Expand All @@ -106,13 +102,16 @@ public EventStreamManager(
final String eventsLogDir,
final long eventsLogPeriod,
final int eventStreamQueueCapacity,
final Predicate<T> isLastEventInFreezeCheck)
throws NoSuchAlgorithmException, IOException {
final Predicate<T> isLastEventInFreezeCheck) {

if (enableEventStreaming) {
// the directory to which event stream files are written
final String eventStreamDir = eventsLogDir + "/events_" + nodeName;
Files.createDirectories(Paths.get(eventStreamDir));
try {
Files.createDirectories(Paths.get(eventStreamDir));
} catch (final IOException e) {
throw new UncheckedIOException(e);
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
}

streamFileWriter = new TimestampStreamFileWriter<>(
eventStreamDir,
Expand Down
Expand Up @@ -219,7 +219,6 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -236,7 +235,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -256,7 +254,7 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
*/
private final NodeId selfId;
/** tell which pairs of members should establish connections */
final NetworkTopology topology;
private final NetworkTopology topology;
/**
* This object is responsible for rate limiting reconnect attempts (in the role of sender)
*/
Expand Down Expand Up @@ -323,7 +321,7 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
/** a long name including (app, swirld, member id, member self name) */
private final String platformName;
/** is used for calculating runningHash of all consensus events and writing consensus events to file */
private EventStreamManager<EventImpl> eventStreamManager;
private final EventStreamManager<EventImpl> eventStreamManager;
/**
* True if this node started from genesis.
*/
Expand Down Expand Up @@ -359,11 +357,11 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
*/
private ShadowGraphSynchronizer syncShadowgraphSynchronizer;
/** Stores and passes pre-consensus events to {@link SwirldStateManager} for handling */
private PreConsensusEventHandler preConsensusEventHandler;
private final PreConsensusEventHandler preConsensusEventHandler;
/** Stores and processes consensus events including sending them to {@link SwirldStateManager} for handling */
private ConsensusRoundHandler consensusRoundHandler;
/** Handles all interaction with {@link SwirldState} */
private SwirldStateManager swirldStateManager;
private final SwirldStateManager swirldStateManager;
/** Checks the validity of transactions and submits valid ones to the event transaction pool */
private final SwirldTransactionSubmitter transactionSubmitter;
/** clears all pipelines to prepare for a reconnect */
Expand Down Expand Up @@ -613,13 +611,24 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
logger.info(STARTUP.getMarker(), "startUpEventFrozenEndTime: {}", () -> startUpEventFrozenEndTime);
}

// initializes EventStreamManager instance
final Address address = getSelfAddress();
final String eventStreamManagerName;
if (address.getMemo() != null && !address.getMemo().isEmpty()) {
initEventStreamManager(address.getMemo());
eventStreamManagerName = address.getMemo();
} else {
initEventStreamManager(String.valueOf(selfId));
eventStreamManagerName = String.valueOf(selfId);
}
logger.info(STARTUP.getMarker(), "initialize eventStreamManager");
eventStreamManager = new EventStreamManager<>(
threadManager,
getSelfId(),
this,
eventStreamManagerName,
settings.isEnableEventStreaming(),
settings.getEventsLogDir(),
settings.getEventsLogPeriod(),
settings.getEventStreamQueueCapacity(),
this::isLastEventBeforeRestart);

if (settings.getChatter().isChatterUsed()) {
criticalQuorum =
Expand All @@ -632,7 +641,57 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
try (loadedState.signedStateFromDisk) {
final SignedState signedStateFromDisk = loadedState.signedStateFromDisk.getNullable();

buildEventHandlers(signedStateFromDisk, loadedState.initialState, genesisStateBuilder);
// Queue thread that stores and handles signed states that need to be hashed and have signatures collected.
final QueueThread<ReservedSignedState> stateHashSignQueueThread = PlatformConstructor.stateHashSignQueue(
threadManager, selfId.getId(), stateManagementComponent::newSignedStateFromTransactions);
stateHashSignQueueThread.start();

final State stateToLoad;
if (signedStateFromDisk != null) {
logger.debug(STARTUP.getMarker(), () -> new SavedStateLoadedPayload(
signedStateFromDisk.getRound(),
signedStateFromDisk.getConsensusTimestamp(),
startUpEventFrozenManager.getStartUpEventFrozenEndTime())
.toString());

stateToLoad = loadedState.initialState;
consensusRoundHandler.loadDataFromSignedState(signedStateFromDisk, false);
alittley marked this conversation as resolved.
Show resolved Hide resolved
} else {
stateToLoad = buildGenesisState(this, initialAddressBook, appVersion, genesisStateBuilder);

// if we are not starting from a saved state, don't freeze on startup
startUpEventFrozenManager.setStartUpEventFrozenEndTime(null);
}

swirldStateManager = PlatformConstructor.swirldStateManager(
platformContext,
initialAddressBook,
selfId,
preConsensusSystemTransactionManager,
postConsensusSystemTransactionManager,
metrics,
PlatformConstructor.settingsProvider(),
freezeManager::isFreezeStarted,
stateToLoad);
alittley marked this conversation as resolved.
Show resolved Hide resolved

// SwirldStateManager will get a copy of the state loaded, that copy will become stateCons.
// The original state will be saved in the SignedStateMgr and will be deleted when it becomes old

preConsensusEventHandler = components.add(PlatformConstructor.preConsensusEventHandler(
threadManager, selfId, swirldStateManager, consensusMetrics));
consensusRoundHandler = components.add(PlatformConstructor.consensusHandler(
platformContext,
threadManager,
selfId.getId(),
PlatformConstructor.settingsProvider(),
swirldStateManager,
new ConsensusHandlingMetrics(metrics, time),
eventStreamManager,
stateHashSignQueueThread,
preConsensusEventWriter::waitUntilDurable,
freezeManager::freezeStarted,
stateManagementComponent::roundAppliedToState,
appVersion));

final AddedEventMetrics addedEventMetrics = new AddedEventMetrics(this.selfId, metrics);
final PreconsensusEventStreamSequencer sequencer = new PreconsensusEventStreamSequencer();
Expand Down Expand Up @@ -1159,72 +1218,6 @@ private PreConsensusEventWriter buildPreConsensusEventWriter() {
return new AsyncPreConsensusEventWriter(platformContext, threadManager, syncWriter);
}

/**
* Build all the classes required for events and transactions to flow through the system
*/
private void buildEventHandlers(
@Nullable final SignedState signedStateFromDisk,
@Nullable final State initialState,
@NonNull final Supplier<SwirldState> genesisStateBuilder) {

// Queue thread that stores and handles signed states that need to be hashed and have signatures collected.
final QueueThread<ReservedSignedState> stateHashSignQueueThread = PlatformConstructor.stateHashSignQueue(
threadManager, selfId.getId(), stateManagementComponent::newSignedStateFromTransactions);
stateHashSignQueueThread.start();

if (signedStateFromDisk != null) {
logger.debug(STARTUP.getMarker(), () -> new SavedStateLoadedPayload(
signedStateFromDisk.getRound(),
signedStateFromDisk.getConsensusTimestamp(),
startUpEventFrozenManager.getStartUpEventFrozenEndTime())
.toString());

buildEventHandlersFromState(initialState, stateHashSignQueueThread);

consensusRoundHandler.loadDataFromSignedState(signedStateFromDisk, false);
} else {
final State state = buildGenesisState(this, initialAddressBook, appVersion, genesisStateBuilder);
buildEventHandlersFromState(state, stateHashSignQueueThread);

// if we are not starting from a saved state, don't freeze on startup
startUpEventFrozenManager.setStartUpEventFrozenEndTime(null);
}
}

private void buildEventHandlersFromState(
final State state, final QueueThread<ReservedSignedState> stateHashSignQueueThread) {

swirldStateManager = PlatformConstructor.swirldStateManager(
platformContext,
initialAddressBook,
selfId,
preConsensusSystemTransactionManager,
postConsensusSystemTransactionManager,
metrics,
PlatformConstructor.settingsProvider(),
freezeManager::isFreezeStarted,
state);

// SwirldStateManager will get a copy of the state loaded, that copy will become stateCons.
// The original state will be saved in the SignedStateMgr and will be deleted when it becomes old

preConsensusEventHandler = components.add(PlatformConstructor.preConsensusEventHandler(
threadManager, selfId, swirldStateManager, consensusMetrics));
consensusRoundHandler = components.add(PlatformConstructor.consensusHandler(
platformContext,
threadManager,
selfId.getId(),
PlatformConstructor.settingsProvider(),
swirldStateManager,
new ConsensusHandlingMetrics(metrics, time),
eventStreamManager,
stateHashSignQueueThread,
preConsensusEventWriter::waitUntilDurable,
freezeManager::freezeStarted,
stateManagementComponent::roundAppliedToState,
appVersion));
}

/**
* Start this platform.
*/
Expand Down Expand Up @@ -2110,33 +2103,6 @@ StartUpEventFrozenManager getStartUpEventFrozenManager() {
return startUpEventFrozenManager;
}

/**
* Initializes EventStreamManager instance, which will start threads for calculating RunningHash, and writing event
* stream files when event streaming is enabled
*
* @param name name of this node
*/
void initEventStreamManager(final String name) {
try {
logger.info(STARTUP.getMarker(), "initialize eventStreamManager");
eventStreamManager = new EventStreamManager<>(
threadManager,
getSelfId(),
this,
name,
settings.isEnableEventStreaming(),
settings.getEventsLogDir(),
settings.getEventsLogPeriod(),
settings.getEventStreamQueueCapacity(),
this::isLastEventBeforeRestart);
} catch (final NoSuchAlgorithmException | IOException e) {
logger.error(
EXCEPTION.getMarker(),
"Fail to initialize eventStreamHelper. Exception: {}",
ExceptionUtils.getStackTrace(e));
}
}

/**
* check whether the given event is the last event in its round, and the platform enters freeze period
*
Expand Down