Skip to content

Commit

Permalink
SNAPSYNC add request task (hyperledger#3601)
Browse files Browse the repository at this point in the history
PR that adds the different request tasks necessary for the snapsync as well as a utility to manage the ranges of requests


Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
  • Loading branch information
matkt authored and garyschulte committed May 2, 2022
1 parent 79587a0 commit 84e9286
Show file tree
Hide file tree
Showing 34 changed files with 1,714 additions and 229 deletions.
@@ -0,0 +1,95 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.core;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;

import java.util.ArrayList;
import java.util.List;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;

public class TrieGenerator {

public static MerklePatriciaTrie<Bytes32, Bytes> generateTrie(
final WorldStateStorage worldStateStorage, final int nbAccounts) {
final List<Hash> accountHash = new ArrayList<>();
final MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
emptyAccountStateTrie(worldStateStorage);
// Add some storage values
for (int i = 0; i < nbAccounts; i++) {
final WorldStateStorage.Updater updater = worldStateStorage.updater();

accountHash.add(Hash.wrap(Bytes32.leftPad(Bytes.of(i + 1))));
final MerklePatriciaTrie<Bytes32, Bytes> storageTrie =
emptyStorageTrie(worldStateStorage, accountHash.get(i));
writeStorageValue(storageTrie, UInt256.ONE, UInt256.valueOf(2L));
writeStorageValue(storageTrie, UInt256.valueOf(2L), UInt256.valueOf(4L));
writeStorageValue(storageTrie, UInt256.valueOf(3L), UInt256.valueOf(6L));
int accountIndex = i;
storageTrie.commit(
(location, hash, value) ->
updater.putAccountStorageTrieNode(
accountHash.get(accountIndex), location, hash, value));
final Hash codeHash = Hash.hash(Bytes32.leftPad(Bytes.of(i + 10)));
final StateTrieAccountValue accountValue =
new StateTrieAccountValue(1L, Wei.of(2L), Hash.wrap(storageTrie.getRootHash()), codeHash);
accountStateTrie.put(accountHash.get(i), RLP.encode(accountValue::writeTo));
accountStateTrie.commit(updater::putAccountStateTrieNode);

// Persist updates
updater.commit();
}
return accountStateTrie;
}

private static void writeStorageValue(
final MerklePatriciaTrie<Bytes32, Bytes> storageTrie,
final UInt256 key,
final UInt256 value) {
storageTrie.put(storageKeyHash(key), encodeStorageValue(value));
}

private static Bytes32 storageKeyHash(final UInt256 storageKey) {
return Hash.hash(storageKey);
}

private static Bytes encodeStorageValue(final UInt256 storageValue) {
return RLP.encode(out -> out.writeBytes(storageValue.toMinimalBytes()));
}

public static MerklePatriciaTrie<Bytes32, Bytes> emptyStorageTrie(
final WorldStateStorage worldStateStorage, final Hash accountHash) {
return new StoredMerklePatriciaTrie<>(
(location, hash) ->
worldStateStorage.getAccountStorageTrieNode(accountHash, location, hash),
b -> b,
b -> b);
}

public static MerklePatriciaTrie<Bytes32, Bytes> emptyAccountStateTrie(
final WorldStateStorage worldStateStorage) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode, b -> b, b -> b);
}
}
Expand Up @@ -17,14 +17,11 @@
import static org.assertj.core.api.Assertions.assertThat;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.core.TrieGenerator;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.RangeStorageEntriesCollector;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.TrieIterator;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

Expand All @@ -36,7 +33,6 @@

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -57,7 +53,8 @@ public void setup() {

@Test
public void rangeProofValidationNominalCase() {
final MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie = generateTrie();
final MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
TrieGenerator.generateTrie(worldStateStorage, 15);
// collect accounts in range
final RangeStorageEntriesCollector collector =
RangeStorageEntriesCollector.createCollector(Hash.ZERO, MAX_RANGE, 10, Integer.MAX_VALUE);
Expand Down Expand Up @@ -85,7 +82,8 @@ public void rangeProofValidationNominalCase() {

@Test
public void rangeProofValidationMissingAccount() {
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie = generateTrie();
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
TrieGenerator.generateTrie(worldStateStorage, 15);
// collect accounts in range
final RangeStorageEntriesCollector collector =
RangeStorageEntriesCollector.createCollector(Hash.ZERO, MAX_RANGE, 10, Integer.MAX_VALUE);
Expand Down Expand Up @@ -122,7 +120,8 @@ public void rangeProofValidationMissingAccount() {

@Test
public void rangeProofValidationNoMonotonicIncreasing() {
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie = generateTrie();
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
TrieGenerator.generateTrie(worldStateStorage, 15);

// generate the invalid proof
final RangeStorageEntriesCollector collector =
Expand Down Expand Up @@ -158,7 +157,8 @@ public void rangeProofValidationNoMonotonicIncreasing() {

@Test
public void rangeProofValidationEmptyProof() {
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie = generateTrie();
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
TrieGenerator.generateTrie(worldStateStorage, 15);

// generate the invalid proof
final RangeStorageEntriesCollector collector =
Expand All @@ -185,7 +185,8 @@ public void rangeProofValidationEmptyProof() {

@Test
public void rangeProofValidationInvalidEmptyProof() {
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie = generateTrie();
MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie =
TrieGenerator.generateTrie(worldStateStorage, 15);

// generate the invalid proof
final RangeStorageEntriesCollector collector =
Expand All @@ -209,61 +210,4 @@ public void rangeProofValidationInvalidEmptyProof() {
accounts);
assertThat(isValidRangeProof).isFalse();
}

private MerklePatriciaTrie<Bytes32, Bytes> generateTrie() {
final List<Hash> accountHash = new ArrayList<>();
final MerklePatriciaTrie<Bytes32, Bytes> accountStateTrie = emptyAccountStateTrie();
// Add some storage values
for (int i = 0; i < 15; i++) {
final WorldStateStorage.Updater updater = worldStateStorage.updater();

accountHash.add(Hash.wrap(Bytes32.leftPad(Bytes.of(i + 1))));
final MerklePatriciaTrie<Bytes32, Bytes> storageTrie = emptyStorageTrie(accountHash.get(i));
writeStorageValue(storageTrie, UInt256.ONE, UInt256.valueOf(2L));
writeStorageValue(storageTrie, UInt256.valueOf(2L), UInt256.valueOf(4L));
writeStorageValue(storageTrie, UInt256.valueOf(3L), UInt256.valueOf(6L));
int accountIndex = i;
storageTrie.commit(
(location, hash, value) ->
updater.putAccountStorageTrieNode(
accountHash.get(accountIndex), location, hash, value));
final Hash codeHash = Hash.hash(Bytes32.leftPad(Bytes.of(i + 10)));
final StateTrieAccountValue accountValue =
new StateTrieAccountValue(1L, Wei.of(2L), Hash.wrap(storageTrie.getRootHash()), codeHash);
accountStateTrie.put(accountHash.get(i), RLP.encode(accountValue::writeTo));
accountStateTrie.commit(updater::putAccountStateTrieNode);

// Persist updates
updater.commit();
}
return accountStateTrie;
}

private void writeStorageValue(
final MerklePatriciaTrie<Bytes32, Bytes> storageTrie,
final UInt256 key,
final UInt256 value) {
storageTrie.put(storageKeyHash(key), encodeStorageValue(value));
}

private Bytes32 storageKeyHash(final UInt256 storageKey) {
return Hash.hash(storageKey);
}

private Bytes encodeStorageValue(final UInt256 storageValue) {
return RLP.encode(out -> out.writeBytes(storageValue.toMinimalBytes()));
}

private MerklePatriciaTrie<Bytes32, Bytes> emptyStorageTrie(final Hash accountHash) {
return new StoredMerklePatriciaTrie<>(
(location, hash) ->
worldStateStorage.getAccountStorageTrieNode(accountHash, location, hash),
b -> b,
b -> b);
}

private static MerklePatriciaTrie<Bytes32, Bytes> emptyAccountStateTrie() {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode, b -> b, b -> b);
}
}
Expand Up @@ -31,7 +31,7 @@
import org.hyperledger.besu.ethereum.eth.messages.snap.GetAccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetStorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodes;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
Expand All @@ -57,6 +57,7 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -291,26 +292,45 @@ public RequestManager.ResponseStream getPooledTransactions(final List<Hash> hash
requestManagers.get(protocolName).get(EthPV65.GET_POOLED_TRANSACTIONS), message);
}

public RequestManager.ResponseStream getSnapAccountRange(final GetAccountRangeMessage message)
public RequestManager.ResponseStream getSnapAccountRange(
final Hash stateRoot, final Bytes32 startKeyHash, final Bytes32 endKeyHash)
throws PeerNotConnected {
final GetAccountRangeMessage getAccountRangeMessage =
GetAccountRangeMessage.create(stateRoot, startKeyHash, endKeyHash);
getAccountRangeMessage.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_ACCOUNT_RANGE), message);
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_ACCOUNT_RANGE),
getAccountRangeMessage);
}

public RequestManager.ResponseStream getSnapStorageRange(final GetStorageRangeMessage message)
public RequestManager.ResponseStream getSnapStorageRange(
final Hash stateRoot,
final List<Bytes32> accountHashes,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash)
throws PeerNotConnected {
final GetStorageRangeMessage getStorageRangeMessage =
GetStorageRangeMessage.create(stateRoot, accountHashes, startKeyHash, endKeyHash);
getStorageRangeMessage.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_STORAGE_RANGE), message);
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_STORAGE_RANGE),
getStorageRangeMessage);
}

public RequestManager.ResponseStream getSnapBytecode(final GetByteCodesMessage message)
throws PeerNotConnected {
return sendRequest(requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_BYTECODES), message);
public RequestManager.ResponseStream getSnapBytecode(
final Hash stateRoot, final List<Bytes32> codeHashes) throws PeerNotConnected {
final GetByteCodesMessage getByteCodes = GetByteCodesMessage.create(codeHashes);
getByteCodes.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_BYTECODES), getByteCodes);
}

public RequestManager.ResponseStream getSnapTrieNode(final GetTrieNodes message)
throws PeerNotConnected {
return sendRequest(requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_TRIE_NODES), message);
public RequestManager.ResponseStream getSnapTrieNode(
final Hash stateRoot, final List<List<Bytes>> paths) throws PeerNotConnected {
final GetTrieNodesMessage getTrieNodes = GetTrieNodesMessage.create(stateRoot, paths);
getTrieNodes.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_TRIE_NODES), getTrieNodes);
}

private RequestManager.ResponseStream sendRequest(
Expand Down
@@ -0,0 +1,93 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.snap;

import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.Optional;

import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetAccountRangeFromPeerTask
extends AbstractPeerRequestTask<AccountRangeMessage.AccountRangeData> {

private static final Logger LOG = LoggerFactory.getLogger(GetAccountRangeFromPeerTask.class);

private final Bytes32 startKeyHash;
private final Bytes32 endKeyHash;
private final BlockHeader blockHeader;

private GetAccountRangeFromPeerTask(
final EthContext ethContext,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, SnapV1.ACCOUNT_RANGE, metricsSystem);
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
this.blockHeader = blockHeader;
}

public static GetAccountRangeFromPeerTask forAccountRange(
final EthContext ethContext,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
return new GetAccountRangeFromPeerTask(
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
}

@Override
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(
peer -> {
LOG.trace(
"Requesting account range [{} ,{}] for state root {} from peer {} .",
startKeyHash,
endKeyHash,
blockHeader.getStateRoot(),
peer);
return peer.getSnapAccountRange(blockHeader.getStateRoot(), startKeyHash, endKeyHash);
},
blockHeader.getNumber());
}

@Override
protected Optional<AccountRangeMessage.AccountRangeData> processResponse(
final boolean streamClosed, final MessageData message, final EthPeer peer) {

if (streamClosed) {
// We don't record this as a useless response because it's impossible to know if a peer has
// the data we're requesting.
return Optional.empty();
}
final AccountRangeMessage accountRangeMessage = AccountRangeMessage.readFrom(message);
final AccountRangeMessage.AccountRangeData accountRangeData =
accountRangeMessage.accountData(true);
return Optional.of(accountRangeData);
}
}

0 comments on commit 84e9286

Please sign in to comment.