Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ type (
DiscoveryDNS string
Database string
NoDiscovery bool
MaxRequests int
RequestsCacheTTL time.Duration
MaxBlocks int
BlocksCacheTTL time.Duration
RequestsCache p2p.CacheOptions
ParentsCache p2p.CacheOptions
BlocksCache p2p.CacheOptions

bootnodes []*enode.Node
staticNodes []*enode.Node
Expand Down Expand Up @@ -196,24 +195,23 @@ var SensorCmd = &cobra.Command{
// Create peer connection manager for broadcasting transactions
// and managing the global blocks cache
conns := p2p.NewConns(p2p.ConnsOptions{
MaxBlocks: inputSensorParams.MaxBlocks,
BlocksCacheTTL: inputSensorParams.BlocksCacheTTL,
BlocksCache: inputSensorParams.BlocksCache,
})

opts := p2p.EthProtocolOptions{
Context: cmd.Context(),
Database: db,
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
RPC: inputSensorParams.RPC,
SensorID: inputSensorParams.SensorID,
NetworkID: inputSensorParams.NetworkID,
Conns: conns,
Head: &head,
HeadMutex: &sync.RWMutex{},
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
MsgCounter: msgCounter,
MaxRequests: inputSensorParams.MaxRequests,
RequestsCacheTTL: inputSensorParams.RequestsCacheTTL,
Context: cmd.Context(),
Database: db,
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
RPC: inputSensorParams.RPC,
SensorID: inputSensorParams.SensorID,
NetworkID: inputSensorParams.NetworkID,
Conns: conns,
Head: &head,
HeadMutex: &sync.RWMutex{},
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
MsgCounter: msgCounter,
RequestsCache: inputSensorParams.RequestsCache,
ParentsCache: inputSensorParams.ParentsCache,
}

config := ethp2p.Config{
Expand Down Expand Up @@ -486,8 +484,10 @@ will result in less chance of missing data but can significantly increase memory
- json (output to stdout)
- none (no persistence)`)
f.BoolVar(&inputSensorParams.NoDiscovery, "no-discovery", false, "disable P2P peer discovery")
f.IntVar(&inputSensorParams.MaxRequests, "max-requests", 2048, "maximum request IDs to track per peer (0 for no limit)")
f.DurationVar(&inputSensorParams.RequestsCacheTTL, "requests-cache-ttl", 5*time.Minute, "time to live for requests cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.MaxBlocks, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
f.DurationVar(&inputSensorParams.BlocksCacheTTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.RequestsCache.MaxSize, "max-requests", 2048, "maximum request IDs to track per peer (0 for no limit)")
f.DurationVar(&inputSensorParams.RequestsCache.TTL, "requests-cache-ttl", 5*time.Minute, "time to live for requests cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.ParentsCache.MaxSize, "max-parents", 1024, "maximum parent block hashes to track per peer (0 for no limit)")
f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
}
2 changes: 2 additions & 0 deletions doc/polycli_p2p_sensor.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ If no nodes.json file exists, it will be created.
--max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024)
-D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this
will result in less chance of missing data but can significantly increase memory usage) (default 10000)
--max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024)
-m, --max-peers int maximum number of peers to connect to (default 2000)
--max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048)
--nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:<IP>|extip:<IP>) (default "any")
-n, --network-id uint filter discovered nodes by this network ID
--no-discovery disable P2P peer discovery
--parents-cache-ttl duration time to live for parent hash cache entries (0 for no expiration) (default 5m0s)
--port int TCP network listening port (default 30303)
--pprof run pprof server
--pprof-port uint port pprof runs on (default 6060)
Expand Down
27 changes: 19 additions & 8 deletions p2p/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"time"
)

// CacheOptions contains configuration for LRU caches with TTL.
type CacheOptions struct {
MaxSize int
TTL time.Duration
}

// Cache is a thread-safe LRU cache with optional TTL-based expiration.
type Cache[K comparable, V any] struct {
mu sync.RWMutex
Expand All @@ -21,13 +27,13 @@ type entry[K comparable, V any] struct {
expiresAt time.Time
}

// NewCache creates a new cache with the given max size and optional TTL.
// If maxSize <= 0, the cache has no size limit.
// If ttl is 0, entries never expire based on time.
func NewCache[K comparable, V any](maxSize int, ttl time.Duration) *Cache[K, V] {
// NewCache creates a new cache with the given options.
// If opts.MaxSize <= 0, the cache has no size limit.
// If opts.TTL is 0, entries never expire based on time.
func NewCache[K comparable, V any](opts CacheOptions) *Cache[K, V] {
return &Cache[K, V]{
maxSize: maxSize,
ttl: ttl,
maxSize: opts.MaxSize,
ttl: opts.TTL,
items: make(map[K]*list.Element),
list: list.New(),
}
Expand Down Expand Up @@ -187,15 +193,20 @@ func (c *Cache[K, V]) Contains(key K) bool {
return true
}

// Remove removes a key from the cache.
func (c *Cache[K, V]) Remove(key K) {
// Remove removes a key from the cache and returns the value if it existed.
func (c *Cache[K, V]) Remove(key K) (V, bool) {
c.mu.Lock()
defer c.mu.Unlock()

if elem, ok := c.items[key]; ok {
e := elem.Value.(*entry[K, V])
c.list.Remove(elem)
delete(c.items, key)
return e.value, true
}

var zero V
return zero, false
}

// Len returns the number of items in the cache.
Expand Down
7 changes: 2 additions & 5 deletions p2p/conns.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@ type Conns struct {

// ConnsOptions contains configuration options for creating a new Conns manager.
type ConnsOptions struct {
// MaxBlocks is the maximum number of blocks to track in the cache.
MaxBlocks int
// BlocksCacheTTL is the time to live for block cache entries.
BlocksCacheTTL time.Duration
BlocksCache CacheOptions
}

// NewConns creates a new connection manager with a blocks cache.
func NewConns(opts ConnsOptions) *Conns {
return &Conns{
conns: make(map[string]*conn),
blocks: NewCache[common.Hash, BlockCache](opts.MaxBlocks, opts.BlocksCacheTTL),
blocks: NewCache[common.Hash, BlockCache](opts.BlocksCache),
}
}

Expand Down
5 changes: 3 additions & 2 deletions p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ type Database interface {
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)

// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
// returns true.
WriteBlockHeaders(context.Context, []*types.Header, time.Time)
// returns true. The isParent parameter indicates if these headers were
// fetched as parent blocks.
WriteBlockHeaders(context.Context, []*types.Header, time.Time, bool)

// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
// returns true.
Expand Down
23 changes: 13 additions & 10 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type DatastoreHeader struct {
BaseFee string
TimeFirstSeen time.Time
TTL time.Time
IsParent bool
}

// DatastoreBlock represents a block stored in datastore.
Expand Down Expand Up @@ -184,15 +185,16 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
// WriteBlockHeaders will write the block headers to datastore. It will not
// write block events because headers will only be sent to the sensor when
// requested. The block events will be written when the hash is received
// instead.
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
// instead. The isParent parameter indicates if these headers were fetched
// as parent blocks.
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
if d.client == nil || !d.ShouldWriteBlocks() {
return
}

for _, h := range headers {
d.runAsync(func() {
d.writeBlockHeader(ctx, h, tfs)
d.writeBlockHeader(ctx, h, tfs, isParent)
})
}
}
Expand Down Expand Up @@ -314,7 +316,7 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {

// newDatastoreHeader creates a DatastoreHeader from a types.Header. Some
// values are converted into strings to prevent a loss of precision.
func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *DatastoreHeader {
func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isParent bool) *DatastoreHeader {
return &DatastoreHeader{
ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
UncleHash: header.UncleHash.Hex(),
Expand All @@ -334,6 +336,7 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *Dat
BaseFee: header.BaseFee.String(),
TimeFirstSeen: tfs,
TTL: tfs.Add(d.ttl),
IsParent: isParent,
}
}

Expand Down Expand Up @@ -390,7 +393,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.

if dsBlock.DatastoreHeader == nil {
shouldWrite = true
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs)
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs, false)
}

if len(dsBlock.TotalDifficulty) == 0 {
Expand All @@ -414,7 +417,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
shouldWrite = true
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
for _, uncle := range block.Uncles() {
d.writeBlockHeader(ctx, uncle, tfs)
d.writeBlockHeader(ctx, uncle, tfs, false)
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}
Expand Down Expand Up @@ -475,8 +478,8 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind
}

// writeBlockHeader will write the block header to datastore if it doesn't
// exist.
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time) {
// exist. The isParent parameter indicates if this block was fetched as a parent block.
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time, isParent bool) {
key := datastore.NameKey(BlocksKind, header.Hash().Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand All @@ -485,7 +488,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
return nil
}

block.DatastoreHeader = d.newDatastoreHeader(header, tfs)
block.DatastoreHeader = d.newDatastoreHeader(header, tfs, isParent)
_, err := tx.Put(key, &block)
return err
}, datastore.MaxAttempts(MaxAttempts))
Expand Down Expand Up @@ -522,7 +525,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
shouldWrite = true
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
for _, uncle := range body.Uncles {
d.writeBlockHeader(ctx, uncle, tfs)
d.writeBlockHeader(ctx, uncle, tfs, false)
block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}
Expand Down
5 changes: 4 additions & 1 deletion p2p/database/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type JSONBlock struct {
TxCount int `json:"tx_count"`
UncleCount int `json:"uncle_count"`
TimeFirstSeen time.Time `json:"time_first_seen"`
IsParent bool `json:"is_parent"`
}

// JSONBlockEvent represents a block event in JSON format.
Expand Down Expand Up @@ -179,7 +180,8 @@ func (j *JSONDatabase) writeBlock(block *types.Block, td *big.Int, tfs time.Time
}

// WriteBlockHeaders writes the block headers as JSON.
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
// The isParent parameter indicates if these headers were fetched as parent blocks.
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
if !j.ShouldWriteBlocks() {
return
}
Expand All @@ -196,6 +198,7 @@ func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.H
GasUsed: header.GasUsed,
Difficulty: header.Difficulty.String(),
TimeFirstSeen: tfs,
IsParent: isParent,
}

if header.BaseFee != nil {
Expand Down
2 changes: 1 addition & 1 deletion p2p/database/nodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (n *nodb) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Bl
}

// WriteBlockHeaders does nothing.
func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
}

// WriteBlockHashes does nothing.
Expand Down
35 changes: 26 additions & 9 deletions p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type conn struct {
requests *Cache[uint64, common.Hash]
requestNum uint64

// parents tracks hashes of blocks requested as parents to mark them
// with IsParent=true when writing to the database.
parents *Cache[common.Hash, struct{}]

// conns provides access to the global connection manager, which includes
// the blocks cache shared across all peers.
conns *Conns
Expand Down Expand Up @@ -80,9 +84,9 @@ type EthProtocolOptions struct {
Head *HeadBlock
HeadMutex *sync.RWMutex

// Requests cache configuration
MaxRequests int
RequestsCacheTTL time.Duration
// Cache configurations
RequestsCache CacheOptions
ParentsCache CacheOptions
}

// HeadBlock contains the necessary head block data for the status message.
Expand All @@ -108,8 +112,9 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
logger: log.With().Str("peer", peerURL).Logger(),
rw: rw,
db: opts.Database,
requests: NewCache[uint64, common.Hash](opts.MaxRequests, opts.RequestsCacheTTL),
requests: NewCache[uint64, common.Hash](opts.RequestsCache),
requestNum: 0,
parents: NewCache[common.Hash, struct{}](opts.ParentsCache),
head: opts.Head,
headMutex: opts.HeadMutex,
counter: opts.MsgCounter,
Expand Down Expand Up @@ -277,8 +282,9 @@ func (c *conn) readStatus(packet *eth.StatusPacket) error {

// getBlockData will send GetBlockHeaders and/or GetBlockBodies requests to the
// peer based on what parts of the block we already have. It will return an error
// if sending either of the requests failed.
func (c *conn) getBlockData(hash common.Hash, cache BlockCache) error {
// if sending either of the requests failed. The isParent parameter indicates if
// this block is being fetched as a parent block.
func (c *conn) getBlockData(hash common.Hash, cache BlockCache, isParent bool) error {
// Only request header if we don't have it
if cache.Header == nil {
headersRequest := &GetBlockHeaders{
Expand All @@ -290,6 +296,10 @@ func (c *conn) getBlockData(hash common.Hash, cache BlockCache) error {
},
}

if isParent {
c.parents.Add(hash, struct{}{})
}

c.countMsgSent(headersRequest.Name(), 1)
if err := ethp2p.Send(c.rw, eth.GetBlockHeadersMsg, headersRequest); err != nil {
return err
Expand Down Expand Up @@ -343,7 +353,7 @@ func (c *conn) getParentBlock(ctx context.Context, header *types.Header) error {
Str("number", new(big.Int).Sub(header.Number, big.NewInt(1)).String()).
Msg("Fetching missing parent block")

return c.getBlockData(header.ParentHash, cache)
return c.getBlockData(header.ParentHash, cache, true)
}

func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
Expand All @@ -369,7 +379,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
}

// Request only the parts we don't have
if err := c.getBlockData(hash, cache); err != nil {
if err := c.getBlockData(hash, cache, false); err != nil {
return err
}

Expand Down Expand Up @@ -422,6 +432,10 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
tfs := time.Now()

headers := packet.BlockHeadersRequest
if len(headers) == 0 {
return nil
}

c.countMsgReceived(packet.Name(), float64(len(headers)))

for _, header := range headers {
Expand All @@ -430,7 +444,10 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
}
}

c.db.WriteBlockHeaders(ctx, headers, tfs)
// Check if any of these headers were requested as parent blocks
_, isParent := c.parents.Remove(headers[0].Hash())

c.db.WriteBlockHeaders(ctx, headers, tfs, isParent)

// Update cache to store headers
for _, header := range headers {
Expand Down
Loading