Skip to content

Commit

Permalink
Write preconsensus event stream (#5633)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@swirldslabs.com>
  • Loading branch information
cody-littley authored Mar 30, 2023
1 parent d4fd292 commit 6583dc1
Show file tree
Hide file tree
Showing 49 changed files with 926 additions and 440 deletions.
2 changes: 1 addition & 1 deletion platform-sdk/docs/threads/eventflow-threads.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion platform-sdk/docs/threads/eventflow-threads.html

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions platform-sdk/sdk/settings.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
# All other values are true.
#######################################################################################

maxOutgoingSyncs, 1
state.saveStatePeriod, 0
showInternalStats, 1
doUpnp, false
useLoopbackIp, false
csvFileName, PlatformTesting
checkSignedStateFromDisk, 1
loadKeysFromPfxFiles, 0
maxOutgoingSyncs, 1
state.saveStatePeriod, 0
showInternalStats, true
doUpnp, false
useLoopbackIp, false
csvFileName, PlatformTesting
checkSignedStateFromDisk, true
loadKeysFromPfxFiles, false
event.preconsensus.enableStorage, false
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.base.functions;

/**
* Similar to {@link java.util.function.Consumer} but throws an exception.
*
* @param <T> the type accepted by the consumer
* @param <E> the type thrown by the consumer
*/
@FunctionalInterface
public interface ThrowingConsumer<T, E extends Exception> {

/**
* Accept the value.
*
* @param t the value to accept
* @throws E the exception type thrown by the consumer
*/
void accept(T t) throws E;
}
1 change: 1 addition & 0 deletions platform-sdk/swirlds-base/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
module com.swirlds.base {
exports com.swirlds.base;
exports com.swirlds.base.functions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import java.nio.file.Path;

/**
* Config that control the SignedStateManager and SignedStateFileManager behaviors.
Expand Down Expand Up @@ -94,7 +95,7 @@
*/
@ConfigData("state")
public record StateConfig(
@ConfigProperty(defaultValue = "data/saved") String savedStateDirectory,
@ConfigProperty(defaultValue = "data/saved") Path savedStateDirectory,
@ConfigProperty(defaultValue = "") String mainClassNameOverride,
@ConfigProperty(defaultValue = "false") boolean cleanSavedStateDirectory,
@ConfigProperty(defaultValue = "20") int stateSavingQueueSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.swirlds.common.config.StateConfig;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import java.nio.file.Path;

/**
* Settings for temporary files
Expand All @@ -39,6 +38,6 @@ public record TemporaryFileConfig(@ConfigProperty(defaultValue = "swirlds-tmp")
* @return the location where temporary files are stored
*/
public String getTemporaryFilePath(final StateConfig stateConfig) {
return Path.of(stateConfig.savedStateDirectory(), temporaryFilePath()).toString();
return stateConfig.savedStateDirectory().resolve(temporaryFilePath()).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package com.swirlds.common.threading.interrupt;

import static com.swirlds.base.ArgumentUtils.throwArgNull;
import static com.swirlds.logging.LogMarker.EXCEPTION;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.swirlds.base.functions.ThrowingConsumer;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Duration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -29,10 +33,9 @@
* </p>
*
* <p>
* WITH GREAT POWER COMES GREAT RESPONSIBILITY. It's really easy to shoot yourself in the
* foot with these methods. Be EXTRA confident that you understand the big picture on
* any thread where you use one of these methods. Incorrectly handing an interrupt
* can cause a lot of headache.
* WITH GREAT POWER COMES GREAT RESPONSIBILITY. It's really easy to shoot yourself in the foot with these methods. Be
* EXTRA confident that you understand the big picture on any thread where you use one of these methods. Incorrectly
* handing an interrupt can cause a lot of headache.
* </p>
*/
public final class Uninterruptable {
Expand All @@ -43,21 +46,21 @@ private Uninterruptable() {}

/**
* <p>
* Perform an action. If that action is interrupted, re-attempt that action. If interrupted again
* then re-attempt again, until the action is eventually successful. Unless this thread is being
* interrupted many times, the action is most likely to be run 1 or 2 times.
* Perform an action. If that action is interrupted, re-attempt that action. If interrupted again then re-attempt
* again, until the action is eventually successful. Unless this thread is being interrupted many times, the action
* is most likely to be run 1 or 2 times.
* </p>
*
* <p>
* This method is useful when operating in a context where it is inconvenient to throw an
* {@link InterruptedException}, or when performing an action using an interruptable interface
* but where the required operation is needed to always succeed regardless of interrupts.
* {@link InterruptedException}, or when performing an action using an interruptable interface but where the
* required operation is needed to always succeed regardless of interrupts.
* </p>
*
* @param action
* the action to perform, may be called multiple times if interrupted
* @param action the action to perform, may be called multiple times if interrupted
*/
public static void retryIfInterrupted(final InterruptableRunnable action) {
public static void retryIfInterrupted(@NonNull final InterruptableRunnable action) {
throwArgNull(action, "action");
boolean finished = false;
boolean interrupted = false;
while (!finished) {
Expand All @@ -76,21 +79,21 @@ public static void retryIfInterrupted(final InterruptableRunnable action) {

/**
* <p>
* Perform an action that returns a value. If that action is interrupted, re-attempt that action.
* If interrupted again then re-attempt again, until the action is eventually successful.
* Unless this thread is being interrupted many times, the action is most likely to be run 1 or 2 times.
* Perform an action that returns a value. If that action is interrupted, re-attempt that action. If interrupted
* again then re-attempt again, until the action is eventually successful. Unless this thread is being interrupted
* many times, the action is most likely to be run 1 or 2 times.
* </p>
*
* <p>
* This method is useful when operating in a context where it is inconvenient to throw an
* {@link InterruptedException}, or when performing an action using an interruptable interface
* but where the required operation is needed to always succeed regardless of interrupts.
* {@link InterruptedException}, or when performing an action using an interruptable interface but where the
* required operation is needed to always succeed regardless of interrupts.
* </p>
*
* @param action
* the action to perform, may be called multiple times if interrupted
* @param action the action to perform, may be called multiple times if interrupted
*/
public static <T> T retryIfInterrupted(final InterruptableSupplier<T> action) {
public static @Nullable <T> T retryIfInterrupted(@NonNull final InterruptableSupplier<T> action) {
throwArgNull(action, "action");
boolean finished = false;
boolean interrupted = false;
T value = null;
Expand All @@ -111,13 +114,13 @@ public static <T> T retryIfInterrupted(final InterruptableSupplier<T> action) {
}

/**
* Perform an action. If the thread is interrupted, the action will be aborted and the thread's interrupt
* flag will be reset.
* Perform an action. If the thread is interrupted, the action will be aborted and the thread's interrupt flag will
* be reset.
*
* @param action
* the action to perform
* @param action the action to perform
*/
public static void abortIfInterrupted(final InterruptableRunnable action) {
public static void abortIfInterrupted(@NonNull InterruptableRunnable action) {
throwArgNull(action, "action");
try {
action.run();
} catch (final InterruptedException e) {
Expand All @@ -127,20 +130,20 @@ public static void abortIfInterrupted(final InterruptableRunnable action) {

/**
* <p>
* Perform an action. If the thread is interrupted, the action will be aborted and the thread's interrupt
* flag will be set. Also writes an error message to the log.
* Perform an action. If the thread is interrupted, the action will be aborted and the thread's interrupt flag will
* be set. Also writes an error message to the log.
* </p>
*
* <p>
* This method is useful for situations where interrupts are only expected if there has been an error condition.
* </p>
*
* @param action
* the action to perform
* @param errorMessage
* the error message to write to the log if this thread is inerrupted
* @param action the action to perform
* @param errorMessage the error message to write to the log if this thread is inerrupted
*/
public static void abortAndLogIfInterrupted(final InterruptableRunnable action, final String errorMessage) {
public static void abortAndLogIfInterrupted(
@NonNull final InterruptableRunnable action, @NonNull final String errorMessage) {
throwArgNull(action, "action");
try {
action.run();
} catch (final InterruptedException e) {
Expand All @@ -151,23 +154,52 @@ public static void abortAndLogIfInterrupted(final InterruptableRunnable action,

/**
* <p>
* Perform an action. If the thread is interrupted, the action will be aborted, the thread's interrupt
* flag will be set, and an exception will be thrown. Also writes an error message to the log.
* Pass an object to a consumer that may throw an {@link InterruptedException}. If the thread is interrupted, the
* action will be aborted and the thread's interrupt flag will be set. Also writes an error message to the log.
* </p>
*
* <p>
* This method is useful for situations where interrupts are only expected if there has been an error condition
* and if it is preferred to immediately crash the current thread.
* This method is useful for situations where interrupts are only expected if there has been an error condition.
* </p>
*
* @param consumer an object that consumes something and may throw an {@link InterruptedException}
* @param object the object to pass to the consumer
* @param errorMessage the error message to write to the log if this thread is inerrupted
*/
public static <T> void abortAndLogIfInterrupted(
@NonNull final ThrowingConsumer<T, InterruptedException> consumer,
@Nullable final T object,
@NonNull final String errorMessage) {

throwArgNull(consumer, "consumer");
throwArgNull(errorMessage, "errorMessage");

try {
consumer.accept(object);
} catch (final InterruptedException e) {
logger.error(EXCEPTION.getMarker(), errorMessage);
Thread.currentThread().interrupt();
}
}

/**
* <p>
* Perform an action. If the thread is interrupted, the action will be aborted, the thread's interrupt flag will be
* set, and an exception will be thrown. Also writes an error message to the log.
* </p>
*
* <p>
* This method is useful for situations where interrupts are only expected if there has been an error condition and
* if it is preferred to immediately crash the current thread.
* </p>
*
* @param action
* the action to perform
* @param errorMessage
* the error message to write to the log if this thread is interrupted
* @throws IllegalStateException
* if interrupted
* @param action the action to perform
* @param errorMessage the error message to write to the log if this thread is interrupted
* @throws IllegalStateException if interrupted
*/
public static void abortAndThrowIfInterrupted(final InterruptableRunnable action, final String errorMessage) {
public static void abortAndThrowIfInterrupted(
@NonNull InterruptableRunnable action, @NonNull final String errorMessage) {
throwArgNull(action, "action");
try {
action.run();
} catch (final InterruptedException e) {
Expand All @@ -180,10 +212,9 @@ public static void abortAndThrowIfInterrupted(final InterruptableRunnable action
/**
* Attempt to sleep for a period of time. If interrupted, the sleep may finish early.
*
* @param duration
* the amount of time to sleep
* @param duration the amount of time to sleep
*/
public static void tryToSleep(final Duration duration) {
public static void tryToSleep(@NonNull Duration duration) {
abortIfInterrupted(() -> MILLISECONDS.sleep(duration.toMillis()));
}
}
1 change: 1 addition & 0 deletions platform-sdk/swirlds-platform-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
api(project(":swirlds-fcqueue"))
api(project(":swirlds-jasperdb"))
api(project(":swirlds-cli"))
api(project(":swirlds-base"))
compileOnly(libs.spotbugs.annotations)
runtimeOnly(project(":swirlds-config-impl"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import com.swirlds.platform.config.legacy.LegacyConfigPropertiesLoader;
import com.swirlds.platform.crypto.CryptoConstants;
import com.swirlds.platform.dispatch.DispatchConfiguration;
import com.swirlds.platform.event.preconsensus.PreConsensusEventStreamConfig;
import com.swirlds.platform.gui.internal.InfoApp;
import com.swirlds.platform.gui.internal.InfoMember;
import com.swirlds.platform.gui.internal.InfoSwirld;
Expand Down Expand Up @@ -209,7 +210,8 @@ private Browser(final Set<Integer> localNodesToStart) throws IOException {
.withConfigDataType(MetricsConfig.class)
.withConfigDataType(PrometheusConfig.class)
.withConfigDataType(OSHealthCheckConfig.class)
.withConfigDataType(WiringConfig.class);
.withConfigDataType(WiringConfig.class)
.withConfigDataType(PreConsensusEventStreamConfig.class);

// Assume all locally run instances provide the same configuration definitions to the configuration builder.
if (appMains.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.swirlds.platform.SwirldsPlatform.PLATFORM_THREAD_POOL_NAME;

import com.swirlds.base.functions.ThrowingConsumer;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.crypto.config.CryptoConfig;
import com.swirlds.common.metrics.Metrics;
Expand Down Expand Up @@ -49,6 +50,7 @@
import com.swirlds.platform.state.SwirldStateManagerImpl;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.system.PlatformConstructionException;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.KeyManagementException;
Expand Down Expand Up @@ -216,6 +218,7 @@ static PreConsensusEventHandler preConsensusEventHandler(
* the instance that streams consensus events to disk
* @param stateHashSignQueue
* the queue for signed states that need signatures collected
* @param waitForEventDurability a method that blocks until an event becomes durable.
* @param enterFreezePeriod
* a runnable executed when a freeze is entered
* @param roundAppliedToStateConsumer
Expand All @@ -225,17 +228,18 @@ static PreConsensusEventHandler preConsensusEventHandler(
* @return the newly constructed instance of {@link ConsensusRoundHandler}
*/
static ConsensusRoundHandler consensusHandler(
final PlatformContext platformContext,
final ThreadManager threadManager,
@NonNull final PlatformContext platformContext,
@NonNull final ThreadManager threadManager,
final long selfId,
final SettingsProvider settingsProvider,
final SwirldStateManager swirldStateManager,
final ConsensusHandlingMetrics consensusHandlingMetrics,
final EventStreamManager<EventImpl> eventStreamManager,
final BlockingQueue<SignedState> stateHashSignQueue,
final Runnable enterFreezePeriod,
final RoundAppliedToStateConsumer roundAppliedToStateConsumer,
final SoftwareVersion softwareVersion) {
@NonNull final SettingsProvider settingsProvider,
@NonNull final SwirldStateManager swirldStateManager,
@NonNull final ConsensusHandlingMetrics consensusHandlingMetrics,
@NonNull final EventStreamManager<EventImpl> eventStreamManager,
@NonNull final BlockingQueue<SignedState> stateHashSignQueue,
@NonNull final ThrowingConsumer<EventImpl, InterruptedException> waitForEventDurability,
@NonNull final Runnable enterFreezePeriod,
@NonNull final RoundAppliedToStateConsumer roundAppliedToStateConsumer,
@NonNull final SoftwareVersion softwareVersion) {

return new ConsensusRoundHandler(
platformContext,
Expand All @@ -246,6 +250,7 @@ static ConsensusRoundHandler consensusHandler(
consensusHandlingMetrics,
eventStreamManager,
stateHashSignQueue,
waitForEventDurability,
enterFreezePeriod,
roundAppliedToStateConsumer,
softwareVersion);
Expand Down
Loading

0 comments on commit 6583dc1

Please sign in to comment.