Skip to content

Commit

Permalink
Snap server rebase (#6640)
Browse files Browse the repository at this point in the history
* initial snap server implementation

Signed-off-by: garyschulte <garyschulte@gmail.com>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
  • Loading branch information
garyschulte and macfarla committed Mar 30, 2024
1 parent deaea9b commit 34fc5ee
Show file tree
Hide file tree
Showing 35 changed files with 1,714 additions and 237 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Dedicated log marker for invalid txs removed from the txpool [#6826](https://github.com/hyperledger/besu/pull/6826)
- Prevent startup with BONSAI and privacy enabled [#6809](https://github.com/hyperledger/besu/pull/6809)
- Remove deprecated Forest pruning [#6810](https://github.com/hyperledger/besu/pull/6810)
- Experimental Snap Sync Server [#6640](https://github.com/hyperledger/besu/pull/6640)

### Bug fixes
- Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665)
Expand Down
2 changes: 2 additions & 0 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -2732,6 +2732,8 @@ private String generateConfigurationOverview() {
getDataStorageConfiguration().getUnstable().getBonsaiTrieLogPruningWindowSize());
}

builder.setSnapServerEnabled(this.unstableSynchronizerOptions.isSnapsyncServerEnabled());

builder.setTxPoolImplementation(buildTransactionPoolConfiguration().getTxPoolImplementation());
builder.setWorldStateUpdateMode(unstableEvmOptions.toDomainObject().worldUpdaterMode());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class ConfigurationOverviewBuilder {
private boolean isBonsaiLimitTrieLogsEnabled = false;
private long trieLogRetentionLimit = 0;
private Integer trieLogsPruningWindowSize = null;
private boolean isSnapServerEnabled = false;
private TransactionPoolConfiguration.Implementation txPoolImplementation;
private EvmConfiguration.WorldUpdaterMode worldStateUpdateMode;
private Map<String, String> environment;
Expand Down Expand Up @@ -219,6 +220,17 @@ public ConfigurationOverviewBuilder setTrieLogRetentionLimit(final long limit) {
return this;
}

/**
* Sets snap server enabled/disabled
*
* @param snapServerEnabled bool to indicate if snap server is enabled
* @return the builder
*/
public ConfigurationOverviewBuilder setSnapServerEnabled(final boolean snapServerEnabled) {
isSnapServerEnabled = snapServerEnabled;
return this;
}

/**
* Sets trie logs pruning window size
*
Expand Down Expand Up @@ -339,6 +351,10 @@ public String build() {

lines.add("Using " + worldStateUpdateMode + " worldstate update mode");

if (isSnapServerEnabled) {
lines.add("Experimental Snap Sync server enabled");
}

if (isBonsaiLimitTrieLogsEnabled) {
final StringBuilder trieLogPruningString = new StringBuilder();
trieLogPruningString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
private static final String SNAP_FLAT_DB_HEALING_ENABLED_FLAG =
"--Xsnapsync-synchronizer-flat-db-healing-enabled";

private static final String SNAP_SERVER_ENABLED_FLAG = "--Xsnapsync-server-enabled";

private static final String CHECKPOINT_POST_MERGE_FLAG = "--Xcheckpoint-post-merge-enabled";

/**
Expand Down Expand Up @@ -296,6 +298,13 @@ public void parseBlockPropagationRange(final String arg) {
private Boolean snapsyncFlatDbHealingEnabled =
SnapSyncConfiguration.DEFAULT_IS_FLAT_DB_HEALING_ENABLED;

@CommandLine.Option(
names = SNAP_SERVER_ENABLED_FLAG,
hidden = true,
paramLabel = "<Boolean>",
description = "Snap sync server enabled (default: ${DEFAULT-VALUE})")
private Boolean snapsyncServerEnabled = SnapSyncConfiguration.DEFAULT_SNAP_SERVER_ENABLED;

@CommandLine.Option(
names = {CHECKPOINT_POST_MERGE_FLAG},
hidden = true,
Expand All @@ -314,6 +323,15 @@ public boolean isSnapsyncFlatDbHealingEnabled() {
return snapsyncFlatDbHealingEnabled;
}

/**
* Flag to know whether the Snap sync server feature is enabled or disabled.
*
* @return true if snap sync server is enabled
*/
public boolean isSnapsyncServerEnabled() {
return snapsyncServerEnabled;
}

/**
* Create synchronizer options.
*
Expand Down Expand Up @@ -398,6 +416,7 @@ public SynchronizerConfiguration.Builder toDomainObject() {
.localFlatAccountCountToHealPerRequest(snapsyncFlatAccountHealedCountPerRequest)
.localFlatStorageCountToHealPerRequest(snapsyncFlatStorageHealedCountPerRequest)
.isFlatDbHealingEnabled(snapsyncFlatDbHealingEnabled)
.isSnapServerEnabled(snapsyncServerEnabled)
.build());
builder.checkpointPostMergeEnabled(checkpointPostMergeSyncEnabled);

Expand Down Expand Up @@ -456,7 +475,9 @@ public List<String> getCLIOptions() {
SNAP_FLAT_ACCOUNT_HEALED_COUNT_PER_REQUEST_FLAG,
OptionParser.format(snapsyncFlatAccountHealedCountPerRequest),
SNAP_FLAT_STORAGE_HEALED_COUNT_PER_REQUEST_FLAG,
OptionParser.format(snapsyncFlatStorageHealedCountPerRequest)));
OptionParser.format(snapsyncFlatStorageHealedCountPerRequest),
SNAP_SERVER_ENABLED_FLAG,
OptionParser.format(snapsyncServerEnabled)));
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,6 @@ public BesuController build() {
peerValidators,
Optional.empty());

final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive);

final PivotBlockSelector pivotBlockSelector =
createPivotSelector(
protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
Expand All @@ -671,6 +668,10 @@ public BesuController build() {

protocolContext.setSynchronizer(Optional.of(synchronizer));

final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(
protocolContext, worldStateStorageCoordinator, ethPeers, snapMessages);

final MiningCoordinator miningCoordinator =
createMiningCoordinator(
protocolSchedule,
Expand Down Expand Up @@ -986,12 +987,23 @@ protected ProtocolContext createProtocolContext(
}

private Optional<SnapProtocolManager> createSnapProtocolManager(
final List<PeerValidator> peerValidators,
final ProtocolContext protocolContext,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final EthPeers ethPeers,
final EthMessages snapMessages,
final WorldStateArchive worldStateArchive) {
return Optional.of(
new SnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive));
final EthMessages snapMessages) {
if (Optional.ofNullable(syncConfig.getSnapSyncConfiguration())
.map(snapConfig -> snapConfig.isSnapServerEnabled())
.orElse(false)) {
return Optional.of(
new SnapProtocolManager(
worldStateStorageCoordinator,
syncConfig.getSnapSyncConfiguration(),
ethPeers,
snapMessages,
protocolContext));
} else {
return Optional.empty();
}
}

WorldStateArchive createWorldStateArchive(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class Hash extends DelegatingBytes32 {
/** The constant ZERO. */
public static final Hash ZERO = new Hash(Bytes32.ZERO);

/** Last hash */
public static final Hash LAST = new Hash(Bytes32.fromHexString("F".repeat(64)));

/**
* Hash of an RLP encoded trie hash with no content, or
* "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
Expand Down
1 change: 1 addition & 0 deletions ethereum/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies {

implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.google.guava:guava'
implementation 'com.github.ben-manes.caffeine:caffeine'
implementation 'com.google.dagger:dagger'
implementation 'org.apache.maven:maven-artifact'
annotationProcessor 'com.google.dagger:dagger-compiler'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -41,6 +42,8 @@
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The WorldStateProofProvider class is responsible for providing proofs for world state entries. It
Expand All @@ -49,6 +52,7 @@
public class WorldStateProofProvider {

private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private static final Logger LOG = LoggerFactory.getLogger(WorldStateProofProvider.class);

public WorldStateProofProvider(final WorldStateStorageCoordinator worldStateStorageCoordinator) {
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
Expand Down Expand Up @@ -85,7 +89,8 @@ private SortedMap<UInt256, Proof<Bytes>> getStorageProofs(
final List<UInt256> accountStorageKeys) {
final MerkleTrie<Bytes32, Bytes> storageTrie =
newAccountStorageTrie(accountHash, account.getStorageRoot());
final NavigableMap<UInt256, Proof<Bytes>> storageProofs = new TreeMap<>();
final NavigableMap<UInt256, Proof<Bytes>> storageProofs =
new TreeMap<>(Comparator.comparing(Bytes32::toHexString));
accountStorageKeys.forEach(
key -> storageProofs.put(key, storageTrie.getValueWithProof(Hash.hash(key))));
return storageProofs;
Expand Down Expand Up @@ -153,19 +158,26 @@ public boolean isValidRangeProof(
final SortedMap<Bytes32, Bytes> keys) {

// check if it's monotonic increasing
if (!Ordering.natural().isOrdered(keys.keySet())) {
if (keys.size() > 1 && !Ordering.natural().isOrdered(keys.keySet())) {
return false;
}

// when proof is empty we need to have all the keys to reconstruct the trie
// when proof is empty and we requested the full range, we should
// have all the keys to reconstruct the trie
if (proofs.isEmpty()) {
final MerkleTrie<Bytes, Bytes> trie = new SimpleMerklePatriciaTrie<>(Function.identity());
// add the received keys in the trie
for (Map.Entry<Bytes32, Bytes> key : keys.entrySet()) {
trie.put(key.getKey(), key.getValue());
if (startKeyHash.equals(Bytes32.ZERO)) {
final MerkleTrie<Bytes, Bytes> trie = new SimpleMerklePatriciaTrie<>(Function.identity());
// add the received keys in the trie
for (Map.Entry<Bytes32, Bytes> key : keys.entrySet()) {
trie.put(key.getKey(), key.getValue());
}
return rootHash.equals(trie.getRootHash());
} else {
// TODO: possibly accept a node loader so we can verify this with already
// completed partial storage requests
LOG.info("failing proof due to incomplete range without proofs");
return false;
}

return rootHash.equals(trie.getRootHash());
}

// reconstruct a part of the trie with the proof
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public synchronized void addCachedLayer(
}

@Override
public boolean containWorldStateStorage(final Hash blockHash) {
public boolean contains(final Hash blockHash) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ public Optional<Bytes> getAccountStorageTrieNode(
}

public Optional<Bytes> getTrieNodeUnsafe(final Bytes key) {
return composedWorldStateStorage
.get(TRIE_BRANCH_STORAGE, Bytes.concatenate(key).toArrayUnsafe())
.map(Bytes::wrap);
return composedWorldStateStorage.get(TRIE_BRANCH_STORAGE, key.toArrayUnsafe()).map(Bytes::wrap);
}

public Optional<Bytes> getStorageValueByStorageSlotKey(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Optional<WorldState> get(final Hash rootHash, final Hash blockHash) {

@Override
public boolean isWorldStateAvailable(final Hash rootHash, final Hash blockHash) {
return cachedWorldStorageManager.containWorldStateStorage(blockHash)
return cachedWorldStorageManager.contains(blockHash)
|| persistedState.blockHash().equals(blockHash)
|| worldStateKeyValueStorage.isWorldStateAvailable(rootHash, blockHash);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package org.hyperledger.besu.ethereum.trie.diffbased.common.cache;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.trie.diffbased.common.DiffBasedWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.diffbased.common.StorageSubscriber;
import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.DiffBasedLayeredWorldStateKeyValueStorage;
Expand All @@ -29,8 +31,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +46,11 @@ public abstract class DiffBasedCachedWorldStorageManager implements StorageSubsc
LoggerFactory.getLogger(DiffBasedCachedWorldStorageManager.class);
private final DiffBasedWorldStateProvider archive;
private final EvmConfiguration evmConfiguration;
private final Cache<Hash, BlockHeader> stateRootToBlockHeaderCache =
Caffeine.newBuilder()
.maximumSize(RETAINED_LAYERS)
.expireAfterWrite(100, TimeUnit.MINUTES)
.build();

private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage;
private final Map<Bytes32, DiffBasedCachedWorldView> cachedWorldStatesByHash;
Expand Down Expand Up @@ -104,6 +114,8 @@ public synchronized void addCachedLayer(
((DiffBasedLayeredWorldStateKeyValueStorage) forWorldState.getWorldStateStorage())
.clone()));
}
// add stateroot -> blockHeader cache entry
stateRootToBlockHeaderCache.put(blockHeader.getStateRoot(), blockHeader);
}
scrubCachedLayers(blockHeader.getNumber());
}
Expand Down Expand Up @@ -192,14 +204,50 @@ public Optional<DiffBasedWorldState> getHeadWorldState(
});
}

public boolean containWorldStateStorage(final Hash blockHash) {
public boolean contains(final Hash blockHash) {
return cachedWorldStatesByHash.containsKey(blockHash);
}

public void reset() {
this.cachedWorldStatesByHash.clear();
}

public void primeRootToBlockHashCache(final Blockchain blockchain, final int numEntries) {
// prime the stateroot-to-blockhash cache
long head = blockchain.getChainHeadHeader().getNumber();
for (long i = head; i > Math.max(0, head - numEntries); i--) {
blockchain
.getBlockHeader(i)
.ifPresent(header -> stateRootToBlockHeaderCache.put(header.getStateRoot(), header));
}
}

/**
* Returns the worldstate for the supplied root hash. If the worldstate is not already in cache,
* this method will attempt to fetch it and add it to the cache. synchronized to prevent
* concurrent loads/adds to the cache of the same root hash.
*
* @param rootHash rootHash to supply worldstate storage for
* @return Optional worldstate storage
*/
public synchronized Optional<DiffBasedWorldStateKeyValueStorage> getStorageByRootHash(
final Hash rootHash) {
return Optional.ofNullable(stateRootToBlockHeaderCache.getIfPresent(rootHash))
.flatMap(
header ->
Optional.ofNullable(cachedWorldStatesByHash.get(header.getHash()))
.map(DiffBasedCachedWorldView::getWorldStateStorage)
.or(
() -> {
// if not cached already, maybe fetch and cache this worldstate
var maybeWorldState =
archive.getMutable(header, false).map(BonsaiWorldState.class::cast);
maybeWorldState.ifPresent(
ws -> addCachedLayer(header, header.getStateRoot(), ws));
return maybeWorldState.map(BonsaiWorldState::getWorldStateStorage);
}));
}

@Override
public void onClearStorage() {
this.cachedWorldStatesByHash.clear();
Expand Down
Loading

0 comments on commit 34fc5ee

Please sign in to comment.