Skip to content

Commit

Permalink
Flatten SwirldsPlatform.buildEventHandlers()
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@swirldslabs.com>
  • Loading branch information
cody-littley committed May 9, 2023
1 parent ea2f117 commit c2114cb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 42 deletions.
Expand Up @@ -28,9 +28,9 @@
import com.swirlds.common.system.NodeId;
import com.swirlds.common.threading.manager.ThreadManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -92,10 +92,6 @@ public class EventStreamManager<T extends StreamAligned & Timestamped & RunningH
* a predicate which checks whether this event is the last event before restart
* @param threadManager
* responsible for managing thread lifecycles
* @throws NoSuchAlgorithmException
* is thrown when fails to get required MessageDigest instance
* @throws IOException
* is thrown when fails to create directory for event streaming
*/
public EventStreamManager(
final ThreadManager threadManager,
Expand All @@ -106,13 +102,16 @@ public EventStreamManager(
final String eventsLogDir,
final long eventsLogPeriod,
final int eventStreamQueueCapacity,
final Predicate<T> isLastEventInFreezeCheck)
throws NoSuchAlgorithmException, IOException {
final Predicate<T> isLastEventInFreezeCheck) {

if (enableEventStreaming) {
// the directory to which event stream files are written
final String eventStreamDir = eventsLogDir + "/events_" + nodeName;
Files.createDirectories(Paths.get(eventStreamDir));
try {
Files.createDirectories(Paths.get(eventStreamDir));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}

streamFileWriter = new TimestampStreamFileWriter<>(
eventStreamDir,
Expand Down
Expand Up @@ -219,7 +219,6 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -236,7 +235,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -256,7 +254,7 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
*/
private final NodeId selfId;
/** tell which pairs of members should establish connections */
final NetworkTopology topology;
private final NetworkTopology topology;
/**
* This object is responsible for rate limiting reconnect attempts (in the role of sender)
*/
Expand Down Expand Up @@ -323,7 +321,7 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
/** a long name including (app, swirld, member id, member self name) */
private final String platformName;
/** is used for calculating runningHash of all consensus events and writing consensus events to file */
private EventStreamManager<EventImpl> eventStreamManager;
private final EventStreamManager<EventImpl> eventStreamManager;
/**
* True if this node started from genesis.
*/
Expand Down Expand Up @@ -613,13 +611,24 @@ public class SwirldsPlatform implements Platform, PlatformWithDeprecatedMethods,
logger.info(STARTUP.getMarker(), "startUpEventFrozenEndTime: {}", () -> startUpEventFrozenEndTime);
}

// initializes EventStreamManager instance
final Address address = getSelfAddress();
final String eventStreamManagerName;
if (address.getMemo() != null && !address.getMemo().isEmpty()) {
initEventStreamManager(address.getMemo());
eventStreamManagerName = address.getMemo();
} else {
initEventStreamManager(String.valueOf(selfId));
eventStreamManagerName = String.valueOf(selfId);
}
logger.info(STARTUP.getMarker(), "initialize eventStreamManager");
eventStreamManager = new EventStreamManager<>(
threadManager,
getSelfId(),
this,
eventStreamManagerName,
settings.isEnableEventStreaming(),
settings.getEventsLogDir(),
settings.getEventsLogPeriod(),
settings.getEventStreamQueueCapacity(),
this::isLastEventBeforeRestart);

if (settings.getChatter().isChatterUsed()) {
criticalQuorum =
Expand Down Expand Up @@ -2094,33 +2103,6 @@ StartUpEventFrozenManager getStartUpEventFrozenManager() {
return startUpEventFrozenManager;
}

/**
* Initializes EventStreamManager instance, which will start threads for calculating RunningHash, and writing event
* stream files when event streaming is enabled
*
* @param name name of this node
*/
void initEventStreamManager(final String name) {
try {
logger.info(STARTUP.getMarker(), "initialize eventStreamManager");
eventStreamManager = new EventStreamManager<>(
threadManager,
getSelfId(),
this,
name,
settings.isEnableEventStreaming(),
settings.getEventsLogDir(),
settings.getEventsLogPeriod(),
settings.getEventStreamQueueCapacity(),
this::isLastEventBeforeRestart);
} catch (final NoSuchAlgorithmException | IOException e) {
logger.error(
EXCEPTION.getMarker(),
"Fail to initialize eventStreamHelper. Exception: {}",
ExceptionUtils.getStackTrace(e));
}
}

/**
* check whether the given event is the last event in its round, and the platform enters freeze period
*
Expand Down

0 comments on commit c2114cb

Please sign in to comment.