From c6e86526dae3bea24f9c56f04d850cd608f065e6 Mon Sep 17 00:00:00 2001 From: Keefe-Liu Date: Tue, 11 Jan 2022 16:17:03 +0800 Subject: [PATCH] implement trust protocol and verify node --- common/types.go | 15 +++ core/blockchain.go | 114 ++++++++++++++++++++++ core/state/state_object.go | 7 ++ core/state/statedb.go | 6 +- core/types/block.go | 89 ++++++++++++++++- eth/backend.go | 3 + eth/handler.go | 24 ++++- eth/handler_trust.go | 43 +++++++++ eth/peer.go | 25 ++++- eth/peerset.go | 118 +++++++++++++++++++++-- eth/protocols/trust/discovery.go | 14 +++ eth/protocols/trust/handler.go | 159 +++++++++++++++++++++++++++++++ eth/protocols/trust/peer.go | 66 +++++++++++++ eth/protocols/trust/protocol.go | 72 ++++++++++++++ eth/protocols/trust/tracker.go | 10 ++ ethclient/ethclient.go | 6 ++ internal/ethapi/api.go | 4 + p2p/peer.go | 5 + p2p/server.go | 18 ++++ 19 files changed, 785 insertions(+), 13 deletions(-) create mode 100644 eth/handler_trust.go create mode 100644 eth/protocols/trust/discovery.go create mode 100644 eth/protocols/trust/handler.go create mode 100644 eth/protocols/trust/peer.go create mode 100644 eth/protocols/trust/protocol.go create mode 100644 eth/protocols/trust/tracker.go diff --git a/common/types.go b/common/types.go index d715356692..15f2bce681 100644 --- a/common/types.go +++ b/common/types.go @@ -354,6 +354,21 @@ func (a *Address) UnmarshalGraphQL(input interface{}) error { return err } +// AddressSlice is used for sort +type AddressSlice []Address + +func (s AddressSlice) Len() int { + return len(s) +} + +func (s AddressSlice) Less(i, j int) bool { + return s[i].Hex() < s[j].Hex() +} + +func (s AddressSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + // UnprefixedAddress allows marshaling an Address without 0x prefix. type UnprefixedAddress Address diff --git a/core/blockchain.go b/core/blockchain.go index e286a1e124..1bc5be0b19 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + "golang.org/x/crypto/sha3" + lru "github.com/hashicorp/golang-lru" "github.com/ethereum/go-ethereum/common" @@ -1798,6 +1800,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. diffLayer.Receipts = receipts diffLayer.BlockHash = block.Hash() diffLayer.Number = block.NumberU64() + + sort.Sort(types.DiffCodeSlice(diffLayer.Codes)) + sort.Sort(common.AddressSlice(diffLayer.Destructs)) + sort.Sort(types.DiffAccountSlice(diffLayer.Accounts)) + sort.Sort(types.DiffStorageSlice(diffLayer.Storages)) + bc.cacheDiffLayer(diffLayer) } @@ -3111,3 +3119,109 @@ func EnablePersistDiff(limit uint64) BlockChainOption { return chain } } + +func (bc *BlockChain) GetRootByDiffHash(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) (*types.VerifyResult, error) { + var res types.VerifyResult + res.BlockNumber = blockNumber + res.BlockHash = blockHash + + if blockNumber > bc.CurrentHeader().Number.Uint64()+11 { + res.Status = types.StatusBlockTooNew + return &res, nil + } else if blockNumber > bc.CurrentHeader().Number.Uint64() { + res.Status = types.StatusBlockNewer + return &res, nil + } + + diff := bc.GetTrustedDiffLayer(blockHash) + if diff != nil { + if diff.DiffHash == (common.Hash{}) { + hash, err := GetTrustedDiffHash(diff) + if err != nil { + res.Status = types.StatusUnexpectedError + return &res, err + } + + diff.DiffHash = hash + } + + if diffHash != diff.DiffHash { + res.Status = types.StatusDiffHashMismatch + return &res, nil + } + + header := bc.GetHeaderByHash(blockHash) + if header == nil { + res.Status = types.StatusUnexpectedError + return &res, fmt.Errorf("unexpected error, header not found") + } + res.Status = types.StatusFullVerified + res.Root = header.Root + return &res, nil + } + + header := bc.GetHeaderByHash(blockHash) + if header == nil { + if blockNumber > bc.CurrentHeader().Number.Uint64()-11 { + res.Status = types.StatusPossibleFork + return &res, nil + } + + res.Status = types.StatusImpossibleFork + return &res, nil + } + + res.Status = types.StatusUntrustedVerified + res.Root = header.Root + return &res, nil +} + +func (bc *BlockChain) GetTrustedDiffLayer(blockHash common.Hash) *types.DiffLayer { + var diff *types.DiffLayer + if cached, ok := bc.diffLayerCache.Get(blockHash); ok { + diff = cached.(*types.DiffLayer) + return diff + } + + diffStore := bc.db.DiffStore() + if diffStore != nil { + diff = rawdb.ReadDiffLayer(diffStore, blockHash) + } + return diff +} + +func GetTrustedDiffHash(d *types.DiffLayer) (common.Hash, error) { + diff := &types.ExtDiffLayer{ + BlockHash: d.BlockHash, + Receipts: make([]*types.ReceiptForStorage, 0), + Number: d.Number, + Codes: d.Codes, + Destructs: d.Destructs, + Accounts: d.Accounts, + Storages: d.Storages, + } + + for index, account := range diff.Accounts { + full, err := snapshot.FullAccount(account.Blob) + if err != nil { + return common.Hash{}, fmt.Errorf("decode full account error: %v", err) + } + // set account root to empty root + diff.Accounts[index].Blob = snapshot.SlimAccountRLP(full.Nonce, full.Balance, common.Hash{}, full.CodeHash) + } + + rawData, err := rlp.EncodeToBytes(diff) + if err != nil { + return common.Hash{}, fmt.Errorf("encode new diff error: %v", err) + } + + hasher := sha3.NewLegacyKeccak256() + _, err = hasher.Write(rawData) + if err != nil { + return common.Hash{}, fmt.Errorf("hasher write error: %v", err) + } + + var hash common.Hash + hasher.Sum(hash[:0]) + return hash, nil +} diff --git a/core/state/state_object.go b/core/state/state_object.go index 298f4305ba..c86585a658 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -391,6 +391,13 @@ func (s *StateObject) updateTrie(db Database) Trie { // UpdateRoot sets the trie root to the current root hash of func (s *StateObject) updateRoot(db Database) { + // If node runs in no trie mode, set root to empty. + defer func() { + if db.NoTries() { + s.data.Root = common.Hash{} + } + }() + // If nothing changed, don't bother with hashing anything if s.updateTrie(db) == nil { return diff --git a/core/state/statedb.go b/core/state/statedb.go index 1bb1db7ba1..17eb3bf856 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1575,8 +1575,12 @@ func (s *StateDB) SnapToDiffLayer() ([]common.Address, []types.DiffAccount, []ty for accountHash, storage := range s.snapStorage { keys := make([]string, 0, len(storage)) values := make([][]byte, 0, len(storage)) - for k, v := range storage { + for k, _ := range storage { keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + v := storage[k] values = append(values, v) } storages = append(storages, types.DiffStorage{ diff --git a/core/types/block.go b/core/types/block.go index bee5d80cdd..f3c487b684 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -40,6 +40,40 @@ var ( EmptyUncleHash = rlpHash([]*Header(nil)) ) +type VerifyStatus struct { + Code uint16 + Msg string +} + +var ( + // StatusVerified means the processing of request going as expected and found the root correctly. + StatusVerified = VerifyStatus{Code: 0x100} + StatusFullVerified = VerifyStatus{Code: 0x101, Msg: "state root full verified"} + StatusUntrustedVerified = VerifyStatus{Code: 0x102, Msg: "state root untrusted verified, because of difflayer not found"} + + // StatusFailed means the request has something wrong. + StatusFailed = VerifyStatus{Code: 0x200} + StatusDiffHashMismatch = VerifyStatus{Code: 0x201, Msg: "verify failed because of blockhash mismatch with diffhash"} + StatusImpossibleFork = VerifyStatus{Code: 0x202, Msg: "verify failed because of impossible fork detected"} + + // StatusUncertain means verify node can't give a certain result of the request. + StatusUncertain = VerifyStatus{Code: 0x300} + StatusBlockTooNew = VerifyStatus{Code: 0x301, Msg: "can’t verify because of block number larger than current height more than 11"} + StatusBlockNewer = VerifyStatus{Code: 0x302, Msg: "can’t verify because of block number larger than current height"} + StatusPossibleFork = VerifyStatus{Code: 0x303, Msg: "can’t verify because of possible fork detected"} + StatusRequestTooBusy = VerifyStatus{Code: 0x304, Msg: "can’t verify because of request too busy"} + + // StatusUnexpectedError is unexpected internal error. + StatusUnexpectedError = VerifyStatus{Code: 0x400, Msg: "can’t verify because of unexpected internal error"} +) + +type VerifyResult struct { + Status VerifyStatus + BlockNumber uint64 + BlockHash common.Hash + Root common.Hash +} + // A BlockNonce is a 64-bit hash which proves (combined with the // mix-hash) that a sufficient amount of computation has been carried // out on a block. @@ -383,7 +417,7 @@ type DiffLayer struct { DiffHash common.Hash } -type extDiffLayer struct { +type ExtDiffLayer struct { BlockHash common.Hash Number uint64 Receipts []*ReceiptForStorage // Receipts are duplicated stored to simplify the logic @@ -395,7 +429,7 @@ type extDiffLayer struct { // DecodeRLP decodes the Ethereum func (d *DiffLayer) DecodeRLP(s *rlp.Stream) error { - var ed extDiffLayer + var ed ExtDiffLayer if err := s.Decode(&ed); err != nil { return err } @@ -415,7 +449,7 @@ func (d *DiffLayer) EncodeRLP(w io.Writer) error { for i, receipt := range d.Receipts { storageReceipts[i] = (*ReceiptForStorage)(receipt) } - return rlp.Encode(w, extDiffLayer{ + return rlp.Encode(w, ExtDiffLayer{ BlockHash: d.BlockHash, Number: d.Number, Receipts: storageReceipts, @@ -443,17 +477,66 @@ type DiffCode struct { Code []byte } +// DiffCodeSlice is used for sort +type DiffCodeSlice []DiffCode + +func (s DiffCodeSlice) Len() int { + return len(s) +} + +func (s DiffCodeSlice) Less(i, j int) bool { + return s[i].Hash.Hex() < s[j].Hash.Hex() +} + +func (s DiffCodeSlice) Swap(i, j int) { + s[i].Hash, s[j].Hash = s[j].Hash, s[i].Hash + s[i].Code, s[j].Code = s[j].Code, s[i].Code +} + type DiffAccount struct { Account common.Address Blob []byte } +// DiffAccountSlice is used for sort +type DiffAccountSlice []DiffAccount + +func (s DiffAccountSlice) Len() int { + return len(s) +} + +func (s DiffAccountSlice) Less(i, j int) bool { + return s[i].Account.Hex() < s[j].Account.Hex() +} + +func (s DiffAccountSlice) Swap(i, j int) { + s[i].Account, s[j].Account = s[j].Account, s[i].Account + s[i].Blob, s[j].Blob = s[j].Blob, s[i].Blob +} + type DiffStorage struct { Account common.Address Keys []string Vals [][]byte } +// DiffStorageSlice is used for sort +type DiffStorageSlice []DiffStorage + +func (s DiffStorageSlice) Len() int { + return len(s) +} + +func (s DiffStorageSlice) Less(i, j int) bool { + return s[i].Account.Hex() < s[j].Account.Hex() +} + +func (s DiffStorageSlice) Swap(i, j int) { + s[i].Account, s[j].Account = s[j].Account, s[i].Account + s[i].Keys, s[j].Keys = s[j].Keys, s[i].Keys + s[i].Vals, s[j].Vals = s[j].Vals, s[i].Vals +} + type DiffAccountsInTx struct { TxHash common.Hash Accounts map[common.Address]*big.Int diff --git a/eth/backend.go b/eth/backend.go index 285bb6f7a7..b9e8e40cb9 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -554,6 +556,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol { } // diff protocol can still open without snap protocol protos = append(protos, diff.MakeProtocols((*diffHandler)(s.handler), s.snapDialCandidates)...) + protos = append(protos, trust.MakeProtocols((*trustHandler)(s.handler), s.snapDialCandidates)...) return protos } diff --git a/eth/handler.go b/eth/handler.go index cbc6eca809..e47d3eee8d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/forkid" @@ -269,6 +271,11 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Diff extension barrier failed", "err", err) return err } + trust, err := h.peers.waitTrustExtension(peer) + if err != nil { + peer.Log().Error("Trust extension barrier failed", "err", err) + return err + } // TODO(karalabe): Not sure why this is needed if !h.chainSync.handlePeerEvent(peer) { return p2p.DiscQuitting @@ -309,7 +316,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Debug("Ethereum peer connected", "name", peer.Name()) // Register the peer locally - if err := h.peers.registerPeer(peer, snap, diff); err != nil { + if err := h.peers.registerPeer(peer, snap, diff, trust); err != nil { peer.Log().Error("Ethereum peer registration failed", "err", err) return err } @@ -395,6 +402,21 @@ func (h *handler) runDiffExtension(peer *diff.Peer, handler diff.Handler) error return handler(peer) } +// runTrustExtension registers a `trust` peer into the joint eth/trust peerset and +// starts handling inbound messages. As `trust` is only a satellite protocol to +// `eth`, all subsystem registrations and lifecycle management will be done by +// the main `eth` handler to prevent strange races. +func (h *handler) runTrustExtension(peer *trust.Peer, handler trust.Handler) error { + h.peerWG.Add(1) + defer h.peerWG.Done() + + if err := h.peers.registerTrustExtension(peer); err != nil { + peer.Log().Error("Trust extension registration failed", "err", err) + return err + } + return handler(peer) +} + // removePeer unregisters a peer from the downloader and fetchers, removes it from // the set of tracked peers and closes the network connection to it. func (h *handler) removePeer(id string) { diff --git a/eth/handler_trust.go b/eth/handler_trust.go new file mode 100644 index 0000000000..6df630a2e8 --- /dev/null +++ b/eth/handler_trust.go @@ -0,0 +1,43 @@ +package eth + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// trustHandler implements the trust.Backend interface to handle the various network +// packets that are sent as replies or broadcasts. +type trustHandler handler + +func (h *trustHandler) Chain() *core.BlockChain { return h.chain } + +// RunPeer is invoked when a peer joins on the `snap` protocol. +func (h *trustHandler) RunPeer(peer *trust.Peer, hand trust.Handler) error { + return (*handler)(h).runTrustExtension(peer, hand) +} + +// PeerInfo retrieves all known `trust` information about a peer. +func (h *trustHandler) PeerInfo(id enode.ID) interface{} { + if p := h.peers.peer(id.String()); p != nil { + if p.trustExt != nil { + return p.trustExt.info() + } + } + return nil +} + +// Handle is invoked from a peer's message handler when it receives a new remote +// message that the handler couldn't consume and serve itself. +func (h *trustHandler) Handle(peer *trust.Peer, packet trust.Packet) error { + switch packet := packet.(type) { + case *trust.RootResponsePacket: + // TODO: h.bc.VerifyManager().HandleRootResponse(peer.ID(), *packet) + return nil + + default: + return fmt.Errorf("unexpected trust packet type: %T", packet) + } +} diff --git a/eth/peer.go b/eth/peer.go index 2fb6fabf26..4d92f4e78f 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -37,8 +39,9 @@ type ethPeerInfo struct { // ethPeer is a wrapper around eth.Peer to maintain a few extra metadata. type ethPeer struct { *eth.Peer - snapExt *snapPeer // Satellite `snap` connection - diffExt *diffPeer + snapExt *snapPeer // Satellite `snap` connection + diffExt *diffPeer + trustExt *trustPeer syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time snapWait chan struct{} // Notification channel for snap connections @@ -69,6 +72,12 @@ type diffPeerInfo struct { DiffSync bool `json:"diff_sync"` } +// trustPeerInfo represents a short summary of the `trust` sub-protocol metadata known +// about a connected peer. +type trustPeerInfo struct { + Version uint `json:"version"` // Trust protocol version negotiated +} + // snapPeer is a wrapper around snap.Peer to maintain a few extra metadata. type snapPeer struct { *snap.Peer @@ -79,6 +88,11 @@ type diffPeer struct { *diff.Peer } +// trustPeer is a wrapper around trust.Peer to maintain a few extra metadata. +type trustPeer struct { + *trust.Peer +} + // info gathers and returns some `diff` protocol metadata known about a peer. func (p *diffPeer) info() *diffPeerInfo { return &diffPeerInfo{ @@ -93,3 +107,10 @@ func (p *snapPeer) info() *snapPeerInfo { Version: p.Version(), } } + +// info gathers and returns some `trust` protocol metadata known about a peer. +func (p *trustPeer) info() *trustPeerInfo { + return &trustPeerInfo{ + Version: p.Version(), + } +} diff --git a/eth/peerset.go b/eth/peerset.go index 0f5245a05e..dc1d7da45e 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/diff" @@ -53,6 +55,10 @@ var ( // errDiffWithoutEth is returned if a peer attempts to connect only on the // diff protocol without advertising the eth main protocol. errDiffWithoutEth = errors.New("peer connected on diff without compatible eth support") + + // errTrustWithoutEth is returned if a peer attempts to connect only on the + // trust protocol without advertising the eth main protocol. + errTrustWithoutEth = errors.New("peer connected on trust without compatible eth support") ) const ( @@ -73,6 +79,9 @@ type peerSet struct { diffWait map[string]chan *diff.Peer // Peers connected on `eth` waiting for their diff extension diffPend map[string]*diff.Peer // Peers connected on the `diff` protocol, but not yet on `eth` + trustWait map[string]chan *trust.Peer // Peers connected on `eth` waiting for their trust extension + trustPend map[string]*trust.Peer // Peers connected on the `trust` protocol, but not yet on `eth` + lock sync.RWMutex closed bool } @@ -80,11 +89,13 @@ type peerSet struct { // newPeerSet creates a new peer set to track the active participants. func newPeerSet() *peerSet { return &peerSet{ - peers: make(map[string]*ethPeer), - snapWait: make(map[string]chan *snap.Peer), - snapPend: make(map[string]*snap.Peer), - diffWait: make(map[string]chan *diff.Peer), - diffPend: make(map[string]*diff.Peer), + peers: make(map[string]*ethPeer), + snapWait: make(map[string]chan *snap.Peer), + snapPend: make(map[string]*snap.Peer), + diffWait: make(map[string]chan *diff.Peer), + diffPend: make(map[string]*diff.Peer), + trustWait: make(map[string]chan *trust.Peer), + trustPend: make(map[string]*trust.Peer), } } @@ -148,6 +159,40 @@ func (ps *peerSet) registerDiffExtension(peer *diff.Peer) error { return nil } +// registerTrustExtension unblocks an already connected `eth` peer waiting for its +// `trust` extension, or if no such peer exists, tracks the extension for the time +// being until the `eth` main protocol starts looking for it. +func (ps *peerSet) registerTrustExtension(peer *trust.Peer) error { + // Reject the peer if it advertises `trust` without `eth` as `trust` is only a + // satellite protocol meaningful with the chain selection of `eth` + if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) { + return errTrustWithoutEth + } + // If the peer isn't verify node, don't register trust extension into eth protocol. + if !peer.VerifyNode() { + return nil + } + // Ensure nobody can double connect + ps.lock.Lock() + defer ps.lock.Unlock() + + id := peer.ID() + if _, ok := ps.peers[id]; ok { + return errPeerAlreadyRegistered // avoid connections with the same id as existing ones + } + if _, ok := ps.trustPend[id]; ok { + return errPeerAlreadyRegistered // avoid connections with the same id as pending ones + } + // Inject the peer into an `eth` counterpart is available, otherwise save for later + if wait, ok := ps.trustWait[id]; ok { + delete(ps.trustWait, id) + wait <- peer + return nil + } + ps.trustPend[id] = peer + return nil +} + // waitExtensions blocks until all satellite protocols are connected and tracked // by the peerset. func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) { @@ -234,6 +279,53 @@ func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) { } } +// waitTrustExtension blocks until all satellite protocols are connected and tracked +// by the peerset. +func (ps *peerSet) waitTrustExtension(peer *eth.Peer) (*trust.Peer, error) { + // If the peer does not support a compatible `trust`, don't wait + if !peer.RunningCap(trust.ProtocolName, trust.ProtocolVersions) { + return nil, nil + } + // If the peer isn't verify node, don't register trust extension into eth protocol. + if !peer.VerifyNode() { + return nil, nil + } + // Ensure nobody can double connect + ps.lock.Lock() + + id := peer.ID() + if _, ok := ps.peers[id]; ok { + ps.lock.Unlock() + return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones + } + if _, ok := ps.trustWait[id]; ok { + ps.lock.Unlock() + return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones + } + // If `trust` already connected, retrieve the peer from the pending set + if trust, ok := ps.trustPend[id]; ok { + delete(ps.trustPend, id) + + ps.lock.Unlock() + return trust, nil + } + // Otherwise wait for `trust` to connect concurrently + wait := make(chan *trust.Peer) + ps.trustWait[id] = wait + ps.lock.Unlock() + + select { + case peer := <-wait: + return peer, nil + + case <-time.After(extensionWaitTimeout): + ps.lock.Lock() + delete(ps.trustWait, id) + ps.lock.Unlock() + return nil, errPeerWaitTimeout + } +} + func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer { if p := ps.peer(pid); p != nil && p.diffExt != nil { return p.diffExt @@ -241,9 +333,20 @@ func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer { return nil } +// GetVerifyPeers returns an array of verify nodes. +func (ps *peerSet) GetVerifyPeers() []*trustPeer { + res := make([]*trustPeer, 0) + for _, p := range ps.peers { + if p.trustExt != nil { + res = append(res, p.trustExt) + } + } + return res +} + // registerPeer injects a new `eth` peer into the working set, or returns an error // if the peer is already known. -func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error { +func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer, trustExt *trust.Peer) error { // Start tracking the new peer ps.lock.Lock() defer ps.lock.Unlock() @@ -265,6 +368,9 @@ func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Pe if diffExt != nil { eth.diffExt = &diffPeer{diffExt} } + if trustExt != nil { + eth.trustExt = &trustPeer{trustExt} + } ps.peers[id] = eth return nil } diff --git a/eth/protocols/trust/discovery.go b/eth/protocols/trust/discovery.go new file mode 100644 index 0000000000..ce38ec5ed9 --- /dev/null +++ b/eth/protocols/trust/discovery.go @@ -0,0 +1,14 @@ +package trust + +import "github.com/ethereum/go-ethereum/rlp" + +// enrEntry is the ENR entry which advertises `trust` protocol on the discovery. +type enrEntry struct { + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` +} + +// ENRKey implements enr.Entry. +func (e enrEntry) ENRKey() string { + return "trust" +} diff --git a/eth/protocols/trust/handler.go b/eth/protocols/trust/handler.go new file mode 100644 index 0000000000..ce8fc4cd8d --- /dev/null +++ b/eth/protocols/trust/handler.go @@ -0,0 +1,159 @@ +package trust + +import ( + "fmt" + "time" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" +) + +// Handler is a callback to invoke from an outside runner after the boilerplate +// exchanges have passed. +type Handler func(peer *Peer) error + +type Backend interface { + // Chain retrieves the blockchain object to serve data. + Chain() *core.BlockChain + + // RunPeer is invoked when a peer joins on the `eth` protocol. The handler + // should do any peer maintenance work, handshakes and validations. If all + // is passed, control should be given back to the `handler` to process the + // inbound messages going forward. + RunPeer(peer *Peer, handler Handler) error + + PeerInfo(id enode.ID) interface{} + + Handle(peer *Peer, packet Packet) error +} + +// MakeProtocols constructs the P2P protocol definitions for `trust`. +func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { + // Filter the discovery iterator for nodes advertising trust support. + dnsdisc = enode.Filter(dnsdisc, func(n *enode.Node) bool { + var trust enrEntry + return n.Load(&trust) == nil + }) + + protocols := make([]p2p.Protocol, len(ProtocolVersions)) + for i, version := range ProtocolVersions { + version := version // Closure + + protocols[i] = p2p.Protocol{ + Name: ProtocolName, + Version: version, + Length: protocolLengths[version], + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + return backend.RunPeer(NewPeer(version, p, rw), func(peer *Peer) error { + defer peer.Close() + return Handle(backend, peer) + }) + }, + NodeInfo: func() interface{} { + return nodeInfo(backend.Chain()) + }, + PeerInfo: func(id enode.ID) interface{} { + return backend.PeerInfo(id) + }, + Attributes: []enr.Entry{&enrEntry{}}, + DialCandidates: dnsdisc, + } + } + return protocols +} + +// Handle is the callback invoked to manage the life cycle of a `trust` peer. +// When this function terminates, the peer is disconnected. +func Handle(backend Backend, peer *Peer) error { + for { + if err := handleMessage(backend, peer); err != nil { + peer.Log().Debug("Message handling failed in `trust`", "err", err) + return err + } + } +} + +// handleMessage is invoked whenever an inbound message is received from a +// remote peer on the `diff` protocol. The remote connection is torn down upon +// returning any error. +func handleMessage(backend Backend, peer *Peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed + msg, err := peer.rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > maxMessageSize { + return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize) + } + defer msg.Discard() + + // Track the amount of time it takes to serve the request and run the handler + if metrics.Enabled { + h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) + defer func(start time.Time) { + sampler := func() metrics.Sample { + return metrics.ResettingSample( + metrics.NewExpDecaySample(1028, 0.015), + ) + } + metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds()) + }(time.Now()) + } + // Handle the message depending on its contents + switch { + case msg.Code == RequestRootMsg: + return handleRootRequest(backend, msg, peer) + + case msg.Code == RespondRootMsg: + return handleRootResponse(backend, msg, peer) + + default: + return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code) + } +} + +type Decoder interface { + Decode(val interface{}) error + Time() time.Time +} + +func handleRootRequest(backend Backend, msg Decoder, peer *Peer) error { + req := new(RootRequestPacket) + if err := msg.Decode(req); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + res, err := backend.Chain().GetRootByDiffHash(req.BlockNumber, req.BlockHash, req.DiffHash) + p2p.Send(peer.rw, RespondRootMsg, RootResponsePacket{ + RequestId: req.RequestId, + Status: res.Status, + BlockNumber: req.BlockNumber, + BlockHash: req.BlockHash, + Root: res.Root, + Extra: defaultExtra, + }) + + return err +} + +func handleRootResponse(backend Backend, msg Decoder, peer *Peer) error { + res := new(RootResponsePacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + requestTracker.Fulfil(peer.id, peer.version, RespondRootMsg, res.RequestId) + return backend.Handle(peer, res) +} + +// NodeInfo represents a short summary of the `trust` sub-protocol metadata +// known about the host peer. +type NodeInfo struct{} + +// nodeInfo retrieves some `trust` protocol metadata about the running host node. +func nodeInfo(chain *core.BlockChain) *NodeInfo { + return &NodeInfo{} +} diff --git a/eth/protocols/trust/peer.go b/eth/protocols/trust/peer.go new file mode 100644 index 0000000000..18ba229914 --- /dev/null +++ b/eth/protocols/trust/peer.go @@ -0,0 +1,66 @@ +package trust + +import ( + "math/rand" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" +) + +// Peer is a collection of relevant information we have about a `trust` peer. +type Peer struct { + id string // Unique ID for the peer, cached + + *p2p.Peer // The embedded P2P package peer + rw p2p.MsgReadWriter // Input/output streams for diff + version uint // Protocol version negotiated + logger log.Logger // Contextual logger with the peer id injected +} + +// NewPeer create a wrapper for a network connection and negotiated protocol +// version. +func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer { + id := p.ID().String() + peer := &Peer{ + id: id, + Peer: p, + rw: rw, + version: version, + logger: log.New("peer", id[:8]), + } + return peer +} + +// ID retrieves the peer's unique identifier. +func (p *Peer) ID() string { + return p.id +} + +// Version retrieves the peer's negoatiated `diff` protocol version. +func (p *Peer) Version() uint { + return p.version +} + +// Log overrides the P2P logget with the higher level one containing only the id. +func (p *Peer) Log() log.Logger { + return p.logger +} + +// Close signals the broadcast goroutine to terminate. Only ever call this if +// you created the peer yourself via NewPeer. Otherwise let whoever created it +// clean it up! +func (p *Peer) Close() { +} + +func (p *Peer) RequestRoot(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) error { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, RequestRootMsg, RespondRootMsg, id) + return p2p.Send(p.rw, RequestRootMsg, RootRequestPacket{ + RequestId: id, + BlockNumber: blockNumber, + BlockHash: blockHash, + DiffHash: diffHash, + }) +} diff --git a/eth/protocols/trust/protocol.go b/eth/protocols/trust/protocol.go new file mode 100644 index 0000000000..2bd7fecc15 --- /dev/null +++ b/eth/protocols/trust/protocol.go @@ -0,0 +1,72 @@ +package trust + +import ( + "errors" + + "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" +) + +// Constants to match up protocol versions and messages +const ( + Trust1 = 1 +) + +// ProtocolName is the official short name of the `trust` protocol used during +// devp2p capability negotiation. +const ProtocolName = "trust" + +// ProtocolVersions are the supported versions of the `trust` protocol (first +// is primary). +var ProtocolVersions = []uint{Trust1} + +// protocolLengths are the number of implemented message corresponding to +// different protocol versions. +var protocolLengths = map[uint]uint64{Trust1: 2} + +// maxMessageSize is the maximum cap on the size of a protocol message. +const maxMessageSize = 10 * 1024 * 1024 + +const ( + RequestRootMsg = 0x00 + RespondRootMsg = 0x01 +) + +var defaultExtra = []byte{0x00} + +var ( + errMsgTooLarge = errors.New("message too long") + errDecode = errors.New("invalid message") + errInvalidMsgCode = errors.New("invalid message code") + errUnexpectedMsg = errors.New("unexpected message code") +) + +// Packet represents a p2p message in the `trust` protocol. +type Packet interface { + Name() string // Name returns a string corresponding to the message type. + Kind() byte // Kind returns the message type. +} + +type RootRequestPacket struct { + RequestId uint64 + BlockNumber uint64 + BlockHash common.Hash + DiffHash common.Hash +} + +type RootResponsePacket struct { + RequestId uint64 + Status types.VerifyStatus + BlockNumber uint64 + BlockHash common.Hash + Root common.Hash + Extra rlp.RawValue // for extension +} + +func (*RootRequestPacket) Name() string { return "RequestRoot" } +func (*RootRequestPacket) Kind() byte { return RequestRootMsg } + +func (*RootResponsePacket) Name() string { return "RootResponse" } +func (*RootResponsePacket) Kind() byte { return RespondRootMsg } diff --git a/eth/protocols/trust/tracker.go b/eth/protocols/trust/tracker.go new file mode 100644 index 0000000000..ab492b3fb8 --- /dev/null +++ b/eth/protocols/trust/tracker.go @@ -0,0 +1,10 @@ +package trust + +import ( + "time" + + "github.com/ethereum/go-ethereum/p2p/tracker" +) + +// requestTracker is a singleton tracker for request times. +var requestTracker = tracker.New(ProtocolName, time.Minute) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 578b10f09a..395e87fe1d 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -200,6 +200,12 @@ func (ec *Client) GetDiffAccountsWithScope(ctx context.Context, number *big.Int, return &result, err } +func (ec *Client) GetRootByDiffHash(ctx context.Context, blockNr *big.Int, blockHash common.Hash, diffHash common.Hash) (*types.VerifyResult, error) { + var result types.VerifyResult + err := ec.c.CallContext(ctx, &result, "eth_getRootByDiffHash", toBlockNumArg(blockNr), blockHash, diffHash) + return &result, err +} + type rpcTransaction struct { tx *types.Transaction txExtraInfo diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 091e9e7e82..b01ec8b2e5 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1287,6 +1287,10 @@ func (s *PublicBlockChainAPI) GetDiffAccountsWithScope(ctx context.Context, bloc return result, err } +func (s *PublicBlockChainAPI) GetRootByDiffHash(ctx context.Context, blockNr rpc.BlockNumber, blockHash common.Hash, diffHash common.Hash) (*types.VerifyResult, error) { + return s.b.Chain().GetRootByDiffHash(uint64(blockNr), blockHash, diffHash) +} + // ExecutionResult groups all structured logs emitted by the EVM // while replaying a transaction in debug mode as well as transaction // execution status, the amount of gas used and the return value diff --git a/p2p/peer.go b/p2p/peer.go index 3b633108db..cdfaf7f21b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -213,6 +213,11 @@ func (p *Peer) Inbound() bool { return p.rw.is(inboundConn) } +// VerifyNode returns true if the peer is a verification connection +func (p *Peer) VerifyNode() bool { + return p.rw.is(verifyConn) +} + func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ diff --git a/p2p/server.go b/p2p/server.go index 2a38550abf..01e56936ab 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -114,6 +114,10 @@ type Config struct { // maintained and re-connected on disconnects. StaticNodes []*enode.Node + // Verify nodes are used as pre-configured connections which are always + // maintained and re-connected on disconnects. + VerifyNodes []*enode.Node + // Trusted nodes are used as pre-configured connections which are always // allowed to connect, even above the peer limit. TrustedNodes []*enode.Node @@ -218,6 +222,7 @@ const ( staticDialedConn inboundConn trustedConn + verifyConn ) // conn wraps a network connection with information gathered @@ -269,6 +274,9 @@ func (f connFlag) String() string { if f&inboundConn != 0 { s += "-inbound" } + if f&verifyConn != 0 { + s += "-verify" + } if s != "" { s = s[1:] } @@ -649,6 +657,9 @@ func (srv *Server) setupDialScheduler() { for _, n := range srv.StaticNodes { srv.dialsched.addStatic(n) } + for _, n := range srv.VerifyNodes { + srv.dialsched.addStatic(n) + } } func (srv *Server) maxInboundConns() int { @@ -934,6 +945,13 @@ func (srv *Server) checkInboundConn(remoteIP net.IP) error { // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error { + // If dialDest is verify node, set verifyConn flags. + for _, n := range srv.VerifyNodes { + if dialDest == n { + flags |= verifyConn + } + } + c := &conn{fd: fd, flags: flags, cont: make(chan error)} if dialDest == nil { c.transport = srv.newTransport(fd, nil)