Skip to content

Commit

Permalink
signed state file manager component (#9933)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <lpetrovic05@gmail.com>
  • Loading branch information
lpetrovic05 committed Nov 21, 2023
1 parent fe5882f commit 7055bf6
Show file tree
Hide file tree
Showing 23 changed files with 762 additions and 1,172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,12 @@ public class StateWriteToDiskCompleteNotification extends AbstractNotification {

private final long roundNumber;
private final Instant consensusTimestamp;
private final SwirldState state;
private final Path folder;
private final boolean isFreezeState;

public StateWriteToDiskCompleteNotification(
final long roundNumber,
final Instant consensusTimestamp,
final SwirldState state,
final Path folder,
final boolean isFreezeState) {
final long roundNumber, final Instant consensusTimestamp, final boolean isFreezeState) {
this.roundNumber = roundNumber;
this.consensusTimestamp = consensusTimestamp;
this.state = state;
this.folder = folder;
this.isFreezeState = isFreezeState;
}

Expand All @@ -64,21 +56,23 @@ public Instant getConsensusTimestamp() {
}

/**
* Gets the {@link SwirldState} instance that was sigend and saved to disk.
*
* @return the signed {@link SwirldState} instance
* Deprecated method, always returns null
* @return null
* @deprecated used by PTT for an obsolete feature
*/
@Deprecated(forRemoval = true)
public SwirldState getState() {
return state;
return null;
}

/**
* Gets the path where the signed state was written to disk.
*
* @return the path containing the saved state
* Deprecated method, always returns null
* @return null
* @deprecated used by PTT for an obsolete feature
*/
@Deprecated(forRemoval = true)
public Path getFolder() {
return folder;
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public BindableInputWire<IN, OUT> bind(@NonNull final Function<IN, OUT> handler)
taskSchedulerInput.forward(output);
}
});
model.registerInputWireBinding(taskSchedulerName, getName());

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static com.swirlds.platform.event.creation.EventCreationManagerFactory.buildEventCreationManager;
import static com.swirlds.platform.state.address.AddressBookMetrics.registerAddressBookMetrics;
import static com.swirlds.platform.state.iss.ConsensusHashManager.DO_NOT_IGNORE_ROUNDS;
import static com.swirlds.platform.state.signed.SignedStateFileReader.getSavedStateFiles;

import com.swirlds.base.state.Startable;
import com.swirlds.base.time.Time;
Expand Down Expand Up @@ -81,9 +82,13 @@
import com.swirlds.common.utility.Clearable;
import com.swirlds.common.utility.LoggingClearables;
import com.swirlds.common.utility.StackTrace;
import com.swirlds.common.wiring.TaskScheduler;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.model.WiringModel;
import com.swirlds.logging.legacy.LogMarker;
import com.swirlds.logging.legacy.payload.FatalErrorPayload;
import com.swirlds.platform.components.EventIntake;
import com.swirlds.platform.components.SavedStateController;
import com.swirlds.platform.components.appcomm.AppCommunicationComponent;
import com.swirlds.platform.components.state.DefaultStateManagementComponent;
import com.swirlds.platform.components.state.StateManagementComponent;
Expand Down Expand Up @@ -155,15 +160,20 @@
import com.swirlds.platform.state.iss.IssHandler;
import com.swirlds.platform.state.iss.IssScratchpad;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SavedStateInfo;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.state.signed.SignedStateFileManager;
import com.swirlds.platform.state.signed.SignedStateManager;
import com.swirlds.platform.state.signed.SignedStateMetrics;
import com.swirlds.platform.state.signed.SourceOfSignedState;
import com.swirlds.platform.state.signed.StartupStateUtils;
import com.swirlds.platform.state.signed.StateSavingResult;
import com.swirlds.platform.state.signed.StateToDiskReason;
import com.swirlds.platform.stats.StatConstructor;
import com.swirlds.platform.system.Shutdown;
import com.swirlds.platform.threading.PauseAndLoad;
import com.swirlds.platform.util.PlatformComponents;
import com.swirlds.platform.wiring.SignedStateFileManagerWiring;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
Expand Down Expand Up @@ -460,24 +470,53 @@ public class SwirldsPlatform implements Platform {

components.add(new IssMetrics(platformContext.getMetrics(), currentAddressBook));

// Manages the pipeline of signed states to be written to disk
final SignedStateFileManager signedStateFileManager = new SignedStateFileManager(
platformContext,
new SignedStateMetrics(platformContext.getMetrics()),
Time.getCurrent(),
actualMainClassName,
selfId,
swirldName);
// FUTURE WORK: at some point this should be part of the unified platform wiring
final WiringModel model = WiringModel.create(platformContext, Time.getCurrent());
components.add(model);
final TaskScheduler<StateSavingResult> savedStateScheduler = model.schedulerBuilder("signed_state_file_manager")
.withType(TaskSchedulerType.SEQUENTIAL_THREAD)
.withUnhandledTaskCapacity(stateConfig.stateSavingQueueSize())
.build()
.cast();
final SignedStateFileManagerWiring signedStateFileManagerWiring =
new SignedStateFileManagerWiring(savedStateScheduler);
signedStateFileManagerWiring.bind(signedStateFileManager);
signedStateFileManagerWiring.solderPces(preconsensusEventWriter);
signedStateFileManagerWiring.solderStatusManager(platformStatusManager);
signedStateFileManagerWiring.solderAppCommunication(appCommunicationComponent);

final SavedStateController savedStateController =
new SavedStateController(stateConfig, signedStateFileManagerWiring.saveStateToDisk()::offer);

stateManagementComponent = new DefaultStateManagementComponent(
platformContext,
threadManager,
dispatchBuilder,
getAddressBook(),
new PlatformSigner(keysAndCerts),
actualMainClassName,
selfId,
swirldName,
txn -> this.createSystemTransaction(txn, true),
appCommunicationComponent,
appCommunicationComponent,
appCommunicationComponent,
this::haltRequested,
this::handleFatalError,
preconsensusEventWriter,
platformStatusManager,
platformStatusManager);
savedStateController,
signedStateFileManagerWiring.dumpStateToDisk()::put);

// Load the minimum generation into the pre-consensus event writer
final List<SavedStateInfo> savedStates = getSavedStateFiles(actualMainClassName, selfId, swirldName);
if (!savedStates.isEmpty()) {
// The minimum generation of non-ancient events for the oldest state snapshot on disk.
final long minimumGenerationNonAncientForOldestState =
savedStates.get(savedStates.size() - 1).metadata().minimumGenerationNonAncient();
preconsensusEventWriter.setMinimumGenerationToStoreUninterruptably(
minimumGenerationNonAncientForOldestState);
}

components.add(stateManagementComponent);

Expand Down Expand Up @@ -671,8 +710,7 @@ public class SwirldsPlatform implements Platform {
intakeQueue,
eventObserverDispatcher,
platformStatusManager::getCurrentStatus,
latestReconnectRound::get,
stateManagementComponent::getLatestSavedStateRound);
latestReconnectRound::get);

transactionSubmitter = new SwirldTransactionSubmitter(
platformStatusManager::getCurrentStatus,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.components;

import static com.swirlds.logging.legacy.LogMarker.STATE_TO_DISK;
import static com.swirlds.platform.state.signed.StateToDiskReason.FIRST_ROUND_AFTER_GENESIS;
import static com.swirlds.platform.state.signed.StateToDiskReason.FREEZE_STATE;
import static com.swirlds.platform.state.signed.StateToDiskReason.PERIODIC_SNAPSHOT;
import static com.swirlds.platform.state.signed.StateToDiskReason.RECONNECT;

import com.swirlds.base.function.BooleanFunction;
import com.swirlds.common.config.StateConfig;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.state.signed.StateToDiskReason;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Instant;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Controls which signed states should be written to disk based on input from other components
*/
public class SavedStateController {
private static final Logger logger = LogManager.getLogger(SavedStateController.class);
/**
* The timestamp of the signed state that was most recently written to disk, or null if no timestamp was recently
* written to disk.
*/
private Instant previousSavedStateTimestamp;
/** the state config */
private final StateConfig stateConfig;
/** a function that writes a signed state to disk asynchronously */
private final BooleanFunction<ReservedSignedState> stateWrite;

/**
* Create a new SavedStateController
*
* @param stateConfig the state config
* @param stateWrite a function that writes a signed state to disk asynchronously
*/
public SavedStateController(
@NonNull final StateConfig stateConfig, @NonNull final BooleanFunction<ReservedSignedState> stateWrite) {
this.stateConfig = Objects.requireNonNull(stateConfig);
this.stateWrite = Objects.requireNonNull(stateWrite);
}

/**
* Determine if a signed state should be written to disk. If the state should be written, the state will be passed
* on to the writer to be written asynchronously.
*
* @param signedState the signed state in question
*/
public synchronized void maybeSaveState(@NonNull final SignedState signedState) {

final StateToDiskReason reason = shouldSaveToDisk(signedState, previousSavedStateTimestamp);

if (reason != null) {
saveToDisk(signedState.reserve("saving to disk"), reason);
}
// if a null reason is returned, then there isn't anything to do, since the state shouldn't be saved
}

/**
* Notifies the controller that a signed state was received from another node during reconnect. The controller saves
* its timestamp and pass it on to be written to disk.
*
* @param signedState the signed state that was received from another node during reconnect
*/
public synchronized void reconnectStateReceived(@NonNull final SignedState signedState) {
saveToDisk(signedState.reserve("saving to disk after reconnect"), RECONNECT);
}

/**
* This should be called at boot time when a signed state is read from the disk.
*
* @param signedState the signed state that was read from file at boot time
*/
public synchronized void registerSignedStateFromDisk(@NonNull final SignedState signedState) {
previousSavedStateTimestamp = signedState.getConsensusTimestamp();
}

private void saveToDisk(@NonNull final ReservedSignedState state, @NonNull final StateToDiskReason reason) {
final SignedState signedState = state.get();
logger.info(
STATE_TO_DISK.getMarker(),
"Signed state from round {} created, will eventually be written to disk, for reason: {}",
signedState.getRound(),
reason);

previousSavedStateTimestamp = signedState.getConsensusTimestamp();
signedState.markAsStateToSave(reason);
final boolean accepted = stateWrite.apply(state);

if (!accepted) {
logger.error(
STATE_TO_DISK.getMarker(),
"Unable to save signed state to disk for round {} due to backlog of "
+ "operations in the SignedStateManager task queue.",
signedState.getRound());

state.close();
}
}

/**
* Determines whether a signed state should eventually be written to disk
* <p>
* If it is determined that the state should be written to disk, this method returns the reason why
* <p>
* If it is determined that the state shouldn't be written to disk, then this method returns null
*
* @param signedState the state in question
* @param previousTimestamp the timestamp of the previous state that was saved to disk, or null if no previous state
* was saved to disk
* @return the reason why the state should be written to disk, or null if it shouldn't be written to disk
*/
@Nullable
private StateToDiskReason shouldSaveToDisk(
@NonNull final SignedState signedState, @Nullable final Instant previousTimestamp) {

if (signedState.isFreezeState()) {
// the state right before a freeze should be written to disk
return FREEZE_STATE;
}

final int saveStatePeriod = stateConfig.saveStatePeriod();
if (saveStatePeriod <= 0) {
// periodic state saving is disabled
return null;
}

// FUTURE WORK: writing genesis state to disk is currently disabled if the saveStatePeriod is 0.
// This is for testing purposes, to have a method of disabling state saving for tests.
// Once a feature to disable all state saving has been added, this block should be moved in front of the
// saveStatePeriod <=0 block, so that saveStatePeriod doesn't impact the saving of genesis state.
if (previousTimestamp == null) {
// the first round should be saved
return FIRST_ROUND_AFTER_GENESIS;
}

if ((signedState.getConsensusTimestamp().getEpochSecond() / saveStatePeriod)
> (previousTimestamp.getEpochSecond() / saveStatePeriod)) {
return PERIODIC_SNAPSHOT;
} else {
// the period hasn't yet elapsed
return null;
}
}
}
Loading

0 comments on commit 7055bf6

Please sign in to comment.