Skip to content

Commit

Permalink
chore: introduce latest complete state nexus (#10134)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <lpetrovic05@gmail.com>
  • Loading branch information
lpetrovic05 committed Dec 12, 2023
1 parent 15e30f9 commit d971a0f
Show file tree
Hide file tree
Showing 41 changed files with 698 additions and 683 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,6 @@ public <T extends SwirldState> AutoCloseableWrapper<T> getLatestImmutableState(@
return null;
}

@Override
public <T extends SwirldState> AutoCloseableWrapper<T> getLatestSignedState(@NonNull String s) {
return null;
}

@Override
public boolean createTransaction(@NonNull byte[] bytes) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.swirlds.platform.components.appcomm.AppCommunicationComponent;
import com.swirlds.platform.components.state.DefaultStateManagementComponent;
import com.swirlds.platform.components.state.StateManagementComponent;
import com.swirlds.platform.components.state.output.NewLatestCompleteStateConsumer;
import com.swirlds.platform.components.transaction.system.ConsensusSystemTransactionManager;
import com.swirlds.platform.components.transaction.system.PreconsensusSystemTransactionManager;
import com.swirlds.platform.config.ThreadConfig;
Expand Down Expand Up @@ -151,6 +152,7 @@
import com.swirlds.platform.state.iss.IssHandler;
import com.swirlds.platform.state.iss.IssScratchpad;
import com.swirlds.platform.state.nexus.EmergencyStateNexus;
import com.swirlds.platform.state.nexus.LatestCompleteStateNexus;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SavedStateInfo;
import com.swirlds.platform.state.signed.SignedState;
Expand Down Expand Up @@ -511,16 +513,25 @@ public class SwirldsPlatform implements Platform {

platformWiring.bind(signedStateFileManager);

final LatestCompleteStateNexus latestCompleteState =
new LatestCompleteStateNexus(stateConfig, platformContext.getMetrics());

Check warning on line 517 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L516-L517

Added lines #L516 - L517 were not covered by tests
final SavedStateController savedStateController =
new SavedStateController(stateConfig, platformWiring.getSaveStateToDiskInput()::offer);
final NewLatestCompleteStateConsumer newLatestCompleteStateConsumer = ss -> {

Check warning on line 520 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L520

Added line #L520 was not covered by tests
// the app comm component will reserve the state, this should be done by the wiring in the future
appCommunicationComponent.newLatestCompleteStateEvent(ss);

Check warning on line 522 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L522

Added line #L522 was not covered by tests
// the nexus expects a state to be reserved for it
// in the future, all of these reservations will be done by the wiring
latestCompleteState.setState(ss.reserve("setting latest complete state"));
};

Check warning on line 526 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L525-L526

Added lines #L525 - L526 were not covered by tests

stateManagementComponent = new DefaultStateManagementComponent(
platformContext,
threadManager,
dispatchBuilder,
new PlatformSigner(keysAndCerts),
txn -> this.createSystemTransaction(txn, true),
appCommunicationComponent,
newLatestCompleteStateConsumer,
this::handleFatalError,
platformStatusManager,
savedStateController,
Expand Down Expand Up @@ -599,11 +610,16 @@ public class SwirldsPlatform implements Platform {
initialState.getState(),
appVersion);

final InterruptableConsumer<ReservedSignedState> newSignedStateFromTransactionsConsumer = rs -> {
latestCompleteState.newIncompleteState(rs.get().getRound());
stateManagementComponent.newSignedStateFromTransactions(rs);
};

Check warning on line 616 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L613-L616

Added lines #L613 - L616 were not covered by tests

stateHashSignQueue = components.add(new QueueThreadConfiguration<ReservedSignedState>(threadManager)
.setNodeId(selfId)
.setComponent(PLATFORM_THREAD_POOL_NAME)
.setThreadName("state-hash-sign")
.setHandler(stateManagementComponent::newSignedStateFromTransactions)
.setHandler(newSignedStateFromTransactionsConsumer)

Check warning on line 622 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L622

Added line #L622 was not covered by tests
.setCapacity(1)
.setMetricsConfiguration(new QueueThreadMetricsConfiguration(metrics).enableBusyTimeMetric())
.build());
Expand Down Expand Up @@ -806,7 +822,7 @@ public class SwirldsPlatform implements Platform {
consensusRef,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
eventValidator,
eventObserverDispatcher,
syncMetrics,
Expand Down Expand Up @@ -891,6 +907,7 @@ public class SwirldsPlatform implements Platform {
GuiPlatformAccessor.getInstance().setShadowGraph(selfId, shadowGraph);
GuiPlatformAccessor.getInstance().setStateManagementComponent(selfId, stateManagementComponent);
GuiPlatformAccessor.getInstance().setConsensusReference(selfId, consensusRef);
GuiPlatformAccessor.getInstance().setLatestCompleteStateComponent(selfId, latestCompleteState);

Check warning on line 910 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L910

Added line #L910 was not covered by tests
}

/**
Expand Down Expand Up @@ -1364,17 +1381,6 @@ public <T extends SwirldState> AutoCloseableWrapper<T> getLatestImmutableState(@
wrapper.isNull() ? null : (T) wrapper.get().getState().getSwirldState(), wrapper::close);
}

/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
@Override
public <T extends SwirldState> AutoCloseableWrapper<T> getLatestSignedState(@NonNull final String reason) {
final ReservedSignedState wrapper = stateManagementComponent.getLatestSignedState(reason);
return new AutoCloseableWrapper<>(
wrapper.isNull() ? null : (T) wrapper.get().getState().getSwirldState(), wrapper::close);
}

/**
* check whether the given event is the last event in its round, and the platform enters freeze period
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

package com.swirlds.platform.components.state;

import static com.swirlds.common.metrics.Metrics.PLATFORM_CATEGORY;
import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
import static com.swirlds.logging.legacy.LogMarker.STATE_TO_DISK;

import com.swirlds.base.time.Time;
import com.swirlds.common.config.StateConfig;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.crypto.Signature;
import com.swirlds.common.metrics.RunningAverageMetric;
import com.swirlds.common.stream.HashSigner;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.components.SavedStateController;
Expand Down Expand Up @@ -103,11 +101,6 @@ public class DefaultStateManagementComponent implements StateManagementComponent
private final SavedStateController savedStateController;
private final Consumer<StateDumpRequest> stateDumpConsumer;

private static final RunningAverageMetric.Config AVG_ROUND_SUPERMAJORITY_CONFIG = new RunningAverageMetric.Config(
PLATFORM_CATEGORY, "roundSup")
.withDescription("latest round with state signed by a supermajority")
.withUnit("round");

/**
* @param platformContext the platform context
* @param threadManager manages platform thread resources
Expand Down Expand Up @@ -162,10 +155,6 @@ public DefaultStateManagementComponent(
newLatestCompleteStateConsumer,
this::stateHasEnoughSignatures,
this::stateLacksSignatures);

final RunningAverageMetric avgRoundSupermajority =
platformContext.getMetrics().getOrCreate(AVG_ROUND_SUPERMAJORITY_CONFIG);
platformContext.getMetrics().addUpdater(() -> avgRoundSupermajority.update(getLastCompleteRound()));
}

/**
Expand Down Expand Up @@ -245,15 +234,6 @@ public void newSignedStateFromTransactions(@NonNull final ReservedSignedState si
}
}

/**
* {@inheritDoc}
*/
@Override
@NonNull
public ReservedSignedState getLatestSignedState(@NonNull final String reason) {
return signedStateManager.getLatestSignedState(reason);
}

/**
* {@inheritDoc}
*/
Expand All @@ -262,14 +242,6 @@ public ReservedSignedState getLatestImmutableState(@NonNull final String reason)
return signedStateManager.getLatestImmutableState(reason);
}

/**
* {@inheritDoc}
*/
@Override
public long getLastCompleteRound() {
return signedStateManager.getLastCompleteRound();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.swirlds.platform.components.PlatformComponent;
import com.swirlds.platform.components.common.output.NewSignedStateFromTransactionsConsumer;
import com.swirlds.platform.components.common.output.SignedStateToLoadConsumer;
import com.swirlds.platform.components.state.query.LatestSignedStateProvider;
import com.swirlds.platform.state.signed.*;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SignedStateFinder;
import com.swirlds.platform.state.signed.SignedStateInfo;
import com.swirlds.platform.state.signed.SignedStateManager;
import com.swirlds.platform.state.signed.StateToDiskReason;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Instant;
Expand All @@ -41,8 +44,7 @@ public interface StateManagementComponent
extends PlatformComponent,
SignedStateFinder,
SignedStateToLoadConsumer,
NewSignedStateFromTransactionsConsumer,
LatestSignedStateProvider {
NewSignedStateFromTransactionsConsumer {

/**
* Get a reserved instance of the latest immutable signed state. May be unhashed, may or may not have all required
Expand All @@ -54,13 +56,6 @@ public interface StateManagementComponent
*/
ReservedSignedState getLatestImmutableState(@NonNull final String reason);

/**
* Returns the latest round for which there is a complete signed state.
*
* @return the latest round number
*/
long getLastCompleteRound();

/**
* Get the latest signed states stored by this component. This method creates a copy, so no changes to the array
* will be made.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private void handleTransaction(@NonNull final NodeId creatorId, @NonNull final S
} catch (final RuntimeException e) {
logger.error(
EXCEPTION.getMarker(),
"Error while handling system transaction preconsensus: handler: {}, id: {}, transaction: {}, error: {}",
"Error while handling system transaction preconsensus: handler: {}, id: {}, transaction: {}",
handler,
creatorId,
transaction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.swirlds.common.threading.framework.QueueThread;
import com.swirlds.common.threading.framework.config.StoppableThreadConfiguration;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.components.state.StateManagementComponent;
import com.swirlds.platform.config.ThreadConfig;
import com.swirlds.platform.crypto.KeysAndCerts;
import com.swirlds.platform.event.GossipEvent;
Expand All @@ -57,6 +56,7 @@
import com.swirlds.platform.reconnect.ReconnectLearnerThrottle;
import com.swirlds.platform.reconnect.ReconnectThrottle;
import com.swirlds.platform.state.SwirldStateManager;
import com.swirlds.platform.state.nexus.SignedStateNexus;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.system.PlatformConstructionException;
import com.swirlds.platform.system.SoftwareVersion;
Expand Down Expand Up @@ -124,7 +124,7 @@ public abstract class AbstractGossip implements ConnectionTracker, Gossip {
* @param appVersion the version of the app
* @param intakeQueue the event intake queue
* @param swirldStateManager manages the mutable state
* @param stateManagementComponent manages the lifecycle of the state queue
* @param latestCompleteState holds the latest signed state that has enough signatures to be verifiable
* @param syncMetrics metrics for sync
* @param statusActionSubmitter enables submitting platform status actions
* @param loadReconnectState a method that should be called when a state from reconnect is obtained
Expand All @@ -140,7 +140,7 @@ protected AbstractGossip(
@NonNull final SoftwareVersion appVersion,
@NonNull final QueueThread<GossipEvent> intakeQueue,
@NonNull final SwirldStateManager swirldStateManager,
@NonNull final StateManagementComponent stateManagementComponent,
@NonNull final SignedStateNexus latestCompleteState,
@NonNull final SyncMetrics syncMetrics,
@NonNull final StatusActionSubmitter statusActionSubmitter,
@NonNull final Consumer<SignedState> loadReconnectState,
Expand Down Expand Up @@ -214,7 +214,7 @@ protected AbstractGossip(
this::pause,
clearAllPipelinesForReconnect::run,
swirldStateManager::getConsensusState,
stateManagementComponent::getLastCompleteRound,
latestCompleteState::getRound,

Check warning on line 217 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/AbstractGossip.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/AbstractGossip.java#L217

Added line #L217 was not covered by tests
new ReconnectLearnerThrottle(time, selfId, reconnectConfig),
loadReconnectState,
new ReconnectLearnerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.swirlds.common.threading.framework.QueueThread;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.Consensus;
import com.swirlds.platform.components.state.StateManagementComponent;
import com.swirlds.platform.crypto.KeysAndCerts;
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.event.linking.EventLinker;
Expand All @@ -40,6 +39,7 @@
import com.swirlds.platform.observers.EventObserverDispatcher;
import com.swirlds.platform.recovery.EmergencyRecoveryManager;
import com.swirlds.platform.state.SwirldStateManager;
import com.swirlds.platform.state.nexus.SignedStateNexus;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.system.SoftwareVersion;
Expand Down Expand Up @@ -80,7 +80,7 @@ private GossipFactory() {}
* @param consensusRef a pointer to consensus
* @param intakeQueue the event intake queue
* @param swirldStateManager manages the mutable state
* @param stateManagementComponent manages the lifecycle of the state
* @param latestCompleteState holds the latest signed state that has enough signatures to be verifiable
* @param eventValidator validates events and passes valid events further along the intake pipeline
* @param eventObserverDispatcher the object used to wire event intake
* @param syncMetrics metrics for sync
Expand All @@ -107,7 +107,7 @@ public static Gossip buildGossip(
@NonNull final AtomicReference<Consensus> consensusRef,
@NonNull final QueueThread<GossipEvent> intakeQueue,
@NonNull final SwirldStateManager swirldStateManager,
@NonNull final StateManagementComponent stateManagementComponent,
@NonNull final SignedStateNexus latestCompleteState,
@NonNull final EventValidator eventValidator,
@NonNull final EventObserverDispatcher eventObserverDispatcher,
@NonNull final SyncMetrics syncMetrics,
Expand All @@ -131,7 +131,7 @@ public static Gossip buildGossip(
Objects.requireNonNull(consensusRef);
Objects.requireNonNull(intakeQueue);
Objects.requireNonNull(swirldStateManager);
Objects.requireNonNull(stateManagementComponent);
Objects.requireNonNull(latestCompleteState);

Check warning on line 134 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/GossipFactory.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/GossipFactory.java#L134

Added line #L134 was not covered by tests
Objects.requireNonNull(eventValidator);
Objects.requireNonNull(eventObserverDispatcher);
Objects.requireNonNull(syncMetrics);
Expand Down Expand Up @@ -160,7 +160,7 @@ public static Gossip buildGossip(
consensusRef,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
eventValidator,
eventObserverDispatcher,
syncMetrics,
Expand All @@ -183,7 +183,7 @@ public static Gossip buildGossip(
shadowGraph,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
syncMetrics,
platformStatusManager,
loadReconnectState,
Expand All @@ -205,7 +205,7 @@ public static Gossip buildGossip(
consensusRef,
intakeQueue,
swirldStateManager,
stateManagementComponent,
latestCompleteState,
syncMetrics,
eventLinker,
platformStatusManager,
Expand Down
Loading

0 comments on commit d971a0f

Please sign in to comment.