Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Petx caching in Doms and Peer Location bucketing #885

Merged
merged 1 commit into from May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Expand Up @@ -41,6 +41,7 @@ include network.env
BASE_CMD = nice -n -20 ./build/bin/go-quai --$(NETWORK) --syncmode $(SYNCMODE) --verbosity $(VERBOSITY) --nonce $(NONCE)
BASE_CMD += --http --http.vhosts=* --http.addr $(HTTP_ADDR) --http.api $(HTTP_API)
BASE_CMD += --ws --ws.addr $(WS_ADDR) --ws.api $(WS_API)
BASE_CMD += --slices $(SLICES)
wizeguyy marked this conversation as resolved.
Show resolved Hide resolved
ifeq ($(ENABLE_ARCHIVE),true)
BASE_CMD += --gcmode archive
endif
Expand Down
1 change: 1 addition & 0 deletions cmd/go-quai/main.go
Expand Up @@ -102,6 +102,7 @@ var (
utils.NodeKeyFileFlag,
utils.NodeKeyHexFlag,
utils.DNSDiscoveryFlag,
utils.SlicesRunningFlag,
utils.ColosseumFlag,
utils.DeveloperFlag,
utils.DeveloperPeriodFlag,
Expand Down
25 changes: 25 additions & 0 deletions cmd/utils/flags.go
Expand Up @@ -130,6 +130,10 @@ var (
Usage: "Explicitly set network id (integer)(For testnets: use --garden)",
Value: ethconfig.Defaults.NetworkId,
}
SlicesRunningFlag = cli.StringFlag{
Name: "slices",
Usage: "All the slices that are running on this node",
}
ColosseumFlag = cli.BoolFlag{
Name: "colosseum",
Usage: "Quai Colosseum testnet",
Expand Down Expand Up @@ -913,6 +917,24 @@ func makeSubUrls(ctx *cli.Context) []string {
return strings.Split(ctx.GlobalString(SubUrls.Name), ",")
}

// setSlicesRunning sets the slices running flag
func setSlicesRunning(ctx *cli.Context, cfg *ethconfig.Config) {
slices := strings.Split(ctx.GlobalString(SlicesRunningFlag.Name), ",")

// Sanity checks
if len(slices) == 0 {
Fatalf("no slices are specified")
}
if len(slices) > common.NumRegionsInPrime*common.NumZonesInRegion {
Fatalf("number of slices exceed the current ontology")
}
slicesRunning := []common.Location{}
for _, slice := range slices {
slicesRunning = append(slicesRunning, common.Location{slice[1] - 48, slice[3] - 48})
}
cfg.SlicesRunning = slicesRunning
}

// MakeDatabaseHandles raises out the number of allowed file handles per process
// for Geth and returns half of the allowance to assign to the database.
func MakeDatabaseHandles() int {
Expand Down Expand Up @@ -1270,6 +1292,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
// set the subordinate chain websocket urls
setSubUrls(ctx, cfg)

// set the slices that the node is running
setSlicesRunning(ctx, cfg)

// Cap the cache allowance and tune the garbage collector
mem, err := gopsutil.VirtualMemory()
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Expand Up @@ -461,7 +461,7 @@ func (c *Core) GetTerminiByHash(hash common.Hash) []common.Hash {
return c.sl.hc.GetTerminiByHash(hash)
}

func (c *Core) SubscribeMissingPendingEtxsEvent(ch chan<- common.Hash) event.Subscription {
func (c *Core) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription {
return c.sl.hc.SubscribeMissingPendingEtxsEvent(ch)
}

Expand Down
26 changes: 13 additions & 13 deletions core/headerchain.go
Expand Up @@ -3,9 +3,9 @@ package core
import (
"errors"
"fmt"
sync "github.com/sasha-s/go-deadlock"
"io"
"math/big"
sync "github.com/sasha-s/go-deadlock"
"sync/atomic"
"time"

Expand Down Expand Up @@ -171,7 +171,7 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
go hc.backfillPETXs(b.Header(), b.SubManifest())
return nil, ErrPendingEtxNotFound
}
subRollup = append(subRollup, pendingEtxs...)
subRollup = append(subRollup, pendingEtxs.Etxs...)
}
} else {
// Start backfilling the missing pending ETXs needed to process this block
Expand All @@ -186,7 +186,7 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
go hc.backfillPETXs(b.Header(), b.SubManifest())
return nil, ErrPendingEtxNotFound
}
subRollup = append(subRollup, pendingEtxs...)
subRollup = append(subRollup, pendingEtxs.Etxs...)
}
}
// Rolluphash is specifically for zone rollup, which can only be validated by region
Expand All @@ -200,18 +200,18 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
}

// GetPendingEtxs gets the pendingEtxs form the
func (hc *HeaderChain) GetPendingEtxs(hash common.Hash) (types.Transactions, error) {
var pendingEtxs types.Transactions
func (hc *HeaderChain) GetPendingEtxs(hash common.Hash) (*types.PendingEtxs, error) {
var pendingEtxs types.PendingEtxs
// Look for pending ETXs first in pending ETX cache, then in database
if res, ok := hc.pendingEtxs.Get(hash); ok && res != nil {
pendingEtxs = res.(types.Transactions)
pendingEtxs = res.(types.PendingEtxs)
} else if res := rawdb.ReadPendingEtxs(hc.headerDb, hash); res != nil {
pendingEtxs = res.Etxs
pendingEtxs = *res
} else {
log.Trace("unable to find pending etxs for hash in manifest", "hash:", hash.String())
return nil, ErrPendingEtxNotFound
}
return pendingEtxs, nil
return &pendingEtxs, nil
}

func (hc *HeaderChain) GetPendingEtxsRollup(hash common.Hash) (types.BlockManifest, error) {
Expand Down Expand Up @@ -239,9 +239,9 @@ func (hc *HeaderChain) backfillPETXs(header *types.Header, subManifest types.Blo
// and then fetch the pending etx for each of the rollup hashes
if manifest, err := hc.GetPendingEtxsRollup(hash); err == nil {
for _, pEtxHash := range manifest {
if _, err := hc.GetPendingEtxs(pEtxHash); err != nil {
if pEtx, err := hc.GetPendingEtxs(pEtxHash); err != nil {
// Send the pendingEtxs to the feed for broadcast
hc.missingPendingEtxsFeed.Send(pEtxHash)
hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: pEtxHash, Location: pEtx.Header.Location()})
}
}
} else {
Expand All @@ -250,7 +250,7 @@ func (hc *HeaderChain) backfillPETXs(header *types.Header, subManifest types.Blo
} else if nodeCtx == common.REGION_CTX {
if _, err := hc.GetPendingEtxs(hash); err != nil {
// Send the pendingEtxs to the feed for broadcast
hc.missingPendingEtxsFeed.Send(hash)
hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: hash, Location: header.Location()})
}
}
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (hc *HeaderChain) AddPendingEtxs(pEtxs types.PendingEtxs) error {
// Write to pending ETX database
rawdb.WritePendingEtxs(hc.headerDb, pEtxs)
// Also write to cache for faster access
hc.pendingEtxs.Add(pEtxs.Header.Hash(), pEtxs.Etxs)
hc.pendingEtxs.Add(pEtxs.Header.Hash(), pEtxs)
} else {
return ErrPendingEtxAlreadyKnown
}
Expand Down Expand Up @@ -885,7 +885,7 @@ func (hc *HeaderChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.S
return hc.scope.Track(hc.chainSideFeed.Subscribe(ch))
}

func (hc *HeaderChain) SubscribeMissingPendingEtxsEvent(ch chan<- common.Hash) event.Subscription {
func (hc *HeaderChain) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription {
return hc.scope.Track(hc.missingPendingEtxsFeed.Subscribe(ch))
}

Expand Down
5 changes: 5 additions & 0 deletions core/types/block.go
Expand Up @@ -1069,3 +1069,8 @@ type HashAndNumber struct {
Hash common.Hash
Number uint64
}

type HashAndLocation struct {
Hash common.Hash
Location common.Location
}
17 changes: 9 additions & 8 deletions eth/backend.go
Expand Up @@ -195,14 +195,15 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Core: eth.core,
TxPool: eth.core.TxPool(),
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Whitelist: config.Whitelist,
Database: chainDb,
Core: eth.core,
TxPool: eth.core.TxPool(),
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Whitelist: config.Whitelist,
SlicesRunning: config.SlicesRunning,
}); err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Expand Up @@ -154,6 +154,9 @@ type Config struct {

// Sub node websocket urls
SubUrls []string

// Slices running on the node
SlicesRunning []common.Location
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
74 changes: 39 additions & 35 deletions eth/handler.go
Expand Up @@ -100,19 +100,21 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Core *core.Core // Core to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
Database ethdb.Database // Database for direct sync insertions
Core *core.Core // Core to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
SlicesRunning []common.Location // Slices run by the node
}

type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
slicesRunning []common.Location // Slices running on the node

acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)

Expand All @@ -132,7 +134,7 @@ type handler struct {
minedBlockSub *event.TypeMuxSubscription
missingBodyCh chan *types.Header
missingBodySub event.Subscription
missingPendingEtxsCh chan common.Hash
missingPendingEtxsCh chan types.HashAndLocation
missingPendingEtxsSub event.Subscription
missingParentCh chan common.Hash
missingParentSub event.Subscription
Expand Down Expand Up @@ -163,16 +165,17 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Core),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
core: config.Core,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: config.Network,
slicesRunning: config.SlicesRunning,
forkFilter: forkid.NewFilter(config.Core),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
core: config.Core,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}

h.downloader = downloader.New(config.Database, h.eventMux, h.core, h.removePeer)
Expand Down Expand Up @@ -229,7 +232,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
entropy = head.CalcS()
)
forkID := forkid.NewID(h.core.Config(), h.core.Genesis().Hash(), h.core.CurrentHeader().Number().Uint64())
if err := peer.Handshake(h.networkID, entropy, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
if err := peer.Handshake(h.networkID, h.slicesRunning, entropy, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down Expand Up @@ -327,7 +330,7 @@ func (h *handler) Start(maxPeers int) {

// broadcast pending etxs
h.wg.Add(1)
h.missingPendingEtxsCh = make(chan common.Hash, missingPendingEtxsChanSize)
h.missingPendingEtxsCh = make(chan types.HashAndLocation, missingPendingEtxsChanSize)
h.missingPendingEtxsSub = h.core.SubscribeMissingPendingEtxsEvent(h.missingPendingEtxsCh)
go h.missingPendingEtxsLoop()

Expand Down Expand Up @@ -536,9 +539,7 @@ func (h *handler) missingPEtxsRollupLoop() {
// Check if any of the peers have the body
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing pending etxs rollup from", "peer", peer.ID(), "hash", hash)
if err := peer.RequestOnePendingEtxsRollup(hash); err != nil {
return
}
peer.RequestOnePendingEtxsRollup(hash)
}

case <-h.missingPEtxsRollupSub.Err():
Expand All @@ -552,13 +553,18 @@ func (h *handler) missingPendingEtxsLoop() {
defer h.wg.Done()
for {
select {
case hash := <-h.missingPendingEtxsCh:
case hashAndLocation := <-h.missingPendingEtxsCh:
// Only ask from peers running the slice for the missing pending etxs
// In the future, peers not responding before the timeout has to be punished
peersRunningSlice := h.peers.peerRunningSlice(hashAndLocation.Location)
wizeguyy marked this conversation as resolved.
Show resolved Hide resolved
// If the node doesn't have any peer running that slice, add a warning
if len(peersRunningSlice) == 0 {
log.Warn("Node doesn't have peers for given Location", "location", hashAndLocation.Location)
}
// Check if any of the peers have the body
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hash)
if err := peer.RequestOnePendingEtxs(hash); err != nil {
return
}
for _, peer := range peersRunningSlice {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash)
peer.RequestOnePendingEtxs(hashAndLocation.Hash)
}
case <-h.missingPendingEtxsSub.Err():
return
Expand All @@ -575,9 +581,7 @@ func (h *handler) missingParentLoop() {
// Check if any of the peers have the body
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing parent from", "peer", peer.ID(), "hash", hash)
if err := peer.RequestBlockByHash(hash); err != nil {
return
}
peer.RequestBlockByHash(hash)
}
case <-h.missingParentSub.Err():
return
Expand Down
22 changes: 22 additions & 0 deletions eth/peerset.go
Expand Up @@ -174,6 +174,28 @@ func (ps *peerSet) peerWithHighestEntropy() *eth.Peer {
return bestPeer
}

func (ps *peerSet) peerRunningSlice(location common.Location) []*eth.Peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

containsLocation := func(s []common.Location, e common.Location) bool {
for _, a := range s {
if common.Location.Equal(a, e) {
return true
}
}
return false
}

var peersRunningSlice []*eth.Peer
for _, p := range ps.peers {
if containsLocation(p.Peer.SlicesRunning(), location) {
peersRunningSlice = append(peersRunningSlice, p.Peer)
}
}
return peersRunningSlice
}

// close disconnects all peers.
func (ps *peerSet) close() {
ps.lock.Lock()
Expand Down