Skip to content

Commit

Permalink
refactored sentinel gossip and only connect to nimbus now (erigontech…
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored and calmbeing committed Apr 24, 2023
1 parent f72f760 commit 1be6e49
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 248 deletions.
29 changes: 29 additions & 0 deletions cl/fork/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,35 @@ func ComputeForkDigest(
return ComputeForkDigestForVersion(currentForkVersion, genesisConfig.GenesisValidatorRoot)
}

func ComputeNextForkDigest(
beaconConfig *clparams.BeaconChainConfig,
genesisConfig *clparams.GenesisConfig,
) ([4]byte, error) {
if genesisConfig.GenesisTime == 0 {
return [4]byte{}, errors.New("genesis time is not set")
}
if genesisConfig.GenesisValidatorRoot == (libcommon.Hash{}) {
return [4]byte{}, errors.New("genesis validators root is not set")
}

currentEpoch := utils.GetCurrentEpoch(genesisConfig.GenesisTime, beaconConfig.SecondsPerSlot, beaconConfig.SlotsPerEpoch)
// Retrieve next fork version.
nextForkIndex := 0
forkList := forkList(beaconConfig.ForkVersionSchedule)
for _, fork := range forkList {
if currentEpoch >= fork.epoch {
nextForkIndex++
continue
}
break
}
if nextForkIndex != len(forkList)-1 {
nextForkIndex++
}

return ComputeForkDigestForVersion(forkList[nextForkIndex].version, genesisConfig.GenesisValidatorRoot)
}

type fork struct {
epoch uint64
version [4]byte
Expand Down
147 changes: 24 additions & 123 deletions cmd/sentinel/sentinel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,14 @@ package sentinel
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/ledgerwatch/erigon/cl/fork"
"github.com/ledgerwatch/log/v3"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

const (
// overlay parameters
gossipSubD = 8 // topic stable mesh target count
gossipSubDlo = 6 // topic stable mesh low watermark
gossipSubDhi = 12 // topic stable mesh high watermark

// gossip parameters
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
gossipSubMcacheGossip = 3 // number of windows to gossip about
gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs

// fanout ttl
gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds

// heartbeat interval
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds

// misc
rSubD = 8 // random gossip target
)

// Specifies the prefix for any pubsub topic.
const gossipTopicPrefix = "/eth2/"
const blockSubnetTopicFormat = "/eth2/%x/beacon_block"
const SSZSnappyCodec = "ssz_snappy"

type TopicName string
Expand Down Expand Up @@ -119,42 +93,12 @@ func (s *GossipManager) Recv() <-chan *pubsub.Message {
return s.ch
}

// closes a specific topic
func (s *GossipManager) CloseTopic(topic string) {
s.mu.Lock()
defer s.mu.Unlock()
if val, ok := s.subscriptions[topic]; ok {
val.Close()
delete(s.subscriptions, topic)
}
}

// reset'em
func (s *GossipManager) Reset() {
s.mu.Lock()
defer s.mu.Unlock()
for _, val := range s.subscriptions {
val.Close() // Close all.
}
s.subscriptions = map[string]*GossipSubscription{}
}

// get a specific topic
func (s *GossipManager) GetSubscription(topic string) (*GossipSubscription, bool) {
s.mu.Lock()
defer s.mu.Unlock()
if val, ok := s.subscriptions[topic]; ok {
return val, true
}
return nil, false
}

func (s *GossipManager) GetMatchingSubscription(match string) *GossipSubscription {
s.mu.Lock()
defer s.mu.Unlock()
var sub *GossipSubscription
for topic, currSub := range s.subscriptions {
if strings.Contains(topic, string(BeaconBlockTopic)) {
if strings.Contains(topic, match) {
sub = currSub
}
}
Expand All @@ -167,79 +111,36 @@ func (s *GossipManager) AddSubscription(topic string, sub *GossipSubscription) {
s.subscriptions[topic] = sub
}

// starts listening to a specific topic (forwarding its messages to the gossip manager channel)
func (s *GossipManager) ListenTopic(topic string) error {
s.mu.Lock()
defer s.mu.Unlock()
if val, ok := s.subscriptions[topic]; ok {
return val.Listen()
}
return nil
}

// closes the gossip manager
func (s *GossipManager) Close() {
s.mu.Lock()
defer s.mu.Unlock()
for _, val := range s.subscriptions {
val.Close()
}
close(s.ch)
}

func (s *GossipManager) String() string {
s.mu.Lock()
defer s.mu.Unlock()
sb := strings.Builder{}
sb.Grow(len(s.subscriptions) * 4)

for _, v := range s.subscriptions {
sb.Write([]byte(v.topic.String()))
sb.WriteString("=")
sb.WriteString(strconv.Itoa(len(v.topic.ListPeers())))
sb.WriteString(" ")
}
return sb.String()
}

func (s *Sentinel) RestartTopics() {
// Reset all topics
s.subManager.Reset()
for _, topic := range s.gossipTopics {
s.SubscribeGossip(topic)
}
}

func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) {
sub = &GossipSubscription{
gossip_topic: topic,
ch: s.subManager.ch,
host: s.host.ID(),
ctx: s.ctx,
}
path := s.getTopic(topic)
sub.topic, err = s.pubsub.Join(path, opts...)
if err != nil {
return nil, fmt.Errorf("failed to join topic %s, err=%w", path, err)
}
s.subManager.AddSubscription(path, sub)
for _, t := range s.gossipTopics {
if t.CodecStr == topic.CodecStr {
return sub, nil
paths := s.getTopics(topic)
for _, path := range paths {
sub = &GossipSubscription{
gossip_topic: topic,
ch: s.subManager.ch,
host: s.host.ID(),
ctx: s.ctx,
}
sub.topic, err = s.pubsub.Join(path, opts...)
if err != nil {
return nil, fmt.Errorf("failed to join topic %s, err=%w", path, err)
}
s.subManager.AddSubscription(path, sub)
}
s.gossipTopics = append(s.gossipTopics, topic)
return sub, nil
}

func (s *Sentinel) LogTopicPeers() {
log.Info("[Gossip] Network Update", "topic peers", s.subManager.String())
return sub, nil
}

func (s *Sentinel) getTopic(topic GossipTopic) string {
o, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
func (s *Sentinel) getTopics(topic GossipTopic) []string {
digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
}
nextDigest, err := fork.ComputeNextForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
}
return fmt.Sprintf("/eth2/%x/%s/%s", o, topic.Name, topic.CodecStr)
return []string{
fmt.Sprintf("/eth2/%x/%s/%s", nextDigest, topic.Name, topic.CodecStr),
fmt.Sprintf("/eth2/%x/%s/%s", digest, topic.Name, topic.CodecStr),
}
}
84 changes: 0 additions & 84 deletions cmd/sentinel/sentinel/pubsub_test.go

This file was deleted.

8 changes: 1 addition & 7 deletions cmd/sentinel/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/fork"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/handlers"
Expand Down Expand Up @@ -53,7 +52,6 @@ type Sentinel struct {
discoverConfig discover.Config
pubsub *pubsub.PubSub
subManager *GossipManager
gossipTopics []GossipTopic
}

func (s *Sentinel) createLocalNode(
Expand Down Expand Up @@ -221,10 +219,6 @@ func New(
return s, nil
}

func (s *Sentinel) ChainConfigs() (clparams.BeaconChainConfig, clparams.GenesisConfig) {
return *s.cfg.BeaconConfig, *s.cfg.GenesisConfig
}

func (s *Sentinel) RecvGossip() <-chan *pubsub.Message {
return s.subManager.Recv()
}
Expand Down Expand Up @@ -262,7 +256,7 @@ func (s *Sentinel) HasTooManyPeers() bool {
}

func (s *Sentinel) GetPeersCount() int {
sub := s.subManager.GetMatchingSubscription(string(LightClientFinalityUpdateTopic))
sub := s.subManager.GetMatchingSubscription(string(LightClientOptimisticUpdateTopic))

if sub == nil {
return len(s.host.Network().Peers())
Expand Down
2 changes: 1 addition & 1 deletion cmd/sentinel/sentinel/service/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
maxSubscribers = 100 // only 100 lightclients per sentinel
maxSubscribers = 100 // only 100 clients per sentinel
)

type gossipObject struct {
Expand Down
Loading

0 comments on commit 1be6e49

Please sign in to comment.