Skip to content

Commit

Permalink
fix: manage StakingInfos in restart (#12911)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Tinker <michael.tinker@swirldslabs.com>
Signed-off-by: Neeharika-Sompalli <neeharika.sompalli@swirldslabs.com>
Co-authored-by: Neeharika-Sompalli <neeharika.sompalli@swirldslabs.com>
  • Loading branch information
tinker-michaelj and Neeharika-Sompalli committed Apr 22, 2024
1 parent a98c580 commit c6dab4c
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 333 deletions.
Expand Up @@ -26,15 +26,14 @@
import com.hedera.node.app.service.mono.state.merkle.MerkleStakingInfo;
import com.hedera.node.app.service.mono.state.migration.StateChildIndices;
import com.hedera.node.app.service.mono.utils.EntityNum;
import com.hedera.node.app.service.token.ReadableStakingInfoStore;
import com.hedera.node.app.service.token.TokenService;
import com.hedera.node.app.spi.state.WritableKVState;
import com.hedera.node.app.spi.state.WritableKVStateBase;
import com.hedera.node.app.state.merkle.HederaLifecycles;
import com.hedera.node.app.state.merkle.MerkleHederaState;
import com.hedera.node.app.workflows.dispatcher.ReadableStoreFactory;
import com.hedera.node.app.state.merkle.disk.OnDiskKey;
import com.hedera.node.app.state.merkle.disk.OnDiskValue;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.platform.NodeId;
import com.swirlds.common.threading.manager.AdHocThreadManager;
import com.swirlds.config.api.Configuration;
import com.swirlds.merkle.map.MerkleMap;
import com.swirlds.platform.state.PlatformState;
Expand All @@ -44,11 +43,13 @@
import com.swirlds.platform.system.SoftwareVersion;
import com.swirlds.platform.system.address.AddressBook;
import com.swirlds.platform.system.events.Event;
import com.swirlds.virtualmap.VirtualMap;
import com.swirlds.virtualmap.VirtualMapMigration;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -60,10 +61,42 @@ public class HederaLifecyclesImpl implements HederaLifecycles {
private static final long LEDGER_TOTAL_TINY_BAR_FLOAT = 5000000000000000000L;
private static final int NUM_REWARD_HISTORY_STORED_PERIODS = 365;

private static final BiConsumer<
VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>,
BiConsumer<EntityNumber, StakingNodeInfo>>
WEIGHT_UPDATE_VISITOR = (map, visitor) -> {
try {
VirtualMapMigration.extractVirtualMapData(
AdHocThreadManager.getStaticThreadManager(),
map,
pair -> visitor.accept(
pair.key().getKey(), pair.value().getValue()),
1);
} catch (InterruptedException e) {
logger.error("Interrupted while updating weights", e);
throw new IllegalStateException(e);
}
};

private final Hedera hedera;
private final BiConsumer<
VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>,
BiConsumer<EntityNumber, StakingNodeInfo>>
weightUpdateVisitor;

public HederaLifecyclesImpl(@NonNull final Hedera hedera) {
this.hedera = Objects.requireNonNull(hedera);
this(hedera, WEIGHT_UPDATE_VISITOR);
}

public HederaLifecyclesImpl(
@NonNull final Hedera hedera,
@NonNull
final BiConsumer<
VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>,
BiConsumer<EntityNumber, StakingNodeInfo>>
weightUpdateVisitor) {
this.hedera = requireNonNull(hedera);
this.weightUpdateVisitor = requireNonNull(weightUpdateVisitor);
}

@Override
Expand Down Expand Up @@ -92,56 +125,40 @@ public void onUpdateWeight(
@NonNull final MerkleHederaState state,
@NonNull final AddressBook configAddressBook,
@NonNull final PlatformContext context) {
final var tokenServiceState = state.getWritableStates(TokenService.NAME);
final var isMonoState = state.getChild(StateChildIndices.STAKING_INFO) instanceof MerkleMap;
// Get all nodeIds added in the config.txt
Set<NodeId> configNodeIds = configAddressBook.getNodeIdSet();
final var configAddressBookSize = configNodeIds.size();
if (!tokenServiceState.isEmpty()) {
final var stakingInfoState = tokenServiceState.<EntityNumber, StakingNodeInfo>get(STAKING_INFO_KEY);
final var readableStoreFactory = new ReadableStoreFactory(state);
final var stakingInfoStore = readableStoreFactory.getStore(ReadableStakingInfoStore.class);
final var allNodeIds = stakingInfoStore.getAll();
for (final var nodeId : allNodeIds) {
final var stakingInfo = requireNonNull(stakingInfoStore.get(nodeId));
NodeId id = new NodeId(nodeId);
// set weight for the nodes that exist in state and remove from
// nodes given in config.txt. This is needed to recognize newly added nodes
// It is possible that some nodes are deleted in configAddressBook compared to state
// We will set those node sas deleted in EndOfStakingPeriodCalculator
if (configNodeIds.contains(id)) {
configAddressBook.updateWeight(id, stakingInfo.weight());
configNodeIds.remove(id);
} else {
// We need to validate and mark any node that are removed during upgrade as deleted
// and also set the weight to 0
stakingInfoState.put(
EntityNumber.newBuilder().number(id.id()).build(),
stakingInfo.copyBuilder().deleted(true).weight(0).build());
logger.info(
"Node {} is deleted from configAddressBook during upgrade "
+ "and is marked deleted in state",
id);
}
final var nodeIdsLeftToUpdate = configAddressBook.getNodeIdSet();
if (!isMonoState) {
final var stakingInfoIndex = state.findNodeIndex(TokenService.NAME, STAKING_INFO_KEY);
if (stakingInfoIndex == -1) {
logger.warn("Staking info not found in state, skipping weight update");
return;
}
// for any newly added nodes that doesn't exist in state, weight should be set to 0
// irrespective of the weight provided in config.txt
if (!configNodeIds.isEmpty()) {
configNodeIds.forEach(nodeId -> {
configAddressBook.updateWeight(nodeId, 0);
@SuppressWarnings("unchecked")
final var stakingInfoVMap = (VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>)
state.getChild(stakingInfoIndex);
// Since it is much easier to modify the in-state staking info after schemas
// are registered with MerkleHederaState, we do that work later in the token
// service schema's restart() hook. Here we only update the address book weights
// based on the staking info in the state.
weightUpdateVisitor.accept(stakingInfoVMap, (node, info) -> {
final var nodeId = new NodeId(node.number());
// If present in the address book, remove this node id from the
// set of node ids left to update and update its weight
if (nodeIdsLeftToUpdate.remove(nodeId)) {
configAddressBook.updateWeight(nodeId, info.weight());
} else {
logger.info(
"Node {} is newly added in configAddressBook during upgrade "
+ "with weight 0 in configAddressBook",
"Node {} is no longer in address book; restart() will ensure its staking info is marked deleted",
nodeId);
});
// update the state with new nodes and set weight to 0
addAdditionalNodesToState(
stakingInfoState, configNodeIds, context.getConfiguration(), configAddressBookSize);
}

if (stakingInfoState.isModified()) {
((WritableKVStateBase) stakingInfoState).commit();
}
}
});
nodeIdsLeftToUpdate.forEach(nodeId -> {
configAddressBook.updateWeight(nodeId, 0);
logger.info("Found new node {}; restart() will add its staking info", nodeId);
});
} else {
final var configAddressBookSize = nodeIdsLeftToUpdate.size();
// When doing the first upgrade from 0.48 to 0.49, we will call updateWeight before BBM migration.
// In this case, we need to update the weight in the stakingInfo map from mono service state.
logger.info("Token service state is empty, so we are migrating from mono to mod-service with "
Expand All @@ -150,9 +167,9 @@ public void onUpdateWeight(
(MerkleMap<EntityNum, MerkleStakingInfo>) state.getChild(StateChildIndices.STAKING_INFO));
stakingInfosMap.forEachNode((nodeNum, stakingInfo) -> {
final NodeId nodeId = new NodeId(nodeNum.longValue());
if (configNodeIds.contains(nodeId)) {
if (nodeIdsLeftToUpdate.contains(nodeId)) {
configAddressBook.updateWeight(nodeId, stakingInfo.getWeight());
configNodeIds.remove(nodeId);
nodeIdsLeftToUpdate.remove(nodeId);
} else {
final var newStakingInfo = stakingInfosMap.getForModify(nodeNum);
newStakingInfo.setWeight(0);
Expand All @@ -164,8 +181,8 @@ public void onUpdateWeight(
});
// for any newly added nodes that doesn't exist in state, weight should be set to 0
// irrespective of the weight provided in config.txt
if (!configNodeIds.isEmpty()) {
configNodeIds.forEach(nodeId -> {
if (!nodeIdsLeftToUpdate.isEmpty()) {
nodeIdsLeftToUpdate.forEach(nodeId -> {
configAddressBook.updateWeight(nodeId, 0);
logger.info(
"Node {} is newly added in configAddressBook during upgrade "
Expand All @@ -174,13 +191,14 @@ public void onUpdateWeight(
});
// update the state with new nodes and set weight to 0
addAdditionalNodesToState(
stakingInfosMap, configNodeIds, context.getConfiguration(), configAddressBookSize);
stakingInfosMap, nodeIdsLeftToUpdate, context.getConfiguration(), configAddressBookSize);
}
}
}

/**
* Add additional nodes to state with weight 0 and update all nodes maxStake to maxStakePerNode
*
* @param stakingInfoMap The state to update
* @param newNodeIds The new node ids to add
* @param config The configuration
Expand Down Expand Up @@ -220,49 +238,6 @@ private void addAdditionalNodesToState(
});
}

/**
* Add additional nodes to state with weight 0 and update all nodes maxStake to maxStakePerNode
* @param stakingToState The state to update
* @param newNodeIds The new node ids to add
* @param config The configuration
* @param configAddressBookSize The size of the address book
*/
private void addAdditionalNodesToState(
@NonNull final WritableKVState<EntityNumber, StakingNodeInfo> stakingToState,
@NonNull final Set<NodeId> newNodeIds,
@NonNull final Configuration config,
final int configAddressBookSize) {
// Since PlatformContext configuration is not available here,
// we are using the default values here. (FUTURE) Need to see how to use config here
// for ledger.totalTinyBarFloat and staking.rewardHistory.numStoredPeriods
final long maxStakePerNode = LEDGER_TOTAL_TINY_BAR_FLOAT / configAddressBookSize;

// Add new nodes with weight 0
for (final var nodeId : newNodeIds) {
final var rewardSumHistory = new Long[NUM_REWARD_HISTORY_STORED_PERIODS + 1];
Arrays.fill(rewardSumHistory, 0L);

final var newNodeStakingInfo = StakingNodeInfo.newBuilder()
.nodeNumber(nodeId.id())
.maxStake(maxStakePerNode)
.minStake(0L)
.rewardSumHistory(Arrays.asList(rewardSumHistory))
.weight(0)
.build();
stakingToState.put(new EntityNumber(nodeId.id()), newNodeStakingInfo);
logger.info("Node {} is added in state with weight 0 and maxStakePerNode {} ", nodeId, maxStakePerNode);
}
// Update all nodes maxStake to maxStakePerNode
stakingToState.keys().forEachRemaining(key -> {
final var stakingInfo = stakingToState.get(key);
final var copy = requireNonNull(stakingInfo)
.copyBuilder()
.maxStake(maxStakePerNode)
.build();
stakingToState.put(key, copy);
});
}

@Override
public void onNewRecoveredState(@NonNull final MerkleHederaState recoveredState) {
hedera.onNewRecoveredState(recoveredState);
Expand Down
Expand Up @@ -334,9 +334,15 @@ private void handleUserTransaction(
final var feeAccumulator = createFeeAccumulator(stack, configuration, recordBuilder);

final var tokenServiceContext = new TokenContextImpl(
configuration, storeMetricsService, stack, recordListBuilder, blockRecordManager, isFirstTransaction);
// It's awful that we have to check this every time a transaction is handled, especially since this mostly
// applies to non-production cases. Let's find a way to 💥💥 remove this 💥💥
configuration,
state,
storeMetricsService,
stack,
recordListBuilder,
blockRecordManager,
isFirstTransaction);
// Do any one-time work for the first transaction after genesis;
// overhead for all following transactions is effectively zero
genesisRecordsTimeHook.process(tokenServiceContext);
try {
// If this is the first user transaction after midnight, then handle staking updates prior to handling the
Expand Down
Expand Up @@ -21,10 +21,12 @@
import static java.util.Objects.requireNonNull;

import com.hedera.node.app.records.BlockRecordManager;
import com.hedera.node.app.service.token.ReadableStakingInfoStore;
import com.hedera.node.app.service.token.TokenService;
import com.hedera.node.app.service.token.records.FinalizeContext;
import com.hedera.node.app.service.token.records.TokenContext;
import com.hedera.node.app.spi.metrics.StoreMetricsService;
import com.hedera.node.app.state.HederaState;
import com.hedera.node.app.workflows.dispatcher.ReadableStoreFactory;
import com.hedera.node.app.workflows.dispatcher.WritableStoreFactory;
import com.hedera.node.app.workflows.handle.record.RecordListBuilder;
Expand All @@ -33,10 +35,12 @@
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import java.util.Set;
import java.util.function.Consumer;

public class TokenContextImpl implements TokenContext, FinalizeContext {
private final Configuration configuration;
private final HederaState state;
private final ReadableStoreFactory readableStoreFactory;
private final WritableStoreFactory writableStoreFactory;
private final RecordListBuilder recordListBuilder;
Expand All @@ -45,11 +49,13 @@ public class TokenContextImpl implements TokenContext, FinalizeContext {

public TokenContextImpl(
@NonNull final Configuration configuration,
@NonNull final HederaState state,
@NonNull final StoreMetricsService storeMetricsService,
@NonNull final SavepointStackImpl stack,
@NonNull final RecordListBuilder recordListBuilder,
@NonNull final BlockRecordManager blockRecordManager,
final boolean isFirstTransaction) {
this.state = requireNonNull(state, "state must not be null");
requireNonNull(stack, "stack must not be null");
this.configuration = requireNonNull(configuration, "configuration must not be null");
this.recordListBuilder = requireNonNull(recordListBuilder, "recordListBuilder must not be null");
Expand Down Expand Up @@ -143,4 +149,11 @@ public boolean isScheduleDispatch() {
public void markMigrationRecordsStreamed() {
blockRecordManager.markMigrationRecordsStreamed();
}

@Override
public Set<Long> knownNodeIds() {
return new ReadableStoreFactory(state)
.getStore(ReadableStakingInfoStore.class)
.getAll();
}
}

0 comments on commit c6dab4c

Please sign in to comment.