Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gossip package to p2p SDK #1958

Merged
merged 20 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
126 changes: 126 additions & 0 deletions network/p2p/gossip/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"crypto/rand"
"encoding/binary"
"hash"

bloomfilter "github.com/holiman/bloomfilter/v2"

"github.com/ava-labs/avalanchego/ids"
)

var _ hash.Hash64 = (*hasher)(nil)

// NewBloomFilter returns a new instance of a bloom filter with at most
// [maxExpectedElements] elements anticipated at any moment, and a false
// positive probability of [falsePositiveProbability].
func NewBloomFilter(
maxExpectedElements uint64,
falsePositiveProbability float64,
) (*BloomFilter, error) {
bloom, err := bloomfilter.NewOptimal(
maxExpectedElements,
falsePositiveProbability,
)
if err != nil {
return nil, err
}

salt, err := randomSalt()
return &BloomFilter{
Bloom: bloom,
Salt: salt,
}, err
}

type BloomFilter struct {
Bloom *bloomfilter.Filter
// Salt is provided to eventually unblock collisions in Bloom. It's possible
// that conflicting Gossipable items collide in the bloom filter, so a salt
// is generated to eventually resolve collisions.
Salt ids.ID
}

func (b *BloomFilter) Add(gossipable Gossipable) {
h := gossipable.GetID()
salted := &hasher{
hash: h[:],
salt: b.Salt,
}
b.Bloom.Add(salted)
}

func (b *BloomFilter) Has(gossipable Gossipable) bool {
h := gossipable.GetID()
salted := &hasher{
hash: h[:],
salt: b.Salt,
}
return b.Bloom.Contains(salted)
}

// ResetBloomFilterIfNeeded resets a bloom filter if it breaches a target false
// positive probability. Returns true if the bloom filter was reset.
func ResetBloomFilterIfNeeded(
bloomFilter *BloomFilter,
falsePositiveProbability float64,
) (bool, error) {
if bloomFilter.Bloom.FalsePosititveProbability() < falsePositiveProbability {
return false, nil
}

// it's not possible for this to error assuming that the original
// bloom filter's parameters were valid
bloomFilter.Bloom, _ = bloomfilter.New(bloomFilter.Bloom.M(), bloomFilter.Bloom.K())

salt, err := randomSalt()
bloomFilter.Salt = salt

return true, err
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
}

func randomSalt() (ids.ID, error) {
salt := ids.ID{}
_, err := rand.Read(salt[:])
return salt, err
}

type hasher struct {
hash []byte
salt ids.ID
}

func (h *hasher) Write(p []byte) (n int, err error) {
h.hash = append(h.hash, p...)
return len(p), nil
}

func (h *hasher) Sum(b []byte) []byte {
h.hash = append(h.hash, b...)
return h.hash
}

func (h *hasher) Reset() {
h.hash = ids.Empty[:]
}

func (*hasher) BlockSize() int {
return ids.IDLen
}

func (h *hasher) Sum64() uint64 {
salted := ids.ID{}
for i := 0; i < len(h.hash) && i < ids.IDLen; i++ {
salted[i] = h.hash[i] ^ h.salt[i]
}

return binary.BigEndian.Uint64(salted[:])
}

func (h *hasher) Size() int {
return len(h.hash)
}
68 changes: 68 additions & 0 deletions network/p2p/gossip/bloom_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"testing"

bloomfilter "github.com/holiman/bloomfilter/v2"

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
)

func TestBloomFilterRefresh(t *testing.T) {
tests := []struct {
name string
falsePositiveProbability float64
add []*testTx
expected []*testTx
}{
{
name: "no refresh",
falsePositiveProbability: 1,
add: []*testTx{
{id: ids.ID{0}},
},
expected: []*testTx{
{id: ids.ID{0}},
},
},
{
name: "refresh",
falsePositiveProbability: 0.1,
add: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
},
expected: []*testTx{
{id: ids.ID{1}},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
b, err := bloomfilter.New(10, 1)
require.NoError(err)
bloom := BloomFilter{
Bloom: b,
}

for _, item := range tt.add {
_, err = ResetBloomFilterIfNeeded(&bloom, tt.falsePositiveProbability)
require.NoError(err)
bloom.Add(item)
}

require.Equal(uint64(len(tt.expected)), bloom.Bloom.N())

for _, expected := range tt.expected {
require.True(bloom.Has(expected))
}
})
}
}
141 changes: 141 additions & 0 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"context"
"time"

"go.uber.org/zap"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/logging"
)

// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
type GossipableAny[T any] interface {
*T
Gossipable
}

type Config struct {
Frequency time.Duration
PollSize int
}

func NewGossiper[T any, U GossipableAny[T]](
config Config,
log logging.Logger,
set Set[U],
client *p2p.Client,
) *Gossiper[T, U] {
return &Gossiper[T, U]{
config: config,
log: log,
set: set,
client: client,
}
}

type Gossiper[T any, U GossipableAny[T]] struct {
config Config
log logging.Logger
set Set[U]
client *p2p.Client
}

func (g *Gossiper[_, _]) Gossip(ctx context.Context) {
gossipTicker := time.NewTicker(g.config.Frequency)
defer gossipTicker.Stop()

for {
select {
case <-gossipTicker.C:
if err := g.gossip(ctx); err != nil {
g.log.Warn("failed to gossip", zap.Error(err))
}
case <-ctx.Done():
g.log.Debug("shutting down gossip")
return
}
}
}

func (g *Gossiper[_, _]) gossip(ctx context.Context) error {
bloom, salt, err := g.set.GetFilter()
if err != nil {
return err
}

request := &sdk.PullGossipRequest{
Filter: bloom,
Salt: salt,
}
msgBytes, err := proto.Marshal(request)
if err != nil {
return err
}

for i := 0; i < g.config.PollSize; i++ {
if err := g.client.AppRequestAny(ctx, msgBytes, g.handleResponse); err != nil {
return err
}
}

return nil
}

func (g *Gossiper[T, U]) handleResponse(
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
if err != nil {
g.log.Debug(
"failed gossip request",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
)
return
}

response := &sdk.PullGossipResponse{}
if err := proto.Unmarshal(responseBytes, response); err != nil {
g.log.Debug("failed to unmarshal gossip response", zap.Error(err))
return
}

for _, bytes := range response.Gossip {
gossipable := U(new(T))
if err := gossipable.Unmarshal(bytes); err != nil {
g.log.Debug(
"failed to unmarshal gossip",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
)
continue
}

hash := gossipable.GetID()
g.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
)
if err := g.set.Add(gossipable); err != nil {
g.log.Debug(
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
zap.Error(err),
)
continue
}
}
}