Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[network/p2p] Redesign Push Gossip #2772

Merged
merged 60 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
1714e84
remove accumulator from handler
patrick-ogrady Feb 25, 2024
9c07a02
add more comments
patrick-ogrady Feb 25, 2024
80fb2b4
add Has to set
patrick-ogrady Feb 25, 2024
dee16f9
add placeholder for has use
patrick-ogrady Feb 25, 2024
e7f25bf
spike on gossip push
patrick-ogrady Feb 25, 2024
646cadc
add TODO about memory use
patrick-ogrady Feb 25, 2024
f10ed76
add logic to handle discarded tx churn
patrick-ogrady Feb 25, 2024
5bb3d2e
revert discarded cache
patrick-ogrady Feb 25, 2024
0d48439
periodically prune gossipables
patrick-ogrady Feb 25, 2024
31de4f6
add discarded cache
patrick-ogrady Feb 25, 2024
8201106
add arg for regossip frequency
patrick-ogrady Feb 25, 2024
cd7aaf6
add a TODO
patrick-ogrady Feb 25, 2024
1955a67
add metric for tracking gossipables
patrick-ogrady Feb 25, 2024
4d75df0
nit
patrick-ogrady Feb 25, 2024
ff965db
add TODO for x/p
patrick-ogrady Feb 25, 2024
dc69586
prune in add
patrick-ogrady Feb 26, 2024
2c19996
remove unnecessary newline
patrick-ogrady Feb 26, 2024
7277f63
add more comments
patrick-ogrady Feb 26, 2024
cfd1a3e
pop first
patrick-ogrady Feb 26, 2024
dc2c18e
add more TODOs
patrick-ogrady Feb 26, 2024
09abd32
ensure we track size iteration separately
patrick-ogrady Feb 26, 2024
dae722e
remove prune
patrick-ogrady Feb 26, 2024
ee0356d
add log to gossip
patrick-ogrady Feb 26, 2024
55c65fe
Gossip cleanup suggestions (#2773)
patrick-ogrady Feb 26, 2024
a7e8bf7
cleanup some compilation issues
StephenButtolph Feb 27, 2024
06971d7
comment nits
StephenButtolph Feb 27, 2024
0ef1432
comment nits
StephenButtolph Feb 27, 2024
cd91a1d
nit remove useless continue
StephenButtolph Feb 27, 2024
bcfe2cf
implement Has in AVM
StephenButtolph Feb 27, 2024
9e40b64
push gossip with partial sync
StephenButtolph Feb 27, 2024
a3b824f
remove early gossip
StephenButtolph Feb 27, 2024
5031a6e
nit
StephenButtolph Feb 27, 2024
927cff2
nit move time logic out of the loop
StephenButtolph Feb 27, 2024
35fc54b
fix comment
StephenButtolph Feb 27, 2024
6cbd5b4
finalize P-chain push gossip
StephenButtolph Feb 27, 2024
e1da49f
nit default
StephenButtolph Feb 27, 2024
b616714
nit
StephenButtolph Feb 27, 2024
92fdc35
add comment
StephenButtolph Feb 27, 2024
c9620b9
rename method
StephenButtolph Feb 27, 2024
6ce907b
implement avm
StephenButtolph Feb 27, 2024
b603ee6
move param validation
joshua-kim Feb 27, 2024
ad847dc
nit
joshua-kim Feb 27, 2024
fe14e10
nit
joshua-kim Feb 27, 2024
5de194a
nit
joshua-kim Feb 27, 2024
b4d4d1d
nit
joshua-kim Feb 27, 2024
2b74cfd
nit
joshua-kim Feb 27, 2024
28f1664
remove network interface
joshua-kim Feb 27, 2024
876fc3f
fix typo
joshua-kim Feb 27, 2024
795f6d5
nit
joshua-kim Feb 27, 2024
e76f252
duplicate line
joshua-kim Feb 27, 2024
6d3dfaf
nits
StephenButtolph Feb 27, 2024
4450567
Add test for empty gossip
StephenButtolph Feb 27, 2024
63269d7
gomod
joshua-kim Feb 27, 2024
f2f0e11
lint
joshua-kim Feb 27, 2024
7257cc4
Update coreth to v0.13.1-rc.0
StephenButtolph Feb 28, 2024
5b04b7a
register tracking
patrick-ogrady Feb 28, 2024
648915c
Register metric
StephenButtolph Feb 28, 2024
ed41bef
Merge branch 'gossip-cleanup' of github.com:ava-labs/avalanchego into…
StephenButtolph Feb 28, 2024
1cbea42
Make metrics more fine-grained (#2778)
StephenButtolph Feb 28, 2024
18faf45
update coreth
StephenButtolph Feb 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 184 additions & 61 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/buffer"
"github.com/ava-labs/avalanchego/utils/logging"
)
Expand All @@ -24,17 +26,17 @@ const (
typeLabel = "type"
pushType = "push"
pullType = "pull"

defaultGossipableCount = 64
)

var (
_ Gossiper = (*ValidatorGossiper)(nil)
_ Gossiper = (*PullGossiper[*testTx])(nil)
_ Gossiper = (*NoOpGossiper)(nil)
_ Gossiper = (*TestGossiper)(nil)

_ Accumulator[*testTx] = (*PushGossiper[*testTx])(nil)
_ Accumulator[*testTx] = (*NoOpAccumulator[*testTx])(nil)
_ Accumulator[*testTx] = (*TestAccumulator[*testTx])(nil)
_ Set[*testTx] = (*EmptySet[*testTx])(nil)
_ Set[*testTx] = (*FullSet[*testTx])(nil)

metricLabels = []string{typeLabel}
pushLabels = prometheus.Labels{
Expand All @@ -43,6 +45,11 @@ var (
pullLabels = prometheus.Labels{
typeLabel: pullType,
}

errEmptySetCantAdd = errors.New("empty set can not add")
ErrInvalidDiscardedSize = errors.New("discarded size cannot be negative")
ErrInvalidTargetGossipSize = errors.New("target gossip size cannot be negative")
ErrInvalidRegossipFrequency = errors.New("re-gossip frequency cannot be negative")
)

// Gossiper gossips Gossipables to other nodes
Expand All @@ -51,13 +58,6 @@ type Gossiper interface {
Gossip(ctx context.Context) error
}

// Accumulator allows a caller to accumulate gossipables to be gossiped
type Accumulator[T Gossipable] interface {
Gossiper
// Add queues gossipables to be gossiped
Add(gossipables ...T)
}

// ValidatorGossiper only calls [Gossip] if the given node is a validator
type ValidatorGossiper struct {
Gossiper
Expand All @@ -73,6 +73,7 @@ type Metrics struct {
sentBytes *prometheus.CounterVec
receivedCount *prometheus.CounterVec
receivedBytes *prometheus.CounterVec
tracking prometheus.Gauge
}

// NewMetrics returns a common set of metrics
Expand Down Expand Up @@ -101,6 +102,11 @@ func NewMetrics(
Name: "gossip_received_bytes",
Help: "amount of gossip received (bytes)",
}, metricLabels),
tracking: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "gossip_tracking",
Help: "number of gossipables being tracked",
}),
}
err := utils.Err(
metrics.Register(m.sentCount),
Expand Down Expand Up @@ -197,17 +203,17 @@ func (p *PullGossiper[_]) handleResponse(
continue
}

hash := gossipable.GossipID()
gossipID := gossipable.GossipID()
p.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
zap.Stringer("id", gossipID),
)
if err := p.set.Add(gossipable); err != nil {
p.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
zap.Stringer("id", gossipID),
zap.Error(err),
)
continue
Expand All @@ -231,84 +237,190 @@ func (p *PullGossiper[_]) handleResponse(
}

// NewPushGossiper returns an instance of PushGossiper
func NewPushGossiper[T Gossipable](marshaller Marshaller[T], client *p2p.Client, metrics Metrics, targetGossipSize int) *PushGossiper[T] {
return &PushGossiper[T]{
marshaller: marshaller,
client: client,
metrics: metrics,
targetGossipSize: targetGossipSize,
pending: buffer.NewUnboundedDeque[T](0),
func NewPushGossiper[T Gossipable](
marshaller Marshaller[T],
mempool Set[T],
client *p2p.Client,
metrics Metrics,
discardedSize int,
targetGossipSize int,
maxRegossipFrequency time.Duration,
) (*PushGossiper[T], error) {
if discardedSize < 0 {
return nil, ErrInvalidDiscardedSize
}

if targetGossipSize < 0 {
return nil, ErrInvalidTargetGossipSize
}

if maxRegossipFrequency < 0 {
return nil, ErrInvalidRegossipFrequency
}

return &PushGossiper[T]{
marshaller: marshaller,
set: mempool,
client: client,
metrics: metrics,
targetGossipSize: targetGossipSize,
maxRegossipFrequency: maxRegossipFrequency,

tracking: make(map[ids.ID]time.Time),
pending: buffer.NewUnboundedDeque[T](0),
issued: buffer.NewUnboundedDeque[T](0),
discarded: &cache.LRU[ids.ID, interface{}]{Size: discardedSize},
}, nil
}

// PushGossiper broadcasts gossip to peers randomly in the network
type PushGossiper[T Gossipable] struct {
marshaller Marshaller[T]
client *p2p.Client
metrics Metrics
targetGossipSize int
marshaller Marshaller[T]
set Set[T]
client *p2p.Client
metrics Metrics

targetGossipSize int
maxRegossipFrequency time.Duration

lock sync.Mutex
pending buffer.Deque[T]
lock sync.Mutex
tracking map[ids.ID]time.Time
pending buffer.Deque[T]
issued buffer.Deque[T]
discarded *cache.LRU[ids.ID, interface{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
}

// Gossip flushes any queued gossipables
// Gossip flushes any queued gossipables.
func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
p.lock.Lock()
defer p.lock.Unlock()

if p.pending.Len() == 0 {
if len(p.tracking) == 0 {
return nil
}

sentBytes := 0
gossip := make([][]byte, 0, p.pending.Len())
var (
sentBytes = 0
gossip = make([][]byte, 0, defaultGossipableCount)
now = time.Now()
)

// Iterate over all pending gossipables (never been sent before).
for sentBytes < p.targetGossipSize {
gossipable, ok := p.pending.PeekLeft()
gossipable, ok := p.pending.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
continue
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
// remove this item so we don't get stuck in a loop
_, _ = p.pending.PopLeft()
delete(p.tracking, gossipID)
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.pending.PopLeft()
p.issued.PushRight(gossipable)
p.tracking[gossipID] = now
}

maxLastGossipTimeToRegossip := now.Add(-p.maxRegossipFrequency)

// Iterate over all issued gossipables (have been sent before) to fill any
// remaining space in gossip batch.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.issued.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.discarded.Put(gossipID, nil) // only add to discarded if issued once
continue
}

// Ensure we don't attempt to send a gossipable too frequently.
lastGossipTime := p.tracking[gossipID]
if maxLastGossipTimeToRegossip.Before(lastGossipTime) {
// Put the gossipable on the front of the queue to keep items sorted
// by last issuance time.
p.issued.PushLeft(gossipable)
break
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
// Should never happen because we've already issued this once.
delete(p.tracking, gossipID)
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.issued.PushRight(gossipable)
p.tracking[gossipID] = now
}
p.metrics.tracking.Set(float64(len(p.tracking)))

// If there is nothing to gossip, we can exit early.
if len(gossip) == 0 {
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// Send gossipables to peers
msgBytes, err := MarshalAppGossip(gossip)
if err != nil {
return err
}

sentCountMetric, err := p.metrics.sentCount.GetMetricWith(pushLabels)
if err != nil {
return fmt.Errorf("failed to get sent count metric: %w", err)
}

sentBytesMetric, err := p.metrics.sentBytes.GetMetricWith(pushLabels)
if err != nil {
return fmt.Errorf("failed to get sent bytes metric: %w", err)
}

sentCountMetric.Add(float64(len(gossip)))
sentBytesMetric.Add(float64(sentBytes))

return p.client.AppGossip(ctx, msgBytes)
if err := p.client.AppGossip(ctx, msgBytes); err != nil {
return fmt.Errorf("failed to gossip: %w", err)
}
return nil
}

// Add enqueues new gossipables to be pushed. If a gossiable is already tracked,
// it is not added again.
func (p *PushGossiper[T]) Add(gossipables ...T) {
p.lock.Lock()
defer p.lock.Unlock()

// Add new gossipables to the pending queue.
now := time.Now()
for _, gossipable := range gossipables {
p.pending.PushRight(gossipable)
gossipID := gossipable.GossipID()
if _, ok := p.tracking[gossipID]; ok {
continue
}
if _, ok := p.discarded.Get(gossipID); ok {
// Pretend that recently discarded transactions were just gossiped.
p.tracking[gossipID] = now
p.issued.PushRight(gossipable)
} else {
p.tracking[gossipID] = time.Time{}
aaronbuchwald marked this conversation as resolved.
Show resolved Hide resolved
p.pending.PushRight(gossipable)
}
}
p.metrics.tracking.Set(float64(len(p.tracking)))
}

// Every calls [Gossip] every [frequency] amount of time.
Expand All @@ -335,39 +447,50 @@ func (NoOpGossiper) Gossip(context.Context) error {
return nil
}

type NoOpAccumulator[T Gossipable] struct{}
type TestGossiper struct {
GossipF func(ctx context.Context) error
}

func (NoOpAccumulator[_]) Gossip(context.Context) error {
func (t *TestGossiper) Gossip(ctx context.Context) error {
return t.GossipF(ctx)
}

type EmptySet[T Gossipable] struct{}

func (EmptySet[_]) Gossip(context.Context) error {
return nil
}

func (NoOpAccumulator[T]) Add(...T) {}
func (EmptySet[T]) Add(T) error {
return errEmptySetCantAdd
}

type TestGossiper struct {
GossipF func(ctx context.Context) error
func (EmptySet[T]) Has(ids.ID) bool {
return false
}

func (t *TestGossiper) Gossip(ctx context.Context) error {
return t.GossipF(ctx)
func (EmptySet[T]) Iterate(func(gossipable T) bool) {}

func (EmptySet[_]) GetFilter() ([]byte, []byte) {
return bloom.EmptyFilter.Marshal(), ids.Empty[:]
}

type TestAccumulator[T Gossipable] struct {
GossipF func(ctx context.Context) error
AddF func(...T)
type FullSet[T Gossipable] struct{}

func (FullSet[_]) Gossip(context.Context) error {
return nil
}

func (t TestAccumulator[T]) Gossip(ctx context.Context) error {
if t.GossipF == nil {
return nil
}
func (FullSet[T]) Add(T) error {
return nil
}

return t.GossipF(ctx)
func (FullSet[T]) Has(ids.ID) bool {
return true
}

func (t TestAccumulator[T]) Add(gossipables ...T) {
if t.AddF == nil {
return
}
func (FullSet[T]) Iterate(func(gossipable T) bool) {}

t.AddF(gossipables...)
func (FullSet[_]) GetFilter() ([]byte, []byte) {
return bloom.FullFilter.Marshal(), ids.Empty[:]
}
Loading
Loading