Skip to content

Commit

Permalink
chore: introduce latest complete state nexus (#10134)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <lpetrovic05@gmail.com>
  • Loading branch information
lpetrovic05 authored and dikel committed Dec 12, 2023
1 parent bd04743 commit a48901b
Show file tree
Hide file tree
Showing 41 changed files with 698 additions and 683 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,6 @@ public <T extends SwirldState> AutoCloseableWrapper<T> getLatestImmutableState(@
return null;
}

@Override
public <T extends SwirldState> AutoCloseableWrapper<T> getLatestSignedState(@NonNull String s) {
return null;
}

@Override
public boolean createTransaction(@NonNull byte[] bytes) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.swirlds.platform.components.appcomm.AppCommunicationComponent;
import com.swirlds.platform.components.state.DefaultStateManagementComponent;
import com.swirlds.platform.components.state.StateManagementComponent;
import com.swirlds.platform.components.state.output.NewLatestCompleteStateConsumer;
import com.swirlds.platform.components.transaction.system.ConsensusSystemTransactionManager;
import com.swirlds.platform.components.transaction.system.PreconsensusSystemTransactionManager;
import com.swirlds.platform.config.ThreadConfig;
Expand Down Expand Up @@ -151,6 +152,7 @@
import com.swirlds.platform.state.iss.IssHandler;
import com.swirlds.platform.state.iss.IssScratchpad;
import com.swirlds.platform.state.nexus.EmergencyStateNexus;
import com.swirlds.platform.state.nexus.LatestCompleteStateNexus;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SavedStateInfo;
import com.swirlds.platform.state.signed.SignedState;
Expand Down Expand Up @@ -511,16 +513,25 @@ public class SwirldsPlatform implements Platform {

platformWiring.bind(signedStateFileManager);

final LatestCompleteStateNexus latestCompleteState =
new LatestCompleteStateNexus(stateConfig, platformContext.getMetrics());
final SavedStateController savedStateController =
new SavedStateController(stateConfig, platformWiring.getSaveStateToDiskInput()::offer);
final NewLatestCompleteStateConsumer newLatestCompleteStateConsumer = ss -> {
// the app comm component will reserve the state, this should be done by the wiring in the future
appCommunicationComponent.newLatestCompleteStateEvent(ss);
// the nexus expects a state to be reserved for it
// in the future, all of these reservations will be done by the wiring
latestCompleteState.setState(ss.reserve("setting latest complete state"));
};

stateManagementComponent = new DefaultStateManagementComponent(
platformContext,
threadManager,
dispatchBuilder,
new PlatformSigner(keysAndCerts),
txn -> this.createSystemTransaction(txn, true),
appCommunicationComponent,
newLatestCompleteStateConsumer,
this::handleFatalError,
platformStatusManager,
savedStateController,
Expand Down Expand Up @@ -599,11 +610,16 @@ public class SwirldsPlatform implements Platform {
initialState.getState(),
appVersion);

final InterruptableConsumer<ReservedSignedState> newSignedStateFromTransactionsConsumer = rs -> {
latestCompleteState.newIncompleteState(rs.get().getRound());
stateManagementComponent.newSignedStateFromTransactions(rs);
};

stateHashSignQueue = components.add(new QueueThreadConfiguration<ReservedSignedState>(threadManager)
.setNodeId(selfId)
.setComponent(PLATFORM_THREAD_POOL_NAME)
.setThreadName("state-hash-sign")
.setHandler(stateManagementComponent::newSignedStateFromTransactions)
.setHandler(newSignedStateFromTransactionsConsumer)
.setCapacity(1)
.setMetricsConfiguration(new QueueThreadMetricsConfiguration(metrics).enableBusyTimeMetric())
.build());
Expand Down Expand Up @@ -806,7 +822,7 @@ public class SwirldsPlatform implements Platform {
consensusRef,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
eventValidator,
eventObserverDispatcher,
syncMetrics,
Expand Down Expand Up @@ -891,6 +907,7 @@ public class SwirldsPlatform implements Platform {
GuiPlatformAccessor.getInstance().setShadowGraph(selfId, shadowGraph);
GuiPlatformAccessor.getInstance().setStateManagementComponent(selfId, stateManagementComponent);
GuiPlatformAccessor.getInstance().setConsensusReference(selfId, consensusRef);
GuiPlatformAccessor.getInstance().setLatestCompleteStateComponent(selfId, latestCompleteState);
}

/**
Expand Down Expand Up @@ -1364,17 +1381,6 @@ public <T extends SwirldState> AutoCloseableWrapper<T> getLatestImmutableState(@
wrapper.isNull() ? null : (T) wrapper.get().getState().getSwirldState(), wrapper::close);
}

/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
@Override
public <T extends SwirldState> AutoCloseableWrapper<T> getLatestSignedState(@NonNull final String reason) {
final ReservedSignedState wrapper = stateManagementComponent.getLatestSignedState(reason);
return new AutoCloseableWrapper<>(
wrapper.isNull() ? null : (T) wrapper.get().getState().getSwirldState(), wrapper::close);
}

/**
* check whether the given event is the last event in its round, and the platform enters freeze period
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

package com.swirlds.platform.components.state;

import static com.swirlds.common.metrics.Metrics.PLATFORM_CATEGORY;
import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
import static com.swirlds.logging.legacy.LogMarker.STATE_TO_DISK;

import com.swirlds.base.time.Time;
import com.swirlds.common.config.StateConfig;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.crypto.Signature;
import com.swirlds.common.metrics.RunningAverageMetric;
import com.swirlds.common.stream.HashSigner;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.components.SavedStateController;
Expand Down Expand Up @@ -103,11 +101,6 @@ public class DefaultStateManagementComponent implements StateManagementComponent
private final SavedStateController savedStateController;
private final Consumer<StateDumpRequest> stateDumpConsumer;

private static final RunningAverageMetric.Config AVG_ROUND_SUPERMAJORITY_CONFIG = new RunningAverageMetric.Config(
PLATFORM_CATEGORY, "roundSup")
.withDescription("latest round with state signed by a supermajority")
.withUnit("round");

/**
* @param platformContext the platform context
* @param threadManager manages platform thread resources
Expand Down Expand Up @@ -162,10 +155,6 @@ public DefaultStateManagementComponent(
newLatestCompleteStateConsumer,
this::stateHasEnoughSignatures,
this::stateLacksSignatures);

final RunningAverageMetric avgRoundSupermajority =
platformContext.getMetrics().getOrCreate(AVG_ROUND_SUPERMAJORITY_CONFIG);
platformContext.getMetrics().addUpdater(() -> avgRoundSupermajority.update(getLastCompleteRound()));
}

/**
Expand Down Expand Up @@ -245,15 +234,6 @@ public void newSignedStateFromTransactions(@NonNull final ReservedSignedState si
}
}

/**
* {@inheritDoc}
*/
@Override
@NonNull
public ReservedSignedState getLatestSignedState(@NonNull final String reason) {
return signedStateManager.getLatestSignedState(reason);
}

/**
* {@inheritDoc}
*/
Expand All @@ -262,14 +242,6 @@ public ReservedSignedState getLatestImmutableState(@NonNull final String reason)
return signedStateManager.getLatestImmutableState(reason);
}

/**
* {@inheritDoc}
*/
@Override
public long getLastCompleteRound() {
return signedStateManager.getLastCompleteRound();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.swirlds.platform.components.PlatformComponent;
import com.swirlds.platform.components.common.output.NewSignedStateFromTransactionsConsumer;
import com.swirlds.platform.components.common.output.SignedStateToLoadConsumer;
import com.swirlds.platform.components.state.query.LatestSignedStateProvider;
import com.swirlds.platform.state.signed.*;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SignedStateFinder;
import com.swirlds.platform.state.signed.SignedStateInfo;
import com.swirlds.platform.state.signed.SignedStateManager;
import com.swirlds.platform.state.signed.StateToDiskReason;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Instant;
Expand All @@ -41,8 +44,7 @@ public interface StateManagementComponent
extends PlatformComponent,
SignedStateFinder,
SignedStateToLoadConsumer,
NewSignedStateFromTransactionsConsumer,
LatestSignedStateProvider {
NewSignedStateFromTransactionsConsumer {

/**
* Get a reserved instance of the latest immutable signed state. May be unhashed, may or may not have all required
Expand All @@ -54,13 +56,6 @@ public interface StateManagementComponent
*/
ReservedSignedState getLatestImmutableState(@NonNull final String reason);

/**
* Returns the latest round for which there is a complete signed state.
*
* @return the latest round number
*/
long getLastCompleteRound();

/**
* Get the latest signed states stored by this component. This method creates a copy, so no changes to the array
* will be made.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private void handleTransaction(@NonNull final NodeId creatorId, @NonNull final S
} catch (final RuntimeException e) {
logger.error(
EXCEPTION.getMarker(),
"Error while handling system transaction preconsensus: handler: {}, id: {}, transaction: {}, error: {}",
"Error while handling system transaction preconsensus: handler: {}, id: {}, transaction: {}",
handler,
creatorId,
transaction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.swirlds.common.threading.framework.QueueThread;
import com.swirlds.common.threading.framework.config.StoppableThreadConfiguration;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.components.state.StateManagementComponent;
import com.swirlds.platform.config.ThreadConfig;
import com.swirlds.platform.crypto.KeysAndCerts;
import com.swirlds.platform.event.GossipEvent;
Expand All @@ -57,6 +56,7 @@
import com.swirlds.platform.reconnect.ReconnectLearnerThrottle;
import com.swirlds.platform.reconnect.ReconnectThrottle;
import com.swirlds.platform.state.SwirldStateManager;
import com.swirlds.platform.state.nexus.SignedStateNexus;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.system.PlatformConstructionException;
import com.swirlds.platform.system.SoftwareVersion;
Expand Down Expand Up @@ -124,7 +124,7 @@ public abstract class AbstractGossip implements ConnectionTracker, Gossip {
* @param appVersion the version of the app
* @param intakeQueue the event intake queue
* @param swirldStateManager manages the mutable state
* @param stateManagementComponent manages the lifecycle of the state queue
* @param latestCompleteState holds the latest signed state that has enough signatures to be verifiable
* @param syncMetrics metrics for sync
* @param statusActionSubmitter enables submitting platform status actions
* @param loadReconnectState a method that should be called when a state from reconnect is obtained
Expand All @@ -140,7 +140,7 @@ protected AbstractGossip(
@NonNull final SoftwareVersion appVersion,
@NonNull final QueueThread<GossipEvent> intakeQueue,
@NonNull final SwirldStateManager swirldStateManager,
@NonNull final StateManagementComponent stateManagementComponent,
@NonNull final SignedStateNexus latestCompleteState,
@NonNull final SyncMetrics syncMetrics,
@NonNull final StatusActionSubmitter statusActionSubmitter,
@NonNull final Consumer<SignedState> loadReconnectState,
Expand Down Expand Up @@ -214,7 +214,7 @@ protected AbstractGossip(
this::pause,
clearAllPipelinesForReconnect::run,
swirldStateManager::getConsensusState,
stateManagementComponent::getLastCompleteRound,
latestCompleteState::getRound,
new ReconnectLearnerThrottle(time, selfId, reconnectConfig),
loadReconnectState,
new ReconnectLearnerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.swirlds.common.threading.framework.QueueThread;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.Consensus;
import com.swirlds.platform.components.state.StateManagementComponent;
import com.swirlds.platform.crypto.KeysAndCerts;
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.event.linking.EventLinker;
Expand All @@ -40,6 +39,7 @@
import com.swirlds.platform.observers.EventObserverDispatcher;
import com.swirlds.platform.recovery.EmergencyRecoveryManager;
import com.swirlds.platform.state.SwirldStateManager;
import com.swirlds.platform.state.nexus.SignedStateNexus;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.system.SoftwareVersion;
Expand Down Expand Up @@ -80,7 +80,7 @@ private GossipFactory() {}
* @param consensusRef a pointer to consensus
* @param intakeQueue the event intake queue
* @param swirldStateManager manages the mutable state
* @param stateManagementComponent manages the lifecycle of the state
* @param latestCompleteState holds the latest signed state that has enough signatures to be verifiable
* @param eventValidator validates events and passes valid events further along the intake pipeline
* @param eventObserverDispatcher the object used to wire event intake
* @param syncMetrics metrics for sync
Expand All @@ -107,7 +107,7 @@ public static Gossip buildGossip(
@NonNull final AtomicReference<Consensus> consensusRef,
@NonNull final QueueThread<GossipEvent> intakeQueue,
@NonNull final SwirldStateManager swirldStateManager,
@NonNull final StateManagementComponent stateManagementComponent,
@NonNull final SignedStateNexus latestCompleteState,
@NonNull final EventValidator eventValidator,
@NonNull final EventObserverDispatcher eventObserverDispatcher,
@NonNull final SyncMetrics syncMetrics,
Expand All @@ -131,7 +131,7 @@ public static Gossip buildGossip(
Objects.requireNonNull(consensusRef);
Objects.requireNonNull(intakeQueue);
Objects.requireNonNull(swirldStateManager);
Objects.requireNonNull(stateManagementComponent);
Objects.requireNonNull(latestCompleteState);
Objects.requireNonNull(eventValidator);
Objects.requireNonNull(eventObserverDispatcher);
Objects.requireNonNull(syncMetrics);
Expand Down Expand Up @@ -160,7 +160,7 @@ public static Gossip buildGossip(
consensusRef,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
eventValidator,
eventObserverDispatcher,
syncMetrics,
Expand All @@ -183,7 +183,7 @@ public static Gossip buildGossip(
shadowGraph,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
syncMetrics,
platformStatusManager,
loadReconnectState,
Expand All @@ -205,7 +205,7 @@ public static Gossip buildGossip(
consensusRef,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
syncMetrics,
eventLinker,
platformStatusManager,
Expand Down

0 comments on commit a48901b

Please sign in to comment.