diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/GenesisPlatformStateCommand.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/GenesisPlatformStateCommand.java index 69d4721948b6..28567778d6de 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/GenesisPlatformStateCommand.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/GenesisPlatformStateCommand.java @@ -95,7 +95,7 @@ public Integer call() throws IOException, ExecutionException, InterruptedExcepti .digestTreeAsync(reservedSignedState.get().getState()) .get(); System.out.printf("Writing modified state to %s %n", outputDir.toAbsolutePath()); - writeSignedStateFilesToDirectory(NO_NODE_ID, outputDir, reservedSignedState.get(), configuration); + writeSignedStateFilesToDirectory(platformContext, NO_NODE_ID, outputDir, reservedSignedState.get()); } return 0; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventFileManager.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventFileManager.java index 0488130e4e52..160ebcefab1c 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventFileManager.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventFileManager.java @@ -162,7 +162,7 @@ public PreconsensusEventFileManager( * @return the directory where event files are stored */ @NonNull - private static Path getDatabaseDirectory( + public static Path getDatabaseDirectory( @NonNull final PlatformContext platformContext, @NonNull final NodeId selfId) throws IOException { final StateConfig stateConfig = platformContext.getConfiguration().getConfigData(StateConfig.class); diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventStreamConfig.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventStreamConfig.java index cd4e1346fc51..a0600cc1bd43 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventStreamConfig.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventStreamConfig.java @@ -77,7 +77,8 @@ * Both. Use this with caution. * @param databaseDirectory the directory where preconsensus events will be stored, * relative to - * {@link com.swirlds.common.config.StateConfig#savedStateDirectory()}. + * {@link + * com.swirlds.common.config.StateConfig#savedStateDirectory()}. * @param enableStorage if true, then stream preconsensus events to files on disk. If * this is disabled then a network wide crash (perhaps due to a * bug) can cause transactions that previously reached consensus @@ -90,6 +91,10 @@ * @param replayQueueSize the size of the queue used for holding preconsensus events * that are waiting to be replayed * @param replayHashPoolSize the number of threads used for hashing events during replay + * @param copyRecentStreamToStateSnapshots if true, then copy recent PCES files into the saved state + * snapshot directories every time we take a state snapshot. The + * files copied are guaranteed to contain all non-ancient events + * w.r.t. the state snapshot. */ @ConfigData("event.preconsensus") public record PreconsensusEventStreamConfig( @@ -106,4 +111,5 @@ public record PreconsensusEventStreamConfig( @ConfigProperty(defaultValue = "true") boolean enableStorage, @ConfigProperty(defaultValue = "true") boolean enableReplay, @ConfigProperty(defaultValue = "1024") int replayQueueSize, - @ConfigProperty(defaultValue = "8") int replayHashPoolSize) {} + @ConfigProperty(defaultValue = "8") int replayHashPoolSize, + @ConfigProperty(defaultValue = "true") boolean copyRecentStreamToStateSnapshots) {} diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/recovery/EventRecoveryWorkflow.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/recovery/EventRecoveryWorkflow.java index c87990429f13..3f716b855bf5 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/recovery/EventRecoveryWorkflow.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/recovery/EventRecoveryWorkflow.java @@ -168,7 +168,10 @@ public static void recoverState( resultingStateDirectory); SignedStateFileWriter.writeSignedStateFilesToDirectory( - selfId, resultingStateDirectory, recoveredState.state().get(), platformContext.getConfiguration()); + platformContext, + selfId, + resultingStateDirectory, + recoveredState.state().get()); final StateConfig stateConfig = platformContext.getConfiguration().getConfigData(StateConfig.class); updateEmergencyRecoveryFile( stateConfig, resultingStateDirectory, initialState.get().getConsensusTimestamp()); diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/editor/StateEditorSave.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/editor/StateEditorSave.java index a9cd6bcebc4f..e4eb4cbae559 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/editor/StateEditorSave.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/editor/StateEditorSave.java @@ -22,7 +22,11 @@ import static com.swirlds.platform.state.signed.SignedStateFileWriter.writeSignedStateFilesToDirectory; import com.swirlds.cli.utility.SubcommandOf; +import com.swirlds.common.context.DefaultPlatformContext; +import com.swirlds.common.context.PlatformContext; +import com.swirlds.common.crypto.CryptographyHolder; import com.swirlds.common.merkle.crypto.MerkleCryptoFactory; +import com.swirlds.common.metrics.noop.NoOpMetrics; import com.swirlds.config.api.Configuration; import com.swirlds.logging.legacy.LogMarker; import com.swirlds.platform.config.DefaultConfiguration; @@ -69,8 +73,12 @@ public void run() { } final Configuration configuration = DefaultConfiguration.buildBasicConfiguration(); + + final PlatformContext platformContext = + new DefaultPlatformContext(configuration, new NoOpMetrics(), CryptographyHolder.get()); + try (final ReservedSignedState signedState = getStateEditor().getSignedStateCopy()) { - writeSignedStateFilesToDirectory(NO_NODE_ID, directory, signedState.get(), configuration); + writeSignedStateFilesToDirectory(platformContext, NO_NODE_ID, directory, signedState.get()); } } catch (final IOException e) { diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileManager.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileManager.java index 4dc7b6509754..00af87b0bdfd 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileManager.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileManager.java @@ -103,6 +103,7 @@ public class SignedStateFileManager implements Startable { private final SignedStateMetrics metrics; private final Configuration configuration; + private final PlatformContext platformContext; /** * Provides system time @@ -162,6 +163,7 @@ public SignedStateFileManager( this.mainClassName = mainClassName; this.swirldName = swirldName; this.stateToDiskAttemptConsumer = stateToDiskAttemptConsumer; + this.platformContext = Objects.requireNonNull(context); this.configuration = Objects.requireNonNull(context).getConfiguration(); this.minimumGenerationNonAncientConsumer = Objects.requireNonNull( minimumGenerationNonAncientConsumer, "minimumGenerationNonAncientConsumer must not be null"); @@ -288,7 +290,7 @@ private void saveStateTask( if (outOfBand) { // states requested to be written out-of-band are always written to disk SignedStateFileWriter.writeSignedStateToDisk( - selfId, directory, reservedSignedState.get(), reason, configuration); + platformContext, selfId, directory, reservedSignedState.get(), reason); success = true; } else { @@ -303,7 +305,7 @@ private void saveStateTask( } SignedStateFileWriter.writeSignedStateToDisk( - selfId, directory, reservedSignedState.get(), reason, configuration); + platformContext, selfId, directory, reservedSignedState.get(), reason); stateWrittenToDiskInBand(reservedSignedState.get(), directory, start); success = true; @@ -408,8 +410,8 @@ public boolean saveSignedStateToDisk(@NonNull final SignedState signedState, fin * Dump a state to disk out-of-band. *

* Writing a state "out-of-band" means the state is being written for the sake of a human, whether for debug - * purposes, or because of a fault. States written out-of-band will not be read automatically by the platform, - * and will not be used as an initial state at boot time. + * purposes, or because of a fault. States written out-of-band will not be read automatically by the platform, and + * will not be used as an initial state at boot time. *

* A dumped state will be saved in a subdirectory of the signed states base directory, with the subdirectory being * named after the reason the state is being written out-of-band. diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java index d29ea314a10b..34057ed51a76 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java @@ -29,12 +29,15 @@ import com.swirlds.common.config.StateConfig; import com.swirlds.common.config.singleton.ConfigurationHolder; +import com.swirlds.common.context.PlatformContext; import com.swirlds.common.io.streams.MerkleDataOutputStream; import com.swirlds.common.merkle.utility.MerkleTreeVisualizer; import com.swirlds.common.system.NodeId; import com.swirlds.common.system.address.AddressBook; -import com.swirlds.config.api.Configuration; import com.swirlds.logging.legacy.payload.StateSavedToDiskPayload; +import com.swirlds.platform.event.preconsensus.PreconsensusEventFile; +import com.swirlds.platform.event.preconsensus.PreconsensusEventFileManager; +import com.swirlds.platform.event.preconsensus.PreconsensusEventStreamConfig; import com.swirlds.platform.recovery.emergencyfile.EmergencyRecoveryFile; import com.swirlds.platform.state.State; import edu.umd.cs.findbugs.annotations.NonNull; @@ -42,9 +45,14 @@ import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Objects; +import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -135,27 +143,257 @@ public static void writeStateFile(final Path directory, final SignedState signed /** * Write all files that belong in the signed state directory into a directory. * - * @param selfId the id of the platform - * @param directory the directory where all files should be placed - * @param signedState the signed state being written to disk - * @param configuration the configuration used + * @param platformContext the platform context + * @param selfId the id of the platform + * @param directory the directory where all files should be placed + * @param signedState the signed state being written to disk */ public static void writeSignedStateFilesToDirectory( - @Nullable NodeId selfId, + @Nullable final PlatformContext platformContext, + @Nullable final NodeId selfId, @NonNull final Path directory, - @NonNull final SignedState signedState, - @NonNull final Configuration configuration) + @NonNull final SignedState signedState) throws IOException { - Objects.requireNonNull(directory, "directory must not be null"); - Objects.requireNonNull(signedState, "signedState must not be null"); - Objects.requireNonNull(configuration, "configuration must not be null"); + Objects.requireNonNull(platformContext); + Objects.requireNonNull(directory); + Objects.requireNonNull(signedState); writeStateFile(directory, signedState); writeHashInfoFile(directory, signedState.getState()); writeMetadataFile(selfId, directory, signedState); writeEmergencyRecoveryFile(directory, signedState); writeStateAddressBookFile(directory, signedState.getAddressBook()); - writeSettingsUsed(directory, configuration); + writeSettingsUsed(directory, platformContext.getConfiguration()); + + if (selfId != null) { + copyPreconsensusEventStreamFiles( + platformContext, + selfId, + directory, + signedState.getState().getPlatformState().getPlatformData().getMinimumGenerationNonAncient()); + } + } + + /** + * Copy preconsensus event files into the signed state directory. These files are necessary for the platform to use + * the state file as a starting point. Note: starting a node using the PCES files in the state directory does not + * guarantee that there is no data loss (i.e. there may be transactions that reach consensus after the state + * snapshot), but it does allow a node to start up and participate in gossip. + * + *

+ * This general strategy is not very elegant is very much a hack. But it will allow us to do migration testing using + * real production states and streams, in the short term. In the longer term we should consider alternate and + * cleaner strategies. + * + * @param platformContext the platform context + * @param destinationDirectory the directory where the state is being written + * @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the + * state that is being written + */ + private static void copyPreconsensusEventStreamFiles( + @NonNull final PlatformContext platformContext, + @NonNull final NodeId selfId, + @NonNull final Path destinationDirectory, + final long minimumGenerationNonAncient) + throws IOException { + + final boolean copyPreconsensusStream = platformContext + .getConfiguration() + .getConfigData(PreconsensusEventStreamConfig.class) + .copyRecentStreamToStateSnapshots(); + if (!copyPreconsensusStream) { + // PCES copying is disabled + return; + } + + // The PCES files will be copied into this directory + final Path pcesDestination = + destinationDirectory.resolve("preconsensus-events").resolve(Long.toString(selfId.id())); + Files.createDirectories(pcesDestination); + + final List allFiles = gatherPreconsensusFilesOnDisk(selfId, platformContext); + if (allFiles.isEmpty()) { + return; + } + + // Sort by sequence number + Collections.sort(allFiles); + + // Discard all files that either have an incorrect origin or that do not contain non-ancient events. + final List filesToCopy = + getRequiredPreconsensusFiles(allFiles, minimumGenerationNonAncient); + if (filesToCopy.isEmpty()) { + return; + } + + copyPreconsensusFileList(filesToCopy, pcesDestination); + } + + /** + * Get the preconsensus files that we need to copy to a state. We need any file that has a matching origin and that + * contains non-ancient events (w.r.t. the state). + * + * @param allFiles all PCES files on disk + * @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the + * state that is being written + * @return the list of files to copy + */ + @NonNull + private static List getRequiredPreconsensusFiles( + @NonNull final List allFiles, final long minimumGenerationNonAncient) { + + final List filesToCopy = new ArrayList<>(); + final PreconsensusEventFile lastFile = allFiles.get(allFiles.size() - 1); + for (final PreconsensusEventFile file : allFiles) { + if (file.getOrigin() == lastFile.getOrigin() + && file.getMaximumGeneration() >= minimumGenerationNonAncient) { + filesToCopy.add(file); + } + } + + if (filesToCopy.isEmpty()) { + logger.warn( + STATE_TO_DISK.getMarker(), + "No preconsensus event files meeting specified criteria found to copy. " + + "Minimum generation non-ancient: {}", + minimumGenerationNonAncient); + } else if (filesToCopy.size() == 1) { + logger.info( + STATE_TO_DISK.getMarker(), + """ + Found 1 preconsensus event file meeting specified criteria to copy. + Minimum generation non-ancient: {} + File: {} + """, + minimumGenerationNonAncient, + filesToCopy.get(0).getPath()); + } else { + logger.info( + STATE_TO_DISK.getMarker(), + """ + Found {} preconsensus event files meeting specified criteria to copy. + Minimum generation non-ancient: {} + First file to copy: {} + Last file to copy: {} + """, + filesToCopy.size(), + minimumGenerationNonAncient, + filesToCopy.get(0).getPath(), + filesToCopy.get(filesToCopy.size() - 1).getPath()); + } + + return filesToCopy; + } + + /** + * Gather all PCES files on disk. + * + * @param selfId the id of this node + * @param platformContext the platform context + * @return a list of all PCES files on disk + */ + @NonNull + private static List gatherPreconsensusFilesOnDisk( + @NonNull final NodeId selfId, @NonNull final PlatformContext platformContext) throws IOException { + final List allFiles = new ArrayList<>(); + final Path preconsensusEventStreamDirectory = + PreconsensusEventFileManager.getDatabaseDirectory(platformContext, selfId); + try (final Stream stream = Files.walk(preconsensusEventStreamDirectory)) { + stream.filter(Files::isRegularFile).forEach(path -> { + try { + allFiles.add(PreconsensusEventFile.of(path)); + } catch (final IOException e) { + // Ignore, this will get thrown for each file that is not a PCES file + } + }); + } + + if (allFiles.isEmpty()) { + logger.warn(STATE_TO_DISK.getMarker(), "No preconsensus event files found to copy"); + } else if (allFiles.size() == 1) { + logger.info( + STATE_TO_DISK.getMarker(), + """ + Found 1 preconsensus file on disk. + File: {}""", + allFiles.get(0).getPath()); + } else { + logger.info( + STATE_TO_DISK.getMarker(), + """ + Found {} preconsensus files on disk. + First file: {} + Last file: {}""", + allFiles.size(), + allFiles.get(0).getPath(), + allFiles.get(allFiles.size() - 1).getPath()); + } + + return allFiles; + } + + /** + * Copy a list of preconsensus event files into a directory. + * + * @param filesToCopy the files to copy + * @param pcesDestination the directory where the files should be copied + */ + private static void copyPreconsensusFileList( + @NonNull final List filesToCopy, @NonNull final Path pcesDestination) { + logger.info( + STATE_TO_DISK.getMarker(), + "Copying {} preconsensus event files to state snapshot directory", + filesToCopy.size()); + + // Although the last file may be currently in the process of being written, all previous files will + // be closed and immutable and so it's safe to hard link them. + for (int index = 0; index < filesToCopy.size() - 1; index++) { + hardLinkPreconsensusFile(filesToCopy.get(index), pcesDestination); + } + + // The last file might be in the process of being written, so we need to do a deep copy of it. + deepCopyPreconsensusFile(filesToCopy.get(filesToCopy.size() - 1), pcesDestination); + + logger.info( + STATE_TO_DISK.getMarker(), + "Finished copying {} preconsensus event files to state snapshot directory", + filesToCopy.size()); + } + + /** + * Hard link a PCES file. + * + * @param file the file to link + * @param pcesDestination the directory where the file should be linked into + */ + private static void hardLinkPreconsensusFile( + @NonNull final PreconsensusEventFile file, @NonNull final Path pcesDestination) { + final Path destination = pcesDestination.resolve(file.getFileName()); + try { + Files.createLink(destination, file.getPath()); + } catch (final IOException e) { + logger.error( + EXCEPTION.getMarker(), + "Exception when hard linking preconsensus event file {} to {}", + file.getPath(), + destination, + e); + } + } + + /** + * Deep copy a PCES file. + * + * @param file the file to copy + * @param pcesDestination the directory where the file should be copied into + */ + private static void deepCopyPreconsensusFile( + @NonNull final PreconsensusEventFile file, @NonNull final Path pcesDestination) { + try { + Files.copy(file.getPath(), pcesDestination.resolve(file.getFileName())); + } catch (final IOException e) { + logger.error(EXCEPTION.getMarker(), "Exception when copying preconsensus event file", e); + } } /** @@ -177,23 +415,23 @@ private static void writeStateAddressBookFile(@NonNull final Path directory, @No * Writes a SignedState to a file. Also writes auxiliary files such as "settingsUsed.txt". This is the top level * method called by the platform when it is ready to write a state. * + * @param platformContext the platform context * @param selfId the id of the platform * @param savedStateDirectory the directory where the state will be stored * @param signedState the object to be written * @param stateToDiskReason the reason the state is being written to disk - * @param configuration the configuration for the platform */ public static void writeSignedStateToDisk( + @NonNull final PlatformContext platformContext, @Nullable final NodeId selfId, @NonNull final Path savedStateDirectory, @NonNull final SignedState signedState, - @Nullable final StateToDiskReason stateToDiskReason, - @NonNull final Configuration configuration) + @Nullable final StateToDiskReason stateToDiskReason) throws IOException { + Objects.requireNonNull(platformContext); Objects.requireNonNull(savedStateDirectory); Objects.requireNonNull(signedState); - Objects.requireNonNull(configuration); try { logger.info( @@ -205,7 +443,7 @@ public static void writeSignedStateToDisk( executeAndRename( savedStateDirectory, - directory -> writeSignedStateFilesToDirectory(selfId, directory, signedState, configuration)); + directory -> writeSignedStateFilesToDirectory(platformContext, selfId, directory, signedState)); logger.info(STATE_TO_DISK.getMarker(), () -> new StateSavedToDiskPayload( signedState.getRound(), diff --git a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/SignedStateFileReadWriteTest.java b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/SignedStateFileReadWriteTest.java index 4f0c98ee9ce5..badbef034dd5 100644 --- a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/SignedStateFileReadWriteTest.java +++ b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/SignedStateFileReadWriteTest.java @@ -40,6 +40,7 @@ import com.swirlds.common.config.StateConfig; import com.swirlds.common.constructable.ConstructableRegistry; import com.swirlds.common.constructable.ConstructableRegistryException; +import com.swirlds.common.context.PlatformContext; import com.swirlds.common.io.utility.TemporaryFileBuilder; import com.swirlds.common.merkle.crypto.MerkleCryptoFactory; import com.swirlds.common.merkle.utility.MerkleTreeVisualizer; @@ -148,8 +149,13 @@ void writeSavedStateToDiskTest() throws IOException { throwIfFileExists(stateFile, hashInfoFile, settingsUsedFile, directory); final Configuration configuration = changeConfigAndConfigHolder("data/saved"); + + final PlatformContext platformContext = TestPlatformContextBuilder.create() + .withConfiguration(configuration) + .build(); + writeSignedStateToDisk( - new NodeId(0), directory, signedState, StateToDiskReason.PERIODIC_SNAPSHOT, configuration); + platformContext, new NodeId(0), directory, signedState, StateToDiskReason.PERIODIC_SNAPSHOT); assertTrue(exists(stateFile), "state file should exist"); assertTrue(exists(hashInfoFile), "hash info file should exist"); diff --git a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/signed/StartupStateUtilsTests.java b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/signed/StartupStateUtilsTests.java index d9de90e86ef4..51ed8452aa3f 100644 --- a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/signed/StartupStateUtilsTests.java +++ b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/signed/StartupStateUtilsTests.java @@ -136,11 +136,7 @@ private SignedState writeState( final Path savedStateDirectory = getSignedStateDirectory(mainClassName, selfId, swirldName, round); writeSignedStateToDisk( - selfId, - savedStateDirectory, - signedState, - StateToDiskReason.PERIODIC_SNAPSHOT, - platformContext.getConfiguration()); + platformContext, selfId, savedStateDirectory, signedState, StateToDiskReason.PERIODIC_SNAPSHOT); if (corrupted) { final Path stateFilePath = savedStateDirectory.resolve("SignedState.swh");