Skip to content

Commit

Permalink
op-node: Add a histogram to report current peer scores
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton committed Jun 2, 2023
1 parent 5a35df9 commit 6e91eb8
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 90 deletions.
44 changes: 44 additions & 0 deletions op-node/metrics/histogram_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package metrics

import (
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
)

type ReplaceableHistogramVec struct {
current *atomic.Value
opts prometheus.HistogramOpts
variableLabels []string
}

func NewReplaceableHistogramVec(registry *prometheus.Registry, opts prometheus.HistogramOpts, variableLabels []string) *ReplaceableHistogramVec {
metric := &ReplaceableHistogramVec{
current: &atomic.Value{},
opts: opts,
variableLabels: variableLabels,
}
metric.current.Store(prometheus.NewHistogramVec(opts, variableLabels))
registry.MustRegister(metric)
return metric
}

func (c *ReplaceableHistogramVec) Replace(updater func(h *prometheus.HistogramVec)) {
h := prometheus.NewHistogramVec(c.opts, c.variableLabels)
updater(h)
c.current.Store(h)
}

func (c *ReplaceableHistogramVec) Describe(ch chan<- *prometheus.Desc) {
collector, ok := c.current.Load().(prometheus.Collector)
if ok {
collector.Describe(ch)
}
}

func (c *ReplaceableHistogramVec) Collect(ch chan<- prometheus.Metric) {
collector, ok := c.current.Load().(prometheus.Collector)
if ok {
collector.Collect(ch)
}
}
44 changes: 32 additions & 12 deletions op-node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

ophttp "github.com/ethereum-optimism/optimism/op-node/http"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/metrics"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
Expand Down Expand Up @@ -66,7 +67,7 @@ type Metricer interface {
Document() []metrics.DocumentedMetric
RecordChannelInputBytes(num int)
// P2P Metrics
SetPeerScores(scores map[string]float64)
SetPeerScores(scores map[string]float64, allScores []store.PeerScores)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int)
Expand Down Expand Up @@ -132,15 +133,16 @@ type Metrics struct {
TransactionsSequencedTotal prometheus.Counter

// P2P Metrics
PeerCount prometheus.Gauge
StreamCount prometheus.Gauge
PeerScores *prometheus.GaugeVec
GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec
PeerUnbans prometheus.Counter
IPUnbans prometheus.Counter
Dials *prometheus.CounterVec
Accepts *prometheus.CounterVec
PeerCount prometheus.Gauge
StreamCount prometheus.Gauge
PeerScores *prometheus.GaugeVec
GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec
PeerUnbans prometheus.Counter
IPUnbans prometheus.Counter
Dials *prometheus.CounterVec
Accepts *prometheus.CounterVec
PeerScoresHistogram *ReplaceableHistogramVec

ChannelInputBytes prometheus.Counter

Expand All @@ -161,6 +163,7 @@ func NewMetrics(procName string) *Metrics {
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector())
factory := metrics.With(registry)

return &Metrics{
Info: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Expand Down Expand Up @@ -321,6 +324,12 @@ func NewMetrics(procName string) *Metrics {
}, []string{
"band",
}),
PeerScoresHistogram: NewReplaceableHistogramVec(registry, prometheus.HistogramOpts{
Namespace: ns,
Name: "peer_scores_histogram",
Help: "Histogram of currrently connected peer scores",
Buckets: []float64{-100, -40, -20, -10, -5, -2, -1, -0.5, -0.05, 0, 0.05, 0.5, 1, 2, 5, 10, 20, 40},
}, []string{"type"}),
StreamCount: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Expand Down Expand Up @@ -452,7 +461,18 @@ func NewMetrics(procName string) *Metrics {

// SetPeerScores updates the peer score [prometheus.GaugeVec].
// This takes a map of labels to scores.
func (m *Metrics) SetPeerScores(scores map[string]float64) {
func (m *Metrics) SetPeerScores(scores map[string]float64, allScores []store.PeerScores) {
m.PeerScoresHistogram.Replace(func(h *prometheus.HistogramVec) {
for _, scores := range allScores {
h.WithLabelValues("total").Observe(scores.Gossip.Total)
h.WithLabelValues("ipColocation").Observe(scores.Gossip.IPColocationFactor)
h.WithLabelValues("behavioralPenalty").Observe(scores.Gossip.BehavioralPenalty)
h.WithLabelValues("blocksFirstMessage").Observe(scores.Gossip.Blocks.FirstMessageDeliveries)
h.WithLabelValues("blocksTimeInMesh").Observe(scores.Gossip.Blocks.TimeInMesh)
h.WithLabelValues("blocksMessageDeliveries").Observe(scores.Gossip.Blocks.MeshMessageDeliveries)
h.WithLabelValues("blocksInvalidMessageDeliveries").Observe(scores.Gossip.Blocks.InvalidMessageDeliveries)
}
})
for label, score := range scores {
m.PeerScores.WithLabelValues(label).Set(score)
}
Expand Down Expand Up @@ -785,7 +805,7 @@ func (n *noopMetricer) RecordSequencerReset() {
func (n *noopMetricer) RecordGossipEvent(evType int32) {
}

func (n *noopMetricer) SetPeerScores(scores map[string]float64) {
func (n *noopMetricer) SetPeerScores(scores map[string]float64, allScores []store.PeerScores) {
}

func (n *noopMetricer) IncPeerCount() {
Expand Down
7 changes: 1 addition & 6 deletions op-node/p2p/mocks/GossipMetricer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 15 additions & 5 deletions op-node/p2p/mocks/Peerstore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions op-node/p2p/mocks/ScoreMetrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions op-node/p2p/peer_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Peerstore interface {
// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice

SetScore(id peer.ID, diff store.ScoreDiff) error
SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error)
}

// Scorer is a peer scorer that scores peers based on application-specific metrics.
Expand All @@ -106,8 +106,9 @@ type Scorer interface {
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
}

//go:generate mockery --name ScoreMetrics --output mocks/
type ScoreMetrics interface {
SetPeerScores(map[string]float64)
SetPeerScores(map[string]float64, []store.PeerScores)
}

// NewScorer returns a new peer scorer.
Expand All @@ -129,6 +130,7 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
blocksTopicName := blocksTopicV1(s.cfg)
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
scoreMap := make(map[string]float64)
allScores := make([]store.PeerScores, len(m))
// Zero out all bands.
for _, b := range s.bandScoreThresholds.bands {
scoreMap[b.band] = 0
Expand All @@ -147,15 +149,16 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
diff.Blocks.FirstMessageDeliveries = topSnap.FirstMessageDeliveries
diff.Blocks.InvalidMessageDeliveries = topSnap.InvalidMessageDeliveries
}
if err := s.peerStore.SetScore(id, &diff); err != nil {
if peerScores, err := s.peerStore.SetScore(id, &diff); err != nil {
s.log.Warn("Unable to update peer gossip score", "err", err)
allScores = append(allScores, peerScores)
}
}
for _, snap := range m {
band := s.bandScoreThresholds.Bucket(snap.Score)
scoreMap[band] += 1
}
s.metricer.SetPeerScores(scoreMap)
s.metricer.SetPeerScores(scoreMap, allScores)
}
}

Expand Down
43 changes: 7 additions & 36 deletions op-node/p2p/peer_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ type PeerScorerTestSuite struct {
suite.Suite

mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
mockMetricer *p2pMocks.ScoreMetrics
bandScorer *p2p.BandScoreThresholds
logger log.Logger
}

// SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
testSuite.mockMetricer = &p2pMocks.ScoreMetrics{}
bandScorer, err := p2p.NewBandScorer("-40:graylist;0:friend;")
testSuite.NoError(err)
testSuite.bandScorer = bandScorer
Expand Down Expand Up @@ -77,14 +77,15 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
)
inspectFn := scorer.SnapshotHook()

scores := store.PeerScores{}
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(nil).Once()
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(scores, nil).Once()

// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 0,
"graylist": 1,
}).Return(nil).Once()
}, []store.PeerScores{scores}).Return(nil).Once()

// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
Expand All @@ -95,13 +96,13 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
inspectFn(snapshotMap)

// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(nil).Once()
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(scores, nil).Once()

// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 1,
"graylist": 0,
}).Return(nil).Once()
}, []store.PeerScores{scores}).Return(nil).Once()

// Apply the snapshot
snapshotMap = map[peer.ID]*pubsub.PeerScoreSnapshot{
Expand All @@ -111,33 +112,3 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
}
inspectFn(snapshotMap)
}

// TestScorer_SnapshotHookBlocksPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold.
// This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
scorer := p2p.NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()

// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-101)}).Return(nil).Once()

// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 0,
"graylist": 1,
}).Return(nil)

// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): {
Score: -101,
},
}
inspectFn(snapshotMap)
}
6 changes: 3 additions & 3 deletions op-node/p2p/peer_scores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ type PeerScoresTestSuite struct {
suite.Suite

mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
mockMetricer *p2pMocks.ScoreMetrics
bandScorer BandScoreThresholds
logger log.Logger
}

// SetupTest sets up the test suite.
func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
testSuite.mockMetricer = &p2pMocks.ScoreMetrics{}
bandScorer, err := NewBandScorer("0:graylist;")
testSuite.NoError(err)
testSuite.bandScorer = *bandScorer
Expand Down Expand Up @@ -157,7 +157,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testSuite.mockMetricer.On("SetPeerScores", mock.Anything).Return(nil)
testSuite.mockMetricer.On("SetPeerScores", mock.Anything, mock.Anything).Return(nil)

// Construct 20 hosts using the [getNetHosts] function.
hosts := getNetHosts(testSuite, ctx, 20)
Expand Down
2 changes: 1 addition & 1 deletion op-node/p2p/store/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ScoreDatastore interface {
GetPeerScores(id peer.ID) (PeerScores, error)

// SetScore applies the given store diff to the specified peer
SetScore(id peer.ID, diff ScoreDiff) error
SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error)
}

// ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer.
Expand Down
Loading

0 comments on commit 6e91eb8

Please sign in to comment.